How to implement a Device for an EPICS areaDetector
This document walks through the steps taken to implement an ophyd-async Device that talks to an EPICS areaDetector.
Create a module to put the code in
The first stage is to make a module in the ophyd-async repository to put the code in:
- If you haven’t already, git clone git@github.com:bluesky/ophyd-async.git and make a new branch to work in
- Create a new directory under src/ophyd_async/epics/ which is the lowercase version of the EPICS support module for the detector
  - For example, for ADAravis make a directory adaravis
- Make an empty __init__.py within that directory
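The directory steps above can be sketched in Python. This is only an illustration: make_detector_module is a hypothetical helper, not part of ophyd-async, and it runs against a temporary directory rather than a real checkout.

```python
import tempfile
from pathlib import Path


def make_detector_module(repo_root: Path, support_module: str) -> Path:
    """Create src/ophyd_async/epics/<lowercase name>/ with an empty __init__.py."""
    module_dir = repo_root / "src" / "ophyd_async" / "epics" / support_module.lower()
    module_dir.mkdir(parents=True, exist_ok=True)
    # touch() creates an empty file, or leaves the contents of an existing one alone
    (module_dir / "__init__.py").touch()
    return module_dir


# Demonstrate against a throwaway directory standing in for the repository root
with tempfile.TemporaryDirectory() as tmp:
    created = make_detector_module(Path(tmp), "ADAravis")
    created_name = created.name
    init_exists = (created / "__init__.py").is_file()
```

In a real checkout you would pass the path of your ophyd-async clone as repo_root.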
Add an IO class for the PV interface
Now you need an IO class that subclasses adcore.ADBaseIO. This should add the driver-specific PVs that are required to set up triggering.
For example, for ADAravis this is in the file _aravis_io.py:
from typing import Annotated as A

from ophyd_async.core import SignalRW, StrictEnum, SubsetEnum
from ophyd_async.epics import adcore
from ophyd_async.epics.core import PvSuffix


class AravisTriggerMode(StrictEnum):
    """GigEVision GenICAM standard TriggerMode."""

    ON = "On"
    """Use TriggerSource to trigger each frame"""
    OFF = "Off"
    """Just trigger as fast as you can"""


class AravisTriggerSource(SubsetEnum):
    """Which trigger source to use when TriggerMode=On."""

    LINE1 = "Line1"


class AravisDriverIO(adcore.ADBaseIO):
    """Generic Driver supporting all GiGE cameras.

    This mirrors the interface provided by ADAravis/db/aravisCamera.template.
    """

    trigger_mode: A[SignalRW[AravisTriggerMode], PvSuffix.rbv("TriggerMode")]
    trigger_source: A[SignalRW[AravisTriggerSource], PvSuffix.rbv("TriggerSource")]
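To make the PV naming concrete, here is a standalone sketch of the names the two annotations above are expected to resolve to. It assumes that PvSuffix.rbv("TriggerMode") follows the usual areaDetector convention of writing to a PV ending in TriggerMode and reading back from one ending in TriggerMode_RBV, and that the driver prefix is prepended to both. PvPair and rbv_pv_names are hypothetical stand-ins written for this illustration, not ophyd-async APIs.

```python
from typing import NamedTuple


class PvPair(NamedTuple):
    """Hypothetical stand-in for the read/write PV names of one signal."""

    read_pv: str
    write_pv: str


def rbv_pv_names(driver_prefix: str, suffix: str, rbv_suffix: str = "_RBV") -> PvPair:
    # Assumption: writes go to <prefix><suffix>, reads come from <prefix><suffix>_RBV
    write_pv = driver_prefix + suffix
    return PvPair(read_pv=write_pv + rbv_suffix, write_pv=write_pv)


# "PREFIX:" and "cam1:" are illustrative values for the detector and driver prefixes
trigger_mode = rbv_pv_names("PREFIX:" + "cam1:", "TriggerMode")
trigger_source = rbv_pv_names("PREFIX:" + "cam1:", "TriggerSource")
```

Checking these names against the records in the driver's database template is a quick way to catch a misspelled suffix before connecting to a real IOC.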
Add a Controller that knows how to set up the driver
Now you need a class that subclasses adcore.ADBaseController. This should implement at least:
- get_deadtime() to give the amount of time required between triggers for a given exposure
- prepare() to set the camera up for a given trigger mode, number of frames and exposure
For example, for ADAravis this is in the file _aravis_controller.py:
import asyncio

from ophyd_async.core import DetectorTrigger, TriggerInfo
from ophyd_async.epics import adcore

from ._aravis_io import AravisDriverIO, AravisTriggerMode, AravisTriggerSource

# The deadtime of an ADAravis controller varies depending on the exact model of camera.
# Ideally we would maximize performance by dynamically retrieving the deadtime at
# runtime. See https://github.com/bluesky/ophyd-async/issues/308
_HIGHEST_POSSIBLE_DEADTIME = 1961e-6


class AravisController(adcore.ADBaseController[AravisDriverIO]):
    """`DetectorController` for an `AravisDriverIO`."""

    def get_deadtime(self, exposure: float | None) -> float:
        return _HIGHEST_POSSIBLE_DEADTIME

    async def prepare(self, trigger_info: TriggerInfo) -> None:
        if (exposure := trigger_info.livetime) is not None:
            await self.driver.acquire_time.set(exposure)

        if trigger_info.trigger is DetectorTrigger.INTERNAL:
            # Set trigger mode off to ignore the trigger source
            await self.driver.trigger_mode.set(AravisTriggerMode.OFF)
        elif trigger_info.trigger in {
            DetectorTrigger.CONSTANT_GATE,
            DetectorTrigger.EDGE_TRIGGER,
        }:
            # Trigger on the rising edge of Line1
            # trigger mode must be set first and on its own!
            await self.driver.trigger_mode.set(AravisTriggerMode.ON)
            await self.driver.trigger_source.set(AravisTriggerSource.LINE1)
        else:
            raise ValueError(f"ADAravis does not support {trigger_info.trigger}")

        if trigger_info.total_number_of_triggers == 0:
            image_mode = adcore.ImageMode.CONTINUOUS
        else:
            image_mode = adcore.ImageMode.MULTIPLE
        await asyncio.gather(
            self.driver.num_images.set(trigger_info.total_number_of_triggers),
            self.driver.image_mode.set(image_mode),
        )
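As a rough illustration of what the deadtime means for a scan, the sketch below works through the arithmetic. It assumes that the spacing between triggers must be at least the exposure plus the deadtime, which is how get_deadtime() is described above. The helper names are hypothetical and the 10 ms figures are illustrative, not recommendations for any particular camera.

```python
# Value taken from the controller above: worst-case deadtime in seconds
HIGHEST_POSSIBLE_DEADTIME = 1961e-6


def min_trigger_period(exposure: float, deadtime: float) -> float:
    """Shortest spacing between triggers, assuming period >= exposure + deadtime."""
    return exposure + deadtime


def max_exposure(period: float, deadtime: float) -> float:
    """Longest exposure that fits in a trigger period under the same assumption."""
    if period <= deadtime:
        raise ValueError("period must be longer than the deadtime")
    return period - deadtime


# A 10 ms exposure needs a trigger period of at least exposure + deadtime
period = min_trigger_period(exposure=0.01, deadtime=HIGHEST_POSSIBLE_DEADTIME)
max_rate_hz = 1 / period

# Conversely, triggering every 10 ms leaves less than 10 ms for the exposure
exposure_at_100hz = max_exposure(period=0.01, deadtime=HIGHEST_POSSIBLE_DEADTIME)
```

Because the controller returns the highest possible deadtime for every camera model, these limits are conservative for models with a shorter real deadtime.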
Add a Detector that puts it all together
Now you need to make a StandardDetector subclass that uses your IO and Controller with the standard file IO and Writer classes that come with ADCore. The __init__ method should take the following:
- prefix: The PV prefix for the driver and plugins
- path_provider: A PathProvider that tells the detector where to write data
- drv_suffix: A PV suffix for the driver, defaulting to "cam1:"
- writer_cls: An adcore.ADWriter class to instantiate, defaulting to adcore.ADHDFWriter
- fileio_suffix: An optional PV suffix for the fileio; if not given it will default to the writer class default
- name: An optional name for the device
- config_sigs: Optionally the signals to report as configuration
- plugins: An optional mapping of {name: adcore.NDPluginBaseIO} for each additional plugin that might contribute data to the resulting file
For example, for ADAravis this is in the file _aravis.py:
from collections.abc import Sequence

from ophyd_async.core import PathProvider, SignalR
from ophyd_async.epics import adcore

from ._aravis_controller import AravisController
from ._aravis_io import AravisDriverIO


class AravisDetector(adcore.AreaDetector[AravisController]):
    """Implementation of an ADAravis Detector.

    The detector may be configured for an external trigger on a GPIO port,
    which must be done prior to preparing the detector
    """

    def __init__(
        self,
        prefix: str,
        path_provider: PathProvider,
        drv_suffix="cam1:",
        writer_cls: type[adcore.ADWriter] = adcore.ADHDFWriter,
        fileio_suffix: str | None = None,
        name: str = "",
        config_sigs: Sequence[SignalR] = (),
        plugins: dict[str, adcore.NDPluginBaseIO] | None = None,
    ):
        driver = AravisDriverIO(prefix + drv_suffix)
        controller = AravisController(driver)
        writer = writer_cls.with_io(
            prefix,
            path_provider,
            dataset_source=driver,
            fileio_suffix=fileio_suffix,
            plugins=plugins,
        )
        super().__init__(
            controller=controller,
            writer=writer,
            plugins=plugins,
            name=name,
            config_sigs=config_sigs,
        )
Make it importable
Now you should take all the classes you’ve made and add them to the top-level __init__.py to declare the public interface for this module. Typically you should also include any Enum types you have made to support the IO.
For example, for ADAravis this is:
"""Support for the ADAravis areaDetector driver.
https://github.com/areaDetector/ADAravis
"""

from ._aravis import AravisDetector
from ._aravis_controller import AravisController
from ._aravis_io import AravisDriverIO, AravisTriggerMode, AravisTriggerSource

__all__ = [
    "AravisDetector",
    "AravisController",
    "AravisDriverIO",
    "AravisTriggerMode",
    "AravisTriggerSource",
]
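A small sanity check can catch a name that is listed in __all__ but never imported. The sketch below is one possible way to do that. missing_exports is a hypothetical helper, and it is exercised against a stand-in module so that it runs without ophyd-async installed.

```python
import types


def missing_exports(module: types.ModuleType) -> list[str]:
    """Return names listed in __all__ that the module does not actually define."""
    return [name for name in getattr(module, "__all__", []) if not hasattr(module, name)]


# Hypothetical stand-in module: AravisController is declared but never defined
fake = types.ModuleType("fake_adaravis")
fake.AravisDetector = object
fake.__all__ = ["AravisDetector", "AravisController"]

missing = missing_exports(fake)
```

Against the real package you would pass the imported ophyd_async.epics.adaravis module instead and expect an empty list.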
Write tests
TODO
Conclusion
You have now made a detector, and can import and create it like this:
from ophyd_async.epics import adaravis, adcore

det = adaravis.AravisDetector(
    "PREFIX:",
    path_provider,
    drv_suffix="DRV:",
    writer_cls=adcore.ADHDFWriter,
    fileio_suffix="HDF:",
)