Note

Ophyd async is included on a provisional basis until the v1.0 release and may change API on minor release numbers before then

Make a Simple Device#

To make a simple device, you need to subclass from the StandardReadable class, create some Signal instances, and optionally implement other suitable Bluesky Protocols like bluesky.protocols.Movable.

The rest of this guide will show examples from src/ophyd_async/epics/sim/__init__.py

Readable#

For a simple bluesky.protocols.Readable object like a Sensor, you need to define some signals, then tell the superclass which signals should contribute to read() and read_configuration():

class Sensor(StandardReadable, EpicsDevice):
    """A demo sensor that produces a scalar value based on X and Y Movers"""

    value: A[SignalR[float], PvSuffix("Value"), Format.HINTED_SIGNAL]
    mode: A[SignalRW[EnergyMode], PvSuffix("Mode"), Format.CONFIG_SIGNAL]

First some Signals are constructed and stored on the Device. Each one is passed its Python type, which could be:

  • A primitive (str, int, float)

  • An array (numpy.typing.NDArray ie. numpy.typing.NDArray[numpy.uint16] or Sequence[str])

  • An enum (enum.Enum) which must also extend str
    • str and EnumClass(StrictEnum) are the only valid datatype for an enumerated signal.

The rest of the arguments are PV connection information, in this case the PV suffix.

Finally super().__init__() is called with:

  • Possibly empty Device name: will also dash-prefix its child Device names is set

  • Optional primary signal: a Signal that should be renamed to take the name of the Device and output at read()

  • read signals: Signals that should be output to read() without renaming

  • config signals: Signals that should be output to read_configuration() without renaming

All signals passed into this init method will be monitored between stage() and unstage() and their cached values returned on read() and read_configuration() for perfomance.

Movable#

For a more complicated device like a Mover, you can still use StandardReadable and implement some addition protocols:

class Mover(StandardReadable, Movable, Stoppable):
    """A demo movable that moves based on velocity"""

    def __init__(self, prefix: str, name="") -> None:
        # Define some signals
        with self.add_children_as_readables(Format.HINTED_SIGNAL):
            self.readback = epics_signal_r(float, prefix + "Readback")
        with self.add_children_as_readables(Format.CONFIG_SIGNAL):
            self.velocity = epics_signal_rw(float, prefix + "Velocity")
            self.units = epics_signal_r(str, prefix + "Readback.EGU")
        self.setpoint = epics_signal_rw(float, prefix + "Setpoint")
        self.precision = epics_signal_r(int, prefix + "Readback.PREC")
        # Signals that collide with standard methods should have a trailing underscore
        self.stop_ = epics_signal_x(prefix + "Stop.PROC")
        # Whether set() should complete successfully or not
        self._set_success = True

        super().__init__(name=name)

    def set_name(self, name: str, *, child_name_separator: str | None = None) -> None:
        super().set_name(name, child_name_separator=child_name_separator)
        # Readback should be named the same as its parent in read()
        self.readback.set_name(name)

    @WatchableAsyncStatus.wrap
    async def set(self, value: float, timeout: CalculatableTimeout = CALCULATE_TIMEOUT):
        new_position = value
        self._set_success = True
        old_position, units, precision, velocity = await asyncio.gather(
            self.setpoint.get_value(),
            self.units.get_value(),
            self.precision.get_value(),
            self.velocity.get_value(),
        )
        if timeout == CALCULATE_TIMEOUT:
            assert velocity > 0, "Mover has zero velocity"
            timeout = abs(new_position - old_position) / velocity + DEFAULT_TIMEOUT
        # Make an Event that will be set on completion, and a Status that will
        # error if not done in time
        done = asyncio.Event()
        done_status = AsyncStatus(asyncio.wait_for(done.wait(), timeout))
        # Wait for the value to set, but don't wait for put completion callback
        await self.setpoint.set(new_position, wait=False)
        async for current_position in observe_value(
            self.readback, done_status=done_status
        ):
            yield WatcherUpdate(
                current=current_position,
                initial=old_position,
                target=new_position,
                name=self.name,
                unit=units,
                precision=precision,
            )
            if np.isclose(current_position, new_position):
                done.set()
                break
        if not self._set_success:
            raise RuntimeError("Motor was stopped")

    async def stop(self, success=True):
        self._set_success = success
        status = self.stop_.trigger()
        await status

The set() method implements bluesky.protocols.Movable. This creates a coroutine do_set() which gets the old position, units and precision in parallel, sets the setpoint, then observes the readback value, informing watchers of the progress. When it gets to the requested value it completes. This co-routine is wrapped in a timeout handler, and passed to an AsyncStatus which will start executing it as soon as the Run Engine adds a callback to it. The stop() method then pokes a PV if the move needs to be interrupted.

Assembly#

Compound assemblies can be used to group Devices into larger logical Devices:

class SampleStage(Device):
    """A demo sample stage with X and Y movables"""

    def __init__(self, prefix: str, name="") -> None:
        # Define some child Devices
        self.x = Mover(prefix + "X:")
        self.y = Mover(prefix + "Y:")
        # Set name of device and child devices
        super().__init__(name=name)

This applies prefixes on construction:

  • SampleStage is passed a prefix like DEVICE:

  • SampleStage.x will append its prefix X: to get DEVICE:X:

  • SampleStage.x.velocity will append its suffix Velocity to get DEVICE:X:Velocity

If SampleStage is further nested in another Device another layer of prefix nesting would occur

Note

SampleStage does not pass any signals into its superclass init. This means that its read() method will return an empty dictionary. This means you can rd sample_stage.x, but not rd sample_stage.