API Documentation#

This is the internal API reference for event_model

event_model.__version__: str#

Version number as calculated by pypa/setuptools_scm

Schemas and Names#

The event-model Python package contains tooling for composing, validating, and transforming documents in the model.

class event_model.DocumentNames(value, names=None, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]#
stop = 'stop'#
start = 'start'#
descriptor = 'descriptor'#
event = 'event'#
datum = 'datum'#
resource = 'resource'#
event_page = 'event_page'#
datum_page = 'datum_page'#
stream_resource = 'stream_resource'#
stream_datum = 'stream_datum'#
bulk_datum = 'bulk_datum'#
bulk_events = 'bulk_events'#

There are two dictionaries, event_model.schemas and event_model.schema_validators, which are keyed on the members of the event_model.DocumentNames enum and which are mapped, respectively, to a schema and an associated jsonschema.IValidator.

Routers#

class event_model.RunRouter(factories, handler_registry: dict | None = None, *, root_map: dict | None = None, filler_class: ~typing.Type[~event_model.Filler] = <class 'event_model.Filler'>, fill_or_fail: bool = False)[source]#

Routes documents, by run, to callbacks it creates from factory functions.

A RunRouter is callable, and it has the signature router(name, doc), suitable for subscribing to the RunEngine.

It is configured with a list of factory functions that produce callbacks in a two-layered scheme, described below.

Warning

This is experimental. In a future release, it may be changed in a backward-incompatible way or fully removed.

Parameters:
  • factories (list) –

    A list of callables with the signature:

    factory('start', start_doc) -> List[Callbacks], List[SubFactories]
    

    which should return two lists, which may be empty. All items in the first list should be callbacks — callables with the signature:

    callback(name, doc)
    

    that will receive that RunStart document and all subsequent documents from the run including the RunStop document. All items in the second list should be “subfactories” with the signature:

    subfactory('descriptor', descriptor_doc) -> List[Callbacks]
    

    These will receive each of the EventDescriptor documents for the run, as they arrive. They must return one list, which may be empty, containing callbacks that will receive the RunStart document, that EventDescriptor, all Events that reference that EventDescriptor and finally the RunStop document for the run.

  • handler_registry (dict, optional) –

    This is passed to the Filler or whatever class is given in the filler_class parametr below.

    Maps each ‘spec’ (a string identifying a given type or external resource) to a handler class.

    A ‘handler class’ may be any callable with the signature:

    handler_class(full_path, **resource_kwargs)
    

    It is expected to return an object, a ‘handler instance’, which is also callable and has the following signature:

    handler_instance(**datum_kwargs)
    

    As the names ‘handler class’ and ‘handler instance’ suggest, this is typically implemented using a class that implements __init__ and __call__, with the respective signatures. But in general it may be any callable-that-returns-a-callable.

  • root_map (dict, optional) –

    This is passed to Filler or whatever class is given in the filler_class parameter below.

    str -> str mapping to account for temporarily moved/copied/remounted files. Any resources which have a root in root_map will be loaded using the mapped root.

  • filler_class (type) – This is Filler by default. It can be a Filler subclass, functools.partial(Filler, ...), or any class that provides the same methods as DocumentRouter.

  • fill_or_fail (boolean, optional) – By default (False), if a document with a spec not in handler_registry is encountered, let it pass through unfilled. But if set to True, fill everything and raise UndefinedAssetSpecification if some unknown spec is encountered.

start(start_doc: RunStart) None[source]#
descriptor(descriptor_doc: EventDescriptor) None[source]#
event_page(doc: EventPage)[source]#
datum_page(doc: DatumPage) None[source]#
stream_datum(doc: StreamDatum) None[source]#
resource(doc: Resource) None[source]#
stream_resource(doc: StreamResource) None[source]#
stop(doc: RunStop) None[source]#
class event_model.SingleRunDocumentRouter[source]#

A DocumentRouter intended to process events from exactly one run.

get_start() dict[source]#

Convenience method returning the start document for the associated run.

