from bluesky_live.bluesky_run import BlueskyRun, DocumentCache
import event_model
[docs]def stream_documents_into_runs(add_run):
"""
Convert a flat stream of documents to "live" BlueskyRuns.
Parameters
----------
add_run : callable
This will be called as ``add_run(run: BlueskyRun)`` each time a 'start'
document is received.
Returns
-------
callback : callable
This should be subscribed to a callback registry that calls it like
``callback(name, doc)``.
Examples
--------
This is used for connecting something that emits a flat stream of documents
to something that wants to receive BlueskyRuns.
Append to a plain list.
>>> from bluesky import RunEngine
>>> RE = RunEngine()
>>> runs = []
>>> RE.subscribe(stream_documents_into_runs(runs.append))
Or, more usefully to an observable list.
>>> from bluesky_widgets.models.utils import RunList
>>> runs = RunList()
>>> RE.subscribe(stream_documents_into_runs(runs.append))
Add runs to a model with an ``add_run`` method. For example, it might be a
model that generates figures.
>>> from bluesky_widgets.models.plot_builders import AutoRecentLines
>>> model = AutoRecentLines(3)
>>> RE.subscribe(stream_documents_into_runs(model.add_run))
"""
def factory(name, doc):
dc = DocumentCache()
def build_and_add_run(event):
run = BlueskyRun(dc)
add_run(run)
dc.events.started.connect(build_and_add_run)
return [dc], []
rr = event_model.RunRouter([factory])
return rr