Simulation and Error Checking

Bluesky provides three different approaches for simulating a plan without actually executing it:

  1. Introspect a plan by passing it to a “simulator” instead of a RunEngine.
  2. Execute a plan with the real RunEngine, but use simulated hardware objects.
  3. Redefine the RunEngine commands to change their meanings.

Approaches (1) and (2) are the most straightforward and most common.

Introspection

Recall that plans yield messages that describe what should be done; they do not communicate with hardware directly. Therefore it’s easy to use (or write) a simple function that iterates through the plan and summarizes or analyzes its actions.

summarize_plan      Print a summary of a plan.
plot_raster_path    Plot the raster path for a plan.
check_limits        Check that a plan will not move devices outside of their limits.
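To make the idea of iterating through a plan concrete, here is a minimal sketch of a hand-written analysis function. It does not import bluesky: `Msg` and `toy_scan` below are hypothetical stand-ins, assumed only to mimic the `command`, `obj`, `args`, and `kwargs` fields of a message. The `analyze` helper is likewise an illustrative name, not part of the library.

```python
from collections import Counter, namedtuple

# Hypothetical stand-in for a plan message (not bluesky's own Msg class).
Msg = namedtuple("Msg", ["command", "obj", "args", "kwargs"])


def toy_scan(motor, start, stop, num):
    """Hypothetical plan: yield messages resembling a simple scan (num >= 2)."""
    yield Msg("open_run", None, (), {})
    step = (stop - start) / (num - 1)
    for i in range(num):
        yield Msg("set", motor, (start + i * step,), {})
        yield Msg("read", motor, (), {})
    yield Msg("close_run", None, (), {})


def analyze(plan):
    """Walk the plan once, tallying commands and recording every 'set' target."""
    counts = Counter()
    targets = []
    for msg in plan:
        counts[msg.command] += 1
        if msg.command == "set":
            targets.append((msg.obj, msg.args[0]))
    return counts, targets


counts, targets = analyze(toy_scan("motor", 1, 3, 3))
print(targets)       # the positions the plan would request
print(dict(counts))  # how many messages of each kind it contains
```

Because the function only consumes the messages and never acts on them, no hardware is touched; the same loop shape should apply to any plan whose messages expose these fields.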

Summarize

The simulator summarize_plan() prints a summary of what a plan would do if executed by the RunEngine.

In [1]: from bluesky.simulators import summarize_plan

In [2]: from ophyd.sim import det, motor

In [3]: from bluesky.plans import scan

In [4]: summarize_plan(scan([det], motor, 1, 3, 3))
=================================== Open Run ===================================
motor -> 1.0
  Read ['det', 'motor']
motor -> 2.0
  Read ['det', 'motor']
motor -> 3.0
  Read ['det', 'motor']
================================== Close Run ===================================

To see the unabridged contents of a plan, simply use the built-in Python function list(). Note that it is not possible to summarize plans that have adaptive logic because their contents are determined dynamically during plan execution.

In [5]: list(scan([det], motor, 1, 3, 3))
Out[5]: 
[stage[None]: (SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier'])), (), {},
 stage[None]: (SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])), (), {},
 open_run[None]: (None), (), {'detectors': ['det'], 'motors': ('motor',), 'num_points': 3, 'num_intervals': 2, 'plan_args': {'detectors': ["SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier'])"], 'num': 3, 'args': ["SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])", 1, 3], 'per_step': 'None'}, 'plan_name': 'scan', 'hints': {'dimensions': [(['motor'], 'primary')]}, 'plan_pattern': 'inner_product', 'plan_pattern_module': 'bluesky.plan_patterns', 'plan_pattern_args': {'num': 3, 'args': ["SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])", 1, 3]}},
 checkpoint[None]: (None), (), {},
 set[None]: (SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])), (1.0,), {'group': 'set-e8ae86'},
 wait[None]: (None), (), {'group': 'set-e8ae86'},
 trigger[None]: (SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier'])), (), {'group': 'trigger-b7d546'},
 trigger[None]: (SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])), (), {'group': 'trigger-b7d546'},
 wait[None]: (None), (), {'group': 'trigger-b7d546'},
 create[None]: (None), (), {'name': 'primary'},
 read[None]: (SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier'])), (), {},
 read[None]: (SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])), (), {},
 save[None]: (None), (), {},
 checkpoint[None]: (None), (), {},
 set[None]: (SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])), (2.0,), {'group': 'set-03a8b7'},
 wait[None]: (None), (), {'group': 'set-03a8b7'},
 trigger[None]: (SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier'])), (), {'group': 'trigger-4dfaa5'},
 trigger[None]: (SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])), (), {'group': 'trigger-4dfaa5'},
 wait[None]: (None), (), {'group': 'trigger-4dfaa5'},
 create[None]: (None), (), {'name': 'primary'},
 read[None]: (SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier'])), (), {},
 read[None]: (SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])), (), {},
 save[None]: (None), (), {},
 checkpoint[None]: (None), (), {},
 set[None]: (SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])), (3.0,), {'group': 'set-78159f'},
 wait[None]: (None), (), {'group': 'set-78159f'},
 trigger[None]: (SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier'])), (), {'group': 'trigger-edc1e5'},
 trigger[None]: (SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])), (), {'group': 'trigger-edc1e5'},
 wait[None]: (None), (), {'group': 'trigger-edc1e5'},
 create[None]: (None), (), {'name': 'primary'},
 read[None]: (SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier'])), (), {},
 read[None]: (SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])), (), {},
 save[None]: (None), (), {},
 close_run[None]: (None), (), {'exit_status': None, 'reason': None},
 unstage[None]: (SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])), (), {},
 unstage[None]: (SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier'])), (), {}]
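
The caveat about adaptive plans can be illustrated with a small sketch. It uses hypothetical stand-ins rather than bluesky itself: `Msg` is a simplified message type, `adaptive_plan` is an invented plan that decides when to stop based on the value sent back after each 'read', and `run_with_readings` is an invented driver that supplies fake readings. The point is only that the message sequence depends on those readings, so plain iteration, which sends nothing back, cannot know what the plan would do.

```python
from collections import namedtuple

# Hypothetical stand-in for a plan message (not bluesky's own Msg class).
Msg = namedtuple("Msg", ["command", "obj", "args"])


def adaptive_plan(threshold=5.0):
    """Step a motor until the reading sent back drops below `threshold`."""
    position = 0
    while True:
        yield Msg("set", "motor", (position,))
        reading = yield Msg("read", "det", ())
        if reading < threshold:
            break
        position += 1


def run_with_readings(plan, readings):
    """Drive the plan, answering each 'read' with the next fake reading."""
    readings = iter(readings)
    seen, response = [], None
    while True:
        try:
            msg = plan.send(response)
        except StopIteration:
            return seen
        seen.append(msg)
        response = next(readings) if msg.command == "read" else None


# The same plan produces different message sequences for different readings.
long_run = run_with_readings(adaptive_plan(), [9.0, 7.0, 3.0])
short_run = run_with_readings(adaptive_plan(), [1.0])
print(len(long_run), len(short_run))

# Plain iteration sends None back, so the comparison inside the plan fails.
try:
    list(adaptive_plan())
except TypeError as err:
    print("cannot introspect:", err)
```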

Check Limits

Suppose that this motor is configured with limits on its range of motion at +/- 1000. The check_limits() simulator verifies whether a plan would violate these limits, so that you do not discover the problem partway through a long experiment.

In [6]: from bluesky.simulators import check_limits

In [7]: check_limits(scan([det], motor, 1, 3, 3))  # no problem here

In [8]: check_limits(scan([det], motor, 1, -3000, 3000))  # should raise an error
---------------------------------------------------------------------------
LimitsExceeded                            Traceback (most recent call last)
<ipython-input-8-5168c8638c33> in <module>
----> 1 check_limits(scan([det], motor, 1, -3000, 3000))  # should raise an error

~/virtualenv/python3.6.7/lib/python3.6/site-packages/bluesky-1.6.0rc2.post37+g29f93b6-py3.6.egg/bluesky/simulators.py in check_limits(plan)
    114                 raise LimitsExceeded("This plan would put {} at {} "
    115                                      "which is outside of its limits, {}."
--> 116                                      "".format(msg.obj.name, pos, (low, high)))

LimitsExceeded: This plan would put motor at -1000.6675558519507 which is outside of its limits, (-1000, 1000).
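
The kind of check performed here can be sketched in a few lines. This is not the library's implementation: `Msg`, `FakeMotor`, `LimitsViolation`, `linear_moves`, and `verify_limits` are all hypothetical names introduced for illustration, and treating the limits as inclusive is an assumption. The sketch only assumes that a 'set' message carries its target position as the first argument and that the device exposes a `(low, high)` pair of limits.

```python
from collections import namedtuple

# Hypothetical stand-ins; real plans yield bluesky messages for real devices.
Msg = namedtuple("Msg", ["command", "obj", "args"])
FakeMotor = namedtuple("FakeMotor", ["name", "limits"])


class LimitsViolation(Exception):
    """Raised by this sketch when a 'set' target falls outside the limits."""


def linear_moves(motor, start, stop, num):
    """Toy plan: move `motor` through `num` evenly spaced points (num >= 2)."""
    step = (stop - start) / (num - 1)
    for i in range(num):
        yield Msg("set", motor, (start + i * step,))


def verify_limits(plan):
    """Raise LimitsViolation at the first 'set' target outside its limits."""
    for msg in plan:
        if msg.command != "set":
            continue
        low, high = msg.obj.limits
        pos = msg.args[0]
        if not (low <= pos <= high):  # inclusive bounds are an assumption
            raise LimitsViolation(
                f"{msg.obj.name} -> {pos} is outside ({low}, {high})"
            )


motor = FakeMotor("motor", (-1000, 1000))
verify_limits(linear_moves(motor, 1, 3, 3))  # passes silently

problem = None
try:
    verify_limits(linear_moves(motor, 1, -3000, 3000))
except LimitsViolation as err:
    problem = str(err)
print(problem)
```

The check stops at the first offending move, which is why a single position just past the lower limit is reported rather than the final target.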

Simulated Hardware

Warning

This feature has recently been changed, and it has yet to be documented.

Customizing RunEngine Methods

The RunEngine allows you to customize the meaning of commands (like ‘set’ and ‘read’). One could use this feature to create a dummy RunEngine that, instead of actually reading and writing to hardware, merely reports what it would have done.

RunEngine.register_command(self, name, func)

Register a new Message command.

Parameters:
name : str
func : callable
    This can be a function or a method. The signature is f(msg).

See also

RunEngine.unregister_command()
RunEngine.print_command_registry()
RunEngine.commands

RunEngine.unregister_command(self, name)

Unregister a Message command.

Parameters:
name : str

See also

RunEngine.register_command()
RunEngine.print_command_registry()
RunEngine.commands
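
The idea of redefining commands so that a plan is reported rather than executed can be sketched with a toy dispatcher. The `ToyEngine` class below is a hypothetical stand-in and is not bluesky's RunEngine; it only borrows the method names register_command and unregister_command and the f(msg) handler signature described above. `Msg` and `report_only` are likewise invented for illustration.

```python
from collections import namedtuple

# Hypothetical stand-in for a plan message (not bluesky's own Msg class).
Msg = namedtuple("Msg", ["command", "obj", "args"])


class ToyEngine:
    """Toy dispatcher, NOT bluesky's RunEngine: maps command names to f(msg)."""

    def __init__(self):
        self._commands = {}

    def register_command(self, name, func):
        self._commands[name] = func

    def unregister_command(self, name):
        del self._commands[name]

    def __call__(self, plan):
        # Look up each message's command and collect what its handler returns.
        return [self._commands[msg.command](msg) for msg in plan]


def report_only(msg):
    """Dry-run handler: describe the action instead of performing it."""
    return f"would {msg.command} {msg.obj} {list(msg.args)}"


engine = ToyEngine()
for name in ("set", "read"):
    engine.register_command(name, report_only)

plan = [Msg("set", "motor", (1.0,)), Msg("read", "det", ())]
log = engine(plan)
print("\n".join(log))
```

Swapping the handler registered under a command name changes what every plan using that command does, without modifying the plans themselves.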