Implementing Devices#

In Using Devices we learned how to instantiate some existing ophyd-async Devices. These Devices were ophyd level simulations, so did not talk to any underlying control system. In this tutorial we will instantiate some demo Devices that talk to underlying control system implementations, then explore how the Devices themselves are implemented.

Pick your control system#

Most bluesky users will be interfacing to an underlying control system like EPICS or Tango. The underlying control system will provide functionality like Engineering display screens and historical archiving of control system data. It is possibly to use ophyd-async with multiple control systems, so this tutorial is written with tabbed sections to allow us to only show the information relevant to one particular control system.

To summarize what each control system does:

EPICS is a set of software tools and applications which provide a software infrastructure for use in building distributed control systems to operate devices such as Particle Accelerators, Large Experiments and major Telescopes. Such distributed control systems typically comprise tens or even hundreds of computers, networked together to allow communication between them and to provide control and feedback of the various parts of the device from a central control room.

EPICS uses Client/Server and Publish/Subscribe techniques to communicate between the various computers. Most servers (called Input/Output Controllers or IOCs) perform real-world I/O and local control tasks, and publish this information to clients using robust, EPICS specific network protocols Channel Access and pvAccess. Clients use a process variable (PV) as an identifier to get, put or monitor the value and metadata of a particular control system variable without knowing which server hosts that variable.

EPICS has a flat architecture where any client can request the PV of any server. Sites typically introduce hierarchy by imposing a naming convention on these PVs.

Tango is an Open Source solution for SCADA and DCS. Open Source means you get all the source code under an Open Source free licence (LGPL and GPL). Supervisory Control and Data Acquisition (SCADA) systems are typically industrial type systems using standard hardware. Distributed Control Systems (DCS) are more flexible control systems used in more complex environments. Sardana is a good example of a Tango based Beamline SCADA.

Tango is typically deployed with a central database, which provides a nameserver to lookup which distributed server provides access to the Device. Once located, the Device server can be introspected to see what Attributes of the Device exist.

FastCS is a control system agnostic framework for building Device support in Python that will work for both EPICS and Tango without depending on either. It allows Device support to be written once in a declarative way, then at runtime the control system can be selected. It also adds support for multi-device introspection to both EPICS and Tango, which allows the same ophyd-async Device and the same FastCS Device to use either EPICS or Tango as a transport layer.

FastCS is currently in an early phase of development, being used for PandA and Odin Devices within ophyd-async.

Run the demo#

Ophyd-async ships with some demo devices that do the same thing for each control system, and a script that will create them along with a RunEngine. Let’s run it in an interactive ipython shell:

$ ipython --matplotlib=qt6 -i -m ophyd_async.epics.demo
Python 3.11.11 (main, Dec  4 2024, 20:38:25) [GCC 12.2.0]
Type 'copyright', 'credits' or 'license' for more information
IPython 8.30.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: 
$ ipython --matplotlib=qt6 -i -m ophyd_async.tango.demo
Python 3.12.3 (main, Mar  3 2026, 12:15:18) [GCC 13.3.0]
Type 'copyright', 'credits' or 'license' for more information
IPython 9.9.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: 

TODO

We can now go ahead and run the same grid scan we did in the previous tutorial:

In [1]: RE(bp.grid_scan([pdet], stage.x, 1, 2, 3, stage.y, 2, 3, 3))

Transient Scan ID: 1     Time: 2025-01-14 11:29:05
Persistent Unique Scan ID: '2e0e75d8-33dd-430f-8cd8-6d6ae053c429'
New stream: 'primary'
+-----------+------------+------------+------------+----------------------+----------------------+----------------------+
|   seq_num |       time |    stage-x |    stage-y | pdet-channel-1-value | pdet-channel-2-value | pdet-channel-3-value |
+-----------+------------+------------+------------+----------------------+----------------------+----------------------+
|         1 | 11:29:05.7 |      1.000 |      2.000 |                  921 |                  887 |                  859 |
|         2 | 11:29:06.6 |      1.000 |      2.500 |                  959 |                  926 |                  898 |
|         3 | 11:29:07.5 |      1.000 |      3.000 |                  937 |                  903 |                  875 |
|         4 | 11:29:08.3 |      1.500 |      2.000 |                  976 |                  975 |                  974 |
|         5 | 11:29:09.1 |      1.500 |      2.500 |                  843 |                  843 |                  842 |
|         6 | 11:29:09.8 |      1.500 |      3.000 |                  660 |                  660 |                  659 |
|         7 | 11:29:10.6 |      2.000 |      2.000 |                  761 |                  740 |                  722 |
|         8 | 11:29:11.4 |      2.000 |      2.500 |                  537 |                  516 |                  498 |
|         9 | 11:29:12.2 |      2.000 |      3.000 |                  487 |                  467 |                  448 |
+-----------+------------+------------+------------+----------------------+----------------------+----------------------+
generator grid_scan ['2e0e75d8'] (scan num: 1)

