Source code for ophyd_async.epics.adsimdetector._sim_controller
import asyncio
from typing import Set
from ophyd_async.core import (
DEFAULT_TIMEOUT,
DetectorControl,
DetectorTrigger,
)
from ophyd_async.core._detector import TriggerInfo
from ophyd_async.core._status import AsyncStatus
from ophyd_async.epics import adcore
[docs]
class SimController(DetectorControl):
def __init__(
self,
driver: adcore.ADBaseIO,
good_states: Set[adcore.DetectorState] = set(adcore.DEFAULT_GOOD_STATES),
) -> None:
self.driver = driver
self.good_states = good_states
self.frame_timeout: float
self._arm_status: AsyncStatus | None = None
[docs]
def get_deadtime(self, exposure: float) -> float:
return 0.002
[docs]
async def prepare(self, trigger_info: TriggerInfo):
assert (
trigger_info.trigger == DetectorTrigger.internal
), "fly scanning (i.e. external triggering) is not supported for this device"
self.frame_timeout = (
DEFAULT_TIMEOUT + await self.driver.acquire_time.get_value()
)
await asyncio.gather(
self.driver.num_images.set(trigger_info.number),
self.driver.image_mode.set(adcore.ImageMode.multiple),
)
[docs]
async def arm(self):
self._arm_status = await adcore.start_acquiring_driver_and_ensure_status(
self.driver, good_states=self.good_states, timeout=self.frame_timeout
)
[docs]
async def wait_for_idle(self):
if self._arm_status:
await self._arm_status
[docs]
async def disarm(self):
# We can't use caput callback as we already used it in arm() and we can't have
# 2 or they will deadlock
await adcore.stop_busy_record(self.driver.acquire, False, timeout=1)