If no start document has been processed EventModelError will be raised.

Returns:

start document

Return type:

dict

get_descriptor(doc: dict) EventDescriptor[source]#

Convenience method returning the descriptor associated with the specified document.

Parameters:

doc (dict) – event-model document

Returns:

descriptor document

Return type:

EventDescriptor

get_stream_name(doc: dict) str[source]#

Convenience method returning the name of the stream for the specified document.

Parameters:

doc (dict) – event-model document

Returns:

stream name

Return type:

str

class event_model.DocumentRouter(*, emit: Callable | None = None)[source]#

Route each document by type to a corresponding method.

When an instance is called with a document type and a document like:

router(name, doc)

the document is passed to the method of the corresponding name, as in:

getattr(router, name)(doc)

The method is expected to return None or a valid document of the same type. It may be the original instance (passed through), a copy, or a different dict altogether.

Finally, the call to router(name, doc) returns:

(name, getattr(router, name)(doc))
Parameters:

emit (callable, optional) – Expected signature f(name, doc)

emit(name: str, doc: dict) None[source]#

Emit to the callable provided an instantiation time, if any.

start(doc: RunStart) RunStart | None[source]#
stop(doc: RunStop) RunStop | None[source]#
descriptor(doc: EventDescriptor) EventDescriptor | None[source]#
resource(doc: Resource) Resource | None[source]#
event(doc: Event) Event[source]#
datum(doc: Datum) Datum[source]#
event_page(doc: EventPage) EventPage[source]#
datum_page(doc: DatumPage) DatumPage | None[source]#
stream_datum(doc: StreamDatum) StreamDatum | None[source]#
stream_resource(doc: StreamResource) StreamResource | None[source]#
bulk_events(doc: dict) None[source]#
bulk_datum(doc: dict) None[source]#
class event_model.Filler(handler_registry: dict, *, include: Iterable | None = None, exclude: Iterable | None = None, root_map: dict | None = None, coerce: str = 'as_is', handler_cache: dict | None = None, resource_cache: dict | None = None, datum_cache: dict | None = None, descriptor_cache: dict | None = None, stream_resource_cache: dict | None = None, stream_datum_cache: dict | None = None, inplace: bool | None = None, retry_intervals: List = [0.001, 0.002, 0.004, 0.008, 0.016, 0.032, 0.064, 0.128, 0.256, 0.512, 1.024])[source]#

Pass documents through, loading any externally-referenced data.

It is recommended to use the Filler as a context manager. Because the Filler manages caches of potentially expensive resources (e.g. large data in memory) managing its lifecycle is important. If used as a context manager, it will drop references to its caches upon exit from the context. Unless the user holds additional references to those caches, they will be garbage collected.

But for some applications, such as taking multiple passes over the same data, it may be useful to keep a longer-lived Filler instance and then manually delete it when finished.

See Examples below.

