Source code for ophyd_async.epics.adcore._utils

from enum import Enum
from typing import Optional
from xml.etree import cElementTree as ET

from ophyd_async.core import DEFAULT_TIMEOUT, SignalRW, T, wait_for_value


[docs] class ADBaseDataType(str, Enum): Int8 = "Int8" UInt8 = "UInt8" Int16 = "Int16" UInt16 = "UInt16" Int32 = "Int32" UInt32 = "UInt32" Int64 = "Int64" UInt64 = "UInt64" Float32 = "Float32" Float64 = "Float64"
def convert_ad_dtype_to_np(ad_dtype: ADBaseDataType) -> str: ad_dtype_to_np_dtype = { ADBaseDataType.Int8: "|i1", ADBaseDataType.UInt8: "|u1", ADBaseDataType.Int16: "<i2", ADBaseDataType.UInt16: "<u2", ADBaseDataType.Int32: "<i4", ADBaseDataType.UInt32: "<u4", ADBaseDataType.Int64: "<i8", ADBaseDataType.UInt64: "<u8", ADBaseDataType.Float32: "<f4", ADBaseDataType.Float64: "<f8", } return ad_dtype_to_np_dtype[ad_dtype]
[docs] class FileWriteMode(str, Enum): single = "Single" capture = "Capture" stream = "Stream"
[docs] class ImageMode(str, Enum): single = "Single" multiple = "Multiple" continuous = "Continuous"
[docs] class NDAttributeDataType(str, Enum): INT = "INT" DOUBLE = "DOUBLE" STRING = "STRING"
[docs] class NDAttributesXML: """Helper to make NDAttributesFile XML for areaDetector""" _dbr_types = { None: "DBR_NATIVE", NDAttributeDataType.INT: "DBR_LONG", NDAttributeDataType.DOUBLE: "DBR_DOUBLE", NDAttributeDataType.STRING: "DBR_STRING", } def __init__(self): self._root = ET.Element("Attributes")
[docs] def add_epics_pv( self, name: str, pv: str, datatype: Optional[NDAttributeDataType] = None, description: str = "", ): """Add a PV to the attribute list Args: name: The attribute name pv: The pv to get from datatype: An override datatype, otherwise will use native EPICS type description: A description that appears in the HDF file as an attribute """ ET.SubElement( self._root, "Attribute", name=name, type="EPICS_PV", source=pv, datatype=self._dbr_types[datatype], description=description, )
[docs] def add_param( self, name: str, param: str, datatype: NDAttributeDataType, addr: int = 0, description: str = "", ): """Add a driver or plugin parameter to the attribute list Args: name: The attribute name param: The parameter string as seen in the INP link of the record datatype: The datatype of the parameter description: A description that appears in the HDF file as an attribute """ ET.SubElement( self._root, "Attribute", name=name, type="PARAM", source=param, addr=str(addr), datatype=datatype.value, description=description, )
def __str__(self) -> str: """Output the XML pretty printed""" ET.indent(self._root, space=" ", level=0) return ET.tostring(self._root, xml_declaration=True, encoding="utf-8").decode()
[docs] async def stop_busy_record( signal: SignalRW[T], value: T, timeout: float = DEFAULT_TIMEOUT, status_timeout: Optional[float] = None, ) -> None: await signal.set(value, wait=False, timeout=status_timeout) await wait_for_value(signal, value, timeout=timeout)