Source code for ophyd_async.epics.adcore._trigger_logic

import asyncio

from ophyd_async.core import DetectorTriggerLogic, EnableDisable

from ._io import ADBaseIO, ADImageMode, NDCBFlushOnSoftTrgMode, NDCircularBuffIO


[docs] async def prepare_exposures( driver: ADBaseIO, num: int, livetime: float = 0.0, deadtime: float = 0.0, ): image_mode = ADImageMode.CONTINUOUS if num == 0 else ADImageMode.MULTIPLE coros = [ driver.image_mode.set(image_mode), driver.num_images.set(num), ] if livetime: coros.append(driver.acquire_time.set(livetime)) if deadtime: coros.append(driver.acquire_period.set(livetime + deadtime)) await asyncio.gather(*coros)
[docs] class ADContAcqTriggerLogic(DetectorTriggerLogic): def __init__(self, driver: ADBaseIO, cb_plugin: NDCircularBuffIO): self.driver = driver self.cb_plugin = cb_plugin async def _ensure_driver_acquiring(self, livetime: float): # Check the current state of the system image_mode, acquiring, acquire_time = await asyncio.gather( self.driver.image_mode.get_value(), self.driver.acquire.get_value(), self.driver.acquire_time.get_value(), ) # Ensure the detector is in continuous acquisition mode if image_mode != ADImageMode.CONTINUOUS or not acquiring: raise RuntimeError( "Driver must be acquiring in continuous mode to use the " "cont acq interface" ) # Not all detectors allow for changing exposure times during an acquisition, # so in this least-common-denominator implementation check to see if # exposure time matches the current exposure time. if livetime and livetime != acquire_time: raise ValueError( f"Detector exposure time currently set to {acquire_time}, " f"but requested exposure is {livetime}" )
[docs] async def prepare_internal(self, num: int, livetime: float, deadtime: float): await self._ensure_driver_acquiring(livetime) # Setup the CB plugin with the current parameters await asyncio.gather( self.cb_plugin.enable_callbacks.set(EnableDisable.ENABLE), self.cb_plugin.pre_count.set(0), self.cb_plugin.post_count.set(num), self.cb_plugin.preset_trigger_count.set(1), self.cb_plugin.flush_on_soft_trg.set(NDCBFlushOnSoftTrgMode.ON_NEW_IMAGE), )