Note

Ophyd async is included on a provisional basis until the v1.0 release and may change its API on minor release numbers before then.

Make a StandardDetector#

StandardDetector is an abstract class to assist in creating Device classes for hardware that writes its own data, e.g. an AreaDetector implementation, or a PandA writing motor encoder positions to file. The StandardDetector is a simple compound device with two standard components:

  • DetectorWriter to handle data persistence and I/O, and to pass information about the data to the RunEngine (usually an instance of ADHDFWriter)

  • DetectorController with logic for arming and disarming the detector. This will be unique to the StandardDetector implementation.
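These two components plug into the StandardDetector initialiser. A minimal sketch of the composition (not a runnable detector; the concrete examples below construct real controllers and writers):

detector = StandardDetector(
    controller,  # DetectorController: arming/disarming logic
    writer,  # DetectorWriter: data persistence and description
    config_sigs=(),  # signals to read as configuration
    name="detector",
)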

Writing an AreaDetector StandardDetector#

For an AreaDetector implementation of the StandardDetector, two subdevices of the StandardDetector are used to map to AreaDetector plugins:

  • An NDPluginFile instance (for ADHDFWriter an instance of NDFileHDFIO)

  • An ADBaseIO instance mapping to the NDArray-producing “driver” of the detector implementation

Define a FooDriver if the driver requires fields to be exposed in addition to those on ADBaseIO. It should extend ADBaseIO. Name enumeration fields to prevent namespace collisions, e.g. for a Signal named “TriggerSource”, use the enum name “FooTriggerSource”.
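The AreaDetector examples below assume imports along the following lines; this is a sketch, and exact module paths may move between minor releases while ophyd-async is provisional:

import asyncio

from bluesky.protocols import HasHints, Hints

from ophyd_async.core import (
    AsyncStatus,
    DetectorController,
    DetectorTrigger,
    PathProvider,
    StandardDetector,
    TriggerInfo,
)
from ophyd_async.epics import adcore
from ophyd_async.epics.signal import epics_signal_rw_rbv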

class FooDriver(adcore.ADBaseIO):
    def __init__(self, prefix: str, name: str = "") -> None:
        # Read/write signal for the TriggerMode record and its _RBV readback
        self.trigger_mode = epics_signal_rw_rbv(str, prefix + "TriggerMode")
        super().__init__(prefix, name)

Define a FooController that converts the standard ophyd_async.core.DetectorController.arm() and ophyd_async.core.DetectorController.disarm() pattern into the required state of FooDriver, e.g. setting a compatible “FooTriggerSource” for a given DetectorTrigger, or raising an exception if the trigger is incompatible.
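For example, a mapping from DetectorTrigger to the trigger source could look like the sketch below; the “FooInternal” and “FooExternal” strings are hypothetical values of the detector's “FooTriggerSource” enum:

FOO_TRIGGER_SOURCE = {
    DetectorTrigger.internal: "FooInternal",
    DetectorTrigger.constant_gate: "FooExternal",
}


def get_trigger_source(trigger: DetectorTrigger) -> str:
    try:
        return FOO_TRIGGER_SOURCE[trigger]
    except KeyError as err:
        raise ValueError(f"FooDetector does not support {trigger}") from err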

The ophyd_async.core.DetectorController.get_deadtime() method is used when constructing sequence tables for hardware controlled scanning. How to calculate the deadtime may only be documented in technical manuals, or may be otherwise complex to determine. If the deadtime must be fetched from signals, it is recommended to cache the value during the StandardDetector `prepare` method.
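A sketch of that caching pattern, assuming a hypothetical dead_time signal on the driver and omitting the other DetectorController methods:

class CachedDeadtimeController(DetectorController):
    def __init__(self, driver: FooDriver) -> None:
        self._drv = driver
        self._deadtime = 0.001  # fallback before the first prepare

    async def prepare(self, trigger_info: TriggerInfo):
        # Read the (hypothetical) dead_time signal once per scan so that
        # get_deadtime() can stay synchronous
        self._deadtime = await self._drv.dead_time.get_value()

    def get_deadtime(self, exposure: float) -> float:
        return self._deadtime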

class FooController(DetectorController):
    def __init__(self, driver: FooDriver) -> None:
        self._drv = driver

    def get_deadtime(self, exposure: float) -> float:
        # FooDetector deadtime handling
        return 0.001

    async def arm(
        self,
        num: int,
        trigger: DetectorTrigger = DetectorTrigger.internal,
        exposure: float | None = None,
    ) -> AsyncStatus:
        # Configure frame count, image mode and trigger source together
        await asyncio.gather(
            self._drv.num_images.set(num),
            self._drv.image_mode.set(adcore.ImageMode.multiple),
            # Use the enum's value to build the trigger source string, as
            # formatting the DetectorTrigger member directly may include the
            # class name on newer Python versions
            self._drv.trigger_mode.set(f"FOO{trigger.value}"),
        )
        if exposure is not None:
            await self._drv.acquire_time.set(exposure)
        return await adcore.start_acquiring_driver_and_ensure_status(self._drv)

    async def disarm(self):
        # Stop acquiring, waiting up to 1s for the busy record to complete
        await adcore.stop_busy_record(self._drv.acquire, False, timeout=1)

FooDetector ties the driver, controller and data persistence layer together. The example FooDetector writes HDF5 files using the standard NDPluginFile. It additionally supports the HasHints protocol, which is optional but recommended.

Its initialiser assumes the NSLS-II AreaDetector plugin EPICS address suffixes as defaults, but allows overriding them: this pattern is recommended for consistency. If the FooDriver has signals that should be read as configuration, they should be added to the “config_sigs” passed to the super class.

class FooDetector(StandardDetector, HasHints):
    _controller: FooController
    _writer: adcore.ADHDFWriter

    def __init__(
        self,
        prefix: str,
        path_provider: PathProvider,
        drv_suffix: str = "cam1:",
        hdf_suffix: str = "HDF1:",
        name: str = "",
    ):
        # Must be children to pick up connect
        self.drv = FooDriver(prefix + drv_suffix)
        self.hdf = adcore.NDFileHDFIO(prefix + hdf_suffix)

        super().__init__(
            FooController(self.drv),
            adcore.ADHDFWriter(
                self.hdf,
                path_provider,
                lambda: self.name,
                adcore.ADBaseDatasetDescriber(self.drv),
            ),
            config_sigs=(self.drv.acquire_time,),
            name=name,
        )

    @property
    def hints(self) -> Hints:
        return self._writer.hints
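A hypothetical instantiation of the finished detector, assuming a PV prefix of “FOO-DET-01:” and using a StaticPathProvider from ophyd_async.core to decide where files are written:

from pathlib import Path

from ophyd_async.core import StaticFilenameProvider, StaticPathProvider

path_provider = StaticPathProvider(StaticFilenameProvider("foo"), Path("/data/foo"))
det = FooDetector("FOO-DET-01:", path_provider, name="det")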

Writing a non-AreaDetector StandardDetector#

A non-AreaDetector StandardDetector should implement DetectorController and DetectorWriter directly. Here we construct a DetectorController that co-ordinates signals on a PandA PositionCapture block, a child device “pcap” of the StandardDetector implementation, analogous to the FooDriver.
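The PandA examples in this section assume imports along the following lines; again a sketch, as module paths may differ between provisional releases:

import asyncio
from collections.abc import AsyncGenerator, AsyncIterator, Sequence
from pathlib import Path

from bluesky.protocols import StreamAsset
from event_model import DataKey

from ophyd_async.core import (
    DEFAULT_TIMEOUT,
    AsyncStatus,
    DetectorController,
    DetectorTrigger,
    DetectorWriter,
    HDFDataset,
    HDFFile,
    NameProvider,
    PathProvider,
    SignalR,
    StandardDetector,
    TriggerInfo,
    observe_value,
    wait_for_value,
)
from ophyd_async.epics.pvi import create_children_from_annotations, fill_pvi_entries
from ophyd_async.fastcs.panda import (
    CaptureMode,
    CommonPandaBlocks,
    DataBlock,
    PcapBlock,
)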

class PandaPcapController(DetectorController):
    def __init__(self, pcap: PcapBlock) -> None:
        self.pcap = pcap
        self._arm_status: AsyncStatus | None = None

    def get_deadtime(self, exposure: float | None) -> float:
        # PCAP deadtime is constant, expressed here in seconds
        return 8e-9

    async def prepare(self, trigger_info: TriggerInfo):
        assert trigger_info.trigger in (
            DetectorTrigger.constant_gate,
            DetectorTrigger.variable_gate,
        ), "Only constant_gate and variable_gate triggering is supported on the PandA"

    async def arm(self):
        # Start PCAP, stashing the set status, and wait for it to report active
        self._arm_status = self.pcap.arm.set(True)
        await wait_for_value(self.pcap.active, True, timeout=1)

    async def wait_for_idle(self):
        # PCAP is disarmed explicitly, so there is nothing to wait for here
        pass

    async def disarm(self):
        # Stop PCAP and wait for it to report inactive
        await self.pcap.arm.set(False)
        await wait_for_value(self.pcap.active, False, timeout=1)

The PandA may write a number of fields, and the PandaHDFWriter co-ordinates those, configures the file writer and describes the data for the RunEngine.

class PandaHDFWriter(DetectorWriter):

    def __init__(
        self,
        prefix: str,
        path_provider: PathProvider,
        name_provider: NameProvider,
        panda_data_block: DataBlock,
    ) -> None:
        self.panda_data_block = panda_data_block
        self._prefix = prefix
        self._path_provider = path_provider
        self._name_provider = name_provider
        self._datasets: list[HDFDataset] = []
        self._file: HDFFile | None = None
        self._multiplier = 1

    # Triggered on PCAP arm
    async def open(self, multiplier: int = 1) -> dict[str, DataKey]:
        """Retrieve and describe all PandA signals marked for capture."""

        # Ensure flushes are immediate
        await self.panda_data_block.flush_period.set(0)

        self._file = None
        info = self._path_provider(device_name=self._name_provider())

        # Set create dir depth first to guarantee that callback when setting
        # directory path has correct value
        await self.panda_data_block.create_directory.set(info.create_dir_depth)

        # Set the initial values
        await asyncio.gather(
            self.panda_data_block.hdf_directory.set(str(info.directory_path)),
            self.panda_data_block.hdf_file_name.set(
                f"{info.filename}.h5",
            ),
            self.panda_data_block.capture_mode.set(CaptureMode.FOREVER),
        )

        # Make sure that directory exists or has been created
        if await self.panda_data_block.directory_exists.get_value() != 1:
            raise OSError(
                f"Directory {info.directory_path} does not exist or "
                "is not writable by the PandABlocks-ioc!"
            )

        # All PandA datasets are scalar, so reject any multiplier other than 1
        # before starting the capture
        if multiplier > 1:
            raise ValueError(
                "All PandA datasets should be scalar, multiplier should be 1"
            )

        # Start capturing
        await self.panda_data_block.capture.set(True)

        return await self._describe()

    async def _describe(self) -> dict[str, DataKey]:
        """
        Return a describe based on the datasets PV
        """

        await self._update_datasets()
        describe = {
            ds.data_key: DataKey(
                source=self.panda_data_block.hdf_directory.source,
                shape=list(ds.shape),
                dtype="array" if ds.shape != [1] else "number",
                # PandA data should always be written as Float64
                # Ignore type check until https://github.com/bluesky/event-model/issues/308
                dtype_numpy="<f8",  # type: ignore
                external="STREAM:",
            )
            for ds in self._datasets
        }
        return describe

    async def _update_datasets(self) -> None:
        """
        Load data from the datasets PV on the panda, update internal
        representation of datasets that the panda will write.
        """

        capture_table = await self.panda_data_block.datasets.get_value()
        self._datasets = [
            # TODO: Update chunk size to read signal once available in IOC
            # Currently PandA IOC sets chunk size to 1024 points per chunk
            HDFDataset(
                dataset_name, "/" + dataset_name, [1], multiplier=1, chunk_shape=(1024,)
            )
            for dataset_name in capture_table["name"]
        ]

        # Warn user if dataset table is empty in PandA
        # i.e. no stream resources will be generated
        if len(self._datasets) == 0:
            self.panda_data_block.log.warning(
                f"PandA {self._name_provider()} DATASETS table is empty! "
                "No stream resource docs will be generated. "
                "Make sure captured positions have their corresponding "
                "*:DATASET PV set to a scientifically relevant name."
            )

    # Next few functions are exactly the same as AD writer. Could move as default
    # StandardDetector behavior
    async def wait_for_index(self, index: int, timeout: float | None = DEFAULT_TIMEOUT):
        def matcher(value: int) -> bool:
            return value >= index

        matcher.__name__ = f"index_at_least_{index}"
        await wait_for_value(
            self.panda_data_block.num_captured, matcher, timeout=timeout
        )

    async def get_indices_written(self) -> int:
        return await self.panda_data_block.num_captured.get_value()

    async def observe_indices_written(
        self, timeout=DEFAULT_TIMEOUT
    ) -> AsyncGenerator[int, None]:
        """Wait until a specific index is ready to be collected"""
        async for num_captured in observe_value(
            self.panda_data_block.num_captured, timeout
        ):
            yield num_captured // self._multiplier

    async def collect_stream_docs(
        self, indices_written: int
    ) -> AsyncIterator[StreamAsset]:
        # TODO: fail if we get dropped frames
        if indices_written:
            if not self._file:
                self._file = HDFFile(
                    Path(await self.panda_data_block.hdf_directory.get_value())
                    / Path(await self.panda_data_block.hdf_file_name.get_value()),
                    self._datasets,
                )
                for doc in self._file.stream_resources():
                    yield "stream_resource", doc
            for doc in self._file.stream_data(indices_written):
                yield "stream_datum", doc

    # Could put this function as default for StandardDetector
    async def close(self):
        await self.panda_data_block.capture.set(
            False, wait=True, timeout=DEFAULT_TIMEOUT
        )

The PandA StandardDetector implementation simply ties the component parts and its child devices together.

class HDFPanda(CommonPandaBlocks, StandardDetector):
    def __init__(
        self,
        prefix: str,
        path_provider: PathProvider,
        config_sigs: Sequence[SignalR] = (),
        name: str = "",
    ):
        self._prefix = prefix

        # Fill in the child blocks annotated on CommonPandaBlocks so that the
        # controller and writer below can reference them before connect()
        create_children_from_annotations(self)
        controller = PandaPcapController(pcap=self.pcap)
        writer = PandaHDFWriter(
            prefix=prefix,
            path_provider=path_provider,
            name_provider=lambda: name,
            panda_data_block=self.data,
        )
        super().__init__(
            controller=controller,
            writer=writer,
            config_sigs=config_sigs,
            name=name,
        )

    async def connect(
        self,
        mock: bool = False,
        timeout: float = DEFAULT_TIMEOUT,
        force_reconnect: bool = False,
    ):
        # TODO: this doesn't support caching
        # https://github.com/bluesky/ophyd-async/issues/472
        await fill_pvi_entries(self, self._prefix + "PVI", timeout=timeout, mock=mock)
        await super().connect(
            mock=mock, timeout=timeout, force_reconnect=force_reconnect
        )