Note
Ophyd async is included on a provisional basis until the v1.0 release and may change API on minor release numbers before then
Make a Simple Device#
To make a simple device, you need to subclass from the
StandardReadable
class, create some Signal
instances, and optionally implement
other suitable Bluesky Protocols
like
bluesky.protocols.Movable
.
The rest of this guide will show examples from src/ophyd_async/epics/sim/__init__.py
Readable#
For a simple bluesky.protocols.Readable
object like a Sensor
, you need to
define some signals, then tell the superclass which signals should contribute to
read()
and read_configuration()
:
class Sensor(StandardReadable, EpicsDevice):
"""A demo sensor that produces a scalar value based on X and Y Movers"""
value: A[SignalR[float], PvSuffix("Value"), Format.HINTED_SIGNAL]
mode: A[SignalRW[EnergyMode], PvSuffix("Mode"), Format.CONFIG_SIGNAL]
First some Signals are constructed and stored on the Device. Each one is passed its Python type, which could be:
An array (
numpy.typing.NDArray
ie.numpy.typing.NDArray[numpy.uint16]
orSequence[str]
)
The rest of the arguments are PV connection information, in this case the PV suffix.
Finally super().__init__()
is called with:
Possibly empty Device
name
: will also dash-prefix its child Device names is setOptional
primary
signal: a Signal that should be renamed to take the name of the Device and output atread()
read
signals: Signals that should be output toread()
without renamingconfig
signals: Signals that should be output toread_configuration()
without renaming
All signals passed into this init method will be monitored between stage()
and unstage()
and their cached values returned on read()
and
read_configuration()
for perfomance.
Movable#
For a more complicated device like a Mover
, you can still use StandardReadable
and implement some addition protocols:
class Mover(StandardReadable, Movable, Stoppable):
"""A demo movable that moves based on velocity"""
def __init__(self, prefix: str, name="") -> None:
# Define some signals
with self.add_children_as_readables(Format.HINTED_SIGNAL):
self.readback = epics_signal_r(float, prefix + "Readback")
with self.add_children_as_readables(Format.CONFIG_SIGNAL):
self.velocity = epics_signal_rw(float, prefix + "Velocity")
self.units = epics_signal_r(str, prefix + "Readback.EGU")
self.setpoint = epics_signal_rw(float, prefix + "Setpoint")
self.precision = epics_signal_r(int, prefix + "Readback.PREC")
# Signals that collide with standard methods should have a trailing underscore
self.stop_ = epics_signal_x(prefix + "Stop.PROC")
# Whether set() should complete successfully or not
self._set_success = True
super().__init__(name=name)
def set_name(self, name: str, *, child_name_separator: str | None = None) -> None:
super().set_name(name, child_name_separator=child_name_separator)
# Readback should be named the same as its parent in read()
self.readback.set_name(name)
@WatchableAsyncStatus.wrap
async def set(self, value: float, timeout: CalculatableTimeout = CALCULATE_TIMEOUT):
new_position = value
self._set_success = True
old_position, units, precision, velocity = await asyncio.gather(
self.setpoint.get_value(),
self.units.get_value(),
self.precision.get_value(),
self.velocity.get_value(),
)
if timeout is CALCULATE_TIMEOUT:
try:
timeout = (
abs((new_position - old_position) / velocity) + DEFAULT_TIMEOUT
)
except ZeroDivisionError as error:
msg = "Mover has zero velocity"
raise ValueError(msg) from error
# Make an Event that will be set on completion, and a Status that will
# error if not done in time
done = asyncio.Event()
done_status = AsyncStatus(asyncio.wait_for(done.wait(), timeout)) # type: ignore
# Wait for the value to set, but don't wait for put completion callback
await self.setpoint.set(new_position, wait=False)
async for current_position in observe_value(
self.readback, done_status=done_status
):
yield WatcherUpdate(
current=current_position,
initial=old_position,
target=new_position,
name=self.name,
unit=units,
precision=precision,
)
if np.isclose(current_position, new_position):
done.set()
break
if not self._set_success:
raise RuntimeError("Motor was stopped")
async def stop(self, success=True):
self._set_success = success
status = self.stop_.trigger()
await status
The set()
method implements bluesky.protocols.Movable
. This
creates a coroutine
do_set()
which gets the old position, units and
precision in parallel, sets the setpoint, then observes the readback value,
informing watchers of the progress. When it gets to the requested value it
completes. This co-routine is wrapped in a timeout handler, and passed to an
AsyncStatus
which will start executing it as soon as the Run Engine adds a
callback to it. The stop()
method then pokes a PV if the move needs to be
interrupted.
Assembly#
Compound assemblies can be used to group Devices into larger logical Devices:
class SampleStage(Device):
"""A demo sample stage with X and Y movables"""
def __init__(self, prefix: str, name="") -> None:
# Define some child Devices
self.x = Mover(prefix + "X:")
self.y = Mover(prefix + "Y:")
# Set name of device and child devices
super().__init__(name=name)
This applies prefixes on construction:
SampleStage is passed a prefix like
DEVICE:
SampleStage.x will append its prefix
X:
to getDEVICE:X:
SampleStage.x.velocity will append its suffix
Velocity
to getDEVICE:X:Velocity
If SampleStage is further nested in another Device another layer of prefix nesting would occur
Note
SampleStage does not pass any signals into its superclass init. This means
that its read()
method will return an empty dictionary. This means you
can rd sample_stage.x
, but not rd sample_stage
.