Source code for bluesky_widgets.models.plot_builders

from collections import defaultdict
import collections.abc
import functools
import itertools
import weakref

from .plot_specs import (
    FigureSpec,
    AxesSpec,
    ImageSpec,
    LineSpec,
    FigureSpecList,
)
from ._heuristics import infer_lines_to_plot
from .utils import RunList, run_is_live_and_not_completed
from ..utils.list import EventedList
from ..utils.dict_view import DictView


class BuilderList(EventedList):
    "A list of functions that accept a BlueskyRun and return FigureSpec(s)."
    ...


[docs]class PromptPlotter: """ Produce Figures from BlueskyRuns promptly (as Run completion time). Parameters ---------- builders : BuilderList[callable] A list of functions that accept a BlueskyRun and return FigureSpec(s). Attributes ---------- runs : RunList[BlueskyRun] Add or remove runs from this list. figures : FigureSpecList[FigureSpec] Figures will be added to this list. builders : BuilderList[callable] A list of functions with the expected signature:: f(run: BlueskyRun) -> FigureSpec or:: f(run: BlueskyRun) -> List{FigureSpec] """ def __init__(self, builders): self.figures = FigureSpecList() self.builders = BuilderList() self.runs = RunList() self.builders.extend(builders) self.runs.events.added.connect(self._on_run_added)
[docs] def add_run(self, run): """ Add a Run. Parameters ---------- run : BlueskyRun """ self.runs.append(run)
[docs] def discard_run(self, run): """ Discard a Run. If the Run is not present, this will return silently. Parameters ---------- run : BlueskyRun """ if run in self.runs: self.runs.remove(run)
def _on_run_added(self, event): run = event.item # If Run is complete, process is now. Otherwise, schedule it to # process when it completes. if not run_is_live_and_not_completed(run): self._process_run(run) else: run.events.completed.connect(lambda event: self._process_run(event.run)) def _on_builder_added(self, event): builder = event.item self.builders.append(builder) # Process all runs we already have with the new builder. for run in self.runs: if not run_is_live_and_not_completed(run): self._process_run(run) else: run.events.completed.connect(lambda event: self._process_run(event.run)) def _process_run(self, run): for builder in self.builders: figures = builder(run) # Tolerate a FigureSpec or a list of them. if not isinstance(figures, collections.abc.Iterable): figures = [figures] self.figures.extend(figures)
def prompt_line_builder(run): """ This is a simple example. This makes a hard-coded assumption that the data has columns "motor" and "det" in the primary stream. """ def func(run): "Return any arrays x, y. They must be of equal length." # *Lazily* read the data so that large arrays are not loaded unless # the yare used. ds = run.primary.read() # Do any computation you want in here.... return ds["motor"], ds["det"] label = f"Scan {run.metadata['start']['scan_id']}" line = LineSpec(func, run, label) axes = AxesSpec(lines=[line], x_label="motor", y_label="det") figure = FigureSpec((axes,), title="det v motor") return [figure]
[docs]class RecentLines: """ Plot y vs x for the last N runs. Parameters ---------- max_runs : int Number of lines to show at once x : string Field name y : string Field name stream_name : string, optional Stream where fields x and y are found. Default is "primary". func : callable, optional Expected signature:: func(run: BlueskyRun, stream_name: str, x: str, y: str) -> x: Array, y: Array Default:: def func(run, stream_name, x, y): ds = run[stream_name].to_dask() return ds[x], ds[y] axes : AxesSpec, optional If None, an axes and figure are created with default labels and titles derived from the ``x`` and ``y`` parameters. Attributes ---------- max_runs : int Number of Runs to plot at once. This may be changed at any point. (Note: Increasing it will not restore any Runs that have already been removed, but it will allow more new Runs to be added.) runs : RunList[BlueskyRun] As runs are appended entries will be removed from the beginning of the last (first in, first out) so that there are at most ``max_runs``. pinned : Frozenset[String] Run uids of pinned runs. figure : FigureSpec func : callable axes : AxesSpec x : string Read-only access to x field name y : string Read-only access to y field name stream_name : string Read-only access to stream name Examples -------- >>> model = RecentLines(3, "motor", "det") >>> from bluesky_widgets.jupyter.figures import JupyterFigure >>> view = JupyterFigure(model.figure) >>> model.add_run(run) >>> model.add_run(another_run, pinned=True) """ def __init__(self, max_runs, x, y, stream_name="primary", func=None, axes=None): super().__init__() if func is None: def func(run, stream_name, x, y): ds = run[stream_name].to_dask() return ds[x], ds[y] # Stash these and expose them as read-only properties. self._max_runs = int(max_runs) self._x = x self._y = y self._stream_name = stream_name self._func = func self.runs = RunList() self._pinned = set() self._color_cycle = itertools.cycle(f"C{i}" for i in range(10)) # Maps Run (uid) to LineSpec self._runs_to_lines = weakref.WeakValueDictionary() self.runs.events.added.connect(self._on_run_added) self.runs.events.removed.connect(self._on_run_removed) if axes is None: axes = AxesSpec(x_label=self.x, y_label=self.y) figure = FigureSpec((axes,), title=f"{self.y} v {self.x}") else: figure = axes.figure self.axes = axes self.figure = figure
[docs] def add_run(self, run, pinned=False): """ Add a Run. Parameters ---------- run : BlueskyRun pinned : Boolean If True, retain this Run until it is removed by the user. """ if pinned: self._pinned.add(run.metadata["start"]["uid"]) self.runs.append(run)
[docs] def discard_run(self, run): """ Discard a Run, including any pinned and unpinned. If the Run is not present, this will return silently. Parameters ---------- run : BlueskyRun """ if run in self.runs: self.runs.remove(run)
def _add_line(self, run): "Add a line." # Create a plot if we do not have one. # If necessary, removes runs to make room for the new one. self._cull_runs() label = f"Scan {run.metadata['start']['scan_id']}" # If run is in progress, give it a special color so it stands out. if run_is_live_and_not_completed(run): color = "black" # Later, when it completes, flip the color to one from the cycle. run.events.completed.connect(self._on_run_complete) else: color = next(self._color_cycle) style = {"color": color} # Style pinned runs differently. if run.metadata["start"]["uid"] in self._pinned: style.update(linestyle="dashed") label += " (pinned)" func = functools.partial( self.func, stream_name=self.stream_name, x=self.x, y=self.y ) line = LineSpec(func, run, label, style) run_uid = run.metadata["start"]["uid"] self._runs_to_lines[run_uid] = line self.axes.lines.append(line) def _cull_runs(self): "Remove Runs from the beginning of self.runs to keep the length <= max_runs." i = 0 while len(self.runs) > self.max_runs + len(self._pinned): while self.runs[i].metadata["start"]["uid"] in self._pinned: i += 1 self.runs.pop(i) def _on_run_added(self, event): "When a new Run is added, draw a line or schedule it to be drawn." run = event.item # If the stream of interest is defined already, plot now. if self.stream_name in run: self._add_line(run) else: # Otherwise, connect a callback to run when the stream of interest arrives. run.events.new_stream.connect(self._on_new_stream) def _on_run_removed(self, event): "Remove the line if its corresponding Run is removed." run_uid = event.item.metadata["start"]["uid"] self._pinned.discard(run_uid) try: line = self._runs_to_lines.pop(run_uid) except KeyError: # The line has been removed before the Run was. return try: self.axes.lines.remove(line) except ValueError: # The line has been removed before the Run was. pass def _on_new_stream(self, event): "This callback runs whenever BlueskyRun has a new stream." if event.name == self.stream_name: self._add_line(event.run) event.run.events.new_stream.disconnect(self._on_new_stream) def _on_run_complete(self, event): "When a run completes, update the color from back to a color." run_uid = event.run.metadata["start"]["uid"] try: line = self._runs_to_lines[run_uid] except KeyError: # The line has been removed before the Run completed. return line.style.update({"color": next(self._color_cycle)}) @property def max_runs(self): return self._max_runs @max_runs.setter def max_runs(self, value): self._max_runs = value self._cull_runs() # Read-only properties so that these settings are inspectable, but not # changeable. @property def x(self): return self._x @property def y(self): return self._y @property def stream_name(self): return self._stream_name @property def func(self): return self._func @property def pinned(self): return frozenset(self._pinned)
[docs]class AutoRecentLines: """ Automatically guess useful lines to plot. Show the last N runs (per figure). Parameters ---------- max_runs : int Number of Runs to plot at once, per figure Attributes ---------- figures : FigureSpecList[FigureSpec] max_runs : int Number of Runs to plot at once. This may be changed at any point. (Note: Increasing it will not restore any Runs that have already been removed, but it will allow more new Runs to be added.) keys_to_figures : dict Read-only mapping of each key to the active RecentLines instance. Examples -------- >>> model = AutoRecentLines(3) >>> from bluesky_widgets.jupyter.figures import JupyterFigures >>> view = JupyterFigures(model.figures) >>> model.add_run(run) >>> model.add_run(another_run, pinned=True) """ def __init__(self, max_runs): self.figures = FigureSpecList() self._max_runs = max_runs # Map key like ((x, y), stream_name) to RecentLines instance so configured. self._key_to_instance = {} # Map FigureSpec UUID to key like ((x, y), stream_name) self._figure_to_key = {} # Track inactive instances/figures which are no longer being updated # with new Runs. Structure is a dict-of-dicts like: # {key: {figure_uuid: instance, ...}, ...} self._inactive_instances = defaultdict(dict) self.figures.events.removed.connect(self._on_figure_removed) @property def keys_to_figures(self): "Read-only mapping of each key to the active RecentLines instance." return DictView({v: k for k, v in self._figure_to_key.items()})
[docs] def new_instance_for_key(self, key): """ Make a new RecentLine instance for a key. If there is an existing one the instance and figure will remain but will no longer be updated with new Runs. Those will go to a new instance and figure, created here. """ (x, y), stream_name = key old_instance = self._key_to_instance.pop(key, None) if old_instance is not None: self._inactive_instances[key][old_instance.figure.uuid] = old_instance instance = RecentLines( max_runs=self.max_runs, x=x, y=y, stream_name=stream_name ) self._key_to_instance[key] = instance self._figure_to_key[instance.figure.uuid] = key self.figures.append(instance.figure) return instance
[docs] def add_run(self, run, pinned=False): """ Add a Run. Parameters ---------- run : BlueskyRun pinned : Boolean If True, retain this Run until it is removed by the user. """ for stream_name in run: self._handle_stream(run, stream_name, pinned) if run_is_live_and_not_completed(run): # Listen for additional streams. run.events.new_stream.connect( lambda event: self._handle_stream(run, event.name, pinned) )
[docs] def discard_run(self, run): """ Discard a Run, including any pinned and unpinned. If the Run is not present, this will return silently. Also, note that this only affect "active" plots that are currently receive new runs. Inactive ones will be left as they are. Parameters ---------- run : BlueskyRun """ for instance in self._key_to_instance.values(): instance.discard_run(run)
def _handle_stream(self, run, stream_name, pinned): "This examines a stream and adds this run to RecentLines instances." for key in infer_lines_to_plot(run, run[stream_name]): try: instance = self._key_to_instance[key] except KeyError: instance = self.new_instance_for_key(key) instance.add_run(run, pinned=pinned) def _on_figure_removed(self, event): """ A figure was removed from self.figures. Remove the relevant RecentLines instance. """ figure = event.item try: key = self._figure_to_key.pop(figure.uuid) except KeyError: # This figure belongs to an inactive instance. del self._inactive_instances[key][figure.uuid] else: self._key_to_instance.pop(key) @property def max_runs(self): return self._max_runs @max_runs.setter def max_runs(self, value): self._max_runs = value for instance in self._key_to_instance.values(): instance.max_runs = value
[docs]class Image: """ Plot an image from a Run. By default, higher-dimensional data is handled by repeatedly averaging over the leading dimension until there are only two dimensions. Parameters ---------- field : string Field name ("data key") for this image stream_name : string, optional Stream where fields x and y are found. Default is "primary". func : callable, optional Expected signature:: func(run: BlueskyRun, stream_name: str, x: str, y: str) -> x: Array, y: Array Default:: def func(run, field): ds = run[stream_name].to_dask() data = ds[field].data # Reduce the data until it is 2D by repeatedly averaging over # the leading axis until there only two axes. while data.ndim > 2: data = data.mean(0) return data axes : AxesSpec, optional If None, an axes and figure are created with default labels and titles derived from the ``x`` and ``y`` parameters. Attributes ---------- run : BlueskyRun The currently-viewed Run figure : FigureSpec func : callable axes : AxesSpec x : string Read-only access to x field name y : string Read-only access to y field name stream_name : string Read-only access to stream name Examples -------- >>> model = RecentLines(3, "motor", "det") >>> from bluesky_widgets.jupyter.figures import JupyterFigure >>> view = JupyterFigure(model.figure) >>> model.add_run(run) >>> model.add_run(another_run, pinned=True) """ def __init__(self, field, stream_name="primary", func=None, axes=None): super().__init__() if func is None: def func(run, field): ds = run[stream_name].to_dask() data = ds[field].data # Reduce the data until it is 2D by repeatedly averaging over # the leading axis until there only two axes. while data.ndim > 2: data = data.mean(0) return data # Stash these and expose them as read-only properties. self._field = field self._stream_name = stream_name self._func = func self._run = None if axes is None: axes = AxesSpec() figure = FigureSpec((axes,), title="") else: figure = axes.figure self.axes = axes self.figure = figure @property def run(self): return self._run @run.setter def run(self, value): self._run = value self.axes.images.clear() if self._run is not None: self._add_image() def _add_image(self): md = self.run.metadata["start"] title = f"Scan ID {md['scan_id']} UID {md['uid'][:8]}" func = functools.partial(self.func, field=self.field) image = ImageSpec(func, self.run, label=self.field) self.axes.images.append(image) self.axes.title = title # TODO Set axes x, y from xarray dims @property def func(self): return self._func def stream_name(self): return self._stream_name @property def field(self): return self._field