Simulation and Error Checking#
Bluesky provides three different approaches for simulating a plan without actually executing it:
Introspect a plan by passing it to a “simulator” instead of a RunEngine.
Execute a plan with the real RunEngine, but use simulated hardware objects.
Redefine the RunEngine commands to change their meanings.
Approaches (1) and (2) are the most straightforward and most common.
Introspection#
Recall that plans yield messages that describe what should be done; they do not communicate with hardware directly. Therefore it’s easy to use (or write) a simple function that iterates through the plan and summarizes or analyzes its actions.
Print summary of plan |
|
Plot the raster path for this plan |
|
Run check_limits_async in the RE |
Summarize#
The simulator summarize_plan()
print a summary of what a plan would do if
executed by the RunEngine.
In [1]: from bluesky.simulators import summarize_plan
In [2]: from ophyd.sim import det, motor
In [3]: from bluesky.plans import scan
In [4]: summarize_plan(scan([det], motor, 1, 3 ,3))
=================================== Open Run ===================================
motor -> 1.0
Read ['det', 'motor']
motor -> 2.0
Read ['det', 'motor']
motor -> 3.0
Read ['det', 'motor']
================================== Close Run ===================================
To see the unabridged contents of a plan, simply use the builtin Python
function list()
. Note that it is not possible to summarize plans that
have adaptive logic because their contents are determined dynamically during
plan executation.
In [5]: list(scan([det], motor, 1, 3 ,3))
Out[5]:
[Msg('stage', obj=SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier']), args=(), kwargs={'group': '16f098a5-7ad9-48e8-9c72-ed3177c5f88e'}, run=None),
Msg('stage', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(), kwargs={'group': '16f098a5-7ad9-48e8-9c72-ed3177c5f88e'}, run=None),
Msg('open_run', obj=None, args=(), kwargs={'detectors': ['det'], 'motors': ('motor',), 'num_points': 3, 'num_intervals': 2, 'plan_args': {'detectors': ["SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier'])"], 'num': 3, 'args': ["SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])", 1, 3], 'per_step': 'None'}, 'plan_name': 'scan', 'hints': {'dimensions': [(['motor'], 'primary')]}, 'plan_pattern': 'inner_product', 'plan_pattern_module': 'bluesky.plan_patterns', 'plan_pattern_args': {'num': 3, 'args': ["SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])", 1, 3]}}, run=None),
Msg('checkpoint', obj=None, args=(), kwargs={}, run=None),
Msg('set', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(np.float64(1.0),), kwargs={'group': 'set-fb8a3a'}, run=None),
Msg('wait', obj=None, args=(), kwargs={'group': 'set-fb8a3a'}, run=None),
Msg('trigger', obj=SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier']), args=(), kwargs={'group': 'trigger-a3bee3'}, run=None),
Msg('trigger', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(), kwargs={'group': 'trigger-a3bee3'}, run=None),
Msg('wait', obj=None, args=(), kwargs={'group': 'trigger-a3bee3', 'error_on_timeout': True, 'timeout': None}, run=None),
Msg('create', obj=None, args=(), kwargs={'name': 'primary'}, run=None),
Msg('read', obj=SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier']), args=(), kwargs={}, run=None),
Msg('read', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(), kwargs={}, run=None),
Msg('save', obj=None, args=(), kwargs={}, run=None),
Msg('checkpoint', obj=None, args=(), kwargs={}, run=None),
Msg('set', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(np.float64(2.0),), kwargs={'group': 'set-18a1b8'}, run=None),
Msg('wait', obj=None, args=(), kwargs={'group': 'set-18a1b8'}, run=None),
Msg('trigger', obj=SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier']), args=(), kwargs={'group': 'trigger-e947c0'}, run=None),
Msg('trigger', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(), kwargs={'group': 'trigger-e947c0'}, run=None),
Msg('wait', obj=None, args=(), kwargs={'group': 'trigger-e947c0', 'error_on_timeout': True, 'timeout': None}, run=None),
Msg('create', obj=None, args=(), kwargs={'name': 'primary'}, run=None),
Msg('read', obj=SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier']), args=(), kwargs={}, run=None),
Msg('read', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(), kwargs={}, run=None),
Msg('save', obj=None, args=(), kwargs={}, run=None),
Msg('checkpoint', obj=None, args=(), kwargs={}, run=None),
Msg('set', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(np.float64(3.0),), kwargs={'group': 'set-d2d897'}, run=None),
Msg('wait', obj=None, args=(), kwargs={'group': 'set-d2d897'}, run=None),
Msg('trigger', obj=SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier']), args=(), kwargs={'group': 'trigger-e390ab'}, run=None),
Msg('trigger', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(), kwargs={'group': 'trigger-e390ab'}, run=None),
Msg('wait', obj=None, args=(), kwargs={'group': 'trigger-e390ab', 'error_on_timeout': True, 'timeout': None}, run=None),
Msg('create', obj=None, args=(), kwargs={'name': 'primary'}, run=None),
Msg('read', obj=SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier']), args=(), kwargs={}, run=None),
Msg('read', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(), kwargs={}, run=None),
Msg('save', obj=None, args=(), kwargs={}, run=None),
Msg('close_run', obj=None, args=(), kwargs={'exit_status': None, 'reason': None}, run=None),
Msg('unstage', obj=SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration']), args=(), kwargs={'group': '0663c6c8-d4f9-48a1-9662-9d79f40b090e'}, run=None),
Msg('unstage', obj=SynGauss(prefix='', name='det', read_attrs=['val'], configuration_attrs=['Imax', 'center', 'sigma', 'noise', 'noise_multiplier']), args=(), kwargs={'group': '0663c6c8-d4f9-48a1-9662-9d79f40b090e'}, run=None)]
Check Limits#
Suppose that this motor is configured with limits on its range of motion at +/-
1000. The check_limits()
simulator can verify whether or not a plan will
violate these limits, saving you from discovering this part way through a long
experiment.
In [6]: from bluesky.simulators import check_limits
In [7]: check_limits(scan([det], motor, 1, 3 ,3)) # no problem here
In [8]: check_limits(scan([det], motor, 1, -3000, 3000)) # should raise an error
Simulated Hardware#
Warning
This feature has recently been changed, and it has yet to be documented.
Customizing RunEngine Methods#
The RunEngine allows you to customize the meaning of commands (like ‘set’ and ‘read’). One could use this feature to create a dummy RunEngine that, instead of actually reading and writing to hardware, merely reports what it would have done.
- RunEngine.register_command(name, func)[source]
Register a new Message command.
- Parameters:
- namestr
- funccallable
This can be a function or a method. The signature is f(msg).
- RunEngine.unregister_command(name)[source]
Unregister a Message command.
- Parameters:
- namestr