bluesky_tiled_plugins.writing.consolidators#
Module Contents#
Classes#
- Patch
- ConsolidatorBase – Consolidator of StreamDatums
- CSVConsolidator – Consolidator of StreamDatums
- HDF5Consolidator – Consolidator of StreamDatums
- MultipartRelatedConsolidator – Consolidator of StreamDatums
- TIFFConsolidator – Consolidator of StreamDatums
- JPEGConsolidator – Consolidator of StreamDatums
- NPYConsolidator – Consolidator of StreamDatums
Functions#
- consolidator_factory
Data#
- CONSOLIDATOR_REGISTRY
API#
- class bluesky_tiled_plugins.writing.consolidators.Patch#
-
- classmethod combine_patches(patches: list[bluesky_tiled_plugins.writing.consolidators.Patch]) bluesky_tiled_plugins.writing.consolidators.Patch#
Combine multiple patches into a single patch
The combined patch covers the union (smallest bounding box) of all provided patches.
Parameters
patches : list[Patch] A list of Patch objects to combine.
Returns
Patch A new Patch object that covers the union of all input patches.
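The Patch fields are not listed in this reference, so as a rough, purely illustrative sketch of a bounding-box union (not the actual Patch API), combining per-dimension (start, stop) extents might look like:

```python
# Hypothetical representation: a "patch" as a tuple of per-dimension (start, stop) pairs.
# The real Patch class may store its extent differently.
def combine_bounding_boxes(patches):
    """Return the smallest bounding box covering all patches."""
    ndim = len(patches[0])
    starts = [min(p[d][0] for p in patches) for d in range(ndim)]
    stops = [max(p[d][1] for p in patches) for d in range(ndim)]
    return tuple(zip(starts, stops))

# Two 2D patches: rows 0..3 / cols 0..3 and rows 2..5 / cols 4..8
print(combine_bounding_boxes([((0, 3), (0, 3)), ((2, 5), (4, 8))]))
# ((0, 5), (0, 8))
```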
- class bluesky_tiled_plugins.writing.consolidators.ConsolidatorBase(stream_resource: event_model.documents.StreamResource, descriptor: event_model.documents.EventDescriptor)#
Consolidator of StreamDatums
A Consolidator consumes documents from the RunEngine (RE); it is similar to the usual Bluesky Handlers but is designed to work with streaming data (received via StreamResource and StreamDatum documents). It composes the details (a DataSource and its Assets) that will go into the Tiled database. Each Consolidator is instantiated per Stream Resource.
Tiled Adapters will later use this to read the data, with good random access and bulk access support.
We put this code into consolidators so that additional, possibly very unusual, formats can be supported by users without getting a PR merged into Bluesky or Tiled.
The CONSOLIDATOR_REGISTRY (see the example below) and the Tiled catalog parameter adapters_by_mimetype can be used together to support:
- ingesting a new mimetype from Bluesky documents and generating a DataSource and Assets with appropriate parameters (the consolidator’s job);
- interpreting those DataSource and Asset parameters to do I/O (the adapter’s job).
To implement new Consolidators for other mimetypes, subclass ConsolidatorBase, extend the consume_stream_datum and get_data_source methods as needed, and ensure that the keys of the returned adapter_parameters dictionary match the expected adapter signature. Declare a set of supported mimetypes to allow validation and automated discovery of the subclassed Consolidator.
Attributes
supported_mimetypes : set[str] a set of mimetypes that can be handled by a derived Consolidator class; a ValueError is raised if a Resource document with an unsupported mimetype is passed.
join_method : Literal[“stack”, “concat”] the method used to join the data; if “stack”, the consolidated dataset is produced by joining all datums along a new dimension added on the left, e.g. a stack of tiff images; otherwise, datums are appended to the end of the existing leftmost dimension, e.g. rows of a table (similar to concatenation in NumPy).
join_chunks : bool if True, the chunking of the resulting dataset will be determined after consolidation, otherwise each part is considered to be chunked separately.
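As a minimal sketch of how this might look in practice (the mimetype, consolidator class, parameter values, and adapter below are hypothetical; only CONSOLIDATOR_REGISTRY and the adapters_by_mimetype catalog parameter come from this module and Tiled):

```python
from bluesky_tiled_plugins.writing.consolidators import (
    CONSOLIDATOR_REGISTRY,
    ConsolidatorBase,
)


class MyFormatConsolidator(ConsolidatorBase):
    """Hypothetical consolidator for a custom 'application/x-my-format' mimetype."""

    supported_mimetypes = {"application/x-my-format"}

    def adapter_parameters(self) -> dict:
        # Keys must match the signature of the Tiled adapter that will read this format;
        # the value here is a placeholder for illustration.
        return {"template": "frame_{:05d}.myext"}


# Make the consolidator discoverable by mimetype (illustrative registration).
CONSOLIDATOR_REGISTRY["application/x-my-format"] = MyFormatConsolidator

# On the Tiled side, map the same mimetype to the adapter that performs the I/O,
# e.g. by passing adapters_by_mimetype={"application/x-my-format": MyFormatAdapter}
# when constructing the catalog (MyFormatAdapter is hypothetical).
```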
- classmethod get_supported_mimetype(sres)#
- property shape: tuple[int, ...]#
Native shape of the data stored in assets
This includes the leading (0th) dimension corresponding to the number of rows (if the join_method is stack), including skipped rows, if any. The number of relevant usable data rows may be lower; it is determined by the seq_nums field of StreamDatum documents.
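For a quick illustration with made-up numbers:

```python
# "stack": each datum contributes one (512, 512) frame; after 10 datums a new
# leading dimension holds the frame count.
stack_shape = (10, 512, 512)

# "concat": each datum contributes a block of 5 table rows with 512 columns;
# after 10 datums the rows extend the existing leading dimension.
concat_shape = (10 * 5, 512)  # (50, 512)
```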
- property chunks: tuple[tuple[int, ...], ...]#
Explicit (dask-style) specification of chunk sizes
The produced chunk specification is a tuple of tuples of int that specify the sizes of each chunk in each dimension; it is based on the StreamResource parameter chunk_shape.
If chunk_shape is an empty tuple, assume the dataset is stored as a single chunk for all existing and new elements. Usually, however, chunk_shape is a tuple of int, in which case we assume fixed-size chunks with at most chunk_shape[0] elements (i.e. _num_rows); the last chunk can be smaller. If chunk_shape is a tuple with fewer elements than self.shape, assume it defines the chunk sizes along the leading dimensions.
If the joining method is “concat” and join_chunks = False, the chunking along the leftmost dimension is assumed to be preserved in each appended data point, i.e. consecutive chunks do not join; e.g. for a 1d array with chunks (3, 3, 1), the resulting chunking after 3 repeats is (3, 3, 1, 3, 3, 1, 3, 3, 1). When join_chunks = True (the default), the chunk size along the leftmost dimension is determined by the chunk_shape parameter; this is the case when join_method == "stack" as well. Chunking along the trailing dimensions is always preserved as in the original (single) array.
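A minimal sketch of the leading-dimension chunking rules described above (the helper is illustrative, not the library's implementation):

```python
def leading_chunks(num_rows, chunk_size, per_datum_rows=None):
    """Illustrate leading-dimension chunking.

    chunk_size plays the role of chunk_shape[0]. When per_datum_rows is given,
    mimic join_method == "concat" with join_chunks = False, where the per-datum
    chunking repeats instead of joining.
    """
    if per_datum_rows is None:
        # join_chunks = True (default), or join_method == "stack":
        # fixed-size chunks; the last chunk may be smaller.
        full, remainder = divmod(num_rows, chunk_size)
        return (chunk_size,) * full + ((remainder,) if remainder else ())
    # join_chunks = False with "concat": per-datum chunking is preserved verbatim.
    per_datum = leading_chunks(per_datum_rows, chunk_size)
    return per_datum * (num_rows // per_datum_rows)


print(leading_chunks(10, 3))                    # (3, 3, 3, 1)
print(leading_chunks(21, 3, per_datum_rows=7))  # (3, 3, 1, 3, 3, 1, 3, 3, 1)
```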
- property has_skips: bool#
Indicates whether any rows should be skipped when mapping their indices to frame numbers
This flag is intended to provide a shortcut for more efficient data access when there are no skips and the mapping between indices and seq_nums is straightforward. Otherwise, the _seqnums_to_indices_map needs to be taken into account.
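To illustrate the distinction (values are made up, and this is not the internal representation of _seqnums_to_indices_map):

```python
# Without skips: seq_nums are contiguous, so stored row i simply holds seq_num i + 1.
seq_nums = [1, 2, 3, 4]

# With skips: seq_nums 3 and 4 are missing, so a lookup table is required.
seq_nums_with_skips = [1, 2, 5, 6]
seqnum_to_index = {s: i for i, s in enumerate(seq_nums_with_skips)}
print(seqnum_to_index[5])  # 2 -> the third stored row holds seq_num 5
```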
- adapter_parameters() dict#
A dictionary of parameters passed to an Adapter
These parameters are intended to provide any additional information required to read a data source of a specific mimetype, e.g. “path” (the path into an HDF5 file) or “template” (the filename pattern of a TIFF sequence).
This method is intended to be overridden in subclasses as necessary.
- structure() tiled.structures.array.ArrayStructure#
- consume_stream_datum(doc: event_model.documents.StreamDatum)#
Process a new StreamDatum and update the internal data structure
This will be called for every new StreamDatum received to account for the new added rows. This method may need to be subclassed and expanded depending on a specific mimetype. Actions:
Parse the fields in a new StreamDatum
Increment the number of rows (implemented by the Base class)
Keep track of the correspondence between indices and seq_nums (implemented by the Base class)
Update the list of assets, including their uris, if necessary
Update shape and chunks
- get_data_source() tiled.structures.data_source.DataSource#
Return a DataSource object reflecting the current state of the streamed dataset.
The returned DataSource is conceptually similar (and can be an instance of) tiled.structures.DataSource. In general, it describes associated Assets (filepaths, mimetype) along with their internal data structure (array shape, chunks, additional parameters) and should contain all information necessary to read the file.
- init_adapter(adapter_class=None)#
Initialize a Tiled Adapter for reading the consolidated data
Parameters
adapter_class : Optional[Type[Adapter]] An optional Adapter class to use for initialization; if not provided, the default adapter for the Consolidator’s mimetype will be used.
- abstractmethod update_from_stream_resource(stream_resource: event_model.documents.StreamResource)#
Consume an additional related StreamResource document for the same data_key
- validate(fix_errors=False) list[str]#
Validate the Consolidator’s state against the expected structure
- get_adapter(adapters_by_mimetype=None)#
- class bluesky_tiled_plugins.writing.consolidators.CSVConsolidator(stream_resource: event_model.documents.StreamResource, descriptor: event_model.documents.EventDescriptor)#
Bases: bluesky_tiled_plugins.writing.consolidators.ConsolidatorBase
Consolidator of StreamDatums
See ConsolidatorBase for the full class description and the documentation of the supported_mimetypes, join_method, and join_chunks attributes.
- adapter_parameters() dict#
A dictionary of parameters passed to an Adapter; see ConsolidatorBase.adapter_parameters for details.
- class bluesky_tiled_plugins.writing.consolidators.HDF5Consolidator(stream_resource: event_model.documents.StreamResource, descriptor: event_model.documents.EventDescriptor)#
Bases: bluesky_tiled_plugins.writing.consolidators.ConsolidatorBase
Consolidator of StreamDatums
See ConsolidatorBase for the full class description and the documentation of the supported_mimetypes, join_method, and join_chunks attributes.
- supported_mimetypes#
None
- adapter_parameters() dict#
Parameters to be passed to the HDF5 adapter, a dictionary with the keys:
- dataset: list[str] – the path to the dataset within the HDF5 file, represented as a list split at “/”
- swmr: bool – True to enable the single-writer / multiple-readers regime
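For illustration only (the dataset path below is made up; the real value comes from the StreamResource document), the returned dictionary might look like:

```python
{
    "dataset": ["entry", "data", "data"],  # i.e. "/entry/data/data" split at "/"
    "swmr": True,
}
```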
- update_from_stream_resource(stream_resource: event_model.documents.StreamResource)#
Add an Asset for a new StreamResource document
- class bluesky_tiled_plugins.writing.consolidators.MultipartRelatedConsolidator(permitted_extensions: set[str], stream_resource: event_model.documents.StreamResource, descriptor: event_model.documents.EventDescriptor)#
Bases: bluesky_tiled_plugins.writing.consolidators.ConsolidatorBase
Consolidator of StreamDatums
See ConsolidatorBase for the full class description and the documentation of the supported_mimetypes, join_method, and join_chunks attributes.
- get_datum_uri(indx: int)#
Return a full uri for a datum (an individual image file) based on its index in the sequence.
This relies on the template parameter passed in the StreamResource, which is a string in the “new” Python formatting style that can be evaluated to a file name given an integer index using the .format(indx) method, e.g. “{:05d}.ext”. If template is not set, we assume that the uri is provided directly in the StreamResource document (i.e. the single-file case) and return it as is.
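A quick illustration of the template mechanism (the template string is made up):

```python
template = "/data/scan_42/frame_{:05d}.tiff"  # illustrative template from a StreamResource
print(template.format(7))
# /data/scan_42/frame_00007.tiff
```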
- consume_stream_datum(doc: event_model.documents.StreamDatum)#
Determine the number and names of files from indices of datums and the number of files per datum.
In the most general case, each file may be a multipage tiff or a stack of images (frames), and a single datum may be composed of multiple such files, leading to a total of self.datum_shape[0] frames. Since each file necessarily represents a single chunk (TIFFs cannot be sub-chunked), the number of frames per file is equal to the leftmost chunk_shape dimension, self.chunk_shape[0]. The number of files produced per datum is then the ratio of these two numbers, as in the sketch below.
If join_method == "stack", we assume that each datum becomes its own index in the new leftmost dimension of the resulting dataset, and hence corresponds to a single file.
- class bluesky_tiled_plugins.writing.consolidators.TIFFConsolidator(stream_resource: event_model.documents.StreamResource, descriptor: event_model.documents.EventDescriptor)#
Bases: bluesky_tiled_plugins.writing.consolidators.MultipartRelatedConsolidator
Consolidator of StreamDatums
See ConsolidatorBase for the full class description and the documentation of the supported_mimetypes, join_method, and join_chunks attributes.
- supported_mimetypes#
None
- class bluesky_tiled_plugins.writing.consolidators.JPEGConsolidator(stream_resource: event_model.documents.StreamResource, descriptor: event_model.documents.EventDescriptor)#
Bases: bluesky_tiled_plugins.writing.consolidators.MultipartRelatedConsolidator
Consolidator of StreamDatums
See ConsolidatorBase for the full class description and the documentation of the supported_mimetypes, join_method, and join_chunks attributes.
- supported_mimetypes#
None
- class bluesky_tiled_plugins.writing.consolidators.NPYConsolidator(stream_resource: event_model.documents.StreamResource, descriptor: event_model.documents.EventDescriptor)#
Bases: bluesky_tiled_plugins.writing.consolidators.MultipartRelatedConsolidator
Consolidator of StreamDatums
See ConsolidatorBase for the full class description and the documentation of the supported_mimetypes, join_method, and join_chunks attributes.
- supported_mimetypes#
None
- bluesky_tiled_plugins.writing.consolidators.CONSOLIDATOR_REGISTRY#
‘defaultdict(…)’
- bluesky_tiled_plugins.writing.consolidators.consolidator_factory(stream_resource_doc, descriptor_doc)#
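A hedged usage sketch: given a StreamResource document and the matching EventDescriptor (both abbreviated and purely illustrative below), the factory presumably selects the appropriate Consolidator class by mimetype and instantiates it:

```python
from bluesky_tiled_plugins.writing.consolidators import consolidator_factory

# Abbreviated, illustrative documents; real ones carry more fields.
stream_resource_doc = {
    "uid": "stream-resource-uid",
    "data_key": "det_image",
    "mimetype": "image/tiff",
    "uri": "file://localhost/data/scan_42/",
    "parameters": {"chunk_shape": (1, 512, 512), "template": "frame_{:05d}.tiff"},
}
descriptor_doc = {
    "uid": "descriptor-uid",
    "data_keys": {"det_image": {"shape": [512, 512], "dtype": "array", "external": "STREAM:"}},
}

consolidator = consolidator_factory(stream_resource_doc, descriptor_doc)
# e.g. a TIFFConsolidator, which then consumes StreamDatum documents via consume_stream_datum()
```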