How to implement a Device for an EPICS areaDetector#

This document will walk through the steps taken to implement an ophyd-async Device that talks to an EPICS areaDetector

Create a module to put the code in#

The first stage is to make a module in the ophyd-async repository to put the code in:

  • If you haven’t already, git clone git@github.com:bluesky/ophyd-async.git and make a new branch to work in

  • Create a new directory under src/ophyd_async/epics/ which is the lowercase version of the epics support module for the detector

    • For example, for ADAravis make a directory adaravis

  • Make an empty __init__.py within that directory

Add an IO class for the PV interface#

Now you need an IO class that subclasses adcore.ADBaseIO. This should add the PVs that are detector driver specific that are required to setup triggering.

For example for ADAravis this is in the file _aravis_io.py:

from typing import Annotated as A

from ophyd_async.core import SignalRW, StrictEnum, SubsetEnum
from ophyd_async.epics import adcore
from ophyd_async.epics.core import PvSuffix


class AravisTriggerMode(StrictEnum):
    """GigEVision GenICAM standard TriggerMode."""

    ON = "On"
    """Use TriggerSource to trigger each frame"""

    OFF = "Off"
    """Just trigger as fast as you can"""


class AravisTriggerSource(SubsetEnum):
    """Which trigger source to use when TriggerMode=On."""

    LINE1 = "Line1"


class AravisDriverIO(adcore.ADBaseIO):
    """Generic Driver supporting all GiGE cameras.

    This mirrors the interface provided by ADAravis/db/aravisCamera.template.
    """

    trigger_mode: A[SignalRW[AravisTriggerMode], PvSuffix.rbv("TriggerMode")]
    trigger_source: A[SignalRW[AravisTriggerSource], PvSuffix.rbv("TriggerSource")]

Add a Controller that knows how to setup the driver#

Now you need a class that subclasses adcore.ADBaseController. This should implement at least:

  • get_deadtime() to give the amount of time required between triggers for a given exposure

  • prepare() to set the camera up for a given trigger mode, number of frames and exposure

For example for ADAravis this is in the file _aravis_controller.py:

import asyncio

from ophyd_async.core import DetectorTrigger, TriggerInfo
from ophyd_async.epics import adcore

from ._aravis_io import AravisDriverIO, AravisTriggerMode, AravisTriggerSource

# The deadtime of an ADaravis controller varies depending on the exact model of camera.
# Ideally we would maximize performance by dynamically retrieving the deadtime at
# runtime. See https://github.com/bluesky/ophyd-async/issues/308
_HIGHEST_POSSIBLE_DEADTIME = 1961e-6


class AravisController(adcore.ADBaseController[AravisDriverIO]):
    """`DetectorController` for an `AravisDriverIO`."""

    def get_deadtime(self, exposure: float | None) -> float:
        return _HIGHEST_POSSIBLE_DEADTIME

    async def prepare(self, trigger_info: TriggerInfo) -> None:
        if (exposure := trigger_info.livetime) is not None:
            await self.driver.acquire_time.set(exposure)

        if trigger_info.trigger is DetectorTrigger.INTERNAL:
            # Set trigger mode off to ignore the trigger source
            await self.driver.trigger_mode.set(AravisTriggerMode.OFF)
        elif trigger_info.trigger in {
            DetectorTrigger.CONSTANT_GATE,
            DetectorTrigger.EDGE_TRIGGER,
        }:
            # Trigger on the rising edge of Line1
            # trigger mode must be set first and on it's own!
            await self.driver.trigger_mode.set(AravisTriggerMode.ON)
            await self.driver.trigger_source.set(AravisTriggerSource.LINE1)
        else:
            raise ValueError(f"ADAravis does not support {trigger_info.trigger}")

        if trigger_info.total_number_of_triggers == 0:
            image_mode = adcore.ImageMode.CONTINUOUS
        else:
            image_mode = adcore.ImageMode.MULTIPLE
        await asyncio.gather(
            self.driver.num_images.set(trigger_info.total_number_of_triggers),
            self.driver.image_mode.set(image_mode),
        )

Add a Detector that puts it all together#

Now you need to make a StandardDetector subclass that uses your IO and Controller with the standard file IO and Writer classes that come with ADCore. The __init__ method should take the following:

  • prefix: The PV prefix for the driver and plugins

  • path_provider: A PathProvider that tells the detector where to write data

  • drv_suffix: A PV suffix for the driver, defaulting to "cam1:"

  • writer_cls: An adcore.ADWriter class to instantiate, defaulting to adcore.ADHDFWriter

  • fileio_suffix: An optional PV suffix for the fileio, if not given it will default to the writer class default

  • name: An optional name for the device

  • config_sigs: Optionally the signals to report as configuration

  • plugins: An optional mapping of {name: adcore.NDPluginBaseIO} for each additional plugin that might contribute data to the resulting file

For example for ADAravis this is in the file _aravis.py:

from collections.abc import Sequence

from ophyd_async.core import PathProvider, SignalR
from ophyd_async.epics import adcore

from ._aravis_controller import AravisController
from ._aravis_io import AravisDriverIO


class AravisDetector(adcore.AreaDetector[AravisController]):
    """Implementation of an ADAravis Detector.

    The detector may be configured for an external trigger on a GPIO port,
    which must be done prior to preparing the detector
    """

    def __init__(
        self,
        prefix: str,
        path_provider: PathProvider,
        drv_suffix="cam1:",
        writer_cls: type[adcore.ADWriter] = adcore.ADHDFWriter,
        fileio_suffix: str | None = None,
        name: str = "",
        config_sigs: Sequence[SignalR] = (),
        plugins: dict[str, adcore.NDPluginBaseIO] | None = None,
    ):
        driver = AravisDriverIO(prefix + drv_suffix)
        controller = AravisController(driver)
        writer = writer_cls.with_io(
            prefix,
            path_provider,
            dataset_source=driver,
            fileio_suffix=fileio_suffix,
            plugins=plugins,
        )
        super().__init__(
            controller=controller,
            writer=writer,
            plugins=plugins,
            name=name,
            config_sigs=config_sigs,
        )

Make it importable#

Now you should take all the classes you’ve made and add it to the top level __init__.py to declare the public interface for this module. Typically you should also include any Enum types you have made to support the IO.

For example for ADAravis this is:

"""Support for the ADAravis areaDetector driver.

https://github.com/areaDetector/ADAravis
"""

from ._aravis import AravisDetector
from ._aravis_controller import AravisController
from ._aravis_io import AravisDriverIO, AravisTriggerMode, AravisTriggerSource

__all__ = [
    "AravisDetector",
    "AravisController",
    "AravisDriverIO",
    "AravisTriggerMode",
    "AravisTriggerSource",
]

Write tests#

TODO

Conclusion#

You have now made a detector, and can import and create it like this:

from ophyd_async.epics import adaravis, adcore


det = adaravis.AravisDetector(
   "PREFIX:", 
   path_provider, 
   drv_suffix="DRV:", 
   writer_cls=adcore.ADHDFWriter, 
   fileio_suffix="HDF:",
)