Source code for ophyd_async.epics.adcore._arm_logic

from ophyd_async.core import (
    DEFAULT_TIMEOUT,
    AsyncStatus,
    DetectorArmLogic,
    SignalR,
    set_and_wait_for_other_value,
    set_and_wait_for_value,
)
from ophyd_async.epics.core import stop_busy_record, wait_for_good_state

from ._io import ADBaseIO, ADState, NDCircularBuffIO


[docs] class ADArmLogic(DetectorArmLogic): def __init__( self, driver: ADBaseIO, driver_armed_signal: SignalR[bool] | None = None ): self.driver = driver if driver_armed_signal is not None: self.driver_armed_signal = driver_armed_signal else: self.driver_armed_signal = driver.acquire self.acquire_status: AsyncStatus | None = None
[docs] async def arm(self): self.acquire_status = await set_and_wait_for_other_value( set_signal=self.driver.acquire, set_value=True, match_signal=self.driver_armed_signal, match_value=True, wait_for_set_completion=False, timeout=DEFAULT_TIMEOUT, )
[docs] async def wait_for_idle(self): if self.acquire_status: await self.acquire_status await wait_for_good_state( self.driver.detector_state, {ADState.IDLE, ADState.ABORTED}, timeout=DEFAULT_TIMEOUT, )
[docs] async def disarm(self): await stop_busy_record(self.driver.acquire)
[docs] class ADContAcqArmLogic(DetectorArmLogic): def __init__(self, driver: ADBaseIO, cb_plugin: NDCircularBuffIO): self.driver = driver self.cb_plugin = cb_plugin self.acquire_status: AsyncStatus | None = None
[docs] async def arm(self): self.acquire_status = await set_and_wait_for_value( self.cb_plugin.capture, True, wait_for_set_completion=False, timeout=DEFAULT_TIMEOUT, )
[docs] async def wait_for_idle(self): if self.acquire_status: await self.acquire_status
[docs] async def disarm(self): await stop_busy_record(self.cb_plugin.capture)