Parameters:
  • handler_registry (dict) –

    Maps each ‘spec’ (a string identifying a given type or external resource) to a handler class.

    A ‘handler class’ may be any callable with the signature:

    handler_class(full_path, **resource_kwargs)
    

    It is expected to return an object, a ‘handler instance’, which is also callable and has the following signature:

    handler_instance(**datum_kwargs)
    

    As the names ‘handler class’ and ‘handler instance’ suggest, this is typically implemented using a class that implements __init__ and __call__, with the respective signatures. But in general it may be any callable-that-returns-a-callable.

  • include (Iterable) – The set of fields to fill. By default all unfilled fields are filled. This parameter is mutually incompatible with the exclude parameter.

  • exclude (Iterable) – The set of fields to skip filling. By default all unfilled fields are filled. This parameter is mutually incompatible with the include parameter.

  • root_map (dict) – str -> str mapping to account for temporarily moved/copied/remounted files. Any resources which have a root in root_map will be loaded using the mapped root.

  • coerce ({'as_is', 'numpy'}) – Default is ‘as_is’. Other options (e.g. ‘delayed’) may be registered by external packages at runtime.

  • handler_cache (dict, optional) – A cache of handler instances. If None, a dict is used.

  • resource_cache (dict, optional) – A cache of Resource documents. If None, a dict is used.

  • datum_cache (dict, optional) – A cache of Datum documents. If None, a dict is used.

  • descriptor_cache (dict, optional) – A cache of EventDescriptor documents. If None, a dict is used.

  • stream_resource_cache (dict, optional) – A cache of StreamResource documents. If None, a dict is used.

  • stream_datum_cache (dict, optional) – A cache of StreamDatum documents. If None, a dict is used.

  • retry_intervals (Iterable, optional) – If data is not found on the first try, there may a race between the I/O systems creating the external data and this stream of Documents that reference it. If Filler encounters an IOError it will wait a bit and retry. This list specifies how long to sleep (in seconds) between subsequent attempts. Set to None to try only once before raising DataNotAccessible. A subclass may catch this exception and implement a different retry mechanism — for example using a different implementation of sleep from an async framework. But by default, a sequence of several retries with increasing sleep intervals is used. The default sequence should not be considered stable; it may change at any time as the authors tune it.

Raises:

DataNotAccessible – If an IOError is raised when loading the data after the configured number of attempts. See the retry_intervals parameter for details.

Examples

A Filler may be used as a context manager.

>>> with Filler(handler_registry) as filler:
...     for name, doc in stream:
...         filler(name, doc)  # mutates doc in place
...         # Do some analysis or export with name and doc.

Or as a long-lived object.

>>> f = Filler(handler_registry)
>>> for name, doc in stream:
...     filler(name, doc)  # mutates doc in place
...     # Do some analysis or export with name and doc.
...
>>> del filler  # Free up memory from potentially large caches.
clone(handler_registry: dict | None = None, *, root_map: dict | None = None, coerce: str | None = None, handler_cache: dict | None = None, resource_cache: dict | None = None, datum_cache: dict | None = None, descriptor_cache: dict | None = None, stream_resource_cache: dict | None = None, stream_datum_cache: dict | None = None, inplace: bool | None = None, retry_intervals: List | None = None) Filler[source]#

Create a new Filler instance from this one.

By default it will be created with the same settings that this Filler has. Individual settings may be overridden here.

The clone does not share any caches or internal state with the original.

register_handler(spec: str, handler: Any, overwrite: bool = False) None[source]#

Register a handler.

Parameters:
  • spec (str) –

  • handler (Handler) –

  • overwrite (boolean, optional) – False by default

Raises:
  • DuplicateHandler – If a handler is already registered for spec and overwrite is False

  • See https://blueskyproject.io/event-model/external.html

deregister_handler(spec: str) Any[source]#

Deregister a handler.

If no handler is registered for this spec, it is no-op and returns None.

Parameters:

spec (str) –

