Note
Ophyd async is included on a provisional basis until the v1.0 release, and its API may change in minor releases before then.
Using existing Devices#
To use an Ophyd Device that has already been written, you need to make a
RunEngine, then instantiate the Device in that process. This tutorial will take
you through this process. It assumes you have already run through the Bluesky
tutorial on The RunEngine.
Create Startup file#
For this tutorial we will use IPython. We will instantiate the RunEngine and
Devices in a startup file. This is just a regular Python file that IPython
will execute before giving us a prompt to execute scans. Copy the text
below and place it in an epics_demo.py file:
# Import bluesky and ophyd
import matplotlib.pyplot as plt
from bluesky import RunEngine
from bluesky.callbacks.best_effort import BestEffortCallback
from bluesky.plan_stubs import mov, movr, rd # noqa
from bluesky.plans import grid_scan # noqa
from bluesky.utils import ProgressBarManager, register_transform
from ophyd import Component, Device, EpicsSignal, EpicsSignalRO
from ophyd_async.core import init_devices
from ophyd_async.epics import sim
# Create a run engine, with plotting, progressbar and transform
RE = RunEngine({}, call_returns_result=True)
bec = BestEffortCallback()
RE.subscribe(bec)
RE.waiting_hook = ProgressBarManager()
plt.ion()
register_transform("RE", prefix="<")
# Start IOC with demo pvs in subprocess
pv_prefix = sim.start_ioc_subprocess()
# Create ophyd devices
class OldSensor(Device):
    mode = Component(EpicsSignal, "Mode", kind="config")
    value = Component(EpicsSignalRO, "Value", kind="hinted")
det_old = OldSensor(pv_prefix, name="det_old")
# Create ophyd-async devices
with init_devices():
    det = sim.Sensor(pv_prefix)
    det_group = sim.SensorGroup(pv_prefix)
    samp = sim.SampleStage(pv_prefix)
The top section of the file is explained in the Bluesky tutorial, but the bottom section is Ophyd specific.
First of all we start up a specific EPICS IOC for the demo devices. This is only used in this tutorial:
pv_prefix = sim.start_ioc_subprocess()
Next we create an example Ophyd device for comparison purposes. It is here to show that you can mix Ophyd and Ophyd Async devices in the same RunEngine:
class OldSensor(Device):
    mode = Component(EpicsSignal, "Mode", kind="config")
    value = Component(EpicsSignalRO, "Value", kind="hinted")
det_old = OldSensor(pv_prefix, name="det_old")
Finally we create the Ophyd Async devices imported from the epics.sim module:
with init_devices():
    det = sim.Sensor(pv_prefix)
    det_group = sim.SensorGroup(pv_prefix)
    samp = sim.SampleStage(pv_prefix)
The first thing to note is the with statement. This uses init_devices as a context
manager to collect up the top level Device instances created in the context,
and run the following:
If set_name=True (the default), then call Device.set_name passing the name of the variable within the context. For example, here we call det.set_name("det")
If connect=True (the default), then call Device.connect in parallel for all top level Devices, waiting for up to timeout seconds. For example, here we call asyncio.wait([det.connect(), samp.connect()])
If mock=True is passed, then don’t connect to PVs, but set Devices into simulation mode
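The naming and connect steps can be pictured with a small standard-library sketch. It is not ophyd-async's implementation: FakeDevice, connect_all and the delays are hypothetical stand-ins, used only to show devices being named after their variables and then connected concurrently under one shared timeout.

```python
import asyncio


class FakeDevice:
    """Hypothetical stand-in for a Device; not part of ophyd-async."""

    def __init__(self, delay: float) -> None:
        self.delay = delay
        self.name = ""
        self.connected = False

    def set_name(self, name: str) -> None:
        self.name = name

    async def connect(self) -> None:
        # Pretend that connecting to PVs takes `delay` seconds
        await asyncio.sleep(self.delay)
        self.connected = True


async def connect_all(devices: dict[str, FakeDevice], timeout: float) -> None:
    # Name each device after the variable (here: dict key) it is bound to
    for name, device in devices.items():
        device.set_name(name)
    # Connect every device concurrently, giving up after `timeout` seconds
    await asyncio.wait_for(
        asyncio.gather(*(d.connect() for d in devices.values())),
        timeout=timeout,
    )


devices = {"det": FakeDevice(0.05), "samp": FakeDevice(0.05)}
asyncio.run(connect_all(devices, timeout=1.0))
print([(d.name, d.connected) for d in devices.values()])
```

Because the connections run concurrently, the total wait is roughly that of the slowest device rather than the sum of all of them.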
The Devices we create in this example are a “sample stage” with a couple of
“movers” called x and y and a “sensor” called det that gives a
different reading depending on the position of the “movers”.
Note
Few devices have been implemented using ophyd async so far. See ophyd_async.epics.devices and ophyd-tango-devices for some common ones associated with each control system.
Run IPython#
You can now run ipython with this startup file:
$ ipython -i epics_demo.py
IPython 8.5.0 -- An enhanced Interactive Python. Type '?' for help.
In [1]:
This is like a regular Python console with the contents of that file executed. IPython adds some extra features like tab completion and magics (shortcut commands).
Run some plans#
Ophyd Devices give an interface to the bluesky.run_engine.RunEngine so they
can be used in plans. We can move the samp.x mover to 100 mm using
bluesky.plan_stubs.mv (imported in the startup file under its alias mov):
In [1]: RE(mov(samp.x, 100))
Out[1]: RunEngineResult(run_start_uids=(), plan_result=(<WatchableAsyncStatus, device: samp-x, task: <coroutine object WatchableAsyncStatus._notify_watchers_from at 0x7f2f5716fa40>, done>,), exit_status='success', interrupted=False, reason='', exception=None)
If this is too verbose to write, we registered a shorthand with
bluesky.utils.register_transform: <my_plan(args) is translated to
RE(my_plan(args)). The command above can also be run as:
In [2]: <mov(samp.x, 100)
Out[2]: RunEngineResult(run_start_uids=(), plan_result=(<WatchableAsyncStatus, device: samp-x, task: <coroutine object WatchableAsyncStatus._notify_watchers_from at 0x7f2f5716fc40>, done>,), exit_status='success', interrupted=False, reason='', exception=None)
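To make the shorthand concrete, here is a rough sketch of the text rewrite it performs. This is not bluesky's implementation, which works through IPython's input handling; expand_shorthand is a hypothetical helper that only mimics the translation described above.

```python
def expand_shorthand(line: str, name: str = "RE", prefix: str = "<") -> str:
    """Rewrite '<plan(args)' as 'RE(plan(args))'; leave other lines alone."""
    if line.startswith(prefix):
        return f"{name}({line[len(prefix):].strip()})"
    return line


print(expand_shorthand("<mov(samp.x, 100)"))
print(expand_shorthand("x = 1"))
```

Lines that do not start with the prefix pass through unchanged, so ordinary Python typed at the prompt is unaffected.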
We can get the primary reading of samp.x, in this case its readback value,
using bluesky.plan_stubs.rd:
In [3]: <rd(samp.x)
Out[3]: RunEngineResult(run_start_uids=(), plan_result=100.0, exit_status='success', interrupted=False, reason='', exception=None)
We can do a relative move of samp.x by -10 mm using bluesky.plan_stubs.mvr
(imported in the startup file under its alias movr):
In [4]: <movr(samp.x, -10)
Out[4]: RunEngineResult(run_start_uids=(), plan_result=(<WatchableAsyncStatus, device: samp-x, task: <coroutine object WatchableAsyncStatus._notify_watchers_from at 0x7f2f5716f140>, done>,), exit_status='success', interrupted=False, reason='', exception=None)
Individual Devices also expose some of the parameters of the underlying
hardware. In the case of a Mover, we can set and get its velocity:
In [5]: <rd(samp.x.velocity)
Out[5]: RunEngineResult(run_start_uids=(), plan_result=100.0, exit_status='success', interrupted=False, reason='', exception=None)
Do a scan#
We can also use the bluesky.run_engine.RunEngine to run scans. For instance we
can do a bluesky.plans.grid_scan of x and y and plot det:
In [6]: <grid_scan([det], samp.x, 1, 2, 5, samp.y, 1, 2, 5)
Transient Scan ID: 1 Time: 2025-01-23 17:34:58
Persistent Unique Scan ID: 'dfbd5463-2123-4f16-bc7c-3009cefb1cf8'
Out[6]: RunEngineResult(run_start_uids=('dfbd5463-2123-4f16-bc7c-3009cefb1cf8',), plan_result='dfbd5463-2123-4f16-bc7c-3009cefb1cf8', exit_status='success', interrupted=False, reason='', exception=None)
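As a sanity check on what this scan visits, the sketch below enumerates the grid using only the standard library. It assumes the grid_scan argument order of (motor, start, stop, number of points) per axis, with the first motor as the slow, outer axis. linspace is a hypothetical helper, not a bluesky function.

```python
from itertools import product


def linspace(start: float, stop: float, num: int) -> list[float]:
    """Return num evenly spaced values from start to stop inclusive."""
    if num == 1:
        return [float(start)]
    step = (stop - start) / (num - 1)
    return [start + i * step for i in range(num)]


x_positions = linspace(1, 2, 5)  # samp.x: slow, outer axis
y_positions = linspace(1, 2, 5)  # samp.y: fast, inner axis
points = list(product(x_positions, y_positions))

print(x_positions)
print(len(points))
```

Five positions on each axis give a 5 by 5 grid, so det is read at 25 points in total.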
There is also an “energy mode” that can be changed to modify the det output.
In [7]: <rd(det.mode)
Out[7]: RunEngineResult(run_start_uids=(), plan_result=<EnergyMode.LOW: 'Low Energy'>, exit_status='success', interrupted=False, reason='', exception=None)
Although this is an enum.Enum and programmatic code should import and
use instances of EnergyMode, we can set it using a
string value on the command line:
In [8]: <mov(det.mode, "High Energy")
Out[8]: RunEngineResult(run_start_uids=(), plan_result=(<AsyncStatus, device: det-mode, task: <coroutine object SignalW.set at 0x7f2f571c6940>, done>,), exit_status='success', interrupted=False, reason='', exception=None)
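The string used here is the value of one of the enum's members. A minimal standard-library sketch of that correspondence follows. The EnergyMode defined here is a stand-in that mirrors the LOW member seen in the output above, and its HIGH member name is an assumption. Real code should import the enum from ophyd-async rather than redefine it.

```python
from enum import Enum


class EnergyMode(str, Enum):
    """Stand-in for the demo enum; the HIGH member name is assumed."""

    LOW = "Low Energy"
    HIGH = "High Energy"


# Looking a member up by value turns the command-line string into the enum
mode = EnergyMode("High Energy")
print(mode is EnergyMode.HIGH)
print(mode.value)
```

Using the enum member in scripts means a misspelt mode fails immediately at lookup instead of being sent on to the hardware.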
The same scan will now give a slightly different output. If we include the original Ophyd device det_old, we can see it gives the same result:
In [9]: <grid_scan([det, det_old], samp.x, 1, 2, 5, samp.y, 1, 2, 5)
Transient Scan ID: 2 Time: 2025-01-23 17:35:02
Persistent Unique Scan ID: '54920238-deb5-418d-94b8-8d6b56bbf8a5'
Out[9]: RunEngineResult(run_start_uids=('54920238-deb5-418d-94b8-8d6b56bbf8a5',), plan_result='54920238-deb5-418d-94b8-8d6b56bbf8a5', exit_status='success', interrupted=False, reason='', exception=None)
See also
How-to Make a Simple Device to make your own Ophyd Async devices.