Implementing Devices#
In Using Devices we learned how to instantiate some existing ophyd-async Devices. These Devices were ophyd level simulations, so did not talk to any underlying control system. In this tutorial we will instantiate some demo Devices that talk to underlying control system implementations, then explore how the Devices themselves are implemented.
Pick your control system#
Most bluesky users will be interfacing to an underlying control system like EPICS or Tango. The underlying control system will provide functionality like engineering display screens and historical archiving of control system data. It is possible to use ophyd-async with multiple control systems, so this tutorial is written with tabbed sections to allow us to only show the information relevant to one particular control system.
To summarize what each control system does:
EPICS is a set of software tools and applications which provide a software infrastructure for use in building distributed control systems to operate devices such as Particle Accelerators, Large Experiments and major Telescopes. Such distributed control systems typically comprise tens or even hundreds of computers, networked together to allow communication between them and to provide control and feedback of the various parts of the device from a central control room.
EPICS uses Client/Server and Publish/Subscribe techniques to communicate between the various computers. Most servers (called Input/Output Controllers or IOCs) perform real-world I/O and local control tasks, and publish this information to clients using robust, EPICS specific network protocols Channel Access and pvAccess. Clients use a process variable (PV) as an identifier to get, put or monitor the value and metadata of a particular control system variable without knowing which server hosts that variable.
EPICS has a flat architecture where any client can request the PV of any server. Sites typically introduce hierarchy by imposing a naming convention on these PVs.
Tango is an Open Source solution for SCADA and DCS. Open Source means you get all the source code under an Open Source free licence (LGPL and GPL). Supervisory Control and Data Acquisition (SCADA) systems are typically industrial type systems using standard hardware. Distributed Control Systems (DCS) are more flexible control systems used in more complex environments. Sardana is a good example of a Tango based Beamline SCADA.
Tango is typically deployed with a central database, which provides a nameserver to lookup which distributed server provides access to the Device. Once located, the Device server can be introspected to see what Attributes of the Device exist.
FastCS is a control system agnostic framework for building Device support in Python that will work for both EPICS and Tango without depending on either. It allows Device support to be written once in a declarative way, then at runtime the control system can be selected. It also adds support for multi-device introspection to both EPICS and Tango, which allows the same ophyd-async Device and the same FastCS Device to use either EPICS or Tango as a transport layer.
FastCS is currently in an early phase of development, being used for PandA and Odin Devices within ophyd-async.
Run the demo#
Ophyd-async ships with some demo devices that do the same thing for each control system, and a script that will create them along with a RunEngine. Let’s run it in an interactive ipython shell:
$ ipython --matplotlib=qt6 -i -m ophyd_async.epics.demo
Python 3.11.11 (main, Dec 4 2024, 20:38:25) [GCC 12.2.0]
Type 'copyright', 'credits' or 'license' for more information
IPython 8.30.0 -- An enhanced Interactive Python. Type '?' for help.
In [1]:
$ ipython --matplotlib=qt6 -i -m ophyd_async.tango.demo
Python 3.12.3 (main, Mar 3 2026, 12:15:18) [GCC 13.3.0]
Type 'copyright', 'credits' or 'license' for more information
IPython 9.9.0 -- An enhanced Interactive Python. Type '?' for help.
In [1]:
TODO
We can now go ahead and run the same grid scan we did in the previous tutorial:
In [1]: RE(bp.grid_scan([pdet], stage.x, 1, 2, 3, stage.y, 2, 3, 3))
Transient Scan ID: 1 Time: 2025-01-14 11:29:05
Persistent Unique Scan ID: '2e0e75d8-33dd-430f-8cd8-6d6ae053c429'
New stream: 'primary'
+-----------+------------+------------+------------+----------------------+----------------------+----------------------+
| seq_num | time | stage-x | stage-y | pdet-channel-1-value | pdet-channel-2-value | pdet-channel-3-value |
+-----------+------------+------------+------------+----------------------+----------------------+----------------------+
| 1 | 11:29:05.7 | 1.000 | 2.000 | 921 | 887 | 859 |
| 2 | 11:29:06.6 | 1.000 | 2.500 | 959 | 926 | 898 |
| 3 | 11:29:07.5 | 1.000 | 3.000 | 937 | 903 | 875 |
| 4 | 11:29:08.3 | 1.500 | 2.000 | 976 | 975 | 974 |
| 5 | 11:29:09.1 | 1.500 | 2.500 | 843 | 843 | 842 |
| 6 | 11:29:09.8 | 1.500 | 3.000 | 660 | 660 | 659 |
| 7 | 11:29:10.6 | 2.000 | 2.000 | 761 | 740 | 722 |
| 8 | 11:29:11.4 | 2.000 | 2.500 | 537 | 516 | 498 |
| 9 | 11:29:12.2 | 2.000 | 3.000 | 487 | 467 | 448 |
+-----------+------------+------------+------------+----------------------+----------------------+----------------------+
generator grid_scan ['2e0e75d8'] (scan num: 1)
Out[1]: RunEngineResult(run_start_uids=('2e0e75d8-33dd-430f-8cd8-6d6ae053c429',), plan_result='2e0e75d8-33dd-430f-8cd8-6d6ae053c429', exit_status='success', interrupted=False, reason='', exception=None)
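The nine rows in the table follow from the arguments to bp.grid_scan: stage.x takes 3 evenly spaced values from 1 to 2, and for each of them stage.y takes 3 evenly spaced values from 2 to 3. The sketch below reproduces just those positions in plain Python. The linspace helper is hypothetical and written only for this illustration; it is not how bluesky itself computes the trajectory.

```python
from itertools import product


def linspace(start: float, stop: float, num: int) -> list[float]:
    """Return num evenly spaced values from start to stop inclusive."""
    if num == 1:
        return [float(start)]
    step = (stop - start) / (num - 1)
    return [start + i * step for i in range(num)]


# Same arguments as the grid_scan call: x from 1 to 2, y from 2 to 3, 3 points each
x_positions = linspace(1, 2, 3)
y_positions = linspace(2, 3, 3)

# The first motor given is the slow axis, so y varies fastest
points = list(product(x_positions, y_positions))
for seq_num, (x, y) in enumerate(points, start=1):
    print(f"{seq_num}: stage-x={x:.3f} stage-y={y:.3f}")
```

The printed positions line up with the stage-x and stage-y columns of the table above.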
See how Devices are instantiated#
Now we will take a look at the demo script and see how the Devices are instantiated. The beginning section with imports is the same as in the first tutorial, but then the control system specific differences appear.
"""Used for tutorial `Implementing Devices`."""
# Import bluesky and ophyd
import bluesky.plan_stubs as bps # noqa: F401
import bluesky.plans as bp # noqa: F401
from bluesky.callbacks.best_effort import BestEffortCallback
from bluesky.run_engine import RunEngine, autoawait_in_bluesky_event_loop
from ophyd_async.core import init_devices
from ophyd_async.epics import demo, testing
# Create a run engine and make ipython use it for `await` commands
RE = RunEngine(call_returns_result=True)
autoawait_in_bluesky_event_loop()
# Add a callback for plotting
bec = BestEffortCallback()
RE.subscribe(bec)
# Start IOC with demo pvs in subprocess
prefix = testing.generate_random_pv_prefix()
ioc = demo.start_ioc_subprocess(prefix, num_channels=3)
# All Devices created within this block will be
# connected and named at the end of the with block
with init_devices():
    # Create a sample stage with X and Y motors
    stage = demo.DemoStage(f"{prefix}STAGE:")
    # Create a multi channel counter with the same number
    # of counters as the IOC
    pdet = demo.DemoPointDetector(f"{prefix}DET:", num_channels=3)
EPICS PVs are normally broadcast to your entire network subnet. To avoid PV name clashes, we pick a random prefix, then start the demo IOC using this PV prefix. Starting an IOC here is done just for the demo; in production the IOC would already be running before you started bluesky.
We then pass the PV prefix for each Device down using prior knowledge about the PVs that this particular IOC creates. For example, we know that there will be a DemoStage, and all its PVs will start with prefix + "STAGE:".
Note
There is no introspection of PVs in a device in EPICS, so if we tell the IOC to make 3 channels on the point detector, we must also tell the ophyd-async device that the point detector has 3 channels.
"""Used for tutorial `Implementing Devices`."""
# Import bluesky and ophyd
import bluesky.plan_stubs as bps # noqa: F401
import bluesky.plans as bp # noqa: F401
from bluesky.callbacks.best_effort import BestEffortCallback
from bluesky.run_engine import RunEngine, autoawait_in_bluesky_event_loop
from ophyd_async.core import init_devices
from ophyd_async.tango import demo
from ophyd_async.tango.testing import generate_random_trl_prefix
# Create a run engine and make ipython use it for `await` commands
RE = RunEngine(call_returns_result=True)
autoawait_in_bluesky_event_loop()
# Add a callback for plotting
bec = BestEffortCallback()
RE.subscribe(bec)
# Start demo DeviceServer in subprocess
prefix = generate_random_trl_prefix()
ds = demo.start_device_server_subprocess(prefix, num_channels=3)
# All Devices created within this block will be
# connected and named at the end of the with block
with init_devices():
    # Create a sample stage with X and Y motors
    stage = demo.DemoStage(ds.trls[f"{prefix}/X"], ds.trls[f"{prefix}/Y"])
    # Create a multi channel counter with the same number
    # of counters as the device server
    pdet = demo.DemoPointDetector(
        ds.trls[f"{prefix}/DET"],
        [
            ds.trls[f"{prefix}/C1"],
            ds.trls[f"{prefix}/C2"],
            ds.trls[f"{prefix}/C3"],
        ],
    )
Here we start the TANGO device server application with a device name prefix and add all of the devices into a MultiDeviceTestContext, which allows us to run without an external TANGO database. Starting this device server here is done just for the demo; in production the application would already be running before you started bluesky.
We then pass the Tango Resource Locator (TRL) of each of those devices down to the corresponding ophyd-async Device, using prior knowledge about the attributes that these particular devices will create. For example, we know that there will be a DemoStage, and the device names will start with prefix + "/X" and prefix + "/Y".
Note
There is no introspection of child devices of a device in TANGO, so if we tell the device server to make 3 channels on the point detector, we must also tell the ophyd-async device that the point detector has 3 channels.
TODO
Look at the Device implementations#
The demo creates the following structure of Devices:
---
config:
theme: neutral
---
flowchart LR
DemoPointDetector-- channel ---DeviceVector
DeviceVector-- 1 ---pdet.1(DemoPointDetectorChannel)
DeviceVector-- 2 ---pdet.2(DemoPointDetectorChannel)
DeviceVector-- 3 ---pdet.3(DemoPointDetectorChannel)
DemoStage-- x ---stage.x(DemoMotor)
DemoStage-- y ---stage.y(DemoMotor)
The DemoStage contains two DemoMotors, called x and y. The DemoPointDetector contains a DeviceVector called channel that contains 3 DemoPointDetectorChannels, called 1, 2 and 3.
We will now inspect the Demo classes in the diagram to see how they talk to the underlying control system.
DemoPointDetectorChannel#
Let’s start with the lowest level sort of Device, a single channel of our point detector. It contains Signals, which are the smallest sort of Device in ophyd-async, with a current value of a given datatype. In this case, there are two:
value: the current value of the channel in integer counts
mode: a configuration enum which varies the output of the channel
We specify to the Device baseclass that we would like a Signal of a given type (e.g. SignalR[int]) via a type hint, and it will create that signal for us in a control system specific way. The type of value is the python builtin int, and the type of mode is an enum we have declared ourselves, where the string values must exactly match what the control system produces.
See also
SignalDatatypeT defines the list of all possible datatypes you can use for Signals
We can optionally annotate this type hint with some additional information, like Format. This will tell the StandardReadable baseclass which Signals are important in a plan like bp.grid_scan. In this case we specify that mode should be reported as a configuration parameter once at the start of the scan, and value should be fetched without caching and plotted at each point of the scan.
from typing import Annotated as A
from ophyd_async.core import SignalR, SignalRW, StandardReadable, StrictEnum
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.epics.core import EpicsDevice, PvSuffix
class EnergyMode(StrictEnum):
    """Energy mode for `DemoPointDetectorChannel`."""

    LOW = "Low Energy"
    """Low energy mode"""
    HIGH = "High Energy"
    """High energy mode"""


class DemoPointDetectorChannel(StandardReadable, EpicsDevice):
    """A channel for `DemoPointDetector` with int value based on X and Y Motors."""

    value: A[SignalR[int], PvSuffix("Value"), Format.HINTED_UNCACHED_SIGNAL]
    mode: A[SignalRW[EnergyMode], PvSuffix("Mode"), Format.CONFIG_SIGNAL]
When the Device is instantiated, the EpicsDevice baseclass will look at all the type hints for annotations with a PvSuffix. It will append that to the PV prefix that is passed into the device. In this case if we made a DemoPointDetectorChannel(prefix="PREFIX:"), then value would have PV PREFIX:Value. PvSuffix also allows you to specify different suffixes for the read and write PVs if they are different.
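To make the idea of "looking at type hints for annotations" concrete, here is a minimal standard-library sketch of the general technique. It is not the EpicsDevice implementation: Suffix, SketchChannel and pv_names are hypothetical names invented for this illustration, and the real baseclass does considerably more (creating and connecting Signals, for instance).

```python
from dataclasses import dataclass
from typing import Annotated, get_type_hints


@dataclass(frozen=True)
class Suffix:
    """Hypothetical stand-in for PvSuffix: the text to append to the prefix."""

    read_suffix: str


class SketchChannel:
    """Hypothetical class; only the annotations matter for this sketch."""

    value: Annotated[int, Suffix("Value")]
    mode: Annotated[str, Suffix("Mode")]


def pv_names(cls: type, prefix: str) -> dict[str, str]:
    """Map each annotated attribute name to prefix + suffix."""
    names = {}
    # include_extras=True keeps the Annotated metadata instead of stripping it
    for attr, hint in get_type_hints(cls, include_extras=True).items():
        for meta in getattr(hint, "__metadata__", ()):
            if isinstance(meta, Suffix):
                names[attr] = prefix + meta.read_suffix
    return names


print(pv_names(SketchChannel, "PREFIX:"))
```

With the prefix "PREFIX:" the value attribute maps to PREFIX:Value, matching the example in the paragraph above.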
from typing import Annotated as A
from ophyd_async.core import SignalR, SignalRW, StandardReadable, StrictEnum
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.tango.core import TangoDevice, TangoPolling
class EnergyMode(StrictEnum):
    """Energy mode for `DemoPointDetectorChannel`."""

    LOW = "Low Energy"
    """Low energy mode"""
    HIGH = "High Energy"
    """High energy mode"""


class DemoPointDetectorChannel(TangoDevice, StandardReadable):
    """A channel for `DemoPointDetector` with int value based on X and Y Motors."""

    value: A[SignalR[int], TangoPolling(0.1, 0.1, 0.1), Format.HINTED_UNCACHED_SIGNAL]
    mode: A[SignalRW[EnergyMode], TangoPolling(0.1, 0.1, 0.1), Format.CONFIG_SIGNAL]
When the Device is instantiated, the TangoDevice baseclass will look at all the signals and construct a full TRL for each signal from the supplied device TRL and the signal name. In this case if we made a DemoPointDetectorChannel(trl="test/device/C1"), then the signal value would have TRL test/device/C1/value. If the server doesn’t support events, then using the TangoPolling annotation gives the parameters for ophyd to poll instead.
TODO
DemoPointDetector#
Moving up a level, we have the point detector itself. This also has some Signals to control acquisition which are created in the same way as above:
acquire_time: a configuration float saying how long each point should be acquired for
start: a TriggerableCommand to start a single acquisition
acquiring: a boolean that is True when acquiring
reset: a TriggerableCommand to reset the counts on all channels
We also have a DeviceVector called channel with DemoPointDetectorChannel instances within it. These will all contribute their configuration values at the start of scan, and their values at every point in the scan.
Finally, we need to communicate to bluesky that it has to trigger() an acquisition before it can read() from the underlying channels. We do this by implementing the Triggerable protocol. This involves writing a trigger() method with the logic that must be run, calling TriggerableCommand.trigger, SignalW.set and SignalR.get_value to manipulate the values of the underlying Signals and Commands, returning when complete. This is wrapped in an AsyncStatus, which is used by bluesky to run this operation in the background and know when it is complete.
from typing import Annotated as A
from bluesky.protocols import Triggerable
from ophyd_async.core import (
DEFAULT_TIMEOUT,
AsyncStatus,
DeviceVector,
SignalR,
SignalRW,
StandardReadable,
TriggerableCommand,
)
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.epics.core import EpicsDevice, PvSuffix
from ._point_detector_channel import DemoPointDetectorChannel
class DemoPointDetector(StandardReadable, EpicsDevice, Triggerable):
    """A demo detector that produces point values based on X and Y motors."""

    acquire_time: A[SignalRW[float], PvSuffix("AcquireTime"), Format.CONFIG_SIGNAL]
    start: A[TriggerableCommand, PvSuffix("Start.PROC")]
    acquiring: A[SignalR[bool], PvSuffix("Acquiring")]
    reset: A[TriggerableCommand, PvSuffix("Reset.PROC")]

    def __init__(self, prefix: str, num_channels: int = 3, name: str = "") -> None:
        with self.add_children_as_readables():
            self.channel = DeviceVector(
                {
                    i: DemoPointDetectorChannel(f"{prefix}{i}:")
                    for i in range(1, num_channels + 1)
                }
            )
        super().__init__(prefix=prefix, name=name)

    @AsyncStatus.wrap
    async def trigger(self):
        await self.reset.trigger()
        timeout = await self.acquire_time.get_value() + DEFAULT_TIMEOUT
        await self.start.trigger(timeout=timeout)
TriggerableCommand is used for start and reset rather than Signals because these are actions to fire on the hardware, not values to read or write. Calling .trigger() executes the action and waits for the hardware to acknowledge it.
Although the Signals are declared via type hints, the DeviceVector requires explicit instantiation in an __init__ method. This is because it requires the num_channels to be passed in to the constructor to know how many channels require creation. This means that we also need to do the PV concatenation ourselves, so if the PV prefix for the device is PREFIX: then the first channel would have prefix PREFIX:1:. We also register them with StandardReadable in a different way, adding them within a StandardReadable.add_children_as_readables context manager which adds all the children created within its body.
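As a small worked example of that concatenation, the snippet below applies the same f"{prefix}{i}:" expression used in the __init__ method to a sample prefix. The channel_prefixes function is a hypothetical helper for illustration only and is not part of ophyd-async.

```python
def channel_prefixes(prefix: str, num_channels: int) -> dict[int, str]:
    """Build the per-channel PV prefixes, numbering channels from 1."""
    return {i: f"{prefix}{i}:" for i in range(1, num_channels + 1)}


prefixes = channel_prefixes("PREFIX:", 3)
print(prefixes)

# Appending the "Value" suffix gives the full PV of each channel's value Signal
value_pvs = [channel_prefix + "Value" for channel_prefix in prefixes.values()]
print(value_pvs)
```

The dictionary keys start at 1 rather than 0, which is why the channels in the device structure are called 1, 2 and 3.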
Whilst it is not required for the call to super().__init__ to be after all signals have been created, it is more efficient to do so. However, there may be some edge cases where signals need to be created after this, e.g. for derived signals that depend on their parent.
from typing import Annotated as A
from bluesky.protocols import Triggerable
from ophyd_async.core import (
DEFAULT_TIMEOUT,
AsyncStatus,
DeviceVector,
SignalR,
SignalRW,
StandardReadable,
TriggerableCommand,
)
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.tango.core import TangoDevice
from ._point_detector_channel import DemoPointDetectorChannel
class DemoPointDetector(TangoDevice, StandardReadable, Triggerable):
    """A demo detector that produces point values based on X and Y motors."""

    acquire_time: A[SignalRW[float], Format.CONFIG_SIGNAL]
    start: TriggerableCommand
    acquiring: SignalR[bool]
    reset: TriggerableCommand

    def __init__(self, trl: str, channel_trls: list[str], name: str = "") -> None:
        with self.add_children_as_readables():
            self.channel = DeviceVector(
                {
                    i + 1: DemoPointDetectorChannel(channel_trl)
                    for i, channel_trl in enumerate(channel_trls)
                }
            )
        super().__init__(trl, name=name)

    @AsyncStatus.wrap
    async def trigger(self):
        await self.reset.trigger()
        timeout = await self.acquire_time.get_value() + DEFAULT_TIMEOUT
        await self.start.trigger(timeout=timeout)
Although the Signals are declared via type hints, the DeviceVector requires explicit instantiation in an __init__ method. This is because it requires a list of TRLs to be passed in to the constructor to know how many channels require creation, and how to connect to each channel through the channel’s TRL. We also register them with StandardReadable in a different way, adding them within a StandardReadable.add_children_as_readables context manager which adds all the children created within its body.
Whilst it is not required for the call to super().__init__ to be made after all signals have been created, it is more efficient to do so. However, there may be some edge cases where signals need to be created after this, e.g. for derived signals that depend on their parent.
TODO
See also
For more information on when to construct Devices declaratively using type hints, and when to construct them procedurally with an __init__ method, see Declarative vs Procedural Devices
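The order of operations inside trigger() (reset, then start with a timeout of the acquire time plus a margin) can be tried out without any control system. The following is only a sketch using asyncio from the standard library: FakeDetector is a hypothetical in-memory stand-in, asyncio.wait_for stands in for the timeout handling, and the value of DEFAULT_TIMEOUT is assumed for the illustration.

```python
import asyncio

DEFAULT_TIMEOUT = 10.0  # assumed value for this sketch, in seconds


class FakeDetector:
    """Hypothetical in-memory detector used only for this illustration."""

    def __init__(self, acquire_time: float) -> None:
        self.acquire_time = acquire_time
        self.counts = 5  # stale counts left over from a previous acquisition
        self.log: list[str] = []

    async def reset(self) -> None:
        self.counts = 0
        self.log.append("reset")

    async def start(self) -> None:
        # Pretend the hardware takes acquire_time seconds to acquire
        await asyncio.sleep(self.acquire_time)
        self.counts += 1
        self.log.append("start")

    async def trigger(self) -> None:
        await self.reset()
        timeout = self.acquire_time + DEFAULT_TIMEOUT
        # Raise a timeout error rather than hang if start never completes
        await asyncio.wait_for(self.start(), timeout=timeout)


detector = FakeDetector(acquire_time=0.01)
asyncio.run(detector.trigger())
print(detector.log, detector.counts)
```

Adding the acquire time to the margin means a long exposure does not time out just because it is long, while a stalled acquisition still fails in bounded time.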
DemoMotor#
Moving onto the motion side, we have DemoMotor. This has a few signals:
readback: the current position of the motor as a float
velocity: a configuration parameter for the velocity in units/s
setpoint: the position the motor has been requested to move to as a float, it returns as soon as it’s been set
stop_: a TriggerableCommand to stop the move immediately
DemoMotor inherits from StandardMovable, which provides the full
Movable, Locatable,
Stoppable, and
Subscribable protocol surface, as well as
WatcherUpdate progress-bar machinery. The device-specific parts — how to compute a
timeout from velocity, how to execute a stop, and how to detect move completion — live
in a separate DemoMotorMoveLogic dataclass that subclasses MovableLogic.
At each point in the scan the device reports readback named the same as the device
itself (e.g. stage.x), because StandardMovable.set_name automatically renames
the readback signal to match the device.
See also
When should a device extend movable for how to decide when to use
StandardMovable and how to implement MovableLogic.
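As a quick worked example of computing a timeout from velocity, the calculate_timeout methods in this section divide the distance to travel by the velocity and add a fixed margin. The standalone function below repeats that arithmetic with plain floats; the value of DEFAULT_TIMEOUT is an assumption made for this sketch.

```python
DEFAULT_TIMEOUT = 10.0  # assumed value for this sketch, in seconds


def calculate_timeout(old_position: float, new_position: float, velocity: float) -> float:
    """Expected travel time at the given velocity, plus a fixed safety margin."""
    return abs(new_position - old_position) / velocity + DEFAULT_TIMEOUT


# Moving 1 unit at 0.5 units/s takes 2 s, so the move is given 12 s in total
forward = calculate_timeout(1.0, 2.0, velocity=0.5)
# The direction of travel does not matter because of abs()
backward = calculate_timeout(2.0, 1.0, velocity=0.5)
print(forward, backward)
```

Note that a velocity of zero would raise ZeroDivisionError in this arithmetic, so it assumes the velocity is non-zero.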
from dataclasses import dataclass
from functools import cached_property
from typing import Annotated as A
import numpy as np
from ophyd_async.core import (
DEFAULT_TIMEOUT,
MovableLogic,
SignalR,
SignalRW,
StandardMovable,
StandardReadable,
TriggerableCommand,
set_and_wait_for_other_value,
)
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.epics.core import EpicsDevice, PvSuffix
@dataclass
class DemoMotorMoveLogic(MovableLogic[float]):
    velocity: SignalRW[float]
    stop_: TriggerableCommand

    async def stop(self):
        await self.stop_.trigger()

    async def calculate_timeout(
        self, old_position: float, new_position: float
    ) -> float:
        velocity = await self.velocity.get_value()
        return abs(new_position - old_position) / velocity + DEFAULT_TIMEOUT

    async def move(self, new_position: float, timeout: float | None) -> None:
        # If we are close to the desired position then break
        await set_and_wait_for_other_value(
            self.setpoint,
            new_position,
            self.readback,
            lambda v: bool(np.isclose(v, new_position)),
            timeout=timeout,
        )


class DemoMotor(EpicsDevice, StandardReadable, StandardMovable):
    """A demo movable that moves based on velocity."""

    # Define some signals
    readback: A[SignalR[float], PvSuffix("Readback"), Format.HINTED_SIGNAL]
    velocity: A[SignalRW[float], PvSuffix("Velocity"), Format.CONFIG_SIGNAL]
    setpoint: A[SignalRW[float], PvSuffix("Setpoint")]
    # If a signal name clashes with a bluesky verb add _ to the attribute name
    stop_: A[TriggerableCommand, PvSuffix("Stop.PROC")]

    @cached_property
    def movable_logic(self) -> MovableLogic:
        return DemoMotorMoveLogic(
            readback=self.readback,
            setpoint=self.setpoint,
            velocity=self.velocity,
            stop_=self.stop_,
        )
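The move() method above writes the setpoint and then waits until the readback is close to the requested position. The idea can be sketched with the standard library alone. This is not how set_and_wait_for_other_value is implemented: FakeMotor is a hypothetical simulated motor, and the sketch polls the readback in a loop and uses math.isclose in place of np.isclose.

```python
import asyncio
import math


class FakeMotor:
    """Hypothetical motor whose readback steps towards the setpoint."""

    def __init__(self) -> None:
        self.readback = 0.0
        self.setpoint = 0.0

    async def run(self, step: float = 0.25) -> None:
        # Simulated hardware loop: move the readback towards the setpoint
        while True:
            delta = self.setpoint - self.readback
            self.readback += max(-step, min(step, delta))
            await asyncio.sleep(0.001)


async def move(motor: FakeMotor, new_position: float, timeout: float) -> None:
    """Write the setpoint, then wait for the readback to get close to it."""
    motor.setpoint = new_position

    async def wait_until_close() -> None:
        # Poll the readback until it is close to the requested position
        while not math.isclose(motor.readback, new_position, abs_tol=1e-9):
            await asyncio.sleep(0.001)

    await asyncio.wait_for(wait_until_close(), timeout=timeout)


async def main() -> float:
    motor = FakeMotor()
    hardware = asyncio.create_task(motor.run())
    try:
        await move(motor, 1.0, timeout=2.0)
    finally:
        hardware.cancel()
    return motor.readback


final_position = asyncio.run(main())
print(final_position)
```

Comparing with a tolerance rather than exact equality matters for real hardware, where the readback rarely lands on exactly the requested float.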
from dataclasses import dataclass
from functools import cached_property
from typing import Annotated as A
from ophyd_async.core import (
DEFAULT_TIMEOUT,
MovableLogic,
SignalR,
SignalRW,
StandardMovable,
StandardReadable,
TriggerableCommand,
wait_for_value,
)
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.tango.core import DevStateEnum, TangoDevice, TangoPolling
@dataclass
class DemoMotorMoveLogic(MovableLogic[float]):
    velocity: SignalRW[float]
    stop_: TriggerableCommand
    state: SignalR[DevStateEnum]

    async def stop(self):
        await self.stop_.trigger()

    async def calculate_timeout(
        self, old_position: float, new_position: float
    ) -> float:
        velocity = await self.velocity.get_value()
        return abs(new_position - old_position) / velocity + DEFAULT_TIMEOUT

    async def move(self, new_position: float, timeout: float | None) -> None:
        # Write the setpoint and wait for the motor state to return to ON,
        # which happens whether the move completes normally or is stopped.
        await self.setpoint.set(new_position, timeout=timeout)
        await wait_for_value(self.state, DevStateEnum.ON, timeout=timeout)


class DemoMotor(TangoDevice, StandardReadable, StandardMovable):
    """A demo movable that moves based on velocity."""

    # If the server doesn't support events, the TangoPolling annotation gives
    # the parameters for ophyd to poll instead
    readback: A[SignalR[float], TangoPolling(0.1, 0.001, 0.001), Format.HINTED_SIGNAL]
    velocity: A[SignalRW[float], TangoPolling(0.1, 0.001, 0.001), Format.CONFIG_SIGNAL]
    setpoint: A[SignalRW[float], TangoPolling(0.1, 0.001, 0.001)]
    state: A[SignalR[DevStateEnum], TangoPolling(0.1)]
    # If a tango name clashes with a bluesky verb, add a trailing underscore
    stop_: TriggerableCommand

    @cached_property
    def movable_logic(self) -> MovableLogic:
        return DemoMotorMoveLogic(
            readback=self.readback,
            setpoint=self.setpoint,
            velocity=self.velocity,
            stop_=self.stop_,
            state=self.state,
        )
TODO
DemoStage#
Finally we get to the DemoStage, which is responsible for instantiating two DemoMotors. It also inherits from StandardReadable, which allows it to be used in plans that read() devices. It ensures that the output of read() is the same as if you were to read() both the DemoMotors, and merge the result:
from ophyd_async.core import StandardReadable
from ._motor import DemoMotor
class DemoStage(StandardReadable):
    """A simulated sample stage with X and Y movables."""

    def __init__(self, prefix: str, name="") -> None:
        # Define some child Devices
        with self.add_children_as_readables():
            self.x = DemoMotor(prefix + "X:")
            self.y = DemoMotor(prefix + "Y:")
        # Set name of device and child devices
        super().__init__(name=name)
Like DemoPointDetector, the PV concatenation is done explicitly in code, and the children are added within a StandardReadable.add_children_as_readables context manager.
from ophyd_async.core import StandardReadable
from ._motor import DemoMotor
class DemoStage(StandardReadable):
    """A simulated sample stage with X and Y movables."""

    def __init__(self, x_trl: str, y_trl: str, name="") -> None:
        # Define some child Devices
        with self.add_children_as_readables():
            self.x = DemoMotor(x_trl)
            self.y = DemoMotor(y_trl)
        # Set name of device and child devices
        super().__init__(name=name)
Like DemoPointDetector, the TRLs are specified explicitly in code and passed into the constructor, and the children are added within a StandardReadable.add_children_as_readables context manager.
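To illustrate what "read() both the DemoMotors, and merge the result" means, here is a standard-library sketch of a parent whose read() combines the readings of its children. FakeMotor and FakeStage are hypothetical classes, not the StandardReadable implementation, and the readings are simplified to a single "value" field (real bluesky readings carry further fields such as a timestamp).

```python
import asyncio


class FakeMotor:
    """Hypothetical child device with a single named reading."""

    def __init__(self, name: str, position: float) -> None:
        self.name = name
        self.position = position

    async def read(self) -> dict[str, dict[str, float]]:
        return {self.name: {"value": self.position}}


class FakeStage:
    """Hypothetical parent whose read() merges the readings of its children."""

    def __init__(self, name: str) -> None:
        self.x = FakeMotor(f"{name}-x", 1.0)
        self.y = FakeMotor(f"{name}-y", 2.0)

    async def read(self) -> dict[str, dict[str, float]]:
        # Read both children concurrently, then merge the dictionaries
        readings = await asyncio.gather(self.x.read(), self.y.read())
        merged: dict[str, dict[str, float]] = {}
        for reading in readings:
            merged.update(reading)
        return merged


reading = asyncio.run(FakeStage("stage").read())
print(reading)
```

The keys stage-x and stage-y correspond to the column names seen in the grid scan table earlier in this tutorial.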
TODO
Conclusion#
In this tutorial we have seen how to create some Devices that are backed by a control system implementation. Read on to see how we would write some tests to ensure that they behave correctly.