Out[1]: RunEngineResult(run_start_uids=('2e0e75d8-33dd-430f-8cd8-6d6ae053c429',), plan_result='2e0e75d8-33dd-430f-8cd8-6d6ae053c429', exit_status='success', interrupted=False, reason='', exception=None)

See how Devices are instantiated#

Now we will take a look at the demo script and see how it is instantiated. The beginning section with imports is the same as in the first tutorial, but then the control system specific differences appear.

"""Used for tutorial `Implementing Devices`."""

# Import bluesky and ophyd
import bluesky.plan_stubs as bps  # noqa: F401
import bluesky.plans as bp  # noqa: F401
from bluesky.callbacks.best_effort import BestEffortCallback
from bluesky.run_engine import RunEngine, autoawait_in_bluesky_event_loop

from ophyd_async.core import init_devices
from ophyd_async.epics import demo, testing

# Create a run engine and make ipython use it for `await` commands
RE = RunEngine(call_returns_result=True)
autoawait_in_bluesky_event_loop()

# Add a callback for plotting
bec = BestEffortCallback()
RE.subscribe(bec)

# Start IOC with demo pvs in subprocess
prefix = testing.generate_random_pv_prefix()
ioc = demo.start_ioc_subprocess(prefix, num_channels=3)

# All Devices created within this block will be
# connected and named at the end of the with block
with init_devices():
    # Create a sample stage with X and Y motors
    stage = demo.DemoStage(f"{prefix}STAGE:")
    # Create a multi channel counter with the same number
    # of counters as the IOC
    pdet = demo.DemoPointDetector(f"{prefix}DET:", num_channels=3)

EPICS PVs are normally broadcast to your entire network subnet. To avoid PV name clashes, we pick a random prefix, then start the demo IOC using this PV prefix. Starting an IOC here is done just for the demo, in production the IOC would already be running before you started bluesky.

We then pass the PV prefix for each Device down using prior knowledge about the PVs that this particular IOC creates. For example, we know that there will be a DemoStage, and all its PVs will start with prefix + "STAGE:".

Note

There is no introspection of PVs in a device in EPICS, if we tell the IOC to make 3 channels on the point detector, we must also tell the ophyd-async device that the point detector has 3 channels.

"""Used for tutorial `Implementing Devices`."""

# Import bluesky and ophyd
import bluesky.plan_stubs as bps  # noqa: F401
import bluesky.plans as bp  # noqa: F401
from bluesky.callbacks.best_effort import BestEffortCallback
from bluesky.run_engine import RunEngine, autoawait_in_bluesky_event_loop

from ophyd_async.core import init_devices
from ophyd_async.tango import demo
from ophyd_async.tango.testing import generate_random_trl_prefix

# Create a run engine and make ipython use it for `await` commands
RE = RunEngine(call_returns_result=True)
autoawait_in_bluesky_event_loop()

# Add a callback for plotting
bec = BestEffortCallback()
RE.subscribe(bec)

# Start demo DeviceServer in subprocess
prefix = generate_random_trl_prefix()
ds = demo.start_device_server_subprocess(prefix, num_channels=3)

# All Devices created within this block will be
# connected and named at the end of the with block
with init_devices():
    # Create a sample stage with X and Y motors
    stage = demo.DemoStage(ds.trls[f"{prefix}/X"], ds.trls[f"{prefix}/Y"])
    # Create a multi channel counter with the same number
    # of counters as the IOC
    pdet = demo.DemoPointDetector(
        ds.trls[f"{prefix}/DET"],
        [
            ds.trls[f"{prefix}/C1"],
            ds.trls[f"{prefix}/C2"],
            ds.trls[f"{prefix}/C3"],
        ],
    )

