Source code for ophyd_async.epics.adcore._data_logic

import asyncio
from collections.abc import Callable, Sequence
from dataclasses import dataclass
from pathlib import PureWindowsPath
from typing import Any, Generic
from xml.etree import ElementTree as ET

import numpy as np

from ophyd_async.core import (
    DetectorDataLogic,
    EnableDisable,
    PathInfo,
    PathProvider,
    SignalDataProvider,
    SignalR,
    StreamableDataProvider,
    StreamResourceDataProvider,
    StreamResourceInfo,
    set_and_wait_for_value,
)
from ophyd_async.epics.core import stop_busy_record

from ._io import (
    ADBaseColorMode,
    ADBaseDataType,
    ADBaseIO,
    ADFileWriteMode,
    NDArrayBaseIO,
    NDFileHDF5IO,
    NDPluginBaseIO,
    NDPluginFileIO,
    NDPluginFileIOT,
)
from ._ndattribute import NDAttributeDataType, NDAttributePvDbrType


@dataclass
class PluginSignalDataLogic(DetectorDataLogic):
    """Data logic that reads a single plugin signal for each detector reading.

    :param driver:
        Driver whose ``wait_for_plugins`` signal is set to ``True`` before the
        plugin signal is handed out.
    :param signal:
        The plugin signal that is wrapped in a `SignalDataProvider`.
    :param hinted:
        When true (the default) the signal's name is reported as a hinted
        field, otherwise no fields are hinted.
    """

    driver: ADBaseIO
    signal: SignalR
    hinted: bool = True

    async def prepare_single(self, datakey_name: str) -> SignalDataProvider:
        """Return a provider for ``signal`` after enabling wait-for-plugins.

        :param datakey_name: Name of the data key; not used by this method.
        :return: A `SignalDataProvider` wrapping ``self.signal``.
        """
        # The plugin signal can only be read once every plugin has finished,
        # so tell the driver to wait for the plugins first.
        await self.driver.wait_for_plugins.set(True)
        provider = SignalDataProvider(self.signal)
        return provider

    def get_hinted_fields(self, datakey_name: str) -> Sequence[str]:
        """Return the signal name as the only hinted field, if hinting is on.

        :param datakey_name: Name of the data key; not used by this method.
        :return: ``[signal.name]`` when ``hinted`` is true, else an empty list.
        """
        if not self.hinted:
            return []
        return [self.signal.name]
@dataclass
class NDArrayDescription:
    """Bundle of signals describing the shape and pixel type of an NDArray frame.

    :param shape_signals:
        Signals that give the frame dimensions (for example ``size_y`` and
        ``size_x``). Entries that read back as zero are dropped when the
        resource info is built, so ``array_size_z`` may safely be included
        for 2-D detectors.
    :param data_type_signal:
        Signal that gives the pixel data type.
    :param color_mode_signal:
        Signal that gives the colour mode (MONO or RGB1).
    """

    # Field order is part of the positional constructor signature.
    shape_signals: Sequence[SignalR[int]]
    data_type_signal: SignalR[ADBaseDataType]
    color_mode_signal: SignalR[ADBaseColorMode]
