How to derive one signal from others

Sometimes the low-level Signal interface of a Device is not very user-friendly. You may wish to provide a layer of Signals above it that calculate their values from low-level Signals, and that can also be set, writing through to those low-level Signals. We call this a Derived Signal. This article shows how Derived Signals are constructed and how they can be used.

Single Derived Signal

The simplest API maps a single Derived Signal to many low-level Signals. There are 3 helpers to create these Derived Signals: derived_signal_r, derived_signal_rw, and derived_signal_w.

A readable Derived Signal requires a raw_to_derived function that maps the raw values of the low-level Signals into the datatype of the Derived Signal, together with the raw_devices that will be read or monitored to supply those values.

A writable Derived Signal requires a set_derived async function that sets the raw Signals based on the derived value.

The example below shows all 3 of these helpers in action:

from __future__ import annotations

import asyncio

from ophyd_async.core import (
    Device,
    DeviceVector,
    StandardReadable,
    StrictEnum,
    derived_signal_r,
    derived_signal_rw,
    derived_signal_w,
    soft_signal_rw,
)


class BeamstopPosition(StrictEnum):
    IN_POSITION = "In position"
    OUT_OF_POSITION = "Out of position"


class ReadOnlyBeamstop(Device):
    """Reads from 2 motors to work out if the beamstop is in position.

    E.g. bps.rd(beamstop.position)
    """

    def __init__(self, name=""):
        # Raw signals
        self.x = soft_signal_rw(float)
        self.y = soft_signal_rw(float)
        # Derived signals
        self.position = derived_signal_r(self._get_position, x=self.x, y=self.y)
        super().__init__(name=name)

    def _get_position(self, x: float, y: float) -> BeamstopPosition:
        if abs(x) < 1 and abs(y) < 2:
            return BeamstopPosition.IN_POSITION
        else:
            return BeamstopPosition.OUT_OF_POSITION


class MovableBeamstop(Device):
    """As well as reads, this one allows you to move it.

    E.g. bps.mv(beamstop.position, BeamstopPosition.IN_POSITION)
    """

    def __init__(self, name=""):
        # Raw signals
        self.x = soft_signal_rw(float)
        self.y = soft_signal_rw(float)
        # Derived signals
        self.position = derived_signal_rw(
            self._get_position, self._set_from_position, x=self.x, y=self.y
        )
        super().__init__(name=name)

    def _get_position(self, x: float, y: float) -> BeamstopPosition:
        if abs(x) < 1 and abs(y) < 2:
            return BeamstopPosition.IN_POSITION
        else:
            return BeamstopPosition.OUT_OF_POSITION

    async def _set_from_position(self, position: BeamstopPosition) -> None:
        if position == BeamstopPosition.IN_POSITION:
            await asyncio.gather(self.x.set(0), self.y.set(0))
        else:
            await asyncio.gather(self.x.set(3), self.y.set(5))


class Exploder(StandardReadable):
    """This one takes a value and sets all its signals to that value.

    This allows convenience "set all" functions, while the individual
    signals are still free to be set to different values.
    """

    def __init__(self, num_signals: int, name=""):
        with self.add_children_as_readables():
            self.signals = DeviceVector(
                {i: soft_signal_rw(int, units="cts") for i in range(1, num_signals + 1)}
            )
        self.set_all = derived_signal_w(self._set_all, derived_units="cts")
        super().__init__(name=name)

    async def _set_all(self, value: int) -> None:
        coros = [sig.set(value) for sig in self.signals.values()]
        await asyncio.gather(*coros)

Note

These examples show the low-level Signals and Derived Signals in the same Device, but they could equally be separated into different Devices.
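
The getter and setter of a read-write Derived Signal should agree with each other: after setting a derived value, reading it back should return the same value. The sketch below checks this for the beamstop logic using only the standard library. FakeSignal is a hypothetical stand-in for a raw signal, and a plain Enum replaces StrictEnum, so this illustrates the idea rather than how ophyd_async wires things up.

```python
import asyncio
from enum import Enum


class BeamstopPosition(str, Enum):
    IN_POSITION = "In position"
    OUT_OF_POSITION = "Out of position"


class FakeSignal:
    """Hypothetical stand-in for a raw signal: remembers the last value set."""

    def __init__(self, value: float = 0.0) -> None:
        self.value = value

    async def set(self, value: float) -> None:
        self.value = value


def get_position(x: float, y: float) -> BeamstopPosition:
    # Same thresholds as _get_position in the example above
    if abs(x) < 1 and abs(y) < 2:
        return BeamstopPosition.IN_POSITION
    return BeamstopPosition.OUT_OF_POSITION


async def set_from_position(
    x: FakeSignal, y: FakeSignal, position: BeamstopPosition
) -> None:
    # Same targets as _set_from_position in the example above
    if position == BeamstopPosition.IN_POSITION:
        await asyncio.gather(x.set(0), y.set(0))
    else:
        await asyncio.gather(x.set(3), y.set(5))


async def round_trip(position: BeamstopPosition) -> BeamstopPosition:
    # Set the derived value, then derive it again from the raw values
    x, y = FakeSignal(), FakeSignal()
    await set_from_position(x, y, position)
    return get_position(x.value, y.value)


def check_round_trips() -> bool:
    # True if every derived value reads back as itself after being set
    return all(asyncio.run(round_trip(p)) is p for p in BeamstopPosition)
```

If the setter targets were ever moved inside the "in position" thresholds, check_round_trips() would return False, which makes this kind of check a cheap unit test for a Derived Signal pair.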

Multi Derived Signal

The more general API involves a two-way mapping between many Derived Signals and many low-level Signals. This is done by implementing a Raw typing.TypedDict subclass with the names and datatypes of the low-level Signals, a Derived typing.TypedDict subclass with the names and datatypes of the Derived Signals, and a Transform class with raw_to_derived and derived_to_raw methods to convert between the two. Some transforms also require parameters, used by both methods, which get their values from other Signals. These should be declared as type hints on the Transform subclass.

To create the derived signals, we make a DerivedSignalFactory instance that knows about the Transform class, the raw_devices that will be read/monitored to provide the raw values for the transform, and optionally the set_derived method to set them. The methods like DerivedSignalFactory.derived_signal_rw allow Derived signals to be created for each attribute in the Derived TypedDict subclass.

The example below shows this in action:

import asyncio
import math
from typing import TypedDict

from bluesky.protocols import Movable

from ophyd_async.core import (
    AsyncStatus,
    DerivedSignalFactory,
    Device,
    Transform,
    soft_signal_rw,
)

from ._motor import SimMotor


class TwoJackRaw(TypedDict):
    jack1: float
    jack2: float


class TwoJackDerived(TypedDict):
    height: float
    angle: float


class TwoJackTransform(Transform):
    distance: float

    def raw_to_derived(self, *, jack1: float, jack2: float) -> TwoJackDerived:
        diff = jack2 - jack1
        return TwoJackDerived(
            height=jack1 + diff / 2,
            # math.atan returns a plain Python float, in radians
            angle=math.atan(diff / self.distance),
        )

    def derived_to_raw(self, *, height: float, angle: float) -> TwoJackRaw:
        diff = math.tan(angle) * self.distance
        return TwoJackRaw(
            jack1=height - diff / 2,
            jack2=height + diff / 2,
        )


class VerticalMirror(Device, Movable[TwoJackDerived]):
    def __init__(self, name=""):
        # Raw signals
        self.y1 = SimMotor()
        self.y2 = SimMotor()
        # Parameter
        self.y1_y2_distance = soft_signal_rw(float, initial_value=1)
        # Derived signals
        self._factory = DerivedSignalFactory(
            TwoJackTransform,
            self.set,
            jack1=self.y1,
            jack2=self.y2,
            distance=self.y1_y2_distance,
        )
        self.height = self._factory.derived_signal_rw(float, "height")
        self.angle = self._factory.derived_signal_rw(float, "angle")
        super().__init__(name=name)

    @AsyncStatus.wrap
    async def set(self, derived: TwoJackDerived) -> None:  # type: ignore until bluesky 1.13.2
        transform = await self._factory.transform()
        raw = transform.derived_to_raw(**derived)
        await asyncio.gather(
            self.y1.set(raw["jack1"]),
            self.y2.set(raw["jack2"]),
        )
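
The two methods of a Transform should be inverses of each other, so that setting a derived value and reading it back is consistent. A minimal sketch of that check for the two-jack geometry, written as plain functions with distance passed explicitly (an assumption made here so that the snippet does not depend on ophyd_async):

```python
import math
from typing import TypedDict


class TwoJackRaw(TypedDict):
    jack1: float
    jack2: float


class TwoJackDerived(TypedDict):
    height: float
    angle: float


def raw_to_derived(distance: float, *, jack1: float, jack2: float) -> TwoJackDerived:
    # Height is the midpoint of the jacks, angle comes from their difference
    diff = jack2 - jack1
    return TwoJackDerived(height=jack1 + diff / 2, angle=math.atan(diff / distance))


def derived_to_raw(distance: float, *, height: float, angle: float) -> TwoJackRaw:
    # Invert the mapping: recover the jack difference from the angle
    diff = math.tan(angle) * distance
    return TwoJackRaw(jack1=height - diff / 2, jack2=height + diff / 2)


def round_trip(distance: float, raw: TwoJackRaw) -> TwoJackRaw:
    # raw -> derived -> raw should return the starting values
    return derived_to_raw(distance, **raw_to_derived(distance, **raw))
```

Because the conversion goes through atan and tan, the round trip is only exact to floating-point precision, so comparisons should use math.isclose rather than equality.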

In VerticalMirror we use the key names of TwoJackDerived (height and angle) as the externally accessible names for both the derived signals and the dictionary passed to the set() method. If this is not desired, either because the names don’t make sense in this particular Device, or because you are composing derived signals from multiple transforms together in the same Device, then you can pass an internal set method to the DerivedSignalFactory that uses the Transform names. This leaves you free to create a public set() method using your desired names, and to name the derived signals with those same names.

An example to illustrate this is below:

import asyncio
from typing import TypedDict

from bluesky.protocols import Movable

from ophyd_async.core import AsyncStatus, DerivedSignalFactory, Device, soft_signal_rw

from ._mirror_vertical import TwoJackDerived, TwoJackTransform
from ._motor import SimMotor


class HorizontalMirrorDerived(TypedDict):
    x: float
    roll: float


class HorizontalMirror(Device, Movable):
    def __init__(self, name=""):
        # Raw signals
        self.x1 = SimMotor()
        self.x2 = SimMotor()
        # Parameter
        self.x1_x2_distance = soft_signal_rw(float, initial_value=1)
        # Derived signals
        self._factory = DerivedSignalFactory(
            TwoJackTransform,
            self._set_mirror,
            jack1=self.x1,
            jack2=self.x2,
            distance=self.x1_x2_distance,
        )
        self.x = self._factory.derived_signal_rw(float, "height")
        self.roll = self._factory.derived_signal_rw(float, "angle")
        super().__init__(name=name)

    async def _set_mirror(self, derived: TwoJackDerived) -> None:
        transform = await self._factory.transform()
        raw = transform.derived_to_raw(**derived)
        await asyncio.gather(
            self.x1.set(raw["jack1"]),
            self.x2.set(raw["jack2"]),
        )

    @AsyncStatus.wrap
    async def set(self, value: HorizontalMirrorDerived) -> None:
        await self._set_mirror(TwoJackDerived(height=value["x"], angle=value["roll"]))
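
The renaming done by the public set() method can be isolated into a small pure function, which makes the mapping between public names and Transform names easy to test. This is a sketch using only typing.TypedDict; the helper name to_transform_names is hypothetical and not part of ophyd_async.

```python
from typing import TypedDict


class TwoJackDerived(TypedDict):
    height: float
    angle: float


class HorizontalMirrorDerived(TypedDict):
    x: float
    roll: float


def to_transform_names(value: HorizontalMirrorDerived) -> TwoJackDerived:
    # Public names (x, roll) map onto the Transform's names (height, angle)
    return TwoJackDerived(height=value["x"], angle=value["roll"])
```

With a helper like this, the public set() only has to call the internal set method with to_transform_names(value).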