Source code for ophyd_async.fastcs.odin._data_logic
import asyncio
from collections.abc import Sequence
import numpy as np
from ophyd_async.core import (
DEFAULT_TIMEOUT,
DetectorDataLogic,
PathProvider,
SignalR,
StreamableDataProvider,
StreamResourceDataProvider,
StreamResourceInfo,
wait_for_value,
)
from ._io import OdinIO
[docs]
class OdinDataLogic(DetectorDataLogic):
def __init__(
self,
path_provider: PathProvider,
odin: OdinIO,
detector_bit_depth: SignalR[int],
):
self.path_provider = path_provider
self.odin = odin
self.detector_bit_depth = detector_bit_depth
[docs]
async def prepare_unbounded(self, detector_name: str) -> StreamableDataProvider:
# Work out where to write
path_info = self.path_provider(detector_name)
# Get the current bit depth
datatype = f"uint{await self.detector_bit_depth.get_value()}"
# Setup the HDF writer
filename = f"{path_info.filename}.h5"
await asyncio.gather(
self.odin.fp.data_datatype.set(datatype),
self.odin.fp.data_compression.set("BSLZ4"),
self.odin.fp.frames.set(0),
self.odin.fp.process_frames_per_block.set(1000),
self.odin.fp.file_path.set(str(path_info.directory_path)),
self.odin.mw.directory.set(str(path_info.directory_path)),
self.odin.fp.file_prefix.set(filename),
self.odin.mw.file_prefix.set(filename),
self.odin.mw.acquisition_id.set(filename),
)
# Start writing
await self.odin.fp.start_writing.trigger()
await wait_for_value(self.odin.fp.writing, True, timeout=DEFAULT_TIMEOUT)
await wait_for_value(self.odin.mw.writing, True, timeout=DEFAULT_TIMEOUT)
# Return a provider that reflects what we have made
data_shape = await asyncio.gather(
self.odin.fp.data_dims_0.get_value(), self.odin.fp.data_dims_1.get_value()
)
resource = StreamResourceInfo(
data_key=detector_name,
shape=data_shape,
chunk_shape=(1, *data_shape),
dtype_numpy=np.dtype(datatype).str,
parameters={"dataset": "/data"},
)
return StreamResourceDataProvider(
uri=f"{path_info.directory_uri}{filename}",
resources=[resource],
mimetype="application/x-hdf5",
collections_written_signal=self.odin.fp.frames_written,
)
[docs]
async def stop(self) -> None:
await asyncio.gather(
self.odin.fp.stop_writing.trigger(),
self.odin.mw.stop.trigger(),
)
[docs]
def get_hinted_fields(self, detector_name: str) -> Sequence[str]:
# The main dataset is always hinted
return [detector_name]