Source code for ophyd_async.epics.adaravis._aravis_controller

import asyncio
from typing import Literal, Tuple

from ophyd_async.core import (
    DetectorControl,
    DetectorTrigger,
    TriggerInfo,
    set_and_wait_for_value,
)
from ophyd_async.core._status import AsyncStatus
from ophyd_async.epics import adcore

from ._aravis_io import AravisDriverIO, AravisTriggerMode, AravisTriggerSource

# The deadtime of an ADaravis controller varies depending on the exact model of camera.
# Ideally we would maximize performance by dynamically retrieving the deadtime at
# runtime. See https://github.com/bluesky/ophyd-async/issues/308
_HIGHEST_POSSIBLE_DEADTIME = 1961e-6


[docs] class AravisController(DetectorControl): GPIO_NUMBER = Literal[1, 2, 3, 4] def __init__(self, driver: AravisDriverIO, gpio_number: GPIO_NUMBER) -> None: self._drv = driver self.gpio_number = gpio_number self._arm_status: AsyncStatus | None = None
[docs] def get_deadtime(self, exposure: float) -> float: return _HIGHEST_POSSIBLE_DEADTIME
[docs] async def prepare(self, trigger_info: TriggerInfo): if (num := trigger_info.number) == 0: image_mode = adcore.ImageMode.continuous else: image_mode = adcore.ImageMode.multiple if (exposure := trigger_info.livetime) is not None: await self._drv.acquire_time.set(exposure) trigger_mode, trigger_source = self._get_trigger_info(trigger_info.trigger) # trigger mode must be set first and on it's own! await self._drv.trigger_mode.set(trigger_mode) await asyncio.gather( self._drv.trigger_source.set(trigger_source), self._drv.num_images.set(num), self._drv.image_mode.set(image_mode), )
[docs] async def arm(self): self._arm_status = await set_and_wait_for_value(self._drv.acquire, True)
[docs] async def wait_for_idle(self): if self._arm_status: await self._arm_status
def _get_trigger_info( self, trigger: DetectorTrigger ) -> Tuple[AravisTriggerMode, AravisTriggerSource]: supported_trigger_types = ( DetectorTrigger.constant_gate, DetectorTrigger.edge_trigger, DetectorTrigger.internal, ) if trigger not in supported_trigger_types: raise ValueError( f"{self.__class__.__name__} only supports the following trigger " f"types: {supported_trigger_types} but was asked to " f"use {trigger}" ) if trigger == DetectorTrigger.internal: return AravisTriggerMode.off, "Freerun" else: return (AravisTriggerMode.on, f"Line{self.gpio_number}")
[docs] async def disarm(self): await adcore.stop_busy_record(self._drv.acquire, False, timeout=1)