Source code for ophyd_async.epics.adpilatus._pilatus_controller

import asyncio
from enum import Enum
from typing import TypeVar, get_args

from ophyd_async.core import (
    DEFAULT_TIMEOUT,
    DetectorTrigger,
    TriggerInfo,
    wait_for_value,
)
from ophyd_async.epics import adcore

from ._pilatus_io import PilatusDriverIO, PilatusTriggerMode


#: Cite: https://media.dectris.com/User_Manual-PILATUS2-V1_4.pdf
#: The required minimum time difference between ExpPeriod and ExpTime
#: (readout time) is 2.28 ms
#: We provide an option to override for newer Pilatus models
[docs] class PilatusReadoutTime(float, Enum): """Pilatus readout time per model in ms""" # Cite: https://media.dectris.com/User_Manual-PILATUS2-V1_4.pdf PILATUS2 = 2.28e-3 # Cite: https://media.dectris.com/user-manual-pilatus3-2020.pdf PILATUS3 = 0.95e-3
PilatusControllerT = TypeVar("PilatusControllerT", bound="PilatusController") class PilatusController(adcore.ADBaseController[PilatusDriverIO]): _supported_trigger_types = { DetectorTrigger.INTERNAL: PilatusTriggerMode.INTERNAL, DetectorTrigger.CONSTANT_GATE: PilatusTriggerMode.EXT_ENABLE, DetectorTrigger.VARIABLE_GATE: PilatusTriggerMode.EXT_ENABLE, } def __init__( self, driver: PilatusDriverIO, good_states: frozenset[adcore.DetectorState] = adcore.DEFAULT_GOOD_STATES, readout_time: float = PilatusReadoutTime.PILATUS3, ) -> None: super().__init__(driver, good_states=good_states) self._readout_time = readout_time @classmethod def controller_and_drv( cls: type[PilatusControllerT], prefix: str, good_states: frozenset[adcore.DetectorState] = adcore.DEFAULT_GOOD_STATES, name: str = "", readout_time: float = PilatusReadoutTime.PILATUS3, ) -> tuple[PilatusControllerT, PilatusDriverIO]: driver_cls = get_args(cls.__orig_bases__[0])[0] # type: ignore driver = driver_cls(prefix, name=name) controller = cls(driver, good_states=good_states, readout_time=readout_time) return controller, driver def get_deadtime(self, exposure: float | None) -> float: return self._readout_time async def prepare(self, trigger_info: TriggerInfo): if trigger_info.livetime is not None: await self.set_exposure_time_and_acquire_period_if_supplied( trigger_info.livetime ) await asyncio.gather( self.driver.trigger_mode.set(self._get_trigger_mode(trigger_info.trigger)), self.driver.num_images.set( 999_999 if trigger_info.total_number_of_triggers == 0 else trigger_info.total_number_of_triggers ), self.driver.image_mode.set(adcore.ImageMode.MULTIPLE), ) async def arm(self): # Standard arm the detector and wait for the acquire PV to be True self._arm_status = await self.start_acquiring_driver_and_ensure_status() # The pilatus has an additional PV that goes True when the camserver # is actually ready. Should wait for that too or we risk dropping # a frame await wait_for_value( self.driver.armed, True, timeout=DEFAULT_TIMEOUT, ) @classmethod def _get_trigger_mode(cls, trigger: DetectorTrigger) -> PilatusTriggerMode: if trigger not in cls._supported_trigger_types.keys(): raise ValueError( f"{cls.__name__} only supports the following trigger " f"types: {cls._supported_trigger_types.keys()} but was asked to " f"use {trigger}" ) return cls._supported_trigger_types[trigger]