Multi-Run Plans

Introduction

This section is a brief tutorial on multi-run plans (introduced in Bluesky v1.6.0). A traditional single-run plan contains a set of instructions for performing only one run, which is assigned a scan ID and a UID. When a multi-run plan is executed by the Run Engine, multiple runs can be performed as part of a single plan. Data from each run can be independently displayed and saved to the database via Databroker.

Prior versions of Bluesky supported only sequential execution of multiple runs within a plan: building larger plans from a sequence of smaller plans, including the preassembled plans shipped with Bluesky, is standard practice. Bluesky v1.6.0 introduced a number of features that allow plans with nested runs. Two runs are considered nested if one ‘outer’ run is interrupted, another ‘inner’ run is executed, and then the first run is resumed and completed. Bluesky does not limit the number of nesting levels. Interruptions can be initiated by the plan itself (simply by opening another run before closing the currently executing run) or externally (e.g. by triggering a suspender, which causes a pre- or post-plan to be executed).

This tutorial briefly explains the new Bluesky features that support multi-run plans and gives several examples of plans that contain sequential, nested and recursive runs.

Definition of a ‘Run’

From the point of view of Bluesky, a run is a sequence of instructions (messages) for controlling the instrumental equipment that starts with an open_run message and ends with a close_run message. We may also apply the term ‘run’ to a block of code that generates such a sequence of messages. Data from each run is bundled together via a distinct scan ID and UID assigned to the run. A set of documents is also generated for each run, including the mandatory ‘start’ and ‘stop’ documents. The documents can be processed by callbacks (such as BestEffortCallback) and saved to the database via Databroker.

In a plan, a run may be defined by explicitly enclosing the code in bps.open_run() and bps.close_run() stubs:

# Using 'bps.open_run()' and 'bps.close_run()' stubs to define a run

import bluesky.plan_stubs as bps
from bluesky import RunEngine

RE = RunEngine({})

def sample_plan():
    ...
    yield from bps.open_run(md={})  # 'md' - metadata to be added to the 'start' document
    ...
    < code that controls execution of the scan >
    ...
    yield from bps.close_run()

RE(sample_plan())

or using @bpp.run_decorator, which inserts open_run and close_run control messages before and after the sequence generated by the enclosed code:

# Using 'bpp.run_decorator' to define a run

import bluesky.preprocessors as bpp
from bluesky import RunEngine

RE = RunEngine({})

@bpp.run_decorator(md={})  # 'md' - metadata to be added to the 'start' document
def sample_plan():
    ...
    < code that controls execution of the scan >
    ...

RE(sample_plan())
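The bracketing behaviour described above can be pictured with a small pure-Python sketch. It is only a toy model under stated assumptions: ToyMsg and toy_run_decorator are hypothetical names invented for this illustration, they are not part of Bluesky, and the real bpp.run_decorator handles more cases than shown here.

```python
from collections import namedtuple
from functools import wraps

# Toy stand-in for a Bluesky message (hypothetical, not bluesky.utils.Msg).
ToyMsg = namedtuple("ToyMsg", ["command", "kwargs"])


def toy_run_decorator(md=None):
    """Bracket the messages of a plan with open_run/close_run messages."""

    def decorator(plan_func):
        @wraps(plan_func)
        def wrapped(*args, **kwargs):
            # Emit the opening message, carrying a copy of the metadata
            yield ToyMsg("open_run", dict(md or {}))
            # Pass through every message produced by the enclosed code
            yield from plan_func(*args, **kwargs)
            # Emit the closing message after the enclosed code is exhausted
            yield ToyMsg("close_run", {})

        return wrapped

    return decorator


@toy_run_decorator(md={"purpose": "demo"})
def toy_plan(npts):
    for j in range(npts):
        yield ToyMsg("read", {"point": j})


commands = [msg.command for msg in toy_plan(2)]
print(commands)
```

The printed list starts with open_run and ends with close_run, with the messages of the enclosed code in between, which is the shape of a single run as defined in this section.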

The rules for basic Bluesky plans require that the currently running scan is closed before the next scan is opened. Therefore, the following code works:

# This code works, since the first run is closed before the second one is opened

import bluesky.plan_stubs as bps
from bluesky import RunEngine

RE = RunEngine({})

def sample_plan():
    yield from bps.open_run(md={})
    < code that controls execution of the scan >
    yield from bps.close_run()  # Closing the first run (scan)
    yield from bps.open_run(md={})  # Opening the second run (scan)
    < code that controls execution of the scan >
    yield from bps.close_run()

RE(sample_plan())

but the following code fails:

# This code fails, since the second run is opened before the first run is closed

import bluesky.plan_stubs as bps
from bluesky import RunEngine

RE = RunEngine({})

def sample_plan():
    yield from bps.open_run(md={})  # Opening the first run
    < code that controls execution of the scan >
    yield from bps.open_run(md={})  # Opening the second run before the first one is closed
    < code that controls execution of the scan >
    yield from bps.close_run()
    yield from bps.close_run()

RE(sample_plan())

Note that preassembled plans, such as bluesky.plans.count or bluesky.plans.list_scan, are complete single-run plans enclosed in open_run and close_run messages. Therefore, the following code fails as well:

# This code fails while attempting to start a preassembled plan from an open run

import bluesky.plan_stubs as bps
from bluesky.plans import count
from bluesky import RunEngine

RE = RunEngine({})

def sample_plan():
    yield from bps.open_run(md={})  # Starting the first run
    < code that controls execution of the scan >
    yield from count(<some arguments>)  # Attempting to run a preassembled plan from an open run
    yield from bps.close_run()

RE(sample_plan())

One situation in which a preassembled plan is called from another open run arises when the preassembled plan is included in a suspender pre- or post-plan. When the suspender is triggered, the current run is interrupted (not closed) and the pre- or post-plan attempts to open another run (the mechanism is the same as in the case of nested runs, see below). As a result, the Run Engine fails for the same reason as in the two previous code examples. The new multi-run plan features of Bluesky make it possible to implement plans with nested runs and to include full-featured scans in pre- and post-plans.

Bluesky Features for Support of Multi-run Plans

In order to handle simultaneously open runs within a plan, the Run Engine looks at the run key attribute of each control message to decide which run is currently being executed. The default value of the run key is None, but it can be set manually in the plan for any block of code that defines a run. A run key may be of any type, but it is strongly recommended that manually assigned run keys be human-readable, informative strings.

A new ‘inner’ run can be opened from within the ‘outer’ run only if the run keys of the ‘inner’ and ‘outer’ runs are different. Otherwise, plan execution fails.

The run key is used by Run Engine

  • to maintain the state of each run independently from other open runs;

  • to include run metadata, such as scan ID and UID, into the emitted documents. (Metadata is then used to route the documents to the appropriate callbacks. If documents are saved using Databroker, the metadata makes it possible to associate documents with runs and retrieve run data from the database.)
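The run-key rule and the per-run state described above can be pictured with a toy registry. This is a sketch under stated assumptions, not the Run Engine's actual bookkeeping: ToyRunRegistry and its methods are hypothetical names, and the only state kept per run here is a scan ID.

```python
class ToyRunRegistry:
    """Track open runs by run key (toy model, not the RunEngine)."""

    def __init__(self):
        self._open = {}  # run key -> scan ID of the open run
        self._next_scan_id = 1

    def open_run(self, run_key=None):
        # A second run may be opened only under a key that is not in use
        if run_key in self._open:
            raise RuntimeError(f"a run with key {run_key!r} is already open")
        self._open[run_key] = self._next_scan_id
        self._next_scan_id += 1
        return self._open[run_key]

    def close_run(self, run_key=None):
        if run_key not in self._open:
            raise RuntimeError(f"no open run with key {run_key!r}")
        return self._open.pop(run_key)


registry = ToyRunRegistry()
outer_id = registry.open_run("run_1")
inner_id = registry.open_run("run_2")  # different key: allowed
try:
    registry.open_run("run_1")  # key of an open run: rejected
    duplicate_rejected = False
except RuntimeError:
    duplicate_rejected = True
registry.close_run("run_2")
registry.close_run("run_1")
```

With the default key None for every run, the same model also reproduces the failure of the earlier examples: the second open_run arrives while a run with key None is still open.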

A run key is assigned to a block of code using bpp.set_run_key_wrapper or @bpp.set_run_key_decorator:

import bluesky.preprocessors as bpp
from bluesky import RunEngine

RE = RunEngine({})

# Using decorator
@bpp.set_run_key_decorator("run_key_example_1")
@bpp.run_decorator(md={})
def sample_plan():
    ...
    < code that controls execution of the run >
    ...

RE(sample_plan())

from bluesky.plans import scan
from ophyd.sim import hw
det, motor = hw().det, hw().motor

# Using wrapper
s = scan([det], motor, -1, 1, 10)
s_wrapped = bpp.set_run_key_wrapper(s, "run_key_example_2")
RE(s_wrapped)

Both @bpp.set_run_key_decorator and bpp.set_run_key_wrapper work by replacing the default value None of the run attribute in each message generated within the enclosed block with the user-defined run key.
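The replacement rule can be sketched in a few lines of plain Python. This is a minimal toy model, assuming a simplified message type: ToyMsg and toy_set_run_key are hypothetical names, and the sketch ignores the values that real plans receive back from the Run Engine. It also shows why a key set by an inner wrapper survives an outer wrapper.

```python
from collections import namedtuple

# Toy message with only a command and a run key (hypothetical type).
ToyMsg = namedtuple("ToyMsg", ["command", "run"])


def toy_set_run_key(plan, run_key):
    """Set the run key on every message that still has the default None."""
    for msg in plan:
        if msg.run is None:
            msg = msg._replace(run=run_key)
        yield msg


def inner():
    yield ToyMsg("open_run", None)
    yield ToyMsg("close_run", None)


def outer():
    yield ToyMsg("open_run", None)
    # Messages of the inner run are tagged before the outer wrapper sees them
    yield from toy_set_run_key(inner(), "run_2")
    yield ToyMsg("close_run", None)


tagged = [(m.command, m.run) for m in toy_set_run_key(outer(), "run_1")]
print(tagged)
```

The inner messages keep the key run_2 because their run attribute is no longer None when the outer wrapper processes them.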

The @bpp.set_run_key_decorator and bpp.set_run_key_wrapper are primarily intended to be applied to a function that implements a run, but they may also be used with any block of plan code. For example, one may write a plan that opens multiple runs simultaneously and executes them in parallel by generating groups of messages tagged with the run keys of the open runs. This is currently not recommended and should be attempted only at the developer’s own risk.

Plans with Sequential Runs

Sequential calling of multiple runs is supported by older versions of Bluesky. There is no need to use multi-run plan features if runs do not overlap (the next run is opened only after the previous run is closed), but run keys can still be assigned to all or some runs if needed.

In the following example, two preassembled plans are called in sequence. The Run Engine is subscribed to a single instance of BestEffortCallback, which is set up to display data specific to each run when the run is opened.

# Example: consecutive execution of single-run plans

from databroker import Broker
from ophyd.sim import hw

from bluesky import RunEngine
from bluesky.callbacks.best_effort import BestEffortCallback
from bluesky.plans import rel_scan, scan

hw = hw()

RE = RunEngine({})

db = Broker.named("temp")
RE.subscribe(db.insert)

bec = BestEffortCallback()
RE.subscribe(bec)


def plan_sequential_runs(npts):
    # Single-run plans may be called consecutively. No special handling is required
    #   as long as the previous scan is closed before the next one is opened
    yield from scan([hw.det1], hw.motor1, -1, 1, npts)
    yield from rel_scan([hw.det1, hw.det2], hw.motor1, -1, 1, npts)

In [1]: RE(plan_sequential_runs(10))


Transient Scan ID: 1     Time: 2024-11-13 01:39:46
Persistent Unique Scan ID: 'b54612ba-d32d-43f8-ba92-7ceea44b50b6'
New stream: 'primary'
+-----------+------------+------------+------------+
|   seq_num |       time |     motor1 |       det1 |
+-----------+------------+------------+------------+
|         1 | 01:39:46.0 |     -1.000 |      0.677 |
|         2 | 01:39:46.1 |     -0.778 |      1.491 |
|         3 | 01:39:46.1 |     -0.556 |      2.697 |
|         4 | 01:39:46.2 |     -0.333 |      4.004 |
|         5 | 01:39:46.2 |     -0.111 |      4.878 |
|         6 | 01:39:46.3 |      0.111 |      4.878 |
|         7 | 01:39:46.3 |      0.333 |      4.004 |
|         8 | 01:39:46.4 |      0.556 |      2.697 |
|         9 | 01:39:46.4 |      0.778 |      1.491 |
|        10 | 01:39:46.5 |      1.000 |      0.677 |
+-----------+------------+------------+------------+
generator scan ['b54612ba'] (scan num: 1)





Transient Scan ID: 2     Time: 2024-11-13 01:39:46
Persistent Unique Scan ID: '1347a9e1-70ac-460e-a698-2ff6d67842da'
New stream: 'primary'
+-----------+------------+------------+------------+------------+
|   seq_num |       time |     motor1 |       det2 |       det1 |
+-----------+------------+------------+------------+------------+
|         1 | 01:39:46.7 |      0.000 |      1.765 |      5.000 |
|         2 | 01:39:46.9 |      0.222 |      1.765 |      4.530 |
|         3 | 01:39:47.2 |      0.444 |      1.765 |      3.368 |
|         4 | 01:39:47.4 |      0.667 |      1.765 |      2.056 |
|         5 | 01:39:47.6 |      0.889 |      1.765 |      1.030 |
|         6 | 01:39:47.8 |      1.111 |      1.765 |      0.423 |
|         7 | 01:39:48.1 |      1.333 |      1.765 |      0.143 |
|         8 | 01:39:48.3 |      1.556 |      1.765 |      0.040 |
|         9 | 01:39:48.5 |      1.778 |      1.765 |      0.009 |
|        10 | 01:39:48.7 |      2.000 |      1.765 |      0.002 |
+-----------+------------+------------+------------+------------+
generator rel_scan ['1347a9e1'] (scan num: 2)



Out[1]: 
('b54612ba-d32d-43f8-ba92-7ceea44b50b6',
 '1347a9e1-70ac-460e-a698-2ff6d67842da')

Plans with Nested Runs

The following example illustrates the use of @bpp.set_run_key_decorator to implement two nested runs: the ‘outer’ run interrupts its measurements, calls the ‘inner’ run and then completes the measurements. The ‘outer’ and ‘inner’ runs are assigned different run keys (‘run_1’ and ‘run_2’). Note that the @bpp.set_run_key_decorator for the ‘outer’ run does not overwrite the run key of the ‘inner’ run, even though the messages of the ‘inner’ run are generated inside the enclosed code. The decorator replaces the run key attribute of a message only if it has the default value of None, so the run key of a message is set only by the first decorator that processes it.

If multiple runs are to be open simultaneously, each run needs to be subscribed to its own instance of a callback. The standard RunEngine subscription mechanism does not provide this capability. Instead, subscription should be performed via RunRouter. The code in the following example demonstrates how to use BestEffortCallback to monitor data from multiple nested runs.

# Example: nested runs

from databroker import Broker
from event_model import RunRouter
from ophyd.sim import hw

import bluesky.plan_stubs as bps
import bluesky.preprocessors as bpp
from bluesky import RunEngine
from bluesky.callbacks.best_effort import BestEffortCallback

hw = hw()

RE = RunEngine({})

db = Broker.named("temp")
RE.subscribe(db.insert)


def factory(name, doc):
    # Documents from each run are routed to an independent
    #   instance of BestEffortCallback
    bec = BestEffortCallback()
    return [bec], []


rr = RunRouter([factory])
RE.subscribe(rr)


@bpp.set_run_key_decorator("run_2")
@bpp.run_decorator(md={})
def sim_plan_inner(npts):
    yield from bps.declare_stream(hw.motor1, hw.motor2, hw.det2, name="primary")
    for j in range(npts):
        yield from bps.mov(hw.motor1, j * 0.1 + 1, hw.motor2, j * 0.2 - 2)
        yield from bps.trigger_and_read([hw.motor1, hw.motor2, hw.det2])


@bpp.set_run_key_decorator("run_1")
@bpp.run_decorator(md={})
def sim_plan_outer(npts):
    yield from bps.declare_stream(hw.motor, hw.det, name="primary")
    for j in range(int(npts / 2)):
        yield from bps.mov(hw.motor, j * 0.2)
        yield from bps.trigger_and_read([hw.motor, hw.det])

    yield from sim_plan_inner(npts + 1)

    for j in range(int(npts / 2), npts):
        yield from bps.mov(hw.motor, j * 0.2)
        yield from bps.trigger_and_read([hw.motor, hw.det])

The output of the plan contains data from two runs, with each run assigned its own scan ID and UID. The tables for the runs are printed by two separate instances of BestEffortCallback. The data from the two tables is printed in the order of acquisition: the table for the ‘inner’ run is printed in the gap of the table for the ‘outer’ run.

In [2]: RE(sim_plan_outer(10))


Transient Scan ID: 1     Time: 2024-11-13 01:39:49
Persistent Unique Scan ID: 'ec34c553-be35-4b1a-b214-e8938e64bcf7'
New stream: 'primary'
+-----------+------------+------------+------------+
|   seq_num |       time |        det |      motor |
+-----------+------------+------------+------------+
 failed to format row 
 failed to format row 
 failed to format row 
 failed to format row 
 failed to format row 


Transient Scan ID: 2     Time: 2024-11-13 01:39:50
Persistent Unique Scan ID: 'f205d769-9032-43dc-abcd-f24f23bd0017'
New stream: 'primary'
+-----------+------------+------------+------------+------------+
|   seq_num |       time |       det2 |     motor1 |     motor2 |
+-----------+------------+------------+------------+------------+
 failed to format row 
 failed to format row 
 failed to format row 
 failed to format row 
 failed to format row 
 failed to format row 
 failed to format row 
 failed to format row 
 failed to format row 
 failed to format row 
 failed to format row 
+-----------+------------+------------+------------+------------+
generator sim_plan_outer ['f205d769'] (scan num: 2)



 failed to format row 
 failed to format row 
 failed to format row 
 failed to format row 
 failed to format row 
+-----------+------------+------------+------------+
generator sim_plan_outer ['ec34c553'] (scan num: 1)



Out[2]: 
('ec34c553-be35-4b1a-b214-e8938e64bcf7',
 'f205d769-9032-43dc-abcd-f24f23bd0017')
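The per-run routing used in this example can be pictured with a toy router. This is a sketch under stated assumptions and not the implementation of event_model.RunRouter: ToyRunRouter is a hypothetical class, and the documents are simplified so that every non-start document carries a run_start field, which is not how all real documents are laid out.

```python
class ToyRunRouter:
    """Route documents to callbacks created per run (toy model)."""

    def __init__(self, factories):
        self._factories = factories
        self._callbacks = {}  # run UID -> callbacks created for that run

    def __call__(self, name, doc):
        if name == "start":
            uid = doc["uid"]
            callbacks = []
            for factory in self._factories:
                # Mirror the (callbacks, subfactories) return shape used in
                # the examples; subfactories are ignored in this sketch.
                new_callbacks, _ = factory(name, doc)
                callbacks.extend(new_callbacks)
            self._callbacks[uid] = callbacks
        else:
            uid = doc["run_start"]  # simplification: present on all documents
        for callback in self._callbacks.get(uid, []):
            callback(name, doc)
        if name == "stop":
            self._callbacks.pop(uid, None)


received = {}


def factory(name, doc):
    # One independent collector per run, keyed by the UID of the run
    log = received.setdefault(doc["uid"], [])

    def collector(name, doc):
        log.append(name)

    return [collector], []


router = ToyRunRouter([factory])
router("start", {"uid": "outer"})
router("event", {"run_start": "outer"})
router("start", {"uid": "inner"})
router("event", {"run_start": "inner"})
router("stop", {"run_start": "inner"})
router("event", {"run_start": "outer"})
router("stop", {"run_start": "outer"})
```

Although the documents of the two runs are interleaved, each collector sees only the documents of its own run, which is why each run can have its own table.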

The wrapper bpp.set_run_key_wrapper can be used instead of the decorator. For example, the run sim_plan_inner from the previous example can be rewritten as follows:

def sim_plan_inner(npts):
    def f():
        for j in range(npts):
            yield from bps.mov(hw.motor1, j * 0.1 + 1, hw.motor2, j * 0.2 - 2)
            yield from bps.trigger_and_read([hw.motor1, hw.motor2, hw.det2])
    f = bpp.run_wrapper(f(), md={})
    return bpp.set_run_key_wrapper(f, "run_2")

Subscription to callbacks via RunRouter provides the flexibility to subscribe each run to its own set of callbacks. In the following example, run_key is added to the start document metadata and used in the factory function, which performs callback subscriptions, to distinguish between the two runs.

# Example: subscribing runs to individual sets of callbacks

from databroker import Broker
from event_model import RunRouter
from ophyd.sim import hw

import bluesky.plan_stubs as bps
import bluesky.preprocessors as bpp
from bluesky import RunEngine
from bluesky.callbacks import LivePlot, LiveTable

hw = hw()

RE = RunEngine({})

db = Broker.named("temp")
RE.subscribe(db.insert)


def factory(name, doc):
    # Runs may be subscribed to different sets of callbacks. Metadata from the start
    #   document may be used to identify which run is currently being started.
    #   In this example, the run key is explicitly added to the start document
    #   and used to identify runs, but other data can be used in a similar way.
    cb_list = []
    if doc["run_key"] == "run_1":
        cb_list.append(LiveTable([hw.motor1, hw.det1]))
        cb_list.append(LivePlot("det1", x="motor1"))
    elif doc["run_key"] == "run_2":
        cb_list.append(LiveTable([hw.motor1, hw.motor2, hw.det2]))
    return cb_list, []


rr = RunRouter([factory])
RE.subscribe(rr)


@bpp.set_run_key_decorator("run_2")
@bpp.run_decorator(md={"run_key": "run_2"})
def sim_plan_inner(npts):
    yield from bps.declare_stream(hw.motor1, hw.motor2, hw.det2, name="primary")
    for j in range(npts):
        yield from bps.mov(hw.motor1, j * 0.1 + 1, hw.motor2, j * 0.2 - 2)
        yield from bps.trigger_and_read([hw.motor1, hw.motor2, hw.det2])


@bpp.set_run_key_decorator("run_1")
@bpp.run_decorator(md={"run_key": "run_1"})
def sim_plan_outer(npts):
    yield from bps.declare_stream(hw.motor1, hw.det1, name="primary")
    for j in range(int(npts / 2)):
        yield from bps.mov(hw.motor1, j)
        yield from bps.trigger_and_read([hw.motor1, hw.det1])

    yield from sim_plan_inner(npts + 1)

    for j in range(int(npts / 2), npts):
        yield from bps.mov(hw.motor1, j)
        yield from bps.trigger_and_read([hw.motor1, hw.det1])

In [3]: RE(sim_plan_outer(10))


+-----------+------------+------------+-----------------+------------+
|   seq_num |       time |     motor1 | motor1_setpoint |       det1 |
+-----------+------------+------------+-----------------+------------+
 failed to format row 
 failed to format row 
 failed to format row 
 failed to format row 
 failed to format row 


+-----------+------------+------------+-----------------+------------+-----------------+------------+
|   seq_num |       time |     motor1 | motor1_setpoint |     motor2 | motor2_setpoint |       det2 |
+-----------+------------+------------+-----------------+------------+-----------------+------------+
 failed to format row 
 failed to format row 
 failed to format row 
 failed to format row 
 failed to format row 
 failed to format row 
 failed to format row 
 failed to format row 
 failed to format row 
 failed to format row 
 failed to format row 
+-----------+------------+------------+-----------------+------------+-----------------+------------+
generator sim_plan_outer ['cd3a6752'] (scan num: 2)


 failed to format row 
 failed to format row 
 failed to format row 
 failed to format row 
 failed to format row 
+-----------+------------+------------+-----------------+------------+
generator sim_plan_outer ['4a70be2a'] (scan num: 1)


Out[3]: 
('4a70be2a-f76a-4a46-865f-5d3b1592d65a',
 'cd3a6752-f4aa-4ed8-9f78-2a6e366f66ce')

In some cases it may be necessary to implement a run that can be interrupted and then started again as a new instance. For example, a suspender pre- or post-plan may contain a run that takes substantial time to execute. Such a run may be interrupted if the suspender is triggered repeatedly, which causes another instance of the pre- or post-plan to be started while the first one is still open. This process is similar to a recursive call of the run (a run that includes instructions to call itself). Recursive calls are possible if a unique run key is assigned to the run each time it is started.

The following example illustrates dynamic generation of run keys. The plan may have no practical purpose beyond demonstrating the principle. The plan calls itself recursively until the global counter n_calls reaches the maximum value n_calls_max. A unique run key is generated at each call.

# Example: recursive runs

from databroker import Broker
from event_model import RunRouter
from ophyd.sim import hw

import bluesky.plan_stubs as bps
import bluesky.preprocessors as bpp
from bluesky import RunEngine
from bluesky.callbacks.best_effort import BestEffortCallback

hw = hw()

RE = RunEngine({})

db = Broker.named("temp")
RE.subscribe(db.insert)


def factory(name, doc):
    # Each run is subscribed to an independent instance of BEC
    bec = BestEffortCallback()
    return [bec], []


rr = RunRouter([factory])
RE.subscribe(rr)

# Call counter and the maximum number of calls
n_calls, n_calls_max = 0, 3


def sim_plan_recursive(npts):
    global n_calls, n_calls_max

    n_calls += 1  # Increment counter
    if n_calls <= n_calls_max:
        # Generate unique key for each run. The key generation algorithm
        #   must only guarantee that execution of the runs that are assigned
        #   the same key will never overlap in time.
        run_key = f"run_key_{n_calls}"

        @bpp.set_run_key_decorator(run_key)
        @bpp.run_decorator(md={})
        def plan(npts):
            yield from bps.declare_stream(hw.motor1, hw.det1, name="primary")
            for j in range(int(npts / 2)):
                yield from bps.mov(hw.motor1, j * 0.2)
                yield from bps.trigger_and_read([hw.motor1, hw.det1])

            # Different parameter values may be passed to the recursively called plans
            yield from sim_plan_recursive(npts + 2)

            for j in range(int(npts / 2), npts):
                yield from bps.mov(hw.motor1, j * 0.2)
                yield from bps.trigger_and_read([hw.motor1, hw.det1])

        yield from plan(npts)

In [4]: RE(sim_plan_recursive(4))


Transient Scan ID: 1     Time: 2024-11-13 01:39:57
Persistent Unique Scan ID: '571150c5-340d-42a7-900f-91ceda5327f0'
New stream: 'primary'
+-----------+------------+------------+------------+
|   seq_num |       time |       det1 |     motor1 |
+-----------+------------+------------+------------+
 failed to format row 
 failed to format row 


Transient Scan ID: 2     Time: 2024-11-13 01:39:58
Persistent Unique Scan ID: '2dadba8f-38ee-4cc2-98a3-9117c261c04c'
New stream: 'primary'
+-----------+------------+------------+------------+
|   seq_num |       time |       det1 |     motor1 |
+-----------+------------+------------+------------+
|         1 | 01:39:58.3 |      5.000 |      0.000 |
|         2 | 01:39:58.5 |      4.616 |      0.200 |
|         3 | 01:39:58.7 |      3.631 |      0.400 |


Transient Scan ID: 3     Time: 2024-11-13 01:39:58
Persistent Unique Scan ID: '5c9d2ef9-8e53-4ee2-b5c4-8a00fb5d313d'
New stream: 'primary'
+-----------+------------+------------+------------+
|   seq_num |       time |       det1 |     motor1 |
+-----------+------------+------------+------------+
|         1 | 01:39:59.0 |      5.000 |      0.000 |
|         2 | 01:39:59.2 |      4.616 |      0.200 |
|         3 | 01:39:59.4 |      3.631 |      0.400 |
|         4 | 01:39:59.6 |      2.434 |      0.600 |
|         5 | 01:39:59.8 |      1.390 |      0.800 |
|         6 | 01:40:00.0 |      0.677 |      1.000 |
|         7 | 01:40:00.2 |      0.281 |      1.200 |
|         8 | 01:40:00.5 |      0.099 |      1.400 |
+-----------+------------+------------+------------+
generator sim_plan_recursive ['5c9d2ef9'] (scan num: 3)



|         4 | 01:40:01.0 |      2.434 |      0.600 |
|         5 | 01:40:01.2 |      1.390 |      0.800 |
|         6 | 01:40:01.4 |      0.677 |      1.000 |
+-----------+------------+------------+------------+
generator sim_plan_recursive ['2dadba8f'] (scan num: 2)



 failed to format row 
 failed to format row 
+-----------+------------+------------+------------+
generator sim_plan_recursive ['571150c5'] (scan num: 1)



Out[4]: 
('571150c5-340d-42a7-900f-91ceda5327f0',
 '2dadba8f-38ee-4cc2-98a3-9117c261c04c',
 '5c9d2ef9-8e53-4ee2-b5c4-8a00fb5d313d')

An identical result can be achieved using bpp.set_run_key_wrapper():

# Call counter and the maximum number of calls
n_calls, n_calls_max = 0, 3

def sim_plan_recursive(npts):
    global n_calls, n_calls_max

    n_calls += 1  # Increment counter
    if n_calls <= n_calls_max:
        # Generate unique key for each run. The key generation algorithm
        #   must only guarantee that execution of the runs that are assigned
        #   the same key will never overlap in time.
        run_key = f"run_key_{n_calls}"

        @bpp.run_decorator(md={})
        def plan(npts):

            for j in range(int(npts/2)):
                yield from bps.mov(hw.motor1, j * 0.2)
                yield from bps.trigger_and_read([hw.motor1, hw.det1])

            # Different parameter values may be passed to the recursively called plans
            yield from sim_plan_recursive(npts + 2)

            for j in range(int(npts/2), npts):
                yield from bps.mov(hw.motor1, j * 0.2)
                yield from bps.trigger_and_read([hw.motor1, hw.det1])

        yield from bpp.set_run_key_wrapper(plan(npts), run_key)
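The examples above keep the call counter in global variables. As a hedged alternative, the key generation could be isolated in a small factory. The sketch below is pure Python and does not use Bluesky: make_run_key_factory and toy_recursive_plan are hypothetical names, and the tuples stand in for real messages only to show the nesting order.

```python
import itertools


def make_run_key_factory(prefix="run_key"):
    """Return a function that produces a new run key on every call."""
    counter = itertools.count(1)

    def next_run_key():
        return f"{prefix}_{next(counter)}"

    return next_run_key


def toy_recursive_plan(next_run_key, depth, max_depth=3):
    """Emit nested open/close pairs, each pair under its own key."""
    if depth > max_depth:
        return
    run_key = next_run_key()  # unique for every instance of the run
    yield ("open_run", run_key)
    yield from toy_recursive_plan(next_run_key, depth + 1, max_depth)
    yield ("close_run", run_key)


trace = list(toy_recursive_plan(make_run_key_factory(), depth=1))
print(trace)
```

Because every instance draws a fresh key, no two runs that are open at the same time share a key, which is the only guarantee the key generation has to provide.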