async def get_ndarray_resource_info(
    array_description: NDArrayDescription,
    data_key: str,
    parameters: dict[str, Any],
    frames_per_chunk: int = 1,
) -> StreamResourceInfo:
    """Build the `StreamResourceInfo` for the main NDArray dataset.

    :param array_description: Signals giving frame shape, data type and colour mode.
    :param data_key: Data key the resource is published under.
    :param parameters: Passed through unchanged to the `StreamResourceInfo`.
    :param frames_per_chunk: Leading entry of the chunk shape, defaults to 1.
    :return: Resource info with the frame shape, chunk shape and numpy dtype string.
    :raises ValueError: If the data type reads back as ``UNDEFINED``.
    :raises RuntimeError: If the colour mode is neither MONO nor RGB1.
    """
    # Read every dimension, the data type and the colour mode concurrently.
    dimensions_read = asyncio.gather(
        *(sig.get_value() for sig in array_description.shape_signals)
    )
    raw_dimensions, pixel_type, colour = await asyncio.gather(
        dimensions_read,
        array_description.data_type_signal.get_value(),
        array_description.color_mode_signal.get_value(),
    )

    # Dimensions reported as zero are not part of the frame shape.
    frame_shape = [dim for dim in raw_dimensions if dim > 0]

    if pixel_type is ADBaseDataType.UNDEFINED:
        blank_source = array_description.data_type_signal.source
        raise ValueError(f"{blank_source} is blank, this is not supported")

    if colour == ADBaseColorMode.RGB1:
        # RGB1 frames get an extra leading axis of length 3.
        frame_shape = [3, *frame_shape]
    elif colour != ADBaseColorMode.MONO:
        raise RuntimeError(
            f"Unsupported ColorMode {colour}! Only Mono and RGB1 are supported."
        )

    numpy_dtype = np.dtype(pixel_type.value.lower()).str
    return StreamResourceInfo(
        data_key=data_key,
        shape=tuple(frame_shape),
        chunk_shape=(frames_per_chunk, *frame_shape),
        dtype_numpy=numpy_dtype,
        parameters=parameters,
    )


async def get_ndattribute_dtype_source(
    elements: Sequence[NDArrayBaseIO],
) -> dict[str, tuple[str, str]]:
    """Collect the numpy dtype and source of every NDAttribute defined inline.

    Each element's ``nd_attributes_file`` value is read; values that contain
    an ``<Attributes>`` tag are parsed as XML, all others are skipped.

    :param elements: Driver and/or plugins whose NDAttribute definitions are read.
    :return: Mapping of attribute name to ``(dtype_numpy, source)``. ``source`` is
        ``"ca://<pv>"`` for ``EPICS_PV`` attributes and ``""`` for all others.
    :raises RuntimeError: If an ``EPICS_PV`` attribute has dbrtype ``DBR_NATIVE``
        (which is also the default when no dbrtype is given).
    """
    xml_candidates = await asyncio.gather(
        *(element.nd_attributes_file.get_value() for element in elements)
    )

    dtype_sources: dict[str, tuple[str, str]] = {}
    for candidate in xml_candidates:
        # ADCore uses this same check to decide whether the value is an XML
        # string rather than the name of a file to parse.
        if "<Attributes>" not in candidate:
            continue
        for attribute in ET.fromstring(candidate):
            fields = attribute.attrib
            if fields.get("type", "EPICS_PV") != "EPICS_PV":
                datatype = fields.get("datatype", "INT")
                dtype_numpy = NDAttributeDataType[datatype].value
                source = ""
            else:
                dbrtype = fields.get("dbrtype", "DBR_NATIVE")
                if dbrtype == "DBR_NATIVE":
                    raise RuntimeError(
                        f"NDAttribute {fields['name']} has dbrtype "
                        "DBR_NATIVE, which is not supported"
                    )
                dtype_numpy = NDAttributePvDbrType[dbrtype].value
                source = "ca://" + fields["source"]
            dtype_sources[fields["name"]] = (dtype_numpy, source)
    return dtype_sources


async def prepare_file_paths(
    path_info: PathInfo, file_template: str, writer: NDPluginFileIO
):
    """Point a file-writer plugin at the target directory and file name.

    :param path_info: Directory, file name and directory-creation depth to use.
    :param file_template: Value written to the writer's ``file_template`` signal.
    :param writer: The file-writer plugin to configure.
    :raises FileNotFoundError: If the writer reports that the path does not exist.
    """
    # The creation depth has to be in place first: the directory-creation
    # callback runs when the directory path PV is processed.
    await writer.create_directory.set(path_info.create_dir_depth)

    # For Windows based AD IOCs areaDetector appends '/' rather than '\\', so
    # the readback would never match the value we set. Appending the trailing
    # separator ourselves avoids that.
    if isinstance(path_info.directory_path, PureWindowsPath):
        separator = "\\"
    else:
        separator = "/"
    directory_path = f"{path_info.directory_path}{separator}"

    await asyncio.gather(
        writer.file_path.set(directory_path),
        writer.file_name.set(path_info.filename),
        writer.file_template.set(file_template),
        writer.auto_increment.set(True),
        writer.file_number.set(0),
        writer.file_write_mode.set(ADFileWriteMode.STREAM),
    )

    # Confirm that the path exists on the host.
    path_exists = await writer.file_path_exists.get_value()
    if not path_exists:
        raise FileNotFoundError(
            f"Path {directory_path} doesn't exist or not writable!"
        )

    # A num_capture of 0 makes the writer capture forever.
    await writer.num_capture.set(0)
