import asyncio
from pathlib import Path
from typing import AsyncGenerator, AsyncIterator, Dict, List, Optional
from bluesky.protocols import DataKey, StreamAsset
from p4p.client.thread import Context
from ophyd_async.core import (
DEFAULT_TIMEOUT,
DetectorWriter,
HDFDataset,
HDFFile,
NameProvider,
PathProvider,
observe_value,
wait_for_value,
)
from ._common_blocks import CommonPandaBlocks


class PandaHDFWriter(DetectorWriter):
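    """Writer that streams PandA PCAP-captured signals into an HDF5 file.

    Configures the PandA's DATA block (directory, file name, capture) and
    reports written frames to bluesky as stream resource/datum documents.
    """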
_ctxt: Optional[Context] = None
def __init__(
self,
prefix: str,
path_provider: PathProvider,
name_provider: NameProvider,
panda_device: CommonPandaBlocks,
) -> None:
self.panda_device = panda_device
self._prefix = prefix
self._path_provider = path_provider
self._name_provider = name_provider
self._datasets: List[HDFDataset] = []
self._file: Optional[HDFFile] = None
self._multiplier = 1

    # Triggered on PCAP arm
async def open(self, multiplier: int = 1) -> Dict[str, DataKey]:
"""Retrieve and get descriptor of all PandA signals marked for capture"""
# Ensure flushes are immediate
await self.panda_device.data.flush_period.set(0)
self._file = None
info = self._path_provider(device_name=self.panda_device.name)
# Set the initial values
await asyncio.gather(
self.panda_device.data.hdf_directory.set(
str(info.root / info.resource_dir)
),
self.panda_device.data.hdf_file_name.set(
f"{info.filename}.h5",
),
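            # num_capture=0 requests unbounded capture (until PCAP is disarmed)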
self.panda_device.data.num_capture.set(0),
# TODO: Set create_dir_depth once available
# https://github.com/bluesky/ophyd-async/issues/317
)
        # Validate before arming so a bad multiplier doesn't leave capture running
        if multiplier > 1:
            raise ValueError(
                "All PandA datasets should be scalar, multiplier should be 1"
            )
        # Arm the PandA data capture; the set completes once capture has started
        await self.panda_device.data.capture.set(True)
        return await self._describe()
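
    # Illustrative shape of the describe dict for one captured scalar signal
    # (the signal name and source below are examples, not real PVs):
    #   {"COUNTER1.OUT.Value": DataKey(source="...", shape=[1], dtype="number",
    #                                  dtype_numpy="<f8", external="STREAM:")}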
async def _describe(self) -> Dict[str, DataKey]:
"""
Return a describe based on the datasets PV
"""
await self._update_datasets()
describe = {
ds.data_key: DataKey(
source=self.panda_device.data.hdf_directory.source,
shape=ds.shape,
dtype="array" if ds.shape != [1] else "number",
dtype_numpy="<f8", # PandA data should always be written as Float64
external="STREAM:",
)
for ds in self._datasets
}
return describe
async def _update_datasets(self) -> None:
"""
Load data from the datasets PV on the panda, update internal
representation of datasets that the panda will write.
"""
capture_table = await self.panda_device.data.datasets.get_value()
self._datasets = [
HDFDataset(dataset_name, "/" + dataset_name, [1], multiplier=1)
for dataset_name in capture_table["name"]
]

    # The next few methods are identical to the areaDetector writer and could
    # be moved into StandardDetector as default behavior
    async def wait_for_index(
        self, index: int, timeout: Optional[float] = DEFAULT_TIMEOUT
    ) -> None:
def matcher(value: int) -> bool:
return value >= index
matcher.__name__ = f"index_at_least_{index}"
await wait_for_value(
self.panda_device.data.num_captured, matcher, timeout=timeout
)
async def get_indices_written(self) -> int:
return await self.panda_device.data.num_captured.get_value()

    async def observe_indices_written(
        self, timeout: float = DEFAULT_TIMEOUT
    ) -> AsyncGenerator[int, None]:
        """Yield the number of indices written whenever it changes."""
async for num_captured in observe_value(
self.panda_device.data.num_captured, timeout
):
yield num_captured // self._multiplier
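    # Lazily create the HDFFile on first data, then emit one stream_resource
    # document per dataset followed by stream_datum documents covering the
    # frames written so far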
async def collect_stream_docs(
self, indices_written: int
) -> AsyncIterator[StreamAsset]:
# TODO: fail if we get dropped frames
if indices_written:
if not self._file:
self._file = HDFFile(
self._path_provider(),
Path(await self.panda_device.data.hdf_directory.get_value())
/ Path(await self.panda_device.data.hdf_file_name.get_value()),
self._datasets,
)
for doc in self._file.stream_resources():
yield "stream_resource", doc
for doc in self._file.stream_data(indices_written):
yield "stream_datum", doc

    # This could also become default StandardDetector behavior
    async def close(self) -> None:
await self.panda_device.data.capture.set(
False, wait=True, timeout=DEFAULT_TIMEOUT
)
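

# --- Usage sketch (illustrative only, not part of the module) ---------------
# A minimal sketch of the writer lifecycle, assuming `panda` is an
# already-connected CommonPandaBlocks instance and `path_provider` satisfies
# PathProvider; neither is constructed here. `acquire_one_frame` and the
# "PANDA:" prefix are hypothetical stand-ins, not ophyd_async APIs.
async def acquire_one_frame(
    panda: CommonPandaBlocks, path_provider: PathProvider
) -> None:
    writer = PandaHDFWriter(
        prefix="PANDA:",  # assumed PV prefix, for the example only
        path_provider=path_provider,
        name_provider=lambda: "panda",  # NameProvider: a plain () -> str callable
        panda_device=panda,
    )
    await writer.open()  # arms PCAP capture and returns the DataKey dict
    # Block until at least one frame has been written to the HDF5 file
    async for index in writer.observe_indices_written(timeout=10):
        if index >= 1:
            break
    # Emit the stream_resource/stream_datum documents for the frames written
    async for name, doc in writer.collect_stream_docs(index):
        print(name, doc)
    await writer.close()  # disarms PCAP capture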