"""Module which defines abstract classes to work with detectors."""importasyncioimporttimefromabcimportABC,abstractmethodfromcollections.abcimportAsyncGenerator,AsyncIterator,Callable,Iterator,SequencefromenumimportEnumfromfunctoolsimportcached_propertyfromtypingimport(Generic,TypeVar,)frombluesky.protocolsimport(Collectable,Flyable,Hints,Preparable,Reading,Stageable,StreamAsset,Triggerable,WritesStreamAssets,)fromevent_modelimportDataKeyfrompydanticimportBaseModel,Field,NonNegativeInt,PositiveInt,computed_fieldfrom._deviceimportDevice,DeviceConnectorfrom._protocolimportAsyncConfigurable,AsyncReadablefrom._signalimportSignalRfrom._statusimportAsyncStatus,WatchableAsyncStatusfrom._utilsimportDEFAULT_TIMEOUT,WatcherUpdate,merge_gathered_dicts
class DetectorTrigger(Enum):
    """How a detector is told when to acquire each frame."""

    INTERNAL = "INTERNAL"
    """The detector generates its own triggers at the requested rate."""

    EDGE_TRIGGER = "EDGE_TRIGGER"
    """A series of external trigger signals of arbitrary length is expected."""

    CONSTANT_GATE = "CONSTANT_GATE"
    """A series of external gate signals of constant width is expected."""

    VARIABLE_GATE = "VARIABLE_GATE"
    """A series of external gate signals of variable width is expected."""
class TriggerInfo(BaseModel):
    """Minimal set of information required to setup triggering on a detector."""

    number_of_events: NonNegativeInt | list[NonNegativeInt] = Field(default=1)
    """Number of events that will be processed, (0 means infinite).

    Can be:

    - A single integer or
    - A list of integers for multiple events

    Example for tomography: ``TriggerInfo(number_of_events=[2,3,100,3])``.
    This would process:

    - 2 events for dark field images
    - 3 events for initial flat field images
    - 100 events for projections
    - 3 events for final flat field images
    """

    trigger: DetectorTrigger = Field(default=DetectorTrigger.INTERNAL)
    """Sort of triggers that will be sent"""

    deadtime: float = Field(default=0.0, ge=0)
    """What is the minimum deadtime between exposures"""

    livetime: float | None = Field(default=None, ge=0)
    """What is the maximum high time of the exposures"""

    exposure_timeout: float | None = Field(default=None, gt=0)
    """What is the maximum timeout on waiting for an exposure"""

    exposures_per_event: PositiveInt = 1
    """The number of exposures that are grouped into a single StreamDatum index.

    An exposures_per_event > 1 can be useful to have exposures from a faster
    detector able to be zipped with a single exposure from a slower detector.
    E.g. if number_of_events=10 and exposures_per_event=5 then the detector
    will take 50 exposures (see ``total_number_of_exposures``), but publish
    10 StreamDatum indices, and describe() will show a shape of (5, h, w) for
    each. Default is 1.
    """

    @computed_field
    @cached_property
    def total_number_of_exposures(self) -> int:
        # Total exposures = (sum of all requested events) * exposures_per_event.
        # A list-valued number_of_events is summed; a plain int is used as is,
        # so number_of_events=0 yields 0 here.
        # Because this is a cached_property the result is computed on first
        # access and not refreshed if the fields are modified afterwards.
        events = self.number_of_events
        event_count = sum(events) if isinstance(events, list) else events
        return event_count * self.exposures_per_event
class DetectorController(ABC):
    """Abstract interface for the arm/disarm logic of a detector."""

    @abstractmethod
    def get_deadtime(self, exposure: float | None) -> float:
        """Return the time required between exposures for the given exposure."""

    @abstractmethod
    async def prepare(self, trigger_info: TriggerInfo) -> None:
        """Carry out everything needed to make the detector ready for triggers.

        :param trigger_info: Describes the triggers the detector should expect.
        """

    @abstractmethod
    async def arm(self) -> None:
        """Arm the detector."""

    @abstractmethod
    async def wait_for_idle(self):
        """Wait until the detector has become disarmed/idle again.

        Implementations are expected to wait on their internal arm status.
        """

    @abstractmethod
    async def disarm(self):
        """Disarm the detector, putting it back into an idle state."""
