Source code for ophyd_async.fastcs.jungfrau._controller

import asyncio
import logging

from ophyd_async.core import (
    DEFAULT_TIMEOUT,
    DetectorController,
    DetectorTrigger,
    TriggerInfo,
    wait_for_value,
)

from ._signals import (
    JUNGFRAU_TRIGGER_MODE_MAP,
    AcquisitionType,
    DetectorStatus,
    JungfrauDriverIO,
    PedestalMode,
)

# Deadtime is dependant on a wide combination of settings and on trigger mode
# but this is safe upper-limit
JUNGFRAU_DEADTIME_S = 2e-5

logger = logging.getLogger("ophyd_async")


[docs] class JungfrauController(DetectorController): def __init__(self, driver: JungfrauDriverIO): self._driver = driver
[docs] def get_deadtime(self, exposure: float | None = None) -> float: return JUNGFRAU_DEADTIME_S
[docs] async def prepare(self, trigger_info: TriggerInfo) -> None: # ValueErrors and warnings in this function come from the jungfrau operation # docs: https://rtd.xfel.eu/docs/jungfrau-detector-documentation/en/latest/operation.html # Deadtime here is really used as "time between frames" acquisition_type = await self._driver.acquisition_type.get_value() logger.info(f"Preparing Jungfrau in {acquisition_type} mode.") if trigger_info.trigger not in ( DetectorTrigger.INTERNAL, DetectorTrigger.EDGE_TRIGGER, ): raise ValueError( "The trigger method can only be called with internal or edge triggering" ) if ( acquisition_type == AcquisitionType.PEDESTAL and trigger_info.trigger != DetectorTrigger.INTERNAL ): raise ValueError( "Jungfrau must be triggered internally while in pedestal mode." ) if not isinstance(trigger_info.number_of_events, int): raise TypeError("Number of events must be an integer") if acquisition_type != AcquisitionType.PEDESTAL: if ( trigger_info.trigger == DetectorTrigger.INTERNAL and trigger_info.number_of_events != 1 ): raise ValueError( "Number of events must be set to 1 in internal trigger mode during " "standard acquisitions." ) if ( trigger_info.trigger == DetectorTrigger.EDGE_TRIGGER and trigger_info.exposures_per_event != 1 ): raise ValueError( "Exposures per event must be set to 1 in edge trigger mode " "during standard acquisitions." ) if not trigger_info.livetime: raise ValueError("Must set TriggerInfo.livetime") if trigger_info.livetime < 2e-6: logger.warning("Exposure time shorter than 2μs is not recommended") period_between_frames = trigger_info.livetime + trigger_info.deadtime if period_between_frames < self.get_deadtime(): raise ValueError( f"Period between frames (exposure time - deadtime) = " f"{period_between_frames}s cannot be lower than minimum detector " f"deadtime {self.get_deadtime()}" ) coros = [ self._driver.trigger_mode.set( JUNGFRAU_TRIGGER_MODE_MAP[trigger_info.trigger] ), self._driver.period_between_frames.set(period_between_frames), self._driver.exposure_time.set(trigger_info.livetime), ] match acquisition_type: case AcquisitionType.STANDARD: frames_signal = ( trigger_info.exposures_per_event if trigger_info.trigger is DetectorTrigger.INTERNAL else trigger_info.number_of_events ) coros.extend( [ self._driver.frames_per_acq.set(frames_signal), ] ) case AcquisitionType.PEDESTAL: coros.extend( [ self._driver.pedestal_mode_frames.set( trigger_info.exposures_per_event ), self._driver.pedestal_mode_loops.set( trigger_info.number_of_events ), self._driver.pedestal_mode_state.set(PedestalMode.ON), ] ) await asyncio.gather(*coros)
[docs] async def arm(self): await self._driver.acquisition_start.trigger()
[docs] async def wait_for_idle(self): await wait_for_value( self._driver.detector_status, DetectorStatus.IDLE, timeout=DEFAULT_TIMEOUT )
[docs] async def disarm(self): await self._driver.acquisition_stop.trigger()