Message Protocol#
Note: This is a technical document not optimized for user readability.
Overview#
A plan is a sequence of atomic operations describing a data acquisition
procedure. Each operation is represented by a bluesky.Msg
(“message”)
object. A plan may be implemented as a simple list of messages:
from bluesky import Msg
# (Behold, the most boring data acquisition ever conducted!)
plan = [Msg('open_run'), Msg('close_run')]
or as a generator that yields messages one at a time:
def plan():
    yield Msg('open_run')
    yield Msg('close_run')
The above examples are equivalent. For more sophisticated uses, the second one is more powerful, as it can incorporate loops, conditionals, adaptive logic — generally any Python code.
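As a rough illustration of why the generator form is more flexible, the sketch below builds a plan with a loop and a conditional. To keep it runnable without bluesky installed, it uses a simplified namedtuple stand-in for Msg; the stand-in, the plan name repeated_runs, and its parameters are assumptions made for this example only.

```python
from collections import namedtuple

# Simplified stand-in for bluesky.Msg (an assumption for this sketch only),
# so the example runs without bluesky installed.
Msg = namedtuple("Msg", ["command", "obj", "args", "kwargs"])


def repeated_runs(num_runs, pause_between=0):
    """Hypothetical plan: yield messages for several back-to-back runs."""
    for i in range(num_runs):
        yield Msg("open_run", None, (), {"run_index": i})
        yield Msg("close_run", None, (), {})
        # Conditional logic: pause between runs, but not after the last one.
        if pause_between and i < num_runs - 1:
            yield Msg("sleep", None, (pause_between,), {})


# Nothing touches hardware here; the plan only describes what should happen.
commands = [msg.command for msg in repeated_runs(2, pause_between=1)]
print(commands)
```

A plain list could express the same five messages, but the generator computes them on demand, which is what makes adaptive logic possible.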
But, crucially, the plan code itself must not communicate with hardware.
(You should never put epics.caput(...)
in a plan!) Rather, each operation
is represented by a Msg
object that describes what should be done. This
makes it safe to introspect the plan for error-checking, simulation, and
visualization purposes — without touching real hardware. For example, we
could print each message in the plan like so:
plan = [Msg('open_run'), Msg('close_run')]
# a very, very simple 'plan simulator'
for msg in plan:
    print(msg)
A Msg
has five members, accessible as attributes:
command
obj
args
kwargs
run
where command must be one of a controlled list of commands, obj is the object (i.e. Device) to apply the command to, if applicable, args and kwargs are arguments to the command, and run is a user-defined run key.
The run key is used by the RunEngine to associate each message with one of the open runs,
manage the state of each open run, and route run data to a separate set of callbacks
(see documentation on Multi-Run Plans).
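The following sketch shows how positional and keyword arguments could end up in the five members listed above. The Msg class here is a simplified stand-in written for this example (an assumption, not the bluesky class itself), and the motor object and the run key 'sample_1' are placeholders.

```python
from collections import namedtuple


class Msg(namedtuple("MsgBase", ["command", "obj", "args", "kwargs", "run"])):
    """Simplified stand-in for bluesky.Msg (an assumption for this sketch)."""

    __slots__ = ()

    def __new__(cls, command, obj=None, *args, run=None, **kwargs):
        # Extra positional arguments go to `args`; extra keywords to `kwargs`.
        return super().__new__(cls, command, obj, args, kwargs, run)


motor = object()  # placeholder for a Device
msg = Msg("set", motor, 5, group="A", run="sample_1")

print(msg.command)  # the command name
print(msg.obj is motor)
print(msg.args)     # positional arguments to the command
print(msg.kwargs)   # keyword arguments to the command
print(msg.run)      # user-defined run key
```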
To execute the plan, the RunEngine consumes it, one message at a time.
def very_simple_run_engine(plan):
    for msg in plan:
        # Process the msg.
        ...
The RunEngine has a registry which is used to dispatch the Msg objects based on the value of the Msg.command. For example, if the RunEngine receives the message Msg('set', motor, 5), the RunEngine will:
1. Identify that the command for this message is 'set'.
2. Look up 'set' in its command registry and find that it is mapped to RunEngine._set.
3. Pass Msg('set', motor, 5) to its _set method.
4. Inside _set, call motor.set(5). (This is where the actual communication with hardware occurs.)
5. Update some internal caches that will be useful later. For example, it will keep track of the fact that motor may be in motion so that it can stop it safely if an error occurs. This illustrates another important reason that plans must always yield messages to interact with hardware and absolutely never communicate with hardware directly. Calling epics.caput inside a plan prevents the RunEngine from knowing about it and thus circumvents its facilities for putting devices in a safe state in the event of an unexpected exit or error.
A standard set of commands is registered by default. By convention, a Msg with the command 'name' is mapped to a coroutine method on the RunEngine named _name, as in 'set' -> RunEngine._set in the example above.
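The dispatch steps described above can be sketched with a toy engine. This is not the real RunEngine: ToyRunEngine, FakeMotor, the maybe_moving cache, and the 'shout' command are all hypothetical names invented for illustration, and Msg is a simplified namedtuple stand-in.

```python
import asyncio
from collections import namedtuple

# Simplified stand-in for bluesky.Msg (an assumption for this sketch).
Msg = namedtuple("Msg", ["command", "obj", "args", "kwargs"])


class FakeMotor:
    """Hypothetical device; a real one would talk to hardware in set()."""

    def __init__(self):
        self.position = 0

    def set(self, value):
        self.position = value
        return "status: done"


class ToyRunEngine:
    """Toy dispatcher illustrating the 'name' -> _name convention."""

    def __init__(self):
        self.maybe_moving = set()  # devices that may need stopping on error
        self._command_registry = {"set": self._set, "null": self._null}

    def register_command(self, name, func):
        self._command_registry[name] = func

    async def _set(self, msg):
        self.maybe_moving.add(msg.obj)  # update internal cache first
        return msg.obj.set(*msg.args, **msg.kwargs)

    async def _null(self, msg):
        return None

    async def _run(self, plan):
        results = []
        for msg in plan:
            coro_func = self._command_registry[msg.command]  # look up
            results.append(await coro_func(msg))             # dispatch
        return results

    def __call__(self, plan):
        return asyncio.run(self._run(plan))


motor = FakeMotor()
RE = ToyRunEngine()
results = RE([Msg("set", motor, (5,), {}), Msg("null", None, (), {})])


async def shout(msg):
    """Hypothetical custom command: upper-case the first argument."""
    return msg.args[0].upper()


RE.register_command("shout", shout)
shout_results = RE([Msg("shout", None, ("hello",), {})])
```

Because every hardware interaction passes through the registry, the toy engine always knows which devices it has moved; a direct call inside the plan would bypass that bookkeeping.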
Users can register their own coroutines to add custom commands, though this is
very rarely necessary.
Some commands do not involve communication with hardware. For example,
Msg('sleep', None, 5)
causes the RunEngine to sleep for 5 seconds. None
is a placeholder for the “object” (Device) which is not applicable for a
'sleep'
command. Just as plans should never communicate with hardware
directly, they should also never employ long blocking calls like
time.sleep()
. Instead, the 'sleep'
command, mapped to
RunEngine._sleep
, integrates with the RunEngine’s event loop to sleep in a
non-blocking way that allows for the RunEngine to stay responsive in the
meantime, watching for user interruptions and possibly collecting data
asynchronously in the background.
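The difference between a blocking and a non-blocking sleep can be seen with plain asyncio, independent of bluesky. In this sketch the heartbeat coroutine is a hypothetical stand-in for background work such as watching for interruptions; the durations are arbitrary.

```python
import asyncio
import time


async def heartbeat(ticks):
    """Hypothetical background task: record a tick every 10 ms."""
    while True:
        ticks.append("tick")
        await asyncio.sleep(0.01)


async def compare():
    ticks = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)       # let the heartbeat start

    time.sleep(0.05)             # blocking: the event loop is frozen
    after_blocking = len(ticks)

    await asyncio.sleep(0.05)    # non-blocking: the heartbeat keeps running
    after_async = len(ticks)

    task.cancel()
    return after_blocking, after_async


after_blocking, after_async = asyncio.run(compare())
print(after_blocking, after_async)
```

No new ticks arrive during time.sleep(), whereas several arrive during asyncio.sleep(); this is the responsiveness that a 'sleep' message preserves.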
Other commands are used to control metadata and I/O. For example,
Msg('open_run')
and Msg('close_run')
delineate the scope of one run.
Any keyword arguments passed to the 'open_run'
message are interpreted as
metadata, encoded into the RunStart document.
The following is a comprehensive overview of the built-in commands.
Commands#
Warning
This section of the documentation is incomplete.
These are the ‘built in’ commands, some of which are deeply tied to the state of the RunEngine instance.
create#
This command tells the run engine that it should start to collect the results
of read
to create an event. If this is called twice without a save
or
drop
between them, an exception is raised (you cannot have more than one
open event at a time).
This relies very heavily on the internal state of the run engine and should not be overridden by the user.
This call returns None back to the co-routine.
This ignores all parts of the Msg except the command.
save#
This is the pair to create
which bundles and causes Event
documents to
be emitted. This must be called after a create
or the scan will die and
raise IllegalMessageSequence.
This relies very heavily on the internal state of the run engine and should not be messed with.
This call returns None back to the co-routine.
This ignores all parts of the Msg except the command.
read#
This causes read to be called on the obj
in the message:
msg.obj.read(*msg.args, **msg.kwargs)
Anything that is read between a create
and save
will be bundled into
a single event.
This relies very heavily on the internal state of the run engine and should not be messed with.
Returns the dictionary returned by read to the co-routine.
The args
and kwargs
parts of the message are passed to the read
method.
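The interplay of create, read, save, and drop amounts to a small state machine. The sketch below models it with a hypothetical ToyBundler class; it is not the RunEngine's implementation, the exception is a local stand-in sharing the name IllegalMessageSequence, and real readings carry more structure than the bare numbers used here.

```python
class IllegalMessageSequence(Exception):
    """Local stand-in for the bluesky exception of the same name."""


class ToyBundler:
    """Hypothetical model of event bundling; not the RunEngine itself."""

    def __init__(self):
        self._bundling = False
        self._readings = {}
        self.events = []

    def create(self):
        if self._bundling:
            raise IllegalMessageSequence("create without save or drop")
        self._bundling = True
        self._readings = {}

    def read(self, name, reading):
        # Readings taken while a bundle is open are collected into it.
        if self._bundling:
            self._readings[name] = reading
        return reading

    def save(self):
        if not self._bundling:
            raise IllegalMessageSequence("save without a preceding create")
        self.events.append(dict(self._readings))  # emit one Event
        self._bundling = False

    def drop(self):
        if not self._bundling:
            raise IllegalMessageSequence("drop without a preceding create")
        self._bundling = False  # abandon the readings, emit nothing
        self._readings = {}


bundler = ToyBundler()
bundler.create()
bundler.read("det", 1.5)
bundler.read("motor", 3.0)
bundler.save()

bundler.create()
bundler.read("det", -999)   # a known-bad reading
bundler.drop()

try:
    bundler.save()          # no open bundle
    error = None
except IllegalMessageSequence as exc:
    error = str(exc)
```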
null#
This is a null message and is ignored by the run engine. This exists to make the algebra work.
Returns None to the co-routine.
Ignores all values in the Msg except the command.
set#
Tells a Mover
object to move. Currently this mimics the epics-like logic
of immediate motion.
stage and unstage#
Instruct the RunEngine to stage/unstage the object. This calls
obj.stage()
/obj.unstage()
.
Expected message objects are:
Msg('stage', object)
Msg('unstage', object)
which results in these calls:
staged_devices = object.stage()
unstaged_devices = object.unstage()
where staged_devices
/unstaged_devices
are a list of the
ophyd.Device
(s) that were (un)staged, not status objects.
One may wonder why the return is a list of Devices as opposed to Status
objects, such as in set
and similar Msg
s.
This was debated for a while. Operations performed during staging are supposed
to involve twiddling configuration, and should happen fast. Staging should not
involve lengthy set calls.
Why a list of the objects staged? Staging a Device causes that Device’s component Devices (if any) to also be staged. All of these children are added to a list, along with [self], and returned by Device.stage(), so that the plan can keep track of what has been staged, like so:
devices_staged = yield Msg('stage', device)
Why would the plan want to know that? It needs to avoid accidentally trying to stage something twice, such as staging a parent and then trying to also stage its child. It’s important to avoid that because staging something redundantly raises an error.
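A minimal sketch of this bookkeeping, using a hypothetical ToyDevice class rather than a real ophyd.Device: stage() returns the device together with its staged children, and a second attempt to stage any of them raises. The exception is a local stand-in, and the order of the returned list is an arbitrary choice of this sketch.

```python
class RedundantStaging(Exception):
    """Local stand-in for the error raised on redundant staging."""


class ToyDevice:
    """Hypothetical device with optional child devices."""

    def __init__(self, name, children=()):
        self.name = name
        self.children = list(children)
        self._staged = False

    def stage(self):
        if self._staged:
            raise RedundantStaging(f"{self.name} is already staged")
        self._staged = True
        staged = [self]
        for child in self.children:
            staged.extend(child.stage())  # staging cascades to children
        return staged


camera = ToyDevice("det_cam")
detector = ToyDevice("det", children=[camera, ToyDevice("det_stats")])

devices_staged = detector.stage()
staged_names = [device.name for device in devices_staged]

# The returned list lets the caller skip anything that is already staged.
needs_staging = camera not in devices_staged

# Staging the child anyway would be redundant and raises.
try:
    camera.stage()
    redundant_error = None
except RedundantStaging as exc:
    redundant_error = str(exc)
```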
trigger#
This will call the obj.trigger
method and cache the returned status object.
sleep#
Sleep the event loop.
wait#
Block progress until every object that was triggered or set with the keyword argument group=<GROUP> is done.
Expected message object is:
Msg('wait', group=<GROUP>)
where <GROUP>
is any hashable key.
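The idea of waiting on a group can be sketched with plain asyncio futures. This is an illustration of the concept only, not the RunEngine's code: the start and wait helpers, the labels, and the durations are all hypothetical.

```python
import asyncio
from collections import defaultdict


async def demo():
    loop = asyncio.get_running_loop()
    pending = defaultdict(set)  # group key -> futures for in-progress actions
    finished = []               # labels, in completion order

    def start(label, duration, group):
        """Begin a fake action that completes after `duration` seconds."""
        fut = loop.create_future()

        def finish():
            finished.append(label)
            fut.set_result(None)

        loop.call_later(duration, finish)
        pending[group].add(fut)

    async def wait(group):
        """Block progress until everything tagged with `group` is done."""
        futures = pending.pop(group, set())
        if futures:
            await asyncio.wait(futures)

    start("motor1", 0.02, group="A")
    start("motor2", 0.04, group="A")
    start("detector", 0.3, group="B")

    await wait("A")                  # returns once both motors are done
    done_after_a = list(finished)
    await wait("B")
    return done_after_a, list(finished)


done_after_a, done_after_b = asyncio.run(demo())
print(done_after_a, done_after_b)
```

Any hashable key works as a group label here because the groups are simply dictionary keys.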
wait_for#
Instruct the RunEngine
to wait for this asyncio.Future
object to be
done. This allows for external arbitrary control of the RunEngine
.
Example:
from asyncio.futures import Future
future = Future()
future.done() # will give False
RE([Msg('wait_for', None, [lambda: future])])
# this sets the future to done
future.set_result(3)
future.done() # will give True
Returns a set of the tasks that were waited on. Example:
from asyncio.futures import Future
future = Future()
future.done() # will give False
def plan():
    results = (yield Msg('wait_for', None, [lambda: future]))
    for result in results:
        # will give the exception raised
        result.exception()
input#
Process an input. Allows for user input during a run.
Examples:
Msg('input', None)
Msg('input', None, prompt='>') # customize prompt
checkpoint#
Instruct the RunEngine to create a checkpoint so that we can rewind to this point if necessary.
clear_checkpoint#
Clear a set checkpoint.
rewindable#
pause#
Request the run engine to pause
Expected message object is:
Msg('pause', defer=False, name=None, callback=None)
kickoff#
Start a flyscan object.
collect#
Collect data cached by a flyer and emit descriptor and event documents.
This calls the obj.collect()
method.
complete#
Tell a flyer, ‘stop collecting, whenever you are ready’.
This calls the method obj.complete()
of the given object. The flyer returns
a status object. Some flyers respond to this command by stopping collection and
returning a finished status object immediately. Other flyers finish their given
course and finish whenever they finish, irrespective of when this command is
issued.
configure#
Configure an object.
Expected message object is:
Msg('configure', object, *args, **kwargs)
which results in this call:
object.configure(*args, **kwargs)
subscribe#
Add a subscription after the run has started.
This, like subscriptions passed to __call__, will be removed at the end by the RunEngine.
Expected message object is:
Msg('subscribe', None, callback_function, document_name)
where document_name is one of:
{'start', 'descriptor', 'event', 'stop', 'all'}
and callback_function is expected to have a signature of:
f(name, document)
where name is one of the document_name options and document is one of the document dictionaries in the event model.
See the docstring of bluesky.run_engine.Dispatcher.subscribe() for more information.
unsubscribe#
Remove a subscription during a call – useful for a multi-run call where subscriptions are wanted for some runs but not others.
Expected message object is:
Msg('unsubscribe', None, TOKEN)
Msg('unsubscribe', token=TOKEN)
where TOKEN
is the return value from RunEngine._subscribe()
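The subscribe/unsubscribe cycle can be pictured with a toy dispatcher. ToyDispatcher below is a hypothetical class written for this sketch, not bluesky's Dispatcher; it only shows a callback with the f(name, document) signature, filtering by document name, and removal by token.

```python
class ToyDispatcher:
    """Hypothetical dispatcher; illustrates tokens and name filtering."""

    def __init__(self):
        self._callbacks = {}
        self._next_token = 0

    def subscribe(self, func, name="all"):
        token = self._next_token
        self._next_token += 1
        self._callbacks[token] = (name, func)
        return token  # keep this to unsubscribe later

    def unsubscribe(self, token):
        self._callbacks.pop(token, None)

    def process(self, name, document):
        for wanted, func in list(self._callbacks.values()):
            if wanted in ("all", name):
                func(name, document)


seen = []


def record(name, document):
    """Callback with the expected f(name, document) signature."""
    seen.append((name, document["uid"]))


dispatcher = ToyDispatcher()
token = dispatcher.subscribe(record, "event")

dispatcher.process("start", {"uid": "a"})   # filtered out: not an 'event'
dispatcher.process("event", {"uid": "b"})   # delivered
dispatcher.unsubscribe(token)
dispatcher.process("event", {"uid": "c"})   # no longer delivered
print(seen)
```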
open_run#
Instruct the RunEngine to start a new “run”
Expected message object is:
Msg('open_run', None, **kwargs)
where **kwargs
are any additional metadata that should go into the RunStart
document
close_run#
Instruct the RunEngine to write the RunStop document
Expected message object is:
Msg('close_run', None, exit_status=None, reason=None)
If exit_status and reason are not provided, the values stashed on the RE are used.
drop#
Drop a bundle of readings without emitting a completed Event document.
This is a command that abandons previous create
and read
commands
without emitting an event. This can be used to drop known bad events
(e.g. no beam) and keep the event document stream clean. It is safe to start
another create
, read
, save
sequence after a drop
.
This must be called after a create
or the scan will die and raise
IllegalMessageSequence.
This call returns None back to the co-routine.
This ignores all parts of the Msg except the command.
monitor#
Monitor a signal. Emit event documents asynchronously.
A descriptor document is emitted immediately. Then, a closure is defined that emits Event documents associated with that descriptor from a separate thread. This process is not related to the main bundling process (create/read/save).
Expected message object is:
Msg('monitor', obj, **kwargs)
Msg('monitor', obj, name='event-stream-name', **kwargs)
where kwargs are passed through to obj.subscribe()
unmonitor#
Stop monitoring; i.e., remove the callback emitting event documents.
Expected message object is:
Msg('unmonitor', obj)
stop#
Stop a device.
Expected message object is:
Msg('stop', obj)
This amounts to calling obj.stop()
.
Registering Custom Commands#
The RunEngine can be taught any new commands. They can be registered using the following methods.
- RunEngine.register_command(name, func)
Register a new Message command.
- Parameters:
- name : str
- func : callable
This can be a function or a method. The signature is f(msg).
- RunEngine.unregister_command(name)
Unregister a Message command.
- Parameters:
- name : str
- RunEngine.commands
The list of commands available to Msg.
Examples
>>> from bluesky import RunEngine
>>> RE = RunEngine()
>>> # to list commands
>>> RE.commands
- RunEngine.print_command_registry(verbose=False)
This conveniently prints the command registry of available commands.
- Parameters:
- verbose : bool, optional
Verbose print. Default is False.
Examples
>>> from bluesky import RunEngine
>>> RE = RunEngine()
>>> # Print a very verbose list of currently registered commands
>>> RE.print_command_registry(verbose=True)