Ophyd async is included on a provisional basis until the v1.0 release and may change API on minor release numbers before then

Make a Simple Device#

To make a simple device, you need to subclass from the StandardReadable class, create some Signal instances, and optionally implement other suitable Bluesky Protocols like Movable.

The rest of this guide will show examples from src/ophyd_async/epics/demo/


For a simple Readable object like a Sensor, you need to define some signals, then tell the superclass which signals should contribute to read() and read_configuration():

class Sensor(StandardReadable):
    """A demo sensor that produces a scalar value based on X and Y Movers"""

    def __init__(self, prefix: str, name="") -> None:
        # Define some signals
        self.value = epics_signal_r(float, prefix + "Value")
        self.mode = epics_signal_rw(EnergyMode, prefix + "Mode")
        # Set name and signals for read() and read_configuration()

First some Signals are constructed and stored on the Device. Each one is passed its Python type, which could be:

The rest of the arguments are PV connection information, in this case the PV suffix.

Finally super().__init__() is called with:

  • Possibly empty Device name: will also dash-prefix its child Device names is set

  • Optional primary signal: a Signal that should be renamed to take the name of the Device and output at read()

  • read signals: Signals that should be output to read() without renaming

  • config signals: Signals that should be output to read_configuration() without renaming

All signals passed into this init method will be monitored between stage() and unstage() and their cached values returned on read() and read_configuration() for perfomance.


For a more complicated device like a Mover, you can still use StandardReadable and implement some addition protocols:

class Mover(StandardReadable, Movable, Stoppable):
    """A demo movable that moves based on velocity"""

    def __init__(self, prefix: str, name="") -> None:
        # Define some signals
        self.setpoint = epics_signal_rw(float, prefix + "Setpoint")
        self.readback = epics_signal_r(float, prefix + "Readback")
        self.velocity = epics_signal_rw(float, prefix + "Velocity")
        self.units = epics_signal_r(str, prefix + "Readback.EGU")
        self.precision = epics_signal_r(int, prefix + "Readback.PREC")
        # Signals that collide with standard methods should have a trailing underscore
        self.stop_ = epics_signal_x(prefix + "Stop.PROC")
        # Whether set() should complete successfully or not
        self._set_success = True
        # Set name and signals for read() and read_configuration()
            config=[self.velocity, self.units],

    def set_name(self, name: str):
        # Readback should be named the same as its parent in read()

    async def _move(self, new_position: float, watchers: List[Callable] = []):
        self._set_success = True
        # time.monotonic won't go backwards in case of NTP corrections
        start = time.monotonic()
        old_position, units, precision = await asyncio.gather(
        # Wait for the value to set, but don't wait for put completion callback
        await self.setpoint.set(new_position, wait=False)
        async for current_position in observe_value(self.readback):
            for watcher in watchers:
                    time_elapsed=time.monotonic() - start,
            if np.isclose(current_position, new_position):
        if not self._set_success:
            raise RuntimeError("Motor was stopped")

    def move(self, new_position: float, timeout: Optional[float] = None):
        """Commandline only synchronous move of a Motor"""
        from bluesky.run_engine import call_in_bluesky_event_loop, in_bluesky_event_loop

        if in_bluesky_event_loop():
            raise RuntimeError("Will deadlock run engine if run in a plan")
        call_in_bluesky_event_loop(self._move(new_position), timeout)  # type: ignore

    # TODO: this fails if we call from the cli, but works if we "ipython await" it
    def set(self, new_position: float, timeout: Optional[float] = None) -> AsyncStatus:
        watchers: List[Callable] = []
        coro = asyncio.wait_for(self._move(new_position, watchers), timeout=timeout)
        return AsyncStatus(coro, watchers)

    async def stop(self, success=True):
        self._set_success = success
        status = self.stop_.trigger()
        await status

The set() method implements Movable. This creates a coroutine do_set() which gets the old position, units and precision in parallel, sets the setpoint, then observes the readback value, informing watchers of the progress. When it gets to the requested value it completes. This co-routine is wrapped in a timeout handler, and passed to an AsyncStatus which will start executing it as soon as the Run Engine adds a callback to it. The stop() method then pokes a PV if the move needs to be interrupted.