Source code for ophyd_async.epics.adaravis

"""areaDetector support for Aravis GigE and USB3 vision cameras.

https://github.com/areaDetector/ADAravis
"""

from collections.abc import Sequence
from typing import Annotated as A

from ophyd_async.core import (
    DetectorTriggerLogic,
    OnOff,
    PathProvider,
    SignalDict,
    SignalR,
    SignalRW,
    SubsetEnum,
)

from .adcore import (
    ADArmLogic,
    ADBaseIO,
    ADWriterType,
    AreaDetector,
    NDPluginBaseIO,
    prepare_exposures,
)
from .adgenicam import get_camera_deadtime
from .core import PvSuffix

__all__ = [
    "AravisDetector",
    "AravisDriverIO",
    "AravisTriggerLogic",
    "AravisTriggerSource",
]


[docs] class AravisTriggerSource(SubsetEnum): """Which trigger source to use when TriggerMode=On.""" LINE1 = "Line1"
[docs] class AravisDriverIO(ADBaseIO): """Generic Driver supporting all GiGE cameras. This mirrors the interface provided by ADAravis/db/aravisCamera.template. """ trigger_mode: A[SignalRW[OnOff], PvSuffix.rbv("TriggerMode")] trigger_source: A[SignalRW[AravisTriggerSource], PvSuffix.rbv("TriggerSource")]
[docs] class AravisTriggerLogic(DetectorTriggerLogic): """Trigger logic for Aravis GigE and USB3 cameras.""" def __init__(self, driver: AravisDriverIO, override_deadtime: float | None = None): self.driver = driver self.override_deadtime = override_deadtime
[docs] def config_sigs(self) -> set[SignalR]: return {self.driver.model}
[docs] def get_deadtime(self, config_values: SignalDict) -> float: return get_camera_deadtime( model=config_values[self.driver.model], override_deadtime=self.override_deadtime, )
[docs] async def prepare_internal(self, num: int, livetime: float, deadtime: float): await self.driver.trigger_mode.set(OnOff.OFF) await prepare_exposures(self.driver, num, livetime, deadtime)
[docs] async def prepare_edge(self, num: int, livetime: float): # Trigger on the rising edge of Line1 # trigger mode must be set first and on its own! # Hardware race condition in Aravis firmware requires setting trigger mode # separately before trigger source to avoid undefined behavior. # https://github.com/AravisProject/aravis/issues/1045 await self.driver.trigger_mode.set(OnOff.ON) await self.driver.trigger_source.set(AravisTriggerSource.LINE1) await prepare_exposures(self.driver, num, livetime)
[docs] class AravisDetector(AreaDetector[AravisDriverIO]): """Create an ADAravis AreaDetector instance. :param prefix: EPICS PV prefix for the detector :param path_provider: Provider for file paths during acquisition :param driver_suffix: Suffix for the driver PV, defaults to "cam1:" :param override_deadtime: If provided, this value is used for deadtime instead of looking up based on camera model. :param writer_type: Type of file writer (HDF or TIFF) :param writer_suffix: Suffix for the writer PV :param plugins: Additional areaDetector plugins to include :param config_sigs: Additional signals to include in configuration :param name: Name for the detector device """ def __init__( self, prefix: str, path_provider: PathProvider | None = None, driver_suffix="cam1:", override_deadtime: float | None = None, writer_type: ADWriterType | None = ADWriterType.HDF, writer_suffix: str | None = None, plugins: dict[str, NDPluginBaseIO] | None = None, config_sigs: Sequence[SignalR] = (), name: str = "", ) -> None: driver = AravisDriverIO(prefix + driver_suffix) super().__init__( prefix=prefix, driver=driver, arm_logic=ADArmLogic(driver), trigger_logic=AravisTriggerLogic(driver, override_deadtime), path_provider=path_provider, writer_type=writer_type, writer_suffix=writer_suffix, plugins=plugins, config_sigs=config_sigs, name=name, )