Implementing Devices#

In Using Devices we learned how to instantiate some existing ophyd-async Devices. These Devices were ophyd level simulations, so did not talk to any underlying control system. In this tutorial we will instantiate some demo Devices that talk to underlying control system implementations, then explore how the Devices themselves are implemented.

Pick your control system#

Most bluesky users will be interfacing to an underlying control system like EPICS or Tango. The underlying control system will provide functionality like Engineering display screens and historical archiving of control system data. It is possibly to use ophyd-async with multiple control systems, so this tutorial is written with tabbed sections to allow us to only show the information relevant to one particular control system.

To summarize what each control system does:

EPICS is a set of software tools and applications which provide a software infrastructure for use in building distributed control systems to operate devices such as Particle Accelerators, Large Experiments and major Telescopes. Such distributed control systems typically comprise tens or even hundreds of computers, networked together to allow communication between them and to provide control and feedback of the various parts of the device from a central control room.

EPICS uses Client/Server and Publish/Subscribe techniques to communicate between the various computers. Most servers (called Input/Output Controllers or IOCs) perform real-world I/O and local control tasks, and publish this information to clients using robust, EPICS specific network protocols Channel Access and pvAccess. Clients use a process variable (PV) as an identifier to get, put or monitor the value and metadata of a particular control system variable without knowing which server hosts that variable.

EPICS has a flat architecture where any client can request the PV of any server. Sites typically introduce hierarchy by imposing a naming convention on these PVs.

Tango is an Open Source solution for SCADA and DCS. Open Source means you get all the source code under an Open Source free licence (LGPL and GPL). Supervisory Control and Data Acquisition (SCADA) systems are typically industrial type systems using standard hardware. Distributed Control Systems (DCS) are more flexible control systems used in more complex environments. Sardana is a good example of a Tango based Beamline SCADA.

Tango is typically deployed with a central database, which provides a nameserver to lookup which distributed server provides access to the Device. Once located, the Device server can be introspected to see what Attributes of the Device exist.

FastCS is a control system agnostic framework for building Device support in Python that will work for both EPICS and Tango without depending on either. It allows Device support to be written once in a declarative way, then at runtime the control system can be selected. It also adds support for multi-device introspection to both EPICS and Tango, which allows the same ophyd-async Device and the same FastCS Device to use either EPICS or Tango as a transport layer.

FastCS is currently in an early phase of development, being used for PandA and Odin Devices within ophyd-async.

Run the demo#

Ophyd-async ships with some demo devices that do the same thing for each control system, and a script that will create them along with a RunEngine. Let’s run it in an interactive ipython shell:

$ ipython -i -m ophyd_async.epics.demo
Python 3.11.11 (main, Dec  4 2024, 20:38:25) [GCC 12.2.0]
Type 'copyright', 'credits' or 'license' for more information
IPython 8.30.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: 

TODO

TODO

We can now go ahead and run the same grid scan we did in the previous tutorial:

In [1]: RE(bp.grid_scan([pdet], stage.x, 1, 2, 3, stage.y, 2, 3, 3))

Transient Scan ID: 1     Time: 2025-01-14 11:29:05
Persistent Unique Scan ID: '2e0e75d8-33dd-430f-8cd8-6d6ae053c429'
New stream: 'primary'
+-----------+------------+------------+------------+----------------------+----------------------+----------------------+
|   seq_num |       time |    stage-x |    stage-y | pdet-channel-1-value | pdet-channel-2-value | pdet-channel-3-value |
+-----------+------------+------------+------------+----------------------+----------------------+----------------------+
|         1 | 11:29:05.7 |      1.000 |      2.000 |                  921 |                  887 |                  859 |
|         2 | 11:29:06.6 |      1.000 |      2.500 |                  959 |                  926 |                  898 |
|         3 | 11:29:07.5 |      1.000 |      3.000 |                  937 |                  903 |                  875 |
|         4 | 11:29:08.3 |      1.500 |      2.000 |                  976 |                  975 |                  974 |
|         5 | 11:29:09.1 |      1.500 |      2.500 |                  843 |                  843 |                  842 |
|         6 | 11:29:09.8 |      1.500 |      3.000 |                  660 |                  660 |                  659 |
|         7 | 11:29:10.6 |      2.000 |      2.000 |                  761 |                  740 |                  722 |
|         8 | 11:29:11.4 |      2.000 |      2.500 |                  537 |                  516 |                  498 |
|         9 | 11:29:12.2 |      2.000 |      3.000 |                  487 |                  467 |                  448 |
+-----------+------------+------------+------------+----------------------+----------------------+----------------------+
generator grid_scan ['2e0e75d8'] (scan num: 1)

