Source code for ophyd_async.sim._point_detector

import asyncio
import time

import numpy as np

from ophyd_async.core import (
    AsyncStatus,
    DeviceVector,
    SignalR,
    StandardReadable,
    StrictEnum,
    gather_dict,
    soft_signal_r_and_setter,
    soft_signal_rw,
)
from ophyd_async.core import StandardReadableFormat as Format

from ._pattern_generator import PatternGenerator


class EnergyMode(StrictEnum):
    """Energy mode that a `SimPointDetector` channel can be put in.

    Each member's value is the string associated with that mode.
    """

    LOW = "Low Energy"
    """The low energy mode."""

    HIGH = "High Energy"
    """The high energy mode."""


class SimPointDetectorChannel(StandardReadable):
    """One channel of a simulated point detector.

    Exposes the supplied ``value_signal`` as the hinted ``value`` readable and
    a soft `EnergyMode` signal as the ``mode`` configuration readable.

    :param value_signal: Read-only integer signal holding the channel value.
    :param name: Name passed through to the `StandardReadable` constructor.
    """

    def __init__(self, value_signal: SignalR[int], name=""):
        # Build the mode signal up front; it only becomes a child of this
        # device once it is assigned inside the context manager below.
        mode_signal = soft_signal_rw(EnergyMode)
        # The value is the hinted readable of the channel...
        with self.add_children_as_readables(Format.HINTED_SIGNAL):
            self.value = value_signal
        # ...while the energy mode is reported as configuration.
        with self.add_children_as_readables(Format.CONFIG_SIGNAL):
            self.mode = mode_signal
        super().__init__(name)


class SimPointDetector(StandardReadable):
    """Simulates a point detector with multiple channels.

    :param generator: Source of the simulated data; its ``generate_point`` is
        called once per channel on every update.
    :param num_channels: Number of channels to create, numbered from 1.
    :param name: Name passed through to the `StandardReadable` constructor.
    """

    def __init__(
        self, generator: PatternGenerator, num_channels: int = 3, name: str = ""
    ) -> None:
        self._generator = generator
        self.acquire_time = soft_signal_rw(float, 0.1)
        # NOTE(review): nothing visible in this class calls `_set_acquiring`,
        # so `acquiring` is never updated here - confirm whether `trigger`
        # is expected to toggle it.
        self.acquiring, self._set_acquiring = soft_signal_r_and_setter(bool)
        # Maps each channel's read-only value signal to the setter that
        # updates it
        self._value_signals = dict(
            soft_signal_r_and_setter(int) for _ in range(num_channels)
        )
        with self.add_children_as_readables():
            self.channel = DeviceVector(
                {
                    i + 1: SimPointDetectorChannel(value_signal)
                    for i, value_signal in enumerate(self._value_signals)
                }
            )
        super().__init__(name=name)

    async def _update_values(self, acquire_time: float) -> None:
        """Update every channel value at 10Hz until ``acquire_time`` has elapsed.

        The channel modes are read once at the start. Each update then sets
        every channel to a freshly generated point scaled by the time elapsed
        so far, with the last update at exactly ``acquire_time``.

        :param acquire_time: Total time in seconds to spend updating.
        """
        # Get the modes
        modes = await gather_dict(
            {channel: channel.mode.get_value() for channel in self.channel.values()}
        )
        start = time.monotonic()
        # Make an array of relative update times at 10Hz intervals
        update_times = np.arange(0.1, acquire_time, 0.1)
        # With a float step, arange can emit a last element that reaches the
        # stop value (e.g. for an acquire_time of 0.4). Keep only the times
        # strictly before the end, otherwise the final update happens twice.
        before_end = (update_times < acquire_time) & ~np.isclose(
            update_times, acquire_time
        )
        # With the end position appended
        update_times = np.concatenate((update_times[before_end], [acquire_time]))
        for update_time in update_times:
            # Calculate how long to wait to get there
            relative_time = time.monotonic() - start
            await asyncio.sleep(update_time - relative_time)
            # Update the channel value
            for i, channel in self.channel.items():
                high_energy = modes[channel] == EnergyMode.HIGH
                point = self._generator.generate_point(i, high_energy)
                setter = self._value_signals[channel.value]
                setter(int(point * 10000 * update_time))

    @AsyncStatus.wrap
    async def trigger(self):
        """Zero every channel, then update the values for the current acquire time.

        Prints how long the whole trigger took once it has finished.
        """
        # Use the monotonic clock, as `_update_values` does, so the reported
        # duration cannot be skewed by system clock adjustments
        start = time.monotonic()
        for setter in self._value_signals.values():
            setter(0)
        await self._update_values(await self.acquire_time.get_value())
        print("Trigger took", time.monotonic() - start)