class DetectorWriter(ABC):
    """Abstract interface for persisting detector data (e.g. to an HDF5 file)."""

    @abstractmethod
    async def open(
        self, name: str, exposures_per_event: PositiveInt = 1
    ) -> dict[str, DataKey]:
        """Open the writer and wait until it is ready to receive data.

        :param exposures_per_event:
            Each StreamDatum index corresponds to this many written exposures
        :return: Output for ``describe()``
        """

    def get_hints(self, name: str) -> Hints:
        """Return the hints to use for the detector (empty unless overridden)."""
        no_hints: Hints = {}
        return no_hints

    @abstractmethod
    async def get_indices_written(self) -> int:
        """Return how many indices have been written."""

    # NOTE: implementations are expected to write this as an ``async def``
    # generator. It is declared as a plain ``def`` here only because an async
    # abstract declaration would need a body containing a redundant ``yield``.
    @abstractmethod
    def observe_indices_written(
        self, timeout: float
    ) -> AsyncGenerator[int, None]:
        """Yield the index of each frame (or equivalent data point) once written."""

    @abstractmethod
    def collect_stream_docs(
        self, name: str, indices_written: int
    ) -> AsyncIterator[StreamAsset]:
        """Create Stream docs up to the given number of indices written."""

    @abstractmethod
    async def close(self) -> None:
        """Close the writer, blocking until I/O is complete."""
# Type variables for the controller and writer, so that a concrete detector
# can be spelled e.g. StandardDetector[KinetixController, ADWriter].
DetectorControllerT = TypeVar("DetectorControllerT", bound=DetectorController)
DetectorWriterT = TypeVar("DetectorWriterT", bound=DetectorWriter)


def _ensure_trigger_info_exists(trigger_info: TriggerInfo | None) -> TriggerInfo:
    """Return ``trigger_info`` unchanged, raising ``RuntimeError`` if it is None.

    Mostly exists so that static type checkers (pylance) can narrow
    ``TriggerInfo | None`` down to ``TriggerInfo``.
    """
    if trigger_info is not None:
        return trigger_info
    raise RuntimeError("Trigger info must be set before calling this method.")