Out[1]: RunEngineResult(run_start_uids=('2e0e75d8-33dd-430f-8cd8-6d6ae053c429',), plan_result='2e0e75d8-33dd-430f-8cd8-6d6ae053c429', exit_status='success', interrupted=False, reason='', exception=None)

See how Devices are instantiated#

Now we will take a look at the demo script and see how it is instantiated. The beginning section with imports is the same as in the first tutorial, but then the control system specific differences appear.

"""Used for tutorial `Implementing Devices`."""

# Import bluesky and ophyd
import bluesky.plan_stubs as bps  # noqa: F401
import bluesky.plans as bp  # noqa: F401
from bluesky.callbacks.best_effort import BestEffortCallback
from bluesky.run_engine import RunEngine, autoawait_in_bluesky_event_loop

from ophyd_async.core import init_devices
from ophyd_async.epics import demo, testing

# Create a run engine and make ipython use it for `await` commands
RE = RunEngine(call_returns_result=True)
autoawait_in_bluesky_event_loop()

# Add a callback for plotting
bec = BestEffortCallback()
RE.subscribe(bec)

# Start IOC with demo pvs in subprocess
prefix = testing.generate_random_pv_prefix()
ioc = demo.start_ioc_subprocess(prefix, num_channels=3)

# All Devices created within this block will be
# connected and named at the end of the with block
with init_devices():
    # Create a sample stage with X and Y motors
    stage = demo.DemoStage(f"{prefix}STAGE:")
    # Create a multi channel counter with the same number
    # of counters as the IOC
    pdet = demo.DemoPointDetector(f"{prefix}DET:", num_channels=3)

EPICS PVs are normally broadcast to your entire network subnet. To avoid PV name clashes, we pick a random prefix, then start the demo IOC using this PV prefix. Starting an IOC here is done just for the demo, in production the IOC would already be running before you started bluesky.

We then pass the PV prefix for each Device down using prior knowledge about the PVs that this particular IOC creates. For example, we know that there will be a DemoStage, and all its PVs will start with prefix + "STAGE:".

Note

There is no introspection of PVs in a device in EPICS, if we tell the IOC to make 3 channels on the point detector, we must also tell the ophyd-async device that the point detector has 3 channels.

TODO

TODO

Look at the Device implementations#

The demo creates the following structure of Devices:

        flowchart LR
    DemoPointDetector-- channel ---DeviceVector
    DeviceVector-- 1 ---pdet.1(DemoPointDetectorChannel)
    DeviceVector-- 2 ---pdet.2(DemoPointDetectorChannel)
    DeviceVector-- 3 ---pdet.3(DemoPointDetectorChannel)
    DemoStage-- x ---stage.x(DemoMotor)
    DemoStage-- y ---stage.y(DemoMotor)
    

The DemoStage contains two DemoMotors, called x and y. The DemoPointDetector contains a DeviceVector called channel that contains 3 DemoPointDetectorChannels, called 1, 2 and 3.

We will now inspect the Demo classes in the diagram to see how they talk to the underlying control system.

DemoPointDetectorChannel#

Let’s start with the lowest level sort of Device, a single channel of our point detector. It contains Signals, which are the smallest sort of Device in ophyd-async, with a current value of a given datatype. In this case, there are two:

  • value: the current value of the channel in integer counts

  • mode: a configuration enum which varies the output of the channel

We specify to the Device baseclass that we would like a Signal of a given type (e.g. SignalR[int]) via a type hint, and it will create that signal for us in a control system specific way. The type of value is the python builtin int, and the type of mode is an enum we have declared ourselves, where the string values must exactly match what the control system produces.

See also

SignalDatatypeT defines the list of all possible datatypes you can use for Signals

We can optionally annotate this type hint with some additional information, like Format. This will tell the StandardReadable baseclass which Signals are important in a plan like bp.grid_scan. In this case we specify that mode should be reported as a configuration parameter once at the start of the scan, and value should be fetched without caching and plotted at each point of the scan.

from typing import Annotated as A

from ophyd_async.core import SignalR, SignalRW, StandardReadable, StrictEnum
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.epics.core import EpicsDevice, PvSuffix


class EnergyMode(StrictEnum):
    """Energy mode for `DemoPointDetectorChannel`."""

    LOW = "Low Energy"
    """Low energy mode"""

    HIGH = "High Energy"
    """High energy mode"""


