Source code for ophyd_async.epics.adkinetix._kinetix_controller

import asyncio
from typing import Optional

from ophyd_async.core import AsyncStatus, DetectorControl, DetectorTrigger
from ophyd_async.epics import adcore

from ._kinetix_io import KinetixDriverIO, KinetixTriggerMode

KINETIX_TRIGGER_MODE_MAP = {
    DetectorTrigger.internal: KinetixTriggerMode.internal,
    DetectorTrigger.constant_gate: KinetixTriggerMode.gate,
    DetectorTrigger.variable_gate: KinetixTriggerMode.gate,
    DetectorTrigger.edge_trigger: KinetixTriggerMode.edge,
}


[docs] class KinetixController(DetectorControl): def __init__( self, driver: KinetixDriverIO, ) -> None: self._drv = driver
[docs] def get_deadtime(self, exposure: float) -> float: return 0.001
[docs] async def arm( self, num: int, trigger: DetectorTrigger = DetectorTrigger.internal, exposure: Optional[float] = None, ) -> AsyncStatus: await asyncio.gather( self._drv.trigger_mode.set(KINETIX_TRIGGER_MODE_MAP[trigger]), self._drv.num_images.set(num), self._drv.image_mode.set(adcore.ImageMode.multiple), ) if exposure is not None and trigger not in [ DetectorTrigger.variable_gate, DetectorTrigger.constant_gate, ]: await self._drv.acquire_time.set(exposure) return await adcore.start_acquiring_driver_and_ensure_status(self._drv)
[docs] async def disarm(self): await adcore.stop_busy_record(self._drv.acquire, False, timeout=1)