Achieving Complex Behaviors with Set Methods#

In this tutorial we will learn how to encode more complex behaviors into Devices by defining set methods.

This will allow us to set multiple PVs at once, as well as to perform calculations on input values as needed.

Set up for tutorial#

First, let’s ensure our simulated IOCs are running.

The IOCs may already be running in the background. Run this command to verify that they are running: it should produce output with STARTING or RUNNING on each line. In the event of a problem, edit this command to replace status with restart all and run again.

!../supervisor/start_supervisor.sh status
decay                            RUNNING   pid 2695, uptime 0:00:18
mini_beamline                    RUNNING   pid 2696, uptime 0:00:18
random_walk                      RUNNING   pid 2697, uptime 0:00:18
random_walk_horiz                RUNNING   pid 2698, uptime 0:00:18
random_walk_vert                 RUNNING   pid 2699, uptime 0:00:18
simple                           RUNNING   pid 2700, uptime 0:00:18
thermo_sim                       RUNNING   pid 2701, uptime 0:00:18
trigger_with_pc                  RUNNING   pid 2702, uptime 0:00:18

Adding a set method to Device#

Sometimes, setting a value to a Signal and knowing when it is “done” involves just one PV. Here’s a simple example from the previous tutorial:

from ophyd import Device, Component, EpicsSignal, EpicsSignalRO

class RandomWalk(Device):
    x = Component(EpicsSignalRO, 'x')
    dt = Component(EpicsSignal, 'dt')
    
random_walk = RandomWalk('random_walk:', name='random_walk')
random_walk.wait_for_connection()

status = random_walk.dt.set(2)

In other cases it involves coordination across multiple PVs, such as a setpoint PV and a readback PV, or a setpoint PV and a “done” PV. For those cases, we define a set method on the Device to manage the coordination across multiple Signals.

from ophyd import DeviceStatus

class Decay(Device):
    """
    A device with a setpoint and readback that decays exponentially toward the setpoint.
    """
    readback = Component(EpicsSignalRO, ':I')
    setpoint = Component(EpicsSignal, ':SP')
    
    def set(self, setpoint):
        """
        Set the setpoint and return a Status object that monitors the readback.
        """
        status = DeviceStatus(self)
        
        # Wire up a callback that will mark the status object as finished
        # when the readback approaches within some tolerance of the setpoint.
        def callback(old_value, value, **kwargs):
            TOLERANCE = 1  # hard-coded; we'll make this configurable later on...
            if abs(value - setpoint) < TOLERANCE:
                status.set_finished()
                self.readback.clear_sub(callback)
            
        self.readback.subscribe(callback)
        
        # Now 'put' the value.
        self.setpoint.put(setpoint)
        
        # And return the Status object, which the caller can use to
        # tell when the action is complete.
        return status
        
    
decay = Decay('decay', name='decay')
decay.wait_for_connection()
decay
Decay(prefix='decay', name='decay', read_attrs=['readback', 'setpoint'], configuration_attrs=[])
decay.read()
OrderedDict([('decay_readback',
              {'value': 101.44042305263626, 'timestamp': 1682197714.338845}),
             ('decay_setpoint',
              {'value': 100.0, 'timestamp': 1682197694.962231})])
status = decay.set(115)

We can watch for completion either by registering a callback:

def callback(status):
    print("DONE:", status)
    
status.add_callback(callback)

or by blocking:

status = decay.set(120)
status.wait()  # blocks here
print("DONE!")
DONE!

Make the tolerance configurable with a “soft” Signal#

from ophyd import Signal

class Decay(Device):
    """
    A device with a setpoint and readback that decays exponentially toward the setpoint.
    """
    readback = Component(EpicsSignalRO, ':I')
    setpoint = Component(EpicsSignal, ':SP')
    tolerance = Component(Signal, value=1)  # not associated with anything in EPICS---a pure ophyd construct
    
    def set(self, setpoint):
        """
        Set the setpoint and return a Status object that monitors the readback.
        """
        status = DeviceStatus(self)
        
        # Wire up a callback that will mark the status object as finished
        # when the readback approaches within some tolerance of the setpoint.
        def callback(old_value, value, **kwargs):
            if abs(value - setpoint) < self.tolerance.get():
                status.set_finished()
                self.readback.clear_sub(callback)
            
        self.readback.subscribe(callback)
        
        # Now 'put' the value.
        self.setpoint.put(setpoint)
        
        # And return the Status object, which the caller can use to
        # tell when the action is complete.
        return status
        
    
decay = Decay('decay', name='decay')
status = decay.set(125)
status.add_callback(callback)
decay.tolerance.set(2)
status = decay.set(130)
status.add_callback(callback)

Let the IOC tell us when it is done#

Some IOCs (but not all) provide a specific signal that we can use to know when a set is complete. In that case we can remove the “tolerance” logic entirely if we want to and trust the IOC.

class Decay(Device):
    """
    A device with a setpoint and readback that decays exponentially toward the setpoint.
    """
    readback = Component(EpicsSignalRO, ':I')
    setpoint = Component(EpicsSignal, ':SP')
    done = Component(EpicsSignalRO, ':done')
    
    def set(self, setpoint):
        """
        Set the setpoint and return a Status object that monitors the 'done' PV.
        """
        status = DeviceStatus(self)
        
        # Wire up a callback that will mark the status object as finished
        # when the done signal goes from low to high---that is, a positive edge.
        def callback(old_value, value, **kwargs):
            if old_value == 0 and value == 1:
                status.set_finished()
                self.done.clear_sub(callback)
            
        self.done.subscribe(callback)
        
        # Now 'put' the value.
        self.setpoint.put(setpoint)
        
        # And return the Status object, which the caller can use to
        # tell when the action is complete.
        return status
        
    
decay = Decay('decay', name='decay')
decay
Decay(prefix='decay', name='decay', read_attrs=['readback', 'setpoint', 'done'], configuration_attrs=[])
status = decay.set(135)
status.add_callback(callback)

PVPositioner#

The pattern of readback, setpoint and done is pretty common, so ophyd has a special Device subclass that writes the set() method for you if you provide components with these particular names.

from ophyd import PVPositioner

class Decay(PVPositioner):
    """
    A device with a setpoint and readback that decays exponentially toward the setpoint.
    """
    readback = Component(EpicsSignalRO, ':I')
    setpoint = Component(EpicsSignal, ':SP')
    done = Component(EpicsSignalRO, ':done')
    # actuate = Component(EpicsSignal, ...)  # the "Go" button, not applicable to this IOC, but sometimes needed
    
decay = Decay('decay', name='decay')
status = decay.set(140)
status.add_callback(callback)