Source code for ophyd_async.epics.demo._point_detector

from typing import Annotated as A

from bluesky.protocols import Triggerable

from ophyd_async.core import (
    DEFAULT_TIMEOUT,
    AsyncStatus,
    DeviceVector,
    SignalR,
    SignalRW,
    SignalX,
    StandardReadable,
)
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.epics.core import EpicsDevice, PvSuffix

from ._point_detector_channel import DemoPointDetectorChannel


[docs] class DemoPointDetector(StandardReadable, EpicsDevice, Triggerable): """A demo detector that produces a point values based on X and Y motors.""" acquire_time: A[SignalRW[float], PvSuffix("AcquireTime"), Format.CONFIG_SIGNAL] start: A[SignalX, PvSuffix("Start.PROC")] acquiring: A[SignalR[bool], PvSuffix("Acquiring")] reset: A[SignalX, PvSuffix("Reset.PROC")] def __init__(self, prefix: str, num_channels: int = 3, name: str = "") -> None: with self.add_children_as_readables(): self.channel = DeviceVector( { i: DemoPointDetectorChannel(f"{prefix}{i}:") for i in range(1, num_channels + 1) } ) super().__init__(prefix=prefix, name=name)
[docs] @AsyncStatus.wrap async def trigger(self): await self.reset.trigger() timeout = await self.acquire_time.get_value() + DEFAULT_TIMEOUT await self.start.trigger(timeout=timeout)