bluesky_tiled_plugins.writing.tiled_writer#
Module Contents#
Classes#
Route each document by type to a corresponding method. |
|
Functions#
Concatenate consecutive StreamDatum documents into a single StreamDatum document |
Data#
API#
- bluesky_tiled_plugins.writing.tiled_writer.BATCH_SIZE#
10000
- bluesky_tiled_plugins.writing.tiled_writer.MAX_ARRAY_SIZE#
16
- bluesky_tiled_plugins.writing.tiled_writer.RESERVED_DATA_KEYS#
[‘time’, ‘seq_num’]
- bluesky_tiled_plugins.writing.tiled_writer.JSON_TO_NUMPY_DTYPE#
None
- bluesky_tiled_plugins.writing.tiled_writer.MIMETYPE_LOOKUP#
‘defaultdict(…)’
- bluesky_tiled_plugins.writing.tiled_writer.logger#
‘getLogger(…)’
- exception bluesky_tiled_plugins.writing.tiled_writer.ValidationError#
Bases:
ExceptionCustom exception for validation errors in Tiled RunWriter.
- bluesky_tiled_plugins.writing.tiled_writer.concatenate_stream_datums(*docs: event_model.documents.StreamDatum)#
Concatenate consecutive StreamDatum documents into a single StreamDatum document
- bluesky_tiled_plugins.writing.tiled_writer.ExternalEventDataReference#
‘namedtuple(…)’
- class bluesky_tiled_plugins.writing.tiled_writer.RunNormalizer(patches: dict[str, collections.abc.Callable] | None = None, spec_to_mimetype: dict[str, str] | None = None)#
Bases:
event_model.DocumentRouterRoute each document by type to a corresponding method.
When an instance is called with a document type and a document like::
router(name, doc)
the document is passed to the method of the corresponding name, as in::
getattr(router, name)(doc)
The method is expected to return
Noneor a valid document of the same type. It may be the original instance (passed through), a copy, or a different dict altogether.Finally, the call to
router(name, doc)returns::(name, getattr(router, name)(doc))
Parameters
emit: callable, optional Expected signature
f(name, doc)- start(doc: event_model.documents.RunStart)#
- stop(doc: event_model.documents.RunStop)#
- descriptor(doc: event_model.documents.EventDescriptor)#
- event(doc: event_model.documents.Event)#
- resource(doc: event_model.documents.Resource)#
- stream_resource(doc: event_model.documents.StreamResource)#
- stream_datum(doc: event_model.documents.StreamDatum)#
- datum(doc: event_model.documents.Datum)#
- datum_page(doc: event_model.documents.DatumPage)#
- event_page(doc: event_model.documents.EventPage)#
- emit(name, doc)#
Check the document schema and send to the dispatcher
- subscribe(func, name='all')#
Convenience function for dispatcher subscription
- unsubscribe(token)#
Convenience function for dispatcher un-subscription
- class bluesky_tiled_plugins.writing.tiled_writer.TiledWriter(client: tiled.client.base.BaseClient, *, normalizer: type[event_model.DocumentRouter] | None = RunNormalizer, patches: dict[str, collections.abc.Callable] | None = None, spec_to_mimetype: dict[str, str] | None = None, backup_directory: str | None = None, batch_size: int = BATCH_SIZE, max_array_size: int = MAX_ARRAY_SIZE, validate: bool = False)#