Source code for ophyd_async.testing._single_derived

from __future__ import annotations

import asyncio

from ophyd_async.core import (
    Device,
    DeviceVector,
    StandardReadable,
    StrictEnum,
    derived_signal_r,
    derived_signal_rw,
    derived_signal_w,
    soft_signal_rw,
)


[docs] class BeamstopPosition(StrictEnum): IN_POSITION = "In position" OUT_OF_POSITION = "Out of position"
[docs] class ReadOnlyBeamstop(Device): """Reads from 2 motors to work out if the beamstop is in position. E.g. bps.rd(beamstop.position) """ def __init__(self, name=""): # Raw signals self.x = soft_signal_rw(float) self.y = soft_signal_rw(float) # Derived signals self.position = derived_signal_r(self._get_position, x=self.x, y=self.y) super().__init__(name=name) def _get_position(self, x: float, y: float) -> BeamstopPosition: if abs(x) < 1 and abs(y) < 2: return BeamstopPosition.IN_POSITION else: return BeamstopPosition.OUT_OF_POSITION
[docs] class MovableBeamstop(Device): """As well as reads, this one allows you to move it. E.g. bps.mv(beamstop.position, BeamstopPosition.IN_POSITION) """ def __init__(self, name=""): # Raw signals self.x = soft_signal_rw(float) self.y = soft_signal_rw(float) # Derived signals self.position = derived_signal_rw( self._get_position, self._set_from_position, x=self.x, y=self.y ) super().__init__(name=name) def _get_position(self, x: float, y: float) -> BeamstopPosition: if abs(x) < 1 and abs(y) < 2: return BeamstopPosition.IN_POSITION else: return BeamstopPosition.OUT_OF_POSITION async def _set_from_position(self, position: BeamstopPosition) -> None: if position == BeamstopPosition.IN_POSITION: await asyncio.gather(self.x.set(0), self.y.set(0)) else: await asyncio.gather(self.x.set(3), self.y.set(5))
[docs] class Exploder(StandardReadable): """This one takes a value and sets all its signal to that value. This allows convenience "set all" functions, while the individual signals are still free to be set to different values. """ def __init__(self, num_signals: int, name=""): with self.add_children_as_readables(): self.signals = DeviceVector( {i: soft_signal_rw(int, units="cts") for i in range(1, num_signals + 1)} ) self.set_all = derived_signal_w(self._set_all, derived_units="cts") super().__init__(name=name) async def _set_all(self, value: int) -> None: coros = [sig.set(value) for sig in self.signals.values()] await asyncio.gather(*coros)