"""Source code for ophyd_async.epics.adcore._data_logic."""

import asyncio
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from enum import Enum
from pathlib import PureWindowsPath
from typing import Any
from xml.etree import ElementTree as ET

import numpy as np

from ophyd_async.core import (
    DetectorDataLogic,
    EnableDisable,
    PathInfo,
    PathProvider,
    SignalDataProvider,
    SignalR,
    StreamableDataProvider,
    StreamResourceDataProvider,
    StreamResourceInfo,
    set_and_wait_for_value,
)
from ophyd_async.epics.core import stop_busy_record

from ._io import (
    ADBaseColorMode,
    ADBaseDataType,
    ADBaseIO,
    ADFileWriteMode,
    NDArrayBaseIO,
    NDFileHDF5IO,
    NDPluginBaseIO,
    NDPluginFileIO,
)
from ._ndattribute import NDAttributeDataType, NDAttributePvDbrType


@dataclass
class PluginSignalDataLogic(DetectorDataLogic):
    """Data logic that reads detector data from a single plugin signal.

    :param driver: The AreaDetector driver instance.
    :param signal: The plugin signal to read the data from.
    :param hinted: Whether the signal is reported as a hinted field.
    """

    driver: ADBaseIO
    signal: SignalR
    hinted: bool = True

    async def prepare_single(self, detector_name: str) -> SignalDataProvider:
        # The plugin signal is only valid once the plugin chain has finished
        # processing, so ask the driver to wait for all plugins first
        await self.driver.wait_for_plugins.set(True)
        return SignalDataProvider(self.signal)

    def get_hinted_fields(self, detector_name: str) -> Sequence[str]:
        if not self.hinted:
            return []
        return [self.signal.name]
@dataclass
class NDArrayDescription:
    """Signals that describe the NDArray a driver produces.

    :param shape_signals: Signals giving the size of each NDArray dimension.
    :param data_type_signal: Signal giving the NDArray element data type.
    :param color_mode_signal: Signal giving the NDArray color mode.
    """

    shape_signals: Sequence[SignalR[int]]
    data_type_signal: SignalR[ADBaseDataType]
    color_mode_signal: SignalR[ADBaseColorMode]
async def get_ndarray_resource_info(
    description: NDArrayDescription,
    data_key: str,
    parameters: dict[str, Any],
    frames_per_chunk: int = 1,
) -> StreamResourceInfo:
    """Build a StreamResourceInfo for the NDArray the driver will produce.

    :param description: Signals describing the NDArray shape, dtype and color mode.
    :param data_key: Data key to attach to the resource.
    :param parameters: Extra parameters to attach to the resource.
    :param frames_per_chunk: Number of frames stored per chunk.
    :raises ValueError: If the data type signal reads back as UNDEFINED.
    :raises RuntimeError: If the color mode is not Mono or RGB1.
    """
    # Read all the shape signals, the datatype and the color mode concurrently
    dims, dtype, mode = await asyncio.gather(
        asyncio.gather(*(sig.get_value() for sig in description.shape_signals)),
        description.data_type_signal.get_value(),
        description.color_mode_signal.get_value(),
    )
    # Zero-sized dimensions are unused, so drop them
    dims = [size for size in dims if size > 0]
    if dtype is ADBaseDataType.UNDEFINED:
        raise ValueError(
            f"{description.data_type_signal.source} is blank, this is not supported"
        )
    if mode == ADBaseColorMode.RGB1:
        # RGB1 frames carry an extra color dimension of size 3
        dims = [3, *dims]
    elif mode != ADBaseColorMode.MONO:
        raise RuntimeError(
            f"Unsupported ColorMode {mode}! Only Mono and RGB1 are supported."
        )
    return StreamResourceInfo(
        data_key=data_key,
        shape=tuple(dims),
        chunk_shape=(frames_per_chunk, *dims),
        dtype_numpy=np.dtype(dtype.value.lower()).str,
        parameters=parameters,
    )


async def get_ndattribute_dtype_source(
    elements: Sequence[NDArrayBaseIO],
) -> dict[str, tuple[str, str]]:
    """Map NDAttribute names to their (numpy dtype string, source) pairs.

    Reads the NDAttributes XML from each element; entries whose value is a
    filename rather than inline XML are skipped.

    :param elements: IOs whose nd_attributes_file signals are inspected.
    :raises RuntimeError: If an EPICS_PV attribute uses dbrtype DBR_NATIVE.
    """
    xml_strings = await asyncio.gather(
        *(element.nd_attributes_file.get_value() for element in elements)
    )
    dtype_sources: dict[str, tuple[str, str]] = {}
    for candidate in xml_strings:
        # Mirror ADCore's heuristic: inline XML contains the <Attributes> tag,
        # anything else is treated as a filename and not parsed here
        if "<Attributes>" not in candidate:
            continue
        for attribute in ET.fromstring(candidate):
            if attribute.attrib.get("type", "EPICS_PV") == "EPICS_PV":
                dbrtype = attribute.attrib.get("dbrtype", "DBR_NATIVE")
                if dbrtype == "DBR_NATIVE":
                    raise RuntimeError(
                        f"NDAttribute {attribute.attrib['name']} has dbrtype "
                        "DBR_NATIVE, which is not supported"
                    )
                entry = (
                    NDAttributePvDbrType[dbrtype].value,
                    "ca://" + attribute.attrib["source"],
                )
            else:
                datatype = attribute.attrib.get("datatype", "INT")
                entry = (NDAttributeDataType[datatype].value, "")
            dtype_sources[attribute.attrib["name"]] = entry
    return dtype_sources


