Note
Ophyd async is included on a provisional basis until the v1.0 release and may change API on minor release numbers before then
Make a StandardDetector#
StandardDetector
is an abstract class to assist in creating Device classes for hardware that writes its own data e.g. an AreaDetector implementation, or a PandA writing motor encoder positions to file.
The StandardDetector
is a simple compound device, with 2 standard components:
DetectorWriter
to handle data persistence, i/o and pass information about data to the RunEngine (usually an instance ofADHDFWriter
)DetectorController
with logic for arming and disarming the detector. This will be unique to the StandardDetector implementation.
Writing an AreaDetector StandardDetector#
For an AreaDetector implementation of the StandardDetector, two entity objects which are subdevices of the StandardDetector
are used to map to AreaDetector plugins:
An NDPluginFile instance (for
ADHDFWriter
an instance ofNDFileHDFIO
)An
ADBaseIO
instance mapping to NDArray for the “driver” of the detector implementation
Define a FooDriver
if the NDArray requires fields in addition to those on ADBaseIO
to be exposed. It should extend ADBaseIO
.
Enumeration fields should be named to prevent namespace collision, i.e. for a Signal named “TriggerSource” use the enum “FooTriggerSource”
class FooDriver(adcore.ADBaseIO):
def __init__(self, prefix: str, name: str = "") -> None:
self.trigger_mode = epics_signal_rw_rbv(str, prefix + "TriggerMode")
super().__init__(prefix, name)
Define a FooController
with handling for converting the standard pattern of ophyd_async.core.DetectorController.arm()
and ophyd_async.core.DetectorController.disarm()
to required state of FooDriver
e.g. setting a compatible “FooTriggerSource” for a given DetectorTrigger
, or raising an exception if incompatible with the DetectorTrigger
.
The ophyd_async.core.DetectorController.get_deadtime()
method is used when constructing sequence tables for hardware controlled scanning. Details on how to calculate the deadtime may be only available from technical manuals or otherwise complex. If it requires fetching from signals, it is recommended to cache the value during the StandardDetector `prepare` method.
class FooController(DetectorController):
def __init__(self, driver: FooDriver) -> None:
self._drv = driver
def get_deadtime(self, exposure: float) -> float:
# FooDetector deadtime handling
return 0.001
async def arm(
self,
num: int,
trigger: DetectorTrigger = DetectorTrigger.internal,
exposure: float | None = None,
) -> AsyncStatus:
await asyncio.gather(
self._drv.num_images.set(num),
self._drv.image_mode.set(adcore.ImageMode.multiple),
self._drv.trigger_mode.set(f"FOO{trigger}"),
)
if exposure is not None:
await self._drv.acquire_time.set(exposure)
return await adcore.start_acquiring_driver_and_ensure_status(self._drv)
async def disarm(self):
await adcore.stop_busy_record(self._drv.acquire, False, timeout=1)
FooDetector
ties the Driver, Controller and data persistence layer together. The example FooDetector
writes h5 files using the standard NDPlugin. It additionally supports the HasHints
protocol which is optional but recommended.
Its initialiser assumes the NSLS-II AreaDetector plugin EPICS address suffixes as defaults but allows overriding: this pattern is recommended for consistency.
If the FooDriver
signals that should be read as configuration, they should be added to the “config_sigs” passed to the super.
class FooDetector(StandardDetector, HasHints):
_controller: FooController
_writer: adcore.ADHDFWriter
def __init__(
self,
prefix: str,
path_provider: PathProvider,
drv_suffix="cam1:",
hdf_suffix="HDF1:",
name="",
):
# Must be children to pick up connect
self.drv = FooDriver(prefix + drv_suffix)
self.hdf = adcore.NDFileHDFIO(prefix + hdf_suffix)
super().__init__(
FooController(self.drv),
adcore.ADHDFWriter(
self.hdf,
path_provider,
lambda: self.name,
adcore.ADBaseDatasetDescriber(self.drv),
),
config_sigs=(self.drv.acquire_time,),
name=name,
)
@property
def hints(self) -> Hints:
return self._writer.hints
Writing a non-AreaDetector StandardDetector#
A non-AreaDetector StandardDetector
should implement DetectorController
and DetectorWriter
directly.
Here we construct a DetectorController
that co-ordinates signals on a PandA PositionCapture block - a child device “pcap” of the StandardDetector
implementation, analogous to the FooDriver
.
class PandaPcapController(DetectorController):
def __init__(self, pcap: PcapBlock) -> None:
self.pcap = pcap
self._arm_status: AsyncStatus | None = None
def get_deadtime(self, exposure: float | None) -> float:
return 0.000000008
async def prepare(self, trigger_info: TriggerInfo):
if trigger_info.trigger not in (
DetectorTrigger.CONSTANT_GATE,
DetectorTrigger.VARIABLE_GATE,
):
msg = (
"Only constant_gate and variable_gate triggering is supported on "
"the PandA",
)
raise TypeError(msg)
async def arm(self):
self._arm_status = self.pcap.arm.set(True)
await wait_for_value(self.pcap.active, True, timeout=1)
async def wait_for_idle(self):
pass
async def disarm(self):
await self.pcap.arm.set(False)
await wait_for_value(self.pcap.active, False, timeout=1)
The PandA may write a number of fields, and the PandaHDFWriter
co-ordinates those, configures the filewriter and describes the data for the RunEngine.
class PandaHDFWriter(DetectorWriter):
_ctxt: Context | None = None
def __init__(
self,
path_provider: PathProvider,
name_provider: NameProvider,
panda_data_block: DataBlock,
) -> None:
self.panda_data_block = panda_data_block
self._path_provider = path_provider
self._name_provider = name_provider
self._datasets: list[HDFDataset] = []
self._file: HDFFile | None = None
self._multiplier = 1
# Triggered on PCAP arm
async def open(self, multiplier: int = 1) -> dict[str, DataKey]:
"""Retrieve and get descriptor of all PandA signals marked for capture"""
# Ensure flushes are immediate
await self.panda_data_block.flush_period.set(0)
self._file = None
info = self._path_provider(device_name=self._name_provider())
# Set create dir depth first to guarantee that callback when setting
# directory path has correct value
await self.panda_data_block.create_directory.set(info.create_dir_depth)
# Set the initial values
await asyncio.gather(
self.panda_data_block.hdf_directory.set(str(info.directory_path)),
self.panda_data_block.hdf_file_name.set(
f"{info.filename}.h5",
),
self.panda_data_block.capture_mode.set(CaptureMode.FOREVER),
)
# Make sure that directory exists or has been created.
if not await self.panda_data_block.directory_exists.get_value() == 1:
raise OSError(
f"Directory {info.directory_path} does not exist or "
"is not writable by the PandABlocks-ioc!"
)
# Wait for it to start, stashing the status that tells us when it finishes
await self.panda_data_block.capture.set(True)
if multiplier > 1:
raise ValueError(
"All PandA datasets should be scalar, multiplier should be 1"
)
return await self._describe()
async def _describe(self) -> dict[str, DataKey]:
"""
Return a describe based on the datasets PV
"""
await self._update_datasets()
describe = {
ds.data_key: DataKey(
source=self.panda_data_block.hdf_directory.source,
shape=list(ds.shape),
dtype="array" if ds.shape != [1] else "number",
# PandA data should always be written as Float64
dtype_numpy="<f8",
external="STREAM:",
)
for ds in self._datasets
}
return describe
async def _update_datasets(self) -> None:
"""
Load data from the datasets PV on the panda, update internal
representation of datasets that the panda will write.
"""
capture_table = await self.panda_data_block.datasets.get_value()
self._datasets = [
# TODO: Update chunk size to read signal once available in IOC
# Currently PandA IOC sets chunk size to 1024 points per chunk
HDFDataset(
dataset_name, "/" + dataset_name, [1], multiplier=1, chunk_shape=(1024,)
)
for dataset_name in capture_table.name
]
# Warn user if dataset table is empty in PandA
# i.e. no stream resources will be generated
if len(self._datasets) == 0:
self.panda_data_block.log.warning(
f"PandA {self._name_provider()} DATASETS table is empty! "
"No stream resource docs will be generated. "
"Make sure captured positions have their corresponding "
"*:DATASET PV set to a scientifically relevant name."
)
# Next few functions are exactly the same as AD writer. Could move as default
# StandardDetector behavior
async def wait_for_index(self, index: int, timeout: float | None = DEFAULT_TIMEOUT):
def matcher(value: int) -> bool:
return value >= index
matcher.__name__ = f"index_at_least_{index}"
await wait_for_value(
self.panda_data_block.num_captured, matcher, timeout=timeout
)
async def get_indices_written(self) -> int:
return await self.panda_data_block.num_captured.get_value()
async def observe_indices_written(
self, timeout=DEFAULT_TIMEOUT
) -> AsyncGenerator[int, None]:
"""Wait until a specific index is ready to be collected"""
async for num_captured in observe_value(
self.panda_data_block.num_captured, timeout
):
yield num_captured // self._multiplier
async def collect_stream_docs(
self, indices_written: int
) -> AsyncIterator[StreamAsset]:
# TODO: fail if we get dropped frames
if indices_written:
if not self._file:
self._file = HDFFile(
Path(await self.panda_data_block.hdf_directory.get_value())
/ Path(await self.panda_data_block.hdf_file_name.get_value()),
self._datasets,
)
for doc in self._file.stream_resources():
yield "stream_resource", doc
for doc in self._file.stream_data(indices_written):
yield "stream_datum", doc
# Could put this function as default for StandardDetector
async def close(self):
await self.panda_data_block.capture.set(
False, wait=True, timeout=DEFAULT_TIMEOUT
)
The PandA StandardDetector implementation simply ties the component parts and its child devices together.
class HDFPanda(
CommonPandaBlocks, StandardDetector[PandaPcapController, PandaHDFWriter]
):
def __init__(
self,
prefix: str,
path_provider: PathProvider,
config_sigs: Sequence[SignalR] = (),
name: str = "",
):
error_hint = f"Is PandABlocks-ioc at least version {MINIMUM_PANDA_IOC}?"
# This has to be first so we make self.pcap
connector = fastcs_connector(self, prefix, error_hint)
controller = PandaPcapController(pcap=self.pcap)
writer = PandaHDFWriter(
path_provider=path_provider,
name_provider=lambda: name,
panda_data_block=self.data,
)
super().__init__(
controller=controller,
writer=writer,
config_sigs=config_sigs,
name=name,
connector=connector,
)