API Documentation#
This is the internal API reference for event_model.
- event_model.__version__: str#
Version number as calculated by pypa/setuptools_scm
Schemas and Names#
The event-model Python package contains tooling for composing, validating, and transforming documents in the model.
- class event_model.DocumentNames(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]#
- stop = 'stop'#
- start = 'start'#
- descriptor = 'descriptor'#
- event = 'event'#
- datum = 'datum'#
- resource = 'resource'#
- event_page = 'event_page'#
- datum_page = 'datum_page'#
- stream_resource = 'stream_resource'#
- stream_datum = 'stream_datum'#
- bulk_datum = 'bulk_datum'#
- bulk_events = 'bulk_events'#
There are two dictionaries, event_model.schemas and event_model.schema_validators, which are keyed on the members of the event_model.DocumentNames enum and which map, respectively, to a schema and an associated jsonschema.IValidator.
Routers#
- class event_model.RunRouter(factories, handler_registry: dict | None = None, *, root_map: dict | None = None, filler_class: ~typing.Type[~event_model.Filler] = <class 'event_model.Filler'>, fill_or_fail: bool = False)[source]#
Routes documents, by run, to callbacks it creates from factory functions.
A RunRouter is callable with the signature router(name, doc), suitable for subscribing to the RunEngine. It is configured with a list of factory functions that produce callbacks in a two-layered scheme, described below.
Warning
This is experimental. In a future release, it may be changed in a backward-incompatible way or fully removed.
- Parameters:
factories (list) –
A list of callables with the signature:
factory('start', start_doc) -> List[Callbacks], List[SubFactories]
which should return two lists, which may be empty. All items in the first list should be callbacks — callables with the signature:
callback(name, doc)
that will receive that RunStart document and all subsequent documents from the run including the RunStop document. All items in the second list should be “subfactories” with the signature:
subfactory('descriptor', descriptor_doc) -> List[Callbacks]
These will receive each of the EventDescriptor documents for the run, as they arrive. They must return one list, which may be empty, containing callbacks that will receive the RunStart document, that EventDescriptor, all Events that reference that EventDescriptor and finally the RunStop document for the run.
handler_registry (dict, optional) –
This is passed to the Filler or whatever class is given in the filler_class parameter below.
Maps each ‘spec’ (a string identifying a given type or external resource) to a handler class.
A ‘handler class’ may be any callable with the signature:
handler_class(full_path, **resource_kwargs)
It is expected to return an object, a ‘handler instance’, which is also callable and has the following signature:
handler_instance(**datum_kwargs)
As the names ‘handler class’ and ‘handler instance’ suggest, this is typically implemented using a class that implements __init__ and __call__ with the respective signatures. But in general it may be any callable-that-returns-a-callable.
root_map (dict, optional) –
This is passed to Filler or whatever class is given in the filler_class parameter below.
str -> str mapping to account for temporarily moved/copied/remounted files. Any resources which have a root in root_map will be loaded using the mapped root.
filler_class (type) – This is Filler by default. It can be a Filler subclass, functools.partial(Filler, ...), or any class that provides the same methods as DocumentRouter.
fill_or_fail (boolean, optional) – By default (False), if a document with a spec not in handler_registry is encountered, let it pass through unfilled. But if set to True, fill everything and raise UndefinedAssetSpecification if some unknown spec is encountered.
- class event_model.SingleRunDocumentRouter[source]#
A DocumentRouter intended to process events from exactly one run.
- get_start() dict [source]#
Convenience method returning the start document for the associated run.
If no start document has been processed, an EventModelError will be raised.
- Returns:
start document
- Return type:
dict
- class event_model.DocumentRouter(*, emit: Callable | None = None)[source]#
Route each document by type to a corresponding method.
When an instance is called with a document type and a document like:
router(name, doc)
the document is passed to the method of the corresponding name, as in:
getattr(router, name)(doc)
The method is expected to return None or a valid document of the same type. It may be the original instance (passed through), a copy, or a different dict altogether.
Finally, the call to router(name, doc) returns:
(name, getattr(router, name)(doc))
- Parameters:
emit (callable, optional) – Expected signature
f(name, doc)
- class event_model.Filler(handler_registry: dict, *, include: Iterable | None = None, exclude: Iterable | None = None, root_map: dict | None = None, coerce: str = 'as_is', handler_cache: dict | None = None, resource_cache: dict | None = None, datum_cache: dict | None = None, descriptor_cache: dict | None = None, stream_resource_cache: dict | None = None, stream_datum_cache: dict | None = None, inplace: bool | None = None, retry_intervals: List = [0.001, 0.002, 0.004, 0.008, 0.016, 0.032, 0.064, 0.128, 0.256, 0.512, 1.024])[source]#
Pass documents through, loading any externally-referenced data.
It is recommended to use the Filler as a context manager. Because the Filler manages caches of potentially expensive resources (e.g. large data in memory), managing its lifecycle is important. If used as a context manager, it will drop references to its caches upon exit from the context. Unless the user holds additional references to those caches, they will be garbage collected.
But for some applications, such as taking multiple passes over the same data, it may be useful to keep a longer-lived Filler instance and then manually delete it when finished.
See Examples below.
- Parameters:
handler_registry (dict) –
Maps each ‘spec’ (a string identifying a given type or external resource) to a handler class.
A ‘handler class’ may be any callable with the signature:
handler_class(full_path, **resource_kwargs)
It is expected to return an object, a ‘handler instance’, which is also callable and has the following signature:
handler_instance(**datum_kwargs)
As the names ‘handler class’ and ‘handler instance’ suggest, this is typically implemented using a class that implements __init__ and __call__ with the respective signatures. But in general it may be any callable-that-returns-a-callable.
include (Iterable) – The set of fields to fill. By default all unfilled fields are filled. This parameter is mutually incompatible with the exclude parameter.
exclude (Iterable) – The set of fields to skip filling. By default all unfilled fields are filled. This parameter is mutually incompatible with the include parameter.
root_map (dict) – str -> str mapping to account for temporarily moved/copied/remounted files. Any resources which have a root in root_map will be loaded using the mapped root.
.coerce ({'as_is', 'numpy'}) – Default is ‘as_is’. Other options (e.g. ‘delayed’) may be registered by external packages at runtime.
handler_cache (dict, optional) – A cache of handler instances. If None, a dict is used.
resource_cache (dict, optional) – A cache of Resource documents. If None, a dict is used.
datum_cache (dict, optional) – A cache of Datum documents. If None, a dict is used.
descriptor_cache (dict, optional) – A cache of EventDescriptor documents. If None, a dict is used.
stream_resource_cache (dict, optional) – A cache of StreamResource documents. If None, a dict is used.
stream_datum_cache (dict, optional) – A cache of StreamDatum documents. If None, a dict is used.
retry_intervals (Iterable, optional) – If data is not found on the first try, there may be a race between the I/O systems creating the external data and this stream of Documents that reference it. If Filler encounters an IOError it will wait a bit and retry. This list specifies how long to sleep (in seconds) between subsequent attempts. Set to None to try only once before raising DataNotAccessible. A subclass may catch this exception and implement a different retry mechanism, for example using a different implementation of sleep from an async framework. But by default, a sequence of several retries with increasing sleep intervals is used. The default sequence should not be considered stable; it may change at any time as the authors tune it.
- Raises:
DataNotAccessible – If an IOError is raised when loading the data after the configured number of attempts. See the retry_intervals parameter for details.
Examples
A Filler may be used as a context manager.
>>> with Filler(handler_registry) as filler:
...     for name, doc in stream:
...         filler(name, doc)  # mutates doc in place
...         # Do some analysis or export with name and doc.
Or as a long-lived object.
>>> filler = Filler(handler_registry)
>>> for name, doc in stream:
...     filler(name, doc)  # mutates doc in place
...     # Do some analysis or export with name and doc.
...
>>> del filler  # Free up memory from potentially large caches.
- clone(handler_registry: dict | None = None, *, root_map: dict | None = None, coerce: str | None = None, handler_cache: dict | None = None, resource_cache: dict | None = None, datum_cache: dict | None = None, descriptor_cache: dict | None = None, stream_resource_cache: dict | None = None, stream_datum_cache: dict | None = None, inplace: bool | None = None, retry_intervals: List | None = None) Filler [source]#
Create a new Filler instance from this one.
By default it will be created with the same settings that this Filler has. Individual settings may be overridden here.
The clone does not share any caches or internal state with the original.
- register_handler(spec: str, handler: Any, overwrite: bool = False) None [source]#
Register a handler.
- Parameters:
spec (str)
handler (Handler)
overwrite (boolean, optional) – False by default
- Raises:
DuplicateHandler – If a handler is already registered for spec and overwrite is False
See https://blueskyproject.io/event-model/external.html
- deregister_handler(spec: str) Any [source]#
Deregister a handler.
If no handler is registered for this spec, this is a no-op and returns None.
- Parameters:
spec (str)
- Returns:
handler (Handler or None)
See https://blueskyproject.io/event-model/external.html
- get_handler(resource: Resource) Any [source]#
Return a new Handler instance for this Resource.
- Parameters:
resource (Resource)
- Returns:
handler
- Return type:
Handler
- close() None [source]#
Drop cached documents and handlers.
They are not explicitly cleared, so if there are other references to these caches they will remain.
- class event_model.NoFiller(*args, **kwargs)[source]#
This does not fill the documents; it merely validates them.
It checks that all the references between the documents are resolvable and could be filled. This is useful when the filling will be done later, as a delayed computation, but we want to make sure in advance that we have all the information that we will need when that computation occurs.
- event_model.register_coercion(name: str, func: Callable, overwrite: bool = False) None [source]#
Register a new option for Filler’s coerce argument.
This is an advanced feature. See source code for comments and examples.
- Parameters:
name (string) – The new value for coerce that will invoke this function.
func (callable) –
Expected signature:
func(filler, handler_class) -> handler_class
overwrite (boolean, optional) – False by default. Name collisions will raise EventModelValueError unless this is set to True.
Document Minting#
To use these functions, start with compose_run(), which will return a ComposeRunBundle.
- event_model.compose_run(*, uid: str | None = None, time: float | None = None, metadata: Dict | None = None, validate: bool = True, event_counters: Dict[str, int] | None = None) ComposeRunBundle [source]#
Compose a RunStart document and factory functions for related documents.
- Parameters:
uid (string, optional) – Unique identifier for this run, conventionally a UUID4. If None is given, a UUID4 will be generated.
time (float, optional) – UNIX epoch time of start of this run. If None is given, the current time will be used.
metadata (dict, optional) – Additional metadata to include in the document.
validate (boolean, optional) – Validate this document conforms to the schema.
event_counters (dict, optional) – A dict for counting events. When an event is composed by any of the descriptors composed by this run, the element of this dict keyed on the descriptor name will be increased by 1.
- Return type:
ComposeRunBundle
- class event_model.ComposeRunBundle(start_doc: RunStart, compose_descriptor: ComposeDescriptor, compose_resource: ComposeResource, compose_stop: ComposeStop, compose_stream_resource: ComposeStreamResource | None = None)[source]#
Extensible compose run bundle. This maintains backward compatibility by unpacking into a basic run bundle (start, compose_descriptor, compose_resource, stop). Further extensions are optional and require keyword referencing (i.e. compose_stream_resource).
- event_model.compose_descriptor(*, start: RunStart, streams: Dict[str, Iterable], event_counters: Dict[str, int], name: str, data_keys: Dict[str, DataKey], uid: str | None = None, time: float | None = None, object_keys: Dict[str, Any] | None = None, configuration: Dict[str, Configuration] | None = None, hints: PerObjectHint | None = None, validate: bool = True) ComposeDescriptorBundle [source]#
Here for backwards compatibility; the Compose class is preferred.
- class event_model.ComposeDescriptorBundle(descriptor_doc: event_model.documents.event_descriptor.EventDescriptor, compose_event: event_model.ComposeEvent, compose_event_page: event_model.ComposeEventPage)[source]#
- event_model.compose_event(*, descriptor: EventDescriptor, event_counters: Dict[str, int], data: Dict[str, Any], timestamps: Dict[str, Any], seq_num: int, filled: Dict[str, bool | str] | None = None, uid: str | None = None, time: float | None = None, validate: bool = True) Event [source]#
Here for backwards compatibility; the Compose class is preferred.
- event_model.compose_event_page(*, descriptor: EventDescriptor, event_counters: Dict[str, int], data: Dict[str, List], timestamps: Dict[str, Any], seq_num: List[int], filled: Dict[str, List[bool | str]] | None = None, uid: List | None = None, time: List | None = None, validate: bool = True) EventPage [source]#
Here for backwards compatibility; the Compose class is preferred.
- event_model.compose_resource(*, spec: str, root: str, resource_path: str, resource_kwargs: Dict[str, Any], path_semantics: Literal['posix', 'windows'] = 'posix', start: RunStart | None = None, uid: str | None = None, validate: bool = True) ComposeResourceBundle [source]#
Here for backwards compatibility; the Compose class is preferred.
- class event_model.ComposeResourceBundle(resource_doc: event_model.documents.resource.Resource, compose_datum: event_model.ComposeDatum, compose_datum_page: event_model.ComposeDatumPage)[source]#
- event_model.compose_datum(*, resource: Resource, counter: Iterator, datum_kwargs: Dict[str, Any], validate: bool = True) Datum [source]#
Here for backwards compatibility; the Compose class is preferred.
- event_model.compose_datum_page(*, resource: Resource, counter: Iterator, datum_kwargs: Dict[str, List[Any]], validate: bool = True) DatumPage [source]#
Here for backwards compatibility; the Compose class is preferred.
- event_model.compose_stop(*, start: RunStart, event_counters: Dict[str, int], poison_pill: List, exit_status: Literal['success', 'abort', 'fail'] = 'success', reason: str = '', uid: str | None = None, time: float | None = None, validate: bool = True) RunStop [source]#
Here for backwards compatibility; the Compose class is preferred.
Document Munging#
- event_model.pack_event_page(*events: Event) EventPage [source]#
Transform one or more Event documents into an EventPage document.
- Parameters:
*events (dicts) – any number of Event documents
- Returns:
event_page
- Return type:
EventPage
- event_model.unpack_event_page(event_page: EventPage) Generator [source]#
Transform an EventPage document into individual Event documents.
- Parameters:
event_page (EventPage)
- Yields:
event (Event)
- event_model.pack_datum_page(*datum: Datum) DatumPage [source]#
Transform one or more Datum documents into a DatumPage document.
- Parameters:
*datum (dicts) – any number of Datum documents
- Returns:
datum_page
- Return type:
DatumPage
- event_model.unpack_datum_page(datum_page: DatumPage) Generator [source]#
Transform a DatumPage document into individual Datum documents.
- Parameters:
datum_page (DatumPage)
- Yields:
datum (Datum)
- event_model.sanitize_doc(doc: dict) dict [source]#
Return a copy with any numpy objects converted to built-in Python types.
This function takes in an event-model document and returns a copy with any numpy objects converted to built-in Python types. It is useful for sanitizing documents prior to sending to any consumer that does not recognize numpy types, such as a MongoDB database or a JSON encoder.
- Parameters:
doc (dict) – The event-model document to be sanitized
- Returns:
sanitized_doc – The event-model document with numpy objects converted to built-in Python types.
- Return type:
event-model document
- event_model.verify_filled(event_page: dict) None [source]#
Take an event_page document and verify that it is completely filled.
- Parameters:
event_page (event_page document) – The event page document to check
- Raises:
UnfilledData – Raised if any of the data in the event_page is unfilled; when raised, it includes a list of unfilled data objects in the exception message.
- class event_model.NumpyEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]#
A json.JSONEncoder for encoding numpy objects using built-in Python types.
Examples
Encode a Python object that includes an arbitrarily-nested numpy object.
>>> json.dumps({'a': {'b': numpy.array([1, 2, 3])}}, cls=NumpyEncoder)
Constructor for JSONEncoder, with sensible defaults.
If skipkeys is false, then it is a TypeError to attempt encoding of keys that are not str, int, float or None. If skipkeys is true, such items are simply skipped.
If ensure_ascii is true, the output is guaranteed to be str objects with all incoming non-ASCII characters escaped. If ensure_ascii is false, the output can contain non-ASCII characters.
If check_circular is true, then lists, dicts, and custom encoded objects will be checked for circular references during encoding to prevent an infinite recursion (which would cause a RecursionError). Otherwise, no such check takes place.
If allow_nan is true, then NaN, Infinity, and -Infinity will be encoded as such. This behavior is not JSON specification compliant, but is consistent with most JavaScript based encoders and decoders. Otherwise, it will be a ValueError to encode such floats.
If sort_keys is true, then the output of dictionaries will be sorted by key; this is useful for regression tests to ensure that JSON serializations can be compared on a day-to-day basis.
If indent is a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0 will only insert newlines. None is the most compact representation.
If specified, separators should be an (item_separator, key_separator) tuple. The default is (', ', ': ') if indent is None and (',', ': ') otherwise. To get the most compact JSON representation, you should specify (',', ':') to eliminate whitespace.
If specified, default is a function that gets called for objects that can’t otherwise be serialized. It should return a JSON encodable version of the object or raise a TypeError.