Source code for ophyd_async.epics.eiger._eiger_controller
import asyncio
from typing import Optional
from ophyd_async.core import (
DEFAULT_TIMEOUT,
AsyncStatus,
DetectorControl,
DetectorTrigger,
set_and_wait_for_other_value,
)
from ._eiger_io import EigerDriverIO, EigerTriggerMode
EIGER_TRIGGER_MODE_MAP = {
DetectorTrigger.internal: EigerTriggerMode.internal,
DetectorTrigger.constant_gate: EigerTriggerMode.gate,
DetectorTrigger.variable_gate: EigerTriggerMode.gate,
DetectorTrigger.edge_trigger: EigerTriggerMode.edge,
}
[docs]
class EigerController(DetectorControl):
def __init__(
self,
driver: EigerDriverIO,
) -> None:
self._drv = driver
[docs]
def get_deadtime(self, exposure: float) -> float:
# See https://media.dectris.com/filer_public/30/14/3014704e-5f3b-43ba-8ccf-8ef720e60d2a/240202_usermanual_eiger2.pdf
return 0.0001
[docs]
async def set_energy(self, energy: float, tolerance: float = 0.1):
"""Changing photon energy takes some time so only do so if the current energy is
outside the tolerance."""
current_energy = await self._drv.photon_energy.get_value()
if abs(current_energy - energy) > tolerance:
await self._drv.photon_energy.set(energy)
[docs]
@AsyncStatus.wrap
async def arm(
self,
num: int,
trigger: DetectorTrigger = DetectorTrigger.internal,
exposure: Optional[float] = None,
):
coros = [
self._drv.trigger_mode.set(EIGER_TRIGGER_MODE_MAP[trigger].value),
self._drv.num_images.set(num),
]
if exposure is not None:
coros.extend(
[
self._drv.acquire_time.set(exposure),
self._drv.acquire_period.set(exposure),
]
)
await asyncio.gather(*coros)
# TODO: Detector state should be an enum see https://github.com/DiamondLightSource/eiger-fastcs/issues/43
await set_and_wait_for_other_value(
self._drv.arm, 1, self._drv.state, "ready", timeout=DEFAULT_TIMEOUT
)
[docs]
async def disarm(self):
await self._drv.disarm.set(1)