Source code for bluesky_adaptive.per_event
"""
Per-Event adaptive integration.

These tools are for integrating the adaptive logic inside of a run.
They are expected to get single events and provide feedback to drive
the plan based on that information.  This is useful when the computation
time to make the decision is short compared to acquisition / movement time
and the computation is amenable to streaming analysis.

This is a "fine-grained" integration of the adaptive logic into data
acquisition.
"""

import itertools
from queue import Queue

import bluesky.plan_stubs as bps
import bluesky.preprocessors as bpp
from bluesky.utils import RunEngineControlException
from event_model import RunRouter

from .recommendations import NoRecommendation, RequestPause
from .utils import extract_event_page

def recommender_factory(recommender, independent_keys, dependent_keys, *, max_count=10, queue=None):
"""
Generate the callback and queue for recommender agent integration.
For each Event that the callback sees it will place either a
recommendation, `None`, or Exception for the Run Engine into the queue.
Recommendations will be of a dict mapping the independent_keys to the recommended values and
should be interpreted by the plan as a request for more data. A `None`
placed in the queue should be interpreted by the plan as in instruction to terminate the run.
An `Exception` placed in the queue will be raised by the plan.
Parameters
----------
recommender : object
The recommendation agent object with ask/tell interface
independent_keys : List[str]
The names of the independent keys in the events
dependent_keys : List[str]
The names of the dependent keys in the events
max_count : int, optional
The maximum number of measurements to take before poisoning the queue.
queue : Queue, optional
The communication channel for the callback to feedback to the plan.
If not given, a new queue will be created.
Returns
-------
callback : Callable[str, dict]
This function must be subscribed to RunEngine to receive the
document stream.
queue : Queue
The communication channel between the callback and the plan. This
is always returned (even if the user passed it in).
"""
    if queue is None:
        queue = Queue()

    # TODO handle multi-stream runs!
    def callback(name, doc):
        if name == "event_page":
            if doc["seq_num"][-1] > max_count:
                # if at max number of points poison the queue and return early
                queue.put(None)
                return
            independent, measurement = extract_event_page(
                independent_keys, dependent_keys, payload=doc["data"]
            )
            recommender.tell_many(independent, measurement)
            try:
                next_point = recommender.ask(1)
            except NoRecommendation:
                # no recommendation
                queue.put(None)
            except RunEngineControlException as e:
                # Recommendation to stop/abort/pause
                queue.put(e)
            except Exception as e:
                # Some other exception will be raised by the plan
                queue.put(e)
            else:
                queue.put({k: v for k, v in zip(independent_keys, next_point)})

    rr = RunRouter([lambda name, doc: ([callback], [])])
    return rr, queue
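

# ---------------------------------------------------------------------------
# Illustrative sketch (an assumption for documentation purposes, not part of
# the upstream module): a minimal recommender exposing the ``tell_many`` /
# ``ask`` interface that the callback above consumes.  The class name, the
# fixed-step strategy, and the single-independent-key assumption are all
# hypothetical; any agent with a compatible interface can be used.
class StepRecommender:
    """Suggest the next point a fixed step beyond the last measured one."""

    def __init__(self, step, maximum=5.0):
        self.step = step
        self.maximum = maximum
        self.last_seen = None

    def tell_many(self, xs, ys):
        # ``xs`` / ``ys`` are assumed to hold one row per Event, with one
        # value per key (the shape produced by ``extract_event_page``).  This
        # toy agent only keeps the most recent position and ignores ``ys``.
        for x, _y in zip(xs, ys):
            self.last_seen = float(x[0])

    def ask(self, n):
        if n != 1:
            raise NotImplementedError("this sketch suggests one point at a time")
        if self.last_seen is None:
            raise NoRecommendation
        suggestion = self.last_seen + self.step
        if suggestion > self.maximum:
            # The callback turns this into a ``None`` on the queue, which
            # tells the plan to terminate the run.
            raise NoRecommendation
        # One value per entry in ``independent_keys`` (a single key here).
        return [suggestion]

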
def adaptive_plan(dets, first_point, *, to_recommender, from_recommender, md=None, take_reading=None):
"""
Execute an adaptive scan using an per event-run recommendation engine.
The communication pattern here is that there is 1 recommendation for
each Event that is generate
Parameters
----------
dets : List[OphydObj]
The detector to read at each point. The dependent keys that the
recommendation engine is looking for must be provided by these
devices.
first_point : Dict[Settable, Any]
The first point of the scan. The motors that will be scanned
are extracted from the keys. The independent keys that the
recommendation engine is looking for / returning must be provided
by these devices.
to_recommender : Callable[document_name: str, document: dict]
This is the callback that will be registered to the RunEngine.
The expected contract is for each event it will place either a
dict mapping independent variable to recommended value or None.
This plan will either move to the new position and take data
if the value is a dict or end the run if `None`
from_recommender : Queue
The consumer side of the Queue that the recommendation engine is
putting the recommendations onto.
md : dict[str, Any], optional
Any extra meta-data to put in the Start document
take_reading : plan, optional
function to do the actual acquisition ::
def take_reading(dets, name='primary'):
yield from ...
Callable[List[OphydObj], Optional[str]] -> Generator[Msg], optional
Defaults to `bluesky.plan_stubs.trigger_and_read`
"""
    if take_reading is None:
        take_reading = bps.trigger_and_read

    # TODO inject args / kwargs here.
    _md = {"hints": {}}
    _md.update(md or {})
    try:
        _md["hints"].setdefault(
            "dimensions", [(m.hints["fields"], "primary") for m in first_point.keys()]
        )
    except (AttributeError, KeyError):
        ...

    # extract the motors
    motors = list(first_point.keys())
    # convert the first_point variable to the form we will be getting
    # from the queue
    first_point = {m.name: v for m, v in first_point.items()}

    @bpp.subs_decorator(to_recommender)
    @bpp.run_decorator(md=_md)
    def adaptive_inner_plan():
        next_point = first_point
        while True:
            # this assumes that m.name == the key in the Event
            target = {m: next_point[m.name] for m in motors}
            motor_position_pairs = itertools.chain(*target.items())

            yield from bps.mov(*motor_position_pairs)
            yield from take_reading(dets + motors, name="primary")

            next_point = from_recommender.get(timeout=1)
            if next_point is None:
                return
            elif isinstance(next_point, RequestPause):
                yield from bps.pause()
            elif isinstance(next_point, Exception):
                raise next_point

    return (yield from adaptive_inner_plan())
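

# ---------------------------------------------------------------------------
# Illustrative usage sketch (an assumption for documentation purposes, not
# part of the upstream module).  It uses the hypothetical ``StepRecommender``
# defined above and the simulated ``motor`` / ``det`` devices from
# ``ophyd.sim`` (``det`` responds to the position of ``motor``); swap in a
# real recommendation engine and real hardware as appropriate.
if __name__ == "__main__":
    from bluesky import RunEngine
    from ophyd.sim import det, motor

    RE = RunEngine({})

    def take_reading_with_checkpoint(dets, name="primary"):
        # Optional custom acquisition plan matching the ``take_reading``
        # contract documented above: add a checkpoint so the run can be
        # paused and resumed cleanly between points.
        yield from bps.checkpoint()
        return (yield from bps.trigger_and_read(dets, name=name))

    to_recommender, from_recommender = recommender_factory(
        StepRecommender(step=0.5, maximum=3.0),
        independent_keys=[motor.name],
        dependent_keys=[det.name],
        max_count=15,
    )
    RE(
        adaptive_plan(
            [det],
            {motor: -3.0},
            to_recommender=to_recommender,
            from_recommender=from_recommender,
            take_reading=take_reading_with_checkpoint,
        )
    )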