Simulation and Error Checking#

Bluesky provides three different approaches for simulating a plan without actually executing it:

  1. Introspect a plan by passing it to a “simulator” instead of a RunEngine.

  2. Execute a plan with the real RunEngine, but use simulated hardware objects.

  3. Redefine the RunEngine commands to change their meanings.

Approaches (1) and (2) are the most straightforward and most common.

Introspection#

Recall that plans yield messages that describe what should be done; they do not communicate with hardware directly. Therefore it’s easy to use (or write) a simple function that iterates through the plan and summarizes or analyzes its actions.
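
As a minimal sketch of such a function, the example below tallies how often each command appears in a plan. So that it runs without bluesky installed, it uses a stand-in `Msg` namedtuple and a hypothetical `toy_plan` generator. Both are assumptions made for illustration, although real bluesky messages expose the same `command`, `obj`, `args` and `kwargs` attributes.

```python
from collections import Counter, namedtuple

# Stand-in for bluesky's Msg (an assumption made for this sketch).
Msg = namedtuple("Msg", ["command", "obj", "args", "kwargs"])


def toy_plan(positions):
    """Hypothetical plan: move a motor to each position and read a detector."""
    yield Msg("open_run", None, (), {})
    for pos in positions:
        yield Msg("set", "motor", (pos,), {})
        yield Msg("read", "det", (), {})
    yield Msg("close_run", None, (), {})


def count_commands(plan):
    """Iterate through a plan without executing it and tally its commands."""
    return Counter(msg.command for msg in plan)


counts = count_commands(toy_plan([1.0, 2.0, 3.0]))
print(dict(counts))
```

Nothing here touches hardware: the function only consumes the messages the generator yields, so the same approach should apply to any plan whose messages do not depend on responses from the RunEngine.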

  summarize_plan: Print summary of plan

  plot_raster_path: Plot the raster path for this plan

  check_limits: Run check_limits_async in the RE

Summarize#

The simulator summarize_plan() prints a summary of what a plan would do if executed by the RunEngine.

In [1]: from bluesky.simulators import summarize_plan

In [2]: from ophyd.sim import det, motor

In [3]: from bluesky.plans import scan

In [4]: summarize_plan(scan([det], motor, 1, 3, 3))
=================================== Open Run ===================================
motor -> 1.0
  Read ['det', 'motor']
motor -> 2.0
  Read ['det', 'motor']
motor -> 3.0
  Read ['det', 'motor']
================================== Close Run ===================================

To see the unabridged contents of a plan, use the built-in Python function list(). Note that it is not possible to summarize plans that have adaptive logic, because their contents are determined dynamically during plan execution.

In [5]: list(scan([det], motor, 1, 3, 3))
Out[5]: 
[Msg('stage', obj=SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier']), args=(), kwargs={'group': '395b47bb-c641-4fde-bbc6-44c66fc16f06'}, run=None),
 Msg('stage', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(), kwargs={'group': '395b47bb-c641-4fde-bbc6-44c66fc16f06'}, run=None),
 Msg('open_run', obj=None, args=(), kwargs={'detectors': ['det'], 'motors': ('motor',), 'num_points': 3, 'num_intervals': 2, 'plan_args': {'detectors': ["SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier'])"], 'num': 3, 'args': ["SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])", 1, 3], 'per_step': 'None'}, 'plan_name': 'scan', 'hints': {'dimensions': [(['motor'], 'primary')]}, 'plan_pattern': 'inner_product', 'plan_pattern_module': 'bluesky.plan_patterns', 'plan_pattern_args': {'num': 3, 'args': ["SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])", 1, 3]}}, run=None),
 Msg('checkpoint', obj=None, args=(), kwargs={}, run=None),
 Msg('set', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(np.float64(1.0),), kwargs={'group': 'set-75884c'}, run=None),
 Msg('wait', obj=None, args=(), kwargs={'group': 'set-75884c'}, run=None),
 Msg('trigger', obj=SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier']), args=(), kwargs={'group': 'trigger-93da1e'}, run=None),
 Msg('trigger', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(), kwargs={'group': 'trigger-93da1e'}, run=None),
 Msg('wait', obj=None, args=(), kwargs={'group': 'trigger-93da1e', 'error_on_timeout': True, 'timeout': None}, run=None),
 Msg('create', obj=None, args=(), kwargs={'name': 'primary'}, run=None),
 Msg('read', obj=SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier']), args=(), kwargs={}, run=None),
 Msg('read', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(), kwargs={}, run=None),
 Msg('save', obj=None, args=(), kwargs={}, run=None),
 Msg('checkpoint', obj=None, args=(), kwargs={}, run=None),
 Msg('set', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(np.float64(2.0),), kwargs={'group': 'set-403600'}, run=None),
 Msg('wait', obj=None, args=(), kwargs={'group': 'set-403600'}, run=None),
 Msg('trigger', obj=SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier']), args=(), kwargs={'group': 'trigger-4b18ae'}, run=None),
 Msg('trigger', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(), kwargs={'group': 'trigger-4b18ae'}, run=None),
 Msg('wait', obj=None, args=(), kwargs={'group': 'trigger-4b18ae', 'error_on_timeout': True, 'timeout': None}, run=None),
 Msg('create', obj=None, args=(), kwargs={'name': 'primary'}, run=None),
 Msg('read', obj=SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier']), args=(), kwargs={}, run=None),
 Msg('read', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(), kwargs={}, run=None),
 Msg('save', obj=None, args=(), kwargs={}, run=None),
 Msg('checkpoint', obj=None, args=(), kwargs={}, run=None),
 Msg('set', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(np.float64(3.0),), kwargs={'group': 'set-971791'}, run=None),
 Msg('wait', obj=None, args=(), kwargs={'group': 'set-971791'}, run=None),
 Msg('trigger', obj=SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier']), args=(), kwargs={'group': 'trigger-3fedbc'}, run=None),
 Msg('trigger', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(), kwargs={'group': 'trigger-3fedbc'}, run=None),
 Msg('wait', obj=None, args=(), kwargs={'group': 'trigger-3fedbc', 'error_on_timeout': True, 'timeout': None}, run=None),
 Msg('create', obj=None, args=(), kwargs={'name': 'primary'}, run=None),
 Msg('read', obj=SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier']), args=(), kwargs={}, run=None),
 Msg('read', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(), kwargs={}, run=None),
 Msg('save', obj=None, args=(), kwargs={}, run=None),
 Msg('close_run', obj=None, args=(), kwargs={'exit_status': None, 'reason': None}, run=None),
 Msg('unstage', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(), kwargs={'group': 'df4c1cd9-1d33-4e24-9e90-5d37a6f43883'}, run=None),
 Msg('unstage', obj=SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier']), args=(), kwargs={'group': 'df4c1cd9-1d33-4e24-9e90-5d37a6f43883'}, run=None)]
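
The limitation on adaptive plans can be illustrated with a small sketch. The `Msg` namedtuple, `adaptive_plan` and `run_with_readings` below are hypothetical stand-ins, not bluesky APIs, and the reading is simplified to a single number (a real 'read' response is richer). The point is only that the plan branches on a value sent back into the generator, which a plain iteration such as list() never supplies.

```python
from collections import namedtuple

# Stand-in for bluesky's Msg (an assumption made for this sketch).
Msg = namedtuple("Msg", ["command", "obj", "args", "kwargs"])


def adaptive_plan(threshold):
    """Hypothetical plan: keep stepping until the reading drops below threshold."""
    position = 0
    while True:
        yield Msg("set", "motor", (position,), {})
        reading = yield Msg("read", "det", (), {})
        # Plain iteration sends back None, so there is no reading to branch on.
        if reading is None or reading < threshold:
            return
        position += 1


def run_with_readings(plan, readings):
    """Drive the plan, answering each 'read' message with the next fake reading."""
    readings = iter(readings)
    seen = []
    try:
        msg = plan.send(None)  # start the generator
        while True:
            seen.append(msg)
            response = next(readings, None) if msg.command == "read" else None
            msg = plan.send(response)
    except StopIteration:
        pass
    return seen


static_view = list(adaptive_plan(threshold=1))
dynamic_view = run_with_readings(adaptive_plan(threshold=1), [5, 4, 0.5])
print(len(static_view), len(dynamic_view))
```

The two views differ in length because the second one depends on the data fed back during execution, which is exactly the information a static summary does not have.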

Check Limits#

Suppose that this motor is configured with limits on its range of motion at +/- 1000. The check_limits() simulator can verify whether a plan will violate these limits, saving you from discovering a violation partway through a long experiment.

In [6]: from bluesky.simulators import check_limits

In [7]: check_limits(scan([det], motor, 1, 3, 3))  # no problem here

In [8]: check_limits(scan([det], motor, 1, -3000, 3000))  # should raise an error
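
The idea behind such a check can be sketched in a few lines. This is not the bluesky implementation: `ToyMotor`, `toy_scan` and `toy_check_limits` are hypothetical names, and the sketch assumes that a device validates a target position through a `check_value` method that raises on violation.

```python
from collections import namedtuple

# Stand-in for bluesky's Msg (an assumption made for this sketch).
Msg = namedtuple("Msg", ["command", "obj", "args", "kwargs"])


class ToyMotor:
    """Hypothetical motor with soft limits (not an ophyd class)."""

    def __init__(self, name, low, high):
        self.name = name
        self.limits = (low, high)

    def check_value(self, value):
        low, high = self.limits
        if not low <= value <= high:
            raise ValueError(f"{self.name}: {value} is outside limits {self.limits}")


def toy_scan(motor, start, stop, num):
    """Hypothetical plan: `num` evenly spaced 'set' messages from start to stop."""
    step = (stop - start) / (num - 1)
    for i in range(num):
        yield Msg("set", motor, (start + i * step,), {})


def toy_check_limits(plan):
    """Validate every 'set' target against its device without moving anything."""
    for msg in plan:
        if msg.command == "set" and hasattr(msg.obj, "check_value"):
            msg.obj.check_value(msg.args[0])


motor = ToyMotor("motor", -1000, 1000)
toy_check_limits(toy_scan(motor, 1, 3, 3))  # within limits, no error

violation = None
try:
    toy_check_limits(toy_scan(motor, 1, -3000, 3000))
except ValueError as err:
    violation = str(err)
print("limit violation:", violation)
```

Because the check stops at the first offending 'set' message, it reports the problem before any motion would have been requested.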

Simulated Hardware#

Warning

This feature has recently been changed, and it has yet to be documented.

Customizing RunEngine Methods#

The RunEngine allows you to customize the meaning of commands (like ‘set’ and ‘read’). One could use this feature to create a dummy RunEngine that, instead of actually reading and writing to hardware, merely reports what it would have done.

RunEngine.register_command(name, func)

Register a new Message command.

Parameters:
  name : str
  func : callable
    This can be a function or a method. The signature is f(msg).

RunEngine.unregister_command(name)

Unregister a Message command.

Parameters:
  name : str
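
To make the "dummy RunEngine" idea concrete, here is a toy dispatcher that mimics the two methods above. `ToyEngine` is a hypothetical, heavily simplified stand-in and not the real RunEngine. The sketch also assumes that handlers are coroutines awaited by the engine, which is not stated in the text above; check the RunEngine source before relying on it.

```python
import asyncio
from collections import namedtuple

# Stand-in for bluesky's Msg (an assumption made for this sketch).
Msg = namedtuple("Msg", ["command", "obj", "args", "kwargs"])


class ToyEngine:
    """Hypothetical stand-in for the RunEngine: a command registry plus a loop."""

    def __init__(self):
        self._registry = {}

    def register_command(self, name, func):
        self._registry[name] = func

    def unregister_command(self, name):
        del self._registry[name]

    def __call__(self, plan):
        async def _run():
            for msg in plan:
                # Look up the handler by command name and await it.
                await self._registry[msg.command](msg)

        asyncio.run(_run())


log = []


async def report_set(msg):
    # Report the move instead of performing it.
    log.append(f"would set {msg.obj} to {msg.args[0]}")


async def report_read(msg):
    # Report the read instead of performing it.
    log.append(f"would read {msg.obj}")


engine = ToyEngine()
engine.register_command("set", report_set)
engine.register_command("read", report_read)
engine([Msg("set", "motor", (1.0,), {}), Msg("read", "det", (), {})])
print("\n".join(log))
```

With a real RunEngine the same pattern would presumably apply: register reporting handlers under the names 'set' and 'read', run the plan, and inspect what was reported.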