class DemoPointDetectorChannel(StandardReadable, EpicsDevice):
    """A channel for `DemoPointDetector` with int value based on X and Y Motors."""

    value: A[SignalR[int], PvSuffix("Value"), Format.HINTED_UNCACHED_SIGNAL]
    mode: A[SignalRW[EnergyMode], PvSuffix("Mode"), Format.CONFIG_SIGNAL]

When the Device is instantiated, the EpicsDevice baseclass will look at all the type hints for annotations with a PvSuffix. It will append that to the PV prefix that is passed into the device. In this case if we made a DemoPointDetectorChannel(prefix="PREFIX:"), then value would have PV PREFIX:Value. PvSuffix also allows you to specify different suffixes for the read and write PVs if they are different.

TODO

TODO

DemoPointDetector#

Moving up a level, we have the point detector itself. This also has some Signals to control acquisition which are created in the same way as above:

  • acquire_time: a configuration float saying how long each point should be acquired for

  • start: an executable to start a single acquisition

  • acquiring: a boolean that is True when acquiring

  • reset: an executable to reset the counts on all channels

We also have a DeviceVector called channel with DemoPointDetectorChannel instances within it. These will all contribute their configuration values at the start of scan, and their values at every point in the scan.

Finally, we need to communicate to bluesky that it has to trigger() and acquisition before it can read() from the underlying channels. We do this by implementing the Triggerable protocol. This involves writing a trigger() method with the logic that must be run, calling SignalX.trigger, SignalW.set and SignalR.get_value to manipulate the values of the underlying Signals, returning when complete. This is wrapped in an AsyncStatus, which is used by bluesky to run this operation in the background and know when it is complete.

from typing import Annotated as A

from bluesky.protocols import Triggerable

from ophyd_async.core import (
    DEFAULT_TIMEOUT,
    AsyncStatus,
    DeviceVector,
    SignalR,
    SignalRW,
    SignalX,
    StandardReadable,
)
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.epics.core import EpicsDevice, PvSuffix

from ._point_detector_channel import DemoPointDetectorChannel


class DemoPointDetector(StandardReadable, EpicsDevice, Triggerable):
    """A demo detector that produces a point values based on X and Y motors."""

    acquire_time: A[SignalRW[float], PvSuffix("AcquireTime"), Format.CONFIG_SIGNAL]
    start: A[SignalX, PvSuffix("Start.PROC")]
    acquiring: A[SignalR[bool], PvSuffix("Acquiring")]
    reset: A[SignalX, PvSuffix("Reset.PROC")]

    def __init__(self, prefix: str, num_channels: int = 3, name: str = "") -> None:
        with self.add_children_as_readables():
            self.channel = DeviceVector(
                {
                    i: DemoPointDetectorChannel(f"{prefix}{i}:")
                    for i in range(1, num_channels + 1)
                }
            )
        super().__init__(prefix=prefix, name=name)

    @AsyncStatus.wrap
    async def trigger(self):
        await self.reset.trigger()
        timeout = await self.acquire_time.get_value() + DEFAULT_TIMEOUT
        await self.start.trigger(timeout=timeout)

Although the Signals are declared via type hints, the DeviceVector requires explicit instantiation in an __init__ method. This is because it requires the num_channels to be passed in to the constructor to know how many channels require creation. This means that we also need to do the PV concatenation ourselves, so if the PV prefix for the device as PREFIX: then the first channel would have prefix PREFIX:CHAN1:. We also register them with StandardReadable in a different way, adding them within a StandardReadable.add_children_as_readables context manager which adds all the children created within its body.

TODO

TODO

See also

For more information on when to construct Devices declaratively using type hints, and when to construct them procedurally with an __init__ method, see Declarative vs Procedural Devices

DemoMotor#

Moving onto the motion side, we have DemoMotor. This has a few more signals:

  • readback: the current position of the motor as a float

  • velocity: a configuration parameter for the velocity in units/s

  • units: the string units of the position

  • setpoint: the position the motor has been requested to move to as a float, it returns as soon as it’s been set

  • precision: the number of points after the decimal place of the position that are relevant

  • stop_: an executable to stop the move immediately

At each point in the scan it will report the readback, but we override the set_name() method so that it reports its position as stage.x rather than stage.x.readback.

