Source code for ophyd_async.core._hdf_dataset

from collections.abc import Iterator, Sequence
from dataclasses import dataclass, field
from pathlib import Path
from urllib.parse import urlunparse

from event_model import (
    ComposeStreamResource,
    ComposeStreamResourceBundle,
    StreamDatum,
    StreamRange,
    StreamResource,
)


[docs] @dataclass class HDFDataset: data_key: str dataset: str shape: Sequence[int] = field(default_factory=tuple) dtype_numpy: str = "" multiplier: int = 1 swmr: bool = False # Represents explicit chunk size written to disk. chunk_shape: tuple[int, ...] = ()
SLICE_NAME = "AD_HDF5_SWMR_SLICE"
[docs] class HDFFile: """ :param full_file_name: Absolute path to the file to be written :param datasets: Datasets to write into the file """ def __init__( self, full_file_name: Path, datasets: list[HDFDataset], hostname: str = "localhost", ) -> None: self._last_emitted = 0 self._hostname = hostname if len(datasets) == 0: self._bundles = [] return None bundler_composer = ComposeStreamResource() uri = urlunparse( ( "file", self._hostname, str(full_file_name.absolute()), "", "", None, ) ) self._bundles: list[ComposeStreamResourceBundle] = [ bundler_composer( mimetype="application/x-hdf5", uri=uri, data_key=ds.data_key, parameters={ "dataset": ds.dataset, "swmr": ds.swmr, "multiplier": ds.multiplier, "chunk_shape": ds.chunk_shape, }, uid=None, validate=True, ) for ds in datasets ] def stream_resources(self) -> Iterator[StreamResource]: for bundle in self._bundles: yield bundle.stream_resource_doc def stream_data(self, indices_written: int) -> Iterator[StreamDatum]: # Indices are relative to resource if indices_written > self._last_emitted: indices: StreamRange = { "start": self._last_emitted, "stop": indices_written, } self._last_emitted = indices_written for bundle in self._bundles: yield bundle.compose_stream_datum(indices)