Here we start the TANGO device server application with a device name prefix and add all of the devices into a MultiDeviceTestContext which allows us to run without an external TANGO database. Starting this device server here is done just for the demo, in production the application would already be running before you started bluesky.

We then pass the same device Tango Resource Locator (TRL) for each Device down using prior knowledge about the attributes that these particular devices will create. For example, we know that there will be a DemoStage, and the device names will start with prefix + "/X" and prefix + "/Y".

Note

There is no introspection of child devices of a device in TANGO, if we tell the device server to make 3 channels on the point detector, we must also tell the ophyd-async device that the point detector has 3 channels.

TODO

Look at the Device implementations#

The demo creates the following structure of Devices:

        ---
config:
  theme: neutral

---
flowchart LR
    DemoPointDetector-- channel ---DeviceVector
    DeviceVector-- 1 ---pdet.1(DemoPointDetectorChannel)
    DeviceVector-- 2 ---pdet.2(DemoPointDetectorChannel)
    DeviceVector-- 3 ---pdet.3(DemoPointDetectorChannel)
    DemoStage-- x ---stage.x(DemoMotor)
    DemoStage-- y ---stage.y(DemoMotor)
    

The DemoStage contains two DemoMotors, called x and y. The DemoPointDetector contains a DeviceVector called channel that contains 3 DemoPointDetectorChannels, called 1, 2 and 3.

We will now inspect the Demo classes in the diagram to see how they talk to the underlying control system.

DemoPointDetectorChannel#

Let’s start with the lowest level sort of Device, a single channel of our point detector. It contains Signals, which are the smallest sort of Device in ophyd-async, with a current value of a given datatype. In this case, there are two:

  • value: the current value of the channel in integer counts

  • mode: a configuration enum which varies the output of the channel

We specify to the Device baseclass that we would like a Signal of a given type (e.g. SignalR[int]) via a type hint, and it will create that signal for us in a control system specific way. The type of value is the python builtin int, and the type of mode is an enum we have declared ourselves, where the string values must exactly match what the control system produces.

See also

SignalDatatypeT defines the list of all possible datatypes you can use for Signals

We can optionally annotate this type hint with some additional information, like Format. This will tell the StandardReadable baseclass which Signals are important in a plan like bp.grid_scan. In this case we specify that mode should be reported as a configuration parameter once at the start of the scan, and value should be fetched without caching and plotted at each point of the scan.

from typing import Annotated as A

from ophyd_async.core import SignalR, SignalRW, StandardReadable, StrictEnum
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.epics.core import EpicsDevice, PvSuffix


class EnergyMode(StrictEnum):
    """Energy mode for `DemoPointDetectorChannel`."""

    LOW = "Low Energy"
    """Low energy mode"""

    HIGH = "High Energy"
    """High energy mode"""


class DemoPointDetectorChannel(StandardReadable, EpicsDevice):
    """A channel for `DemoPointDetector` with int value based on X and Y Motors."""

    value: A[SignalR[int], PvSuffix("Value"), Format.HINTED_UNCACHED_SIGNAL]
    mode: A[SignalRW[EnergyMode], PvSuffix("Mode"), Format.CONFIG_SIGNAL]

When the Device is instantiated, the EpicsDevice baseclass will look at all the type hints for annotations with a PvSuffix. It will append that to the PV prefix that is passed into the device. In this case if we made a DemoPointDetectorChannel(prefix="PREFIX:"), then value would have PV PREFIX:Value. PvSuffix also allows you to specify different suffixes for the read and write PVs if they are different.

from typing import Annotated as A

from ophyd_async.core import SignalR, SignalRW, StandardReadable, StrictEnum
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.tango.core import TangoDevice, TangoPolling


class EnergyMode(StrictEnum):
    """Energy mode for `DemoPointDetectorChannel`."""

    LOW = "Low Energy"
    """Low energy mode"""

    HIGH = "High Energy"
    """High energy mode"""


