Note
Ophyd async is included on a provisional basis until the v1.0 release and may change API on minor release numbers before then
Make a Simple Device#
To make a simple device, you need to subclass from the
StandardReadable
class, create some Signal
instances, and optionally implement
other suitable Bluesky Protocols
like
Movable
.
The rest of this guide will show examples from src/ophyd_async/epics/demo/__init__.py
Readable#
For a simple Readable
object like a Sensor
, you need to
define some signals, then tell the superclass which signals should contribute to
read()
and read_configuration()
:
class Sensor(StandardReadable):
"""A demo sensor that produces a scalar value based on X and Y Movers"""
def __init__(self, prefix: str, name="") -> None:
# Define some signals
self.value = epics_signal_r(float, prefix + "Value")
self.mode = epics_signal_rw(EnergyMode, prefix + "Mode")
# Set name and signals for read() and read_configuration()
self.set_readable_signals(
read=[self.value],
config=[self.mode],
)
super().__init__(name=name)
First some Signals are constructed and stored on the Device. Each one is passed its Python type, which could be:
An array (
numpy.typing.NDArray
orSequence[str]
)An enum (
enum.Enum
).
The rest of the arguments are PV connection information, in this case the PV suffix.
Finally super().__init__()
is called with:
Possibly empty Device
name
: will also dash-prefix its child Device names is setOptional
primary
signal: a Signal that should be renamed to take the name of the Device and output atread()
read
signals: Signals that should be output toread()
without renamingconfig
signals: Signals that should be output toread_configuration()
without renaming
All signals passed into this init method will be monitored between stage()
and unstage()
and their cached values returned on read()
and
read_configuration()
for perfomance.
Movable#
For a more complicated device like a Mover
, you can still use StandardReadable
and implement some addition protocols:
class Mover(StandardReadable, Movable, Stoppable):
"""A demo movable that moves based on velocity"""
def __init__(self, prefix: str, name="") -> None:
# Define some signals
self.setpoint = epics_signal_rw(float, prefix + "Setpoint")
self.readback = epics_signal_r(float, prefix + "Readback")
self.velocity = epics_signal_rw(float, prefix + "Velocity")
self.units = epics_signal_r(str, prefix + "Readback.EGU")
self.precision = epics_signal_r(int, prefix + "Readback.PREC")
# Signals that collide with standard methods should have a trailing underscore
self.stop_ = epics_signal_x(prefix + "Stop.PROC")
# Whether set() should complete successfully or not
self._set_success = True
# Set name and signals for read() and read_configuration()
self.set_readable_signals(
read=[self.readback],
config=[self.velocity, self.units],
)
super().__init__(name=name)
def set_name(self, name: str):
super().set_name(name)
# Readback should be named the same as its parent in read()
self.readback.set_name(name)
async def _move(self, new_position: float, watchers: List[Callable] = []):
self._set_success = True
# time.monotonic won't go backwards in case of NTP corrections
start = time.monotonic()
old_position, units, precision = await asyncio.gather(
self.setpoint.get_value(),
self.units.get_value(),
self.precision.get_value(),
)
# Wait for the value to set, but don't wait for put completion callback
await self.setpoint.set(new_position, wait=False)
async for current_position in observe_value(self.readback):
for watcher in watchers:
watcher(
name=self.name,
current=current_position,
initial=old_position,
target=new_position,
unit=units,
precision=precision,
time_elapsed=time.monotonic() - start,
)
if np.isclose(current_position, new_position):
break
if not self._set_success:
raise RuntimeError("Motor was stopped")
def move(self, new_position: float, timeout: Optional[float] = None):
"""Commandline only synchronous move of a Motor"""
from bluesky.run_engine import call_in_bluesky_event_loop, in_bluesky_event_loop
if in_bluesky_event_loop():
raise RuntimeError("Will deadlock run engine if run in a plan")
call_in_bluesky_event_loop(self._move(new_position), timeout) # type: ignore
# TODO: this fails if we call from the cli, but works if we "ipython await" it
def set(self, new_position: float, timeout: Optional[float] = None) -> AsyncStatus:
watchers: List[Callable] = []
coro = asyncio.wait_for(self._move(new_position, watchers), timeout=timeout)
return AsyncStatus(coro, watchers)
async def stop(self, success=True):
self._set_success = success
status = self.stop_.trigger()
await status
The set()
method implements Movable
. This
creates a coroutine
do_set()
which gets the old position, units and
precision in parallel, sets the setpoint, then observes the readback value,
informing watchers of the progress. When it gets to the requested value it
completes. This co-routine is wrapped in a timeout handler, and passed to an
AsyncStatus
which will start executing it as soon as the Run Engine adds a
callback to it. The stop()
method then pokes a PV if the move needs to be
interrupted.