Single EPICS PVs#
In this tutorial we will read, write, and monitor an EPICS PV in ophyd.
Set up for tutorial#
Before you begin, install ophyd, pyepics, bluesky, and caproto, following the Installation Tutorial.
We’ll start by running simulated hardware that implements a random walk. It has
just two PVs. One is a tunable parameter, random_walk:dt, the time between
steps. The other is random_walk:x, the current position of the random walker.
python -m caproto.ioc_examples.random_walk --list-pvs
Start your favorite interactive Python environment, such as ipython or jupyter lab.
Connect to a PV from Ophyd#
Let’s connect to the PV random_walk:dt from Ophyd. We need two pieces of
information:
The PV name, random_walk:dt.
A human-friendly name. This name is used to label the readings and will be used in any downstream data analysis or file-writing code. We might choose, for example, time_delta.
In [1]: from ophyd.signal import EpicsSignal
In [2]: time_delta = EpicsSignal("random_walk:dt", name="time_delta")
Note
It is conventional, but not required, to give the Python variable on the left
the same name as the value of name. That is, this is conventional…
a = EpicsSignal("...", name="a")
…but all of these are also allowed.
a = EpicsSignal("...", name="b") # local variable different from name
a = EpicsSignal("...", name="some name with spaces in it")
a = b = EpicsSignal("...", name="b") # two local variables
Next let’s connect to random_walk:x. It happens that this PV is not writable
(any writes would be rejected by EPICS), so we should use a read-only
EpicsSignal, EpicsSignalRO, to represent it in ophyd. In EPICS, you just have
to “know” this about your hardware. Fortunately if, in our ignorance, we used
a writable EpicsSignal instead, we could still use it to read the PV. It would
just have a vestigial set() method that wouldn’t work.
In [3]: from ophyd.signal import EpicsSignalRO
In [4]: x = EpicsSignalRO("random_walk:x", name="x")
Use it with the Bluesky RunEngine#
The signals can be used by the Bluesky RunEngine. Let’s configure a RunEngine to print a table.
In [5]: from bluesky import RunEngine
In [6]: from bluesky.callbacks import LiveTable
In [7]: RE = RunEngine()
In [8]: token = RE.subscribe(LiveTable(["time_delta", "x"]))
Because time_delta is writable, it can be scanned like a “motor”. It can
also be read like a “detector”. (In Bluesky, all things that are “motors” are
also “detectors”.)
In [9]: from bluesky.plans import count, list_scan
In [10]: RE(count([time_delta])) # Use as a "detector".
+-----------+------------+------------+
| seq_num | time | time_delta |
+-----------+------------+------------+
| 1 | 22:02:06.7 | 3 |
+-----------+------------+------------+
generator count ['27db16fd'] (scan num: 1)
Out[10]: ('27db16fd-c01b-4f09-af46-5799e3bf2c2f',)
In [11]: RE(list_scan([], time_delta, [0.1, 0.3, 1, 3])) # Use as "motor".
+-----------+------------+------------+
| seq_num | time | time_delta |
+-----------+------------+------------+
| 1 | 22:02:06.8 | 0 |
| 2 | 22:02:06.8 | 0 |
| 3 | 22:02:06.8 | 1 |
| 4 | 22:02:06.8 | 3 |
+-----------+------------+------------+
generator list_scan ['4fbad057'] (scan num: 2)
Out[11]: ('4fbad057-51ff-456f-aed4-bf75c71f282d',)
For the following example, set time_delta to 1.
In [12]: from bluesky.plan_stubs import mv
In [13]: RE(mv(time_delta, 1))
Out[13]: ()
We know that x represents a time-dependent variable. We can “poll” it at
regular intervals…
In [14]: RE(count([x], num=5, delay=0.5)) # Read every 0.5 seconds.
+-----------+------------+------------+------------+
| seq_num | time | time_delta | x |
+-----------+------------+------------+------------+
| 1 | 22:02:06.9 | | -0 |
| 2 | 22:02:07.4 | | -0 |
| 3 | 22:02:07.9 | | -0 |
| 4 | 22:02:08.4 | | -0 |
| 5 | 22:02:08.9 | | -1 |
+-----------+------------+------------+------------+
generator count ['901f7d4c'] (scan num: 3)
Out[14]: ('901f7d4c-3a39-48f4-993b-f19ac6ff62fe',)
…but this requires us to choose a polling interval (0.5 seconds). It’s often
better to rely on the control system to tell us when a new value is available.
In this example, we accumulate updates for x whenever it changes.
In [15]: from bluesky.plan_stubs import monitor, unmonitor, open_run, close_run, sleep
In [16]: def monitor_x_for(duration, md=None):
   ....:     yield from open_run(md)  # optional metadata
   ....:     yield from monitor(x, name="x_monitor")
   ....:     yield from sleep(duration)  # Wait for readings to accumulate.
   ....:     yield from unmonitor(x)
   ....:     yield from close_run()
   ....:
In [17]: RE.unsubscribe(token) # Remove the old table.
In [18]: RE(monitor_x_for(3), LiveTable(["x"], stream_name="x_monitor"))
+-----------+------------+------------+
| seq_num | time | x |
+-----------+------------+------------+
| 1 | 22:02:09.6 | -1 |
| 2 | 22:02:09.6 | -1 |
| 3 | 22:02:09.8 | -0 |
| 4 | 22:02:10.8 | 0 |
| 5 | 22:02:11.8 | -1 |
+-----------+------------+------------+
generator monitor_x_for ['d081afba'] (scan num: 4)
Out[18]: ('d081afba-d558-43d8-89b6-a1817285a7ee',)
If you are a scientist aiming to use Ophyd with the Bluesky Run Engine, you may stop at this point or read on to learn more about how the Run Engine interacts with these signals. If you are a controls engineer, the details that follow are likely important to you.
Use it directly#
Note
These methods should not be called inside a Bluesky plan. See [TODO link to explanation.]
Read#
The signal can be read. It returns a dictionary with one item. The key is the
human-friendly name we specified. The value is another dictionary, containing
the value and the timestamp of the reading from the control system (in this
case, EPICS).
In [19]: time_delta.read()
Out[19]: {'time_delta': {'value': 1.0, 'timestamp': 1732226526.941342}}
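The nested dictionary above can be unpacked with plain Python. The following is a minimal sketch, not part of ophyd: the helper name unpack_reading is hypothetical, it assumes the timestamp is in seconds since the Unix epoch, and the sample reading is copied from the output above so that the example runs without a live IOC.

```python
from datetime import datetime, timezone


def unpack_reading(reading):
    """Flatten {name: {"value": v, "timestamp": t}} into (name, value, datetime) tuples.

    Hypothetical helper for illustration. It assumes the timestamp is
    expressed in seconds since the Unix epoch.
    """
    rows = []
    for name, item in reading.items():
        when = datetime.fromtimestamp(item["timestamp"], tz=timezone.utc)
        rows.append((name, item["value"], when))
    return rows


# Sample copied from the time_delta.read() output above.
sample = {"time_delta": {"value": 1.0, "timestamp": 1732226526.941342}}
rows = unpack_reading(sample)
for name, value, when in rows:
    print(f"{name} = {value} at {when.isoformat()}")
```

With a live signal, the same helper could be given the result of time_delta.read() directly.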
Describe#
Additional metadata is available. This always includes the data type, shape, and source (e.g. PV). It may also include units and other metadata.
In [20]: time_delta.describe()
Out[20]:
{'time_delta': {'source': 'PV:random_walk:dt',
  'dtype': 'number',
  'shape': [],
  'units': '',
  'lower_ctrl_limit': 0.0,
  'upper_ctrl_limit': 0.0,
  'precision': 0}}
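The precision entry probably explains the rounded values in the tables earlier in this tutorial: at a precision of 0, a value such as 0.1 is displayed as 0. The sketch below reproduces that rounding with ordinary Python string formatting. The helper name format_with_precision is hypothetical, and the idea that the table formats numbers this way is an assumption, not something stated above.

```python
def format_with_precision(value, precision):
    """Format a number in fixed-point notation with `precision` decimals."""
    return f"{value:.{precision}f}"


# Setpoints from the list_scan example, formatted at precision 0 and 1.
setpoints = [0.1, 0.3, 1, 3]
at_precision_0 = [format_with_precision(v, 0) for v in setpoints]
at_precision_1 = [format_with_precision(v, 1) for v in setpoints]
print(at_precision_0)
print(at_precision_1)
```

At precision 0 the first two setpoints become indistinguishable, which matches the 0, 0, 1, 3 column shown for the list_scan run.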
Set#
This signal is writable, so it can also be set.
In [21]: time_delta.set(10).wait() # Set it to 10 and wait for it to get there.
Sometimes hardware gets stuck or does not do what it is told, and so it is good practice to put a timeout on how long you are willing to wait until deciding that there is an error that needs to be handled somehow.
In [22]: time_delta.set(10).wait(timeout=1) # Set it to 10 and wait up to 1 second.
If the signal does not reach the requested value within the timeout, a TimeoutError will be raised.
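One way to handle that error is to catch it and decide what to do next. The sketch below is an illustration under stated assumptions, not ophyd's own API: SlowStatus and SlowSignal are hypothetical stand-ins that mimic only the set(...).wait(timeout=...) pattern shown above, so the example runs without an IOC. The helper set_and_wait is likewise hypothetical.

```python
import threading


class SlowStatus:
    """Hypothetical stand-in for a status object: completes after `delay` seconds."""

    def __init__(self, delay):
        self._done = threading.Event()
        timer = threading.Timer(delay, self._done.set)
        timer.daemon = True  # Do not keep the interpreter alive for pending timers.
        timer.start()

    def wait(self, timeout=None):
        # Block until done; raise TimeoutError if the deadline passes first.
        if not self._done.wait(timeout):
            raise TimeoutError(f"not done within {timeout} s")


class SlowSignal:
    """Hypothetical stand-in for a writable signal with a non-blocking set()."""

    def __init__(self, delay):
        self._delay = delay

    def set(self, value):
        return SlowStatus(self._delay)


def set_and_wait(signal, value, timeout):
    """Return True if the set completed within `timeout` seconds, else False."""
    try:
        signal.set(value).wait(timeout=timeout)
    except TimeoutError:
        return False
    return True


fast_ok = set_and_wait(SlowSignal(delay=0.01), 10, timeout=1)
slow_ok = set_and_wait(SlowSignal(delay=5), 10, timeout=0.05)
print(fast_ok, slow_ok)
```

With a real signal, the same try/except would wrap time_delta.set(10).wait(timeout=1). Whether to retry, log, or abort after a timeout depends on the hardware.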
Note that set(...) starts the motion but does not wait for it to
complete. It is a fast, “non-blocking” operation. This enables you to run
code between starting a motion and completing it.
In [23]: status = time_delta.set(5)
In [24]: print("Moving to 5...")
Moving to 5...
In [25]: status.wait(timeout=1)
In [26]: print("Moved to 5.")
Moved to 5.
Note
To move more than one signal in parallel, use the ophyd.status.wait() function.
from ophyd.status import wait
# Given signals a and b, set both in motion.
status1 = a.set(1)
status2 = b.set(1)
# Wait for both to complete. wait() takes a single status, so combine them with &.
wait(status1 & status2, timeout=1)
For more on what you can do with status, see […].
Subscribe#
What’s the best way to read a signal that changes over time, like our x signal?
First, set time_delta to a reasonable value like 1. This controls the
update rate of x in our random walk simulation.
In [27]: time_delta.set(1).wait()
We could poll the signal in a loop and collect N readings spaced T seconds apart.
# Don't do this.
import time

N = 5  # number of readings
T = 0.5  # seconds between readings
readings = []
for _ in range(N):
    time.sleep(T)
    reading = x.read()
    readings.append(reading)
There are two problems with this counterexample.
We might not know how often we need to check for updates.
We often want to watch multiple signals with different update rates, and this pattern would quickly become messy.
Alternatively, we can use a subscription.
In [28]: from collections import deque
In [29]: def accumulate(value, old_value, timestamp, **kwargs):
   ....:     readings.append({"x": {"value": value, "timestamp": timestamp}})
   ....: readings = deque(maxlen=5)
   ....: x.subscribe(accumulate)
   ....:
Out[29]: 1
When the control system has a new reading for us, it calls accumulate from a
background thread, which appends the reading to readings. If we do other work
or sleep for a while and then check back on readings, we’ll see that it has
some items in it.
In [30]: readings
Out[30]:
deque([{'x': {'value': -0.5635026556509564, 'timestamp': 1732226531.839315}},
{'x': {'value': -1.1288910936898935, 'timestamp': 1732226532.840343}},
{'x': {'value': -1.898038831595069, 'timestamp': 1732226533.841579}},
{'x': {'value': -1.096650234609581, 'timestamp': 1732226534.843428}}],
maxlen=5)
It will keep only the last 5 readings. We used a deque instead of a plain list
here because a list would grow without bound and, if left to run long enough,
consume all available memory, crashing the program.
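The bounded behavior of deque is easy to check without any hardware. In this minimal sketch the callback is invoked by hand with keyword arguments, on the assumption that this mirrors how a subscription delivers updates. The factory make_accumulator and the sample values are hypothetical.

```python
from collections import deque


def make_accumulator(name, maxlen=5):
    """Return (callback, readings); readings keeps only the newest `maxlen` items."""
    readings = deque(maxlen=maxlen)

    def accumulate(value, old_value=None, timestamp=None, **kwargs):
        # Same reading layout as in the subscription example above.
        readings.append({name: {"value": value, "timestamp": timestamp}})

    return accumulate, readings


accumulate, readings = make_accumulator("x", maxlen=5)

# Simulate seven updates arriving one second apart (made-up values).
for i in range(7):
    accumulate(value=float(i), old_value=float(i - 1), timestamp=1000.0 + i)

values = [r["x"]["value"] for r in readings]
print(len(readings), values)
```

The two oldest updates are discarded automatically. A factory like this also gives each signal its own buffer, so with a live signal one could pass the returned callback to x.subscribe(...) in place of the module-level readings used above.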