import asyncio
from pathlib import Path
from typing import AsyncGenerator, AsyncIterator, Dict, List, Optional
from bluesky.protocols import DataKey, StreamAsset
from p4p.client.thread import Context
from ophyd_async.core import (
DEFAULT_TIMEOUT,
DetectorWriter,
HDFDataset,
HDFFile,
NameProvider,
PathProvider,
observe_value,
wait_for_value,
)
from ._block import DataBlock


class PandaHDFWriter(DetectorWriter):
_ctxt: Optional[Context] = None
def __init__(
self,
prefix: str,
path_provider: PathProvider,
name_provider: NameProvider,
panda_data_block: DataBlock,
) -> None:
self.panda_data_block = panda_data_block
self._prefix = prefix
self._path_provider = path_provider
self._name_provider = name_provider
self._datasets: List[HDFDataset] = []
self._file: Optional[HDFFile] = None
self._multiplier = 1

    # Triggered on PCAP arm
async def open(self, multiplier: int = 1) -> Dict[str, DataKey]:
"""Retrieve and get descriptor of all PandA signals marked for capture"""
# Ensure flushes are immediate
await self.panda_data_block.flush_period.set(0)
self._file = None
info = self._path_provider(device_name=self._name_provider())
        # Set the create-directory depth first so that the callback triggered
        # by setting the directory path sees the correct value
await self.panda_data_block.create_directory.set(info.create_dir_depth)
# Set the initial values
await asyncio.gather(
self.panda_data_block.hdf_directory.set(str(info.directory_path)),
self.panda_data_block.hdf_file_name.set(
f"{info.filename}.h5",
),
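            # 0 is interpreted as "capture until disarmed"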
self.panda_data_block.num_capture.set(0),
)
        # Make sure that the directory exists or has been created.
        if await self.panda_data_block.directory_exists.get_value() != 1:
            raise OSError(
                f"Directory {info.directory_path} does not exist or "
                "is not writable by the PandABlocks-ioc!"
            )
        # Validate before arming so a bad multiplier doesn't leave PCAP armed
        if multiplier > 1:
            raise ValueError(
                "All PandA datasets should be scalar, multiplier should be 1"
            )
        # Arm PCAP capture; frames are written until close() disarms it
        await self.panda_data_block.capture.set(True)
return await self._describe()

    async def _describe(self) -> Dict[str, DataKey]:
        """Return a describe dict built from the PandA's datasets PV."""
await self._update_datasets()
describe = {
ds.data_key: DataKey(
source=self.panda_data_block.hdf_directory.source,
shape=ds.shape,
dtype="array" if ds.shape != [1] else "number",
dtype_numpy="<f8", # PandA data should always be written as Float64
external="STREAM:",
)
for ds in self._datasets
}
return describe

    async def _update_datasets(self) -> None:
        """Load the datasets PV from the PandA and update the internal
        representation of the datasets that it will write."""
capture_table = await self.panda_data_block.datasets.get_value()
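        # Each capturable dataset is scalar, written at "/<name>" in the file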
self._datasets = [
HDFDataset(dataset_name, "/" + dataset_name, [1], multiplier=1)
for dataset_name in capture_table["name"]
]
# Warn user if dataset table is empty in PandA
# i.e. no stream resources will be generated
if len(self._datasets) == 0:
self.panda_data_block.log.warning(
f"PandA {self._name_provider()} DATASETS table is empty! "
"No stream resource docs will be generated. "
"Make sure captured positions have their corresponding "
"*:DATASET PV set to a scientifically relevant name."
)

    # The next few methods are exactly the same as the AD writer's. They
    # could be moved into StandardDetector as default behavior.
async def wait_for_index(
self, index: int, timeout: Optional[float] = DEFAULT_TIMEOUT
):
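        # Complete once the PandA reports at least `index` frames captured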
def matcher(value: int) -> bool:
return value >= index
matcher.__name__ = f"index_at_least_{index}"
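        # Naming the matcher makes wait_for_value's timeout errors descriptive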
await wait_for_value(
self.panda_data_block.num_captured, matcher, timeout=timeout
)
async def get_indices_written(self) -> int:
return await self.panda_data_block.num_captured.get_value()
async def observe_indices_written(
self, timeout=DEFAULT_TIMEOUT
) -> AsyncGenerator[int, None]:
"""Wait until a specific index is ready to be collected"""
async for num_captured in observe_value(
self.panda_data_block.num_captured, timeout
):
yield num_captured // self._multiplier
async def collect_stream_docs(
self, indices_written: int
) -> AsyncIterator[StreamAsset]:
# TODO: fail if we get dropped frames
if indices_written:
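            # Lazily create the HDFFile on the first frame, so the file path
            # PVs have their final values before we read them back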
if not self._file:
self._file = HDFFile(
Path(await self.panda_data_block.hdf_directory.get_value())
/ Path(await self.panda_data_block.hdf_file_name.get_value()),
self._datasets,
)
for doc in self._file.stream_resources():
yield "stream_resource", doc
for doc in self._file.stream_data(indices_written):
yield "stream_datum", doc

    # Could make this the default close() for StandardDetector
async def close(self):
await self.panda_data_block.capture.set(
False, wait=True, timeout=DEFAULT_TIMEOUT
)
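

# Illustrative sketch (not part of this module's API): how a caller might
# drive the writer end-to-end. `writer` is assumed to be an already-connected
# PandaHDFWriter; in practice StandardDetector and the RunEngine perform
# these calls, and `_example_capture` is a hypothetical name.
async def _example_capture(writer: PandaHDFWriter, num_frames: int) -> None:
    # Arm the PandA and get a DataKey for every captured dataset
    describe = await writer.open()
    print(f"Capturing datasets: {list(describe)}")
    # Block until the requested number of frames is in the HDF5 file
    await writer.wait_for_index(num_frames)
    # Emit stream_resource/stream_datum documents for the frames so far
    async for name, doc in writer.collect_stream_docs(num_frames):
        print(name, doc)
    # Disarm PCAP, stopping the HDF writer
    await writer.close()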