class DemoPointDetectorChannel(TangoDevice, StandardReadable):
    """A channel for `DemoPointDetector` with int value based on X and Y Motors."""

    value: A[SignalR[int], TangoPolling(0.1, 0.1, 0.1), Format.HINTED_UNCACHED_SIGNAL]
    mode: A[SignalRW[EnergyMode], TangoPolling(0.1, 0.1, 0.1), Format.CONFIG_SIGNAL]

When the Device is instantiated, the TangoDevice baseclass will look at all the signals and construct a full TRL for each signal from the supplied device TRL and the signal name. In this case if we made a DemoPointDetectorChannel(trl="test/device/C1"), then the signal value would have TRL test/device/C1/value. If the server doesn’t support events, then using the TangoPolling annotation gives the parameters for ophyd to poll instead.

TODO

DemoPointDetector#

Moving up a level, we have the point detector itself. This also has some Signals to control acquisition which are created in the same way as above:

  • acquire_time: a configuration float saying how long each point should be acquired for

  • start: a TriggerableCommand to start a single acquisition

  • acquiring: a boolean that is True when acquiring

  • reset: a TriggerableCommand to reset the counts on all channels

We also have a DeviceVector called channel with DemoPointDetectorChannel instances within it. These will all contribute their configuration values at the start of scan, and their values at every point in the scan.

Finally, we need to communicate to bluesky that it has to trigger() an acquisition before it can read() from the underlying channels. We do this by implementing the Triggerable protocol. This involves writing a trigger() method with the logic that must be run, calling TriggerableCommand.trigger, SignalW.set and SignalR.get_value to manipulate the values of the underlying Signals and Commands, returning when complete. This is wrapped in an AsyncStatus, which is used by bluesky to run this operation in the background and know when it is complete.

from typing import Annotated as A

from bluesky.protocols import Triggerable

from ophyd_async.core import (
    DEFAULT_TIMEOUT,
    AsyncStatus,
    DeviceVector,
    SignalR,
    SignalRW,
    StandardReadable,
    TriggerableCommand,
)
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.epics.core import EpicsDevice, PvSuffix

from ._point_detector_channel import DemoPointDetectorChannel


class DemoPointDetector(StandardReadable, EpicsDevice, Triggerable):
    """A demo detector that produces a point values based on X and Y motors."""

    acquire_time: A[SignalRW[float], PvSuffix("AcquireTime"), Format.CONFIG_SIGNAL]
    start: A[TriggerableCommand, PvSuffix("Start.PROC")]
    acquiring: A[SignalR[bool], PvSuffix("Acquiring")]
    reset: A[TriggerableCommand, PvSuffix("Reset.PROC")]

    def __init__(self, prefix: str, num_channels: int = 3, name: str = "") -> None:
        with self.add_children_as_readables():
            self.channel = DeviceVector(
                {
                    i: DemoPointDetectorChannel(f"{prefix}{i}:")
                    for i in range(1, num_channels + 1)
                }
            )
        super().__init__(prefix=prefix, name=name)

    @AsyncStatus.wrap
    async def trigger(self):
        await self.reset.trigger()
        timeout = await self.acquire_time.get_value() + DEFAULT_TIMEOUT
        await self.start.trigger(timeout=timeout)

TriggerableCommand is used for start and reset rather than Signals because these are actions to fire on the hardware, not values to read or write. Calling .trigger() executes the action and waits for the hardware to acknowledge it.

Although the Signals are declared via type hints, the DeviceVector requires explicit instantiation in an __init__ method. This is because it requires the num_channels to be passed in to the constructor to know how many channels require creation. This means that we also need to do the PV concatenation ourselves, so if the PV prefix for the device as PREFIX: then the first channel would have prefix PREFIX:CHAN1:. We also register them with StandardReadable in a different way, adding them within a StandardReadable.add_children_as_readables context manager which adds all the children created within its body.

Whilst it is not required for the call to super().__init__ to be after all signals have been created it is more efficient to do so. However, there may be some edge cases where signals need to be created after this e.g. for derived signals that depend on their parent.

from typing import Annotated as A

from bluesky.protocols import Triggerable

from ophyd_async.core import (
    DEFAULT_TIMEOUT,
    AsyncStatus,
    DeviceVector,
    SignalR,
    SignalRW,
    StandardReadable,
    TriggerableCommand,
)
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.tango.core import TangoDevice