If we consider how we would use this in a scan, we could bp.scan(stage.x.setpoint, ...) directly, but that would only start the motor moving, not wait for it to complete the move. To do this, we need to implement another protocol: Movable. This requires implementing a set() method (again wrapped in an AsyncStatus) that does the following:

  • Work out where to move to

  • Start the motor moving

  • Optionally report back updates on how far the motor has moved so bluesky can provide a progress bar

  • Wait until the motor is at the target position

Finally, we implement Stoppable which tells bluesky what to do if the user aborts a plan. This requires implementing stop() to execute the stop_ signal and tell set() whether the move should be reported as successful completion, or if it should raise an error.

import asyncio
from typing import Annotated as A

import numpy as np
from bluesky.protocols import Movable, Stoppable

from ophyd_async.core import (
    CALCULATE_TIMEOUT,
    DEFAULT_TIMEOUT,
    CalculatableTimeout,
    SignalR,
    SignalRW,
    SignalX,
    StandardReadable,
    WatchableAsyncStatus,
    WatcherUpdate,
    observe_value,
)
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.epics.core import EpicsDevice, PvSuffix


class DemoMotor(EpicsDevice, StandardReadable, Movable, Stoppable):
    """A demo movable that moves based on velocity."""

    # Whether set() should complete successfully or not
    _set_success = True
    # Define some signals
    readback: A[SignalR[float], PvSuffix("Readback"), Format.HINTED_SIGNAL]
    velocity: A[SignalRW[float], PvSuffix("Velocity"), Format.CONFIG_SIGNAL]
    units: A[SignalR[str], PvSuffix("Readback.EGU"), Format.CONFIG_SIGNAL]
    setpoint: A[SignalRW[float], PvSuffix("Setpoint")]
    precision: A[SignalR[int], PvSuffix("Readback.PREC")]
    # If a signal name clashes with a bluesky verb add _ to the attribute name
    stop_: A[SignalX, PvSuffix("Stop.PROC")]

    def set_name(self, name: str, *, child_name_separator: str | None = None) -> None:
        super().set_name(name, child_name_separator=child_name_separator)
        # Readback should be named the same as its parent in read()
        self.readback.set_name(name)

    @WatchableAsyncStatus.wrap
    async def set(  # type: ignore
        self, new_position: float, timeout: CalculatableTimeout = CALCULATE_TIMEOUT
    ):
        # The move should complete successfully unless stop(success=False) is called
        self._set_success = True
        # Get some variables for the progress bar reporting
        old_position, units, precision, velocity = await asyncio.gather(
            self.setpoint.get_value(),
            self.units.get_value(),
            self.precision.get_value(),
            self.velocity.get_value(),
        )
        # If not supplied, calculate a suitable timeout for the move
        if timeout == CALCULATE_TIMEOUT:
            timeout = abs(new_position - old_position) / velocity + DEFAULT_TIMEOUT
        # Wait for the value to set, but don't wait for put completion callback
        await self.setpoint.set(new_position, wait=False)
        # Observe the readback Signal, and on each new position...
        async for current_position in observe_value(
            self.readback, done_timeout=timeout
        ):
            # Emit a progress bar update
            yield WatcherUpdate(
                current=current_position,
                initial=old_position,
                target=new_position,
                name=self.name,
                unit=units,
                precision=precision,
            )
            # If we are at the desired position the break
            if np.isclose(current_position, new_position):
                break
        # If we were told to stop and report an error then do so
        if not self._set_success:
            raise RuntimeError("Motor was stopped")

    async def stop(self, success=True):
        self._set_success = success
        await self.stop_.trigger()

TODO

TODO

DemoStage#

Finally we get to the DemoStage, which is responsible for instantiating two DemoMotors. It also inherits from StandardReadable, which allows it to be used in plans that read() devices. It ensures that the output of read() is the same as if you were to read() both the DemoMotors, and merge the result:

from ophyd_async.core import StandardReadable

from ._motor import DemoMotor


class DemoStage(StandardReadable):
    """A simulated sample stage with X and Y movables."""

    def __init__(self, prefix: str, name="") -> None:
        # Define some child Devices
        with self.add_children_as_readables():
            self.x = DemoMotor(prefix + "X:")
            self.y = DemoMotor(prefix + "Y:")
        # Set name of device and child devices
        super().__init__(name=name)

Like DemoPointDetector, the PV concatenation is done explicitly in code, and the children are added within a StandardReadable.add_children_as_readables context manager.

TODO

TODO

Conclusion#

In this tutorial we have seen how to create some Devices that are backed by a Control system implementation. Read on to see how we would write some tests to ensure that they behave correctly.