Returns:

  • handler (Handler or None)

  • See https (//blueskyproject.io/event-model/external.html)

get_handler(resource: Resource) Any[source]#

Return a new Handler instance for this Resource.

Parameters:

resource (Resource) –

Returns:

handler

Return type:

Handler

close() None[source]#

Drop cached documents and handlers.

They are not explicitly cleared, so if there are other references to these caches they will remain.

clear_handler_cache() None[source]#

Clear any cached handler instances.

This operation may free significant memory, depending on the implementation of the handlers.

clear_document_caches() None[source]#

Clear any cached documents.

class event_model.NoFiller(*args, **kwargs)[source]#

This does not fill the documents; it merely validates them.

It checks that all the references between the documents are resolvable and could be filled. This is useful when the filling will be done later, as a delayed computation, but we want to make sure in advance that we have all the information that we will need when that computation occurs.

event_model.register_coercion(name: str, func: Callable, overwrite: bool = False) None[source]#

Register a new option for Filler’s coerce argument.

This is an advanced feature. See source code for comments and examples.

Parameters:
  • name (string) – The new value for coerce that will invoke this function.

  • func (callable) –

    Expected signature:

    func(filler, handler_class) -> handler_class
    

  • overwrite (boolean, optional) – False by default. Name collissions will raise EventModelValueError unless this is set to True.

event_model.as_is(handler_class, filler_state) Type[source]#

A no-op coercion function that returns handler_class unchanged.

event_model.force_numpy(handler_class: Type, filler_state) Any[source]#

A coercion that makes handler_class.__call__ return actual numpy.ndarray.

Document Minting#

To use these functions start with compose_run() which will return a ComposeRunBundle.

event_model.compose_run(*, uid: str | None = None, time: float | None = None, metadata: Dict | None = None, validate: bool = True, event_counters: Dict[str, int] | None = None) ComposeRunBundle[source]#

Compose a RunStart document and factory functions for related documents.

Parameters:
  • uid (string, optional) – Unique identifier for this run, conventionally a UUID4. If None is given, a UUID4 will be generated.

  • time (float, optional) – UNIX epoch time of start of this run. If None is given, the current time will be used.

  • metadata (dict, optional) – Additional metadata include the document

  • validate (boolean, optional) – Validate this document conforms to the schema.

  • event_counters (dict, optional) – A dict for counting events, when an event is composed by any of the descriptors composed by this run, the element in this dict with the key of the descriptor name will be increased by 1.

Return type:

ComposeRunBundle

class event_model.ComposeRunBundle(start_doc: RunStart, compose_descriptor: ComposeDescriptor, compose_resource: ComposeResource, compose_stop: ComposeStop, compose_stream_resource: ComposeStreamResource | None = None)[source]#

Extensible compose run bundle. This maintains backward compatibility by unpacking into a basic run bundle (start, compose_descriptor, compose_resource, stop). Further extensions are optional and require keyword referencing (i.e. compose_stream_resource).

event_model.compose_descriptor(*, start: RunStart, streams: Dict[str, Iterable], event_counters: Dict[str, int], name: str, data_keys: Dict[str, DataKey], uid: str | None = None, time: float | None = None, object_keys: Dict[str, Any] | None = None, configuration: Dict[str, Configuration] | None = None, hints: PerObjectHint | None = None, validate: bool = True) ComposeDescriptorBundle[source]#

Here for backwards compatibility, the Compose class is prefered.

class event_model.ComposeDescriptorBundle(descriptor_doc: event_model.documents.event_descriptor.EventDescriptor, compose_event: event_model.ComposeEvent, compose_event_page: event_model.ComposeEventPage)[source]#
event_model.compose_event(*, descriptor: EventDescriptor, event_counters: Dict[str, int], data: Dict[str, Any], timestamps: Dict[str, Any], seq_num: int, filled: Dict[str, bool | str] | None = None, uid: str | None = None, time: float | None = None, validate: bool = True) Event[source]#

Here for backwards compatibility, the Compose class is prefered.

event_model.compose_event_page(*, descriptor: EventDescriptor, event_counters: Dict[str, int], data: Dict[str, List], timestamps: Dict[str, Any], seq_num: List[int], filled: Dict[str, List[bool | str]] | None = None, uid: List | None = None, time: List | None = None, validate: bool = True) EventPage[source]#

Here for backwards compatibility, the Compose class is prefered.

event_model.compose_resource(*, spec: str, root: str, resource_path: str, resource_kwargs: Dict[str, Any], path_semantics: Literal['posix', 'windows'] = 'posix', start: RunStart | None = None, uid: str | None = None, validate: bool = True) ComposeResourceBundle[source]#

Here for backwards compatibility, the Compose class is prefered.

class event_model.ComposeResourceBundle(resource_doc: event_model.documents.resource.Resource, compose_datum: event_model.ComposeDatum, compose_datum_page: event_model.ComposeDatumPage)[source]#
event_model.compose_datum(*, resource: Resource, counter: Iterator, datum_kwargs: Dict[str, Any], validate: bool = True) Datum[source]#

Here for backwards compatibility, the Compose class is prefered.

event_model.compose_datum_page(*, resource: Resource, counter: Iterator, datum_kwargs: Dict[str, List[Any]], validate: bool = True) DatumPage[source]#

Here for backwards compatibility, the Compose class is prefered.

event_model.compose_stop(*, start: RunStart, event_counters: Dict[str, int], poison_pill: List, exit_status: Literal['success', 'abort', 'fail'] = 'success', reason: str = '', uid: str | None = None, time: float | None = None, validate: bool = True) RunStop[source]#

Here for backwards compatibility, the Compose class is prefered.

Document Munging#

event_model.pack_event_page(*events: Event) EventPage[source]#

Transform one or more Event documents into an EventPage document.

Parameters:

*event (dicts) – any number of Event documents

Returns:

event_page

Return type:

dict

event_model.unpack_event_page(event_page: EventPage) Generator[source]#

Transform an EventPage document into individual Event documents.

Parameters:

event_page (EventPage) –

Yields:

event (Event)

event_model.pack_datum_page(*datum: Datum) DatumPage[source]#

Transform one or more Datum documents into a DatumPage document.

Parameters:

*datum (dicts) – any number of Datum documents

Returns:

datum_page

Return type:

dict

event_model.unpack_datum_page(datum_page: DatumPage) Generator[source]#

Transform a DatumPage document into individual Datum documents.

Parameters:

datum_page (DatumPage) –

Yields:

datum (Datum)

event_model.sanitize_doc(doc: dict) dict[source]#

Return a copy with any numpy objects converted to built-in Python types.

This function takes in an event-model document and returns a copy with any numpy objects converted to built-in Python types. It is useful for sanitizing documents prior to sending to any consumer that does not recognize numpy types, such as a MongoDB database or a JSON encoder.

Parameters:

doc (dict) – The event-model document to be sanitized

Returns:

sanitized_doc – The event-model document with numpy objects converted to built-in Python types.

Return type:

event-model document

event_model.verify_filled(event_page: dict) None[source]#

Take an event_page document and verify that it is completely filled.

Parameters:

event_page (event_page document) – The event page document to check

Raises:

UnfilledData – Raised if any of the data in the event_page is unfilled, when raised it inlcudes a list of unfilled data objects in the exception message.

class event_model.NumpyEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]#