from ._point_detector_channel import DemoPointDetectorChannel


class DemoPointDetector(TangoDevice, StandardReadable, Triggerable):
    """A demo detector that produces a point values based on X and Y motors."""

    acquire_time: A[SignalRW[float], Format.CONFIG_SIGNAL]
    start: TriggerableCommand
    acquiring: SignalR[bool]
    reset: TriggerableCommand

    def __init__(self, trl: str, channel_trls: list[str], name: str = "") -> None:
        with self.add_children_as_readables():
            self.channel = DeviceVector(
                {
                    i + 1: DemoPointDetectorChannel(channel_trl)
                    for i, channel_trl in enumerate(channel_trls)
                }
            )
        super().__init__(trl, name=name)

    @AsyncStatus.wrap
    async def trigger(self):
        await self.reset.trigger()
        timeout = await self.acquire_time.get_value() + DEFAULT_TIMEOUT
        await self.start.trigger(timeout=timeout)

Although the Signals are declared via type hints, the DeviceVector requires explicit instantiation in an __init__ method. This is because it requires a list of TRLs to be passed in to the constructor to know how many channels require creation, and how to connect to each channel through the channel’s TRL. We also register them with StandardReadable in a different way, adding them within a StandardReadable.add_children_as_readables context manager which adds all the children created within its body.

Whilst it is not required for the call to super().__init__ to be made after all signals have been created it is more efficient to do so. However, there may be some edge cases where signals need to be created after this e.g. for derived signals that depend on their parent.

TODO

See also

For more information on when to construct Devices declaratively using type hints, and when to construct them procedurally with an __init__ method, see Declarative vs Procedural Devices

DemoMotor#

Moving onto the motion side, we have DemoMotor. This has a few signals:

  • readback: the current position of the motor as a float

  • velocity: a configuration parameter for the velocity in units/s

  • setpoint: the position the motor has been requested to move to as a float, it returns as soon as it’s been set

  • stop_: a TriggerableCommand to stop the move immediately

DemoMotor inherits from StandardMovable, which provides the full Movable, Locatable, Stoppable, and Subscribable protocol surface, as well as WatcherUpdate progress-bar machinery. The device-specific parts — how to compute a timeout from velocity, how to execute a stop, and how to detect move completion — live in a separate DemoMotorMoveLogic dataclass that subclasses MovableLogic.

At each point in the scan the device reports readback named the same as the device itself (e.g. stage.x), because StandardMovable.set_name automatically renames the readback signal to match the device.

See also

When should a device extend movable for how to decide when to use StandardMovable and how to implement MovableLogic.

from dataclasses import dataclass
from functools import cached_property
from typing import Annotated as A

import numpy as np

from ophyd_async.core import (
    DEFAULT_TIMEOUT,
    MovableLogic,
    SignalR,
    SignalRW,
    StandardMovable,
    StandardReadable,
    TriggerableCommand,
    set_and_wait_for_other_value,
)
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.epics.core import EpicsDevice, PvSuffix


@dataclass
class DemoMotorMoveLogic(MovableLogic[float]):
    velocity: SignalRW[float]
    stop_: TriggerableCommand

    async def stop(self):
        await self.stop_.trigger()

    async def calculate_timeout(
        self, old_position: float, new_position: float
    ) -> float:
        velocity = await self.velocity.get_value()
        return abs(new_position - old_position) / velocity + DEFAULT_TIMEOUT

    async def move(self, new_position: float, timeout: float | None) -> None:
        # If we are close to the desired position then break
        await set_and_wait_for_other_value(
            self.setpoint,
            new_position,
            self.readback,
            lambda v: bool(np.isclose(v, new_position)),
            timeout=timeout,
        )


class DemoMotor(EpicsDevice, StandardReadable, StandardMovable):
    """A demo movable that moves based on velocity."""

    # Define some signals
    readback: A[SignalR[float], PvSuffix("Readback"), Format.HINTED_SIGNAL]
    velocity: A[SignalRW[float], PvSuffix("Velocity"), Format.CONFIG_SIGNAL]
    setpoint: A[SignalRW[float], PvSuffix("Setpoint")]
    # If a signal name clashes with a bluesky verb add _ to the attribute name
    stop_: A[TriggerableCommand, PvSuffix("Stop.PROC")]

    @cached_property
    def movable_logic(self) -> MovableLogic:
        return DemoMotorMoveLogic(
            readback=self.readback,
            setpoint=self.setpoint,
            velocity=self.velocity,
            stop_=self.stop_,
        )
