How to implement a Device for an EPICS areaDetector#
This document will walk through the steps taken to implement an ophyd-async Device that talks to an EPICS areaDetector
Create a module to put the code in#
The first stage is to make a module in the ophyd-async repository to put the code in:
If you haven’t already,
git clone git@github.com:bluesky/ophyd-async.gitand make a new branch to work inCreate a new file under
src/ophyd_async/epics/which is the lowercase version of the epics support module for the detectorFor example, for
ADAravismake a fileadaravis.py
Add an IO class for the PV interface#
Now you need an IO class that subclasses adcore.ADBaseIO. This should add the PVs that are detector driver specific that are required to setup triggering.
For example for ADAravis this would include signals for trigger mode, trigger source, and any detector-specific settings.
Add Trigger Logic for detector-specific triggering#
Now you need a class that subclasses DetectorTriggerLogic. This should implement methods for each trigger mode your detector supports:
prepare_internal(num, livetime, deadtime)- Setup for internal triggering (detector generates its own triggers)prepare_edge(num, livetime)- Setup for external edge triggering (rising edge starts an internally-timed exposure)prepare_level(num)- Setup for external level/gate triggering (high level duration determines exposure time)
If the detector has configuration values that should be captured in the scan then implement:
config_sigs()- Return the set of signals that should appear in read_configuration()
If you support external triggering you should also implement:
get_deadtime(config_values)- Calculate the minimum time between exposures based on configuration values
Only implement the prepare methods for trigger modes your detector actually supports. The detector will automatically report which trigger types are available based on which methods are implemented.
For example, for ADAravis:
class AravisTriggerLogic(DetectorTriggerLogic):
"""Trigger logic for Aravis GigE and USB3 cameras."""
def __init__(self, driver: AravisDriverIO, override_deadtime: float | None = None):
self.driver = driver
self.override_deadtime = override_deadtime
def config_sigs(self) -> set[SignalR]:
return {self.driver.model}
def get_deadtime(self, config_values: SignalDict) -> float:
return get_camera_deadtime(
model=config_values[self.driver.model],
override_deadtime=self.override_deadtime,
)
async def prepare_internal(self, num: int, livetime: float, deadtime: float):
await self.driver.trigger_mode.set(OnOff.OFF)
await prepare_exposures(self.driver, num, livetime, deadtime)
async def prepare_edge(self, num: int, livetime: float):
# Trigger on the rising edge of Line1
# trigger mode must be set first and on its own!
# Hardware race condition in Aravis firmware requires setting trigger mode
# separately before trigger source to avoid undefined behavior.
# https://github.com/AravisProject/aravis/issues/1045
await self.driver.trigger_mode.set(OnOff.ON)
await self.driver.trigger_source.set(AravisTriggerSource.LINE1)
await prepare_exposures(self.driver, num, livetime)
Use ADArmLogic or create custom Arm Logic#
Most areaDetectors can use the standard adcore.ADArmLogic which handles arming and disarming via the driver’s acquire signal. If your detector requires custom arming behavior (e.g., waiting for a specific ready signal), create a DetectorArmLogic subclass with:
arm()- Start acquisitionwait_for_idle()- Wait until acquisition is completedisarm()- Stop acquisition
Add a Detector that puts it all together#
Now you need to make an adcore.AreaDetector subclass that uses your IO and Trigger Logic with the standard Arm Logic and Writer classes that come with ADCore. The __init__ method should:
Create the driver IO instance
Create instances of your logic classes
Call
super().__init__()passing the driver and logic instances to the baseclass
The constructor parameters should include:
prefix: The PV prefix for the driver and pluginspath_provider: APathProviderthat tells the detector where to write data (optional ifwriter_type=None)driver_suffix: A PV suffix for the driver, defaulting to"cam1:"writer_type: Anadcore.ADWriterTypeenum value (HDF, TIFF, JPEG) or None to skip file writingwriter_suffix: An optional PV suffix for the file writer pluginplugins: An optional mapping of {name:adcore.NDPluginBaseIO} for additional pluginsconfig_sigs: Additional signals to report in configuration (beyond the standard acquire_time and acquire_period)name: An optional name for the deviceAny detector-specific override parameters for your trigger logic
For example, for ADAravis:
class AravisDetector(AreaDetector[AravisDriverIO]):
"""Create an ADAravis AreaDetector instance.
:param prefix: EPICS PV prefix for the detector
:param path_provider: Provider for file paths during acquisition
:param driver_suffix: Suffix for the driver PV, defaults to "cam1:"
:param override_deadtime:
If provided, this value is used for deadtime instead of looking up
based on camera model.
:param writer_type: Type of file writer (HDF or TIFF)
:param writer_suffix: Suffix for the writer PV
:param plugins: Additional areaDetector plugins to include
:param config_sigs: Additional signals to include in configuration
:param name: Name for the detector device
"""
def __init__(
self,
prefix: str,
path_provider: PathProvider | None = None,
driver_suffix="cam1:",
override_deadtime: float | None = None,
writer_type: ADWriterType | None = ADWriterType.HDF,
writer_suffix: str | None = None,
plugins: dict[str, NDPluginBaseIO] | None = None,
config_sigs: Sequence[SignalR] = (),
name: str = "",
) -> None:
driver = AravisDriverIO(prefix + driver_suffix)
super().__init__(
prefix=prefix,
driver=driver,
arm_logic=ADArmLogic(driver),
trigger_logic=AravisTriggerLogic(driver, override_deadtime),
path_provider=path_provider,
writer_type=writer_type,
writer_suffix=writer_suffix,
plugins=plugins,
config_sigs=config_sigs,
name=name,
)
The AreaDetector baseclass will:
Store the driver as
self.driverCall
add_detector_logics()to register your trigger and arm logicCreate and register a data logic for file writing if
writer_typeis not NoneAdd configuration signals (driver.acquire_time, driver.acquire_period, and any you specify)
Store any plugins as attributes on the detector
Declare the public interface#
Now you should take all the classes you’ve made and add them to the top level __all__. Typically you should export:
The detector class (e.g.,
AravisDetector)The driver IO class (e.g.,
AravisDriverIO)The trigger logic class (e.g.,
AravisTriggerLogic)Any custom Enum types for PV values (e.g.,
AravisTriggerSource)
For example, for ADAravis:
__all__ = [
"AravisDetector",
"AravisDriverIO",
"AravisTriggerLogic",
"AravisTriggerSource",
]
Add multiple data streams (optional)#
The composition-based architecture makes it possible to add multiple data outputs to a detector. After creating the detector, you can call add_detector_logics() to add additional data sources:
Reading stats plugins alongside file writing#
from ophyd_async.epics.adcore import PluginSignalDataLogic
det = adaravis.AravisDetector(prefix, path_provider)
# Add stats total as a readable signal in events
det.add_detector_logics(adcore.PluginSignalDataLogic(det.driver, det.stats.total))
Multiple HDF writers for different ROIs#
# Don't create default writer
det = adaravis.AravisDetector(prefix, writer_type=None)
# Add separate writers for each ROI
det.add_detector_logics(
adcore.ADHDFDataLogic(path_provider, det.driver, det.roi1_plugin, datakey_suffix="-roi1"),
adcore.ADHDFDataLogic(path_provider, det.driver, det.roi2_plugin, datakey_suffix="-roi2"),
)
Continuously acquiring detector#
For detectors that acquire continuously, use adcore.ADContAcqTriggerLogic instead of creating custom trigger logic. This uses the builtin areaDetector circular buffer plugin to capture frames while the detector runs continuously.
Requirements:
Your AD IOC must have a circular buffer plugin configured
The plugin output should be fed to the file writer
The detector should already be acquiring continuously before use
Example implementation:
driver = adcore.ADBaseIO("PREFIX:DRV:")
cb_plugin = adcore.NDCircularBuffIO("PREFIX:CB1:")
det = adcore.AreaDetector(
driver=driver,
arm_logic=adcore.ADContAcqArmLogic(driver, cb_plugin),
trigger_logic=adcore.ADContAcqTriggerLogic(driver, cb_plugin),
path_provider=path_provider,
plugins={"cb1": cb_plugin},
)
The adcore.ADContAcqTriggerLogic will:
Validate that exposure time matches the detector’s current acquisition period
Configure the circular buffer plugin to capture the requested number of frames
Use the circular buffer’s trigger signal instead of the driver’s acquire signal
Write tests#
Write unit tests to verify your detector implementation works correctly. You should test:
Test fixture setup#
Use a pytest fixture to initialize your detector in mock mode for testing:
@pytest.fixture
async def test_adaravis(
static_path_provider: StaticPathProvider,
) -> adaravis.AravisDetector:
async with init_devices(mock=True):
detector = adaravis.AravisDetector("PREFIX:", static_path_provider)
writer = detector.get_plugin("writer", adcore.NDPluginFileIO)
set_mock_value(writer.file_path_exists, True)
return detector
This fixture:
Initializes the detector with
mock=Truefor unit testingSets up required mock values (e.g.,
file_path_exists)Returns the detector for use in test functions
Test PV correctness#
Verify that your detector driver correctly maps to the expected PV names:
def test_pvs_correct(test_adaravis: adaravis.AravisDetector):
assert test_adaravis.driver.acquire.source == "mock+ca://PREFIX:cam1:Acquire_RBV"
assert (
test_adaravis.driver.trigger_mode.source
== "mock+ca://PREFIX:cam1:TriggerMode_RBV"
)
Test deadtime calculation#
If your detector supports external triggering, test the get_deadtime() method for different detector models and configurations:
@pytest.mark.parametrize(
"model,deadtime", [("Mako G-125", 70e-6), ("Mako G-507", 554e-6)]
)
async def test_deadtime(
test_adaravis: adaravis.AravisDetector,
model: str,
deadtime: float,
):
# Set a default model for tests that need deadtime
set_mock_value(test_adaravis.driver.model, model)
trigger_modes, actual_deadtime = await test_adaravis.get_trigger_deadtime()
assert trigger_modes == {DetectorTrigger.INTERNAL, DetectorTrigger.EXTERNAL_EDGE}
assert deadtime == actual_deadtime
Test prepare methods#
Test each trigger mode your detector supports by verifying that prepare() configures the detector with the correct PV values. Test external edge triggering:
async def test_prepare_external_edge(
test_adaravis: adaravis.AravisDetector,
):
await test_adaravis.prepare(
TriggerInfo(
trigger=DetectorTrigger.EXTERNAL_EDGE,
number_of_events=5,
livetime=0.5,
)
)
assert_has_calls(
test_adaravis.driver,
[
call.trigger_mode.put(OnOff.ON),
call.trigger_source.put(adaravis.AravisTriggerSource.LINE1),
call.image_mode.put(adcore.ADImageMode.MULTIPLE),
call.num_images.put(5),
call.acquire_time.put(0.5),
call.acquire.put(True),
],
)
And test internal triggering:
async def test_prepare_internal(
test_adaravis: adaravis.AravisDetector,
):
await test_adaravis.prepare(TriggerInfo(number_of_events=11))
assert_has_calls(
test_adaravis.driver,
[
call.trigger_mode.put(OnOff.OFF),
call.image_mode.put(adcore.ADImageMode.MULTIPLE),
call.num_images.put(11),
],
)
Conclusion#
You have now made a detector, and can import and create it like this:
from ophyd_async.epics import adaravis
det = adaravis.AravisDetector("PREFIX:", path_provider)
The detector will now support:
Querying supported trigger types with
get_trigger_deadtime()Step scanning with
trigger()Fly scanning with
kickoff()andcomplete()Automatic handling of file writing and StreamAsset document emission
Configuration signal reporting based on your trigger logic