Source code for ophyd_async.epics.adcore._hdf_writer

import asyncio
from collections.abc import AsyncIterator
from pathlib import Path
from xml.etree import ElementTree as ET

from bluesky.protocols import StreamAsset
from event_model import DataKey
from pydantic import PositiveInt

from ophyd_async.core import (
    DatasetDescriber,
    HDFDatasetDescription,
    HDFDocumentComposer,
    PathProvider,
)

from ._core_io import NDFileHDFIO, NDPluginBaseIO
from ._core_writer import ADWriter
from ._utils import (
    convert_param_dtype_to_np,
    convert_pv_dtype_to_np,
)


class ADHDFWriter(ADWriter[NDFileHDFIO]):
    """Allow `NDFileHDFIO` to be used within `StandardDetector`."""

    default_suffix: str = "HDF1:"

    def __init__(
        self,
        fileio: NDFileHDFIO,
        path_provider: PathProvider,
        dataset_describer: DatasetDescriber,
        plugins: dict[str, NDPluginBaseIO] | None = None,
    ) -> None:
        super().__init__(
            fileio,
            path_provider,
            dataset_describer,
            plugins=plugins,
            file_extension=".h5",
            mimetype="application/x-hdf5",
        )
        self._datasets: list[HDFDatasetDescription] = []
        self._composer: HDFDocumentComposer | None = None
        self._filename_template = "%s%s"
    async def open(
        self, name: str, exposures_per_event: PositiveInt = 1
    ) -> dict[str, DataKey]:
        self._composer = None

        # Setting HDF writer specific signals
        # Make sure we are using chunk auto-sizing
        await self.fileio.chunk_size_auto.set(True)
        await asyncio.gather(
            self.fileio.num_extra_dims.set(0),
            self.fileio.lazy_open.set(True),
            self.fileio.swmr_mode.set(True),
            self.fileio.xml_file_name.set(""),
        )

        # Set common AD file plugin params, begin capturing
        await self.begin_capture(name)

        detector_shape = await self._dataset_describer.shape()
        np_dtype = await self._dataset_describer.np_datatype()

        # Used by the base class
        self._exposures_per_event = exposures_per_event

        # Determine number of frames that will be saved per HDF chunk
        frames_per_chunk = await self.fileio.num_frames_chunks.get_value()

        # Add the main data
        self._datasets = [
            HDFDatasetDescription(
                data_key=name,
                dataset="/entry/data/data",
                shape=(exposures_per_event, *detector_shape),
                dtype_numpy=np_dtype,
                chunk_shape=(frames_per_chunk, *detector_shape),
            )
        ]
        # And all the scalar datasets
        for plugin in self._plugins.values():
            maybe_xml = await plugin.nd_attributes_file.get_value()
            # This is the check that ADCore does to see if it is an XML string
            # rather than a filename to parse
            if "<Attributes>" in maybe_xml:
                root = ET.fromstring(maybe_xml)
                for child in root:
                    data_key = child.attrib["name"]
                    if child.attrib.get("type", "EPICS_PV") == "EPICS_PV":
                        np_datatype = convert_pv_dtype_to_np(
                            child.attrib.get("dbrtype", "DBR_NATIVE")
                        )
                    else:
                        np_datatype = convert_param_dtype_to_np(
                            child.attrib.get("datatype", "INT")
                        )
                    self._datasets.append(
                        HDFDatasetDescription(
                            data_key=data_key,
                            dataset=f"/entry/instrument/NDAttributes/{data_key}",
                            shape=(exposures_per_event,)
                            if exposures_per_event > 1
                            else (),
                            dtype_numpy=np_datatype,
                            # NDAttributes appear to always be configured with
                            # this chunk size
                            chunk_shape=(16384,),
                        )
                    )

        describe = {
            ds.data_key: DataKey(
                source=self.fileio.full_file_name.source,
                shape=list(ds.shape),
                dtype="array"
                if exposures_per_event > 1 or len(ds.shape) > 1
                else "number",
                dtype_numpy=ds.dtype_numpy,
                external="STREAM:",
            )
            for ds in self._datasets
        }
        return describe
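
    # Illustrative note: the scalar-dataset loop in open() above expects the
    # plugin's nd_attributes_file PV to hold an ADCore NDAttributes XML
    # document rather than a filename, for example something like:
    #
    #   <Attributes>
    #       <Attribute name="Temperature" type="EPICS_PV"
    #                  source="BL01:TEMP" dbrtype="DBR_DOUBLE"/>
    #       <Attribute name="ColorMode" type="PARAM"
    #                  source="COLOR_MODE" datatype="INT"/>
    #   </Attributes>
    #
    # Each <Attribute> element is then described as a dataset at
    # /entry/instrument/NDAttributes/<name>, with its numpy dtype derived
    # from dbrtype (EPICS_PV attributes) or datatype (driver parameters).
    # The attribute names and PVs shown here are hypothetical examples.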

    async def collect_stream_docs(
        self, name: str, indices_written: int
    ) -> AsyncIterator[StreamAsset]:
        # TODO: fail if we get dropped frames
        await self.fileio.flush_now.set(True)
        if indices_written:
            if not self._composer:
                path = Path(await self.fileio.full_file_name.get_value())
                self._composer = HDFDocumentComposer(
                    # See https://github.com/bluesky/ophyd-async/issues/122
                    path,
                    self._datasets,
                )

            # stream resource says "here is a dataset",
            # stream datum says "here are N frames in that stream resource",
            # you get one stream resource and many stream datums per scan
            for doc in self._composer.stream_resources():
                yield "stream_resource", doc
            for doc in self._composer.stream_data(indices_written):
                yield "stream_datum", doc
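

A minimal usage sketch of the writer lifecycle, assuming already-connected
NDFileHDFIO, PathProvider and DatasetDescriber instances are supplied by the
caller; the function and argument names below are hypothetical and for
illustration only.

async def _example_acquisition(
    hdf_io: NDFileHDFIO,
    path_provider: PathProvider,
    describer: DatasetDescriber,
    indices_written: int,
) -> None:
    writer = ADHDFWriter(hdf_io, path_provider, describer)
    # open() configures the HDF plugin, starts capturing and returns the
    # DataKeys describing each dataset the file will contain
    data_keys = await writer.open("det", exposures_per_event=1)
    print(sorted(data_keys))
    # Once frames have been triggered elsewhere, emit the stream documents:
    # one stream_resource per dataset, then stream_datum documents covering
    # the frames written so far
    async for doc_name, doc in writer.collect_stream_docs("det", indices_written):
        print(doc_name, doc)
    await writer.close()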