How to derive one signal from others#
Sometimes the low level Signal interface of a Device is not very user friendly. You may wish to provide a layer of Signals above that calculate their values from low level Signals, and are capable of setting their values too. We call this a Derived Signal. This article will show how they are constructed and how they can be used.
Single Derived Signal#
The simplest API involves mapping a single Derived Signal to many low level Signal. There are 3 helpers to create these Derived signals:
If a signal is readable, then it requires a raw_to_derived
function that maps the raw values of low level Signals into the datatype of the Derived Signal and the raw_devices
that will be read/monitored to give those values.
If a signal is writeable, then it requires a set_derived
async function that sets the raw signals based on the derived value.
In the below example we see all 3 of these helpers in action:
from __future__ import annotations
import asyncio
from ophyd_async.core import (
Device,
DeviceVector,
StandardReadable,
StrictEnum,
derived_signal_r,
derived_signal_rw,
derived_signal_w,
soft_signal_rw,
)
class BeamstopPosition(StrictEnum):
IN_POSITION = "In position"
OUT_OF_POSITION = "Out of position"
class ReadOnlyBeamstop(Device):
"""Reads from 2 motors to work out if the beamstop is in position.
E.g. bps.rd(beamstop.position)
"""
def __init__(self, name=""):
# Raw signals
self.x = soft_signal_rw(float)
self.y = soft_signal_rw(float)
# Derived signals
self.position = derived_signal_r(self._get_position, x=self.x, y=self.y)
super().__init__(name=name)
def _get_position(self, x: float, y: float) -> BeamstopPosition:
if abs(x) < 1 and abs(y) < 2:
return BeamstopPosition.IN_POSITION
else:
return BeamstopPosition.OUT_OF_POSITION
class MovableBeamstop(Device):
"""As well as reads, this one allows you to move it.
E.g. bps.mv(beamstop.position, BeamstopPosition.IN_POSITION)
"""
def __init__(self, name=""):
# Raw signals
self.x = soft_signal_rw(float)
self.y = soft_signal_rw(float)
# Derived signals
self.position = derived_signal_rw(
self._get_position, self._set_from_position, x=self.x, y=self.y
)
super().__init__(name=name)
def _get_position(self, x: float, y: float) -> BeamstopPosition:
if abs(x) < 1 and abs(y) < 2:
return BeamstopPosition.IN_POSITION
else:
return BeamstopPosition.OUT_OF_POSITION
async def _set_from_position(self, position: BeamstopPosition) -> None:
if position == BeamstopPosition.IN_POSITION:
await asyncio.gather(self.x.set(0), self.y.set(0))
else:
await asyncio.gather(self.x.set(3), self.y.set(5))
class Exploder(StandardReadable):
"""This one takes a value and sets all its signal to that value.
This allows convenience "set all" functions, while the individual
signals are still free to be set to different values.
"""
def __init__(self, num_signals: int, name=""):
with self.add_children_as_readables():
self.signals = DeviceVector(
{i: soft_signal_rw(int, units="cts") for i in range(1, num_signals + 1)}
)
self.set_all = derived_signal_w(self._set_all, derived_units="cts")
super().__init__(name=name)
async def _set_all(self, value: int) -> None:
coros = [sig.set(value) for sig in self.signals.values()]
await asyncio.gather(*coros)
Note
These examples show the low level Signals and Derived Signals in the same Device, but they could equally be separated into different Devices
Multi Derived Signal#
The more general API involves a two way mapping between many Derived Signals and many low level Signals. This is done by implementing a Raw
typing.TypedDict
subclass with the names and datatypes of the low level Signals, a Derived
typing.TypedDict
subclass with the names and datatypes of the derived Signals, and Transform
class with raw_to_derived
and derived_to_raw
methods to convert between the two. Some transforms will also require parameters which get their values from other Signals for both methods. These should be put in as type hints on the Transform
subclass.
To create the derived signals, we make a DerivedSignalFactory
instance that knows about the Transform
class, the raw_devices
that will be read/monitored to provide the raw values for the transform, and optionally the set_derived
method to set them. The methods like DerivedSignalFactory.derived_signal_rw
allow Derived signals to be created for each attribute in the Derived
TypedDict subclass.
In the below example we see this is action:
import asyncio
import math
from typing import TypedDict
from bluesky.protocols import Movable
from ophyd_async.core import (
AsyncStatus,
DerivedSignalFactory,
Device,
Transform,
soft_signal_rw,
)
from ._motor import SimMotor
class TwoJackRaw(TypedDict):
jack1: float
jack2: float
class TwoJackDerived(TypedDict):
height: float
angle: float
class TwoJackTransform(Transform):
distance: float
def raw_to_derived(self, *, jack1: float, jack2: float) -> TwoJackDerived:
diff = jack2 - jack1
return TwoJackDerived(
height=jack1 + diff / 2,
# need the cast as returns numpy float rather than float64, but this
# is ok at runtime
angle=math.atan(diff / self.distance),
)
def derived_to_raw(self, *, height: float, angle: float) -> TwoJackRaw:
diff = math.tan(angle) * self.distance
return TwoJackRaw(
jack1=height - diff / 2,
jack2=height + diff / 2,
)
class VerticalMirror(Device, Movable[TwoJackDerived]):
def __init__(self, name=""):
# Raw signals
self.y1 = SimMotor()
self.y2 = SimMotor()
# Parameter
self.y1_y2_distance = soft_signal_rw(float, initial_value=1)
# Derived signals
self._factory = DerivedSignalFactory(
TwoJackTransform,
self.set,
jack1=self.y1,
jack2=self.y2,
distance=self.y1_y2_distance,
)
self.height = self._factory.derived_signal_rw(float, "height")
self.angle = self._factory.derived_signal_rw(float, "angle")
super().__init__(name=name)
@AsyncStatus.wrap
async def set(self, derived: TwoJackDerived) -> None: # type: ignore until bluesky 1.13.2
transform = await self._factory.transform()
raw = transform.derived_to_raw(**derived)
await asyncio.gather(
self.y1.set(raw["jack1"]),
self.y2.set(raw["jack2"]),
)
In VerticalMirror
we use the names of the Derived
classes (height
and angle
) as externally accessible names for both the derived signals, and the dictionary passed to the set()
method. If this is not desired, either because the names don’t make sense in this particular Device, or because you are composing derived signals from multiple transforms together in the same Device, then you can pass an internal set method to the DerivedSignalFactory
that uses the Transform
names. This leaves you free to create a public set()
method using your desired names, and to name the derived signals with those same names.
An example to illustrate this is below:
import asyncio
from typing import TypedDict
from bluesky.protocols import Movable
from ophyd_async.core import AsyncStatus, DerivedSignalFactory, Device, soft_signal_rw
from ._mirror_vertical import TwoJackDerived, TwoJackTransform
from ._motor import SimMotor
class HorizontalMirrorDerived(TypedDict):
x: float
roll: float
class HorizontalMirror(Device, Movable):
def __init__(self, name=""):
# Raw signals
self.x1 = SimMotor()
self.x2 = SimMotor()
# Parameter
self.x1_x2_distance = soft_signal_rw(float, initial_value=1)
# Derived signals
self._factory = DerivedSignalFactory(
TwoJackTransform,
self._set_mirror,
jack1=self.x1,
jack2=self.x2,
distance=self.x1_x2_distance,
)
self.x = self._factory.derived_signal_rw(float, "height")
self.roll = self._factory.derived_signal_rw(float, "angle")
super().__init__(name=name)
async def _set_mirror(self, derived: TwoJackDerived) -> None:
transform = await self._factory.transform()
raw = transform.derived_to_raw(**derived)
await asyncio.gather(
self.x1.set(raw["jack1"]),
self.x2.set(raw["jack2"]),
)
@AsyncStatus.wrap
async def set(self, value: HorizontalMirrorDerived) -> None:
await self._set_mirror(TwoJackDerived(height=value["x"], angle=value["roll"]))