How to implement a Device for an EPICS areaDetector#
This document will walk through the steps taken to implement an ophyd-async Device that talks to an EPICS areaDetector
Create a module to put the code in#
The first stage is to make a module in the ophyd-async
repository to put the code in:
If you haven’t already,
git clone git@github.com:bluesky/ophyd-async.git
and make a new branch to work inCreate a new directory under
src/ophyd_async/epics/
which is the lowercase version of the epics support module for the detectorFor example, for
ADAravis
make a directoryadaravis
Make an empty
__init__.py
within that directory
Add an IO class for the PV interface#
Now you need an IO class that subclasses adcore.ADBaseIO
. This should add the PVs that are detector driver specific that are required to setup triggering.
For example for ADAravis this is in the file _aravis_io.py
:
from typing import Annotated as A
from ophyd_async.core import SignalRW, StrictEnum, SubsetEnum
from ophyd_async.epics import adcore
from ophyd_async.epics.core import PvSuffix
class AravisTriggerMode(StrictEnum):
"""GigEVision GenICAM standard TriggerMode."""
ON = "On"
"""Use TriggerSource to trigger each frame"""
OFF = "Off"
"""Just trigger as fast as you can"""
class AravisTriggerSource(SubsetEnum):
"""Which trigger source to use when TriggerMode=On."""
LINE1 = "Line1"
class AravisDriverIO(adcore.ADBaseIO):
"""Generic Driver supporting all GiGE cameras.
This mirrors the interface provided by ADAravis/db/aravisCamera.template.
"""
trigger_mode: A[SignalRW[AravisTriggerMode], PvSuffix.rbv("TriggerMode")]
trigger_source: A[SignalRW[AravisTriggerSource], PvSuffix.rbv("TriggerSource")]
Add a Controller that knows how to setup the driver#
Now you need a class that subclasses adcore.ADBaseController
. This should implement at least:
get_deadtime()
to give the amount of time required between triggers for a given exposureprepare()
to set the camera up for a given trigger mode, number of frames and exposure
For example for ADAravis this is in the file _aravis_controller.py
:
import asyncio
from ophyd_async.core import DetectorTrigger, TriggerInfo
from ophyd_async.epics import adcore
from ._aravis_io import AravisDriverIO, AravisTriggerMode, AravisTriggerSource
# The deadtime of an ADaravis controller varies depending on the exact model of camera.
# Ideally we would maximize performance by dynamically retrieving the deadtime at
# runtime. See https://github.com/bluesky/ophyd-async/issues/308
_HIGHEST_POSSIBLE_DEADTIME = 1961e-6
class AravisController(adcore.ADBaseController[AravisDriverIO]):
"""`DetectorController` for an `AravisDriverIO`."""
def get_deadtime(self, exposure: float | None) -> float:
return _HIGHEST_POSSIBLE_DEADTIME
async def prepare(self, trigger_info: TriggerInfo) -> None:
if (exposure := trigger_info.livetime) is not None:
await self.driver.acquire_time.set(exposure)
if trigger_info.trigger is DetectorTrigger.INTERNAL:
# Set trigger mode off to ignore the trigger source
await self.driver.trigger_mode.set(AravisTriggerMode.OFF)
elif trigger_info.trigger in {
DetectorTrigger.CONSTANT_GATE,
DetectorTrigger.EDGE_TRIGGER,
}:
# Trigger on the rising edge of Line1
# trigger mode must be set first and on it's own!
await self.driver.trigger_mode.set(AravisTriggerMode.ON)
await self.driver.trigger_source.set(AravisTriggerSource.LINE1)
else:
raise ValueError(f"ADAravis does not support {trigger_info.trigger}")
if trigger_info.total_number_of_triggers == 0:
image_mode = adcore.ADImageMode.CONTINUOUS
else:
image_mode = adcore.ADImageMode.MULTIPLE
await asyncio.gather(
self.driver.num_images.set(trigger_info.total_number_of_triggers),
self.driver.image_mode.set(image_mode),
)
Add a Detector that puts it all together#
Now you need to make a StandardDetector
subclass that uses your IO and Controller with the standard file IO and Writer classes that come with ADCore. The __init__
method should take the following:
prefix
: The PV prefix for the driver and pluginspath_provider
: APathProvider
that tells the detector where to write datadrv_suffix
: A PV suffix for the driver, defaulting to"cam1:"
writer_cls
: Anadcore.ADWriter
class to instantiate, defaulting toadcore.ADHDFWriter
fileio_suffix
: An optional PV suffix for the fileio, if not given it will default to the writer class defaultname
: An optional name for the deviceconfig_sigs
: Optionally the signals to report as configurationplugins
: An optional mapping of {name
:adcore.NDPluginBaseIO
} for each additional plugin that might contribute data to the resulting file
For example for ADAravis this is in the file _aravis.py
:
from collections.abc import Sequence
from ophyd_async.core import PathProvider, SignalR
from ophyd_async.epics import adcore
from ._aravis_controller import AravisController
from ._aravis_io import AravisDriverIO
class AravisDetector(adcore.AreaDetector[AravisController]):
"""Implementation of an ADAravis Detector.
The detector may be configured for an external trigger on a GPIO port,
which must be done prior to preparing the detector
"""
def __init__(
self,
prefix: str,
path_provider: PathProvider,
drv_suffix="cam1:",
writer_cls: type[adcore.ADWriter] = adcore.ADHDFWriter,
fileio_suffix: str | None = None,
name: str = "",
config_sigs: Sequence[SignalR] = (),
plugins: dict[str, adcore.NDPluginBaseIO] | None = None,
):
driver = AravisDriverIO(prefix + drv_suffix)
controller = AravisController(driver)
writer = writer_cls.with_io(
prefix,
path_provider,
dataset_source=driver,
fileio_suffix=fileio_suffix,
plugins=plugins,
)
super().__init__(
controller=controller,
writer=writer,
plugins=plugins,
name=name,
config_sigs=config_sigs,
)
Make it importable#
Now you should take all the classes you’ve made and add it to the top level __init__.py
to declare the public interface for this module. Typically you should also include any Enum types you have made to support the IO.
For example for ADAravis this is:
"""Support for the ADAravis areaDetector driver.
https://github.com/areaDetector/ADAravis
"""
from ._aravis import AravisDetector
from ._aravis_controller import AravisController
from ._aravis_io import AravisDriverIO, AravisTriggerMode, AravisTriggerSource
__all__ = [
"AravisDetector",
"AravisController",
"AravisDriverIO",
"AravisTriggerMode",
"AravisTriggerSource",
]
Write tests#
TODO
Conclusion#
You have now made a detector, and can import and create it like this:
from ophyd_async.epics import adaravis, adcore
det = adaravis.AravisDetector(
"PREFIX:",
path_provider,
drv_suffix="DRV:",
writer_cls=adcore.ADHDFWriter,
fileio_suffix="HDF:",
)
Continuously acquiring detector#
In the event that you need to be able to collect data from a detector that is continuously acquiring, you should use the ContAcqAreaDetector
class.
This uses the builtin areaDetector
circular buffer plugin to act as the acquisition start/stop replacement, while the detector runs continuously.
Your AD IOC will require at least one instance of this plugin to be configured, and the output of the plugin should be fed to the file writer of your choosing.
The expectation is that the detector is already acquiring in continuous mode with the expected exposure time prior to using an instance of ContAcqAreaDetector
.
To instantiate a detector instance, import it and create it like this:
from ophyd_async.epics import adcore
det = adcore.ContAcqAreaDetector(
"PREFIX:",
path_provider,
drv_cls=adcore.ADBaseIO,
drv_suffix="DRV:",
cb_suffix="CB:",
writer_cls=adcore.ADHDFWriter,
fileio_suffix="HDF:",
)
Note that typically the only changes from a typical detector are the additional cb_suffix
kwarg, which is used to identify the prefix to use when instantiating the circular buffer (CB) plugin instance, and the drv_cls
kwarg, which allows you to specify the driver to use, with the default being the ADBaseIO
class.