Source code for bluesky.preprocessors

from __future__ import generator_stop

from collections import OrderedDict, deque, ChainMap
from collections.abc import Iterable
import uuid

from bluesky.protocols import Locatable
from .utils import (get_hinted_fields, normalize_subs_input, root_ancestor,
                    separate_devices,
                    Msg, ensure_generator, single_gen,
                    short_uid as _short_uid, make_decorator,
                    RunEngineControlException, merge_axis)
from functools import wraps
from .plan_stubs import (open_run, close_run, mv, pause, trigger_and_read)


[docs]def plan_mutator(plan, msg_proc): """ Alter the contents of a plan on the fly by changing or inserting messages. Parameters ---------- plan : generator a generator that yields messages (`Msg` objects) msg_proc : callable This function takes in a message and specifies messages(s) to replace it with. The function must account for what type of response the message would prompt. For example, an 'open_run' message causes the RunEngine to send a uid string back to the plan, while a 'set' message causes the RunEngine to send a status object back to the plan. The function should return a pair of generators ``(head, tail)`` that yield messages. The last message out of the ``head`` generator is the one whose response will be sent back to the host plan. Therefore, that message should prompt a response compatible with the message that it is replacing. Any responses to all other messages will be swallowed. As shorthand, either ``head`` or ``tail`` can be replaced by ``None``. This means: * ``(None, None)`` No-op. Let the original message pass through. * ``(head, None)`` Mutate and/or insert messages before the original message. * ``(head, tail)`` As above, and additionally insert messages after. * ``(None, tail)`` Let the original message pass through and then insert messages after. The reason for returning a pair of generators instead of just one is to provide a way to specify which message's response should be sent out to the host plan. Again, it's the last message yielded by the first generator (``head``). Yields ------ msg : Msg messages from `plan`, altered by `msg_proc` See Also -------- :func:`bluesky.plans.msg_mutator` """ # internal stacks msgs_seen = dict() plan_stack = deque() result_stack = deque() tail_cache = dict() tail_result_cache = dict() exception = None parent_plan = plan ret_value = None # seed initial conditions plan_stack.append(plan) result_stack.append(None) while True: # get last result if exception is not None: # if we have a stashed exception, pass it along try: msg = plan_stack[-1].throw(exception) except StopIteration as e: # discard the exhausted generator exhausted_gen = plan_stack.pop() # if this is the parent plan, capture it's return value if exhausted_gen is parent_plan: ret_value = e.value # if we just came out of a 'tail' generator, # discard its return value and replace it with the # cached one (from the last message in its paired # 'new_gen') if id(exhausted_gen) in tail_result_cache: ret = tail_result_cache.pop(id(exhausted_gen)) result_stack.append(ret) if id(exhausted_gen) in tail_cache: gen = tail_cache.pop(id(exhausted_gen)) if gen is not None: plan_stack.append(gen) saved_result = result_stack.pop() tail_result_cache[id(gen)] = saved_result # must use None to prime generator result_stack.append(None) if plan_stack: continue else: return ret_value except Exception as e: # if we catch an exception, # the current top plan is dead so pop it plan_stack.pop() if plan_stack: # stash the exception and go to the top exception = e continue else: raise else: exception = None else: ret = result_stack.pop() try: msg = plan_stack[-1].send(ret) except StopIteration as e: # discard the exhausted generator exhausted_gen = plan_stack.pop() # if this is the parent plan, capture it's return value if exhausted_gen is parent_plan: ret_value = e.value # if we just came out of a 'tail' generator, # discard its return value and replace it with the # cached one (from the last message in its paired # 'new_gen') if id(exhausted_gen) in tail_result_cache: ret = tail_result_cache.pop(id(exhausted_gen)) result_stack.append(ret) if id(exhausted_gen) in tail_cache: gen = tail_cache.pop(id(exhausted_gen)) if gen is not None: plan_stack.append(gen) saved_result = result_stack.pop() tail_result_cache[id(gen)] = saved_result # must use None to prime generator result_stack.append(None) if plan_stack: continue else: return ret_value except Exception as ex: # we are here because an exception came out of the send # this may be due to # a) the plan really raising or # b) an exception that came out of the run engine via ophyd # in either case the current plan is dead so pop it failed_gen = plan_stack.pop() if id(failed_gen) in tail_cache: gen = tail_cache.pop(id(failed_gen)) if gen is not None: plan_stack.append(gen) # if there is at least if plan_stack: exception = ex continue else: raise ex # if inserting / mutating, put new generator on the stack # and replace the current msg with the first element from the # new generator if id(msg) not in msgs_seen: # Use the id as a hash, and hold a reference to the msg so that # it cannot be garbage collected until the plan is complete. msgs_seen[id(msg)] = msg new_gen, tail_gen = msg_proc(msg) # mild correctness check if tail_gen is not None and new_gen is None: new_gen = single_gen(msg) if new_gen is not None: # stash the new generator plan_stack.append(new_gen) # put in a result value to prime it result_stack.append(None) # stash the tail generator tail_cache[id(new_gen)] = tail_gen # go to the top of the loop continue try: # yield out the 'current message' and collect the return inner_ret = yield msg except GeneratorExit: # special case GeneratorExit. We must clean up all of our plans # and exit with out yielding anything else. for p in plan_stack: p.close() raise except Exception as ex: if plan_stack: exception = ex continue else: raise else: result_stack.append(inner_ret)
[docs]def msg_mutator(plan, msg_proc): """ A simple preprocessor that mutates or deletes single messages in a plan To *insert* messages, use ``plan_mutator`` instead. Parameters ---------- plan : generator a generator that yields messages (`Msg` objects) msg_proc : callable Expected signature `f(msg) -> new_msg or None` Yields ------ msg : Msg messages from `plan`, altered by `msg_proc` See Also -------- :func:`bluesky.plans.plan_mutator` """ ret = None while True: try: msg = plan.send(ret) msg = msg_proc(msg) # if None, just skip message # feed 'None' back down into the base plan, # this may break some plans if msg is None: ret = None continue ret = yield msg except StopIteration as e: return e.value
[docs]def pchain(*args): '''Like `itertools.chain` but using `yield from` This ensures than `.send` works as expected and the underlying plans get the return values Parameters ---------- args : generators (plans) Yields ------ msg : Msg The messages from each plan in turn ''' rets = deque() for p in args: rets.append((yield from p)) return tuple(rets)
def print_summary_wrapper(plan): """Print summary of plan as it goes by Prints a minimal version of the plan, showing only moves and where events are created. Yields the `Msg` unchanged. Parameters ---------- plan : iterable Must yield `Msg` objects Yields ------ msg : `Msg` """ read_cache = [] for msg in plan: cmd = msg.command if cmd == 'open_run': print('{:=^80}'.format(' Open Run ')) elif cmd == 'close_run': print('{:=^80}'.format(' Close Run ')) elif cmd == 'set': print('{motor.name} -> {args[0]}'.format(motor=msg.obj, args=msg.args)) elif cmd == 'create': read_cache = [] elif cmd == 'read': read_cache.append(msg.obj.name) elif cmd == 'save': print(' Read {}'.format(read_cache)) yield msg
[docs]def run_wrapper(plan, *, md=None): """Enclose in 'open_run' and 'close_run' messages. Parameters ---------- plan : iterable or iterator a generator, list, or similar containing `Msg` objects md : dict, optional metadata to be passed into the 'open_run' message """ rs_uid = yield from open_run(md) def except_plan(e): if isinstance(e, RunEngineControlException): yield from close_run(exit_status=e.exit_status) else: yield from close_run(exit_status='fail', reason=str(e)) yield from contingency_wrapper(plan, except_plan=except_plan, else_plan=close_run) return rs_uid
[docs]def subs_wrapper(plan, subs): """ Subscribe callbacks to the document stream; finally, unsubscribe. Parameters ---------- plan : iterable or iterator a generator, list, or similar containing `Msg` objects subs : callable, list of callables, or dict of lists of callables Documents of each type are routed to a list of functions. Input is normalized to a dict of lists of functions, like so: None -> {'all': [], 'start': [], 'stop': [], 'event': [], 'descriptor': []} func -> {'all': [func], 'start': [], 'stop': [], 'event': [], 'descriptor': []} [f1, f2] -> {'all': [f1, f2], 'start': [], 'stop': [], 'event': [], 'descriptor': []} {'event': [func]} -> {'all': [], 'start': [], 'stop': [], 'event': [func], 'descriptor': []} Signature of functions must conform to `f(name, doc)` where name is one of {'all', 'start', 'stop', 'event', 'descriptor'} and doc is a dictionary. Yields ------ msg : Msg messages from plan, with 'subscribe' and 'unsubscribe' messages inserted and appended """ subs = normalize_subs_input(subs) tokens = set() def _subscribe(): for name, funcs in subs.items(): for func in funcs: token = yield Msg('subscribe', None, func, name) tokens.add(token) def _unsubscribe(): for token in tokens: yield Msg('unsubscribe', None, token=token) def _inner_plan(): yield from _subscribe() return (yield from plan) return (yield from finalize_wrapper(_inner_plan(), _unsubscribe()))
[docs]def suspend_wrapper(plan, suspenders): """ Install suspenders to the RunEngine, and remove them at the end. Parameters ---------- plan : iterable or iterator a generator, list, or similar containing `Msg` objects suspenders : suspender or list of suspenders Suspenders to use for the duration of the wrapper Yields ------ msg : Msg messages from plan, with 'install_suspender' and 'remove_suspender' messages inserted and appended """ if not isinstance(suspenders, Iterable): suspenders = [suspenders] def _install(): for susp in suspenders: yield Msg('install_suspender', None, susp) def _remove(): for susp in suspenders: yield Msg('remove_suspender', None, susp) def _inner_plan(): yield from _install() return (yield from plan) return (yield from finalize_wrapper(_inner_plan(), _remove()))
def configure_count_time_wrapper(plan, time): """ Preprocessor that sets all devices with a `count_time` to the same time. The original setting is stashed and restored at the end. Parameters ---------- plan : iterable or iterator a generator, list, or similar containing `Msg` objects time : float or None If None, the plan passes through unchanged. Yields ------ msg : Msg messages from plan, with 'set' messages inserted """ devices_seen = set() original_times = {} def insert_set(msg): obj = msg.obj if obj is not None and obj not in devices_seen: devices_seen.add(obj) if hasattr(obj, 'count_time'): # TODO Do this with a 'read' Msg once reads can be # marked as belonging to a different event stream (or no # event stream. original_times[obj] = obj.count_time.get() # TODO do this with configure return pchain(mv(obj.count_time, time), single_gen(msg)), None return None, None def reset(): for obj, time in original_times.items(): yield from mv(obj.count_time, time) if time is None: # no-op return (yield from plan) else: return (yield from finalize_wrapper(plan_mutator(plan, insert_set), reset()))
[docs]def finalize_wrapper(plan, final_plan, *, pause_for_debug=False): '''try...finally helper Run the first plan and then the second. If any of the messages raise an error in the RunEngine (or otherwise), the second plan will attempted to be run anyway. See :func:`contingency_wrapper` for a more complex and more feature-complete error-handling preprocessor. Parameters ---------- plan : iterable or iterator a generator, list, or similar containing `Msg` objects final_plan : callable, iterable or iterator a generator, list, or similar containing `Msg` objects or a callable that reurns one; attempted to be run no matter what happens in the first plan pause_for_debug : bool, optional If the plan should pause before running the clean final_plan in the case of an Exception. This is intended as a debugging tool only. Yields ------ msg : Msg messages from `plan` until it terminates or an error is raised, then messages from `final_plan` See Also -------- :func:`contingency_wrapper` ''' # If final_plan is a generator *function* (as opposed to a generator # *instance*), call it. if callable(final_plan): final_plan_instance = final_plan() else: final_plan_instance = final_plan cleanup = True try: ret = yield from plan except GeneratorExit: cleanup = False raise except BaseException: if pause_for_debug: yield from pause() raise finally: # if the exception raised in `GeneratorExit` that means # someone called `gen.close()` on this generator. In those # cases generators must either re-raise the GeneratorExit or # raise a different exception. Trying to yield any values # results in a RuntimeError being raised where `close` is # called. Thus, we catch, the GeneratorExit, disable cleanup # and then re-raise # https://docs.python.org/3/reference/expressions.html?#generator.close if cleanup: yield from ensure_generator(final_plan_instance) return ret
[docs]def contingency_wrapper(plan, *, except_plan=None, else_plan=None, final_plan=None, pause_for_debug=False): '''try...except...else...finally helper See :func:`finalize_wrapper` for a simplified but less powerful error-handling preprocessor. Parameters ---------- plan : iterable or iterator a generator, list, or similar containing `Msg` objects except_plan : generator function, optional This will be called with the exception as the only input. The plan does not need to re-raise, but may if you want to change the exception. Only subclasses of `Exception` will be passed in, will not see `GeneratorExit`, `SystemExit`, or `KeyboardInterrupt` else_plan : generator function, optional This will be called with no arguments if plan completes without raising final_plan : generator function, optional a generator, list, or similar containing `Msg` objects or a callable that reurns one; attempted to be run no matter what happens in the first plan pause_for_debug : bool, optional If the plan should pause before running the clean final_plan in the case of an Exception. This is intended as a debugging tool only. Yields ------ msg : Msg messages from `plan` until it terminates or an error is raised, then messages from `final_plan` See Also -------- :func:`finalize_wrapper` ''' cleanup = True try: ret = yield from plan except GeneratorExit: cleanup = False raise except Exception as e: if pause_for_debug: yield from pause() if except_plan: # it might be better to throw this in, but this is simpler # to implement for now yield from except_plan(e) raise else: if else_plan: yield from else_plan() finally: # if the exception raised in `GeneratorExit` that means # someone called `gen.close()` on this generator. In those # cases generators must either re-raise the GeneratorExit or # raise a different exception. Trying to yield any values # results in a RuntimeError being raised where `close` is # called. Thus, we catch, the GeneratorExit, disable cleanup # and then re-raise # https://docs.python.org/3/reference/expressions.html?#generator.close if cleanup and final_plan: yield from final_plan() return ret
[docs]def finalize_decorator(final_plan): '''try...finally helper Run the first plan and then the second. If any of the messages raise an error in the RunEngine (or otherwise), the second plan will attempted to be run anyway. Notice that, this decorator requires a generator *function* so that it can be used multiple times, whereas :func:`bluesky.plans.finalize_wrapper` accepts either a generator function or a generator instance. Parameters ---------- final_plan : callable a callable that returns a generator, list, or similar containing `Msg` objects; attempted to be run no matter what happens in the first plan Yields ------ msg : Msg messages from `plan` until it terminates or an error is raised, then messages from `final_plan` ''' def dec(gen_func): @wraps(gen_func) def dec_inner(*inner_args, **inner_kwargs): if not callable(final_plan): raise TypeError("final_plan must be a callable (e.g., a " "generator function) not an iterable (e.g., a " "generator instance).") final_plan_instance = final_plan() plan = gen_func(*inner_args, **inner_kwargs) cleanup = True try: ret = yield from plan except GeneratorExit: cleanup = False raise finally: # if the exception raised in `GeneratorExit` that means # someone called `gen.close()` on this generator. In those # cases generators must either re-raise the GeneratorExit or # raise a different exception. Trying to yield any values # results in a RuntimeError being raised where `close` is # called. Thus, we catch, the GeneratorExit, disable cleanup # and then re-raise # https://docs.python.org/3/reference/expressions.html?#generator.close if cleanup: yield from ensure_generator(final_plan_instance) return ret return dec_inner return dec
def rewindable_wrapper(plan, rewindable): '''Toggle the 'rewindable' state of the RE Allow or disallow rewinding during the processing of the wrapped messages. Then restore the initial state (rewindable or not rewindable). Parameters ---------- plan : generator The plan to wrap in a 'rewindable' or 'not rewindable' context rewindable : bool ''' initial_rewindable = True def capture_rewindable_state(): nonlocal initial_rewindable initial_rewindable = yield Msg('rewindable', None, None) def set_rewindable(rewindable): if initial_rewindable != rewindable: return (yield Msg('rewindable', None, rewindable)) def restore_rewindable(): if initial_rewindable != rewindable: return (yield Msg('rewindable', None, initial_rewindable)) if not rewindable: yield from capture_rewindable_state() yield from set_rewindable(rewindable) return (yield from finalize_wrapper(plan, restore_rewindable())) else: return (yield from plan)
[docs]def inject_md_wrapper(plan, md): """ Inject additional metadata into a run. This takes precedences over the original metadata dict in the event of overlapping keys, but it does not mutate the original metadata dict. (It uses ChainMap.) Parameters ---------- plan : iterable or iterator a generator, list, or similar containing `Msg` objects md : dict metadata """ def _inject_md(msg): if msg.command == 'open_run': msg = msg._replace(kwargs=ChainMap(md, msg.kwargs)) return msg return (yield from msg_mutator(plan, _inject_md))
def stub_wrapper(plan): """ Remove Msg object in order to use plan as a stub This will remove any `open_run`, `close_run`, `stage` and `unstage` `Msg` objects present in the plan in order for it to be run as part of a larger scan. Note, that any metadata from the provided plan will not be sent to the RunEngine automatically. Parameters ---------- plan : iterable or iterator A generator list or similar containing `Msg` objects Returns ------- md : dict Metadata discovered from `open_run` Msg """ md = {} def _block_run_control(msg): """ Block open and close run messages """ # Capture the metadata from open_run if msg.command == 'open_run': md.update(msg.kwargs) return None elif msg.command in ('close_run', 'stage', 'unstage'): return None return msg yield from msg_mutator(plan, _block_run_control) return md
[docs]def monitor_during_wrapper(plan, signals): """ Monitor (asynchronously read) devices during runs. This is a preprocessor that insert messages immediately after a run is opened and before it is closed. Parameters ---------- plan : iterable or iterator a generator, list, or similar containing `Msg` objects signals : collection objects that support the Signal interface Yields ------ msg : Msg messages from plan with 'monitor', and 'unmontior' messages inserted See Also -------- :func:`bluesky.plans.fly_during_wrapper` """ monitor_msgs = [Msg('monitor', sig, name=sig.name + '_monitor') for sig in signals] unmonitor_msgs = [Msg('unmonitor', sig) for sig in signals] def insert_after_open(msg): if msg.command == 'open_run': def new_gen(): yield from ensure_generator(monitor_msgs) return single_gen(msg), new_gen() else: return None, None def insert_before_close(msg): if msg.command == 'close_run': def new_gen(): yield from ensure_generator(unmonitor_msgs) yield msg return new_gen(), None else: return None, None # Apply nested mutations. plan1 = plan_mutator(plan, insert_after_open) plan2 = plan_mutator(plan1, insert_before_close) return (yield from plan2)
[docs]def fly_during_wrapper(plan, flyers): """ Kickoff and collect "flyer" (asynchronously collect) objects during runs. This is a preprocessor that insert messages immediately after a run is opened and before it is closed. Parameters ---------- plan : iterable or iterator a generator, list, or similar containing `Msg` objects flyers : collection objects that support the flyer interface Yields ------ msg : Msg messages from plan with 'kickoff', 'wait' and 'collect' messages inserted See Also -------- :func:`bluesky.plans.fly` """ grp1 = _short_uid('flyers-kickoff') grp2 = _short_uid('flyers-complete') kickoff_msgs = [Msg('kickoff', flyer, group=grp1) for flyer in flyers] complete_msgs = [Msg('complete', flyer, group=grp2) for flyer in flyers] collect_msgs = [Msg('collect', flyer) for flyer in flyers] if flyers: # If there are any flyers, insert a 'wait' Msg after kickoff, complete kickoff_msgs += [Msg('wait', None, group=grp1)] complete_msgs += [Msg('wait', None, group=grp2)] def insert_after_open(msg): if msg.command == 'open_run': def new_gen(): yield from ensure_generator(kickoff_msgs) return single_gen(msg), new_gen() else: return None, None def insert_before_close(msg): if msg.command == 'close_run': def new_gen(): yield from ensure_generator(complete_msgs) yield from ensure_generator(collect_msgs) yield msg return new_gen(), None else: return None, None # Apply nested mutations. plan1 = plan_mutator(plan, insert_after_open) plan2 = plan_mutator(plan1, insert_before_close) return (yield from plan2)
[docs]def lazily_stage_wrapper(plan): """ This is a preprocessor that inserts 'stage' messages and appends 'unstage'. The first time an object is seen in `plan`, it is staged. To avoid redundant staging we actually stage the object's ultimate parent. At the end, in a `finally` block, an 'unstage' Message issued for every 'stage' Message. Parameters ---------- plan : iterable or iterator a generator, list, or similar containing `Msg` objects Yields ------ msg : Msg messages from plan with 'stage' messages inserted and 'unstage' messages appended """ COMMANDS = set(['read', 'set', 'trigger', 'kickoff']) # Cache devices in the order they are staged; then unstage in reverse. devices_staged = [] def inner(msg): if msg.command in COMMANDS and msg.obj not in devices_staged: root = root_ancestor(msg.obj) def new_gen(): # Here we insert a 'stage' message ret = yield Msg('stage', root) # and cache the result if ret is None: # The generator may be being list-ified. # This is a hack to make that possible. ret = [root] devices_staged.extend(ret) # and then proceed with our regularly scheduled programming yield msg return new_gen(), None else: return None, None def unstage_all(): for device in reversed(devices_staged): yield Msg('unstage', device) return (yield from finalize_wrapper(plan_mutator(plan, inner), unstage_all()))
[docs]def stage_wrapper(plan, devices): """ 'Stage' devices (i.e., prepare them for use, 'arm' them) and then unstage. Parameters ---------- plan : iterable or iterator a generator, list, or similar containing `Msg` objects devices : collection list of devices to stage immediately on entrance and unstage on exit Yields ------ msg : Msg messages from plan with 'stage' and finally 'unstage' messages inserted See Also -------- :func:`bluesky.plans.lazily_stage_wrapper` :func:`bluesky.plans.stage` :func:`bluesky.plans.unstage` """ devices = separate_devices(root_ancestor(device) for device in devices) def stage_devices(): for d in devices: yield Msg('stage', d) def unstage_devices(): for d in reversed(devices): yield Msg('unstage', d) def inner(): yield from stage_devices() return (yield from plan) return (yield from finalize_wrapper(inner(), unstage_devices()))
def _normalize_devices(devices): coupled_parents = set() # if we have any pseudo devices then setting any part of it # needs to trigger the relative behavior. io, co, go = merge_axis(devices) devices = set(devices) | set(io) | set(co) | set(go) # if a device with coupled children is directly in the # list, include all the coupled children as well for obj in co: devices |= set(obj.pseudo_positioners) coupled_parents.add(obj) # if at least one child of a device with coupled children # only include the coupled children if at least of the children # directly included is one of the coupled ones. for obj, type_map in go.items(): if len(type_map['pseudo']) > 0: devices |= set(obj.pseudo_positioners) coupled_parents.add(obj) return devices, coupled_parents def __get_result_of_message(msg_type: str, obj): result = yield Msg(msg_type, obj) if result is None: # this plan may be being list-ified print(f"*** all positions for {obj.name} are relative to current position ***") return result def __read_and_stash_a_motor(obj, initial_positions, coupled_parents): """Internal plan for relative set and reset wrappers .. warning :: Do not use this plan directly for any reason. """ # First check if it is a locatable, and use its location if it is if isinstance(obj, Locatable): location = yield from __get_result_of_message("locate", obj) if location is None: setpoint = 0 else: setpoint = location["setpoint"] # Otherwise it might have a `position` attribution elif hasattr(obj, "position"): setpoint = obj.position # Otherwise fallback to read obj and grab the value of the first key else: reading = yield from __get_result_of_message("read", obj) if reading is None: setpoint = 0 else: fields = get_hinted_fields(obj) if len(fields) == 1: k, = fields setpoint = reading[k]['value'] elif len(fields) == 0: k = list(reading.keys())[0] setpoint = reading[k]['value'] else: raise Exception("do not yet know how to deal with " "non pseudopositioner multi-axis. Please " "contact DAMA to justify why you need " "this.") initial_positions[obj] = setpoint # if we move a pseudo positioner also stash it's children if obj in coupled_parents: for c, p in zip(obj.pseudo_positioners, setpoint): initial_positions[c] = p # if we move a pseudo single, also stash it's parent and siblings parent = obj.parent if parent in coupled_parents and obj in parent.pseudo_positioners: parent_pos = parent.position initial_positions[parent] = parent_pos for c, p in zip(parent.pseudo_positioners, parent_pos): initial_positions[c] = p # TODO forbid mixed pseudo / real motion
[docs]def relative_set_wrapper(plan, devices=None): """ Interpret 'set' messages on devices as relative to initial position. Parameters ---------- plan : iterable or iterator a generator, list, or similar containing `Msg` objects devices : collection or None, optional if default (None), apply to all devices that are moved by the plan Yields ------ msg : Msg messages from plan, with 'read' messages inserted and 'set' messages mutated """ initial_positions = {} if devices is not None: devices, coupled_parents = _normalize_devices(devices) else: coupled_parents = set() def rewrite_pos(msg): if (msg.command == 'set') and (msg.obj in initial_positions): rel_pos, = msg.args abs_pos = initial_positions[msg.obj] + rel_pos new_msg = msg._replace(args=(abs_pos,)) return new_msg else: return msg def insert_reads(msg): eligible = (devices is None) or (msg.obj in devices) seen = msg.obj in initial_positions if (msg.command == 'set') and eligible and not seen: return (pchain( __read_and_stash_a_motor( msg.obj, initial_positions, coupled_parents), single_gen(msg)), None) else: return None, None plan = plan_mutator(plan, insert_reads) plan = msg_mutator(plan, rewrite_pos) return (yield from plan)
[docs]def reset_positions_wrapper(plan, devices=None): """ Return movable devices to their initial positions at the end. Parameters ---------- plan : iterable or iterator a generator, list, or similar containing `Msg` objects devices : collection or None, optional If default (None), apply to all devices that are moved by the plan. Yields ------ msg : Msg messages from plan with 'read' and finally 'set' messages inserted """ initial_positions = OrderedDict() if devices is not None: devices, coupled_parents = _normalize_devices(devices) else: coupled_parents = set() def insert_reads(msg): eligible = devices is None or msg.obj in devices seen = msg.obj in initial_positions if (msg.command == 'set') and eligible and not seen: return (pchain( __read_and_stash_a_motor( msg.obj, initial_positions, coupled_parents), single_gen(msg)), None) else: return None, None def reset(): blk_grp = 'reset-{}'.format(str(uuid.uuid4())[:6]) for k, v in initial_positions.items(): if k.parent in coupled_parents: continue yield Msg('set', k, v, group=blk_grp) yield Msg('wait', None, group=blk_grp) return (yield from finalize_wrapper(plan_mutator(plan, insert_reads), reset()))
[docs]def baseline_wrapper(plan, devices, name='baseline'): """ Preprocessor that records a baseline of all `devices` after `open_run` The readings are designated for a separate event stream named 'baseline' by default. Parameters ---------- plan : iterable or iterator a generator, list, or similar containing `Msg` objects devices : collection collection of Devices to read If None, the plan passes through unchanged. name : string, optional name for event stream; by default, 'baseline' Yields ------ msg : Msg messages from plan, with 'set' messages inserted """ def insert_baseline(msg): if msg.command == 'open_run': return None, trigger_and_read(devices, name=name) elif msg.command == 'close_run': def post_baseline(): yield from trigger_and_read(devices, name=name) return (yield msg) return post_baseline(), None return None, None if not devices: # no-op return (yield from plan) else: return (yield from plan_mutator(plan, insert_baseline))
# Make generator function decorator for each generator instance wrapper. baseline_decorator = make_decorator(baseline_wrapper) subs_decorator = make_decorator(subs_wrapper) suspend_decorator = make_decorator(suspend_wrapper) relative_set_decorator = make_decorator(relative_set_wrapper) reset_positions_decorator = make_decorator(reset_positions_wrapper) # finalize_decorator is custom-made since it takes a plan as its # argument. See its docstring for details why. lazily_stage_decorator = make_decorator(lazily_stage_wrapper) stage_decorator = make_decorator(stage_wrapper) fly_during_decorator = make_decorator(fly_during_wrapper) monitor_during_decorator = make_decorator(monitor_during_wrapper) inject_md_decorator = make_decorator(inject_md_wrapper) run_decorator = make_decorator(run_wrapper) contingency_decorator = make_decorator(contingency_wrapper) stub_decorator = make_decorator(stub_wrapper) configure_count_time_decorator = make_decorator(configure_count_time_wrapper)
[docs]class SupplementalData: """ A configurable preprocessor for supplemental measurements This is a plan preprocessor. It inserts messages into plans to: * take "baseline" readings at the beginning and end of each run for the devices listed in its ``baseline`` atrribute * monitor signals in its ``monitors`` attribute for asynchronous updates during each run. * kick off "flyable" devices listed in its ``flyers`` attribute at the beginning of each run and collect their data at the end Internally, it uses the plan preprocessors: * :func:`baseline_wrapper` * :func:`monitor_during_wrapper` * :func:`fly_during_wrapper` Parameters ---------- baseline : list Devices to be read at the beginning and end of each run monitors : list Signals (not multi-signal Devices) to be monitored during each run, generating readings asynchronously flyers : list "Flyable" Devices to be kicked off before each run and collected at the end of each run Examples -------- Create an instance of SupplementalData and apply it to a RunEngine. >>> sd = SupplementalData(baseline=[some_motor, some_detector]), ... monitors=[some_signal], ... flyers=[some_flyer]) >>> RE = RunEngine({}) >>> RE.preprocessors.append(sd) Now all plans executed by RE will be modified to add baseline readings (before and after each run), monitors (during each run), and flyers (kicked off before each run and collected afterward). Inspect or update the lists of devices interactively. >>> sd.baseline [some_motor, some_detector] >>> sd.baseline.remove(some_motor) >>> sd.baseline [some_detector] >>> sd.baseline.append(another_detector) >>> sd.baseline [some_detector, another_detector] Each attribute (``baseline``, ``monitors``, ``flyers``) is an ordinary Python list, support all the standard list methods, such as: >>> sd.baseline.clear() The arguments to SupplementalData are optional. All the lists will empty by default. As shown above, they can be populated interactively. >>> sd = SupplementalData() >>> RE = RunEngine({}) >>> RE.preprocessors.append(sd) >>> sd.baseline.append(some_detector) """ def __init__(self, *, baseline=None, monitors=None, flyers=None): if baseline is None: baseline = [] if monitors is None: monitors = [] if flyers is None: flyers = [] self.baseline = list(baseline) self.monitors = list(monitors) self.flyers = list(flyers) def __repr__(self): return ("{cls}(baseline={baseline}, monitors={monitors}, " "flyers={flyers})" "").format(cls=type(self).__name__, **vars(self)) # I'm not sure why anyone would want to pickle this but it's good manners # to avoid breaking pickling. def __setstate__(self, state): baseline, monitors, flyers = state self.baseline = baseline self.monitors = monitors self.flyers = flyers def __getstate__(self): return (self.baseline, self.monitors, self.flyers) def __call__(self, plan): """ Insert messages into a plan. Parameters ---------- plan : iterable or iterator a generator, list, or similar containing `Msg` objects """ # Read this as going from the inside out: first we wrap the plan in the # flying instructions, then monitoring, then baseline, so that the # order of operations is: # - Take baseline readings # - Start monitoring. # - Kick off flyers and wait for them to be kicked off. # - Do `plan`. # - Complete and collect flyers. # - Stop monitoring. # - Take baseline readings. plan = fly_during_wrapper(plan, self.flyers) plan = monitor_during_wrapper(plan, self.monitors) plan = baseline_wrapper(plan, self.baseline) return (yield from plan)
def set_run_key_wrapper(plan, run): """ Add a run key to each message in wrapped plan Parameters ---------- plan : iterable or iterator a generator, list, or similar containing `Msg` objects run : str or any other type except None The run key to set on each Msg. It is recommended that run key represents informative string for better readability of plans. But value of any other type can be used if needed. """ if run is None: raise ValueError("run ID can not be None") def _set_run_key(msg): # Replace only the default value None if msg.run is None: msg = msg._replace(run=run) return msg return (yield from msg_mutator(plan, _set_run_key)) set_run_key_decorator = make_decorator(set_run_key_wrapper)