Source code for ophyd_async.core._data_providers

from abc import abstractmethod
from collections.abc import AsyncIterator, Sequence
from dataclasses import dataclass
from typing import Any

from bluesky.protocols import Reading, StreamAsset
from event_model import ComposeStreamResource, DataKey, StreamRange

from ._signal import SignalR, SignalW
from ._utils import ConfinedModel


class ReadableDataProvider:
    @abstractmethod
    async def make_datakeys(self) -> dict[str, DataKey]:
        """Return a DataKey for each Readable that produces a Reading.

        Called before the first exposure is taken.
        """

    @abstractmethod
    async def make_readings(self) -> dict[str, Reading]:
        """Read the Signals and return their values."""


@dataclass
class SignalDataProvider(ReadableDataProvider):
    signal: SignalR

    async def make_datakeys(self) -> dict[str, DataKey]:
        return await self.signal.describe()

    async def make_readings(self) -> dict[str, Reading]:
        return await self.signal.read(cached=False)
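

# --- Illustrative usage sketch, not part of the original module ---
# A minimal example of driving a SignalDataProvider with a soft signal. It assumes
# soft_signal_rw from ophyd_async.core; the signal name "det_gain" is made up.
async def _example_signal_data_provider() -> None:
    from ophyd_async.core import soft_signal_rw

    sig = soft_signal_rw(float, initial_value=1.0, name="det_gain")
    await sig.connect()
    provider = SignalDataProvider(sig)
    datakeys = await provider.make_datakeys()  # one DataKey, keyed by "det_gain"
    readings = await provider.make_readings()  # one Reading, keyed by "det_gain"
    assert set(datakeys) == set(readings) == {"det_gain"}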


class StreamableDataProvider:
    collections_written_signal: SignalR[int]

    @abstractmethod
    async def make_datakeys(self, collections_per_event: int) -> dict[str, DataKey]:
        """Return a DataKey for each Signal that produces a Reading.

        Called before the first exposure is taken.

        :param collections_per_event: this should appear in the shape of each DataKey
        """

    async def make_stream_docs(
        self, collections_written: int, collections_per_event: int
    ) -> AsyncIterator[StreamAsset]:
        """Make StreamAsset documents up to the given index.

        Default implementation is a no-op. Subclasses should override this to
        emit actual StreamAsset documents.
        """
        while False:
            yield


class StreamResourceInfo(ConfinedModel):
    """A description of a single StreamResource that should be emitted."""

    data_key: str
    """The data_key that will appear in the event descriptor, e.g. det or det.data"""
    shape: tuple[int | None, ...]
    """The shape of a single collection's data in the HDF file,
    e.g. (768, 1024) for arrays or () for scalars"""
    chunk_shape: tuple[int, ...]
    """The explicit chunk size written to disk"""
    dtype_numpy: str
    """The numpy dtype for this field, e.g. <i2 or <f8"""
    parameters: dict[str, Any]
    """Any other parameters that should be included in the StreamResource,
    e.g. dataset path"""
    source: str = ""
    """The source string that should appear in the event descriptor,
    e.g. ca://HDF:FullFileName_RBV. Blank means use the uri."""
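

# Illustrative sketch, not part of the original module: describing a single 2D
# detector dataset in an HDF5 file. The data_key "det" and the dataset path
# "/entry/data/data" are made-up values for this example.
def _example_stream_resource_info() -> StreamResourceInfo:
    return StreamResourceInfo(
        data_key="det",
        shape=(768, 1024),  # one collection is a 768x1024 frame
        chunk_shape=(1, 768, 1024),  # one frame per chunk on disk
        dtype_numpy="<i2",  # little-endian 16-bit integers
        parameters={"dataset": "/entry/data/data"},  # extra StreamResource parameters
    )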


class StreamResourceDataProvider(StreamableDataProvider):
    """A helper class to make stream resources and datums for HDF datasets.

    :param uri: URI of the file that has been written
    :param resources: Descriptions of each of the datasets that will appear
        in the file
    :param mimetype: The mimetype of the file being written
    :param collections_written_signal: Signal reporting how many collections
        have been written so far
    :param flush_signal: Optional Signal that is set to flush the file to disk
    """

    def __init__(
        self,
        uri: str,
        resources: Sequence[StreamResourceInfo],
        mimetype: str,
        collections_written_signal: SignalR[int],
        flush_signal: SignalW[bool] | None = None,
    ) -> None:
        self.uri = uri
        self.resources = list(resources)
        self.collections_written_signal = collections_written_signal
        self.flush_signal = flush_signal
        self.last_emitted = 0
        bundler_composer = ComposeStreamResource()
        self.bundles = [
            bundler_composer(
                mimetype=mimetype,
                uri=uri,
                data_key=resource.data_key,
                parameters={
                    "chunk_shape": resource.chunk_shape,
                    **resource.parameters,
                },
                uid=None,
                validate=True,
            )
            for resource in self.resources
        ]

    async def make_datakeys(self, collections_per_event: int) -> dict[str, DataKey]:
        describe = {
            resource.data_key: DataKey(
                source=resource.source or self.uri,
                shape=[collections_per_event, *resource.shape],
                dtype="array"
                if collections_per_event > 1 or len(resource.shape) > 1
                else "number",
                dtype_numpy=resource.dtype_numpy,
                external="STREAM:",
            )
            for resource in self.resources
        }
        return describe
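
    # Worked example (illustrative, not from the original source): for a resource
    # with shape (768, 1024) and collections_per_event=10, the DataKey above gets
    # shape [10, 768, 1024], dtype "array" and external "STREAM:".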

    async def make_stream_docs(
        self, collections_written: int, collections_per_event: int
    ) -> AsyncIterator[StreamAsset]:
        if self.flush_signal:
            await self.flush_signal.set(True)
        # TODO: fail if we get dropped frames
        indices_written = collections_written // collections_per_event
        if indices_written and not self.last_emitted:
            for bundle in self.bundles:
                yield "stream_resource", bundle.stream_resource_doc
        if indices_written > self.last_emitted:
            indices: StreamRange = {
                "start": self.last_emitted,
                "stop": indices_written,
            }
            self.last_emitted = indices_written
            for bundle in self.bundles:
                yield "stream_datum", bundle.compose_stream_datum(indices)
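

# --- Illustrative usage sketch, not part of the original module ---
# Composing a StreamResourceDataProvider and draining its stream documents. The URI,
# data_key, dataset path and mimetype below are made-up example values; any
# SignalR[int] that reports the number of collections written so far will do.
async def _example_stream_resource_data_provider(
    collections_written_signal: SignalR[int],
) -> None:
    provider = StreamResourceDataProvider(
        uri="file://localhost/data/scan-0001.h5",
        resources=[
            StreamResourceInfo(
                data_key="det",
                shape=(768, 1024),
                chunk_shape=(1, 768, 1024),
                dtype_numpy="<i2",
                parameters={"dataset": "/entry/data/data"},
            )
        ],
        mimetype="application/x-hdf5",
        collections_written_signal=collections_written_signal,
    )
    # The leading dimension of each DataKey shape is collections_per_event,
    # e.g. [1, 768, 1024] here.
    await provider.make_datakeys(collections_per_event=1)
    # The first call yields a stream_resource doc per resource, then stream_datum
    # docs covering everything written since the last call.
    collections_written = await collections_written_signal.get_value()
    async for name, doc in provider.make_stream_docs(
        collections_written, collections_per_event=1
    ):
        print(name, doc)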