Source code for bluesky.simulators

from warnings import warn
from bluesky.utils import maybe_await
from bluesky.preprocessors import print_summary_wrapper
from bluesky.run_engine import call_in_bluesky_event_loop, in_bluesky_event_loop
from .protocols import Checkable


[docs]def plot_raster_path(plan, x_motor, y_motor, ax=None, probe_size=None, lw=2): """Plot the raster path for this plan Parameters ---------- plan : iterable Must yield `Msg` objects and not be a co-routine x_motor, y_motor : str Names of the x and y motors ax : matplotlib.axes.Axes The axes to plot to, if none, make new figure + axes probe_size : float, optional If not None, use as radius of probe (in same units as motor positions) lw : float, optional Width of lines drawn between points """ import matplotlib.pyplot as plt from matplotlib import collections as mcollections from matplotlib import patches as mpatches if ax is None: ax = plt.subplots()[1] ax.set_aspect('equal') cur_x = cur_y = None traj = [] for msg in plan: cmd = msg.command if cmd == 'set': if msg.obj.name == x_motor: cur_x = msg.args[0] if msg.obj.name == y_motor: cur_y = msg.args[0] elif cmd == 'save': traj.append((cur_x, cur_y)) x, y = zip(*traj) path, = ax.plot(x, y, marker='', linestyle='-', lw=lw) ax.set_xlabel(x_motor) ax.set_ylabel(y_motor) if probe_size is None: read_points = ax.scatter(x, y, marker='o', lw=lw) else: circles = [mpatches.Circle((_x, _y), probe_size, facecolor='black', alpha=0.5) for _x, _y in traj] read_points = mcollections.PatchCollection(circles, match_original=True) ax.add_collection(read_points) return {'path': path, 'events': read_points}
[docs]def summarize_plan(plan): """Print summary of plan Prints a minimal version of the plan, showing only moves and where events are created. Parameters ---------- plan : iterable Must yield `Msg` objects """ for msg in print_summary_wrapper(plan): ...
print_summary = summarize_plan # back-compat
[docs]def check_limits(plan): """Run check_limits_async in the RE""" if in_bluesky_event_loop(): raise RuntimeError("Can't call check_limits() from within RE, use await check_limits_async() instead") call_in_bluesky_event_loop(check_limits_async(plan))
async def check_limits_async(plan): """ Check that a plan will not move devices outside of their limits. Parameters ---------- plan : iterable Must yield `Msg` objects """ ignore = [] for msg in plan: obj = msg.obj if msg.command == 'set' and obj not in ignore: if isinstance(obj, Checkable): await maybe_await(obj.check_value(msg.args[0])) else: warn(f"{obj.name} has no check_value() method" f" to check if {msg.args[0]} is within its limits.") ignore.append(obj)