async def prepare_file_paths(
    path_info: PathInfo, file_template: str, writer: NDPluginFileIO
):
    """Configure a file-writing plugin's path, filename and template signals.

    :param path_info: Where and how the files should be written.
    :param file_template: AreaDetector file template (e.g. "%s%s.h5").
    :param writer: The file writer plugin to configure.
    :raises FileNotFoundError: If the IOC reports the path missing/unwritable.
    """
    # Set the directory creation depth first, since dir creation callback
    # happens when the directory path PV is processed.
    await writer.create_directory.set(path_info.create_dir_depth)
    # When setting the path for windows based AD IOCs, areaDetector adds a '/'
    # rather than a '\\', which will cause the readback to never register the
    # same value. Ensure that the platform's trailing separator is added to
    # the directory path ourselves to avoid this.
    separator = (
        "\\" if isinstance(path_info.directory_path, PureWindowsPath) else "/"
    )
    dir_path = f"{path_info.directory_path}{separator}"
    await asyncio.gather(
        writer.file_path.set(dir_path),
        writer.file_name.set(path_info.filename),
        writer.file_template.set(file_template),
        writer.auto_increment.set(True),
        writer.file_number.set(0),
        writer.file_write_mode.set(ADFileWriteMode.STREAM),
    )
    # The IOC reports whether the path exists on its own host
    if not await writer.file_path_exists.get_value():
        raise FileNotFoundError(f"Path {dir_path} doesn't exist or not writable!")
    # Overwrite num_capture to go forever
    await writer.num_capture.set(0)
@dataclass
class ADHDFDataLogic(DetectorDataLogic):
    """Data logic for AreaDetector HDF5 writer plugin.

    :param description: Signals describing the NDArray shape, datatype and color mode.
    :param path_provider: Callable that provides path information for file writing.
    :param driver: The AreaDetector driver instance.
    :param writer: The NDFileHDF5IO plugin instance.
    :param plugins: Additional NDPluginBaseIO instances to extract NDAttributes from.
    :param datakey_suffix: Suffix to append to the data key for the main dataset
    """

    description: NDArrayDescription
    path_provider: PathProvider
    driver: ADBaseIO
    writer: NDFileHDF5IO
    plugins: Sequence[NDPluginBaseIO] = ()
    datakey_suffix: str = ""

    async def prepare_unbounded(self, detector_name: str) -> StreamableDataProvider:
        """Arm the HDF5 writer and return a provider describing its datasets."""
        # Work out where to write
        path_info = self.path_provider(self.writer.name)
        # Determine number of frames that will be saved per HDF chunk.
        # On a fresh IOC startup, this is set to zero until the first capture,
        # so if it is zero, set it to 1.
        frames_per_chunk = await self.writer.num_frames_chunks.get_value()
        if frames_per_chunk == 0:
            frames_per_chunk = 1
            await self.writer.num_frames_chunks.set(frames_per_chunk)
        # Setup the HDF writer
        await asyncio.gather(
            self.writer.chunk_size_auto.set(True),
            self.writer.num_extra_dims.set(0),
            self.writer.lazy_open.set(True),
            self.writer.swmr_mode.set(True),
            self.writer.xml_file_name.set(""),
            self.writer.enable_callbacks.set(EnableDisable.ENABLE),
            prepare_file_paths(
                path_info=path_info, file_template="%s%s.h5", writer=self.writer
            ),
        )
        # Start capturing; capture is a busy record, so don't wait for the set
        # to complete, as that would block until capture finishes
        await set_and_wait_for_value(
            self.writer.capture, True, wait_for_set_completion=False
        )
        # Return a provider that reflects what we have made
        main_dataset = await get_ndarray_resource_info(
            description=self.description,
            data_key=detector_name + self.datakey_suffix,
            parameters={"dataset": "/entry/data/data"},
            frames_per_chunk=frames_per_chunk,
        )
        # One extra dataset per NDAttribute published by driver and plugins
        ndattribute_dtype_sources = await get_ndattribute_dtype_source(
            (self.driver, *self.plugins)
        )
        ndattribute_datasets = [
            StreamResourceInfo(
                data_key=name,
                shape=(),
                # NDAttributes appear to always be configured with
                # this chunk size
                chunk_shape=(16384,),
                dtype_numpy=dtype_numpy,
                source=source,
                parameters={"dataset": f"/entry/instrument/NDAttributes/{name}"},
            )
            for name, (dtype_numpy, source) in ndattribute_dtype_sources.items()
        ]
        return StreamResourceDataProvider(
            uri=f"{path_info.directory_uri}{path_info.filename}.h5",
            resources=[main_dataset] + ndattribute_datasets,
            mimetype="application/x-hdf5",
            collections_written_signal=self.writer.num_captured,
            flush_signal=self.writer.flush_now,
        )

    async def stop(self) -> None:
        """Stop the HDF5 writer capturing."""
        await stop_busy_record(self.writer.capture)

    def get_hinted_fields(self, detector_name: str) -> Sequence[str]:
        # The main NDArray dataset is always hinted
        return [detector_name + self.datakey_suffix]
