Source code for event_model

from collections import defaultdict, deque, namedtuple
import collections.abc
from distutils.version import LooseVersion
import copy
import json
from enum import Enum
from functools import partial
import itertools
import inspect
import os
from pkg_resources import resource_filename as rs_fn
import threading
import time as ttime
import types
import uuid
import warnings
import weakref

import jsonschema
import numpy

from ._version import get_versions

__all__ = ['DocumentNames', 'schemas', 'schema_validators', 'compose_run']


class DocumentNames(Enum):
    stop = 'stop'
    start = 'start'
    descriptor = 'descriptor'
    event = 'event'
    datum = 'datum'
    resource = 'resource'
    event_page = 'event_page'
    datum_page = 'datum_page'
    bulk_datum = 'bulk_datum'  # deprecated
    bulk_events = 'bulk_events'  # deprecated
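
# Illustrative note (not part of the upstream module): each DocumentNames
# member's value is the document type string used in (name, doc) pairs, and it
# keys the per-type schemas and validators defined further down in this file.
# A minimal lookup, assuming this module is importable as ``event_model`` and
# using a hypothetical 'start' document, might look like::
#
#     from event_model import DocumentNames, schema_validators
#
#     name = 'start'                                       # as received in a (name, doc) pair
#     validator = schema_validators[DocumentNames(name)]   # DocumentNames('start') is DocumentNames.start
#     validator.validate({'uid': 'example-uid', 'time': 0.0})  # raises if the doc violates the schema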


class DocumentRouter:
    """
    Route each document by type to a corresponding method.

    When an instance is called with a document type and a document like::

        router(name, doc)

    the document is passed to the method of the corresponding name, as in::

        getattr(router, name)(doc)

    The method is expected to return ``None`` or a valid document of the same
    type. It may be the original instance (passed through), a copy, or a
    different dict altogether.

    Finally, the call to ``router(name, doc)`` returns::

        (name, getattr(router, name)(doc))

    Parameters
    ----------
    emit: callable, optional
        Expected signature ``f(name, doc)``
    """
    def __init__(self, *, emit=None):
        # Put in some extra effort to validate `emit` carefully, because if
        # this is used incorrectly the resultant errors can be confusing.
        if emit is not None:
            if not callable(emit):
                raise ValueError("emit must be a callable")
            sig = inspect.signature(emit)
            try:
                # Does this function accept two positional arguments?
                sig.bind(None, None)
            except TypeError:
                raise ValueError(
                    "emit must accept two positional arguments, name and doc")
            # Stash a weak reference to `emit`.
            if inspect.ismethod(emit):
                self._emit_ref = weakref.WeakMethod(emit)
            else:
                self._emit_ref = weakref.ref(emit)
        else:
            self._emit_ref = None

    def emit(self, name, doc):
        """
        Emit to the callable provided at instantiation time, if any.
        """
        if self._emit_ref is not None:
            # Call the weakref.
            emit = self._emit_ref()
            if emit is not None:
                emit(name, doc)

    def __call__(self, name, doc, validate=False):
        """
        Process a document.

        Parameters
        ----------
        name : string
        doc : dict
        validate : boolean
            Apply jsonschema validation to the documents coming *out*. This is
            False by default.

        Returns
        -------
        name, output_doc : string, dict
            The same name as what was passed in, and a doc that may be the same
            instance as doc, a copy of doc, or a different dict altogether.
        """
        return self._dispatch(name, doc, validate)

    def _dispatch(self, name, doc, validate):
        """
        Dispatch to the method corresponding to the `name`.

        Optionally validate that the result is still a valid document.
        """
        output_doc = getattr(self, name)(doc)

        # If 'event' is not defined by the subclass but 'event_page' is, or
        # vice versa, use that. And the same for 'datum_page' / 'datum'.
        if output_doc is NotImplemented:
            if name == 'event':
                event_page = pack_event_page(doc)
                # Subclass' implementation of event_page may return a valid
                # EventPage or None or NotImplemented.
                output_event_page = self.event_page(event_page)
                output_event_page = output_event_page if output_event_page is not None else event_page
                if output_event_page is not NotImplemented:
                    output_doc, = unpack_event_page(output_event_page)
            elif name == 'datum':
                datum_page = pack_datum_page(doc)
                # Subclass' implementation of datum_page may return a valid
                # DatumPage or None or NotImplemented.
                output_datum_page = self.datum_page(datum_page)
                output_datum_page = output_datum_page if output_datum_page is not None else datum_page
                if output_datum_page is not NotImplemented:
                    output_doc, = unpack_datum_page(output_datum_page)
            elif name == 'event_page':
                output_events = []
                for event in unpack_event_page(doc):
                    # Subclass' implementation of event may return a valid
                    # Event or None or NotImplemented.
                    output_event = self.event(event)
                    output_event = output_event if output_event is not None else event
                    if output_event is NotImplemented:
                        break
                    output_events.append(output_event)
                else:
                    output_doc = pack_event_page(*output_events)
            elif name == 'datum_page':
                output_datums = []
                for datum in unpack_datum_page(doc):
                    # Subclass' implementation of datum may return a valid
                    # Datum or None or NotImplemented.
                    output_datum = self.datum(datum)
                    output_datum = output_datum if output_datum is not None else datum
                    if output_datum is NotImplemented:
                        break
                    output_datums.append(output_datum)
                else:
                    output_doc = pack_datum_page(*output_datums)

        # If we still don't find an implemented method by here, then pass the
        # original document through.
        if output_doc is NotImplemented:
            output_doc = doc

        if validate:
            schema_validators[getattr(DocumentNames, name)].validate(output_doc)
        return (name, output_doc if output_doc is not None else doc)

    # The methods below return NotImplemented, a built-in Python constant.
    # Note that it is not interchangeable with NotImplementedError. See docs at
    # https://docs.python.org/3/library/constants.html#NotImplemented
    # It is used here so that _dispatch, defined above, can detect whether a
    # subclass implements event, event_page, both, or neither. This is similar
    # to how Python uses NotImplemented in arithmetic operations, as described
    # in the documentation.

    def start(self, doc):
        return NotImplemented

    def stop(self, doc):
        return NotImplemented

    def descriptor(self, doc):
        return NotImplemented

    def resource(self, doc):
        return NotImplemented

    def event(self, doc):
        return NotImplemented

    def datum(self, doc):
        return NotImplemented

    def event_page(self, doc):
        return NotImplemented

    def datum_page(self, doc):
        return NotImplemented

    def bulk_events(self, doc):
        # Do not modify this in a subclass. Use event_page.
        warnings.warn(
            "The document type 'bulk_events' has been deprecated in favor of "
            "'event_page', whose structure is a transpose of 'bulk_events'.")
        for page in bulk_events_to_event_pages(doc):
            self.event_page(page)

    def bulk_datum(self, doc):
        # Do not modify this in a subclass. Use datum_page.
        warnings.warn(
            "The document type 'bulk_datum' has been deprecated in favor of "
            "'datum_page', whose structure is a transpose of 'bulk_datum'.")
        self.datum_page(bulk_datum_to_datum_page(doc))

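# Illustrative usage sketch (not part of the upstream module): a DocumentRouter
# subclass only overrides the methods for the document types it cares about;
# everything else passes through _dispatch unchanged. The class and field names
# below (PrintingRouter, 'x') are hypothetical.
#
#     class PrintingRouter(DocumentRouter):
#         def start(self, doc):
#             print(f"run {doc['uid']} started")
#
#         def event_page(self, doc):
#             # Single 'event' documents are packed into a one-row EventPage by
#             # _dispatch and routed here, so only event_page needs overriding.
#             print(f"received {len(doc['seq_num'])} event(s)")
#
#     router = PrintingRouter()
#     router('start', {'uid': 'abc', 'time': 0.0})
#     router('event', {'uid': 'e0', 'time': 1.0, 'seq_num': 1,
#                      'descriptor': 'desc-uid', 'data': {'x': 1},
#                      'timestamps': {'x': 1.0}})
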
class SingleRunDocumentRouter(DocumentRouter): """ A DocumentRouter intended to process events from exactly one run. """ def __init__(self): super().__init__() self._start_doc = None self._descriptors = dict() def __call__(self, name, doc, validate=False): """ Process a document. Also, track of the start document and descriptor documents passed to this SingleRunDocumentRouter in caches. Parameters ---------- name : string doc : dict validate : boolean Apply jsonschema validation to the documents coming *out*. This is False by default. Returns ------- name, output_doc : string, dict The same name as what was passed in, and a doc that may be the same instance as doc, a copy of doc, or a different dict altogether. """ if name == 'start': if self._start_doc is None: self._start_doc = doc else: raise EventModelValueError( f'SingleRunDocumentRouter associated with start document {self._start_doc["uid"]} ' f'received a second start document with uid {doc["uid"]}' ) elif name == 'descriptor': if doc['run_start'] == self._start_doc['uid']: self._descriptors[doc['uid']] = doc else: raise EventModelValueError( f'SingleRunDocumentRouter associated with start document {self._start_doc["uid"]} ' f'received a descriptor {doc["uid"]} associated with start document {doc["run_start"]}' ) # Defer to superclass for dispatch/processing. return super().__call__(name, doc, validate) def get_start(self): """Convenience method returning the start document for the associated run. If no start document has been processed EventModelError will be raised. Returns ------- start document : dict """ if self._start_doc is None: raise EventModelError('SingleRunDocumentRouter has not processed a start document yet') return self._start_doc def get_descriptor(self, doc): """Convenience method returning the descriptor associated with the specified document. Parameters ---------- doc : dict event-model document Returns ------- descriptor document : dict """ if 'descriptor' not in doc: raise EventModelValueError(f'document is not associated with a descriptor:\n{doc}') elif doc['descriptor'] not in self._descriptors: raise EventModelValueError( f'SingleRunDocumentRouter has not processed a descriptor with uid {doc["descriptor"]}' ) return self._descriptors[doc['descriptor']] def get_stream_name(self, doc): """Convenience method returning the name of the stream for the specified document. Parameters ---------- doc : dict event-model document Returns ------- stream name : str """ return self.get_descriptor(doc).get('name') class HandlerRegistryView(collections.abc.Mapping): def __init__(self, handler_registry): self._handler_registry = handler_registry def __repr__(self): return f"HandlerRegistryView({self._handler_registry!r})" def __getitem__(self, key): return self._handler_registry[key] def __iter__(self): yield from self._handler_registry def __len__(self): return len(self._handler_registry) def __setitem__(self, key, value): raise EventModelTypeError( "The handler registry cannot be edited directly. " "Instead, use the method Filler.register_handler.") def __delitem__(self, key): raise EventModelTypeError( "The handler registry cannot be edited directly. " "Instead, use the method Filler.deregister_handler.") # A "coercion funcion" is a hook that Filler can use to, for example, ensure # all the external data read in my handlers is an *actual* numpy array as # opposed to some other array-like such as h5py.Dataset or dask.array.Array, # or wrap every result is dask.array.from_array(...). 
# # It has access to the handler_class as it is registered and to some state # provided by the Filler (more on that below). It is expected to return # something that is API-compatible with handler_class. That might be # handler_class itself (a no-op), a subclass, or an altogether different class # with the same API. See example below. # # The "state provided by the Filler", mentioned above is passed into the # coercion functions below as ``filler_state``. It is a namespace containing # information that may be useful for the coercion functions. Currently, it has # ``filler_state.descriptor`` and ``filler_state.key``. More may be added in # the future if the need arises. Ultimately, this is necessary because Resource # documents don't know the shape and dtype of the data that they reference. # That situation could be improved in the future; to some degree this is a # work-around. # # As an implementation detail, the ``filler_state`` is a ``threading.local`` # object to ensure that filling is thread-safe. # # Third-party libraries can register custom coercion options via the # register_coercion function below. For example, databroker uses this to # register a 'delayed' option. This avoids introducing dependency on a specific # delayed-computation framework (e.g. dask) in event-model itself. def as_is(handler_class, filler_state): "A no-op coercion function that returns handler_class unchanged." return handler_class def force_numpy(handler_class, filler_state): "A coercion that makes handler_class.__call__ return actual numpy.ndarray." class Subclass(handler_class): def __call__(self, *args, **kwargs): raw_result = super().__call__(*args, **kwargs) result_as_array = numpy.asarray(raw_result) return result_as_array Subclass.__name__ = f"Subclassed{handler_class.__name__}" Subclass.__qualname__ = f"Subclassed{handler_class.__qualname__}" return Subclass # maps coerce option to corresponding coercion function _coercion_registry = {'as_is': as_is, 'force_numpy': force_numpy} def register_coercion(name, func, overwrite=False): """ Register a new option for :class:`Filler`'s ``coerce`` argument. This is an advanced feature. See source code for comments and examples. Parameters ---------- name : string The new value for ``coerce`` that will invoke this function. func : callable Expected signature:: func(filler, handler_class) -> handler_class overwrite : boolean, optional False by default. Name collissions will raise ``EventModelValueError`` unless this is set to ``True``. """ if name in _coercion_registry and not overwrite: # If we are re-registering the same object, there is no problem. original = _coercion_registry[name] if original is func: return raise EventModelValueError( f"The coercion function {func} could not be registered for the " f"name {name} because {_coercion_registry[name]} is already " f"registered. Use overwrite=True to force it.") _coercion_registry[name] = func register_coersion = register_coercion # back-compat for a spelling mistake class Filler(DocumentRouter): """Pass documents through, loading any externally-referenced data. It is recommended to use the Filler as a context manager. Because the Filler manages caches of potentially expensive resources (e.g. large data in memory) managing its lifecycle is important. If used as a context manager, it will drop references to its caches upon exit from the context. Unless the user holds additional references to those caches, they will be garbage collected. 
    But for some applications, such as taking multiple passes over the same
    data, it may be useful to keep a longer-lived Filler instance and then
    manually delete it when finished. See Examples below.

    Parameters
    ----------
    handler_registry : dict
        Maps each 'spec' (a string identifying a given type or external
        resource) to a handler class.

        A 'handler class' may be any callable with the signature::

            handler_class(full_path, **resource_kwargs)

        It is expected to return an object, a 'handler instance', which is also
        callable and has the following signature::

            handler_instance(**datum_kwargs)

        As the names 'handler class' and 'handler instance' suggest, this is
        typically implemented using a class that implements ``__init__`` and
        ``__call__``, with the respective signatures. But in general it may be
        any callable-that-returns-a-callable.
    include : Iterable
        The set of fields to fill. By default all unfilled fields are filled.
        This parameter is mutually incompatible with the ``exclude`` parameter.
    exclude : Iterable
        The set of fields to skip filling. By default all unfilled fields are
        filled. This parameter is mutually incompatible with the ``include``
        parameter.
    root_map: dict
        str -> str mapping to account for temporarily moved/copied/remounted
        files. Any resources which have a ``root`` in ``root_map`` will be
        loaded using the mapped ``root``.
    coerce : {'as_is', 'force_numpy'}
        Default is 'as_is'. Other options (e.g. 'delayed') may be registered by
        external packages at runtime.
    handler_cache : dict, optional
        A cache of handler instances. If None, a dict is used.
    resource_cache : dict, optional
        A cache of Resource documents. If None, a dict is used.
    datum_cache : dict, optional
        A cache of Datum documents. If None, a dict is used.
    descriptor_cache : dict, optional
        A cache of EventDescriptor documents. If None, a dict is used.
    retry_intervals : Iterable, optional
        If data is not found on the first try, there may be a race between the
        I/O systems creating the external data and this stream of Documents
        that reference it. If Filler encounters an ``IOError`` it will wait a
        bit and retry. This list specifies how long to sleep (in seconds)
        between subsequent attempts. Set to ``None`` to try only once before
        raising ``DataNotAccessible``. A subclass may catch this exception and
        implement a different retry mechanism --- for example using a different
        implementation of sleep from an async framework. But by default, a
        sequence of several retries with increasing sleep intervals is used.
        The default sequence should not be considered stable; it may change at
        any time as the authors tune it.

    Raises
    ------
    DataNotAccessible
        If an IOError is raised when loading the data after the configured
        number of attempts. See the ``retry_intervals`` parameter for details.

    Examples
    --------
    A Filler may be used as a context manager.

    >>> with Filler(handler_registry) as filler:
    ...     for name, doc in stream:
    ...         filler(name, doc)  # mutates doc in place
    ...         # Do some analysis or export with name and doc.

    Or as a long-lived object.

    >>> filler = Filler(handler_registry)
    >>> for name, doc in stream:
    ...     filler(name, doc)  # mutates doc in place
    ...     # Do some analysis or export with name and doc.
    ...
    >>> del filler  # Free up memory from potentially large caches.
""" def __init__(self, handler_registry, *, include=None, exclude=None, root_map=None, coerce='as_is', handler_cache=None, resource_cache=None, datum_cache=None, descriptor_cache=None, inplace=None, retry_intervals=(0.001, 0.002, 0.004, 0.008, 0.016, 0.032, 0.064, 0.128, 0.256, 0.512, 1.024)): if inplace is None: self._inplace = True warnings.warn( "'inplace' argument not specified. It is recommended to " "specify True or False. In future releases, 'inplace' " "will default to False.") else: self._inplace = inplace if include is not None and exclude is not None: raise EventModelValueError( "The parameters `include` and `exclude` are mutually " "incompatible. At least one must be left as the default, " "None.") try: self._coercion_func = _coercion_registry[coerce] except KeyError: raise EventModelKeyError( f"The option coerce={coerce!r} was given to event_model.Filler. " f"The valid options are {set(_coercion_registry)}.") self._coerce = coerce # See comments on coerision functions above for the use of # _current_state, which is passed to coercion functions' `filler_state` # parameter. self._current_state = threading.local() self._unpatched_handler_registry = {} self._handler_registry = {} for spec, handler_class in handler_registry.items(): self.register_handler(spec, handler_class) self.handler_registry = HandlerRegistryView(self._handler_registry) if include is not None: warnings.warn( "In a future release of event-model, the argument `include` " "will be removed from Filler.", DeprecationWarning) self.include = include if exclude is not None: warnings.warn( "In a future release of event-model, the argument `exclude` " "will be removed from Filler.", DeprecationWarning) self.exclude = exclude self.root_map = root_map or {} if handler_cache is None: handler_cache = self.get_default_handler_cache() if resource_cache is None: resource_cache = self.get_default_resource_cache() if datum_cache is None: datum_cache = self.get_default_datum_cache() if descriptor_cache is None: descriptor_cache = self.get_default_descriptor_cache() self._handler_cache = handler_cache self._resource_cache = resource_cache self._datum_cache = datum_cache self._descriptor_cache = descriptor_cache if retry_intervals is None: retry_intervals = [] self.retry_intervals = retry_intervals self._closed = False def __eq__(self, other): return ( type(self) is type(other) and self.inplace == other.inplace and self._coerce == other._coerce and self.include == other.include and self.exclude == other.exclude and self.root_map == other.root_map and type(self._handler_cache) is type(other._handler_cache) and type(self._resource_cache) is type(other._resource_cache) and type(self._datum_cache) is type(other._datum_cache) and type(self._descriptor_cache) is type(other._descriptor_cache) and self.retry_intervals == other.retry_intervals ) def __getstate__(self): return dict( inplace=self._inplace, coercion_func=self._coerce, handler_registry=self._unpatched_handler_registry, include=self.include, exclude=self.exclude, root_map=self.root_map, handler_cache=self._handler_cache, resource_cache=self._resource_cache, datum_cache=self._datum_cache, descriptor_cache=self._descriptor_cache, retry_intervals=self.retry_intervals) def __setstate__(self, d): self._inplace = d['inplace'] self._coerce = d['coercion_func'] # See comments on coerision functions above for the use of # _current_state, which is passed to coercion functions' `filler_state` # parameter. 
self._current_state = threading.local() self._unpatched_handler_registry = {} self._handler_registry = {} for spec, handler_class in d['handler_registry'].items(): self.register_handler(spec, handler_class) self.handler_registry = HandlerRegistryView(self._handler_registry) self.include = d['include'] self.exclude = d['exclude'] self.root_map = d['root_map'] self._handler_cache = d['handler_cache'] self._resource_cache = d['resource_cache'] self._datum_cache = d['datum_cache'] self._descriptor_cache = d['descriptor_cache'] retry_intervals = d['retry_intervals'] if retry_intervals is None: retry_intervals = [] self._retry_intervals = retry_intervals self._closed = False @property def retry_intervals(self): return self._retry_intervals @retry_intervals.setter def retry_intervals(self, value): self._retry_intervals = list(value) def __repr__(self): return "<Filler>" if not self._closed else "<Closed Filler>" @staticmethod def get_default_resource_cache(): return {} @staticmethod def get_default_descriptor_cache(): return {} @staticmethod def get_default_datum_cache(): return {} @staticmethod def get_default_handler_cache(): return {} @property def inplace(self): return self._inplace def clone(self, handler_registry=None, *, root_map=None, coerce=None, handler_cache=None, resource_cache=None, datum_cache=None, descriptor_cache=None, inplace=None, retry_intervals=None): """ Create a new Filler instance from this one. By default it will be created with the same settings that this Filler has. Individual settings may be overridden here. The clone does *not* share any caches or internal state with the original. """ if handler_registry is None: handler_registry = self._unpatched_handler_registry if root_map is None: root_map = self.root_map if coerce is None: coerce = self._coerce if inplace is None: inplace = self.inplace if retry_intervals is None: retry_intervals = self.retry_intervals return Filler(handler_registry, root_map=root_map, coerce=coerce, handler_cache=handler_cache, resource_cache=resource_cache, datum_cache=datum_cache, descriptor_cache=descriptor_cache, inplace=inplace, retry_intervals=retry_intervals) def register_handler(self, spec, handler, overwrite=False): """ Register a handler. Parameters ---------- spec: str handler: Handler overwrite: boolean, optional False by default Raises ------ DuplicateHandler If a handler is already registered for spec and overwrite is False See https://blueskyproject.io/event-model/external.html """ if (not overwrite) and (spec in self._handler_registry): original = self._unpatched_handler_registry[spec] if original is handler: return raise DuplicateHandler( f"There is already a handler registered for the spec {spec!r}. " f"Use overwrite=True to deregister the original.\n" f"Original: {original}\n" f"New: {handler}") self.deregister_handler(spec) # Keep a raw copy, unused above for identifying redundant registration. self._unpatched_handler_registry[spec] = handler # Let the 'coerce' argument to Filler.__init__ modify the handler if it # wants to. self._handler_registry[spec] = self._coercion_func( handler, self._current_state) def deregister_handler(self, spec): """ Deregister a handler. If no handler is registered for this spec, it is no-op and returns None. 
Parameters ---------- spec: str Returns ------- handler: Handler or None See https://blueskyproject.io/event-model/external.html """ handler = self._handler_registry.pop(spec, None) if handler is not None: self._unpatched_handler_registry.pop(spec) for key in list(self._handler_cache): resource_uid, spec_ = key if spec == spec_: del self._handler_cache[key] return handler def resource(self, doc): # Defer creating the handler instance until we actually need it, when # we fill the first Event field that requires this Resource. self._resource_cache[doc['uid']] = doc return doc # Handlers operate document-wise, so we'll explode pages into individual # documents. def datum_page(self, doc): datum = self.datum # Avoid attribute lookup in hot loop. for datum_doc in unpack_datum_page(doc): datum(datum_doc) return doc def datum(self, doc): self._datum_cache[doc['datum_id']] = doc return doc def event_page(self, doc): # TODO We may be able to fill a page in place, and that may be more # efficient than unpacking the page in to Events, filling them, and the # re-packing a new page. But that seems tricky in general since the # page may be implemented as a DataFrame or dict, etc. filled_doc = self.fill_event_page(doc, include=self.include, exclude=self.exclude) return filled_doc def event(self, doc): filled_doc = self.fill_event(doc, include=self.include, exclude=self.exclude) return filled_doc def fill_event_page(self, doc, include=None, exclude=None, inplace=None): filled_events = [] for event_doc in unpack_event_page(doc): filled_events.append(self.fill_event(event_doc, include=include, exclude=exclude, inplace=True)) filled_doc = pack_event_page(*filled_events) if inplace is None: inplace = self._inplace if inplace: doc['data'] = filled_doc['data'] doc['filled'] = filled_doc['filled'] return doc else: return filled_doc def get_handler(self, resource): """ Return a new Handler instance for this Resource. Parameters ---------- resource: dict Returns ------- handler: Handler """ if self._closed: raise EventModelRuntimeError( "This Filler has been closed and is no longer usable.") try: handler_class = self.handler_registry[resource['spec']] except KeyError as err: raise UndefinedAssetSpecification( f"Resource document with uid {resource['uid']} " f"refers to spec {resource['spec']!r} which is " f"not defined in the Filler's " f"handler registry.") from err # Apply root_map. resource_path = resource['resource_path'] original_root = resource.get('root', '') root = self.root_map.get(original_root, original_root) if root: resource_path = os.path.join(root, resource_path) msg = (f"Error instantiating handler " f"class {handler_class} " f"with Resource document {resource}. ") if root != original_root: msg += (f"Its 'root' field was " f"mapped from {original_root} to {root} by root_map.") else: msg += (f"Its 'root' field {original_root} was " f"*not* modified by root_map.") error_to_raise = EventModelError(msg) handler = _attempt_with_retries( func=handler_class, args=(resource_path,), kwargs=resource['resource_kwargs'], intervals=[0] + self.retry_intervals, error_to_catch=IOError, error_to_raise=error_to_raise) return handler def _get_handler_maybe_cached(self, resource): "Get a cached handler for this resource or make one and cache it." 
key = (resource['uid'], resource['spec']) try: handler = self._handler_cache[key] except KeyError: handler = self.get_handler(resource) self._handler_cache[key] = handler return handler def fill_event(self, doc, include=None, exclude=None, inplace=None): if inplace is None: inplace = self._inplace if inplace: filled_doc = doc else: filled_doc = copy.deepcopy(doc) descriptor = self._descriptor_cache[doc['descriptor']] from_datakeys = False self._current_state.descriptor = descriptor try: needs_filling = {key for key, val in doc['filled'].items() if val is False} except KeyError: # This document is not telling us which, if any, keys are filled. # Infer that none of the external data is filled. needs_filling = {key for key, val in descriptor['data_keys'].items() if 'external' in val} from_datakeys = True for key in needs_filling: self._current_state.key = key if exclude is not None and key in exclude: continue if include is not None and key not in include: continue try: datum_id = doc['data'][key] except KeyError as err: if from_datakeys: raise MismatchedDataKeys( "The documents are not valid. Either because they " "were recorded incorrectly in the first place, " "corrupted since, or exercising a yet-undiscovered " "bug in a reader. event['data'].keys() " "must equal descriptor['data_keys'].keys(). " f"event['data'].keys(): {doc['data'].keys()}, " "descriptor['data_keys'].keys(): " f"{descriptor['data_keys'].keys()}") from err else: raise MismatchedDataKeys( "The documents are not valid. Either because they " "were recorded incorrectly in the first place, " "corrupted since, or exercising a yet-undiscovered " "bug in a reader. event['filled'].keys() " "must be a subset of event['data'].keys(). " f"event['data'].keys(): {doc['data'].keys()}, " "event['filled'].keys(): " f"{doc['filled'].keys()}") from err # Look up the cached Datum doc. try: datum_doc = self._datum_cache[datum_id] except KeyError as err: raise UnresolvableForeignKeyError( datum_id, f"Event with uid {doc['uid']} refers to unknown Datum " f"datum_id {datum_id}") from err resource_uid = datum_doc['resource'] # Look up the cached Resource. try: resource = self._resource_cache[resource_uid] except KeyError as err: raise UnresolvableForeignKeyError( resource_uid, f"Datum with id {datum_id} refers to unknown Resource " f"uid {resource_uid}") from err self._current_state.resource = resource self._current_state.datum = datum_doc handler = self._get_handler_maybe_cached(resource) error_to_raise = DataNotAccessible( f"Filler was unable to load the data referenced by " f"the Datum document {datum_doc} and the Resource " f"document {resource}.") payload = _attempt_with_retries( func=handler, args=(), kwargs=datum_doc['datum_kwargs'], intervals=[0] + self.retry_intervals, error_to_catch=IOError, error_to_raise=error_to_raise) # Here we are intentionally modifying doc in place. filled_doc['data'][key] = payload filled_doc['filled'][key] = datum_id self._current_state.key = None self._current_state.descriptor = None self._current_state.resource = None self._current_state.datum = None return filled_doc def descriptor(self, doc): self._descriptor_cache[doc['uid']] = doc return doc def __enter__(self): return self def close(self): """ Drop cached documents and handlers. They are *not* explicitly cleared, so if there are other references to these caches they will remain. """ # Drop references to the caches. If the user holds another reference to # them it's the user's problem to manage their lifecycle. If the user # does not (e.g. 
they are the default caches) the gc will look after # them. self._closed = True self._handler_cache = None self._resource_cache = None self._datum_cache = None self._descriptor_cache = None @property def closed(self): return self._closed def clear_handler_cache(self): """ Clear any cached handler instances. This operation may free significant memory, depending on the implementation of the handlers. """ self._handler_cache.clear() def clear_document_caches(self): """ Clear any cached documents. """ self._resource_cache.clear() self._descriptor_cache.clear() self._datum_cache.clear() def __exit__(self, *exc_details): self.close() def __call__(self, name, doc, validate=False): if self._closed: raise EventModelRuntimeError( "This Filler has been closed and is no longer usable.") return super().__call__(name, doc, validate) def _attempt_with_retries(func, args, kwargs, intervals, error_to_catch, error_to_raise): """ Return func(*args, **kwargs), using a retry loop. func, args, kwargs: self-explanatory intervals: list How long to wait (seconds) between each attempt including the first. error_to_catch: Exception class If this is raised, retry. error_to_raise: Exception instance or class If we run out of retries, raise this from the proximate error. """ error = None for interval in intervals: ttime.sleep(interval) try: return func(*args, **kwargs) except error_to_catch as error_: # The file may not be visible on the filesystem yet. # Wait and try again. Stash the error in a variable # that we can access later if we run out of attempts. error = error_ else: break else: # We have used up all our attempts. There seems to be an # actual problem. Raise specified error from the error stashed above. raise error_to_raise from error class NoFiller(Filler): """ This does not fill the documents; it merely validates them. It checks that all the references between the documents are resolvable and *could* be filled. This is useful when the filling will be done later, as a delayed computation, but we want to make sure in advance that we have all the information that we will need when that computation occurs. """ def __init__(self, *args, **kwargs): # Do not make Filler make copies because we are not going to alter the # documents anyway. kwargs.setdefault('inplace', True) super().__init__(*args, **kwargs) def fill_event_page(self, doc, include=None, exclude=None): filled_events = [] for event_doc in unpack_event_page(doc): filled_events.append(self.fill_event(event_doc, include=include, exclude=exclude, inplace=True)) filled_doc = pack_event_page(*filled_events) return filled_doc def fill_event(self, doc, include=None, exclude=None, inplace=None): descriptor = self._descriptor_cache[doc['descriptor']] from_datakeys = False try: needs_filling = {key for key, val in doc['filled'].items() if val is False} except KeyError: # This document is not telling us which, if any, keys are filled. # Infer that none of the external data is filled. needs_filling = {key for key, val in descriptor['data_keys'].items() if 'external' in val} from_datakeys = True for key in needs_filling: if exclude is not None and key in exclude: continue if include is not None and key not in include: continue try: datum_id = doc['data'][key] except KeyError as err: if from_datakeys: raise MismatchedDataKeys( "The documents are not valid. Either because they " "were recorded incorrectly in the first place, " "corrupted since, or exercising a yet-undiscovered " "bug in a reader. event['data'].keys() " "must equal descriptor['data_keys'].keys(). 
" f"event['data'].keys(): {doc['data'].keys()}, " "descriptor['data_keys'].keys(): " f"{descriptor['data_keys'].keys()}") from err else: raise MismatchedDataKeys( "The documents are not valid. Either because they " "were recorded incorrectly in the first place, " "corrupted since, or exercising a yet-undiscovered " "bug in a reader. event['filled'].keys() " "must be a subset of event['data'].keys(). " f"event['data'].keys(): {doc['data'].keys()}, " "event['filled'].keys(): " f"{doc['filled'].keys()}") from err # Look up the cached Datum doc. try: datum_doc = self._datum_cache[datum_id] except KeyError as err: err_with_key = UnresolvableForeignKeyError( datum_id, f"Event with uid {doc['uid']} refers to unknown Datum " f"datum_id {datum_id}") err_with_key.key = datum_id raise err_with_key from err resource_uid = datum_doc['resource'] # Look up the cached Resource. try: self._resource_cache[resource_uid] except KeyError as err: raise UnresolvableForeignKeyError( datum_id, f"Datum with id {datum_id} refers to unknown Resource " f"uid {resource_uid}") from err return doc DOCS_PASSED_IN_1_14_0_WARNING = ( "The callback {callback!r} raised {err!r} when " "RunRouter passed it a {name!r} document. This is " "probably because in earlier releases the RunRouter " "expected its factory functions to forward the 'start' " "document, but starting in event-model 1.14.0 the " "RunRouter passes in the document, causing the " "callback to receive it twice and potentially raise " "an error. Update the factory function. In a future " "release this warning will become an error.") class RunRouter(DocumentRouter): """ Routes documents, by run, to callbacks it creates from factory functions. A RunRouter is callable, and it has the signature ``router(name, doc)``, suitable for subscribing to the RunEngine. It is configured with a list of factory functions that produce callbacks in a two-layered scheme, described below. .. warning:: This is experimental. In a future release, it may be changed in a backward-incompatible way or fully removed. Parameters ---------- factories : list A list of callables with the signature:: factory('start', start_doc) -> List[Callbacks], List[SubFactories] which should return two lists, which may be empty. All items in the first list should be callbacks --- callables with the signature:: callback(name, doc) that will receive that RunStart document and all subsequent documents from the run including the RunStop document. All items in the second list should be "subfactories" with the signature:: subfactory('descriptor', descriptor_doc) -> List[Callbacks] These will receive each of the EventDescriptor documents for the run, as they arrive. They must return one list, which may be empty, containing callbacks that will receive the RunStart document, that EventDescriptor, all Events that reference that EventDescriptor and finally the RunStop document for the run. handler_registry : dict, optional This is passed to the Filler or whatever class is given in the filler_class parametr below. Maps each 'spec' (a string identifying a given type or external resource) to a handler class. 
A 'handler class' may be any callable with the signature:: handler_class(full_path, **resource_kwargs) It is expected to return an object, a 'handler instance', which is also callable and has the following signature:: handler_instance(**datum_kwargs) As the names 'handler class' and 'handler instance' suggest, this is typically implemented using a class that implements ``__init__`` and ``__call__``, with the respective signatures. But in general it may be any callable-that-returns-a-callable. root_map: dict, optional This is passed to Filler or whatever class is given in the filler_class parameter below. str -> str mapping to account for temporarily moved/copied/remounted files. Any resources which have a ``root`` in ``root_map`` will be loaded using the mapped ``root``. filler_class: type This is Filler by default. It can be a Filler subclass, ``functools.partial(Filler, ...)``, or any class that provides the same methods as ``DocumentRouter``. fill_or_fail: boolean, optional By default (False), if a document with a spec not in ``handler_registry`` is encountered, let it pass through unfilled. But if set to True, fill everything and `raise ``UndefinedAssetSpecification`` if some unknown spec is encountered. """ def __init__(self, factories, handler_registry=None, *, root_map=None, filler_class=Filler, fill_or_fail=False): self.factories = factories self.handler_registry = handler_registry or {} self.filler_class = filler_class self.fill_or_fail = fill_or_fail self.root_map = root_map # Map RunStart UID to "subfactory" functions that want all # EventDescriptors from that run. self._subfactories = defaultdict(list) # Callbacks that want all the documents from a given run, keyed on # RunStart UID. self._factory_cbs_by_start = defaultdict(list) # Callbacks that want all the documents from a given run, keyed on # each EventDescriptor UID in the run. self._factory_cbs_by_descriptor = defaultdict(list) # Callbacks that want documents related to a given EventDescriptor, # keyed on EventDescriptor UID. self._subfactory_cbs_by_descriptor = defaultdict(list) # Callbacks that want documents related to a given EventDescriptor, # keyed on the RunStart UID referenced by that EventDescriptor. self._subfactory_cbs_by_start = defaultdict(list) # Map RunStart UID to RunStart document. This is used to send # RunStart documents to subfactory callbacks. self._start_to_start_doc = dict() # Map RunStart UID to the list EventDescriptor. This is used to # facilitate efficient cleanup of the caches above. self._start_to_descriptors = defaultdict(list) # Map EventDescriptor UID to RunStart UID. This is used for looking up # Fillers. self._descriptor_to_start = {} # Map Resource UID to RunStart UID. self._resources = {} # Old-style Resources that do not have a RunStart UID self._unlabeled_resources = deque(maxlen=10000) # Map Runstart UID to instances of self.filler_class. self._fillers = {} def __repr__(self): return ("RunRouter([\n" + "\n".join(f" {factory}" for factory in self.factories) + "])") def start(self, start_doc): uid = start_doc['uid'] # If we get the same uid twice, weird things will happen, so check for # that and give a nice error message. 
if uid in self._start_to_start_doc: if self._start_to_start_doc[uid] == start_doc: raise ValueError( "RunRouter received the same 'start' document twice:\n" "{start_doc!r}" ) else: raise ValueError( "RunRouter received two 'start' documents with different " "contents but the same uid:\n" "First: {self._start_to_start_doc[uid]!r}\n" "Second: {start_doc!r}") self._start_to_start_doc[uid] = start_doc filler = self.filler_class(self.handler_registry, root_map=self.root_map, inplace=False) self._fillers[uid] = filler # No need to pass the document to filler # because Fillers do nothing with 'start'. for factory in self.factories: callbacks, subfactories = factory('start', start_doc) for callback in callbacks: try: callback('start', start_doc) except Exception as err: warnings.warn( DOCS_PASSED_IN_1_14_0_WARNING.format( callback=callback, name='start', err=err)) raise err self._factory_cbs_by_start[uid].extend(callbacks) self._subfactories[uid].extend(subfactories) def descriptor(self, descriptor_doc): uid = descriptor_doc['uid'] start_uid = descriptor_doc['run_start'] self._fillers[start_uid].descriptor(descriptor_doc) # Apply all factory cbs for this run to this descriptor, and run them. factory_cbs = self._factory_cbs_by_start[start_uid] self._factory_cbs_by_descriptor[uid].extend(factory_cbs) for callback in factory_cbs: callback('descriptor', descriptor_doc) # Let all the subfactories add any relevant callbacks. for subfactory in self._subfactories[start_uid]: callbacks = subfactory('descriptor', descriptor_doc) self._subfactory_cbs_by_start[start_uid].extend(callbacks) self._subfactory_cbs_by_descriptor[uid].extend(callbacks) for callback in callbacks: try: start_doc = self._start_to_start_doc[start_uid] callback('start', start_doc) except Exception as err: warnings.warn( DOCS_PASSED_IN_1_14_0_WARNING.format( callback=callback, name='start', err=err)) raise err try: callback('descriptor', descriptor_doc) except Exception as err: warnings.warn( DOCS_PASSED_IN_1_14_0_WARNING.format( callback=callback, name='descriptor', err=err)) raise err # Keep track of the RunStart UID -> [EventDescriptor UIDs] mapping for # purposes of cleanup in stop(). self._start_to_descriptors[start_uid].append(uid) # Keep track of the EventDescriptor UID -> RunStartUID for filling # purposes. self._descriptor_to_start[uid] = start_uid def event_page(self, doc): descriptor_uid = doc['descriptor'] start_uid = self._descriptor_to_start[descriptor_uid] try: doc = self._fillers[start_uid].event_page(doc) except UndefinedAssetSpecification: if self.fill_or_fail: raise for callback in self._factory_cbs_by_descriptor[descriptor_uid]: callback('event_page', doc) for callback in self._subfactory_cbs_by_descriptor[descriptor_uid]: callback('event_page', doc) def datum_page(self, doc): resource_uid = doc['resource'] try: start_uid = self._resources[resource_uid] except KeyError: if resource_uid not in self._unlabeled_resources: raise UnresolvableForeignKeyError( resource_uid, f"DatumPage refers to unknown Resource uid {resource_uid}") # Old Resources do not have a reference to a RunStart document, # so in turn we cannot immediately tell which run these datum # documents belong to. # Fan them out to every run currently flowing through RunRouter. If # they are not applicable they will do no harm, and this is # expected to be an increasingly rare case. 
for callbacks in self._factory_cbs_by_start.values(): for callback in callbacks: callback('datum_page', doc) for callbacks in self._subfactory_cbs_by_start.values(): for callback in callbacks: callback('datum_page', doc) for filler in self._fillers.values(): filler.datum_page(doc) else: self._fillers[start_uid].datum_page(doc) for callback in self._factory_cbs_by_start[start_uid]: callback('datum_page', doc) for callback in self._subfactory_cbs_by_start[start_uid]: callback('datum_page', doc) def resource(self, doc): try: start_uid = doc['run_start'] except KeyError: # Old Resources do not have a reference to a RunStart document. # Fan them out to every run currently flowing through RunRouter. If # they are not applicable they will do no harm, and this is # expected to be an increasingly rare case. self._unlabeled_resources.append(doc['uid']) for callbacks in self._factory_cbs_by_start.values(): for callback in callbacks: callback('resource', doc) for callbacks in self._subfactory_cbs_by_start.values(): for callback in callbacks: callback('resource', doc) for filler in self._fillers.values(): filler.resource(doc) else: self._fillers[start_uid].resource(doc) self._resources[doc['uid']] = doc['run_start'] for callback in self._factory_cbs_by_start[start_uid]: callback('resource', doc) for callback in self._subfactory_cbs_by_start[start_uid]: callback('resource', doc) def stop(self, doc): start_uid = doc['run_start'] for callback in self._factory_cbs_by_start[start_uid]: callback('stop', doc) for callback in self._subfactory_cbs_by_start[start_uid]: callback('stop', doc) # Clean up references. self._fillers.pop(start_uid, None) self._subfactories.pop(start_uid, None) self._factory_cbs_by_start.pop(start_uid, None) self._subfactory_cbs_by_start.pop(start_uid, None) for descriptor_uid in self._start_to_descriptors.pop(start_uid, ()): self._descriptor_to_start.pop(descriptor_uid, None) self._factory_cbs_by_descriptor.pop(descriptor_uid, None) self._subfactory_cbs_by_descriptor.pop(descriptor_uid, None) self._resources.pop(start_uid, None) self._start_to_start_doc.pop(start_uid, None) class EventModelError(Exception): ... # Here we define subclasses of all of the built-in Python exception types (as # needed, not a comprehensive list) so that all errors raised *directly* by # event_model also inhereit from EventModelError as well as the appropriate # built-in type. This means, for example, that `EventModelValueError` can be # caught by `except ValueError:` or by `except EventModelError:`. This can be # useful for higher-level libraries and for debugging. class EventModelKeyError(EventModelError, KeyError): ... class EventModelValueError(EventModelError, ValueError): ... class EventModelRuntimeError(EventModelError, RuntimeError): ... class EventModelTypeError(EventModelError, TypeError): ... class EventModelValidationError(EventModelError): ... class UnfilledData(EventModelError): """raised when unfilled data is found""" ... class UndefinedAssetSpecification(EventModelKeyError): """raised when a resource spec is missing from the handler registry""" ... class DataNotAccessible(EventModelError, IOError): """raised when attempts to load data referenced by Datum document fail""" ... class UnresolvableForeignKeyError(EventModelValueError): """when we see a foreign before we see the thing to which it refers""" def __init__(self, key, message): self.key = key self.message = message class DuplicateHandler(EventModelRuntimeError): """raised when a handler is already registered for a given spec""" ... 
class InvalidData(EventModelError): """raised when the data is invalid""" ... class MismatchedDataKeys(InvalidData): """ Raised when any data keys structures are out of sync. This includes, event['data'].keys(), descriptor['data_keys'].keys(), event['timestamp'].keys(), event['filled'].keys() """ ... SCHEMA_PATH = 'schemas' SCHEMA_NAMES = {DocumentNames.start: 'schemas/run_start.json', DocumentNames.stop: 'schemas/run_stop.json', DocumentNames.event: 'schemas/event.json', DocumentNames.event_page: 'schemas/event_page.json', DocumentNames.descriptor: 'schemas/event_descriptor.json', DocumentNames.datum: 'schemas/datum.json', DocumentNames.datum_page: 'schemas/datum_page.json', DocumentNames.resource: 'schemas/resource.json', # DEPRECATED: DocumentNames.bulk_events: 'schemas/bulk_events.json', DocumentNames.bulk_datum: 'schemas/bulk_datum.json'} schemas = {} for name, filename in SCHEMA_NAMES.items(): with open(rs_fn('event_model', filename)) as fin: schemas[name] = json.load(fin) # We pin jsonschema >=3.0.0 in requirements.txt but due to pip's dependency # resolution it is easy to end up with an environment where that pin is not # respected. Thus, we maintain best-effort support for 2.x. if LooseVersion(jsonschema.__version__) >= LooseVersion("3.0.0"): def _is_array(checker, instance): return ( jsonschema.validators.Draft7Validator.TYPE_CHECKER.is_type(instance, 'array') or isinstance(instance, tuple) or hasattr(instance, "__array__") ) _array_type_checker = jsonschema.validators.Draft7Validator.TYPE_CHECKER.redefine('array', _is_array) _Validator = jsonschema.validators.extend( jsonschema.validators.Draft7Validator, type_checker=_array_type_checker) schema_validators = {name: _Validator(schema=schema) for name, schema in schemas.items()} else: # Make objects that mock the one method on the jsonschema 3.x # Draft7Validator API that we need. 
schema_validators = { name: types.SimpleNamespace( validate=partial(jsonschema.validate, schema=schema, types={'array': (list, tuple)})) for name, schema in schemas.items() } __version__ = get_versions()['version'] del get_versions ComposeRunBundle = namedtuple('ComposeRunBundle', 'start_doc compose_descriptor compose_resource ' 'compose_stop') ComposeDescriptorBundle = namedtuple('ComposeDescriptorBundle', 'descriptor_doc compose_event compose_event_page') ComposeResourceBundle = namedtuple('ComposeResourceBundle', 'resource_doc compose_datum compose_datum_page') def compose_datum(*, resource, counter, datum_kwargs, validate=True): resource_uid = resource['uid'] doc = {'resource': resource_uid, 'datum_kwargs': datum_kwargs, 'datum_id': '{}/{}'.format(resource_uid, next(counter))} if validate: schema_validators[DocumentNames.datum].validate(doc) return doc def compose_datum_page(*, resource, counter, datum_kwargs, validate=True): resource_uid = resource['uid'] any_column, *_ = datum_kwargs.values() N = len(any_column) doc = {'resource': resource_uid, 'datum_kwargs': datum_kwargs, 'datum_id': ['{}/{}'.format(resource_uid, next(counter)) for _ in range(N)]} if validate: schema_validators[DocumentNames.datum_page].validate(doc) return doc default_path_semantics = {'posix': 'posix', 'nt': 'windows'}[os.name] def compose_resource(*, start, spec, root, resource_path, resource_kwargs, path_semantics=default_path_semantics, uid=None, validate=True): if uid is None: uid = str(uuid.uuid4()) counter = itertools.count() doc = {'uid': uid, 'run_start': start['uid'], 'spec': spec, 'root': root, 'resource_path': resource_path, 'resource_kwargs': resource_kwargs, 'path_semantics': path_semantics} if validate: schema_validators[DocumentNames.resource].validate(doc) return ComposeResourceBundle( doc, partial(compose_datum, resource=doc, counter=counter), partial(compose_datum_page, resource=doc, counter=counter)) def compose_stop(*, start, event_counter, poison_pill, exit_status='success', reason='', uid=None, time=None, validate=True): if poison_pill: raise EventModelError("Already composed a RunStop document for run " "{!r}.".format(start['uid'])) poison_pill.append(object()) if uid is None: uid = str(uuid.uuid4()) if time is None: time = ttime.time() doc = {'uid': uid, 'time': time, 'run_start': start['uid'], 'exit_status': exit_status, 'reason': reason, 'num_events': {k: v - 1 for k, v in event_counter.items()}} if validate: schema_validators[DocumentNames.stop].validate(doc) return doc def compose_event_page(*, descriptor, event_counter, data, timestamps, seq_num, filled=None, uid=None, time=None, validate=True): N = len(seq_num) if uid is None: uid = [str(uuid.uuid4()) for _ in range(N)] if time is None: time = [ttime.time()] * N if filled is None: filled = {} doc = {'uid': uid, 'time': time, 'data': data, 'timestamps': timestamps, 'seq_num': seq_num, 'filled': filled, 'descriptor': descriptor['uid']} if validate: schema_validators[DocumentNames.event_page].validate(doc) if not (descriptor['data_keys'].keys() == data.keys() == timestamps.keys()): raise EventModelValidationError( "These sets of keys must match:\n" "event['data'].keys(): {}\n" "event['timestamps'].keys(): {}\n" "descriptor['data_keys'].keys(): {}\n".format( data.keys(), timestamps.keys(), descriptor['data_keys'].keys())) if set(filled) - set(data): raise EventModelValidationError( "Keys in event['filled'] {} must be a subset of those in " "event['data'] {}".format(filled.keys(), data.keys())) event_counter[descriptor['name']] += 
len(data) return doc def compose_event(*, descriptor, event_counter, data, timestamps, seq_num=None, filled=None, uid=None, time=None, validate=True): if seq_num is None: seq_num = event_counter[descriptor['name']] if uid is None: uid = str(uuid.uuid4()) if time is None: time = ttime.time() if filled is None: filled = {} doc = {'uid': uid, 'time': time, 'data': data, 'timestamps': timestamps, 'seq_num': seq_num, 'filled': filled, 'descriptor': descriptor['uid']} if validate: schema_validators[DocumentNames.event].validate(doc) if not (descriptor['data_keys'].keys() == data.keys() == timestamps.keys()): raise EventModelValidationError( "These sets of keys must match:\n" "event['data'].keys(): {}\n" "event['timestamps'].keys(): {}\n" "descriptor['data_keys'].keys(): {}\n".format( data.keys(), timestamps.keys(), descriptor['data_keys'].keys())) if set(filled) - set(data): raise EventModelValidationError( "Keys in event['filled'] {} must be a subset of those in " "event['data'] {}".format(filled.keys(), data.keys())) event_counter[descriptor['name']] += 1 return doc def compose_descriptor(*, start, streams, event_counter, name, data_keys, uid=None, time=None, object_keys=None, configuration=None, hints=None, validate=True): if uid is None: uid = str(uuid.uuid4()) if time is None: time = ttime.time() if object_keys is None: object_keys = {} if configuration is None: configuration = {} if hints is None: hints = {} doc = {'uid': uid, 'time': time, 'run_start': start['uid'], 'name': name, 'data_keys': data_keys, 'object_keys': object_keys, 'hints': hints, 'configuration': configuration} if validate: if name in streams and streams[name] != set(data_keys): raise EventModelValidationError( "A descriptor with the name {} has already been composed with " "data_keys {}. The requested data_keys were {}. All " "descriptors in a given stream must have the same " "data_keys.".format(name, streams[name], set(data_keys))) schema_validators[DocumentNames.descriptor].validate(doc) if name not in streams: streams[name] = set(data_keys) event_counter[name] = 1 return ComposeDescriptorBundle( doc, partial(compose_event, descriptor=doc, event_counter=event_counter), partial(compose_event_page, descriptor=doc, event_counter=event_counter)) def compose_run(*, uid=None, time=None, metadata=None, validate=True): """ Compose a RunStart document and factory functions for related documents. Parameters ---------- uid : string, optional Unique identifier for this run, conventionally a UUID4. If None is given, a UUID4 will be generated. time : float, optional UNIX epoch time of start of this run. If None is given, the current time will be used. metadata : dict, optional Additional metadata include the document validate : boolean, optional Validate this document conforms to the schema. Returns ------- ComposeRunBundle """ if uid is None: uid = str(uuid.uuid4()) if time is None: time = ttime.time() if metadata is None: metadata = {} doc = dict(uid=uid, time=time, **metadata) # Define some mutable state to be shared internally by the closures composed # below. streams = {} event_counter = {} poison_pill = [] if validate: schema_validators[DocumentNames.start].validate(doc) return ComposeRunBundle( doc, partial(compose_descriptor, start=doc, streams=streams, event_counter=event_counter), partial(compose_resource, start=doc), partial(compose_stop, start=doc, event_counter=event_counter, poison_pill=poison_pill)) def pack_event_page(*events): """ Transform one or more Event documents into an EventPage document. 
Parameters ---------- *event : dicts any number of Event documents Returns ------- event_page : dict """ if not events: raise ValueError( "The pack_event_page() function was called with empty *args. " "Cannot create an EventPage from an empty collection of Events " "because the 'descriptor' field in an EventPage cannot be NULL.") time_list = [] uid_list = [] seq_num_list = [] data_list = [] filled_list = [] timestamps_list = [] for event in events: time_list.append(event['time']) uid_list.append(event['uid']) seq_num_list.append(event['seq_num']) filled_list.append(event.get('filled', {})) data_list.append(event['data']) timestamps_list.append(event['timestamps']) event_page = {'time': time_list, 'uid': uid_list, 'seq_num': seq_num_list, 'descriptor': event['descriptor'], 'filled': _transpose_list_of_dicts(filled_list), 'data': _transpose_list_of_dicts(data_list), 'timestamps': _transpose_list_of_dicts(timestamps_list)} return event_page def unpack_event_page(event_page): """ Transform an EventPage document into individual Event documents. Parameters ---------- event_page : dict Yields ------ event : dict """ descriptor = event_page['descriptor'] data_list = _transpose_dict_of_lists(event_page['data']) timestamps_list = _transpose_dict_of_lists(event_page['timestamps']) filled_list = _transpose_dict_of_lists(event_page.get('filled', {})) for uid, time, seq_num, data, timestamps, filled in itertools.zip_longest( event_page['uid'], event_page['time'], event_page['seq_num'], data_list, timestamps_list, filled_list, fillvalue={}): event = {'descriptor': descriptor, 'uid': uid, 'time': time, 'seq_num': seq_num, 'data': data, 'timestamps': timestamps, 'filled': filled} yield event def pack_datum_page(*datum): """ Transform one or more Datum documents into a DatumPage document. Parameters ---------- *datum : dicts any number of Datum documents Returns ------- datum_page : dict """ if not datum: raise ValueError( "The pack_datum_page() function was called with empty *args. " "Cannot create an DatumPage from an empty collection of Datum " "because the 'resource' field in a DatumPage cannot be NULL.") datum_id_list = [] datum_kwarg_list = [] for datum_ in datum: datum_id_list.append(datum_['datum_id']) datum_kwarg_list.append(datum_['datum_kwargs']) datum_page = {'resource': datum_['resource'], 'datum_id': datum_id_list, 'datum_kwargs': _transpose_list_of_dicts(datum_kwarg_list)} return datum_page def unpack_datum_page(datum_page): """ Transform a DatumPage document into individual Datum documents. Parameters ---------- datum_page : dict Yields ------ datum : dict """ resource = datum_page['resource'] datum_kwarg_list = _transpose_dict_of_lists(datum_page['datum_kwargs']) for datum_id, datum_kwargs in itertools.zip_longest( datum_page['datum_id'], datum_kwarg_list, fillvalue={}): datum = {'datum_id': datum_id, 'datum_kwargs': datum_kwargs, 'resource': resource} yield datum def rechunk_event_pages(event_pages, chunk_size): """ Resizes the event_pages in a iterable of event_pages. Parameters ---------- event_pages: Iterabile An iterable of event_pages chunk_size: integer Size of pages to yield Yields ------ event_page : dict """ remainder = chunk_size chunk_list = [] def page_chunks(page, chunk_size, remainder): """ Yields chunks of a event_page. The first chunk will be of size remainder, the following chunks will be of size chunk_size. The last chunk will be what ever is left over. """ array_keys = ['seq_num', 'time', 'uid'] page_size = len(page['uid']) # Number of events in the page. 
def rechunk_event_pages(event_pages, chunk_size):
    """
    Resizes the event_pages in an iterable of event_pages.

    Parameters
    ----------
    event_pages : Iterable
        An iterable of event_pages
    chunk_size : integer
        Size of pages to yield

    Yields
    ------
    event_page : dict
    """
    remainder = chunk_size
    chunk_list = []

    def page_chunks(page, chunk_size, remainder):
        """
        Yields chunks of an event_page. The first chunk will be of size
        remainder, the following chunks will be of size chunk_size. The last
        chunk will be whatever is left over.
        """
        array_keys = ['seq_num', 'time', 'uid']
        page_size = len(page['uid'])  # Number of events in the page.

        # Make a list of the chunk indexes.
        chunks = [(0, remainder)]
        chunks.extend([(i, i + chunk_size)
                       for i in range(remainder, page_size, chunk_size)])

        for start, stop in chunks:
            yield {'descriptor': page['descriptor'],
                   **{key: page[key][start:stop] for key in array_keys},
                   'data': {key: page['data'][key][start:stop]
                            for key in page['data'].keys()},
                   'timestamps': {key: page['timestamps'][key][start:stop]
                                  for key in page['timestamps'].keys()},
                   'filled': {key: page['filled'][key][start:stop]
                              for key in page['filled'].keys()}}

    for page in event_pages:
        new_chunks = page_chunks(page, chunk_size, remainder)
        for chunk in new_chunks:
            remainder -= len(chunk['uid'])  # Subtract the size of the chunk.
            chunk_list.append(chunk)
            if remainder == 0:
                yield merge_event_pages(chunk_list)
                remainder = chunk_size
                chunk_list = []
    if chunk_list:
        yield merge_event_pages(chunk_list)


def merge_event_pages(event_pages):
    """
    Combines an iterable of event_pages into a single event_page.

    Parameters
    ----------
    event_pages : Iterable
        An iterable of event_pages

    Returns
    -------
    event_page : dict
    """
    pages = list(event_pages)
    if len(pages) == 1:
        return pages[0]
    array_keys = ['seq_num', 'time', 'uid']
    return {'descriptor': pages[0]['descriptor'],
            **{key: list(itertools.chain.from_iterable(
                [page[key] for page in pages])) for key in array_keys},
            'data': {key: list(itertools.chain.from_iterable(
                [page['data'][key] for page in pages]))
                for key in pages[0]['data'].keys()},
            'timestamps': {key: list(itertools.chain.from_iterable(
                [page['timestamps'][key] for page in pages]))
                for key in pages[0]['timestamps'].keys()},
            'filled': {key: list(itertools.chain.from_iterable(
                [page['filled'][key] for page in pages]))
                for key in pages[0]['filled'].keys()}}


def rechunk_datum_pages(datum_pages, chunk_size):
    """
    Resizes the datum_pages in an iterable of datum_pages.

    Parameters
    ----------
    datum_pages : Iterable
        An iterable of datum_pages
    chunk_size : integer
        Size of pages to yield

    Yields
    ------
    datum_page : dict
    """
    remainder = chunk_size
    chunk_list = []

    def page_chunks(page, chunk_size, remainder):
        """
        Yields chunks of a datum_page. The first chunk will be of size
        remainder, the following chunks will be of size chunk_size. The last
        chunk will be whatever is left over.
        """
        array_keys = ['datum_id']
        page_size = len(page['datum_id'])  # Number of datum in the page.

        # Make a list of the chunk indexes.
        chunks = [(0, remainder)]
        chunks.extend([(i, i + chunk_size)
                       for i in range(remainder, page_size, chunk_size)])

        for start, stop in chunks:
            yield {'resource': page['resource'],
                   **{key: page[key][start:stop] for key in array_keys},
                   'datum_kwargs': {key: page['datum_kwargs'][key][start:stop]
                                    for key in page['datum_kwargs'].keys()}}

    for page in datum_pages:
        new_chunks = page_chunks(page, chunk_size, remainder)
        for chunk in new_chunks:
            remainder -= len(chunk['datum_id'])  # Subtract the size of the chunk.
            chunk_list.append(chunk)
            if remainder == 0:
                yield merge_datum_pages(chunk_list)
                remainder = chunk_size
                chunk_list = []
    if chunk_list:
        yield merge_datum_pages(chunk_list)
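# --- Illustrative sketch (editor's addition): re-chunking a stream of event
# pages into fixed-size pages with rechunk_event_pages above. The input page
# holds three events for a hypothetical data key 'x'; chunk_size=2 yields
# pages of two events and one event.
def _example_rechunk_event_pages():
    page = {'descriptor': 'd1',
            'uid': ['e1', 'e2', 'e3'],
            'time': [0.0, 1.0, 2.0],
            'seq_num': [1, 2, 3],
            'data': {'x': [1, 2, 3]},
            'timestamps': {'x': [0.0, 1.0, 2.0]},
            'filled': {'x': [True, True, True]}}
    chunked = list(rechunk_event_pages([page], chunk_size=2))
    assert [len(p['uid']) for p in chunked] == [2, 1]
    return chunked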
def merge_datum_pages(datum_pages):
    """
    Combines an iterable of datum_pages into a single datum_page.

    Parameters
    ----------
    datum_pages : Iterable
        An iterable of datum_pages

    Returns
    -------
    datum_page : dict
    """
    pages = list(datum_pages)
    if len(pages) == 1:
        return pages[0]
    array_keys = ['datum_id']
    return {'resource': pages[0]['resource'],
            **{key: list(itertools.chain.from_iterable(
                [page[key] for page in pages])) for key in array_keys},
            'datum_kwargs': {key: list(itertools.chain.from_iterable(
                [page['datum_kwargs'][key] for page in pages]))
                for key in pages[0]['datum_kwargs'].keys()}}


def bulk_events_to_event_pages(bulk_events):
    """
    Transform a BulkEvents document into a list of EventPage documents.

    Note: The BulkEvents layout has been deprecated in favor of EventPage.

    Parameters
    ----------
    bulk_events : dict

    Returns
    -------
    event_pages : list
    """
    # This is for a deprecated document type, so we are not being fussy
    # about efficiency/laziness here.
    event_pages = {}  # descriptor uid mapped to page
    for events in bulk_events.values():
        for event in events:
            descriptor = event['descriptor']
            try:
                page = event_pages[descriptor]
            except KeyError:
                page = {'time': [], 'uid': [], 'seq_num': [],
                        'descriptor': descriptor}
                page['data'] = {k: [] for k in event['data']}
                page['timestamps'] = {k: [] for k in event['timestamps']}
                page['filled'] = {k: [] for k in event.get('filled', {})}
                event_pages[descriptor] = page
            page['uid'].append(event['uid'])
            page['time'].append(event['time'])
            page['seq_num'].append(event['seq_num'])
            page_data = page['data']
            for k, v in event['data'].items():
                page_data[k].append(v)
            page_timestamps = page['timestamps']
            for k, v in event['timestamps'].items():
                page_timestamps[k].append(v)
            page_filled = page['filled']
            for k, v in event.get('filled', {}).items():
                page_filled[k].append(v)
    return list(event_pages.values())


def bulk_datum_to_datum_page(bulk_datum):
    """
    Transform one BulkDatum into one DatumPage.

    Note: There is only one known usage of BulkDatum "in the wild", and the
    BulkDatum layout has been deprecated in favor of DatumPage.
    """
    datum_page = {'datum_id': bulk_datum['datum_ids'],
                  'resource': bulk_datum['resource'],
                  'datum_kwargs': _transpose_list_of_dicts(
                      bulk_datum['datum_kwarg_list'])}
    return datum_page


def _transpose_list_of_dicts(list_of_dicts):
    "Transform list-of-dicts into dict-of-lists (i.e. DataFrame-like)."
    dict_of_lists = defaultdict(list)
    for row in list_of_dicts:
        for k, v in row.items():
            dict_of_lists[k].append(v)
    return dict(dict_of_lists)


def _transpose_dict_of_lists(dict_of_lists):
    "Transform dict-of-lists (i.e. DataFrame-like) into list-of-dicts."
    list_of_dicts = []
    keys = list(dict_of_lists)
    for row in zip(*(dict_of_lists[k] for k in keys)):
        list_of_dicts.append(dict(zip(keys, row)))
    return list_of_dicts


def verify_filled(event_page):
    '''Take an event_page document and verify that it is completely filled.

    Parameters
    ----------
    event_page : event_page document
        The event page document to check

    Raises
    ------
    UnfilledData
        Raised if any of the data in the event_page is unfilled; when raised,
        it includes a list of unfilled data objects in the exception message.
    '''
    if not all(map(all, event_page['filled'].values())):
        # Check that all event_page data is filled.
        unfilled_data = []
        for field, filled in event_page['filled'].items():
            if not all(filled):
                unfilled_data.append(field)
        raise UnfilledData(f"Unfilled data found in fields "
                           f"{unfilled_data!r}. Use "
                           f"`event_model.Filler`.")
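# --- Illustrative sketch (editor's addition): verify_filled above returns
# silently when every entry in event_page['filled'] is True and raises
# UnfilledData otherwise. The field name 'image' is a hypothetical
# external-data key, and only the 'filled' mapping is shown for brevity.
def _example_verify_filled():
    filled_page = {'filled': {'image': [True, True]}}
    verify_filled(filled_page)  # No exception: all data is filled.
    unfilled_page = {'filled': {'image': [True, False]}}
    try:
        verify_filled(unfilled_page)
    except UnfilledData as exc:
        return str(exc)  # The message names the unfilled field, 'image'.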
def sanitize_doc(doc):
    '''Return a copy with any numpy objects converted to built-in Python types.

    This function takes in an event-model document and returns a copy with any
    numpy objects converted to built-in Python types. It is useful for
    sanitizing documents prior to sending to any consumer that does not
    recognize numpy types, such as a MongoDB database or a JSON encoder.

    Parameters
    ----------
    doc : dict
        The event-model document to be sanitized

    Returns
    -------
    sanitized_doc : event-model document
        The event-model document with numpy objects converted to built-in
        Python types.
    '''
    return json.loads(json.dumps(doc, cls=NumpyEncoder))


class NumpyEncoder(json.JSONEncoder):
    """
    A json.JSONEncoder for encoding numpy objects using built-in Python types.

    Examples
    --------

    Encode a Python object that includes an arbitrarily-nested numpy object.

    >>> json.dumps({'a': {'b': numpy.array([1, 2, 3])}}, cls=NumpyEncoder)
    """
    # Credit: https://stackoverflow.com/a/47626762/1221924
    def default(self, obj):
        try:
            import dask.array
            if isinstance(obj, dask.array.Array):
                obj = numpy.asarray(obj)
        except ImportError:
            pass
        if isinstance(obj, (numpy.generic, numpy.ndarray)):
            if numpy.isscalar(obj):
                return obj.item()
            return obj.tolist()
        return json.JSONEncoder.default(self, obj)
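# --- Illustrative sketch (editor's addition): sanitize_doc above converts
# numpy scalars and arrays nested anywhere in a document into plain Python
# types, so the result is safe to hand to json.dumps or a MongoDB client.
# The document contents are hypothetical example values.
def _example_sanitize_doc():
    doc = {'data': {'x': numpy.float64(1.5), 'image': numpy.zeros((2, 2))}}
    clean = sanitize_doc(doc)
    assert clean == {'data': {'x': 1.5, 'image': [[0.0, 0.0], [0.0, 0.0]]}}
    return json.dumps(clean)  # Now serializable with the stdlib encoder.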