Note
Ophyd async is included on a provisional basis until the v1.0 release and may change API on minor release numbers before then.
Make a Simple Device
To make a simple device, you need to subclass from the
StandardReadable class, create some Signal instances, and optionally implement
other suitable Bluesky Protocols like
bluesky.protocols.Movable.
The rest of this guide will show examples from src/ophyd_async/epics/demo/__init__.py.
Readable
For a simple bluesky.protocols.Readable object like a Sensor, you need to
define some signals, then tell the superclass which signals should contribute to
read() and read_configuration():
class Sensor(StandardReadable):
    """A demo sensor that produces a scalar value based on X and Y Movers"""
    def __init__(self, prefix: str, name="") -> None:
        # Define some signals
        with self.add_children_as_readables(HintedSignal):
            self.value = epics_signal_r(float, prefix + "Value")
        with self.add_children_as_readables(ConfigSignal):
            self.mode = epics_signal_rw(EnergyMode, prefix + "Mode")
        super().__init__(name=name)
First some Signals are constructed and stored on the Device. Each one is passed its Python type, which could be:
- An array (numpy.typing.NDArray or Sequence[str])
The rest of the arguments are PV connection information, in this case the PV suffix.
Finally super().__init__() is called with:
- Possibly empty Device name: will also dash-prefix its child Device names if set
- Optional primary signal: a Signal that should be renamed to take the name of the Device and output at read()
- read signals: Signals that should be output to read() without renaming
- config signals: Signals that should be output to read_configuration() without renaming
All signals passed into this init method will be monitored between stage()
and unstage(), and their cached values returned on read() and
read_configuration() for performance.
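The split between read() and read_configuration(), and the dash-prefixed child names, can be pictured with a toy model. This is a minimal sketch in plain Python, not ophyd-async's implementation: ToySignal, ToyReadable and add_readable are hypothetical names invented for illustration, and real Signals fetch their values from the control system rather than holding them in memory.

```python
# Toy stand-ins for illustration only; none of these classes exist in
# ophyd-async, and real Signals talk to a control system.
class ToySignal:
    def __init__(self, value):
        self.value = value
        self.name = ""


class ToyReadable:
    def __init__(self, name):
        self.name = name
        self._read_signals = []
        self._config_signals = []

    def add_readable(self, attr, signal, config=False):
        """Store the signal as an attribute and record where it is reported."""
        setattr(self, attr, signal)
        # Child names are the parent name, a dash, then the attribute name
        signal.name = f"{self.name}-{attr}"
        target = self._config_signals if config else self._read_signals
        target.append(signal)

    def read(self):
        return {s.name: {"value": s.value} for s in self._read_signals}

    def read_configuration(self):
        return {s.name: {"value": s.value} for s in self._config_signals}


sensor = ToyReadable(name="sensor")
sensor.add_readable("value", ToySignal(1.5))
sensor.add_readable("mode", ToySignal("low"), config=True)
print(sensor.read())                # {'sensor-value': {'value': 1.5}}
print(sensor.read_configuration())  # {'sensor-mode': {'value': 'low'}}
```

Values that change on every reading belong in read(), while slowly changing settings such as a mode belong in read_configuration().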
Movable
For a more complicated device like a Mover, you can still use StandardReadable
and implement some additional protocols:
class Mover(StandardReadable, Movable, Stoppable):
    """A demo movable that moves based on velocity"""
    def __init__(self, prefix: str, name="") -> None:
        # Define some signals
        with self.add_children_as_readables(HintedSignal):
            self.readback = epics_signal_r(float, prefix + "Readback")
        with self.add_children_as_readables(ConfigSignal):
            self.velocity = epics_signal_rw(float, prefix + "Velocity")
            self.units = epics_signal_r(str, prefix + "Readback.EGU")
        self.setpoint = epics_signal_rw(float, prefix + "Setpoint")
        self.precision = epics_signal_r(int, prefix + "Readback.PREC")
        # Signals that collide with standard methods should have a trailing underscore
        self.stop_ = epics_signal_x(prefix + "Stop.PROC")
        # Whether set() should complete successfully or not
        self._set_success = True
        super().__init__(name=name)
    def set_name(self, name: str):
        super().set_name(name)
        # Readback should be named the same as its parent in read()
        self.readback.set_name(name)
    @WatchableAsyncStatus.wrap
    async def set(
        self, new_position: float, timeout: CalculatableTimeout = CalculateTimeout
    ):
        self._set_success = True
        old_position, units, precision, velocity = await asyncio.gather(
            self.setpoint.get_value(),
            self.units.get_value(),
            self.precision.get_value(),
            self.velocity.get_value(),
        )
        if timeout is CalculateTimeout:
            assert velocity > 0, "Mover has zero velocity"
            timeout = abs(new_position - old_position) / velocity + DEFAULT_TIMEOUT
        # Make an Event that will be set on completion, and a Status that will
        # error if not done in time
        done = asyncio.Event()
        done_status = AsyncStatus(asyncio.wait_for(done.wait(), timeout))
        # Wait for the value to set, but don't wait for put completion callback
        await self.setpoint.set(new_position, wait=False)
        async for current_position in observe_value(
            self.readback, done_status=done_status
        ):
            yield WatcherUpdate(
                current=current_position,
                initial=old_position,
                target=new_position,
                name=self.name,
                unit=units,
                precision=precision,
            )
            if np.isclose(current_position, new_position):
                done.set()
                break
        if not self._set_success:
            raise RuntimeError("Motor was stopped")
    async def stop(self, success=True):
        self._set_success = success
        status = self.stop_.trigger()
        await status
The set() method implements bluesky.protocols.Movable. It is an async
generator wrapped by WatchableAsyncStatus.wrap: it gets the old position,
units, precision and velocity in parallel, sets the setpoint, then observes the
readback value, yielding a WatcherUpdate each time to inform watchers of the
progress. When the readback gets to the requested value it completes. The
observation is bounded by a timeout, calculated from the move distance and
velocity unless one is passed in, so the move errors if it is not done in
time. The stop() method then pokes a PV if the move needs to be
interrupted.
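The "observe the readback until it reaches the target, but give up after a calculated timeout" pattern can be tried without any hardware. The sketch below uses only the standard library and is deliberately simplified: it wraps the observation loop in asyncio.wait_for instead of using observe_value and AsyncStatus, and fake_readback, move and the DEFAULT_TIMEOUT value are assumptions made for this example rather than ophyd-async names or settings.

```python
import asyncio
import math

DEFAULT_TIMEOUT = 0.5  # seconds; an assumed value for this sketch


async def fake_readback(start, target, steps, queue):
    """Hypothetical hardware: publishes evenly spaced positions."""
    for i in range(1, steps + 1):
        await asyncio.sleep(0.01)
        await queue.put(start + (target - start) * i / steps)


async def move(start, target, velocity):
    """Collect readback updates until the target is reached or time runs out."""
    assert velocity > 0, "Mover has zero velocity"
    # Same arithmetic as the Mover: travel time plus a fixed allowance
    timeout = abs(target - start) / velocity + DEFAULT_TIMEOUT
    queue = asyncio.Queue()
    producer = asyncio.create_task(fake_readback(start, target, 5, queue))
    seen = []

    async def observe():
        while True:
            position = await queue.get()
            seen.append(position)
            if math.isclose(position, target):
                return

    try:
        # Raises a timeout error if the target is not reached in time
        await asyncio.wait_for(observe(), timeout)
    finally:
        producer.cancel()
    return seen


positions = asyncio.run(move(start=0.0, target=1.0, velocity=10.0))
print(positions)
```

With these numbers the timeout is 1.0 / 10.0 + 0.5 = 0.6 seconds, comfortably longer than the five 10 ms steps the fake readback takes.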
Assembly
Compound assemblies can be used to group Devices into larger logical Devices:
class SampleStage(Device):
    """A demo sample stage with X and Y movables"""
    def __init__(self, prefix: str, name="") -> None:
        # Define some child Devices
        self.x = Mover(prefix + "X:")
        self.y = Mover(prefix + "Y:")
        # Set name of device and child devices
        super().__init__(name=name)
This applies prefixes on construction:
- SampleStage is passed a prefix like DEVICE:
- SampleStage.x will append its prefix X: to get DEVICE:X:
- SampleStage.x.velocity will append its suffix Velocity to get DEVICE:X:Velocity
If SampleStage is further nested in another Device, another layer of prefix nesting would occur.
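The prefix nesting is plain string concatenation at construction time, which a few toy classes make concrete. This is a sketch only: ToySignal, ToyMover, ToySampleStage and ToyBeamline are hypothetical stand-ins that just record the PV string, and the BL01: and STAGE: prefixes are made up for the example.

```python
# Toy stand-ins that only record the PV name they would connect to
class ToySignal:
    def __init__(self, pv):
        self.pv = pv


class ToyMover:
    def __init__(self, prefix):
        self.readback = ToySignal(prefix + "Readback")
        self.velocity = ToySignal(prefix + "Velocity")


class ToySampleStage:
    def __init__(self, prefix):
        self.x = ToyMover(prefix + "X:")
        self.y = ToyMover(prefix + "Y:")


class ToyBeamline:
    """Hypothetical outer Device adding one more layer of prefix."""

    def __init__(self, prefix):
        self.stage = ToySampleStage(prefix + "STAGE:")


stage = ToySampleStage("DEVICE:")
print(stage.x.velocity.pv)          # DEVICE:X:Velocity

beamline = ToyBeamline("BL01:")
print(beamline.stage.y.readback.pv)  # BL01:STAGE:Y:Readback
```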
Note
SampleStage does not pass any signals into its superclass init, so its
read() method will return an empty dictionary. This means you
can rd sample_stage.x, but not rd sample_stage.