# Source code for ophyd_async.epics.adcore._hdf_writer

import asyncio
from pathlib import Path
from typing import AsyncGenerator, AsyncIterator, Dict, List, Optional
from xml.etree import ElementTree as ET

from bluesky.protocols import DataKey, Hints, StreamAsset

from ophyd_async.core import (
    DEFAULT_TIMEOUT,
    AsyncStatus,
    DatasetDescriber,
    DetectorWriter,
    HDFDataset,
    HDFFile,
    NameProvider,
    PathProvider,
    observe_value,
    set_and_wait_for_value,
    wait_for_value,
)

from ._core_io import NDArrayBaseIO, NDFileHDFIO
from ._utils import (
    FileWriteMode,
    convert_param_dtype_to_np,
    convert_pv_dtype_to_np,
)


class ADHDFWriter(DetectorWriter):
    def __init__(
        self,
        hdf: NDFileHDFIO,
        path_provider: PathProvider,
        name_provider: NameProvider,
        dataset_describer: DatasetDescriber,
        *plugins: NDArrayBaseIO,
    ) -> None:
        self.hdf = hdf
        self._path_provider = path_provider
        self._name_provider = name_provider
        self._dataset_describer = dataset_describer
        self._plugins = plugins
        self._capture_status: Optional[AsyncStatus] = None
        self._datasets: List[HDFDataset] = []
        self._file: Optional[HDFFile] = None
        self._multiplier = 1

    async def open(self, multiplier: int = 1) -> Dict[str, DataKey]:
        self._file = None
        info = self._path_provider(device_name=self.hdf.name)

        # Set the directory creation depth first, since dir creation callback happens
        # when directory path PV is processed.
        await self.hdf.create_directory.set(info.create_dir_depth)

        await asyncio.gather(
            self.hdf.num_extra_dims.set(0),
            self.hdf.lazy_open.set(True),
            self.hdf.swmr_mode.set(True),
            # See https://github.com/bluesky/ophyd-async/issues/122
            self.hdf.file_path.set(str(info.directory_path)),
            self.hdf.file_name.set(info.filename),
            self.hdf.file_template.set("%s/%s.h5"),
            self.hdf.file_write_mode.set(FileWriteMode.stream),
            # Never use custom xml layout file but use the one defined
            # in the source code file NDFileHDF5LayoutXML.cpp
            self.hdf.xml_file_name.set(""),
        )

        assert (
            await self.hdf.file_path_exists.get_value()
        ), f"File path {info.directory_path} for hdf plugin does not exist"

        # Overwrite num_capture to go forever
        await self.hdf.num_capture.set(0)
        # Wait for it to start, stashing the status that tells us when it finishes
        self._capture_status = await set_and_wait_for_value(self.hdf.capture, True)
        name = self._name_provider()
        detector_shape = await self._dataset_describer.shape()
        np_dtype = await self._dataset_describer.np_datatype()
        self._multiplier = multiplier
        outer_shape = (multiplier,) if multiplier > 1 else ()

        # Add the main data
        self._datasets = [
            HDFDataset(
                data_key=name,
                dataset="/entry/data/data",
                shape=detector_shape,
                dtype_numpy=np_dtype,
                multiplier=multiplier,
            )
        ]
        # And all the scalar datasets
        for plugin in self._plugins:
            maybe_xml = await plugin.nd_attributes_file.get_value()
            # This is the check that ADCore does to see if it is an XML string
            # rather than a filename to parse
            if "<Attributes>" in maybe_xml:
                root = ET.fromstring(maybe_xml)
                for child in root:
                    datakey = child.attrib["name"]
                    if child.attrib.get("type", "EPICS_PV") == "EPICS_PV":
                        np_datatype = convert_pv_dtype_to_np(
                            child.attrib.get("dbrtype", "DBR_NATIVE")
                        )
                    else:
                        np_datatype = convert_param_dtype_to_np(
                            child.attrib.get("datatype", "INT")
                        )
                    self._datasets.append(
                        HDFDataset(
                            datakey,
                            f"/entry/instrument/NDAttributes/{datakey}",
                            (),
                            np_datatype,
                            multiplier,
                        )
                    )

        describe = {
            ds.data_key: DataKey(
                source=self.hdf.full_file_name.source,
                shape=outer_shape + tuple(ds.shape),
                dtype="array" if ds.shape else "number",
                dtype_numpy=ds.dtype_numpy,
                external="STREAM:",
            )
            for ds in self._datasets
        }
        return describe

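    # A sketch of the kind of inline NDAttributes XML the loop above parses;
    # the attribute names and PV are invented, but the element and attribute
    # keys follow the ADCore NDAttributes convention:
    #
    #   <Attributes>
    #       <Attribute name="Temperature" type="EPICS_PV"
    #                  source="XF:31ID-ES{EA:1}:Temp" dbrtype="DBR_DOUBLE"/>
    #       <Attribute name="NumExposures" type="PARAM"
    #                  source="NUM_EXPOSURES" datatype="INT"/>
    #   </Attributes>
    #
    # Each child becomes an HDFDataset at
    # /entry/instrument/NDAttributes/<name>, with its numpy dtype derived
    # from dbrtype (EPICS_PV) or datatype (PARAM).
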
    async def observe_indices_written(
        self, timeout=DEFAULT_TIMEOUT
    ) -> AsyncGenerator[int, None]:
        """Wait until a specific index is ready to be collected"""
        async for num_captured in observe_value(self.hdf.num_captured, timeout):
            yield num_captured // self._multiplier

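    # For illustration only (not part of the original module): a caller such
    # as a detector controller could block until a target frame index is
    # ready with something like the sketch below, where `writer` and
    # `wanted_index` are hypothetical names:
    #
    #   async for index in writer.observe_indices_written():
    #       if index >= wanted_index:
    #           break
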
    async def get_indices_written(self) -> int:
        num_captured = await self.hdf.num_captured.get_value()
        return num_captured // self._multiplier

    async def collect_stream_docs(
        self, indices_written: int
    ) -> AsyncIterator[StreamAsset]:
        # TODO: fail if we get dropped frames
        await self.hdf.flush_now.set(True)
        if indices_written:
            if not self._file:
                path = Path(await self.hdf.full_file_name.get_value())
                self._file = HDFFile(
                    # See https://github.com/bluesky/ophyd-async/issues/122
                    path,
                    self._datasets,
                )
                # stream resource says "here is a dataset",
                # stream datum says "here are N frames in that stream resource",
                # you get one stream resource and many stream datums per scan
                for doc in self._file.stream_resources():
                    yield "stream_resource", doc
            for doc in self._file.stream_data(indices_written):
                yield "stream_datum", doc

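    # Note on the documents above: each ("stream_resource", doc) names one
    # HDF5 dataset, while each ("stream_datum", doc) covers a slice of frames
    # in it via half-open indices, e.g. {"start": 0, "stop": 5} then
    # {"start": 5, "stop": 8} on successive calls (values invented).
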
    async def close(self):
        # Already done a caput callback in _capture_status, so can't do one here
        await self.hdf.capture.set(False, wait=False)
        await wait_for_value(self.hdf.capture, False, DEFAULT_TIMEOUT)
        if self._capture_status:
            # We kicked off an open, so wait for it to return
            await self._capture_status

    @property
    def hints(self) -> Hints:
        return {"fields": [self._name_provider()]}

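# A minimal usage sketch, assuming names not defined in this module:
# StaticFilenameProvider/StaticPathProvider are taken from ophyd_async.core,
# the PV prefix is invented, and `describer` stands for whatever
# DatasetDescriber implementation your driver supplies. In practice a
# StandardDetector subclass constructs this writer for you.
#
#   hdf = NDFileHDFIO("XF:31ID-ES{DET:1}HDF1:", name="hdf")
#   path_provider = StaticPathProvider(
#       StaticFilenameProvider("scan"), Path("/data")
#   )
#   writer = ADHDFWriter(hdf, path_provider, lambda: "det", describer)
#   datakeys = await writer.open()
#   async for name, doc in writer.collect_stream_docs(
#       await writer.get_indices_written()
#   ):
#       ...
#   await writer.close()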