@dataclass
class ADHDFDataLogic(DetectorDataLogic):
    """Data logic for the AreaDetector HDF5 writer plugin.

    :param array_description:
        Signals describing the NDArray shape and data type.
    :param path_provider:
        Callable returning the path information for a given data key name.
    :param driver:
        The AreaDetector driver; its NDAttributes are added as datasets.
    :param writer:
        The `NDFileHDF5IO` plugin instance that writes the file.
    :param plugins:
        Further `NDPluginBaseIO` instances to extract NDAttributes from.
    :param datakey_suffix:
        Suffix to append to the data key for the main dataset. It is not
        referenced inside this class; presumably it is consumed elsewhere
        (TODO confirm).
    """

    array_description: NDArrayDescription
    path_provider: PathProvider
    driver: NDArrayBaseIO
    writer: NDFileHDF5IO
    plugins: Sequence[NDPluginBaseIO] = ()
    datakey_suffix: str = ""

    async def prepare_unbounded(self, datakey_name: str) -> StreamableDataProvider:
        """Configure the HDF5 writer, start capturing and describe the output.

        :param datakey_name: Data key for the main dataset; also passed to
            the path provider.
        :return: A `StreamResourceDataProvider` covering the main dataset and
            one dataset per NDAttribute.
        """
        writer = self.writer

        # Decide where the file goes.
        path_info = self.path_provider(datakey_name)

        # Number of frames stored per HDF chunk. A freshly started IOC reports
        # zero until the first capture, in which case 1 is used and written back.
        frames_per_chunk = await writer.num_frames_chunks.get_value()
        if frames_per_chunk == 0:
            frames_per_chunk = 1
            await writer.num_frames_chunks.set(frames_per_chunk)

        # Configure the HDF writer and its file paths concurrently.
        await asyncio.gather(
            writer.chunk_size_auto.set(True),
            writer.num_extra_dims.set(0),
            writer.lazy_open.set(True),
            writer.swmr_mode.set(True),
            writer.xml_file_name.set(""),
            writer.enable_callbacks.set(EnableDisable.ENABLE),
            prepare_file_paths(
                path_info=path_info, file_template="%s%s.h5", writer=writer
            ),
        )

        # Begin capturing, without waiting for the set itself to complete.
        await set_and_wait_for_value(
            writer.capture, True, wait_for_set_completion=False
        )

        # Describe what was set up: first the main image dataset ...
        main_dataset = await get_ndarray_resource_info(
            array_description=self.array_description,
            data_key=datakey_name,
            parameters={"dataset": "/entry/data/data"},
            frames_per_chunk=frames_per_chunk,
        )
        resources = [main_dataset]

        # ... then one scalar dataset per NDAttribute of the driver and plugins.
        attribute_info = await get_ndattribute_dtype_source(
            (self.driver, *self.plugins)
        )
        for name, (dtype_numpy, source) in attribute_info.items():
            resources.append(
                StreamResourceInfo(
                    data_key=name,
                    shape=(),
                    # NDAttributes appear to always be configured with
                    # this chunk size
                    chunk_shape=(16384,),
                    dtype_numpy=dtype_numpy,
                    source=source,
                    parameters={"dataset": f"/entry/instrument/NDAttributes/{name}"},
                )
            )

        return StreamResourceDataProvider(
            uri=f"{path_info.directory_uri}{path_info.filename}.h5",
            resources=resources,
            mimetype="application/x-hdf5",
            collections_written_signal=writer.num_captured,
            flush_signal=writer.flush_now,
        )

    async def stop(self) -> None:
        """Stop the capture by stopping the writer's ``capture`` busy record."""
        await stop_busy_record(self.writer.capture)

    def get_hinted_fields(self, datakey_name: str) -> Sequence[str]:
        """Return the hinted fields: always just the main NDArray dataset.

        :param datakey_name: Data key of the main dataset.
        :return: A one-element list containing ``datakey_name``.
        """
        return [datakey_name]
