bluesky_tiled_plugins.writing.consolidators#

Module Contents#

Classes#

Patch

ConsolidatorBase

Consolidator of StreamDatums

CSVConsolidator

Consolidator of StreamDatums

HDF5Consolidator

Consolidator of StreamDatums

MultipartRelatedConsolidator

Consolidator of StreamDatums

TIFFConsolidator

Consolidator of StreamDatums

JPEGConsolidator

Consolidator of StreamDatums

NPYConsolidator

Consolidator of StreamDatums

Functions#

consolidator_factory

Data#

CONSOLIDATOR_REGISTRY

API#

class bluesky_tiled_plugins.writing.consolidators.Patch#
shape: tuple[int, ...]#

None

offset: tuple[int, ...]#

None

classmethod combine_patches(patches: list[bluesky_tiled_plugins.writing.consolidators.Patch]) → bluesky_tiled_plugins.writing.consolidators.Patch#

Combine multiple patches into a single patch

The combined patch covers the union (smallest bounding box) of all provided patches.

Parameters

patches : list[Patch] A list of Patch objects to combine.

Returns

Patch A new Patch object that covers the union of all input patches.
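
A minimal usage sketch, assuming Patch accepts shape and offset as keyword arguments and the obvious bounding-box semantics:

    from bluesky_tiled_plugins.writing.consolidators import Patch

    # A 2x2 patch at the origin and a 3x1 patch offset to (4, 2).
    p1 = Patch(shape=(2, 2), offset=(0, 0))
    p2 = Patch(shape=(3, 1), offset=(4, 2))

    # The combined patch is the smallest bounding box covering both,
    # expected to have offset (0, 0) and shape (7, 3).
    combined = Patch.combine_patches([p1, p2])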

class bluesky_tiled_plugins.writing.consolidators.ConsolidatorBase(stream_resource: event_model.documents.StreamResource, descriptor: event_model.documents.EventDescriptor)#

Consolidator of StreamDatums

A Consolidator consumes documents from the RunEngine (RE); it is similar to the usual Bluesky Handlers but is designed to work with streaming data (received via StreamResource and StreamDatum documents). It composes the details (a DataSource and its Assets) that will go into the Tiled database. One Consolidator is instantiated per StreamResource.

Tiled Adapters will later use this to read the data, with good random access and bulk access support.

We put this code into consolidators so that additional, possibly very unusual, formats can be supported by users without getting a PR merged into Bluesky or Tiled.

The CONSOLIDATOR_REGISTRY (see example below) and the Tiled catalog parameter adapters_by_mimetype can be used together to support:

  • Ingesting a new mimetype from Bluesky documents and generating a DataSource and its Assets with appropriate parameters (the consolidator’s job);

  • Interpreting those DataSource and Asset parameters to do I/O (the adapter’s job).

To implement new Consolidators for other mimetypes, subclass ConsolidatorBase, possibly expand the consume_stream_datum and get_data_source methods, and ensure that the keys of the returned adapter_parameters dictionary match the expected adapter signature. Declare a set of supported mimetypes to allow validation and automated discovery of the subclassed Consolidator.
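
A minimal sketch of such a subclass (the class name, mimetype, and adapter parameter below are hypothetical):

    from bluesky_tiled_plugins.writing.consolidators import ConsolidatorBase

    class NPZConsolidator(ConsolidatorBase):
        """Hypothetical consolidator for .npz files -- an illustration only."""

        # Declaring supported mimetypes enables validation and automated
        # discovery of the subclass.
        supported_mimetypes = {"application/x-npz"}

        def consume_stream_datum(self, doc):
            # Let the base class update the row count and the seq_num mapping,
            # then add any format-specific bookkeeping here.
            super().consume_stream_datum(doc)

        def adapter_parameters(self) -> dict:
            # The keys must match the signature of the Tiled adapter registered
            # for this mimetype via adapters_by_mimetype.
            return {"array_name": "arr_0"}  # hypothetical parameter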

Attributes

supported_mimetypes : set[str] The set of mimetypes that a derived Consolidator class can handle; a ValueError is raised if a StreamResource document with an unsupported mimetype is passed.

join_method : Literal["stack", "concat"] The method used to join the data; if "stack", the consolidated dataset is produced by joining all datums along a new dimension added on the left, e.g. a stack of TIFF images; if "concat", datums are appended along the existing leftmost dimension, e.g. rows of a table (similar to concatenation in numpy).

join_chunks : bool If True, the chunking of the resulting dataset is determined after consolidation; otherwise each part is considered to be chunked separately.

supported_mimetypes: set[str]#

None

join_method: Literal["stack", "concat"]#

'concat'

join_chunks: bool#

True

classmethod get_supported_mimetype(sres)#
property shape: tuple[int, ...]#

Native shape of the data stored in assets

This includes the leading (0th) dimension corresponding to the number of rows (if join_method is "stack"), including skipped rows, if any. The number of usable data rows may be lower; it is determined by the seq_nums field of the StreamDatum documents.

property chunks: tuple[tuple[int, ...], ...]#

Explicit (dask-style) specification of chunk sizes

The produced chunk specification is a tuple of tuples of int that specify the sizes of each chunk in each dimension; it is based on the StreamResource parameter chunk_shape.

If chunk_shape is an empty tuple, the dataset is assumed to be stored as a single chunk covering all existing and new elements. Usually, however, chunk_shape is a tuple of ints, in which case fixed-size chunks with at most chunk_shape[0] elements (i.e. _num_rows) are assumed; the last chunk may be smaller. If chunk_shape has fewer elements than self.shape, it is assumed to define the chunk sizes along the leading dimensions.

If the joining method is "concat" and join_chunks = False, the chunking along the leftmost dimension is preserved for each appended data point, i.e. consecutive chunks are not merged; e.g. for a 1d array with chunks (3,3,1), the resulting chunking after 3 repeats is (3,3,1,3,3,1,3,3,1). When join_chunks = True (default), the chunk size along the leftmost dimension is determined by the chunk_shape parameter; this is also the case when join_method == "stack". Chunking along the trailing dimensions is always preserved as in the original (single) array.
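
A concrete illustration of the two "concat" modes described above (the numbers are chosen for the example; chunk_shape[0] == 5 is an assumption):

    # Three parts are appended with join_method == "concat"; each part of 7 rows
    # is chunked as (3, 3, 1) along the leading dimension.
    part_chunks = (3, 3, 1)
    n_parts = 3

    # join_chunks == False: per-part chunk boundaries are preserved.
    chunks_preserved = part_chunks * n_parts
    # -> (3, 3, 1, 3, 3, 1, 3, 3, 1)

    # join_chunks == True (default): the leading dimension is re-chunked
    # according to chunk_shape, e.g. chunk_shape[0] == 5 over 21 total rows.
    total_rows = sum(part_chunks) * n_parts
    chunks_joined = tuple(min(5, total_rows - i) for i in range(0, total_rows, 5))
    # -> (5, 5, 5, 5, 1); the last chunk is smaller.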

property has_skips: bool#

Indicates whether any rows should be skipped when mapping their indices to frame numbers

This flag is intended to provide a shortcut for more efficient data access when there are no skips and the mapping between indices and seq_nums is straightforward. Otherwise, the _seqnums_to_indices_map needs to be taken into account.

adapter_parameters() → dict#

A dictionary of parameters passed to an Adapter

These parameters are intended to provide any additional information required to read a data source of a specific mimetype, e.g. "path" (the path into an HDF5 file) or "template" (the filename pattern of a TIFF sequence).

This method is to be overridden in subclasses as necessary.

structure() → tiled.structures.array.ArrayStructure#
consume_stream_datum(doc: event_model.documents.StreamDatum)#

Process a new StreamDatum and update the internal data structure

This will be called for every new StreamDatum received to account for the newly added rows. This method may need to be overridden and expanded depending on the specific mimetype. Actions:

  • Parse the fields in a new StreamDatum

  • Increment the number of rows (implemented by the Base class)

  • Keep track of the correspondence between indices and seq_nums (implemented by the Base class)

  • Update the list of assets, including their uris, if necessary

  • Update shape and chunks

get_data_source() → tiled.structures.data_source.DataSource#

Return a DataSource object reflecting the current state of the streamed dataset.

The returned DataSource is conceptually similar to (and can be an instance of) tiled.structures.DataSource. In general, it describes the associated Assets (filepaths, mimetype) along with their internal data structure (array shape, chunks, additional parameters) and should contain all information necessary to read the file.

init_adapter(adapter_class=None)#

Initialize a Tiled Adapter for reading the consolidated data

Parameters

adapter_class : Optional[Type[Adapter]] An optional Adapter class to use for initialization; if not provided, the default adapter for the Consolidator’s mimetype will be used.

abstractmethod update_from_stream_resource(stream_resource: event_model.documents.StreamResource)#

Consume an additional related StreamResource document for the same data_key

validate(fix_errors=False) → list[str]#

Validate the Consolidator’s state against the expected structure

get_adapter(adapters_by_mimetype=None)#
class bluesky_tiled_plugins.writing.consolidators.CSVConsolidator(stream_resource: event_model.documents.StreamResource, descriptor: event_model.documents.EventDescriptor)#

Bases: bluesky_tiled_plugins.writing.consolidators.ConsolidatorBase

Consolidator of StreamDatums; see ConsolidatorBase for the full class description and attributes.

supported_mimetypes: set[str]#

None

join_method: Literal["stack", "concat"]#

'concat'

join_chunks: bool#

False

adapter_parameters() → dict#

A dictionary of parameters passed to an Adapter

These parameters are intended to provide any additional information required to read a data source of a specific mimetype, e.g. "path" (the path into an HDF5 file) or "template" (the filename pattern of a TIFF sequence).

This method is to be overridden in subclasses as necessary.

class bluesky_tiled_plugins.writing.consolidators.HDF5Consolidator(stream_resource: event_model.documents.StreamResource, descriptor: event_model.documents.EventDescriptor)#

Bases: bluesky_tiled_plugins.writing.consolidators.ConsolidatorBase

Consolidator of StreamDatums; see ConsolidatorBase for the full class description and attributes.

supported_mimetypes#

None

adapter_parameters() → dict#

Parameters to be passed to the HDF5 adapter, a dictionary with the keys:

  • dataset: list[str] – the path to the dataset within the HDF5 file, represented as a list split at "/"

  • swmr: bool – True to enable the single-writer / multiple-readers (SWMR) mode
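
For illustration (the dataset path below is made up), the returned dictionary could look like:

    # A dataset stored at /entry/data/data inside the HDF5 file, written in SWMR mode.
    {
        "dataset": ["entry", "data", "data"],  # the "/"-split path within the file
        "swmr": True,                          # single-writer / multiple-readers mode
    }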

update_from_stream_resource(stream_resource: event_model.documents.StreamResource)#

Add an Asset for a new StreamResource document

class bluesky_tiled_plugins.writing.consolidators.MultipartRelatedConsolidator(permitted_extensions: set[str], stream_resource: event_model.documents.StreamResource, descriptor: event_model.documents.EventDescriptor)#

Bases: bluesky_tiled_plugins.writing.consolidators.ConsolidatorBase

Consolidator of StreamDatums; see ConsolidatorBase for the full class description and attributes.

get_datum_uri(indx: int)#

Return a full URI for a datum (an individual image file) based on its index in the sequence.

This relies on the template parameter passed in the StreamResource, which is a string in the "new" Python formatting style that can be evaluated to a file name using the .format(indx) method given an integer index, e.g. "{:05d}.ext".

If template is not set, we assume that the uri is provided directly in the StreamResource document (i.e. a single file case), and return it as is.
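
For example, with a hypothetical template like the one below, the URI for the datum with index 42 is obtained by calling .format on it:

    # Hypothetical template carried by the StreamResource document.
    template = "file://localhost/data/scan_0001/img_{:05d}.tiff"

    uri = template.format(42)
    # -> 'file://localhost/data/scan_0001/img_00042.tiff'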

consume_stream_datum(doc: event_model.documents.StreamDatum)#

Determine the number and names of the files from the datum indices and the number of files per datum.

In the most general case, each file may be a multi-page TIFF or a stack of images (frames), and a single datum may be composed of multiple such files, leading to a total of self.datum_shape[0] frames. Since each file necessarily represents a single chunk (TIFF files cannot be sub-chunked), the number of frames per file is equal to the leftmost chunk_shape dimension, self.chunk_shape[0]. The number of files produced per datum is then the ratio of these two numbers.

If join_method == "stack", we assume that each datum becomes its own index in the new leftmost dimension of the resulting dataset, and hence corresponds to a single file.
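
A small numeric illustration of this bookkeeping (the frame counts are chosen for the example):

    # Each datum delivers 10 frames and each file holds one chunk of 5 frames,
    # so every datum produces 2 files.
    frames_per_datum = 10      # self.datum_shape[0]
    frames_per_file = 5        # self.chunk_shape[0] (one chunk per file)
    files_per_datum = frames_per_datum // frames_per_file
    # -> 2; with join_method == "stack", each datum maps to exactly one file instead.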

class bluesky_tiled_plugins.writing.consolidators.TIFFConsolidator(stream_resource: event_model.documents.StreamResource, descriptor: event_model.documents.EventDescriptor)#

Bases: bluesky_tiled_plugins.writing.consolidators.MultipartRelatedConsolidator

Consolidator of StreamDatums; see ConsolidatorBase for the full class description and attributes.

supported_mimetypes#

None

class bluesky_tiled_plugins.writing.consolidators.JPEGConsolidator(stream_resource: event_model.documents.StreamResource, descriptor: event_model.documents.EventDescriptor)#

Bases: bluesky_tiled_plugins.writing.consolidators.MultipartRelatedConsolidator

Consolidator of StreamDatums; see ConsolidatorBase for the full class description and attributes.

supported_mimetypes#

None

class bluesky_tiled_plugins.writing.consolidators.NPYConsolidator(stream_resource: event_model.documents.StreamResource, descriptor: event_model.documents.EventDescriptor)#

Bases: bluesky_tiled_plugins.writing.consolidators.MultipartRelatedConsolidator

Consolidator of StreamDatums; see ConsolidatorBase for the full class description and attributes.

supported_mimetypes#

None

join_method: Literal["stack", "concat"]#

'stack'

bluesky_tiled_plugins.writing.consolidators.CONSOLIDATOR_REGISTRY#

'defaultdict(…)'

bluesky_tiled_plugins.writing.consolidators.consolidator_factory(stream_resource_doc, descriptor_doc)#
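
A hedged usage sketch tying the registry and the factory together (the mimetype, the NPZConsolidator class from the earlier sketch, and the document variables are placeholders; the exact structure of the defaultdict registry may differ, it is assumed here to map mimetype to consolidator class):

    from bluesky_tiled_plugins.writing.consolidators import (
        CONSOLIDATOR_REGISTRY,
        consolidator_factory,
    )

    # Register a consolidator for a new mimetype (hypothetical).
    CONSOLIDATOR_REGISTRY["application/x-npz"] = NPZConsolidator

    # stream_resource_doc, descriptor_doc and stream_datum_docs stand for
    # documents received from the RunEngine.
    consolidator = consolidator_factory(stream_resource_doc, descriptor_doc)
    for doc in stream_datum_docs:
        consolidator.consume_stream_datum(doc)

    # The resulting DataSource (Assets, structure, adapter parameters) can be
    # registered with Tiled; an Adapter can be initialized to read the data back.
    data_source = consolidator.get_data_source()
    adapter = consolidator.init_adapter()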