import time as ttime
from collections import ChainMap
from collections.abc import Iterable
import numpy as np
from event_model import DocumentNames, schema_validators
from ..run_engine import Dispatcher
from ..utils import new_uid
from .core import CallbackBase
class LiveDispatcher(CallbackBase):
"""
A secondary event stream of processed data
The LiveDipatcher base implementation does not change any of the data
emitted, this task is left to sub-classes, but instead handles
reimplementing a secondary event stream that fits the same schema demanded
by the RunEngine itself. In order to reduce the work done by these
processed data pipelines, the LiveDispatcher handles the nitty-gritty
details of formatting the event documents. This includes creating new uids,
numbering events and creating descriptors.
The LiveDispatcher can be subscribed to using the same syntax as the
RunEngine, effectively creating a small chain of callbacks
.. code::
# Create our dispatcher
ld = LiveDispatcher()
# Subscribe it to receive events from the RunEgine
RE.subscribe(ld)
# Subscribe any callbacks we desire to second stream
ld.subscribe(LivePlot('det', x='motor'))
"""
def __init__(self):
# Public dispatcher for callbacks
self.dispatcher = Dispatcher()
# Local caches for internal use
self.seq_count = 0 # Maintain our own sequence count for this stream
self.raw_descriptors = dict() # Store raw descriptors for use later # noqa: C408
self._stream_start_uid = None # Generated start doc uid
self._descriptors = dict() # Dictionary of sent descriptors # noqa: C408
def start(self, doc, _md=None):
"""Receive a raw start document, re-emit it for the modified stream"""
self._stream_start_uid = new_uid()
_md = _md or dict() # noqa: C408
# Create a new start document with a new uid, start time, and the uid
# of the original start document. Preserve the rest of the metadata
# that we retrieved from the start document
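        # (``ChainMap`` resolves keys left to right, so the new uid and time
        # take precedence over any duplicates in ``_md`` or the original doc)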
md = ChainMap(
{"uid": self._stream_start_uid, "original_run_uid": doc["uid"], "time": ttime.time()}, _md, doc
)
# Dispatch the start document for anyone subscribed to our Dispatcher
self.emit(DocumentNames.start, dict(md))
super().start(doc)
def descriptor(self, doc):
"""Store a descriptor"""
self.raw_descriptors[doc["uid"]] = doc
super().descriptor(doc)
[docs]
def event(self, doc, **kwargs):
"""
Receive an event document from the raw stream.
This should be reimplemented by a subclass.
Parameters
----------
doc : event
kwargs:
All keyword arguments are passed to :meth:`.process_event`
"""
self.process_event(doc, **kwargs)
return super().event(doc)
def process_event(self, doc, stream_name="primary", id_args=None, config=None):
"""
Process a modified event document then emit it for the modified stream
This will pass an Event document to the dispatcher. If we have received
a new event descriptor from the original stream, or we have received a
new set of `id_args` or `descriptor_id` , a new descriptor document is
first issued and passed through to the dispatcher. When issuing a new
event, the new descriptor is given a new source field.
Parameters
----------
doc : event
stream_name : str, optional
String identifier for a particular stream
id_args : tuple, optional
Additional tuple of hashable objects to identify the stream
config: dict, optional
Additional configuration information to be included in the event
descriptor
Notes
-----
Any callback subscribed to the `Dispatcher` will receive these event
streams. If nothing is subscribed, these documents will not go
anywhere.
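
        Examples
        --------
        A minimal sketch (``smooth`` is a hypothetical user-supplied helper,
        not part of bluesky) of a subclass transforming data before
        re-emitting it on a renamed stream:

        .. code::

            def event(self, doc, **kwargs):
                doc = dict(doc)
                doc["data"]["det"] = smooth(doc["data"]["det"])
                self.process_event(doc, stream_name="smoothed")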
"""
id_args = id_args or (doc["descriptor"],)
config = config or dict() # noqa: C408
# Determine the descriptor id
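        # (a frozenset is hashable, so each unique combination of data keys,
        # stream name and id_args maps to one cached descriptor)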
desc_id = frozenset((tuple(doc["data"].keys()), stream_name, id_args))
# If we haven't described this configuration
# Send a new document to our subscribers
if stream_name not in self._descriptors or desc_id not in self._descriptors[stream_name]:
# Create a new description document for the output of the stream
data_keys = dict() # noqa: C408
# Parse the event document creating a new description. If the key
# existed in the original source description, just assume that it
# is the same type, units and shape. Otherwise do some
# investigation
raw_desc = self.raw_descriptors.get(doc["descriptor"], {})
for key, val in doc["data"].items():
                # Key was described in the original (raw) descriptor
                if key in raw_desc.get("data_keys", {}):
key_desc = raw_desc["data_keys"][key]
# String key
elif isinstance(val, str):
key_desc = {"dtype": "string", "shape": []}
# Iterable
elif isinstance(val, Iterable):
key_desc = {"dtype": "array", "shape": np.shape(val)}
# Number
else:
key_desc = {"dtype": "number", "shape": []}
# Modify the source
key_desc["source"] = "Stream"
# Store in our new descriptor
data_keys[key] = key_desc
# Create our complete description document
desc = ChainMap(
{
"uid": new_uid(),
"time": ttime.time(),
"run_start": self._stream_start_uid,
"data_keys": data_keys,
"configuration": config,
"object_keys": {"stream": list(data_keys.keys())},
},
raw_desc,
)
# Store information about our descriptors
desc = dict(desc)
if stream_name not in self._descriptors:
self._descriptors[stream_name] = dict() # noqa: C408
self._descriptors[stream_name][desc_id] = desc
# Emit the document to all subscribers
self.emit(DocumentNames.descriptor, desc)
        # Clean the Event document produced by the processing pipeline. The
        # data is left untouched, but the relevant uids, timestamps and
        # seq_num are modified so that this event is not confused with the
        # raw data stream
self.seq_count += 1
desc_uid = self._descriptors[stream_name][desc_id]["uid"]
current_time = ttime.time()
evt = ChainMap(
{
"uid": new_uid(),
"descriptor": desc_uid,
"timestamps": dict((key, current_time) for key in doc["data"].keys()), # noqa: C402
"seq_num": self.seq_count,
"time": current_time,
},
doc,
)
# Emit the event document
self.emit(DocumentNames.event, dict(evt))
def stop(self, doc, _md=None):
"""Receive a raw stop document, re-emit it for the modified stream"""
        # Create a new stop document with a new uid, pointing to the correct
        # start document uid, and tally the number of events we have emitted.
        # The rest of the stop information is passed on to the next callback
        _md = _md or dict()  # noqa: C408
        # ``seq_count`` counts every event emitted during this run
        num_events = dict((stream, self.seq_count) for stream in self._descriptors.keys())  # noqa: C402
        md = ChainMap(
            dict(run_start=self._stream_start_uid, time=ttime.time(), uid=new_uid(), num_events=num_events),  # noqa: C408
            _md,
            doc,
        )
self.emit(DocumentNames.stop, dict(md))
# Clear the local caches for the run
self.seq_count = 0
self.raw_descriptors.clear()
self._descriptors.clear()
self._stream_start_uid = None
super().stop(doc)
def emit(self, name, doc):
"""Check the document schema and send to the dispatcher"""
schema_validators[name].validate(doc)
self.dispatcher.process(name, doc)
def subscribe(self, func, name="all"):
"""Convenience function for dispatcher subscription"""
return self.dispatcher.subscribe(func, name)
def unsubscribe(self, token):
"""Convenience function for dispatcher un-subscription"""
self.dispatcher.unsubscribe(token)
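

# A minimal sketch of a LiveDispatcher subclass (illustrative only, not part
# of bluesky): it scales every reading by a constant factor before
# re-emitting the event on the secondary stream.
class ScaledDispatcher(LiveDispatcher):
    """Emit each event with all data values multiplied by ``factor``."""

    def __init__(self, factor=2):
        super().__init__()
        self.factor = factor

    def event(self, doc, **kwargs):
        # Copy the document so the raw stream is left untouched, then let
        # LiveDispatcher handle uids, seq_num and descriptors via
        # ``process_event``
        doc = dict(doc)
        doc["data"] = {key: val * self.factor for key, val in doc["data"].items()}
        return super().event(doc, **kwargs)


# Example wiring (assuming a RunEngine instance ``RE`` and bluesky's
# LivePlot callback):
#
#     scaled = ScaledDispatcher(factor=10)
#     RE.subscribe(scaled)
#     scaled.subscribe(LivePlot("det", x="motor"))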