The RunEngine run loop
Note: This is a technical document not optimized for user readability.
In this document, we start with a simplified version of the bluesky RunEngine. We add more complexity step by step, with commentary.
The heart of bluesky is the RunEngine._run coroutine, which dispatches each Msg in the plan to the function that actually carries out the requested task. The core operation is obscured by the layers of exception handling, state management, and cleanup that the RunEngine is responsible for. (Some of this may be refactored in the near future.) This document discusses only the run loop, not Document generation or hardware cleanup.
Minimal RunEngine
A minimal (runnable) RunEngine is:
    from time import sleep
    import datetime
    now = datetime.datetime.now
    from bluesky import Msg

    function_map = {'print':
                    lambda msg: print('-- {!s:10.10s} : {: <25.25s} --'.format(now().time(), msg.obj)),
                    'sleep':
                    lambda msg: sleep(msg.args[0])}

    def RE_v0(plan):
        for msg in plan:
            func = function_map[msg.command]
            func(msg)

    welcome_plan = [Msg('print', 'hello'), Msg('sleep', None, 1), Msg('print', 'world!')]
    RE_v0(welcome_plan)
which captures one of the key abstractions of bluesky: a plan is just an iterable of messages. This abstraction means that to plan an experiment you only need to generate a stream of Msg objects, and the RunEngine will take care of actually executing the code.
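Because a plan is just an iterable, it does not have to be a list; a generator expression works just as well. The sketch below is a self-contained illustration, not bluesky code: Msg is a simplified stand-in (a namedtuple with command, obj, args and kwargs fields, an assumption about the real class's shape), and run_recording is a hypothetical copy of RE_v0 that records what it would do instead of printing and sleeping.

```python
from collections import namedtuple


class Msg(namedtuple('Msg', ['command', 'obj', 'args', 'kwargs'])):
    """Simplified stand-in for bluesky's Msg (an assumption, not the real class)."""
    __slots__ = ()

    def __new__(cls, command, obj=None, *args, **kwargs):
        return super().__new__(cls, command, obj, args, kwargs)


recorded = []  # what the toy RunEngine "executed"

# Same dispatch idea as function_map, but recording instead of printing/sleeping.
recording_map = {'print': lambda msg: recorded.append(('print', msg.obj)),
                 'sleep': lambda msg: recorded.append(('sleep', msg.args[0]))}


def run_recording(plan):
    # Identical to RE_v0: iterate over the plan and dispatch each message.
    for msg in plan:
        recording_map[msg.command](msg)


# A plan produced lazily by a generator expression rather than a list.
countdown_plan = (Msg('print', str(n)) for n in (3, 2, 1))
run_recording(countdown_plan)
run_recording([Msg('sleep', None, 0.5), Msg('print', 'liftoff')])
print(recorded)
```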
Adaptive Plans
Simply having a stream of commands is not quite enough: you may want the code generating the stream of messages to be aware of the return value of a previous Msg in order to decide what to do next. Python supports this with generators, which ‘suspend’ their execution at a yield statement. When you iterate over a generator, it runs until the next yield statement, suspends, and yields the value to the code that is iterating over it. That code can also pass a value back in with generator.send; the sent value becomes the result of the yield expression when the generator resumes.
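A plain-Python illustration of the send protocol that the next RunEngine relies on (no bluesky needed). The first call must send None, because the generator has not reached a yield yet; sending anything else to a just-started generator raises TypeError.

```python
def echo_twice():
    # Each yield hands a value out and receives whatever the caller sends back in.
    received = yield 'first request'
    received_again = yield 'got {!r}'.format(received)
    yield 'got {!r}'.format(received_again)


gen = echo_twice()
outputs = [gen.send(None),   # start the generator; it runs to the first yield
           gen.send(10),     # 10 becomes the value of the first yield expression
           gen.send('b')]    # 'b' becomes the value of the second yield expression
print(outputs)  # ['first request', 'got 10', "got 'b'"]

finished = False
try:
    gen.send(None)           # no yields left: the generator finishes
except StopIteration:
    finished = True
```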
Switching to generators requires that we change our minimal RE to:
    from bluesky.utils import ensure_generator

    def RE_v1(plan):
        plan = ensure_generator(plan)
        last_result = None
        while True:
            try:
                msg = plan.send(last_result)
            except StopIteration:
                # generators indicate they are done by raising
                # StopIteration
                break
            func = function_map[msg.command]
            last_result = func(msg)
which still works with welcome_plan:
    RE_v1([Msg('print', 'hello'), Msg('sleep', None, 1), Msg('print', 'world!')])
but we can also do more sophisticated things, like:
    function_map['sum'] = lambda msg: sum(msg.args)

    def adding_plan(a, b):
        yield Msg('print', '{} + {} = ??'.format(a, b))
        ret = yield Msg('sum', None, a, b)
        yield Msg('print', '{} + {} = {}'.format(a, b, ret))
        yield Msg('print', 'thanks for adding')
Here ret receives the value returned by the 'sum' function, which RE_v1 sends back into the generator. We can run it with:
    RE_v1(adding_plan(1, 2))
    RE_v1(adding_plan(5, 2))
This is obviously overkill for simple addition, but it enables things like an adaptive dscan that changes its step size based on the local slope.
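To make "adaptive" concrete, here is a minimal sketch of a plan that chooses its next step size from the readings it has already received. Everything in it is hypothetical and self-contained: the Msg stand-in, the 'set' and 'read' commands, the toy motor whose signal is x**2, and the step-size rule are illustrations, not bluesky's adaptive scan API. run_adaptive is the same send loop as RE_v1, minus ensure_generator.

```python
from collections import namedtuple


class Msg(namedtuple('Msg', ['command', 'obj', 'args', 'kwargs'])):
    """Simplified stand-in for bluesky's Msg (an assumption, not the real class)."""
    __slots__ = ()

    def __new__(cls, command, obj=None, *args, **kwargs):
        return super().__new__(cls, command, obj, args, kwargs)


toy_position = {'x': 0.0}  # hypothetical motor
scan_history = []          # every position the motor was sent to


def _set(msg):
    toy_position['x'] = msg.args[0]
    scan_history.append(msg.args[0])


# 'read' returns x**2, a signal whose slope grows with x.
adaptive_map = {'set': _set,
                'read': lambda msg: toy_position['x'] ** 2}


def run_adaptive(plan):
    # The same send loop as RE_v1 (plan must already be a generator).
    last_result = None
    while True:
        try:
            msg = plan.send(last_result)
        except StopIteration:
            break
        last_result = adaptive_map[msg.command](msg)


def adaptive_scan(start, stop, min_step, max_step, target_delta):
    """Aim for roughly target_delta change in the signal between points."""
    x, step = start, max_step
    yield Msg('set', 'x', x)
    last = yield Msg('read', 'det')
    while x + step <= stop:
        x += step
        yield Msg('set', 'x', x)
        value = yield Msg('read', 'det')  # the engine sends the reading back in
        delta = abs(value - last)
        # Scale the step so the next change is about target_delta, within limits.
        step = min(max_step, max(min_step, step * target_delta / max(delta, 1e-9)))
        last = value


run_adaptive(adaptive_scan(0.0, 4.0, min_step=0.05, max_step=1.0, target_delta=1.0))
steps = [b - a for a, b in zip(scan_history, scan_history[1:])]
print([round(s, 3) for s in steps])  # steps shrink as the slope grows
```

The plan, not the engine, owns the scanning logic; the engine only executes messages and returns readings, which is what makes this kind of feedback possible without changing the RunEngine.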
Exception Handling
In addition to generator.send (which inserts a value into the generator), you can also use generator.throw, which raises an exception at the point where the generator is paused. If the generator handles the exception (via a try...except block), then the generator runs until the next yield and throw returns the yielded value; if it instead finishes without yielding again, throw raises StopIteration. If the generator does not handle the exception (or raises a different exception), then it is (re)raised by throw.
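A short plain-Python check of the throw cases described above (no bluesky needed). The generator functions are purely illustrative.

```python
def guarded():
    try:
        yield 'working'
    except ValueError as err:
        # Handled: the generator runs on to its next yield,
        # and that yielded value becomes the return value of throw().
        yield 'recovered from {}'.format(err)


def handles_then_returns():
    try:
        yield 'working'
    except ValueError:
        pass  # handled, but there is no further yield


results = []

gen = guarded()
results.append(next(gen))                             # paused at the first yield
results.append(gen.throw(ValueError('bad reading')))  # handled inside guarded()

gen = guarded()
next(gen)
try:
    gen.throw(KeyError('motor'))  # not handled inside guarded() ...
except KeyError:
    results.append('KeyError re-raised')  # ... so throw() re-raises it here

gen = handles_then_returns()
next(gen)
try:
    gen.throw(ValueError('x'))  # handled, then the generator returns
except StopIteration:
    results.append('finished: StopIteration')

print(results)
```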
We want to be able to capture any exception raised while the RE executes a message and pass it back to the plan:
    def RE_v2(plan):
        plan = ensure_generator(plan)
        last_result = None
        _exception = None
        while True:
            try:
                if _exception is not None:
                    msg = plan.throw(_exception)
                    _exception = None
                else:
                    msg = plan.send(last_result)
            except StopIteration:
                break
            try:
                func = function_map[msg.command]
                last_result = func(msg)
            except Exception as e:
                _exception = e
We can now write plans that handle exceptions from the RE; in this case, the plan reports that the addition failed due to a TypeError:
    def safe_adding_plan(a, b):
        yield Msg('print', '{} + {} = ??'.format(a, b))
        try:
            ret = yield Msg('sum', None, a, b)
        except TypeError:
            yield Msg('print', 'can not add {} + {}!'.format(a, b))
        else:
            yield Msg('print', '{} + {} = {}'.format(a, b, ret))
        finally:
            yield Msg('print', 'thanks for adding')
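Compare the behavior of adding_plan and safe_adding_plan in cases where they succeed:
    RE_v2(safe_adding_plan(1, 2))
    RE_v2(adding_plan(1, 2))
and where they fail:
    RE_v2(safe_adding_plan('a', 2))
    RE_v2(adding_plan('a', 2))
safe_adding_plan catches the TypeError, reports it, and still prints its closing message, whereas adding_plan does not handle it, so plan.throw re-raises the TypeError out of RE_v2.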
Again, this is overkill for these simple cases, but this mechanism allows us to write delta scans that always return the motors to their original position, shut shutters, etc., even if the plan fails or is canceled.
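A minimal sketch of that guarantee: a plan moves a toy motor, fails partway, and still moves the motor back in its finally block. The motor dictionary, the 'set' command, and the 'fail' command that simulates a hardware error are hypothetical, as is the Msg stand-in; run_with_throw is the RE_v2 loop unchanged apart from the dispatch table. Once the plan has cleaned up, the original exception still propagates to the caller.

```python
from collections import namedtuple


class Msg(namedtuple('Msg', ['command', 'obj', 'args', 'kwargs'])):
    """Simplified stand-in for bluesky's Msg (an assumption, not the real class)."""
    __slots__ = ()

    def __new__(cls, command, obj=None, *args, **kwargs):
        return super().__new__(cls, command, obj, args, kwargs)


motor = {'x': 0.0}  # hypothetical toy motor


def _fail(msg):
    raise RuntimeError('detector timed out')  # simulated hardware error


motor_map = {'set': lambda msg: motor.update(x=msg.args[0]),
             'fail': _fail}


def run_with_throw(plan):
    """Same loop as RE_v2 (plan must already be a generator)."""
    last_result = None
    _exception = None
    while True:
        try:
            if _exception is not None:
                msg = plan.throw(_exception)
                _exception = None
            else:
                msg = plan.send(last_result)
        except StopIteration:
            break
        try:
            last_result = motor_map[msg.command](msg)
        except Exception as e:
            _exception = e


def relative_move_plan(delta):
    start = motor['x']
    try:
        yield Msg('set', 'x', start + delta)
        yield Msg('fail')  # something goes wrong mid-scan
    finally:
        # Runs whether the body above finished normally or raised.
        yield Msg('set', 'x', start)


motor['x'] = 1.5
failure = None
try:
    run_with_throw(relative_move_plan(0.25))
except RuntimeError as err:
    failure = str(err)  # the plan cleaned up first, then the error propagated
print(failure, motor)
```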
Turn into a callable class
We are going to want access to the internal state of the _run loop very soon. One way to do this, while keeping the API we have above, is to write a callable class instead of a function:
    class RunEngine_v3:
        def _sleep(self, msg):
            sleep(msg.args[0])

        def _print(self, msg):
            print('-- {!s:10.10s} : {: <25.25s} --'.format(now().time(), msg.obj))

        def _sum(self, msg):
            return sum(msg.args)

        def __init__(self):
            self._command_registry = {
                'print': self._print,
                'sum': self._sum,
                'sleep': self._sleep}

        def __call__(self, plan):
            self._run(plan)

        def _run(self, plan):
            plan = ensure_generator(plan)
            last_result = None
            _exception = None
            while True:
                try:
                    if _exception is not None:
                        msg = plan.throw(_exception)
                        _exception = None
                    else:
                        msg = plan.send(last_result)
                except StopIteration:
                    break
                try:
                    func = self._command_registry[msg.command]
                    last_result = func(msg)
                except Exception as e:
                    _exception = e

    RE_v3 = RunEngine_v3()
In doing this, we also moved the functions that the commands dispatch to into the class. While these methods are almost trivial, we will soon have methods that alter the internal state of the RunEngine.
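For example, once the command handlers are methods, a handler can keep state on the engine between messages and even between plans. A minimal sketch, assuming a hypothetical 'count' command that is not part of bluesky; RunEngineSketch is a condensed copy of RunEngine_v3 with no built-in commands (the loop is unchanged), and Msg is the same simplified stand-in as before.

```python
from collections import namedtuple


class Msg(namedtuple('Msg', ['command', 'obj', 'args', 'kwargs'])):
    """Simplified stand-in for bluesky's Msg (an assumption, not the real class)."""
    __slots__ = ()

    def __new__(cls, command, obj=None, *args, **kwargs):
        return super().__new__(cls, command, obj, args, kwargs)


class RunEngineSketch:
    """Condensed copy of RunEngine_v3 with an empty command registry."""

    def __init__(self):
        self._command_registry = {}

    def __call__(self, plan):
        self._run(plan)

    def _run(self, plan):
        # Same loop as RunEngine_v3._run (plan must already be a generator).
        last_result = None
        _exception = None
        while True:
            try:
                if _exception is not None:
                    msg = plan.throw(_exception)
                    _exception = None
                else:
                    msg = plan.send(last_result)
            except StopIteration:
                break
            try:
                func = self._command_registry[msg.command]
                last_result = func(msg)
            except Exception as e:
                _exception = e


class CountingRunEngine(RunEngineSketch):
    """Adds a hypothetical 'count' command whose handler mutates engine state."""

    def __init__(self):
        super().__init__()
        self.counts = {}  # survives across messages and across plans
        self._command_registry['count'] = self._count

    def _count(self, msg):
        self.counts[msg.obj] = self.counts.get(msg.obj, 0) + 1
        return self.counts[msg.obj]  # sent back into the plan


def counting_plan(name, n):
    for _ in range(n):
        yield Msg('count', name)


counting_RE = CountingRunEngine()
counting_RE(counting_plan('det', 2))
counting_RE(counting_plan('det', 3))
counting_RE(counting_plan('motor', 1))
print(counting_RE.counts)  # {'det': 5, 'motor': 1}
```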
asyncio integration
So far all of these RE implementations have been synchronous functions; that is, they run straight through the plan. However, at a beamline we need to support asynchronous functionality and be able to gracefully interrupt the plan.
To enable this we use asyncio from the Python standard library (new in 3.4) to provide the outer event loop. At this point we are integrating two event loops: the RE loop, which processes the plan, and the asyncio event loop, which manages multiple frames of execution. The event loop may switch between execution frames when a coroutine is suspended by a yield from statement. Thus we turn the dispatched methods and the main _run method into coroutines by adding the @asyncio.coroutine decorator, and we call the dispatched functions via yield from rather than with a direct function call.
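To see what switching between execution frames looks like, here are two coroutines sharing one event loop: each suspension point hands control back to the loop, which resumes the other task. This sketch uses the async def / await spelling (Python 3.5+) and asyncio.run (Python 3.7+) instead of @asyncio.coroutine / yield from; the scheduling behavior it illustrates is the same.

```python
import asyncio

events = []


async def worker(name, n):
    for i in range(n):
        events.append('{}{}'.format(name, i))
        # Suspension point: control returns to the event loop,
        # which runs the other task before this one continues.
        await asyncio.sleep(0)


async def main():
    await asyncio.gather(worker('a', 3), worker('b', 3))


asyncio.run(main())
print(events)  # ['a0', 'b0', 'a1', 'b1', 'a2', 'b2']
```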
We also added a msg_hook attribute to the RunEngine, which is a super handy debugging tool for seeing exactly which messages the RunEngine is processing. It can be set to any callable that takes a single Msg as input (e.g. print):
    import asyncio

    class RunEngine_v4:
        def __init__(self, *, loop=None):
            # map messages to coro
            self._command_registry = {
                'print': self._print,
                'sum': self._sum,
                'sleep': self._sleep}
            # debugging hook
            self.msg_hook = None
            # bind RE to a specific loop
            if loop is None:
                loop = asyncio.get_event_loop()
            self.loop = loop
            # The RunEngine keeps track of a *lot* of state.
            # All flags and caches are defined here with a comment. Good luck.
            self._task = None  # asyncio.Task associated with call to self._run

        def __call__(self, plan):
            self._task = self.loop.create_task(self._run(plan))
            self.loop.run_until_complete(self._task)
            if self._task.done() and not self._task.cancelled():
                exc = self._task.exception()
                if exc is not None:
                    raise exc

        @asyncio.coroutine
        def _run(self, plan):
            plan = ensure_generator(plan)
            last_result = None
            _exception = None
            while True:
                try:
                    yield from asyncio.sleep(0.0001, loop=self.loop)
                    if _exception is not None:
                        msg = plan.throw(_exception)
                        _exception = None
                    else:
                        msg = plan.send(last_result)
                except StopIteration:
                    break
                if self.msg_hook:
                    self.msg_hook(msg)
                try:
                    func = self._command_registry[msg.command]
                    last_result = yield from func(msg)
                except Exception as e:
                    _exception = e

        @asyncio.coroutine
        def _sleep(self, msg):
            yield from asyncio.sleep(msg.args[0])

        @asyncio.coroutine
        def _print(self, msg):
            print('-- {!s:10.10s} : {: <25.25s} --'.format(now().time(), msg.obj))

        @asyncio.coroutine
        def _sum(self, msg):
            return sum(msg.args)

    RE_v4 = RunEngine_v4()
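@asyncio.coroutine was removed in Python 3.11, and the loop argument of asyncio.sleep was removed in Python 3.10, so RunEngine_v4 as written only runs on older interpreters. Below is a sketch of the same run loop using async def / await and asyncio.run; it is a modernized illustration under those assumptions, not bluesky's current RunEngine. It uses the simplified Msg stand-in, prints msg.obj without the timestamp, and sets msg_hook to list.append to capture every processed message. Note that asyncio.run cannot be called from inside an already running event loop.

```python
import asyncio
from collections import namedtuple


class Msg(namedtuple('Msg', ['command', 'obj', 'args', 'kwargs'])):
    """Simplified stand-in for bluesky's Msg (an assumption, not the real class)."""
    __slots__ = ()

    def __new__(cls, command, obj=None, *args, **kwargs):
        return super().__new__(cls, command, obj, args, kwargs)


class RunEngine_v4_modern:
    def __init__(self):
        # map messages to coroutine functions
        self._command_registry = {'print': self._print,
                                  'sum': self._sum,
                                  'sleep': self._sleep}
        self.msg_hook = None  # debugging hook, called with each Msg

    def __call__(self, plan):
        # asyncio.run creates a fresh event loop, runs _run to completion,
        # and re-raises any exception the plan did not handle.
        return asyncio.run(self._run(plan))

    async def _run(self, plan):
        last_result = None
        _exception = None
        while True:
            try:
                await asyncio.sleep(0)  # let other tasks on the loop run
                if _exception is not None:
                    msg = plan.throw(_exception)
                    _exception = None
                else:
                    msg = plan.send(last_result)
            except StopIteration:
                break
            if self.msg_hook:
                self.msg_hook(msg)
            try:
                func = self._command_registry[msg.command]
                last_result = await func(msg)
            except Exception as e:
                _exception = e

    async def _sleep(self, msg):
        await asyncio.sleep(msg.args[0])

    async def _print(self, msg):
        print(msg.obj)

    async def _sum(self, msg):
        return sum(msg.args)


def sum_plan(a, b):
    ret = yield Msg('sum', None, a, b)
    yield Msg('print', '{} + {} = {}'.format(a, b, ret))


seen = []
RE_modern = RunEngine_v4_modern()
RE_modern.msg_hook = seen.append  # record every Msg the engine processes
RE_modern(sum_plan(1, 2))         # prints "1 + 2 = 3"
print([m.command for m in seen])  # ['sum', 'print']
```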
Pausing, Resuming, and Rewinding
Adding the ability to pause/resume/rewind a scan adds a fair amount of complexity, as the RunEngine must now keep track of a stack of plans rather than a single plan, cache each Msg as it goes by, and expose enough API to control this behavior.
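The core rewind idea can be shown without asyncio first. This is a simplified synchronous sketch, not bluesky's implementation: ToyPausableRE is hypothetical, ignores responses, exceptions and the state machine, and uses the Msg stand-in. Messages since the last 'checkpoint' are cached; 'pause' leaves the loop and is never cached (mirroring _UNCACHEABLE_COMMANDS); resume pushes a replay of the cache on top of the plan stack, so the original plan continues once the replay is exhausted.

```python
from collections import deque, namedtuple


class Msg(namedtuple('Msg', ['command', 'obj', 'args', 'kwargs'])):
    """Simplified stand-in for bluesky's Msg (an assumption, not the real class)."""
    __slots__ = ()

    def __new__(cls, command, obj=None, *args, **kwargs):
        return super().__new__(cls, command, obj, args, kwargs)


class ToyPausableRE:
    def __init__(self):
        self.log = []               # objs of the messages "executed", in order
        self.paused = False
        self._plan_stack = deque()  # replayed plans sit on top of the original
        self._msg_cache = deque()   # messages since the last checkpoint

    def __call__(self, plan):
        self._plan_stack.append(iter(plan))
        self._run()

    def resume(self):
        # Rewind: replay everything since the last checkpoint, then carry on.
        replay = iter(list(self._msg_cache))
        self._msg_cache = deque()
        self._plan_stack.append(replay)
        self.paused = False
        self._run()

    def _run(self):
        while self._plan_stack:
            try:
                msg = next(self._plan_stack[-1])
            except StopIteration:
                self._plan_stack.pop()  # done; fall back to the plan below
                continue
            if msg.command == 'pause':
                self.paused = True      # leave the loop; the stack is kept
                return
            if msg.command == 'checkpoint':
                self._msg_cache.clear()  # new rewind point
                continue
            self._msg_cache.append(msg)  # remember it in case we rewind
            self.log.append(msg.obj)


toy_plan = [Msg('null', 'a'), Msg('checkpoint'), Msg('null', 'b'),
            Msg('pause'), Msg('null', 'c')]
toy_RE = ToyPausableRE()
toy_RE(toy_plan)
print(toy_RE.paused, toy_RE.log)  # True ['a', 'b']
toy_RE.resume()
print(toy_RE.paused, toy_RE.log)  # False ['a', 'b', 'b', 'c']
```

'b' runs twice because it came after the last checkpoint; this is why plans should only checkpoint at points where repeating the following messages is safe.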
    from collections import deque
    import asyncio
    import datetime
    import functools
    from bluesky.utils import (AsyncInput, FailedPause, InvalidCommand, Msg,
                               ensure_generator)
    from bluesky.run_engine import (RunEngineStateMachine, PropertyMachine,
                                    PAUSE_MSG)
    from bluesky._vendor.super_state_machine.errors import TransitionError

    class RunEngine_v5:
        state = PropertyMachine(RunEngineStateMachine)
        _UNCACHEABLE_COMMANDS = ['pause', ]

        def __init__(self, *, loop=None):
            # map messages to coro
            self._command_registry = {
                'print': self._print,
                'sum': self._sum,
                # coros on real RE
                'sleep': self._sleep,
                'checkpoint': self._checkpoint,
                'clear_checkpoint': self._clear_checkpoint,
                'rewindable': self._rewindable,
                'pause': self._pause,
                'input': self._input,
                'null': self._null, }
            # debugging hook
            self.msg_hook = None
            # bind RE to a specific loop
            if loop is None:
                loop = asyncio.get_event_loop()
            self.loop = loop
            # The RunEngine keeps track of a *lot* of state.
            # All flags and caches are defined here with a comment. Good luck.
            self._task = None  # asyncio.Task associated with call to self._run
            self._deferred_pause_requested = False  # pause at next 'checkpoint'
            self._msg_cache = deque()  # history of processed msgs for rewinding
            self._rewindable_flag = True  # if the RE is allowed to replay msgs
            self._plan = None  # the scan plan instance from __call__
            self._plan_stack = deque()  # stack of generators to work off of
            self._response_stack = deque([None])  # resps to send into the plans
            self._interrupted = False  # True if paused, aborted, or failed
            self._exception = None  # stashed exception to throw into the plan

        def __call__(self, plan):
            # First things first: if we are in the wrong state, raise.
            if not self.state.is_idle:
                raise RuntimeError("The RunEngine is in a %s state" % self.state)
            self._clear_call_cache()
            self._plan = plan
            gen = ensure_generator(plan)
            self._plan_stack.append(gen)
            self._response_stack.append(None)
            self._task = self.loop.create_task(self._run())
            self.loop.run_forever()
            if self._task.done() and not self._task.cancelled():
                exc = self._task.exception()
                if exc is not None:
                    raise exc

        def _clear_call_cache(self):
            self._deferred_pause_requested = False
            self._plan_stack = deque()
            self._msg_cache = deque()
            self._response_stack = deque([None])
            self._exception = None
            self._task = None
            self._plan = None
            self._interrupted = False

        @property
        def rewindable(self):
            return self._rewindable_flag

        @rewindable.setter
        def rewindable(self, v):
            cur_state = self._rewindable_flag
            self._rewindable_flag = bool(v)
            if self.resumable and self._rewindable_flag != cur_state:
                self._reset_checkpoint_state()

        @property
        def resumable(self):
            "i.e., can the plan in progress be rewound"
            return self._msg_cache is not None

        @asyncio.coroutine
        def _run(self):
            pending_cancel_exception = None
            try:
                self.state = 'running'
                while True:
                    try:
                        yield from asyncio.sleep(0.0001, loop=self.loop)
                        # The case where we have a stashed exception
                        if self._exception is not None:
                            # throw the exception at the current plan
                            try:
                                msg = self._plan_stack[-1].throw(
                                    self._exception)
                            except Exception as e:
                                # The current plan did not handle it,
                                # maybe the next plan (if any) would like
                                # to try
                                self._plan_stack.pop()
                                if len(self._plan_stack):
                                    self._exception = e
                                    continue
                                # no plans left and still an unhandled exception
                                # re-raise to exit the infinite loop
                                else:
                                    raise
                            # clear the stashed exception, the top plan
                            # handled it.
                            else:
                                self._exception = None
                        # The normal case of clean operation
                        else:
                            resp = self._response_stack.pop()
                            try:
                                msg = self._plan_stack[-1].send(resp)
                            # We have exhausted the top generator
                            except StopIteration:
                                # pop the dead generator, go back to the top
                                self._plan_stack.pop()
                                if len(self._plan_stack):
                                    continue
                                # or reraise to get out of the infinite loop
                                else:
                                    raise
                            # Any other exception that comes out of the plan
                            except Exception as e:
                                # pop the dead plan, stash the exception and
                                # go to the top of the loop
                                self._plan_stack.pop()
                                if len(self._plan_stack):
                                    self._exception = e
                                    continue
                                # or reraise to get out of the infinite loop
                                else:
                                    raise

                        if self.msg_hook:
                            self.msg_hook(msg)

                        # if this message can be cached for rewinding, cache it
                        if (self._msg_cache is not None and
                                self._rewindable_flag and
                                msg.command not in self._UNCACHEABLE_COMMANDS):
                            # We have a checkpoint.
                            self._msg_cache.append(msg)

                        # try to look up the coroutine to execute the command
                        try:
                            coro = self._command_registry[msg.command]
                        # replace KeyError with a local sub-class and go
                        # to top of the loop
                        except KeyError:
                            # TODO make this smarter
                            self._exception = InvalidCommand(msg.command)
                            continue

                        # try to finally run the command the user asked for
                        try:
                            # this is one of two places that 'async'
                            # exceptions (coming in via throw) can be
                            # raised
                            response = yield from coro(msg)
                        # special case `CancelledError` and let the outer
                        # exception block deal with it.
                        except asyncio.CancelledError:
                            raise
                        # any other exception, stash it and go to the top of loop
                        except Exception as e:
                            self._exception = e
                            continue
                        # normal use, if it runs cleanly, stash the response and
                        # go to the top of the loop
                        else:
                            self._response_stack.append(response)
                            continue

                    except KeyboardInterrupt:
                        # This only happens if some external code captures SIGINT
                        # -- overriding the RunEngine -- and then raises instead
                        # of (properly) calling the RunEngine's handler.
                        # See https://github.com/NSLS-II/bluesky/pull/242
                        print("An unknown external library has improperly raised "
                              "KeyboardInterrupt. Intercepting and triggering "
                              "a hard pause instead.")
                        self.loop.call_soon(self.request_pause, False)
                        print(PAUSE_MSG)
                    except asyncio.CancelledError as e:
                        # if we are handling this twice, raise and leave the plans
                        # alone
                        if self._exception is e:
                            raise e
                        # the case where FailedPause, RequestAbort or a coro
                        # raised error is not already stashed in _exception
                        if self._exception is None:
                            self._exception = e
                        pending_cancel_exception = e
            except StopIteration:
                pass
            finally:
                self.loop.stop()
                self.state = 'idle'
            # if the task was cancelled
            if pending_cancel_exception is not None:
                raise pending_cancel_exception

        @asyncio.coroutine
        def _sleep(self, msg):
            yield from asyncio.sleep(msg.args[0])

        @asyncio.coroutine
        def _print(self, msg):
            now = datetime.datetime.now
            print('-- {!s:10.10s} : {: <25.25s} --'.format(now().time(), msg.obj))

        @asyncio.coroutine
        def _sum(self, msg):
            return sum(msg.args)

        @asyncio.coroutine
        def _input(self, msg):
            """
            Process an 'input' Msg. Expected Msg:

                Msg('input', None)
                Msg('input', None, prompt='>')  # customize prompt
            """
            prompt = msg.kwargs.get('prompt', '')
            async_input = AsyncInput(self.loop)
            async_input = functools.partial(async_input, end='', flush=True)
            return (yield from async_input(prompt))

        @asyncio.coroutine
        def _pause(self, msg):
            """Request the run engine to pause

            Expected message object is:

                Msg('pause', defer=False)

            See the request_pause() docstring for an explanation of the
            keyword argument in the `Msg` signature.
            """
            self.request_pause(*msg.args, **msg.kwargs)

        def request_pause(self, defer=False):
            """
            Command the Run Engine to pause.

            This function is called by 'pause' Messages. It can also be called
            by other threads. It cannot be called on the main thread during a run,
            but it is called by SIGINT (i.e., Ctrl+C).

            If the current run has no checkpoint (via the 'clear_checkpoint'
            message), this will cause the run to abort.

            Parameters
            ----------
            defer : bool, optional
                If False, pause immediately before processing any new messages.
                If True, pause at the next checkpoint.
                False by default.
            """
            if defer:
                self._deferred_pause_requested = True
                print("Deferred pause acknowledged. Continuing to checkpoint.")
                return

            # We are pausing. Cancel any deferred pause previously requested.
            self._deferred_pause_requested = False
            self._interrupted = True
            print("Pausing...")
            self.state = 'paused'
            if not self.resumable:
                # cannot resume, so we cannot pause. Abort the scan
                print("No checkpoint; cannot pause.")
                print("Aborting: running cleanup and marking "
                      "exit_status as 'abort'...")
                self._exception = FailedPause()
                self._task.cancel()
                return
            # stop accepting new tasks in the event loop (existing tasks will
            # still be processed)
            self.loop.stop()

        def resume(self):
            """Resume a paused plan from the last checkpoint.

            Returns
            -------
            uids : list
                list of Header uids (a.k.a RunStart uids) of run(s); always
                empty in this simplified version
            """
            # The state machine does not capture the whole picture.
            if not self.state.is_paused:
                raise TransitionError("The RunEngine is in the {0} state. "
                                      "You can only resume from the paused state."
                                      "".format(self.state))
            self._interrupted = False
            new_plan = self._rewind()
            self._plan_stack.append(new_plan)
            self._response_stack.append(None)
            self._resume_event_loop()
            return []

        def _rewind(self):
            '''Clean up in preparation for resuming from a pause or suspension.

            Returns
            -------
            new_plan : generator
                A new plan made from the messages in the message cache
            '''
            new_plan = ensure_generator(list(self._msg_cache))
            self._msg_cache = deque()
            # This is needed to 'cancel' an open bundling (e.g. create) if
            # the pause happens after a 'checkpoint', after a 'create', but before
            # the paired 'save'.
            return new_plan

        def _resume_event_loop(self):
            # may be called by 'resume' or 'abort'
            self.state = 'running'
            self._last_sigint_time = None
            if self._task.done():
                return
            self.loop.run_forever()
            if self._task.done() and not self._task.cancelled():
                exc = self._task.exception()
                if exc is not None:
                    raise exc

        @asyncio.coroutine
        def _checkpoint(self, msg):
            """Instruct the RunEngine to create a checkpoint so that we can rewind
            to this point if necessary

            Expected message object is:

                Msg('checkpoint')
            """
            yield from self._reset_checkpoint_state_coro()
            if self._deferred_pause_requested:
                # We are at a checkpoint; we are done deferring the pause.
                # Give the _check_for_signals coroutine time to look for
                # additional SIGINTs that would trigger an abort.
                yield from asyncio.sleep(0.5, loop=self.loop)
                self.request_pause(defer=False)

        def _reset_checkpoint_state(self):
            if self._msg_cache is None:
                return
            self._msg_cache = deque()

        _reset_checkpoint_state_coro = asyncio.coroutine(_reset_checkpoint_state)

        @asyncio.coroutine
        def _clear_checkpoint(self, msg):
            """Clear a set checkpoint

            Expected message object is:

                Msg('clear_checkpoint')
            """
            # clear message cache
            self._msg_cache = None

        @asyncio.coroutine
        def _rewindable(self, msg):
            '''Set rewindable state of RunEngine

            Expected message object is:

                Msg('rewindable', None, bool or None)
            '''
            rw_flag, = msg.args
            if rw_flag is not None:
                self.rewindable = rw_flag
            return self.rewindable

        @asyncio.coroutine
        def _null(self, msg):
            """
            A no-op message, mainly for debugging and testing.
            """
            pass

    RE_v5 = RunEngine_v5()
    RE_v5.msg_hook = print

    def pausing_plan():
        yield Msg('null')
        yield Msg('null')
        yield Msg('pause')
        yield Msg('null')