Implementing Devices#
In Using Devices we learned how to instantiate some existing ophyd-async Devices. These Devices were ophyd-level simulations, so they did not talk to any underlying control system. In this tutorial we will instantiate some demo Devices that talk to underlying control system implementations, then explore how the Devices themselves are implemented.
Pick your control system#
Most bluesky users will be interfacing to an underlying control system like EPICS or Tango. The underlying control system will provide functionality like engineering display screens and historical archiving of control system data. It is possible to use ophyd-async with multiple control systems, so this tutorial is written with tabbed sections to allow us to only show the information relevant to one particular control system.
To summarize what each control system does:
EPICS is a set of software tools and applications which provide a software infrastructure for use in building distributed control systems to operate devices such as Particle Accelerators, Large Experiments and major Telescopes. Such distributed control systems typically comprise tens or even hundreds of computers, networked together to allow communication between them and to provide control and feedback of the various parts of the device from a central control room.
EPICS uses Client/Server and Publish/Subscribe techniques to communicate between the various computers. Most servers (called Input/Output Controllers or IOCs) perform real-world I/O and local control tasks, and publish this information to clients using robust, EPICS specific network protocols Channel Access and pvAccess. Clients use a process variable (PV) as an identifier to get, put or monitor the value and metadata of a particular control system variable without knowing which server hosts that variable.
EPICS has a flat architecture where any client can request the PV of any server. Sites typically introduce hierarchy by imposing a naming convention on these PVs.
Tango is an Open Source solution for SCADA and DCS. Open Source means you get all the source code under an Open Source free licence (LGPL and GPL). Supervisory Control and Data Acquisition (SCADA) systems are typically industrial type systems using standard hardware. Distributed Control Systems (DCS) are more flexible control systems used in more complex environments. Sardana is a good example of a Tango based Beamline SCADA.
Tango is typically deployed with a central database, which provides a nameserver to lookup which distributed server provides access to the Device. Once located, the Device server can be introspected to see what Attributes of the Device exist.
FastCS is a control system agnostic framework for building Device support in Python that will work for both EPICS and Tango without depending on either. It allows Device support to be written once in a declarative way, then at runtime the control system can be selected. It also adds support for multi-device introspection to both EPICS and Tango, which allows the same ophyd-async Device and the same FastCS Device to use either EPICS or Tango as a transport layer.
FastCS is currently in an early phase of development, being used for PandA and Odin Devices within ophyd-async.
Run the demo#
Ophyd-async ships with some demo devices that do the same thing for each control system, and a script that will create them along with a RunEngine. Let’s run it in an interactive ipython shell:
$ ipython -i -m ophyd_async.epics.demo
Python 3.11.11 (main, Dec 4 2024, 20:38:25) [GCC 12.2.0]
Type 'copyright', 'credits' or 'license' for more information
IPython 8.30.0 -- An enhanced Interactive Python. Type '?' for help.
In [1]:
We can now go ahead and run the same grid scan we did in the previous tutorial:
In [1]: RE(bp.grid_scan([pdet], stage.x, 1, 2, 3, stage.y, 2, 3, 3))
Transient Scan ID: 1 Time: 2025-01-14 11:29:05
Persistent Unique Scan ID: '2e0e75d8-33dd-430f-8cd8-6d6ae053c429'
New stream: 'primary'
+-----------+------------+------------+------------+----------------------+----------------------+----------------------+
|   seq_num |       time |    stage-x |    stage-y | pdet-channel-1-value | pdet-channel-2-value | pdet-channel-3-value |
+-----------+------------+------------+------------+----------------------+----------------------+----------------------+
|         1 | 11:29:05.7 |      1.000 |      2.000 |                  921 |                  887 |                  859 |
|         2 | 11:29:06.6 |      1.000 |      2.500 |                  959 |                  926 |                  898 |
|         3 | 11:29:07.5 |      1.000 |      3.000 |                  937 |                  903 |                  875 |
|         4 | 11:29:08.3 |      1.500 |      2.000 |                  976 |                  975 |                  974 |
|         5 | 11:29:09.1 |      1.500 |      2.500 |                  843 |                  843 |                  842 |
|         6 | 11:29:09.8 |      1.500 |      3.000 |                  660 |                  660 |                  659 |
|         7 | 11:29:10.6 |      2.000 |      2.000 |                  761 |                  740 |                  722 |
|         8 | 11:29:11.4 |      2.000 |      2.500 |                  537 |                  516 |                  498 |
|         9 | 11:29:12.2 |      2.000 |      3.000 |                  487 |                  467 |                  448 |
+-----------+------------+------------+------------+----------------------+----------------------+----------------------+
generator grid_scan ['2e0e75d8'] (scan num: 1)
Out[1]: RunEngineResult(run_start_uids=('2e0e75d8-33dd-430f-8cd8-6d6ae053c429',), plan_result='2e0e75d8-33dd-430f-8cd8-6d6ae053c429', exit_status='success', interrupted=False, reason='', exception=None)
See how Devices are instantiated#
Now we will take a look at the demo script and see how the Devices are instantiated. The beginning section with imports is the same as in the first tutorial, but then the control system specific differences appear.
"""Used for tutorial `Implementing Devices`."""
# Import bluesky and ophyd
import bluesky.plan_stubs as bps  # noqa: F401
import bluesky.plans as bp  # noqa: F401
from bluesky.callbacks.best_effort import BestEffortCallback
from bluesky.run_engine import RunEngine, autoawait_in_bluesky_event_loop

from ophyd_async.core import init_devices
from ophyd_async.epics import demo, testing

# Create a run engine and make ipython use it for `await` commands
RE = RunEngine(call_returns_result=True)
autoawait_in_bluesky_event_loop()

# Add a callback for plotting
bec = BestEffortCallback()
RE.subscribe(bec)

# Start IOC with demo pvs in subprocess
prefix = testing.generate_random_pv_prefix()
ioc = demo.start_ioc_subprocess(prefix, num_channels=3)

# All Devices created within this block will be
# connected and named at the end of the with block
with init_devices():
    # Create a sample stage with X and Y motors
    stage = demo.DemoStage(f"{prefix}STAGE:")
    # Create a multi channel counter with the same number
    # of counters as the IOC
    pdet = demo.DemoPointDetector(f"{prefix}DET:", num_channels=3)
EPICS PVs are normally broadcast to your entire network subnet. To avoid PV name clashes, we pick a random prefix, then start the demo IOC using this PV prefix. Starting an IOC here is done just for the demo; in production the IOC would already be running before you started bluesky.
We then pass the PV prefix for each Device down using prior knowledge about the PVs that this particular IOC creates. For example, we know that there will be a DemoStage, and all its PVs will start with prefix + "STAGE:".
Note
There is no introspection of PVs in a device in EPICS, so if we tell the IOC to make 3 channels on the point detector, we must also tell the ophyd-async device that the point detector has 3 channels.
Look at the Device implementations#
The demo creates the following structure of Devices:
flowchart LR
    DemoPointDetector-- channel ---DeviceVector
    DeviceVector-- 1 ---pdet.1(DemoPointDetectorChannel)
    DeviceVector-- 2 ---pdet.2(DemoPointDetectorChannel)
    DeviceVector-- 3 ---pdet.3(DemoPointDetectorChannel)
    DemoStage-- x ---stage.x(DemoMotor)
    DemoStage-- y ---stage.y(DemoMotor)
The DemoStage contains two DemoMotors, called x and y. The DemoPointDetector contains a DeviceVector called channel that contains 3 DemoPointDetectorChannels, called 1, 2 and 3.
We will now inspect the Demo classes in the diagram to see how they talk to the underlying control system.
DemoPointDetectorChannel#
Let’s start with the lowest level sort of Device, a single channel of our point detector. It contains Signals, which are the smallest sort of Device in ophyd-async, with a current value of a given datatype. In this case, there are two:
value: the current value of the channel in integer counts
mode: a configuration enum which varies the output of the channel
We specify to the Device baseclass that we would like a Signal of a given type (e.g. SignalR[int]) via a type hint, and it will create that signal for us in a control system specific way. The type of value is the python builtin int, and the type of mode is an enum we have declared ourselves, where the string values must exactly match what the control system produces.
See also
SignalDatatypeT defines the list of all possible datatypes you can use for Signals.
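To illustrate why the enum strings must match exactly, here is a minimal sketch that uses the standard library `enum.Enum` as a stand-in for `StrictEnum`. This is an assumption made so that the example runs without ophyd-async installed; `StrictEnum` itself may apply further checks. The helper `decode_mode` is hypothetical.

```python
from enum import Enum


class EnergyMode(str, Enum):
    """Stand-in for the tutorial's EnergyMode, built on the stdlib Enum."""

    LOW = "Low Energy"
    HIGH = "High Energy"


def decode_mode(raw: str) -> EnergyMode:
    """Hypothetical helper: turn a control system string into an enum member."""
    # Enum lookup by value is exact, so case and whitespace both matter
    return EnergyMode(raw)


low = decode_mode("Low Energy")

try:
    decode_mode("low energy")  # wrong case, so no member matches
    mismatch_rejected = False
except ValueError:
    mismatch_rejected = True
```

A string that differs from the declared value, even only in case, fails to convert, which is why the declared values have to mirror the control system exactly.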
We can optionally annotate this type hint with some additional information, like Format. This will tell the StandardReadable baseclass which Signals are important in a plan like bp.grid_scan. In this case we specify that mode should be reported as a configuration parameter once at the start of the scan, and value should be fetched without caching and plotted at each point of the scan.
from typing import Annotated as A

from ophyd_async.core import SignalR, SignalRW, StandardReadable, StrictEnum
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.epics.core import EpicsDevice, PvSuffix


class EnergyMode(StrictEnum):
    """Energy mode for `DemoPointDetectorChannel`."""

    LOW = "Low Energy"
    """Low energy mode"""
    HIGH = "High Energy"
    """High energy mode"""


class DemoPointDetectorChannel(StandardReadable, EpicsDevice):
    """A channel for `DemoPointDetector` with int value based on X and Y Motors."""

    value: A[SignalR[int], PvSuffix("Value"), Format.HINTED_UNCACHED_SIGNAL]
    mode: A[SignalRW[EnergyMode], PvSuffix("Mode"), Format.CONFIG_SIGNAL]
When the Device is instantiated, the EpicsDevice baseclass will look at all the type hints for annotations with a PvSuffix. It will append that to the PV prefix that is passed into the device. In this case if we made a DemoPointDetectorChannel(prefix="PREFIX:"), then value would have PV PREFIX:Value. PvSuffix also allows you to specify different suffixes for the read and write PVs if they are different.
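The general mechanism of reading annotations from type hints can be sketched with the standard library alone. The names `Suffix`, `Channel` and `pvs_from_hints` below are hypothetical stand-ins invented for this illustration; this is not how `EpicsDevice` is actually implemented, only one plausible way to turn annotated hints into PV names.

```python
from dataclasses import dataclass
from typing import Annotated, get_args, get_origin, get_type_hints


@dataclass(frozen=True)
class Suffix:
    """Hypothetical stand-in for PvSuffix: text appended to the prefix."""

    text: str


class Channel:
    """Hypothetical class whose hints mimic the tutorial's channel."""

    value: Annotated[int, Suffix("Value")]
    mode: Annotated[str, Suffix("Mode")]


def pvs_from_hints(cls: type, prefix: str) -> dict[str, str]:
    """Build {attribute name: PV name} from Annotated type hints."""
    pvs: dict[str, str] = {}
    # include_extras=True keeps the Annotated metadata instead of stripping it
    for name, hint in get_type_hints(cls, include_extras=True).items():
        if get_origin(hint) is Annotated:
            # get_args gives (underlying type, *metadata)
            for meta in get_args(hint)[1:]:
                if isinstance(meta, Suffix):
                    pvs[name] = prefix + meta.text
    return pvs


channel_pvs = pvs_from_hints(Channel, "PREFIX:")
```

Here `channel_pvs["value"]` is `"PREFIX:Value"`, the same concatenation of prefix and suffix described in the text.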
DemoPointDetector#
Moving up a level, we have the point detector itself. This also has some Signals to control acquisition which are created in the same way as above:
acquire_time: a configuration float saying how long each point should be acquired for
start: an executable to start a single acquisition
acquiring: a boolean that is True when acquiring
reset: an executable to reset the counts on all channels
We also have a DeviceVector called channel with DemoPointDetectorChannel instances within it. These will all contribute their configuration values at the start of scan, and their values at every point in the scan.
Finally, we need to communicate to bluesky that it has to trigger() an acquisition before it can read() from the underlying channels. We do this by implementing the Triggerable protocol. This involves writing a trigger() method with the logic that must be run, calling SignalX.trigger, SignalW.set and SignalR.get_value to manipulate the values of the underlying Signals, returning when complete. This is wrapped in an AsyncStatus, which is used by bluesky to run this operation in the background and know when it is complete.
from typing import Annotated as A

from bluesky.protocols import Triggerable

from ophyd_async.core import (
    DEFAULT_TIMEOUT,
    AsyncStatus,
    DeviceVector,
    SignalR,
    SignalRW,
    SignalX,
    StandardReadable,
)
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.epics.core import EpicsDevice, PvSuffix

from ._point_detector_channel import DemoPointDetectorChannel


class DemoPointDetector(StandardReadable, EpicsDevice, Triggerable):
    """A demo detector that produces point values based on X and Y motors."""

    acquire_time: A[SignalRW[float], PvSuffix("AcquireTime"), Format.CONFIG_SIGNAL]
    start: A[SignalX, PvSuffix("Start.PROC")]
    acquiring: A[SignalR[bool], PvSuffix("Acquiring")]
    reset: A[SignalX, PvSuffix("Reset.PROC")]

    def __init__(self, prefix: str, num_channels: int = 3, name: str = "") -> None:
        with self.add_children_as_readables():
            self.channel = DeviceVector(
                {
                    i: DemoPointDetectorChannel(f"{prefix}{i}:")
                    for i in range(1, num_channels + 1)
                }
            )
        super().__init__(prefix=prefix, name=name)

    @AsyncStatus.wrap
    async def trigger(self):
        await self.reset.trigger()
        timeout = await self.acquire_time.get_value() + DEFAULT_TIMEOUT
        await self.start.trigger(timeout=timeout)
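To make the idea of wrapping trigger() in a status object more concrete, here is a deliberately simplified sketch using only asyncio. `SketchStatus`, `wrap` and `FakeDetector` are hypothetical names invented for this illustration; the real `AsyncStatus` does considerably more (for example callbacks and error reporting), so treat this as a sketch of the concept rather than of the library.

```python
import asyncio
import functools


class SketchStatus:
    """Hypothetical awaitable that runs a coroutine as a background task."""

    def __init__(self, coro) -> None:
        # create_task schedules the coroutine without waiting for it
        self.task = asyncio.create_task(coro)

    @property
    def done(self) -> bool:
        return self.task.done()

    def __await__(self):
        return self.task.__await__()


def wrap(func):
    """Make an async method return a SketchStatus instead of a coroutine."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return SketchStatus(func(*args, **kwargs))

    return wrapper


class FakeDetector:
    def __init__(self) -> None:
        self.events: list[str] = []

    @wrap
    async def trigger(self):
        self.events.append("reset")
        await asyncio.sleep(0.01)  # pretend to acquire
        self.events.append("acquired")


async def main():
    det = FakeDetector()
    status = det.trigger()  # returns immediately
    started_in_background = not status.done
    await status  # the caller decides when to wait
    return det.events, started_in_background, status.done


events, started_in_background, finished = asyncio.run(main())
```

The caller gets the status back straight away and can either await it or check whether it is done, which is the property that lets a plan start several operations and wait for them together.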
Although the Signals are declared via type hints, the DeviceVector requires explicit instantiation in an __init__ method. This is because num_channels must be passed in to the constructor so that it knows how many channels to create. This means that we also need to do the PV concatenation ourselves, so if the PV prefix for the device is PREFIX: then the first channel would have prefix PREFIX:1:, following the f"{prefix}{i}:" expression in the code. We also register them with StandardReadable in a different way, adding them within a StandardReadable.add_children_as_readables context manager which adds all the children created within its body.
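The prefix arithmetic in the `__init__` method can be checked in isolation. The helper `channel_prefixes` below is hypothetical; it only reproduces the dictionary comprehension from the listing, with plain strings in place of `DemoPointDetectorChannel` instances.

```python
def channel_prefixes(prefix: str, num_channels: int) -> dict[int, str]:
    """Hypothetical helper mirroring the dict comprehension in __init__."""
    # Channels are numbered from 1, matching range(1, num_channels + 1)
    return {i: f"{prefix}{i}:" for i in range(1, num_channels + 1)}


prefixes = channel_prefixes("PREFIX:", 3)
# Appending a suffix such as "Value" gives the full PV for one channel
first_value_pv = prefixes[1] + "Value"
```

With a device prefix of `PREFIX:` and three channels, the keys are 1, 2 and 3, and `first_value_pv` is `"PREFIX:1:Value"`.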
See also
For more information on when to construct Devices declaratively using type hints, and when to construct them procedurally with an __init__ method, see Declarative vs Procedural Devices.
DemoMotor#
Moving on to the motion side, we have DemoMotor. This has a few more signals:
readback: the current position of the motor as a float
velocity: a configuration parameter for the velocity in units/s
units: the string units of the position
setpoint: the position the motor has been requested to move to as a float; setting it returns as soon as the value has been written
precision: the number of points after the decimal place of the position that are relevant
stop_: an executable to stop the move immediately
At each point in the scan it will report the readback, but we override the set_name() method so that it reports its position as stage.x rather than stage.x.readback (this is why the scan output above has a stage-x column).
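The effect of overriding set_name() can be sketched with a tiny hypothetical device tree. `SketchDevice` and `SketchMotor` are invented for this illustration, and the `-` separator between parent and child names is an assumption inferred from column names such as `pdet-channel-1-value` in the scan output.

```python
class SketchDevice:
    """Hypothetical, minimal device tree node with bluesky-style names."""

    def __init__(self, **children: "SketchDevice") -> None:
        self.name = ""
        self.children = children

    def set_name(self, name: str) -> None:
        self.name = name
        # Assumed convention: child names are "<parent>-<attribute>"
        for attr, child in self.children.items():
            child.set_name(f"{name}-{attr}")


class SketchMotor(SketchDevice):
    def __init__(self) -> None:
        super().__init__(readback=SketchDevice(), velocity=SketchDevice())

    def set_name(self, name: str) -> None:
        super().set_name(name)
        # Override: the readback takes the motor's own name
        self.children["readback"].set_name(name)


stage = SketchDevice(x=SketchMotor(), y=SketchMotor())
stage.set_name("stage")
x = stage.children["x"]
```

After naming, the readback of `x` is called `stage-x` while its velocity keeps the longer `stage-x-velocity`, so only the signal a user thinks of as "the motor position" gets the short name.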
If we consider how we would use this in a scan, we could bp.scan(stage.x.setpoint, ...) directly, but that would only start the motor moving, not wait for it to complete the move. To do this, we need to implement another protocol: Movable. This requires implementing a set() method (again wrapped in an AsyncStatus) that does the following:
Work out where to move to
Start the motor moving
Optionally report back updates on how far the motor has moved so bluesky can provide a progress bar
Wait until the motor is at the target position
Finally, we implement Stoppable, which tells bluesky what to do if the user aborts a plan. This requires implementing stop() to execute the stop_ signal and tell set() whether the move should be reported as successful completion, or if it should raise an error.
import asyncio
from typing import Annotated as A

import numpy as np
from bluesky.protocols import Movable, Stoppable

from ophyd_async.core import (
    CALCULATE_TIMEOUT,
    DEFAULT_TIMEOUT,
    CalculatableTimeout,
    SignalR,
    SignalRW,
    SignalX,
    StandardReadable,
    WatchableAsyncStatus,
    WatcherUpdate,
    observe_value,
)
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.epics.core import EpicsDevice, PvSuffix


class DemoMotor(EpicsDevice, StandardReadable, Movable, Stoppable):
    """A demo movable that moves based on velocity."""

    # Whether set() should complete successfully or not
    _set_success = True
    # Define some signals
    readback: A[SignalR[float], PvSuffix("Readback"), Format.HINTED_SIGNAL]
    velocity: A[SignalRW[float], PvSuffix("Velocity"), Format.CONFIG_SIGNAL]
    units: A[SignalR[str], PvSuffix("Readback.EGU"), Format.CONFIG_SIGNAL]
    setpoint: A[SignalRW[float], PvSuffix("Setpoint")]
    precision: A[SignalR[int], PvSuffix("Readback.PREC")]
    # If a signal name clashes with a bluesky verb add _ to the attribute name
    stop_: A[SignalX, PvSuffix("Stop.PROC")]

    def set_name(self, name: str, *, child_name_separator: str | None = None) -> None:
        super().set_name(name, child_name_separator=child_name_separator)
        # Readback should be named the same as its parent in read()
        self.readback.set_name(name)

    @WatchableAsyncStatus.wrap
    async def set(  # type: ignore
        self, new_position: float, timeout: CalculatableTimeout = CALCULATE_TIMEOUT
    ):
        # The move should complete successfully unless stop(success=False) is called
        self._set_success = True
        # Get some variables for the progress bar reporting
        old_position, units, precision, velocity = await asyncio.gather(
            self.setpoint.get_value(),
            self.units.get_value(),
            self.precision.get_value(),
            self.velocity.get_value(),
        )
        # If not supplied, calculate a suitable timeout for the move
        if timeout == CALCULATE_TIMEOUT:
            timeout = abs(new_position - old_position) / velocity + DEFAULT_TIMEOUT
        # Wait for the value to set, but don't wait for put completion callback
        await self.setpoint.set(new_position, wait=False)
        # Observe the readback Signal, and on each new position...
        async for current_position in observe_value(
            self.readback, done_timeout=timeout
        ):
            # Emit a progress bar update
            yield WatcherUpdate(
                current=current_position,
                initial=old_position,
                target=new_position,
                name=self.name,
                unit=units,
                precision=precision,
            )
            # If we are at the desired position then break
            if np.isclose(current_position, new_position):
                break
        # If we were told to stop and report an error then do so
        if not self._set_success:
            raise RuntimeError("Motor was stopped")

    async def stop(self, success=True):
        self._set_success = success
        await self.stop_.trigger()
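The interplay between set() and stop() can be tried out without a control system. The sketch below is a hypothetical in-memory motor (`SketchMotor`, with an invented `_moving` flag and a simulated move loop); it borrows only the `_set_success` pattern from the listing and is not the ophyd-async implementation.

```python
import asyncio
import math


class SketchMotor:
    """Hypothetical in-memory motor that moves at a fixed velocity."""

    def __init__(self, velocity: float = 100.0) -> None:
        self.position = 0.0
        self.velocity = velocity  # units per second
        self._moving = False
        self._set_success = True

    async def set(self, new_position: float, step: float = 0.01) -> None:
        # The move succeeds unless stop(success=False) is called
        self._set_success = True
        self._moving = True
        while self._moving and not math.isclose(self.position, new_position):
            # Advance by at most one time step's travel, never overshooting
            delta = new_position - self.position
            max_move = self.velocity * step
            self.position += max(-max_move, min(max_move, delta))
            await asyncio.sleep(step)
        self._moving = False
        if not self._set_success:
            raise RuntimeError("Motor was stopped")

    async def stop(self, success: bool = True) -> None:
        self._set_success = success
        self._moving = False


async def main() -> tuple[float, bool, bool]:
    motor = SketchMotor()
    await motor.set(1.0)  # short move, runs to completion
    reached = motor.position

    # Start a long move in the background, then abort it
    move = asyncio.create_task(motor.set(1000.0))
    await asyncio.sleep(0.03)
    await motor.stop(success=False)
    try:
        await move
        raised = False
    except RuntimeError:
        raised = True
    return reached, raised, motor.position < 1000.0


reached, raised, stopped_early = asyncio.run(main())
```

Calling `stop(success=False)` ends the loop inside `set()` and makes it raise, whereas `stop(success=True)` would let `set()` return normally even though the target was not reached.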
DemoStage#
Finally we get to the DemoStage, which is responsible for instantiating two DemoMotors. It also inherits from StandardReadable, which allows it to be used in plans that read() devices. It ensures that the output of read() is the same as if you were to read() both the DemoMotors, and merge the result:
from ophyd_async.core import StandardReadable

from ._motor import DemoMotor


class DemoStage(StandardReadable):
    """A simulated sample stage with X and Y movables."""

    def __init__(self, prefix: str, name="") -> None:
        # Define some child Devices
        with self.add_children_as_readables():
            self.x = DemoMotor(prefix + "X:")
            self.y = DemoMotor(prefix + "Y:")
        # Set name of device and child devices
        super().__init__(name=name)
Like DemoPointDetector, the PV concatenation is done explicitly in code, and the children are added within a StandardReadable.add_children_as_readables context manager.
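The merged read() behaviour described for the stage can be illustrated with plain asyncio. `SketchMotor` and `SketchStage` are hypothetical classes for this illustration, and the reading dictionaries are only loosely modelled on bluesky readings; `StandardReadable` handles this merging for you in the real Devices.

```python
import asyncio
import time


class SketchMotor:
    """Hypothetical readable with a single named value."""

    def __init__(self, name: str, position: float) -> None:
        self.name = name
        self.position = position

    async def read(self) -> dict[str, dict[str, float]]:
        # One entry keyed by the motor name, holding value and timestamp
        return {self.name: {"value": self.position, "timestamp": time.time()}}


class SketchStage:
    """Hypothetical parent whose read() merges its children's readings."""

    def __init__(self) -> None:
        self.x = SketchMotor("stage-x", 1.0)
        self.y = SketchMotor("stage-y", 2.0)

    async def read(self) -> dict[str, dict[str, float]]:
        # Read both children concurrently, then merge the dictionaries
        readings = await asyncio.gather(self.x.read(), self.y.read())
        merged: dict[str, dict[str, float]] = {}
        for reading in readings:
            merged.update(reading)
        return merged


stage_reading = asyncio.run(SketchStage().read())
```

The result has one key per motor, which is the same content you would get from reading each motor separately and combining the two dictionaries.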
Conclusion#
In this tutorial we have seen how to create some Devices that are backed by a control system implementation. Read on to see how we would write some tests to ensure that they behave correctly.