A json.JSONEncoder for encoding numpy objects using built-in Python types.

Examples

Encode a Python object that includes an arbitrarily-nested numpy object.

>>> json.dumps({'a': {'b': numpy.array([1, 2, 3])}}, cls=NumpyEncoder)

Constructor for JSONEncoder, with sensible defaults.

If skipkeys is false, then it is a TypeError to attempt encoding of keys that are not str, int, float or None. If skipkeys is True, such items are simply skipped.

If ensure_ascii is true, the output is guaranteed to be str objects with all incoming non-ASCII characters escaped. If ensure_ascii is false, the output can contain non-ASCII characters.

If check_circular is true, then lists, dicts, and custom encoded objects will be checked for circular references during encoding to prevent an infinite recursion (which would cause an RecursionError). Otherwise, no such check takes place.

If allow_nan is true, then NaN, Infinity, and -Infinity will be encoded as such. This behavior is not JSON specification compliant, but is consistent with most JavaScript based encoders and decoders. Otherwise, it will be a ValueError to encode such floats.

If sort_keys is true, then the output of dictionaries will be sorted by key; this is useful for regression tests to ensure that JSON serializations can be compared on a day-to-day basis.

If indent is a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0 will only insert newlines. None is the most compact representation.

If specified, separators should be an (item_separator, key_separator) tuple. The default is (’, ‘, ‘: ‘) if indent is None and (‘,’, ‘: ‘) otherwise. To get the most compact JSON representation, you should specify (‘,’, ‘:’) to eliminate whitespace.

If specified, default is a function that gets called for objects that can’t otherwise be serialized. It should return a JSON encodable version of the object or raise a TypeError.