Source code for bluesky_adaptive.on_stop
"""
Per-Start adaptive integration, but triggered on the stop document.
These functions are for integrating adaptive logic that works
between runs. The decision making process can expect to consume a
full run before having to make a recommendation about what to do next.
This may be desirable if there is a major time miss-match between the
computation and the experiment, of the data collection is not amenable
to streaming analysis, or the natural structure of the experiment
dictates.
This corresponds to a "middle" scale of adaptive integration into
data collection.
"""
from queue import Queue
import event_model
from bluesky_live.bluesky_run import BlueskyRun, DocumentCache
from bluesky_widgets.models.utils import call_or_eval
from ophyd.sim import NumpySeqHandler
from .recommendations import NoRecommendation
def stream_documents_into_runs(add_run):
"""
Convert a flat stream of documents to "live" BlueskyRuns.
Parameters
----------
add_run : callable
This will be called as ``add_run(run: BlueskyRun)`` each time a 'start'
document is received.
Returns
-------
callback : callable
This should be subscribed to a callback registry that calls it like
``callback(name, doc)``.
Examples
--------
This is used for connecting something that emits a flat stream of documents
to something that wants to receive BlueskyRuns.
Append to a plain list.
>>> from bluesky import RunEngine
>>> RE = RunEngine()
>>> runs = []
>>> RE.subscribe(stream_documents_into_runs(runs.append))
Or, more usefully to an observable list.
>>> from bluesky_widgets.models.utils import RunList
>>> runs = RunList()
>>> RE.subscribe(stream_documents_into_runs(runs.append))
Add runs to a model with an ``add_run`` method. For example, it might be a
model that generates figures.
>>> from bluesky_widgets.models.plot_builders import AutoLines
>>> model = AutoLines()
>>> RE.subscribe(stream_documents_into_runs(model.add_run))
"""
def factory(name, doc):
dc = DocumentCache()
def build_and_add_run(event):
run = BlueskyRun(dc)
add_run(run)
dc.events.started.connect(build_and_add_run)
return [dc], []
rr = event_model.RunRouter([factory], handler_registry={"NPY_SEQ": NumpySeqHandler})
return rr
[docs]
def recommender_factory(
*,
adaptive_obj,
independent_keys,
dependent_keys,
target_keys,
stream_names=("primary",),
max_count=10,
queue=None,
target_transforms=None
):
"""
Generate the callback and queue for an Adaptive API backed reccomender.
This recommends a fixed step size independent of the measurement.
For each Run (aka Start) that the callback sees it will place
either a recommendation or `None` into the queue. Recommendations
will be of a dict mapping the independent_keys to the recommended
values and should be interpreted by the plan as a request for more
data. A `None` placed in the queue should be interpreted by the
plan as in instruction to terminate the run.
The StartDocuments in the stream must contain the key
``'batch_count'``.
Parameters
----------
adaptive_obj : adaptive.BaseLearner
The recommendation engine. Must implement
independent_keys : List[String | Callable]
Each value must be a stream name, field name, a valid Python
expression, or a callable. The signature of the callable may include
any valid Python identifiers provideed by :func:`construct_namespace`
or the user-provided namespace parmeter below. See examples.
dependent_keys : List[String | Callable]
Each value must be a stream name, field name, a valid Python
expression, or a callable. The signature of the callable may include
any valid Python identifiers provideed by :func:`construct_namespace`
or the user-provided namespace parmeter below. See examples.
target_keys : List[String]
Keys passed back to the plan, must be the same length as
the return of `adaptive_obj.ask(1)`
stream_names : Tuple[String], default ("primary",)
The streams to be offered to the
max_count : int, optional
The maximum number of measurements to take before poisoning the queue.
queue : Queue, optional
The communication channel for the callback to feedback to the plan.
If not given, a new queue will be created.
target_transforms : Dict[String, Callable], optional
Transforms to be applied to the values from ask before returning
to the run engine. This can be useful handling trivial coordinate
transformations.
Returns
-------
callback : Callable[str, dict]
This function must be subscribed to RunEngine to receive the
document stream.
queue : Queue
The communication channel between the callback and the plan. This
is always returned (even if the user passed it in).
"""
if queue is None:
queue = Queue()
if target_transforms is None:
target_transforms = {}
def tell_recommender(event):
run = event.run
independent_map = call_or_eval({j: val for j, val in enumerate(independent_keys)}, run, stream_names)
dependent_map = call_or_eval({j: val for j, val in enumerate(dependent_keys)}, run, stream_names)
independent = tuple(independent_map[j] for j in range(len(independent_keys)))
measurement = tuple(dependent_map[j] for j in range(len(dependent_keys)))
adaptive_obj.tell_many(independent, measurement)
# pull the next point out of the adaptive API
try:
next_point = adaptive_obj.ask(1)
except NoRecommendation:
queue.put(None)
else:
if run.metadata["start"].get("batch_count") >= max_count:
queue.put(None)
else:
queue.put({k: target_transforms.get(k, lambda x: x)(v) for k, v in zip(target_keys, next_point)})
def tell_recommender_on_completion(run):
run.events.completed.connect(tell_recommender)
return stream_documents_into_runs(tell_recommender_on_completion), queue