Source code for ophyd_async.tango.demo._detector

import asyncio

from ophyd_async.core import (
    AsyncStatus,
    DeviceVector,
    StandardReadable,
)

from ._counter import TangoCounter
from ._mover import TangoMover


[docs] class TangoDetector(StandardReadable): """For use with tango detector devices.""" def __init__(self, mover_trl: str, counter_trls: list[str], name=""): # A detector device may be composed of tango sub-devices self.mover = TangoMover(mover_trl) self.counters = DeviceVector( {i + 1: TangoCounter(c_trl) for i, c_trl in enumerate(counter_trls)} ) # Define the readables for TangoDetector # DeviceVectors are incompatible with AsyncReadable. Ignore until fixed. self.add_readables([self.counters, self.mover]) # type: ignore super().__init__(name=name)
[docs] def set(self, value): return self.mover.set(value)
[docs] def stop(self, success: bool = True) -> AsyncStatus: return self.mover.stop(success)
[docs] @AsyncStatus.wrap async def trigger(self): statuses = [] for counter in self.counters.values(): statuses.append(counter.reset()) await asyncio.gather(*statuses) statuses.clear() for counter in self.counters.values(): statuses.append(counter.trigger()) await asyncio.gather(*statuses)