Source code for ophyd_async.epics.adcore._utils

from dataclasses import dataclass

from ophyd_async.core import (
    DEFAULT_TIMEOUT,
    SignalDatatypeT,
    SignalR,
    SignalRW,
    StrictEnum,
    SubsetEnum,
    wait_for_value,
)


[docs] class ADBaseDataType(StrictEnum): INT8 = "Int8" UINT8 = "UInt8" INT16 = "Int16" UINT16 = "UInt16" INT32 = "Int32" UINT32 = "UInt32" INT64 = "Int64" UINT64 = "UInt64" FLOAT32 = "Float32" FLOAT64 = "Float64"
def convert_ad_dtype_to_np(ad_dtype: ADBaseDataType) -> str: ad_dtype_to_np_dtype = { ADBaseDataType.INT8: "|i1", ADBaseDataType.UINT8: "|u1", ADBaseDataType.INT16: "<i2", ADBaseDataType.UINT16: "<u2", ADBaseDataType.INT32: "<i4", ADBaseDataType.UINT32: "<u4", ADBaseDataType.INT64: "<i8", ADBaseDataType.UINT64: "<u8", ADBaseDataType.FLOAT32: "<f4", ADBaseDataType.FLOAT64: "<f8", } return ad_dtype_to_np_dtype[ad_dtype] def convert_pv_dtype_to_np(datatype: str) -> str: _pvattribute_to_ad_datatype = { "DBR_SHORT": ADBaseDataType.INT16, "DBR_ENUM": ADBaseDataType.INT16, "DBR_INT": ADBaseDataType.INT32, "DBR_LONG": ADBaseDataType.INT32, "DBR_FLOAT": ADBaseDataType.FLOAT32, "DBR_DOUBLE": ADBaseDataType.FLOAT64, } if datatype in ["DBR_STRING", "DBR_CHAR"]: np_datatype = "s40" elif datatype == "DBR_NATIVE": raise ValueError("Don't support DBR_NATIVE yet") else: try: np_datatype = convert_ad_dtype_to_np(_pvattribute_to_ad_datatype[datatype]) except KeyError as e: raise ValueError(f"Invalid dbr type {datatype}") from e return np_datatype def convert_param_dtype_to_np(datatype: str) -> str: _paramattribute_to_ad_datatype = { "INT": ADBaseDataType.INT32, "INT64": ADBaseDataType.INT64, "DOUBLE": ADBaseDataType.FLOAT64, } if datatype in ["STRING"]: np_datatype = "s40" else: try: np_datatype = convert_ad_dtype_to_np( _paramattribute_to_ad_datatype[datatype] ) except KeyError as e: raise ValueError(f"Invalid datatype {datatype}") from e return np_datatype
[docs] class FileWriteMode(StrictEnum): SINGLE = "Single" CAPTURE = "Capture" STREAM = "Stream"
[docs] class ImageMode(SubsetEnum): SINGLE = "Single" MULTIPLE = "Multiple" CONTINUOUS = "Continuous"
[docs] class NDAttributeDataType(StrictEnum): INT = "INT" DOUBLE = "DOUBLE" STRING = "STRING"
[docs] class NDAttributePvDbrType(StrictEnum): DBR_SHORT = "DBR_SHORT" DBR_ENUM = "DBR_ENUM" DBR_INT = "DBR_INT" DBR_LONG = "DBR_LONG" DBR_FLOAT = "DBR_FLOAT" DBR_DOUBLE = "DBR_DOUBLE" DBR_STRING = "DBR_STRING" DBR_CHAR = "DBR_CHAR"
[docs] @dataclass class NDAttributePv: name: str # name of attribute stamped on array, also scientifically useful name # when appended to device.name signal: SignalR # caget the pv given by signal.source and attach to each frame dbrtype: NDAttributePvDbrType description: str = "" # A description that appears in the HDF file as an attribute
[docs] @dataclass class NDAttributeParam: name: str # name of attribute stamped on array, also scientifically useful name # when appended to device.name param: str # The parameter string as seen in the INP link of the record datatype: NDAttributeDataType # The datatype of the parameter addr: int = 0 # The address as seen in the INP link of the record description: str = "" # A description that appears in the HDF file as an attribute
[docs] async def stop_busy_record( signal: SignalRW[SignalDatatypeT], value: SignalDatatypeT, timeout: float = DEFAULT_TIMEOUT, status_timeout: float | None = None, ) -> None: await signal.set(value, wait=False, timeout=status_timeout) await wait_for_value(signal, value, timeout=timeout)