Source code for ophyd_async.epics.areadetector.single_trigger_det
import asyncio
from typing import Sequence
from bluesky.protocols import Triggerable
from ophyd_async.core import AsyncStatus, SignalR, StandardReadable
from .drivers.ad_base import ADBase
from .utils import ImageMode
from .writers.nd_plugin import NDPluginBase
[docs]
class SingleTriggerDet(StandardReadable, Triggerable):
def __init__(
self,
drv: ADBase,
read_uncached: Sequence[SignalR] = (),
name="",
**plugins: NDPluginBase,
) -> None:
self.drv = drv
self.__dict__.update(plugins)
self.set_readable_signals(
# Can't subscribe to read signals as race between monitor coming back and
# caput callback on acquire
read_uncached=[self.drv.array_counter] + list(read_uncached),
config=[self.drv.acquire_time],
)
super().__init__(name=name)
@AsyncStatus.wrap
async def stage(self) -> None:
await asyncio.gather(
self.drv.image_mode.set(ImageMode.single),
self.drv.wait_for_plugins.set(True),
)
await super().stage()
@AsyncStatus.wrap
async def trigger(self) -> None:
await self.drv.acquire.set(True)