@dataclass
class ADMultipartDataLogic(DetectorDataLogic):
    """Data logic for multipart AreaDetector file writers (e.g. JPEG, TIFF).

    :param description: Signals describing the NDArray shape, datatype and color mode.
    :param path_provider: Callable that provides path information for file writing.
    :param writer: The NDPluginFileIO instance.
    :param extension: File extension for the written files (e.g. ".jpg", ".tiff").
    :param mimetype: Mimetype for the written files
        (e.g. "multipart/related;type=image/jpeg").
    :param datakey_suffix: Suffix to append to the data key for the main dataset
    """

    description: NDArrayDescription
    path_provider: PathProvider
    writer: NDPluginFileIO
    extension: str
    mimetype: str
    datakey_suffix: str = ""

    async def prepare_unbounded(self, detector_name: str) -> StreamableDataProvider:
        """Arm the file writer and return a provider describing its output."""
        # Ask the path provider where this writer should put its files
        paths = self.path_provider(self.writer.name)
        # Configure the writer, numbering each file with a 6 digit counter
        await prepare_file_paths(
            path_info=paths,
            file_template="%s%s_%6.6d" + self.extension,
            writer=self.writer,
        )
        # Start capturing without waiting for the busy record to complete
        await set_and_wait_for_value(
            self.writer.capture, True, wait_for_set_completion=False
        )
        # Describe the dataset the numbered files will contain
        dataset_info = await get_ndarray_resource_info(
            description=self.description,
            data_key=detector_name + self.datakey_suffix,
            parameters={"template": paths.filename + "_{:06d}" + self.extension},
        )
        return StreamResourceDataProvider(
            # TODO: remove the type ignore after
            # https://github.com/bluesky/ophyd-async/issues/1186
            uri=paths.directory_uri,  # type: ignore
            resources=[dataset_info],
            mimetype=self.mimetype,
            collections_written_signal=self.writer.num_captured,
        )

    async def stop(self) -> None:
        """Stop the file writer capturing."""
        await stop_busy_record(self.writer.capture)

    def get_hinted_fields(self, detector_name: str) -> Sequence[str]:
        # The main NDArray dataset is always hinted
        return [detector_name + self.datakey_suffix]
class ADWriterType(Enum):
    """Supported AreaDetector file writer plugin types."""

    HDF = "HDF"
    JPEG = "JPEG"
    TIFF = "TIFF"
def make_writer_data_logic(
    prefix: str,
    path_provider: PathProvider,
    writer_suffix: str | None,
    driver: ADBaseIO,
    writer_type: ADWriterType,
    plugins: Mapping[str, NDPluginBaseIO] | None = None,
) -> tuple[NDPluginFileIO, DetectorDataLogic]:
    """Create a file writer IO and matching data logic for the given type.

    :param prefix: PV prefix of the detector.
    :param path_provider: Callable that provides path information for file writing.
    :param writer_suffix: PV suffix of the writer plugin, or None for the default.
    :param driver: The AreaDetector driver instance.
    :param writer_type: Which file writer plugin type to create.
    :param plugins: Plugins to extract NDAttributes from (used by HDF only).
    :return: The created writer IO and its data logic.
    :raises RuntimeError: If the writer type is not implemented.
    """
    # All writer types describe the NDArray from the same driver signals
    description = NDArrayDescription(
        shape_signals=[driver.array_size_z, driver.array_size_y, driver.array_size_x],
        data_type_signal=driver.data_type,
        color_mode_signal=driver.color_mode,
    )
    if writer_type is ADWriterType.HDF:
        writer = NDFileHDF5IO(f"{prefix}{writer_suffix or 'HDF1:'}")
        data_logic = ADHDFDataLogic(
            description=description,
            path_provider=path_provider,
            driver=driver,
            writer=writer,
            plugins=list((plugins or {}).values()),
        )
    elif writer_type is ADWriterType.JPEG:
        writer = NDPluginFileIO(f"{prefix}{writer_suffix or 'JPEG1:'}")
        data_logic = ADMultipartDataLogic(
            description=description,
            path_provider=path_provider,
            writer=writer,
            extension=".jpg",
            mimetype="multipart/related;type=image/jpeg",
        )
    elif writer_type is ADWriterType.TIFF:
        writer = NDPluginFileIO(f"{prefix}{writer_suffix or 'TIFF1:'}")
        data_logic = ADMultipartDataLogic(
            description=description,
            path_provider=path_provider,
            writer=writer,
            extension=".tiff",
            mimetype="multipart/related;type=image/tiff",
        )
    else:
        raise RuntimeError("Not implemented")
    return writer, data_logic