Note
Ophyd async is included on a provisional basis until the v1.0 release and may change API on minor release numbers before then
Make a StandardDetector#
StandardDetector
is an abstract class to assist in creating Device classes for hardware that writes its own data e.g. an AreaDetector implementation, or a PandA writing motor encoder positions to file.
The StandardDetector
is a simple compound device, with 2 standard components:
DetectorWriter
to handle data persistence, i/o and pass information about data to the RunEngine (usually an instance ofHDFWriter
)DetectorControl
with logic for arming and disarming the detector. This will be unique to the StandardDetector implementation.
Writing an AreaDetector StandardDetector#
For an AreaDetector implementation of the StandardDetector, two entity objects which are subdevices of the StandardDetector
are used to map to AreaDetector plugins:
An NDPluginFile instance (for
HDFWriter
an instance ofNDFileHDF
)An
ADBase
instance mapping to NDArray for the “driver” of the detector implementation
Define a FooDriver
if the NDArray requires fields in addition to those on ADBase
to be exposed. It should extend ADBase
.
Enumeration fields should be named to prevent namespace collision, i.e. for a Signal named “TriggerSource” use the enum “FooTriggerSource”
class FooDriver(ADBase):
def __init__(self, prefix: str, name: str = "") -> None:
self.trigger_mode = ad_rw(str, prefix + "TriggerMode")
super().__init__(prefix, name)
Define a FooController
with handling for converting the standard pattern of ophyd_async.core.DetectorControl.arm()
and ophyd_async.core.DetectorControl.disarm()
to required state of FooDriver
e.g. setting a compatible “FooTriggerSource” for a given DetectorTrigger
, or raising an exception if incompatible with the DetectorTrigger
.
The ophyd_async.core.DetectorControl.get_deadtime()
method is used when constructing sequence tables for hardware controlled scanning. Details on how to calculate the deadtime may be only available from technical manuals or otherwise complex. If it requires fetching from signals, it is recommended to cache the value during the StandardDetector `prepare` method.
class FooController(DetectorControl):
def __init__(self, driver: FooDriver) -> None:
self._drv = driver
def get_deadtime(self, exposure: float) -> float:
# FooDetector deadtime handling
return 0.001
async def arm(
self,
num: int,
trigger: DetectorTrigger = DetectorTrigger.internal,
exposure: Optional[float] = None,
) -> AsyncStatus:
await asyncio.gather(
self._drv.num_images.set(num),
self._drv.image_mode.set(ImageMode.multiple),
self._drv.trigger_mode.set(f"FOO{trigger}"),
)
if exposure is not None:
await self._drv.acquire_time.set(exposure)
return await start_acquiring_driver_and_ensure_status(self._drv)
async def disarm(self):
await stop_busy_record(self._drv.acquire, False, timeout=1)
FooDetector
ties the Driver, Controller and data persistence layer together. The example FooDetector
writes h5 files using the standard NDPlugin. It additionally supports the HasHints
protocol which is optional but recommended.
Its initialiser assumes the NSLS-II AreaDetector plugin EPICS address suffixes as defaults but allows overriding: this pattern is recommended for consistency.
If the FooDriver
signals that should be read as configuration, they should be added to the “config_sigs” passed to the super.
class FooDetector(StandardDetector, HasHints):
_controller: FooController
_writer: HDFWriter
def __init__(
self,
prefix: str,
directory_provider: DirectoryProvider,
drv_suffix="cam1:",
hdf_suffix="HDF1:",
name="",
):
# Must be children to pick up connect
self.drv = FooDriver(prefix + drv_suffix)
self.hdf = NDFileHDF(prefix + hdf_suffix)
super().__init__(
FooController(self.drv),
HDFWriter(
self.hdf,
directory_provider,
lambda: self.name,
ADBaseShapeProvider(self.drv),
),
config_sigs=(self.drv.acquire_time,),
name=name,
)
@property
def hints(self) -> Hints:
return self._writer.hints
Writing a non-AreaDetector StandardDetector#
A non-AreaDetector StandardDetector
should implement DetectorControl
and DetectorWriter
directly.
Here we construct a DetectorControl
that co-ordinates signals on a PandA PositionCapture block - a child device “pcap” of the StandardDetector
implementation, analogous to the FooDriver
.
class PandaPcapController(DetectorControl):
def __init__(self, pcap: PcapBlock) -> None:
self.pcap = pcap
def get_deadtime(self, exposure: float) -> float:
return 0.000000008
async def arm(
self,
num: int,
trigger: DetectorTrigger = DetectorTrigger.constant_gate,
exposure: Optional[float] = None,
) -> AsyncStatus:
assert trigger in (
DetectorTrigger.constant_gate,
trigger == DetectorTrigger.variable_gate,
), "Only constant_gate and variable_gate triggering is supported on the PandA"
await asyncio.gather(self.pcap.arm.set(True))
await wait_for_value(self.pcap.active, True, timeout=1)
return AsyncStatus(wait_for_value(self.pcap.active, False, timeout=None))
async def disarm(self) -> AsyncStatus:
await asyncio.gather(self.pcap.arm.set(False))
await wait_for_value(self.pcap.active, False, timeout=1)
return AsyncStatus(wait_for_value(self.pcap.active, False, timeout=None))
The PandA may write a number of fields, and the PandaHDFWriter
co-ordinates those, configures the filewriter and describes the data for the RunEngine.
class PandaHDFWriter(DetectorWriter):
_ctxt: Optional[Context] = None
def __init__(
self,
directory_provider: DirectoryProvider,
panda_device: CommonPandaBlocks,
) -> None:
self.panda_device = panda_device
self._directory_provider = directory_provider
self._datasets: List[_HDFDataset] = []
self._file: Optional[_HDFFile] = None
self._multiplier = 1
# Triggered on PCAP arm
async def open(self, multiplier: int = 1) -> Dict[str, DataKey]:
"""Retrieve and get descriptor of all PandA signals marked for capture"""
# Ensure flushes are immediate
await self.panda_device.data.flush_period.set(0)
self._file = None
info = self._directory_provider()
# Set the initial values
await asyncio.gather(
self.panda_device.data.hdf_directory.set(
str(info.root / info.resource_dir)
),
self.panda_device.data.hdf_file_name.set(
f"{info.prefix}{self.panda_device.name}{info.suffix}.h5",
),
self.panda_device.data.num_capture.set(0),
)
# Wait for it to start, stashing the status that tells us when it finishes
await self.panda_device.data.capture.set(True)
if multiplier > 1:
raise ValueError(
"All PandA datasets should be scalar, multiplier should be 1"
)
return await self._describe()
async def _describe(self) -> Dict[str, DataKey]:
"""
Return a describe based on the datasets PV
"""
await self._update_datasets()
describe = {
ds.data_key: DataKey(
source=self.panda_device.data.hdf_directory.source,
shape=ds.shape,
dtype="array" if ds.shape != [1] else "number",
external="STREAM:",
)
for ds in self._datasets
}
return describe
async def _update_datasets(self) -> None:
"""
Load data from the datasets PV on the panda, update internal
representation of datasets that the panda will write.
"""
capture_table = await self.panda_device.data.datasets.get_value()
self._datasets = [
_HDFDataset(dataset_name, "/" + dataset_name, [1], multiplier=1)
for dataset_name in capture_table["name"]
]
# Next few functions are exactly the same as AD writer. Could move as default
# StandardDetector behavior
async def wait_for_index(
self, index: int, timeout: Optional[float] = DEFAULT_TIMEOUT
):
def matcher(value: int) -> bool:
return value >= index
matcher.__name__ = f"index_at_least_{index}"
await wait_for_value(
self.panda_device.data.num_captured, matcher, timeout=timeout
)
async def get_indices_written(self) -> int:
return await self.panda_device.data.num_captured.get_value()
async def observe_indices_written(
self, timeout=DEFAULT_TIMEOUT
) -> AsyncGenerator[int, None]:
"""Wait until a specific index is ready to be collected"""
async for num_captured in observe_value(
self.panda_device.data.num_captured, timeout
):
yield num_captured // self._multiplier
async def collect_stream_docs(
self, indices_written: int
) -> AsyncIterator[StreamAsset]:
# TODO: fail if we get dropped frames
if indices_written:
if not self._file:
self._file = _HDFFile(
self._directory_provider(),
Path(await self.panda_device.data.hdf_file_name.get_value()),
self._datasets,
)
for doc in self._file.stream_resources():
yield "stream_resource", doc
for doc in self._file.stream_data(indices_written):
yield "stream_datum", doc
# Could put this function as default for StandardDetector
async def close(self):
await self.panda_device.data.capture.set(
False, wait=True, timeout=DEFAULT_TIMEOUT
)
The PandA StandardDetector implementation simply ties the component parts and its child devices together.
class HDFPanda(CommonPandaBlocks, StandardDetector):
def __init__(
self,
prefix: str,
directory_provider: DirectoryProvider,
config_sigs: Sequence[SignalR] = (),
name: str = "",
):
self._prefix = prefix
create_children_from_annotations(self)
controller = PandaPcapController(pcap=self.pcap)
writer = PandaHDFWriter(
directory_provider=directory_provider,
panda_device=self,
)
super().__init__(
controller=controller,
writer=writer,
config_sigs=config_sigs,
name=name,
)
async def connect(
self, mock: bool = False, timeout: float = DEFAULT_TIMEOUT
) -> None:
await fill_pvi_entries(self, self._prefix + "PVI", timeout=timeout, mock=mock)
await super().connect(mock=mock, timeout=timeout)