Source code for ophyd_async.core._hdf_dataset

from collections.abc import Iterator
from pathlib import Path
from urllib.parse import urlunparse

from event_model import (  # type: ignore
    ComposeStreamResource,
    ComposeStreamResourceBundle,
    StreamDatum,
    StreamRange,
    StreamResource,
)
from pydantic import BaseModel, Field


class HDFDatasetDescription(BaseModel):
    """Describe the type and shape of one dataset stored in an HDF file.

    Only ``data_key``, ``dataset`` and ``chunk_shape`` are required; ``shape``
    defaults to an empty tuple and ``dtype_numpy`` to an empty string.
    """

    data_key: str
    """Name under which the data appears in the event descriptor,
    e.g. ``det`` or ``det.data``."""

    dataset: str
    """Path of the dataset inside the HDF file, e.g. ``/entry/data/data``
    or ``/entry/instrument/NDAttributes/sum``."""

    shape: tuple[int, ...] = Field(default_factory=tuple)
    """Shape of the data for a single event in the HDF file,
    e.g. ``(1, 768, 1024)`` for arrays or ``()`` for scalars."""

    dtype_numpy: str = ""
    """Numpy dtype string for this field, e.g. ``<i2`` or ``<f8``."""

    chunk_shape: tuple[int, ...]
    """Explicit chunk size written to disk."""
# Module-level name constant. It is not referenced by the code visible in this
# file. NOTE(review): presumably an identifier looked up by consumers of
# SWMR-written areaDetector HDF5 files -- confirm against its users.
SLICE_NAME = "AD_HDF5_SWMR_SLICE"
class HDFDocumentComposer:
    """Compose stream resource and stream datum documents for HDF datasets.

    One stream resource bundle is created per dataset description; every
    bundle refers to the same ``file://`` URI built from the host name and the
    absolute file path.

    :param full_file_name: Absolute path to the file that has been written
    :param datasets: Descriptions of each of the datasets that will appear in
        the file
    :param hostname: Host name placed in the network-location part of the
        file URI, ``"localhost"`` by default
    """

    def __init__(
        self,
        full_file_name: Path,
        datasets: list[HDFDatasetDescription],
        hostname: str = "localhost",
    ) -> None:
        # Number of indices already reported through ``stream_data``.
        self._last_emitted = 0
        self._hostname = hostname

        # (scheme, netloc, path, params, query, fragment)
        uri_parts = (
            "file",
            self._hostname,
            str(full_file_name.absolute()),
            "",
            "",
            None,
        )
        uri = urlunparse(uri_parts)

        compose_resource = ComposeStreamResource()
        bundles: list[ComposeStreamResourceBundle] = []
        for description in datasets:
            resource_parameters = {
                "dataset": description.dataset,
                "chunk_shape": description.chunk_shape,
            }
            bundles.append(
                compose_resource(
                    mimetype="application/x-hdf5",
                    uri=uri,
                    data_key=description.data_key,
                    parameters=resource_parameters,
                    uid=None,
                    validate=True,
                )
            )
        # Assigned only once every bundle has been composed successfully.
        self._bundles: list[ComposeStreamResourceBundle] = bundles

    def stream_resources(self) -> Iterator[StreamResource]:
        """Yield the stream resource document of each bundle, in dataset order."""
        yield from (entry.stream_resource_doc for entry in self._bundles)

    def stream_data(self, indices_written: int) -> Iterator[StreamDatum]:
        """Yield one stream datum per bundle for the newly written indices.

        Nothing is yielded unless ``indices_written`` is greater than the
        count reported by the previous call. Because this is a generator, the
        internal counter is only advanced once iteration begins.

        :param indices_written: Total number of indices written so far,
            counted relative to the resource
        """
        if indices_written <= self._last_emitted:
            return

        # Range runs from the last reported count up to the new total.
        new_range: StreamRange = {
            "start": self._last_emitted,
            "stop": indices_written,
        }
        # Record progress before yielding, so the next call starts from here.
        self._last_emitted = indices_written
        yield from (
            entry.compose_stream_datum(new_range) for entry in self._bundles
        )