API Documentation

The event-model Python package contains tooling for composing, validating, and transforming documents in the model.

class event_model.DocumentNames[source]

An enumeration.

bulk_datum = 'bulk_datum'
bulk_events = 'bulk_events'
datum = 'datum'
datum_page = 'datum_page'
descriptor = 'descriptor'
event = 'event'
event_page = 'event_page'
resource = 'resource'
start = 'start'
stop = 'stop'

There are two dictionaries, event_model.schemas and event_model.schema_validators, keyed on the members of the event_model.DocumentNames enum. They map each document type to its schema and to an associated jsonschema.IValidator, respectively.
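
For example, a document can be checked against its schema like this (a sketch; the RunStart document shown is minimal and illustrative):

import event_model

# Look up the JSON schema for RunStart documents.
start_schema = event_model.schemas[event_model.DocumentNames.start]

# Validate a minimal, illustrative RunStart document against that schema.
validator = event_model.schema_validators[event_model.DocumentNames.start]
validator.validate({'uid': 'some-unique-id', 'time': 1600000000.0})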

class event_model.RunRouter(factories, handler_registry=None, *, root_map=None, filler_class=<class 'event_model.Filler'>, fill_or_fail=False)[source]

Routes documents, by run, to callbacks it creates from factory functions.

A RunRouter is callable, and it has the signature router(name, doc), suitable for subscribing to the RunEngine.

It is configured with a list of factory functions that produce callbacks in a two-layered scheme, described below.

Warning

This is experimental. In a future release, it may be changed in a backward-incompatible way or fully removed.

Parameters
factories: list

A list of callables with the signature:

factory('start', start_doc) -> List[Callbacks], List[SubFactories]

which should return two lists, which may be empty. All items in the first list should be callbacks — callables with the signature:

callback(name, doc)

that will receive all subsequent documents from the run including the RunStop document. All items in the second list should be “subfactories” with the signature:

subfactory('descriptor', descriptor_doc) -> List[Callbacks]

These will receive each of the EventDescriptor documents for the run, as they arrive. They must return one list, which may be empty, containing callbacks that will receive all Events that reference that EventDescriptor and finally the RunStop document for the run. A minimal factory is sketched in the example below.

handler_registry: dict, optional

This is passed to the Filler or whatever class is given in the filler_class parameter below.

Maps each ‘spec’ (a string identifying a given type or external resource) to a handler class.

A ‘handler class’ may be any callable with the signature:

handler_class(resource_path, root, **resource_kwargs)

It is expected to return an object, a ‘handler instance’, which is also callable and has the following signature:

handler_instance(**datum_kwargs)

As the names ‘handler class’ and ‘handler instance’ suggest, this is typically implemented using a class that implements __init__ and __call__, with the respective signatures. But in general it may be any callable-that-returns-a-callable.

root_map: dict, optional

This is passed to Filler or whatever class is given in the filler_class parameter below.

str -> str mapping to account for temporarily moved/copied/remounted files. Any resources which have a root in root_map will be loaded using the mapped root.

filler_class: type

This is Filler by default. It can be a Filler subclass, functools.partial(Filler, ...), or any class that provides the same methods as DocumentRouter.

fill_or_fail: boolean, optional

By default (False), if a document with a spec not in handler_registry is encountered, it passes through unfilled. If set to True, fill everything and raise UndefinedAssetSpecification if an unknown spec is encountered.
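
For example, a factory that attaches a printing callback to runs produced by a 'count' plan might look like this (a sketch; the plan_name filter and the printing callback are illustrative):

from event_model import RunRouter

def print_callback(name, doc):
    # An ordinary callback with the (name, doc) signature.
    print(name, doc.get('uid'))

def factory(name, start_doc):
    # Called once per run with the RunStart document. Illustrative filter:
    # only subscribe to runs produced by a 'count' plan.
    if start_doc.get('plan_name') == 'count':
        return [print_callback], []
    return [], []

router = RunRouter([factory])
# router(name, doc) can now be subscribed to a stream of documents.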

datum_page(self, doc)[source]
descriptor(self, doc)[source]
event_page(self, doc)[source]
resource(self, doc)[source]
start(self, doc)[source]
stop(self, doc)[source]
class event_model.DocumentRouter[source]

Route each document by type to a corresponding method.

When an instance is called with a document type and a document like:

router(name, doc)

the document is passed to the method of the corresponding name, as in:

getattr(router, name)(doc)

The method is expected to return None or a valid document of the same type. It may be the original instance (passed through), a copy, or a different dict altogether.

Finally, the call to router(name, doc) returns:

(name, getattr(router, name)(doc))
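
For example, a small subclass that reacts to a few document types might look like this (a sketch; the method bodies are illustrative):

from event_model import DocumentRouter

class Printer(DocumentRouter):
    # Methods are looked up by document name; unhandled types fall back to
    # the base class implementations.
    def start(self, doc):
        print('Run started:', doc['uid'])

    def event(self, doc):
        print('Event', doc['seq_num'], doc['data'])

    def stop(self, doc):
        print('Run ended with exit status:', doc['exit_status'])

router = Printer()
# router(name, doc) dispatches each document to the matching method.
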
bulk_datum(self, doc)[source]
bulk_events(self, doc)[source]
datum(self, doc)[source]
datum_page(self, doc)[source]
descriptor(self, doc)[source]
event(self, doc)[source]
event_page(self, doc)[source]
resource(self, doc)[source]
start(self, doc)[source]
stop(self, doc)[source]
class event_model.Filler(handler_registry, *, include=None, exclude=None, root_map=None, coerce='as_is', handler_cache=None, resource_cache=None, datum_cache=None, descriptor_cache=None, inplace=None, retry_intervals=(0.001, 0.002, 0.004, 0.008, 0.016, 0.032, 0.064, 0.128, 0.256, 0.512, 1.024))[source]

Pass documents through, loading any externally-referenced data.

It is recommended to use the Filler as a context manager. Because the Filler manages caches of potentially expensive resources (e.g. large data in memory), managing its lifecycle is important. If used as a context manager, it will drop references to its caches upon exit from the context. Unless the user holds additional references to those caches, they will be garbage collected.

But for some applications, such as taking multiple passes over the same data, it may be useful to keep a longer-lived Filler instance and then manually delete it when finished.

See Examples below.

Parameters
handler_registry: dict

Maps each ‘spec’ (a string identifying a given type or external resource) to a handler class.

A ‘handler class’ may be any callable with the signature:

handler_class(resource_path, root, **resource_kwargs)

It is expected to return an object, a ‘handler instance’, which is also callable and has the following signature:

handler_instance(**datum_kwargs)

As the names ‘handler class’ and ‘handler instance’ suggest, this is typically implemented using a class that implements __init__ and __call__, with the respective signatures. But in general it may be any callable-that-returns-a-callable.

include: Iterable

The set of fields to fill. By default all unfilled fields are filled. This parameter is mutually incompatible with the exclude parameter.

exclude: Iterable

The set of fields to skip filling. By default all unfilled fields are filled. This parameter is mutually incompatible with the include parameter.

root_map: dict

str -> str mapping to account for temporarily moved/copied/remounted files. Any resources which have a root in root_map will be loaded using the mapped root.

coerce: {‘as_is’, ‘numpy’}

Default is ‘as_is’. Other options (e.g. ‘delayed’) may be registered by external packages at runtime.

handler_cache: dict, optional

A cache of handler instances. If None, a dict is used.

resource_cache: dict, optional

A cache of Resource documents. If None, a dict is used.

datum_cache: dict, optional

A cache of Datum documents. If None, a dict is used.

descriptor_cache: dict, optional

A cache of EventDescriptor documents. If None, a dict is used.

retry_intervals: Iterable, optional

If data is not found on the first try, there may be a race between the I/O systems creating the external data and this stream of Documents that reference it. If Filler encounters an IOError it will wait a bit and retry. This list specifies how long to sleep (in seconds) between subsequent attempts. Set to None to try only once before raising DataNotAccessible. A subclass may catch this exception and implement a different retry mechanism, for example using a different implementation of sleep from an async framework. But by default, a sequence of several retries with increasing sleep intervals is used. The default sequence should not be considered stable; it may change at any time as the authors tune it.

Raises
DataNotAccessible

If an IOError is raised when loading the data after the configured number of attempts. See the retry_intervals parameter for details.

Examples

A Filler may be used as a context manager.

>>> with Filler(handler_registry) as filler:
...     for name, doc in stream:
...         filler(name, doc)  # mutates doc in place
...         # Do some analysis or export with name and doc.

Or as a long-lived object.

>>> filler = Filler(handler_registry)
>>> for name, doc in stream:
...     filler(name, doc)  # mutates doc in place
...     # Do some analysis or export with name and doc.
...
>>> del filler  # Free up memory from potentially large caches.
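
A minimal handler class, following the contract described above, might look like this (a sketch; the 'NPY_SERIES' spec, directory layout, and 'index' datum kwarg are hypothetical):

import os
import numpy
from event_model import Filler

class NpySeriesHandler:
    # Instantiated once per Resource document.
    def __init__(self, resource_path, root, **resource_kwargs):
        self._directory = os.path.join(root, resource_path)

    # Called once per Datum document; returns the data it references.
    def __call__(self, **datum_kwargs):
        index = datum_kwargs['index']
        return numpy.load(os.path.join(self._directory, f'{index}.npy'))

handler_registry = {'NPY_SERIES': NpySeriesHandler}
filler = Filler(handler_registry)
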
clone(self, handler_registry=None, *, root_map=None, coerce=None, handler_cache=None, resource_cache=None, datum_cache=None, descriptor_cache=None, inplace=None, retry_intervals=None)[source]

Create a new Filler instance from this one.

By default it will be created with the same settings that this Filler has. Individual settings may be overridden here.

get_handler(self, resource)[source]

Return a new Handler instance for this Resource.

Parameters
resource: dict
Returns
handler: Handler
class event_model.NoFiller(*args, **kwargs)[source]

This does not fill the documents; it merely validates them.

It checks that all the references between the documents are resolvable and could be filled. This is useful when the filling will be done later, as a delayed computation, but we want to make sure in advance that we have all the information that we will need when that computation occurs.

event_model.verify_filled(event_page)[source]

Take an event_page document and verify that it is completely filled.

Parameters
event_page: event_page document

The event page document to check

Raises
UnfilledData

Raised if any of the data in the event_page is unfilled. The exception message includes a list of the unfilled data objects.
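
For example, a consumer that cannot tolerate unfilled data might guard on it like this (a sketch; the stream of documents is illustrative):

import event_model

stream = []  # e.g. (name, doc) pairs from a run; empty here for illustration

for name, doc in stream:
    if name == 'event_page':
        # Raises UnfilledData if any external reference is still unfilled.
        event_model.verify_filled(doc)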

event_model.pack_event_page(*events)[source]

Transform one or more Event documents into an EventPage document.

Parameters
*events: dicts

any number of Event documents

Returns
event_page: dict
event_model.unpack_event_page(event_page)[source]

Transform an EventPage document into individual Event documents.

Parameters
event_page: dict
Yields
event: dict
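
For example, two Event documents can be packed into one EventPage and unpacked again (a sketch; the documents are minimal and illustrative):

import event_model

events = [
    {'uid': 'e1', 'time': 1.0, 'seq_num': 1, 'descriptor': 'd1',
     'data': {'x': 1}, 'timestamps': {'x': 1.0}, 'filled': {}},
    {'uid': 'e2', 'time': 2.0, 'seq_num': 2, 'descriptor': 'd1',
     'data': {'x': 2}, 'timestamps': {'x': 2.0}, 'filled': {}},
]

page = event_model.pack_event_page(*events)
# page['data'] == {'x': [1, 2]}; per-event scalars become parallel lists.

round_tripped = list(event_model.unpack_event_page(page))
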
event_model.pack_datum_page(*datum)[source]

Transform one or more Datum documents into a DatumPage document.

Parameters
*datum: dicts

any number of Datum documents

Returns
datum_page: dict
event_model.unpack_datum_page(datum_page)[source]

Transform a DatumPage document into individual Datum documents.

Parameters
datum_page: dict
Yields
datum: dict
event_model.sanitize_doc(doc)[source]

Return a copy with any numpy objects converted to built-in Python types.

This function takes in an event-model document and returns a copy with any numpy objects converted to built-in Python types. It is useful for sanitizing documents prior to sending to any consumer that does not recognize numpy types, such as a MongoDB database or a JSON encoder.

Parameters
doc: dict

The event-model document to be sanitized

Returns
sanitized_doc: event-model document

The event-model document with numpy objects converted to built-in Python types.
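
For example (a sketch):

import numpy
import event_model

doc = {'data': {'x': numpy.int64(1), 'y': numpy.array([1.0, 2.0])}}
clean = event_model.sanitize_doc(doc)
# clean == {'data': {'x': 1, 'y': [1.0, 2.0]}} -- built-in Python types only.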

class event_model.NumpyEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]

A json.JSONEncoder for encoding numpy objects using built-in Python types.

Examples

Encode a Python object that includes an arbitrarily-nested numpy object.

>>> json.dumps({'a': {'b': numpy.array([1, 2, 3])}}, cls=NumpyEncoder)
default(self, obj)[source]

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
event_model.compose_run(*, uid=None, time=None, metadata=None, validate=True)[source]

Compose a RunStart document and factory functions for related documents.

Parameters
uid: string, optional

Unique identifier for this run, conventionally a UUID4. If None is given, a UUID4 will be generated.

time: float, optional

UNIX epoch time of start of this run. If None is given, the current time will be used.

metadata: dict, optional

Additional metadata to include in the document.

validate: boolean, optional

Validate that this document conforms to the schema.

event_model.compose_descriptor(*, start, streams, event_counter, name, data_keys, uid=None, time=None, object_keys=None, configuration=None, hints=None, validate=True)[source]
event_model.compose_resource(*, start, spec, root, resource_path, resource_kwargs, path_semantics='posix', uid=None, validate=True)[source]
event_model.compose_datum(*, resource, counter, datum_kwargs, validate=True)[source]
event_model.compose_datum_page(*, resource, counter, datum_kwargs, validate=True)[source]
event_model.compose_event(*, descriptor, event_counter, data, timestamps, seq_num=None, filled=None, uid=None, time=None, validate=True)[source]
event_model.compose_event_page(*, descriptor, event_counter, data, timestamps, seq_num, filled=None, uid=None, time=None, validate=True)[source]
event_model.compose_stop(*, start, event_counter, poison_pill, exit_status='success', reason='', uid=None, time=None, validate=True)[source]
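
Taken together, the compose functions are typically chained: each call returns a document along with factory functions pre-bound to that document. A minimal flow might look like this (a sketch; the data key, values, and stream name are illustrative):

import time
import event_model

# Compose a RunStart document plus factories bound to this run.
start_doc, compose_descriptor, compose_resource, compose_stop = event_model.compose_run()

# Compose an EventDescriptor for a stream named 'primary'.
descriptor_doc, compose_event, compose_event_page = compose_descriptor(
    name='primary',
    data_keys={'x': {'source': 'illustrative', 'dtype': 'number', 'shape': []}})

# Compose an Event in that stream.
event_doc = compose_event(data={'x': 1.0}, timestamps={'x': time.time()})

# Compose the RunStop document.
stop_doc = compose_stop()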