@dataclass
class ADMultipartDataLogic(DetectorDataLogic):
    """Data logic for AreaDetector writers that emit one file per frame (e.g. JPEG, TIFF).

    :param array_description:
        Signals describing the NDArray shape and data type.
    :param path_provider:
        Callable returning the path information for a given data key name.
    :param writer:
        The `NDPluginFileIO` instance that writes the files.
    :param extension:
        File extension of the written files (e.g. ".jpg", ".tiff").
    :param mimetype:
        Mimetype of the written files
        (e.g. "multipart/related;type=image/jpeg").
    :param datakey_suffix:
        Suffix to append to the data key for the main dataset. It is not
        referenced inside this class; presumably it is consumed elsewhere
        (TODO confirm).
    """

    array_description: NDArrayDescription
    path_provider: PathProvider
    writer: NDPluginFileIO
    extension: str
    mimetype: str
    datakey_suffix: str = ""

    async def prepare_unbounded(self, datakey_name: str) -> StreamableDataProvider:
        """Configure the file writer, start capturing and describe the output.

        :param datakey_name: Data key for the main dataset; also passed to
            the path provider.
        :return: A `StreamResourceDataProvider` with the main dataset only.
        """
        writer = self.writer

        # Decide where the files go.
        path_info = self.path_provider(datakey_name)

        # Files are named <dir><name>_<6-digit number><extension>.
        writer_template = "%s%s_%6.6d" + self.extension
        await prepare_file_paths(
            path_info=path_info,
            file_template=writer_template,
            writer=writer,
        )

        # Begin capturing, without waiting for the set itself to complete.
        await set_and_wait_for_value(
            writer.capture, True, wait_for_set_completion=False
        )

        # The reader-side template mirrors the writer template above.
        reader_template = path_info.filename + "_{:06d}" + self.extension
        main_dataset = await get_ndarray_resource_info(
            array_description=self.array_description,
            data_key=datakey_name,
            parameters={"template": reader_template},
        )

        # NOTE(review): an earlier TODO here asked to remove a type ignore once
        # https://github.com/bluesky/ophyd-async/issues/1186 is resolved, but
        # no type ignore is present — confirm whether the note is still needed.
        return StreamResourceDataProvider(
            uri=path_info.directory_uri,
            resources=[main_dataset],
            mimetype=self.mimetype,
            collections_written_signal=writer.num_captured,
        )

    async def stop(self) -> None:
        """Stop the capture by stopping the writer's ``capture`` busy record."""
        await stop_busy_record(self.writer.capture)

    def get_hinted_fields(self, datakey_name: str) -> Sequence[str]:
        """Return the hinted fields: always just the main NDArray dataset.

        :param datakey_name: Data key of the main dataset.
        :return: A one-element list containing ``datakey_name``.
        """
        return [datakey_name]
