How to implement a Device for an EPICS areaDetector#

This document walks through the steps needed to implement an ophyd-async Device that talks to an EPICS areaDetector.

Create a module to put the code in#

The first stage is to make a module in the ophyd-async repository to put the code in:

  • If you haven’t already, git clone git@github.com:bluesky/ophyd-async.git and make a new branch to work in

  • Create a new file under src/ophyd_async/epics/ named after the lowercase version of the EPICS support module for the detector

    • For example, for ADAravis make a file adaravis.py

Add an IO class for the PV interface#

Now you need an IO class that subclasses adcore.ADBaseIO. It should add the driver-specific PVs that are required to set up triggering.

For example, for ADAravis this would include signals for trigger mode, trigger source, and any detector-specific settings.
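As a rough illustration of what such an IO class describes, the sketch below models each driver signal as a setpoint/readback PV pair. It is plain Python rather than the ophyd-async declaration syntax: `PvPair`, `rbv_pair`, `aravis_trigger_pvs` and `TriggerSourceSketch` are hypothetical names, the `TriggerSource` record name and the `"Line1"` value are assumptions, and the `_RBV` readback suffix follows the usual areaDetector convention.

```python
from dataclasses import dataclass
from enum import Enum


class TriggerSourceSketch(str, Enum):
    """Hypothetical stand-in for an enum such as AravisTriggerSource."""

    LINE1 = "Line1"  # assumed PV string value


@dataclass(frozen=True)
class PvPair:
    """A setpoint PV together with its readback PV."""

    write_pv: str
    read_pv: str


def rbv_pair(prefix: str, record: str) -> PvPair:
    # areaDetector records normally expose a readback under <record>_RBV
    return PvPair(write_pv=prefix + record, read_pv=f"{prefix}{record}_RBV")


def aravis_trigger_pvs(prefix: str) -> dict[str, PvPair]:
    """Map attribute names to the PVs needed to set up triggering."""
    return {
        "trigger_mode": rbv_pair(prefix, "TriggerMode"),
        "trigger_source": rbv_pair(prefix, "TriggerSource"),  # assumed record name
    }


pvs = aravis_trigger_pvs("PREFIX:cam1:")
print(pvs["trigger_mode"].read_pv)  # PREFIX:cam1:TriggerMode_RBV
```

In the real module, each of these pairs would become a typed signal attribute on the ADBaseIO subclass.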

Add Trigger Logic for detector-specific triggering#

Now you need a class that subclasses DetectorTriggerLogic. This should implement methods for each trigger mode your detector supports:

  • prepare_internal(num, livetime, deadtime) - Set up internal triggering (detector generates its own triggers)

  • prepare_edge(num, livetime) - Set up external edge triggering (rising edge starts an internally-timed exposure)

  • prepare_level(num) - Set up external level/gate triggering (high level duration determines exposure time)

If the detector has configuration values that should be captured in the scan then implement:

  • config_sigs() - Return the set of signals that should appear in read_configuration()

If you support external triggering you should also implement:

  • get_deadtime(config_values) - Calculate the minimum time between exposures based on configuration values

Only implement the prepare methods for trigger modes your detector actually supports. The detector will automatically report which trigger types are available based on which methods are implemented.
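One way such automatic reporting could work is to check which prepare methods a subclass overrides. The sketch below is a self-contained illustration of that idea only: `BaseTriggerLogic`, `TriggerKind` and `supported_triggers` are hypothetical stand-ins, and ophyd-async's own mechanism may differ.

```python
from enum import Enum


class TriggerKind(Enum):
    """Hypothetical stand-in for the trigger types a detector can report."""

    INTERNAL = "internal"
    EXTERNAL_EDGE = "external_edge"
    EXTERNAL_LEVEL = "external_level"


class BaseTriggerLogic:
    """Stand-in base class: every prepare method is unsupported by default."""

    async def prepare_internal(self, num: int, livetime: float, deadtime: float):
        raise NotImplementedError

    async def prepare_edge(self, num: int, livetime: float):
        raise NotImplementedError

    async def prepare_level(self, num: int):
        raise NotImplementedError


_METHOD_TO_KIND = {
    "prepare_internal": TriggerKind.INTERNAL,
    "prepare_edge": TriggerKind.EXTERNAL_EDGE,
    "prepare_level": TriggerKind.EXTERNAL_LEVEL,
}


def supported_triggers(logic: BaseTriggerLogic) -> set[TriggerKind]:
    # A mode counts as supported when the subclass replaces the base method
    return {
        kind
        for name, kind in _METHOD_TO_KIND.items()
        if getattr(type(logic), name) is not getattr(BaseTriggerLogic, name)
    }


class InternalAndEdgeLogic(BaseTriggerLogic):
    async def prepare_internal(self, num, livetime, deadtime):
        pass

    async def prepare_edge(self, num, livetime):
        pass


print(sorted(kind.name for kind in supported_triggers(InternalAndEdgeLogic())))
# ['EXTERNAL_EDGE', 'INTERNAL']
```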

For example, for ADAravis:

class AravisTriggerLogic(DetectorTriggerLogic):
    """Trigger logic for Aravis GigE and USB3 cameras."""

    def __init__(self, driver: AravisDriverIO, override_deadtime: float | None = None):
        self.driver = driver
        self.override_deadtime = override_deadtime

    def config_sigs(self) -> set[SignalR]:
        return {self.driver.model}

    def get_deadtime(self, config_values: SignalDict) -> float:
        return get_camera_deadtime(
            model=config_values[self.driver.model],
            override_deadtime=self.override_deadtime,
        )

    async def prepare_internal(self, num: int, livetime: float, deadtime: float):
        await self.driver.trigger_mode.set(OnOff.OFF)
        await prepare_exposures(self.driver, num, livetime, deadtime)

    async def prepare_edge(self, num: int, livetime: float):
        # Trigger on the rising edge of Line1.
        # Trigger mode must be set first and on its own: a hardware race
        # condition in Aravis firmware means it has to be set separately,
        # before the trigger source, to avoid undefined behavior.
        # https://github.com/AravisProject/aravis/issues/1045
        await self.driver.trigger_mode.set(OnOff.ON)
        await self.driver.trigger_source.set(AravisTriggerSource.LINE1)
        await prepare_exposures(self.driver, num, livetime)

Use ADArmLogic or create custom Arm Logic#

Most areaDetectors can use the standard adcore.ADArmLogic which handles arming and disarming via the driver’s acquire signal. If your detector requires custom arming behavior (e.g., waiting for a specific ready signal), create a DetectorArmLogic subclass with:

  • arm() - Start acquisition

  • wait_for_idle() - Wait until acquisition is complete

  • disarm() - Stop acquisition
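To show the shape of the three methods, here is a self-contained sketch of arm logic that waits for a ready flag before arm() returns. `ReadyGatedArmLogic` and `simulate_ready` are hypothetical, the class deliberately does not subclass DetectorArmLogic, and `asyncio.Event` objects stand in for the signals a real implementation would read and write.

```python
import asyncio


class ReadyGatedArmLogic:
    """Hypothetical arm logic that waits for a 'ready' flag before returning."""

    def __init__(self) -> None:
        self._ready = asyncio.Event()  # stands in for a detector ready signal
        self._idle = asyncio.Event()
        self._idle.set()  # nothing is acquiring yet

    async def arm(self) -> None:
        self._idle.clear()
        # A real implementation would start acquisition on the driver here
        await asyncio.wait_for(self._ready.wait(), timeout=1.0)

    async def wait_for_idle(self) -> None:
        await self._idle.wait()

    async def disarm(self) -> None:
        # A real implementation would stop acquisition on the driver here
        self._ready.clear()
        self._idle.set()

    def simulate_ready(self) -> None:
        """Test hook: pretend the hardware reported that it is ready."""
        self._ready.set()


async def demo() -> bool:
    logic = ReadyGatedArmLogic()
    # Report "ready" shortly after arm() starts waiting
    asyncio.get_running_loop().call_later(0.01, logic.simulate_ready)
    await logic.arm()
    await logic.disarm()
    await logic.wait_for_idle()
    return True


print(asyncio.run(demo()))  # True
```

The timeout in arm() is a design choice in this sketch: it turns a detector that never becomes ready into an error instead of a hang.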

Add a Detector that puts it all together#

Now you need to make an adcore.AreaDetector subclass that uses your IO and Trigger Logic with the standard Arm Logic and Writer classes that come with ADCore. The __init__ method should:

  1. Create the driver IO instance

  2. Create instances of your logic classes

  3. Call super().__init__(), passing the driver and logic instances to the base class

The constructor parameters should include:

  • prefix: The PV prefix for the driver and plugins

  • path_provider: A PathProvider that tells the detector where to write data (optional if writer_type=None)

  • driver_suffix: A PV suffix for the driver, defaulting to "cam1:"

  • writer_type: An adcore.ADWriterType enum value (HDF, TIFF, JPEG) or None to skip file writing

  • writer_suffix: An optional PV suffix for the file writer plugin

  • plugins: An optional mapping of {name: adcore.NDPluginBaseIO} for additional plugins

  • config_sigs: Additional signals to report in configuration (beyond the standard acquire_time and acquire_period)

  • name: An optional name for the device

  • Any detector-specific override parameters for your trigger logic

For example, for ADAravis:

class AravisDetector(AreaDetector[AravisDriverIO]):
    """Create an ADAravis AreaDetector instance.

    :param prefix: EPICS PV prefix for the detector
    :param path_provider: Provider for file paths during acquisition
    :param driver_suffix: Suffix for the driver PV, defaults to "cam1:"
    :param override_deadtime:
        If provided, this value is used for deadtime instead of looking up
        based on camera model.
    :param writer_type: Type of file writer (HDF or TIFF)
    :param writer_suffix: Suffix for the writer PV
    :param plugins: Additional areaDetector plugins to include
    :param config_sigs: Additional signals to include in configuration
    :param name: Name for the detector device
    """

    def __init__(
        self,
        prefix: str,
        path_provider: PathProvider | None = None,
        driver_suffix="cam1:",
        override_deadtime: float | None = None,
        writer_type: ADWriterType | None = ADWriterType.HDF,
        writer_suffix: str | None = None,
        plugins: dict[str, NDPluginBaseIO] | None = None,
        config_sigs: Sequence[SignalR] = (),
        name: str = "",
    ) -> None:
        driver = AravisDriverIO(prefix + driver_suffix)
        super().__init__(
            prefix=prefix,
            driver=driver,
            arm_logic=ADArmLogic(driver),
            trigger_logic=AravisTriggerLogic(driver, override_deadtime),
            path_provider=path_provider,
            writer_type=writer_type,
            writer_suffix=writer_suffix,
            plugins=plugins,
            config_sigs=config_sigs,
            name=name,
        )

The AreaDetector base class will:

  • Store the driver as self.driver

  • Call add_detector_logics() to register your trigger and arm logic

  • Create and register a data logic for file writing if writer_type is not None

  • Add configuration signals (driver.acquire_time, driver.acquire_period, and any you specify)

  • Store any plugins as attributes on the detector

Declare the public interface#

Now you should take all the classes you’ve made and add them to the module’s top-level __all__. Typically you should export:

  • The detector class (e.g., AravisDetector)

  • The driver IO class (e.g., AravisDriverIO)

  • The trigger logic class (e.g., AravisTriggerLogic)

  • Any custom Enum types for PV values (e.g., AravisTriggerSource)

For example, for ADAravis:

__all__ = [
    "AravisDetector",
    "AravisDriverIO",
    "AravisTriggerLogic",
    "AravisTriggerSource",
]
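A small check can catch names listed in __all__ that the module never defines. The sketch below is one possible approach. `missing_exports` is a hypothetical helper, and the fake module exists only so the example runs without ophyd-async installed.

```python
import types


def missing_exports(module: types.ModuleType) -> list[str]:
    """Return names listed in __all__ that the module does not define."""
    return [
        name for name in getattr(module, "__all__", []) if not hasattr(module, name)
    ]


# Hypothetical stand-in module with one export that was never defined
fake = types.ModuleType("fake_adaravis")
fake.AravisDetector = object
fake.__all__ = ["AravisDetector", "AravisDriverIO"]

print(missing_exports(fake))  # ['AravisDriverIO']
```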

Add multiple data streams (optional)#

The composition-based architecture makes it possible to add multiple data outputs to a detector. After creating the detector, you can call add_detector_logics() to register additional data sources, as in the two cases below.

Reading stats plugins alongside file writing#

from ophyd_async.epics import adaravis, adcore

det = adaravis.AravisDetector(prefix, path_provider)
# Add stats total as a readable signal in events
det.add_detector_logics(adcore.PluginSignalDataLogic(det.driver, det.stats.total))

Multiple HDF writers for different ROIs#

# Don't create default writer
det = adaravis.AravisDetector(prefix, writer_type=None)  
# Add separate writers for each ROI
det.add_detector_logics(
    adcore.ADHDFDataLogic(path_provider, det.driver, det.roi1_plugin, datakey_suffix="-roi1"),
    adcore.ADHDFDataLogic(path_provider, det.driver, det.roi2_plugin, datakey_suffix="-roi2"),
)

Continuously acquiring detector#

For detectors that acquire continuously, use adcore.ADContAcqTriggerLogic instead of creating custom trigger logic. This uses the built-in areaDetector circular buffer plugin to capture frames while the detector runs continuously.

Requirements:

  • Your AD IOC must have a circular buffer plugin configured

  • The plugin output should be fed to the file writer

  • The detector should already be acquiring continuously before use

Example implementation:

driver = adcore.ADBaseIO("PREFIX:DRV:")
cb_plugin = adcore.NDCircularBuffIO("PREFIX:CB1:")
det = adcore.AreaDetector(
    driver=driver,
    arm_logic=adcore.ADContAcqArmLogic(driver, cb_plugin),
    trigger_logic=adcore.ADContAcqTriggerLogic(driver, cb_plugin),
    path_provider=path_provider,
    plugins={"cb1": cb_plugin},
)

The adcore.ADContAcqTriggerLogic will:

  • Validate that exposure time matches the detector’s current acquisition period

  • Configure the circular buffer plugin to capture the requested number of frames

  • Use the circular buffer’s trigger signal instead of the driver’s acquire signal
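The first of these checks can be pictured as a simple comparison. The sketch below is an assumption about its shape, not the library's code: `check_cont_acq_period` is a hypothetical function, and comparing livetime plus deadtime against the acquisition period is one plausible reading of "exposure time matches the acquisition period".

```python
import math


def check_cont_acq_period(
    livetime: float, deadtime: float, acquire_period: float
) -> None:
    """Raise ValueError if the requested frame period differs from the running one."""
    requested = livetime + deadtime
    # Compare with a tolerance, since these values are floats read from PVs
    if not math.isclose(requested, acquire_period, rel_tol=1e-6):
        raise ValueError(
            f"Requested period {requested} s does not match the detector's "
            f"acquire period {acquire_period} s"
        )


# Accepted: 0.09 s exposure plus 0.01 s deadtime matches a 0.1 s period
check_cont_acq_period(livetime=0.09, deadtime=0.01, acquire_period=0.1)
```

Because the detector is already running, a mismatch is reported as an error here instead of being fixed by rewriting the acquisition period.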

Write tests#

Write unit tests to verify that your detector implementation works correctly. The sections below cover a test fixture and the behavior worth testing.

Test fixture setup#

Use a pytest fixture to initialize your detector in mock mode for testing:

@pytest.fixture
async def test_adaravis(
    static_path_provider: StaticPathProvider,
) -> adaravis.AravisDetector:
    async with init_devices(mock=True):
        detector = adaravis.AravisDetector("PREFIX:", static_path_provider)
    writer = detector.get_plugin("writer", adcore.NDPluginFileIO)
    set_mock_value(writer.file_path_exists, True)
    return detector

This fixture:

  • Initializes the detector with mock=True for unit testing

  • Sets up required mock values (e.g., file_path_exists)

  • Returns the detector for use in test functions

Test PV correctness#

Verify that your detector driver correctly maps to the expected PV names:

def test_pvs_correct(test_adaravis: adaravis.AravisDetector):
    assert test_adaravis.driver.acquire.source == "mock+ca://PREFIX:cam1:Acquire_RBV"
    assert (
        test_adaravis.driver.trigger_mode.source
        == "mock+ca://PREFIX:cam1:TriggerMode_RBV"
    )

Test deadtime calculation#

If your detector supports external triggering, test the get_deadtime() method for different detector models and configurations:

@pytest.mark.parametrize(
    "model,deadtime", [("Mako G-125", 70e-6), ("Mako G-507", 554e-6)]
)
async def test_deadtime(
    test_adaravis: adaravis.AravisDetector,
    model: str,
    deadtime: float,
):
    # Set the camera model that the deadtime lookup is keyed on
    set_mock_value(test_adaravis.driver.model, model)
    trigger_modes, actual_deadtime = await test_adaravis.get_trigger_deadtime()
    assert trigger_modes == {DetectorTrigger.INTERNAL, DetectorTrigger.EXTERNAL_EDGE}
    assert deadtime == actual_deadtime

Test prepare methods#

Test each trigger mode your detector supports by verifying that prepare() configures the detector with the correct PV values. Test external edge triggering:

async def test_prepare_external_edge(
    test_adaravis: adaravis.AravisDetector,
):
    await test_adaravis.prepare(
        TriggerInfo(
            trigger=DetectorTrigger.EXTERNAL_EDGE,
            number_of_events=5,
            livetime=0.5,
        )
    )
    assert_has_calls(
        test_adaravis.driver,
        [
            call.trigger_mode.put(OnOff.ON),
            call.trigger_source.put(adaravis.AravisTriggerSource.LINE1),
            call.image_mode.put(adcore.ADImageMode.MULTIPLE),
            call.num_images.put(5),
            call.acquire_time.put(0.5),
            call.acquire.put(True),
        ],
    )

And test internal triggering:

async def test_prepare_internal(
    test_adaravis: adaravis.AravisDetector,
):
    await test_adaravis.prepare(TriggerInfo(number_of_events=11))
    assert_has_calls(
        test_adaravis.driver,
        [
            call.trigger_mode.put(OnOff.OFF),
            call.image_mode.put(adcore.ADImageMode.MULTIPLE),
            call.num_images.put(11),
        ],
    )
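The prepare tests above rely on ordered call assertions. The standard library's unittest.mock offers a similar check, sketched below to show why the order of PV writes can be verified. Here `driver` is a plain MagicMock standing in for a mocked driver, the `"On"` and `"Line1"` values are placeholders, and this is an analogue of the helper used above rather than a description of how it is implemented.

```python
from unittest.mock import MagicMock, call

driver = MagicMock()  # stand-in for a mocked driver

# Simulate what an edge-trigger prepare does: mode first, then source
driver.trigger_mode.put("On")
driver.trigger_source.put("Line1")

# Passes: the expected calls were recorded in exactly this order
driver.assert_has_calls(
    [call.trigger_mode.put("On"), call.trigger_source.put("Line1")]
)

# Fails: the same calls in the wrong order are rejected
try:
    driver.assert_has_calls(
        [call.trigger_source.put("Line1"), call.trigger_mode.put("On")]
    )
    order_is_checked = False
except AssertionError:
    order_is_checked = True

print(order_is_checked)  # True
```

An order-sensitive assertion is what lets a test guard the "trigger mode before trigger source" requirement described in the trigger logic.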

Conclusion#

You have now made a detector, and can import and create it like this:

from ophyd_async.epics import adaravis

det = adaravis.AravisDetector("PREFIX:", path_provider)

The detector will now support:

  • Querying supported trigger types with get_trigger_deadtime()

  • Step scanning with trigger()

  • Fly scanning with kickoff() and complete()

  • Automatic handling of file writing and StreamAsset document emission

  • Configuration signal reporting based on your trigger logic