Lock Step Implementations#
As discussed in Lockstep Agents and Synchronous Integration with RunEngine, the synchronous approach to agent implementation can be accomplished on a per-event or per-run basis in bluesky-adaptive.
The core of synchronous integration lies in adaptive plans and factory functions: each factory builds a callback that watches the document stream and a queue through which its recommendations flow back to the adaptive plan.
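All of the factories below share a single handshake: the factory returns a (callback, queue) pair, the callback watches documents and pushes either a recommendation dict or `None` (a poison pill) into the queue, and the plan blocks on the queue between measurements. A stdlib-only toy (no bluesky required; every name here is illustrative) sketches that loop:

```python
import itertools
from queue import Queue


def toy_recommender_factory(sequence, independent_keys, *, max_count=3, queue=None):
    """Return (callback, queue) for a recommender that replays a fixed sequence."""
    seq = iter(itertools.cycle(sequence))
    if queue is None:
        queue = Queue()

    def callback(name, doc):
        if name == "event":
            if doc["seq_num"] >= max_count:
                queue.put(None)  # poison pill: tell the plan to stop
                return
            queue.put(dict(zip(independent_keys, next(seq))))

    return callback, queue


# Simulate the plan side of the handshake: emit events, consume recommendations.
callback, queue = toy_recommender_factory([(1,), (2,)], ["motor"], max_count=3)
recs = []
for seq_num in (1, 2, 3):
    callback("event", {"seq_num": seq_num, "data": {}})
    rec = queue.get()
    if rec is None:
        break
    recs.append(rec)
```

The same contract (a dict means "measure here", `None` means "stop") is what every factory in this document implements.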
Per-Event#
In cases where the computation needed to recommend the next step is fast compared to the time it takes to collect a single data point (i.e., an Event), it makes sense to run the recommendation engine on every Event.
Generate the callback and queue for recommender agent integration.
Execute an adaptive scan using a per-event recommendation engine.
Per-Run#
In cases where the data needed to decide what to do next maps more closely to a Run, we do the same as in the Per-Event case, but expect only one recommendation per Run.
Generate the callback and queue for an Adaptive-API-backed recommender.
Execute an adaptive scan using an inter-run recommendation engine.
Integrated Demonstrations#
The following examples are provided to illustrate the rationale and mechanics behind the lock-step implementations. They show how to build a recommender factory from scratch, with the agent embedded in the factory, where the agent is a simple step function.
Per-event#
import itertools
from queue import Queue

import numpy as np
from event_model import RunRouter

# ``extract_arrays`` is a small helper (defined elsewhere in these demos)
# that pulls per-key arrays for the given keys out of an event payload.


def per_event_plan_sequence_factory(
    sequence, independent_keys, dependent_keys, *, max_count=10, queue=None
):
    """
    Generate the callback and queue for a naive recommendation engine.

    This returns the same sequence of points no matter what the
    measurements are.

    For each Event that the callback sees it will place either a
    recommendation or `None` into the queue.  Recommendations will be
    a dict mapping the independent_keys to the recommended values and
    should be interpreted by the plan as a request for more data.  A `None`
    placed in the queue should be interpreted by the plan as an instruction
    to terminate the run.

    Parameters
    ----------
    sequence : iterable of positions
        This should be an iterable of position vectors that match the motors

    independent_keys : List[str]
        The names of the independent keys in the events

    dependent_keys : List[str]
        The names of the dependent keys in the events

    max_count : int, optional
        The maximum number of measurements to take before poisoning the queue.

    queue : Queue, optional
        The communication channel for the callback to feed back to the plan.
        If not given, a new queue will be created.

    Returns
    -------
    callback : Callable[[str, dict], None]
        This function must be subscribed to the RunEngine to receive the
        document stream.

    queue : Queue
        The communication channel between the callback and the plan.  This
        is always returned (even if the user passed it in).
    """
    seq = iter(itertools.cycle(sequence))
    if queue is None:
        queue = Queue()

    # TODO handle multi-stream runs!
    def callback(name, doc):
        if name == "event":
            if doc["seq_num"] >= max_count:
                # if at max number of points poison the queue and return early
                queue.put(None)
                return

            payload = doc["data"]
            inp, measurements = extract_arrays(
                independent_keys, dependent_keys, payload
            )
            # call something to get next point!  (this naive engine ignores
            # the measurements and just replays the fixed sequence)
            next_point = next(seq)
            queue.put({k: v for k, v in zip(independent_keys, next_point)})

    return callback, queue
def per_event_plan_step_factory(
    step, independent_keys, dependent_keys, *, max_count=10, queue=None
):
    """
    Generate the callback and queue for a naive recommendation engine.

    This recommends a fixed step size independent of the measurement.

    For each Event that the callback sees it will place either a
    recommendation or `None` into the queue.  Recommendations will be
    a dict mapping the independent_keys to the recommended values and
    should be interpreted by the plan as a request for more data.  A `None`
    placed in the queue should be interpreted by the plan as an instruction
    to terminate the run.

    Parameters
    ----------
    step : array[float]
        The delta step to take on each point

    independent_keys : List[str]
        The names of the independent keys in the events

    dependent_keys : List[str]
        The names of the dependent keys in the events

    max_count : int, optional
        The maximum number of measurements to take before poisoning the queue.

    queue : Queue, optional
        The communication channel for the callback to feed back to the plan.
        If not given, a new queue will be created.

    Returns
    -------
    callback : Callable[[str, dict], None]
        This function must be subscribed to the RunEngine to receive the
        document stream.

    queue : Queue
        The communication channel between the callback and the plan.  This
        is always returned (even if the user passed it in).
    """
    if queue is None:
        queue = Queue()

    def callback(name, doc):
        # TODO handle multi-stream runs!
        if name == "event_page":
            if doc["seq_num"][-1] > max_count:
                # if at max number of points poison the queue and return early
                queue.put(None)
                return

            payload = doc["data"]
            # This is your "motor positions" and the "extracted measurements"
            independent, measurement = extract_arrays(
                independent_keys, dependent_keys, payload
            )
            # call something to get next point!
            next_point = independent + step
            queue.put({k: v for k, v in zip(independent_keys, next_point)})

    rr = RunRouter([lambda name, doc: ([callback], [])])
    return rr, queue
and to run it:

cb, queue = per_event_plan_step_factory(
    np.asarray((5, 5)), ["ctrl_Ti", "ctrl_temp"], ["rois_I_00", "rois_I_01"]
)
intra_run_adaptive_plan([rois], {ctrl.Ti: 15, ctrl.temp: 300}, to_brains=cb, from_brains=queue)
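Note that the step factory consumes event_page documents rather than single Events: every field in an event_page is a list that batches consecutive events. A stdlib-only stand-in (illustrative names, no bluesky needed) shows the document shape the callback above expects:

```python
from queue import Queue

step = (5.0,)
independent_keys = ["motor"]
queue = Queue()


def callback(name, doc):
    # event_page documents batch events: 'seq_num' is a list and every
    # entry in 'data' maps a key to a list of values, one per event.
    if name == "event_page":
        if doc["seq_num"][-1] > 2:
            queue.put(None)
            return
        independent = [doc["data"][k][-1] for k in independent_keys]
        queue.put({k: v + s for k, v, s in zip(independent_keys, independent, step)})


# One event_page carrying a single event at motor=10.0
callback("event_page", {"seq_num": [1], "data": {"motor": [10.0]}})
rec = queue.get()  # fixed-step recommendation: 10.0 + 5.0
```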
Per-start#
def per_start_step_factory(
    step, independent_keys, dependent_keys, *, max_count=10, queue=None
):
    """
    Generate the callback and queue for a naive recommendation engine.

    This recommends a fixed step size independent of the measurement.

    For each Run (aka Start) that the callback sees it will place
    either a recommendation or `None` into the queue.  Recommendations
    will be a dict mapping the independent_keys to the recommended
    values and should be interpreted by the plan as a request for more
    data.  A `None` placed in the queue should be interpreted by the
    plan as an instruction to terminate the run.

    The StartDocuments in the stream must contain the key
    ``'batch_count'``.

    Parameters
    ----------
    step : array[float]
        The delta step to take on each point

    independent_keys : List[str]
        The names of the independent keys in the events

    dependent_keys : List[str]
        The names of the dependent keys in the events

    max_count : int, optional
        The maximum number of measurements to take before poisoning the queue.

    queue : Queue, optional
        The communication channel for the callback to feed back to the plan.
        If not given, a new queue will be created.

    Returns
    -------
    callback : Callable[[str, dict], None]
        This function must be subscribed to the RunEngine to receive the
        document stream.

    queue : Queue
        The communication channel between the callback and the plan.  This
        is always returned (even if the user passed it in).
    """
    if queue is None:
        queue = Queue()

    def callback(name, doc):
        # TODO handle multi-stream runs with more than 1 event!
        if name == "start":
            if doc["batch_count"] > max_count:
                queue.put(None)
                return

        if name == "event_page":
            payload = doc["data"]
            # This is your "motor positions"
            independent = np.asarray([payload[k][-1] for k in independent_keys])
            # This is the extracted measurements (unused by this fixed-step engine)
            measurement = np.asarray([payload[k][-1] for k in dependent_keys])
            # call something to get next point!
            next_point = independent + step
            queue.put({k: v for k, v in zip(independent_keys, next_point)})

    rr = RunRouter([lambda name, doc: ([callback], [])])
    return rr, queue
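The per-start flow can be mimicked without bluesky at all; the stand-in below (all names illustrative) shows the two document types the callback cares about and how ``batch_count`` terminates acquisition:

```python
from queue import Queue

step = 1.0
independent_keys = ["motor"]
max_count = 2
queue = Queue()


def callback(name, doc):
    if name == "start":
        # 'batch_count' in the Start document drives termination
        if doc["batch_count"] > max_count:
            queue.put(None)
        return
    if name == "event_page":
        independent = [doc["data"][k][-1] for k in independent_keys]
        queue.put({k: v + step for k, v in zip(independent_keys, independent)})


# One full run: a Start document, then a single event_page.
callback("start", {"batch_count": 1})
callback("event_page", {"data": {"motor": [3.0]}})
rec = queue.get()  # one recommendation per run
# A Start past the budget poisons the queue instead.
callback("start", {"batch_count": 3})
stop = queue.get()
```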
def adaptive_factory_factory(
    adaptive_factory, independent_keys, dependent_keys, *, max_count=10, queue=None
):
    """
    Generate the callback and queue for an Adaptive-API-backed
    recommendation engine.

    This defers to an `adaptive.BaseLearner`, rebuilt for each new batch
    by *adaptive_factory*, to recommend the next point.

    For each Run (aka Start) that the callback sees it will place
    either a recommendation or `None` into the queue.  Recommendations
    will be a dict mapping the independent_keys to the recommended
    values and should be interpreted by the plan as a request for more
    data.  A `None` placed in the queue should be interpreted by the
    plan as an instruction to terminate the run.

    The StartDocuments in the stream must contain the keys
    ``'batch_count'`` and ``'batch_id'``.

    Parameters
    ----------
    adaptive_factory : Callable[dict] -> adaptive.BaseLearner
        Function that when passed a Start document will return an
        `adaptive.BaseLearner` object ready to go

    independent_keys : List[str]
        The names of the independent keys in the events

    dependent_keys : List[str]
        The names of the dependent keys in the events

    max_count : int, optional
        The maximum number of measurements to take before poisoning the queue.

    queue : Queue, optional
        The communication channel for the callback to feed back to the plan.
        If not given, a new queue will be created.

    Returns
    -------
    callback : Callable[[str, dict], None]
        This function must be subscribed to the RunEngine to receive the
        document stream.

    queue : Queue
        The communication channel between the callback and the plan.  This
        is always returned (even if the user passed it in).
    """
    if queue is None:
        queue = Queue()

    last_batch_id = None
    adaptive_obj = None

    def callback(name, doc):
        nonlocal last_batch_id, adaptive_obj
        # TODO handle multi-stream runs with more than 1 event!
        if name == "start":
            if doc["batch_count"] > max_count:
                queue.put(None)
                return
            if last_batch_id != doc["batch_id"]:
                last_batch_id = doc["batch_id"]
                adaptive_obj = adaptive_factory(doc)

        if name == "event_page":
            payload = doc["data"]
            # This is your "motor positions"
            independent = np.asarray([payload[k][-1] for k in independent_keys])
            # This is the extracted measurements
            measurement = np.asarray([payload[k][-1] for k in dependent_keys])
            # push into the adaptive API (the point must be hashable)
            adaptive_obj.tell(tuple(independent), measurement)
            # pull the next point out of the adaptive API;
            # ask(n) returns a (points, loss_improvements) pair
            (next_point,), _ = adaptive_obj.ask(1)
            queue.put(
                {k: v for k, v in zip(independent_keys, np.atleast_1d(next_point))}
            )

    rr = RunRouter([lambda name, doc: ([callback], [])])
    return rr, queue
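The callback above leans on the adaptive tell/ask protocol: ``tell(x, y)`` records a measurement and ``ask(n)`` returns a ``(points, loss_improvements)`` pair. A minimal stub learner (standing in for ``adaptive.BaseLearner``, so nothing here needs the adaptive package installed) demonstrates the contract:

```python
class StubLearner:
    """Stand-in for adaptive.BaseLearner: fixed-step 'recommendations'."""

    def __init__(self, step):
        self.step = step
        self.data = {}

    def tell(self, x, y):
        # Points must be hashable, hence tuples rather than arrays.
        self.data[tuple(x)] = y

    def ask(self, n):
        # Mirror adaptive's API: return (points, loss_improvements).
        last = max(self.data) if self.data else (0.0,)
        points = [tuple(v + self.step for v in last) for _ in range(n)]
        return points, [0.0] * n


learner = StubLearner(step=5.0)
learner.tell((10.0,), 0.3)
(point,), _ = learner.ask(1)  # unpack (points, losses) as the callback does
```

Any object honoring this tell/ask contract, whether a real adaptive learner or a custom agent, can be dropped into ``adaptive_factory_factory``.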