Source code for ophyd_async.tango.demo._point_detector

from typing import Annotated as A

from bluesky.protocols import Triggerable

from ophyd_async.core import (
    DEFAULT_TIMEOUT,
    AsyncStatus,
    DeviceVector,
    SignalR,
    SignalRW,
    StandardReadable,
    TriggerableCommand,
)
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.tango.core import TangoDevice

from ._point_detector_channel import DemoPointDetectorChannel


[docs] class DemoPointDetector(TangoDevice, StandardReadable, Triggerable): """A demo detector that produces a point values based on X and Y motors.""" acquire_time: A[SignalRW[float], Format.CONFIG_SIGNAL] start: TriggerableCommand acquiring: SignalR[bool] reset: TriggerableCommand def __init__(self, trl: str, channel_trls: list[str], name: str = "") -> None: with self.add_children_as_readables(): self.channel = DeviceVector( { i + 1: DemoPointDetectorChannel(channel_trl) for i, channel_trl in enumerate(channel_trls) } ) super().__init__(trl, name=name)
[docs] @AsyncStatus.wrap async def trigger(self): await self.reset.trigger() timeout = await self.acquire_time.get_value() + DEFAULT_TIMEOUT await self.start.trigger(timeout=timeout)