Achieving Complex Behaviors with Set Methods
In this tutorial we will learn how to encode more complex behaviors into Devices by defining set
methods.
This will allow us to set multiple PVs at once, as well as to perform calculations on input values as needed.
Set up for tutorial
First, let’s ensure our simulated IOCs are running.
The IOCs may already be running in the background. Run the command below to check: it should print STARTING or RUNNING on each line.
If any IOC is in a different state, edit the command to replace status with restart all and run it again.
!../supervisor/start_supervisor.sh status
decay RUNNING pid 2695, uptime 0:00:18
mini_beamline RUNNING pid 2696, uptime 0:00:18
random_walk RUNNING pid 2697, uptime 0:00:18
random_walk_horiz RUNNING pid 2698, uptime 0:00:18
random_walk_vert RUNNING pid 2699, uptime 0:00:18
simple RUNNING pid 2700, uptime 0:00:18
thermo_sim RUNNING pid 2701, uptime 0:00:18
trigger_with_pc RUNNING pid 2702, uptime 0:00:18
Adding a set method to Device
Sometimes, setting a value to a Signal and knowing when it is “done” involves just one PV. Here’s a simple example from the previous tutorial:
from ophyd import Device, Component, EpicsSignal, EpicsSignalRO
class RandomWalk(Device):
    x = Component(EpicsSignalRO, 'x')
    dt = Component(EpicsSignal, 'dt')
random_walk = RandomWalk('random_walk:', name='random_walk')
random_walk.wait_for_connection()
status = random_walk.dt.set(2)
In other cases it involves coordination across multiple PVs, such as a setpoint PV and a readback PV, or a setpoint PV and a “done” PV. For those cases, we define a set method on the Device to manage the coordination across multiple Signals.
from ophyd import DeviceStatus
class Decay(Device):
    """
    A device with a setpoint and readback that decays exponentially toward the setpoint.
    """
    readback = Component(EpicsSignalRO, ':I')
    setpoint = Component(EpicsSignal, ':SP')

    def set(self, setpoint):
        """
        Set the setpoint and return a Status object that monitors the readback.
        """
        status = DeviceStatus(self)

        # Wire up a callback that will mark the status object as finished
        # when the readback approaches within some tolerance of the setpoint.
        def callback(old_value, value, **kwargs):
            TOLERANCE = 1  # hard-coded; we'll make this configurable later on...
            if abs(value - setpoint) < TOLERANCE:
                status.set_finished()
                self.readback.clear_sub(callback)

        self.readback.subscribe(callback)
        # Now 'put' the value.
        self.setpoint.put(setpoint)
        # And return the Status object, which the caller can use to
        # tell when the action is complete.
        return status
decay = Decay('decay', name='decay')
decay.wait_for_connection()
decay
Decay(prefix='decay', name='decay', read_attrs=['readback', 'setpoint'], configuration_attrs=[])
decay.read()
OrderedDict([('decay_readback',
{'value': 101.44042305263626, 'timestamp': 1682197714.338845}),
('decay_setpoint',
{'value': 100.0, 'timestamp': 1682197694.962231})])
status = decay.set(115)
We can watch for completion either by registering a callback:
def callback(status):
    print("DONE:", status)
status.add_callback(callback)
or by blocking:
status = decay.set(120)
status.wait() # blocks here
print("DONE!")
DONE!
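To see the callback logic of the set method in isolation, here is a minimal sketch that needs neither EPICS nor ophyd. FakeStatus, FakeReadback, and fake_set are hypothetical stand-ins written for this illustration; they only mimic the subscribe / clear_sub / set_finished pattern used above and are not part of the ophyd API.

```python
import threading


class FakeStatus:
    """Hypothetical stand-in for a status object (not ophyd's DeviceStatus)."""

    def __init__(self):
        self._event = threading.Event()

    def set_finished(self):
        self._event.set()

    @property
    def done(self):
        return self._event.is_set()

    def wait(self, timeout=None):
        # Block until finished; raise if the timeout expires first.
        if not self._event.wait(timeout):
            raise TimeoutError("status did not finish in time")


class FakeReadback:
    """Hypothetical stand-in for a readback signal with subscriptions."""

    def __init__(self, value):
        self.value = value
        self.subscribers = []

    def subscribe(self, callback):
        self.subscribers.append(callback)

    def clear_sub(self, callback):
        self.subscribers.remove(callback)

    def update(self, new_value):
        old_value, self.value = self.value, new_value
        # Iterate over a copy so that a callback may unsubscribe itself.
        for cb in list(self.subscribers):
            cb(old_value=old_value, value=new_value)


def fake_set(readback, setpoint, tolerance=1):
    """Same shape as Decay.set, but driven by the fake objects above."""
    status = FakeStatus()

    def callback(old_value, value, **kwargs):
        if abs(value - setpoint) < tolerance:
            status.set_finished()
            readback.clear_sub(callback)

    readback.subscribe(callback)
    return status


readback = FakeReadback(100.0)
status = fake_set(readback, setpoint=115)

# Simulate the readback drifting toward the setpoint.
for value in (105.0, 110.0, 113.0, 114.5):
    readback.update(value)
    print(value, "done" if status.done else "moving")

status.wait(timeout=1)  # returns at once because the status is already finished
```

The status only finishes on the last update, when the readback comes within the tolerance of 1, and the callback removes itself so that later readback updates are ignored.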
Make the tolerance configurable with a “soft” Signal
from ophyd import Signal
class Decay(Device):
    """
    A device with a setpoint and readback that decays exponentially toward the setpoint.
    """
    readback = Component(EpicsSignalRO, ':I')
    setpoint = Component(EpicsSignal, ':SP')
    tolerance = Component(Signal, value=1)  # not associated with anything in EPICS---a pure ophyd construct

    def set(self, setpoint):
        """
        Set the setpoint and return a Status object that monitors the readback.
        """
        status = DeviceStatus(self)

        # Wire up a callback that will mark the status object as finished
        # when the readback approaches within some tolerance of the setpoint.
        def callback(old_value, value, **kwargs):
            if abs(value - setpoint) < self.tolerance.get():
                status.set_finished()
                self.readback.clear_sub(callback)

        self.readback.subscribe(callback)
        # Now 'put' the value.
        self.setpoint.put(setpoint)
        # And return the Status object, which the caller can use to
        # tell when the action is complete.
        return status
decay = Decay('decay', name='decay')
status = decay.set(125)
status.add_callback(callback)
decay.tolerance.set(2)
status = decay.set(130)
status.add_callback(callback)
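The effect of the tolerance on when a move counts as "done" can be illustrated with a small stand-alone calculation. This is only a sketch: steps_until_done is a hypothetical helper, and the assumption that the readback closes half of the remaining gap on every update is made up for the example and says nothing about the real IOC.

```python
def steps_until_done(start, setpoint, tolerance, fraction=0.5):
    """Count simulated updates until |readback - setpoint| < tolerance.

    `fraction` is an assumed per-update approach rate, chosen for
    illustration; it is not a property of the simulated IOC.
    """
    value = start
    steps = 0
    while abs(value - setpoint) >= tolerance:
        value += (setpoint - value) * fraction
        steps += 1
    return steps


print(steps_until_done(125, 130, tolerance=1))  # 3
print(steps_until_done(125, 130, tolerance=2))  # 2
```

A looser tolerance finishes the status earlier, at the cost of the readback being further from the setpoint when the caller is told the move is complete.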
Let the IOC tell us when it is done
Some IOCs (but not all) provide a specific signal that we can use to know when a set is complete. In that case we can remove the “tolerance” logic entirely if we want to and trust the IOC.
class Decay(Device):
    """
    A device with a setpoint and readback that decays exponentially toward the setpoint.
    """
    readback = Component(EpicsSignalRO, ':I')
    setpoint = Component(EpicsSignal, ':SP')
    done = Component(EpicsSignalRO, ':done')

    def set(self, setpoint):
        """
        Set the setpoint and return a Status object that monitors the 'done' PV.
        """
        status = DeviceStatus(self)

        # Wire up a callback that will mark the status object as finished
        # when the done signal goes from low to high---that is, a positive edge.
        def callback(old_value, value, **kwargs):
            if old_value == 0 and value == 1:
                status.set_finished()
                self.done.clear_sub(callback)

        self.done.subscribe(callback)
        # Now 'put' the value.
        self.setpoint.put(setpoint)
        # And return the Status object, which the caller can use to
        # tell when the action is complete.
        return status
decay = Decay('decay', name='decay')
decay
Decay(prefix='decay', name='decay', read_attrs=['readback', 'setpoint', 'done'], configuration_attrs=[])
status = decay.set(135)
status.add_callback(callback)
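The callback above looks for a positive edge (0 to 1) rather than simply checking that done equals 1. The sketch below shows why, using a made-up sequence of done samples; first_index, rising_edge, and level_high are hypothetical helpers for this illustration, and the sample values are assumptions rather than output from the IOC.

```python
def first_index(samples, predicate):
    """Return the index of the first (old, new) pair that satisfies predicate."""
    for i in range(1, len(samples)):
        if predicate(samples[i - 1], samples[i]):
            return i
    return None


def rising_edge(old_value, value):
    # Same test as in the set() callback: low to high transition.
    return old_value == 0 and value == 1


def level_high(old_value, value):
    # Naive alternative: only looks at the current level.
    return value == 1


# Assumed samples: 'done' is still high from the previous move,
# drops to 0 while moving, and returns to 1 on arrival.
samples = [1, 1, 0, 0, 0, 1]

print(first_index(samples, level_high))   # 1
print(first_index(samples, rising_edge))  # 5
```

With the level check, a done value left high by an earlier move would mark the new status as finished before the move has even started; the edge check waits for the transition that belongs to this move.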
PVPositioner
The pattern of readback, setpoint and done is pretty common, so ophyd has a special Device subclass that writes the set() method for you if you provide components with these particular names.
from ophyd import PVPositioner
class Decay(PVPositioner):
    """
    A device with a setpoint and readback that decays exponentially toward the setpoint.
    """
    readback = Component(EpicsSignalRO, ':I')
    setpoint = Component(EpicsSignal, ':SP')
    done = Component(EpicsSignalRO, ':done')
    # actuate = Component(EpicsSignal, ...)  # the "Go" button, not applicable to this IOC, but sometimes needed
decay = Decay('decay', name='decay')
status = decay.set(140)
status.add_callback(callback)