Source code for ophyd_async.epics.adsimdetector._sim_controller
import asyncio
from typing import Optional, Set
from ophyd_async.core import (
DEFAULT_TIMEOUT,
AsyncStatus,
DetectorControl,
DetectorTrigger,
)
from ophyd_async.epics import adcore
[docs]
class SimController(DetectorControl):
    """Detector control logic wrapping an ``adcore.ADBaseIO`` driver.

    Only ``DetectorTrigger.internal`` is accepted by `arm`; any other trigger
    mode is rejected.
    """

    def __init__(
        self,
        driver: adcore.ADBaseIO,
        good_states: Set[adcore.DetectorState] = set(adcore.DEFAULT_GOOD_STATES),
    ) -> None:
        """Store the driver and the detector states treated as healthy.

        Args:
            driver: The driver whose signals are set when arming/disarming.
            good_states: Detector states passed to the acquisition helper as
                acceptable. Defaults to ``adcore.DEFAULT_GOOD_STATES``.
        """
        self.driver = driver
        # The default set is created once, when the function is defined, so
        # storing it directly would make every instance share (and be able to
        # mutate) the same object. Keep a private copy per instance instead.
        self.good_states = set(good_states)

    def get_deadtime(self, exposure: float) -> float:
        """Return the fixed deadtime for this detector.

        The value does not depend on ``exposure``.
        NOTE(review): presumably expressed in seconds -- confirm against the
        ``DetectorControl`` contract.
        """
        return 0.002

    async def arm(
        self,
        num: int,
        trigger: DetectorTrigger = DetectorTrigger.internal,
        exposure: Optional[float] = None,
    ) -> AsyncStatus:
        """Configure the driver for ``num`` images and start acquiring.

        Args:
            num: Value written to the driver's ``num_images`` signal.
            trigger: Must be ``DetectorTrigger.internal``.
            exposure: Currently unused; the timeout is derived from the
                driver's existing ``acquire_time`` value instead.

        Returns:
            The status returned by
            ``adcore.start_acquiring_driver_and_ensure_status``.

        Raises:
            AssertionError: If ``trigger`` is not ``DetectorTrigger.internal``.
        """
        if trigger != DetectorTrigger.internal:
            # Raised explicitly rather than via ``assert`` so the check is not
            # stripped when Python runs with ``-O``; the exception type is kept
            # as AssertionError so existing callers behave the same.
            raise AssertionError(
                "fly scanning (i.e. external triggering) "
                "is not supported for this device"
            )
        # Allow the default timeout on top of the currently configured
        # acquire time.
        frame_timeout = DEFAULT_TIMEOUT + await self.driver.acquire_time.get_value()
        # The two settings are independent, so apply them concurrently.
        await asyncio.gather(
            self.driver.num_images.set(num),
            self.driver.image_mode.set(adcore.ImageMode.multiple),
        )
        return await adcore.start_acquiring_driver_and_ensure_status(
            self.driver, good_states=self.good_states, timeout=frame_timeout
        )

    async def disarm(self):
        """Stop acquisition by writing ``False`` to the driver's acquire record."""
        # We can't use caput callback as we already used it in arm() and we can't have
        # 2 or they will deadlock
        await adcore.stop_busy_record(self.driver.acquire, False, timeout=1)