Source code for bluesky_widgets.utils.streaming

from bluesky_live.bluesky_run import BlueskyRun, DocumentCache
import event_model


[docs]def stream_documents_into_runs(add_run): """ Convert a flat stream of documents to "live" BlueskyRuns. Parameters ---------- add_run : callable This will be called as ``add_run(run: BlueskyRun)`` each time a 'start' document is received. Returns ------- callback : callable This should be subscribed to a callback registry that calls it like ``callback(name, doc)``. Examples -------- This is used for connecting something that emits a flat stream of documents to something that wants to receive BlueskyRuns. Append to a plain list. >>> from bluesky import RunEngine >>> RE = RunEngine() >>> runs = [] >>> RE.subscribe(stream_documents_into_runs(runs.append)) Or, more usefully to an observable list. >>> from bluesky_widgets.models.utils import RunList >>> runs = RunList() >>> RE.subscribe(stream_documents_into_runs(runs.append)) Add runs to a model with an ``add_run`` method. For example, it might be a model that generates figures. >>> from bluesky_widgets.models.plot_builders import AutoRecentLines >>> model = AutoRecentLines(3) >>> RE.subscribe(stream_documents_into_runs(model.add_run)) """ def factory(name, doc): dc = DocumentCache() def build_and_add_run(event): run = BlueskyRun(dc) add_run(run) dc.events.started.connect(build_and_add_run) return [dc], [] rr = event_model.RunRouter([factory]) return rr