# Source code for ophyd_async.fastcs.eiger._eiger_controller
import asyncio
from ophyd_async.core import (
DEFAULT_TIMEOUT,
DetectorController,
DetectorTrigger,
TriggerInfo,
)
from ._eiger_io import EigerDriverIO, EigerTriggerMode
# Translation table from the generic DetectorTrigger requested in a
# TriggerInfo to the Eiger-specific EigerTriggerMode. Both gate variants
# (constant and variable) map onto the single Eiger GATE mode.
# EigerController.prepare() looks the trigger up here and writes the `.value`
# of the resulting enum member to the driver's trigger_mode signal.
EIGER_TRIGGER_MODE_MAP: dict[DetectorTrigger, EigerTriggerMode] = {
    DetectorTrigger.INTERNAL: EigerTriggerMode.INTERNAL,
    DetectorTrigger.CONSTANT_GATE: EigerTriggerMode.GATE,
    DetectorTrigger.VARIABLE_GATE: EigerTriggerMode.GATE,
    DetectorTrigger.EDGE_TRIGGER: EigerTriggerMode.EDGE,
}
# [docs]
class EigerController(DetectorController):
    """Detector controller that operates an Eiger through an ``EigerDriverIO``.

    The controller configures the detector from a ``TriggerInfo``, requests
    arming, waits for the arm request to complete and disarms the detector.
    """

    def __init__(
        self,
        driver: EigerDriverIO,
    ) -> None:
        """Store the driver used for all detector access.

        Args:
            driver: Driver IO object whose ``detector`` attribute exposes the
                signals written and read by this controller.
        """
        self._drv = driver
        # Status returned by the most recent arm request. It is initialised
        # here so that wait_for_idle() can be called safely before arm()
        # instead of failing with AttributeError.
        self._arm_status = None

    def get_deadtime(self, exposure: float | None) -> float:
        """Return the detector deadtime.

        Args:
            exposure: Ignored; the returned value does not depend on it.

        Returns:
            The constant ``0.0001`` (presumably seconds; confirm against the
            manual linked below).
        """
        # See https://media.dectris.com/filer_public/30/14/3014704e-5f3b-43ba-8ccf-8ef720e60d2a/240202_usermanual_eiger2.pdf
        return 0.0001

    async def set_energy(self, energy: float, tolerance: float = 0.1):
        """Set photon energy.

        Changing photon energy takes some time so only do so if the current
        energy is outside the tolerance.

        Args:
            energy: Requested photon energy.
            tolerance: Largest absolute difference between the current and the
                requested energy for which no write is issued.
        """
        current_energy = await self._drv.detector.photon_energy.get_value()
        if abs(current_energy - energy) > tolerance:
            await self._drv.detector.photon_energy.set(energy)

    async def prepare(self, trigger_info: TriggerInfo):
        """Configure the detector for the acquisition described by ``trigger_info``.

        Writes the trigger mode and the number of images and, when a livetime
        is given, the count time and frame time. All writes are awaited
        together with ``asyncio.gather``.

        Args:
            trigger_info: Supplies ``trigger``, ``total_number_of_exposures``
                and the optional ``livetime``.

        Raises:
            KeyError: If ``trigger_info.trigger`` has no entry in
                ``EIGER_TRIGGER_MODE_MAP``.
        """
        coros = [
            self._drv.detector.trigger_mode.set(
                EIGER_TRIGGER_MODE_MAP[trigger_info.trigger].value
            ),
            self._drv.detector.nimages.set(trigger_info.total_number_of_exposures),
        ]
        if trigger_info.livetime is not None:
            # NOTE(review): frame_time is written with the same value as
            # count_time, i.e. without adding get_deadtime() - confirm that
            # this is intended.
            coros.extend(
                [
                    self._drv.detector.count_time.set(trigger_info.livetime),
                    self._drv.detector.frame_time.set(trigger_info.livetime),
                ]
            )
        await asyncio.gather(*coros)

    async def arm(self):
        """Request arming and remember the returned status without awaiting it.

        The status is awaited later by ``wait_for_idle``.
        """
        self._arm_status = self._drv.detector.arm.trigger(timeout=DEFAULT_TIMEOUT)

    async def wait_for_idle(self):
        """Wait for the status stored by ``arm`` to complete.

        Returns immediately when ``arm`` has not been called yet.
        """
        if self._arm_status is not None:
            await self._arm_status

    async def disarm(self):
        """Trigger the driver's disarm signal and wait for it to complete."""
        await self._drv.detector.disarm.trigger()