from dataclasses import dataclass
from functools import cached_property
from typing import Annotated as A

from ophyd_async.core import (
    DEFAULT_TIMEOUT,
    MovableLogic,
    SignalR,
    SignalRW,
    StandardMovable,
    StandardReadable,
    TriggerableCommand,
    wait_for_value,
)
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.tango.core import DevStateEnum, TangoDevice, TangoPolling


@dataclass
class DemoMotorMoveLogic(MovableLogic[float]):
    velocity: SignalRW[float]
    stop_: TriggerableCommand
    state: SignalR[DevStateEnum]

    async def stop(self):
        await self.stop_.trigger()

    async def calculate_timeout(
        self, old_position: float, new_position: float
    ) -> float:
        velocity = await self.velocity.get_value()
        return abs(new_position - old_position) / velocity + DEFAULT_TIMEOUT

    async def move(self, new_position: float, timeout: float | None) -> None:
        # Write the setpoint and wait for the motor state to return to ON,
        # which happens whether the move completes normally or is stopped.
        await self.setpoint.set(new_position, timeout=timeout)
        await wait_for_value(self.state, DevStateEnum.ON, timeout=timeout)


class DemoMotor(TangoDevice, StandardReadable, StandardMovable):
    """A demo movable that moves based on velocity."""

    # If the server doesn't support events, the TangoPolling annotation gives
    # the parameters for ophyd to poll instead
    readback: A[SignalR[float], TangoPolling(0.1, 0.001, 0.001), Format.HINTED_SIGNAL]
    velocity: A[SignalRW[float], TangoPolling(0.1, 0.001, 0.001), Format.CONFIG_SIGNAL]
    setpoint: A[SignalRW[float], TangoPolling(0.1, 0.001, 0.001)]
    state: A[SignalR[DevStateEnum], TangoPolling(0.1)]
    # If a tango name clashes with a bluesky verb, add a trailing underscore
    stop_: TriggerableCommand

    @cached_property
    def movable_logic(self) -> MovableLogic:
        return DemoMotorMoveLogic(
            readback=self.readback,
            setpoint=self.setpoint,
            velocity=self.velocity,
            stop_=self.stop_,
            state=self.state,
        )

TODO

DemoStage#

Finally we get to the DemoStage, which is responsible for instantiating two DemoMotors. It also inherits from StandardReadable, which allows it to be used in plans that read() devices. It ensures that the output of read() is the same as if you were to read() both the DemoMotors, and merge the result:

from ophyd_async.core import StandardReadable

from ._motor import DemoMotor


class DemoStage(StandardReadable):
    """A simulated sample stage with X and Y movables."""

    def __init__(self, prefix: str, name="") -> None:
        # Define some child Devices
        with self.add_children_as_readables():
            self.x = DemoMotor(prefix + "X:")
            self.y = DemoMotor(prefix + "Y:")
        # Set name of device and child devices
        super().__init__(name=name)

Like DemoPointDetector, the PV concatenation is done explicitly in code, and the children are added within a StandardReadable.add_children_as_readables context manager.

from ophyd_async.core import StandardReadable

from ._motor import DemoMotor


class DemoStage(StandardReadable):
    """A simulated sample stage with X and Y movables."""

    def __init__(self, x_trl: str, y_trl: str, name="") -> None:
        # Define some child Devices
        with self.add_children_as_readables():
            self.x = DemoMotor(x_trl)
            self.y = DemoMotor(y_trl)
        # Set name of device and child devices
        super().__init__(name=name)

Like DemoPointDetector, the TRLs are specified explicitly in code and passed into the constructor, and the children are added within a StandardReadable.add_children_as_readables context manager.

TODO

Conclusion#

In this tutorial we have seen how to create some Devices that are backed by a Control system implementation. Read on to see how we would write some tests to ensure that they behave correctly.