class StandardDetector(
    Device,
    Stageable,
    AsyncConfigurable,
    AsyncReadable,
    Triggerable,
    Preparable,
    Flyable,
    Collectable,
    WritesStreamAssets,
    Generic[DetectorControllerT, DetectorWriterT],
):
    """Detector base class for step and fly scanning detectors.

    Aggregates controller and writer logic together.

    :param controller: Logic for arming and disarming the detector
    :param writer: Logic for making the detector write persistent data
    :param config_sigs: Signals to read when describe and read
        configuration are called
    :param name: Device name
    :param connector: Forwarded unchanged to the ``Device`` constructor
    """

    def __init__(
        self,
        controller: DetectorControllerT,
        writer: DetectorWriterT,
        config_sigs: Sequence[SignalR] = (),
        name: str = "",
        connector: DeviceConnector | None = None,
    ) -> None:
        """Store the controller/writer and reset all prepare/kickoff state."""
        self._controller = controller
        self._writer = writer
        self._describe: dict[str, DataKey] = {}
        # Copy so that later changes to the caller's sequence have no effect.
        self._config_sigs = list(config_sigs)
        # For prepare
        self._arm_status: AsyncStatus | None = None
        self._trigger_info: TriggerInfo | None = None
        # For kickoff
        self._watchers: list[Callable] = []
        self._fly_status: WatchableAsyncStatus | None = None
        self._fly_start: float | None = None
        self._events_to_complete: int = 0
        # Represents the total number of exposures that will have been
        # completed at the end of the next `complete`.
        self._completable_exposures: int = 0
        self._number_of_events_iter: Iterator[int] | None = None
        self._initial_frame: int = 0
        super().__init__(name, connector=connector)

    @AsyncStatus.wrap
    async def stage(self) -> None:
        """Make sure the detector is idle and ready to be used.

        Checks the config signals, then closes the writer and disarms the
        controller concurrently, and finally forgets any previous TriggerInfo.
        """
        await self._check_config_sigs()
        await asyncio.gather(self._writer.close(), self._controller.disarm())
        self._trigger_info = None

    async def _check_config_sigs(self):
        """Check configuration signals are named and connected.

        :raises Exception: If a signal has an empty name, or if reading its
            value raises ``NotImplementedError``.
        """
        for signal in self._config_sigs:
            if signal.name == "":
                raise Exception(
                    "config signal must be named before it is passed to the detector"
                )
            try:
                await signal.get_value()
            except NotImplementedError as e:
                raise Exception(
                    f"config signal {signal.name} must be connected before it is "
                    + "passed to the detector"
                ) from e

    @AsyncStatus.wrap
    async def unstage(self) -> None:
        """Disarm the detector and stop file writing."""
        await asyncio.gather(self._writer.close(), self._controller.disarm())

    async def read(self) -> dict[str, Reading]:
        """There is no data to be placed in events, so this is empty."""
        # All data is in StreamResources, not Events, so nothing to output here
        return {}

    @AsyncStatus.wrap
    async def trigger(self) -> None:
        """Acquire a single internally triggered event and wait for it.

        If ``prepare`` has not been called, the detector is first prepared
        with a single-event ``INTERNAL`` TriggerInfo.

        :raises ValueError: If the detector was prepared with a trigger other
            than ``INTERNAL``, or with ``number_of_events`` not equal to 1
            (a list such as ``[1]`` is also rejected by this comparison).
        """
        if self._trigger_info is None:
            await self.prepare(
                TriggerInfo(
                    number_of_events=1,
                    trigger=DetectorTrigger.INTERNAL,
                )
            )
        trigger_info = _ensure_trigger_info_exists(self._trigger_info)
        if trigger_info.trigger is not DetectorTrigger.INTERNAL:
            msg = "The trigger method can only be called with INTERNAL triggering"
            raise ValueError(msg)
        if trigger_info.number_of_events != 1:
            raise ValueError(
                "Triggering is not supported for multiple events, the detector was "
                f"prepared with number_of_events={trigger_info.number_of_events}."
            )

        # Arm the detector and wait for it to finish.
        indices_written = await self._writer.get_indices_written()
        await self._controller.arm()
        await self._controller.wait_for_idle()
        # One more index than was present before arming must appear.
        end_observation = indices_written + 1

        async for index in self._writer.observe_indices_written(
            DEFAULT_TIMEOUT + (trigger_info.livetime or 0) + trigger_info.deadtime
        ):
            if index >= end_observation:
                break

    @AsyncStatus.wrap
    async def prepare(self, value: TriggerInfo) -> None:
        """Prepare the detector with trigger information.

        This is determined at and passed in from the plan level. The
        controller is prepared and the writer opened; the detector is armed
        here only when the trigger is not ``INTERNAL`` (internally triggered
        detectors are armed later, in ``trigger``/``kickoff``).

        Note that when ``value.deadtime`` is unset (falsy) it is filled in on
        the passed-in object with the controller's required deadtime.

        :param value: TriggerInfo describing how to trigger the detector
        :raises ValueError: If externally triggered without a deadtime, or if
            the supplied deadtime is shorter than the controller requires.
        """
        if value.trigger != DetectorTrigger.INTERNAL and not value.deadtime:
            msg = "Deadtime must be supplied when in externally triggered mode"
            raise ValueError(msg)

        required_deadtime = self._controller.get_deadtime(value.livetime)
        if not value.deadtime:
            # No deadtime given: fall back to what the controller requires.
            value.deadtime = required_deadtime
        elif required_deadtime > value.deadtime:
            msg = (
                f"Detector {self._controller} needs at least {required_deadtime}s "
                f"deadtime, but trigger logic provides only {value.deadtime}s"
            )
            raise ValueError(msg)

        self._trigger_info = value
        # Each kickoff consumes one entry, so wrap a plain int in a list.
        self._number_of_events_iter = iter(
            value.number_of_events
            if isinstance(value.number_of_events, list)
            else [value.number_of_events]
        )

        await self._controller.prepare(value)
        self._describe = await self._writer.open(self.name, value.exposures_per_event)

        self._initial_frame = await self._writer.get_indices_written()
        if value.trigger != DetectorTrigger.INTERNAL:
            await self._controller.arm()

    @AsyncStatus.wrap
    async def kickoff(self):
        """Start acquisition of the next batch of events.

        Arms the controller when internally triggered, records the start
        time, then takes the next entry of ``number_of_events`` and adds the
        corresponding exposures to the running completable total.

        :raises RuntimeError: If ``prepare`` has not been called, or if
            kickoff is called more often than ``number_of_events`` has entries.
        """
        if self._trigger_info is None or self._number_of_events_iter is None:
            raise RuntimeError("Prepare must be called before kickoff!")
        if self._trigger_info.trigger == DetectorTrigger.INTERNAL:
            await self._controller.arm()
        self._fly_start = time.monotonic()
        try:
            self._events_to_complete = next(self._number_of_events_iter)
            self._completable_exposures += (
                self._events_to_complete * self._trigger_info.exposures_per_event
            )
        except StopIteration as err:
            # The limit is the number of configured batches (one per kickoff),
            # not the total number of exposures.
            configured = self._trigger_info.number_of_events
            allowed_kickoffs = len(configured) if isinstance(configured, list) else 1
            raise RuntimeError(
                f"Kickoff called more than the configured number of "
                f"{allowed_kickoffs} iteration(s)!"
            ) from err

    async def collect_asset_docs(
        self, index: int | None = None
    ) -> AsyncIterator[StreamAsset]:
        """Yield the writer's stream documents up to ``index``.

        :param index: Number of indices to collect up to. When omitted, the
            writer is asked how many indices have been written.
        """
        # Collect stream datum documents for all indices written.
        # The index is optional, and provided for fly scans, however this needs
        # to be retrieved for step scans.
        if index is None:
            index = await self._writer.get_indices_written()
        async for doc in self._writer.collect_stream_docs(self.name, index):
            yield doc