class PandaHDFWriter(DetectorWriter):
    """For writing for PandA data from the `DataBlock`."""

    def __init__(
        self,
        path_provider: PathProvider,
        panda_data_block: DataBlock,
    ) -> None:
        # The PandA DATA block whose signals we configure and observe.
        self.panda_data_block = panda_data_block
        # Provides the directory/filename info when a capture is opened.
        self._path_provider = path_provider
        # Descriptions of the datasets the PandA will write, refreshed on open().
        self._datasets: list[HDFDatasetDescription] = []
        # Composer for stream resource/datum docs; created lazily on first collect.
        self._composer: HDFDocumentComposer | None = None

    # Triggered on PCAP arm
[docs]asyncdefopen(self,name:str,exposures_per_event:int=1)->dict[str,DataKey]:"""Retrieve and get descriptor of all PandA signals marked for capture."""self._exposures_per_event=exposures_per_event# Ensure flushes are immediateawaitself.panda_data_block.flush_period.set(0)self._composer=Noneinfo=self._path_provider(device_name=name)# Set create dir depth first to guarantee that callback when setting# directory path has correct valueawaitself.panda_data_block.create_directory.set(info.create_dir_depth)# Set the initial valuesawaitasyncio.gather(self.panda_data_block.hdf_directory.set(str(info.directory_path)),self.panda_data_block.hdf_file_name.set(f"{info.filename}.h5",),self.panda_data_block.capture_mode.set(PandaCaptureMode.FOREVER),)# Make sure that directory exists or has been created.ifnotawaitself.panda_data_block.directory_exists.get_value()==1:raiseOSError(f"Directory {info.directory_path} does not exist or ""is not writable by the PandABlocks-ioc!")# Wait for it to start, stashing the status that tells us when it finishesawaitself.panda_data_block.capture.set(True)returnawaitself._describe(name)
asyncdef_describe(self,name:str)->dict[str,DataKey]:"""Return a describe based on the datasets PV."""awaitself._update_datasets(name)describe={ds.data_key:DataKey(source=self.panda_data_block.hdf_directory.source,shape=list(ds.shape),dtype="array"ifself._exposures_per_event>1orlen(ds.shape)>1else"number",# PandA data should always be written as Float64dtype_numpy=ds.dtype_numpy,external="STREAM:",)fordsinself._datasets}returndescribeasyncdef_update_datasets(self,name:str)->None:# Load data from the datasets PV on the panda, update internal# representation of datasets that the panda will write.capture_table=awaitself.panda_data_block.datasets.get_value()self._datasets=[# TODO: Update chunk size to read signal once available in IOC# Currently PandA IOC sets chunk size to 1024 points per chunkHDFDatasetDescription(data_key=dataset_name,dataset="/"+dataset_name,shape=(self._exposures_per_event,)ifself._exposures_per_event>1else(),dtype_numpy="<f8",chunk_shape=(1024,),)fordataset_nameincapture_table.name]# Warn user if dataset table is empty in PandA# i.e. no stream resources will be generatediflen(self._datasets)==0:self.panda_data_block.log.warning(f"PandA {name} DATASETS table is empty! ""No stream resource docs will be generated. ""Make sure captured positions have their corresponding ""*:DATASET PV set to a scientifically relevant name.")# Next few functions are exactly the same as AD writer. Could move as default# StandardDetector behavior
[docs]asyncdefwait_for_index(self,index:int,timeout:float|None=DEFAULT_TIMEOUT):defmatcher(value:int)->bool:# Index is already divided by exposures_per_event, so we need to also# divide the value by exposures_per_event to get the correct indexreturnvalue//self._exposures_per_event>=indexmatcher.__name__=f"index_at_least_{index}"awaitwait_for_value(self.panda_data_block.num_captured,matcher,timeout=timeout)
[docs]asyncdefobserve_indices_written(self,timeout:float)->AsyncGenerator[int,None]:"""Wait until a specific index is ready to be collected."""asyncfornum_capturedinobserve_value(self.panda_data_block.num_captured,timeout):yieldnum_captured//self._exposures_per_event
[docs]asyncdefcollect_stream_docs(self,name:str,indices_written:int)->AsyncIterator[StreamAsset]:# TODO: fail if we get dropped framesifindices_written:ifnotself._composer:self._composer=HDFDocumentComposer(Path(awaitself.panda_data_block.hdf_directory.get_value())/Path(awaitself.panda_data_block.hdf_file_name.get_value()),self._datasets,)fordocinself._composer.stream_resources():yield"stream_resource",docfordocinself._composer.stream_data(indices_written):yield"stream_datum",doc
# Could put this function as default for StandardDetector