Source code for bluesky_adaptive.per_start
"""
Per-Start adaptive integration.
These functions are for integrating adaptive logic that works
between runs. The decision making process can expect to consume a
full run before having to make a recommendation about what to do next.
This may be desirable if there is a major time miss-match between the
computation and the experiment, of the data collection is not amenable
to streaming analysis, or the natural structure of the experiment
dictates.
This corresponds to a "middle" scale of adaptive integration into
data collection.
"""
import itertools
import uuid
from queue import Empty, Queue
import bluesky.plan_stubs as bps
import bluesky.plans as bp
import bluesky.preprocessors as bpp
from event_model import RunRouter
from .recommendations import NoRecommendation
from .utils import extract_event_page
[docs]
def recommender_factory(adaptive_obj, independent_keys, dependent_keys, *, max_count=10, queue=None):
"""
Generate the callback and queue for an Adaptive API backed reccomender.
This recommends a fixed step size independent of the measurement.
For each Run (aka Start) that the callback sees it will place
either a recommendation or `None` into the queue. Recommendations
will be of a dict mapping the independent_keys to the recommended
values and should be interpreted by the plan as a request for more
data. A `None` placed in the queue should be interpreted by the
plan as in instruction to terminate the run.
The StartDocuments in the stream must contain the key
``'batch_count'``.
Parameters
----------
adaptive_object : adaptive.BaseLearner
The recommendation engine
independent_keys : List[str]
The names of the independent keys in the events
dependent_keys : List[str]
The names of the dependent keys in the events
max_count : int, optional
The maximum number of measurements to take before poisoning the queue.
queue : Queue, optional
The communication channel for the callback to feedback to the plan.
If not given, a new queue will be created.
Returns
-------
callback : Callable[str, dict]
This function must be subscribed to RunEngine to receive the
document stream.
queue : Queue
The communication channel between the callback and the plan. This
is always returned (even if the user passed it in).
"""
if queue is None:
queue = Queue()
poisoned = None
def callback(name, doc):
nonlocal poisoned
# TODO handle multi-stream runs with more than 1 event!
if name == "start":
if doc["batch_count"] > max_count:
queue.put(None)
poisoned = True
return
else:
poisoned = False
if name == "event_page":
if poisoned:
return
independent, measurement = extract_event_page(independent_keys, dependent_keys, payload=doc["data"])
adaptive_obj.tell_many(independent, measurement)
# pull the next point out of the adaptive API
try:
next_point = adaptive_obj.ask(1)
except NoRecommendation:
queue.put(None)
else:
queue.put({k: v for k, v in zip(independent_keys, next_point)})
rr = RunRouter([lambda name, doc: ([callback], [])])
return rr, queue
[docs]
def adaptive_plan(dets, first_point, *, to_recommender, from_recommender, md=None, take_reading=None):
"""
Execute an adaptive scan using an inter-run recommendation engine.
Parameters
----------
dets : List[OphydObj]
The detector to read at each point. The dependent keys that the
recommendation engine is looking for must be provided by these
devices.
first_point : Dict[Settable, Any]
The first point of the scan. The motors that will be scanned
are extracted from the keys. The independent keys that the
recommendation engine is looking for / returning must be provided
by these devices.
to_recommender : Callable[document_name: str, document: dict]
This is the callback that will be registered to the RunEngine.
The expected contract is for each event it will place either a
dict mapping independent variable to recommended value or None.
This plan will either move to the new position and take data
if the value is a dict or end the run if `None`
from_recommender : Queue
The consumer side of the Queue that the recommendation engine is
putting the recommendations onto.
md : dict[str, Any], optional
Any extra meta-data to put in the Start document
take_reading : plan, optional
function to do the actual acquisition ::
def take_reading(dets, md={}):
yield from ...
Callable[List[OphydObj], Optional[Dict[str, Any]]] -> Generator[Msg], optional
This plan must generate exactly 1 Run
Defaults to `bluesky.plans.count`
"""
if take_reading is None:
take_reading = bp.count
# extract the motors
motors = list(first_point.keys())
# convert the first_point variable to from we will be getting
# from queue
first_point = {m.name: v for m, v in first_point.items()}
_md = {"batch_id": str(uuid.uuid4())}
_md.update(md or {})
@bpp.subs_decorator(to_recommender)
def inner_plan():
# drain the queue in case there is anything left over from a previous
# run
while True:
try:
from_recommender.get(block=False)
except Empty:
break
uids = []
next_point = first_point
for j in itertools.count():
# this assumes that m.name == the key in Event
target = {m: next_point[m.name] for m in motors}
motor_position_pairs = itertools.chain(*target.items())
yield from bps.mov(*motor_position_pairs)
uid = yield from take_reading(dets + motors, md={**_md, "batch_count": j})
uids.append(uid)
next_point = from_recommender.get(timeout=1)
if next_point is None:
return
return uids
return (yield from inner_plan())