@dataclass
class ADWriterFactory(Generic[NDPluginFileIOT]):
    """Factory that creates a file-writer plugin together with its data logic.

    Build instances with the static methods `hdf`, `jpeg` or `tiff`, then
    pass one or more of them to `AreaDetector` as positional
    `*writer_factories` arguments. When the detector is initialised,
    `__call__` is invoked with the detector's PV `prefix`, its `driver` and
    the flat list of extra `plugins`; it returns the writer device and the
    matching `DetectorDataLogic`.

    :param writer_cls:
        Concrete `NDPluginFileIO` subclass to instantiate.
    :param writer_suffix:
        PV suffix appended to *prefix* to form the writer's PV prefix.
    :param writer_name:
        Attribute name under which the writer device is stored on the
        `AreaDetector` instance. Each static method defaults to its own name
        (``"hdf"``, ``"jpeg"``, ``"tiff"``); override it when passing several
        factories so that every writer gets a distinct name.
    :param datakey_suffix:
        Suffix appended to the datakey name in stream resources.
    :param array_description:
        Override for the array shape/type description built from the driver.
        Either an `NDArrayDescription` instance or a callable
        ``(driver) -> NDArrayDescription``; use a callable when the
        description depends on signals that only exist once the driver has
        been constructed (e.g. inside a detector subclass that creates its
        own driver). ``None`` selects the description built from the driver.
    :param data_logic_factory:
        Callable ``(writer, array_description, driver, plugins) ->
        DetectorDataLogic`` that builds the data logic for the
        already-constructed writer.
    """

    writer_cls: type[NDPluginFileIOT]
    writer_suffix: str
    writer_name: str
    datakey_suffix: str
    array_description: (
        NDArrayDescription | Callable[[ADBaseIO], NDArrayDescription] | None
    )
    data_logic_factory: Callable[
        [NDPluginFileIOT, NDArrayDescription, ADBaseIO, Sequence[NDPluginBaseIO]],
        DetectorDataLogic,
    ]

    def __call__(
        self,
        prefix: str,
        driver: ADBaseIO,
        plugins: Sequence[NDPluginBaseIO],
    ) -> tuple[NDPluginFileIOT, DetectorDataLogic]:
        """Instantiate the writer plugin and build its data logic.

        :param prefix:
            EPICS PV prefix of the detector (same as `AreaDetector.prefix`).
        :param driver:
            The detector driver, used to read array shape/type metadata.
        :param plugins:
            Additional plugins whose NDAttribute XML files should be included
            in HDF5 metadata.
        :return: ``(writer, data_logic)`` tuple ready to attach to the detector.
        """
        writer = self.writer_cls(prefix + self.writer_suffix)

        override = self.array_description
        if override is None:
            # No override given: describe the array using the driver's own
            # size, data type and colour mode signals.
            description = NDArrayDescription(
                shape_signals=[
                    driver.array_size_z,
                    driver.array_size_y,
                    driver.array_size_x,
                ],
                data_type_signal=driver.data_type,
                color_mode_signal=driver.color_mode,
            )
        elif callable(override):
            description = override(driver)
        else:
            description = override

        data_logic = self.data_logic_factory(writer, description, driver, plugins)
        return writer, data_logic

    @staticmethod
    def hdf(
        path_provider: PathProvider,
        writer_suffix: str = "HDF1:",
        writer_name: str = "hdf",
        datakey_suffix: str = "",
        array_description: NDArrayDescription
        | Callable[[ADBaseIO], NDArrayDescription]
        | None = None,
    ) -> "ADWriterFactory[NDFileHDF5IO]":
        """Create a factory for an HDF5 file writer.

        :param path_provider:
            Provides file path information for each acquisition.
        :param writer_suffix:
            PV suffix for the NDFileHDF5 plugin, defaults to ``HDF1:``.
        :param writer_name:
            Attribute name for the writer on the detector, defaults to ``"hdf"``.
        :param datakey_suffix:
            Suffix appended to the datakey name, defaults to ``""``.
        :param array_description:
            Override for the array shape/type description built from the
            driver. Pass an `NDArrayDescription` or a callable
            ``(driver) -> NDArrayDescription`` when the shape/type comes from
            a plugin rather than the main driver (e.g. an ROI plugin).
        :return: A factory producing an `NDFileHDF5IO` writer and an
            `ADHDFDataLogic`.
        """

        def build_hdf_logic(writer, desc, driver, plugins):
            return ADHDFDataLogic(
                array_description=desc,
                path_provider=path_provider,
                driver=driver,
                writer=writer,
                plugins=list(plugins),
                datakey_suffix=datakey_suffix,
            )

        return ADWriterFactory(
            writer_cls=NDFileHDF5IO,
            writer_suffix=writer_suffix,
            writer_name=writer_name,
            datakey_suffix=datakey_suffix,
            array_description=array_description,
            data_logic_factory=build_hdf_logic,
        )

    @staticmethod
    def jpeg(
        path_provider: PathProvider,
        writer_suffix: str = "JPEG1:",
        writer_name: str = "jpeg",
        datakey_suffix: str = "",
        array_description: NDArrayDescription
        | Callable[[ADBaseIO], NDArrayDescription]
        | None = None,
    ) -> "ADWriterFactory[NDPluginFileIO]":
        """Create a factory for a JPEG file writer.

        :param path_provider:
            Provides file path information for each acquisition.
        :param writer_suffix:
            PV suffix for the NDPluginFile plugin, defaults to ``JPEG1:``.
        :param writer_name:
            Attribute name for the writer on the detector, defaults to ``"jpeg"``.
        :param datakey_suffix:
            Suffix appended to the datakey name, defaults to ``""``.
        :param array_description:
            Override for the array shape/type description built from the
            driver. Pass an `NDArrayDescription` or a callable
            ``(driver) -> NDArrayDescription`` when the shape/type comes from
            a plugin.
        :return: A factory producing an `NDPluginFileIO` writer and an
            `ADMultipartDataLogic` configured for ``.jpg`` files.
        """

        def build_jpeg_logic(writer, desc, driver, plugins):
            return ADMultipartDataLogic(
                array_description=desc,
                path_provider=path_provider,
                writer=writer,
                extension=".jpg",
                mimetype="multipart/related;type=image/jpeg",
                datakey_suffix=datakey_suffix,
            )

        return ADWriterFactory(
            writer_cls=NDPluginFileIO,
            writer_suffix=writer_suffix,
            writer_name=writer_name,
            datakey_suffix=datakey_suffix,
            array_description=array_description,
            data_logic_factory=build_jpeg_logic,
        )

    @staticmethod
    def tiff(
        path_provider: PathProvider,
        writer_suffix: str = "TIFF1:",
        writer_name: str = "tiff",
        datakey_suffix: str = "",
        array_description: NDArrayDescription
        | Callable[[ADBaseIO], NDArrayDescription]
        | None = None,
    ) -> "ADWriterFactory[NDPluginFileIO]":
        """Create a factory for a TIFF file writer.

        :param path_provider:
            Provides file path information for each acquisition.
        :param writer_suffix:
            PV suffix for the NDPluginFile plugin, defaults to ``TIFF1:``.
        :param writer_name:
            Attribute name for the writer on the detector, defaults to ``"tiff"``.
        :param datakey_suffix:
            Suffix appended to the datakey name, defaults to ``""``.
        :param array_description:
            Override for the array shape/type description built from the
            driver. Pass an `NDArrayDescription` or a callable
            ``(driver) -> NDArrayDescription`` when the shape/type comes from
            a plugin.
        :return: A factory producing an `NDPluginFileIO` writer and an
            `ADMultipartDataLogic` configured for ``.tiff`` files.
        """

        def build_tiff_logic(writer, desc, driver, plugins):
            return ADMultipartDataLogic(
                array_description=desc,
                path_provider=path_provider,
                writer=writer,
                extension=".tiff",
                mimetype="multipart/related;type=image/tiff",
                datakey_suffix=datakey_suffix,
            )

        return ADWriterFactory(
            writer_cls=NDPluginFileIO,
            writer_suffix=writer_suffix,
            writer_name=writer_name,
            datakey_suffix=datakey_suffix,
            array_description=array_description,
            data_logic_factory=build_tiff_logic,
        )