bluesky_tiled_plugins.writing.tiled_writer#

Module Contents#

Classes#

RunNormalizer

Route each document by type to a corresponding method.

TiledWriter

Functions#

concatenate_stream_datums

Concatenate consecutive StreamDatum documents into a single StreamDatum document

Data#

API#

bluesky_tiled_plugins.writing.tiled_writer.BATCH_SIZE#

10000

bluesky_tiled_plugins.writing.tiled_writer.MAX_ARRAY_SIZE#

16

bluesky_tiled_plugins.writing.tiled_writer.RESERVED_DATA_KEYS#

[‘time’, ‘seq_num’]

bluesky_tiled_plugins.writing.tiled_writer.JSON_TO_NUMPY_DTYPE#

None

bluesky_tiled_plugins.writing.tiled_writer.MIMETYPE_LOOKUP#

‘defaultdict(…)’

bluesky_tiled_plugins.writing.tiled_writer.logger#

‘getLogger(…)’

exception bluesky_tiled_plugins.writing.tiled_writer.ValidationError#

Bases: Exception

Custom exception for validation errors in Tiled RunWriter.

bluesky_tiled_plugins.writing.tiled_writer.concatenate_stream_datums(*docs: event_model.documents.StreamDatum)#

Concatenate consecutive StreamDatum documents into a single StreamDatum document

bluesky_tiled_plugins.writing.tiled_writer.ExternalEventDataReference#

‘namedtuple(…)’

class bluesky_tiled_plugins.writing.tiled_writer.RunNormalizer(patches: dict[str, collections.abc.Callable] | None = None, spec_to_mimetype: dict[str, str] | None = None)#

Bases: event_model.DocumentRouter

Route each document by type to a corresponding method.

When an instance is called with a document type and a document like::

router(name, doc)

the document is passed to the method of the corresponding name, as in::

getattr(router, name)(doc)

The method is expected to return None or a valid document of the same type. It may be the original instance (passed through), a copy, or a different dict altogether.

Finally, the call to router(name, doc) returns::

(name, getattr(router, name)(doc))

Parameters

emit: callable, optional Expected signature f(name, doc)

start(doc: event_model.documents.RunStart)#
stop(doc: event_model.documents.RunStop)#
descriptor(doc: event_model.documents.EventDescriptor)#
event(doc: event_model.documents.Event)#
resource(doc: event_model.documents.Resource)#
stream_resource(doc: event_model.documents.StreamResource)#
stream_datum(doc: event_model.documents.StreamDatum)#
datum(doc: event_model.documents.Datum)#
datum_page(doc: event_model.documents.DatumPage)#
event_page(doc: event_model.documents.EventPage)#
emit(name, doc)#

Check the document schema and send to the dispatcher

subscribe(func, name='all')#

Convenience function for dispatcher subscription

unsubscribe(token)#

Convenience function for dispatcher un-subscription

class bluesky_tiled_plugins.writing.tiled_writer.TiledWriter(client: tiled.client.base.BaseClient, *, normalizer: type[event_model.DocumentRouter] | None = RunNormalizer, patches: dict[str, collections.abc.Callable] | None = None, spec_to_mimetype: dict[str, str] | None = None, backup_directory: str | None = None, batch_size: int = BATCH_SIZE, max_array_size: int = MAX_ARRAY_SIZE, validate: bool = False)#
classmethod from_uri(uri, *, normalizer: type[event_model.DocumentRouter] | None = RunNormalizer, patches: dict[str, collections.abc.Callable] | None = None, spec_to_mimetype: dict[str, str] | None = None, backup_directory: str | None = None, batch_size: int = BATCH_SIZE, **kwargs)#
classmethod from_profile(profile, *, normalizer: type[event_model.DocumentRouter] | None = RunNormalizer, patches: dict[str, collections.abc.Callable] | None = None, spec_to_mimetype: dict[str, str] | None = None, backup_directory: str | None = None, batch_size: int = BATCH_SIZE, **kwargs)#