# Source code for ophyd_async.epics.adcore._utils
from dataclasses import dataclass
from ophyd_async.core import (
DEFAULT_TIMEOUT,
SignalDatatypeT,
SignalR,
SignalRW,
StrictEnum,
wait_for_value,
)
class ADBaseDataType(StrictEnum):
    """Names of the numeric data types understood by this module.

    Every member's value is the same string as its name.  The members are
    the keys of the lookup table in ``convert_ad_dtype_to_np``.
    """

    # Integer types, signed and unsigned, from 8 to 64 bits
    Int8 = "Int8"
    UInt8 = "UInt8"
    Int16 = "Int16"
    UInt16 = "UInt16"
    Int32 = "Int32"
    UInt32 = "UInt32"
    Int64 = "Int64"
    UInt64 = "UInt64"

    # Floating-point types
    Float32 = "Float32"
    Float64 = "Float64"
def convert_ad_dtype_to_np(ad_dtype: ADBaseDataType) -> str:
    """Return the numpy dtype string for an ``ADBaseDataType`` member.

    One-byte types use the ``|`` (byte order not applicable) prefix; every
    wider type uses the ``<`` (little-endian) prefix.

    Args:
        ad_dtype: The data type to translate.

    Returns:
        The numpy dtype string, for example ``"<u2"`` for ``UInt16``.

    Raises:
        KeyError: If ``ad_dtype`` is not present in the table below.
    """
    # Index the literal table directly; a missing key propagates as KeyError.
    return {
        # 8-bit
        ADBaseDataType.Int8: "|i1",
        ADBaseDataType.UInt8: "|u1",
        # 16-bit
        ADBaseDataType.Int16: "<i2",
        ADBaseDataType.UInt16: "<u2",
        # 32-bit
        ADBaseDataType.Int32: "<i4",
        ADBaseDataType.UInt32: "<u4",
        # 64-bit
        ADBaseDataType.Int64: "<i8",
        ADBaseDataType.UInt64: "<u8",
        # floating point
        ADBaseDataType.Float32: "<f4",
        ADBaseDataType.Float64: "<f8",
    }[ad_dtype]
def convert_pv_dtype_to_np(datatype: str) -> str:
    """Translate a DBR type name into a numpy dtype string.

    Args:
        datatype: A DBR type name such as ``"DBR_LONG"``.

    Returns:
        ``"s40"`` for ``DBR_STRING`` and ``DBR_CHAR``; otherwise the result
        of ``convert_ad_dtype_to_np`` for the matching ``ADBaseDataType``.

    Raises:
        ValueError: If ``datatype`` is ``"DBR_NATIVE"`` (not supported yet)
            or is not one of the recognised DBR type names.
    """
    # String-like types share one fixed dtype string.
    # NOTE(review): numpy's documented code for fixed-width byte strings is
    # upper-case "S"; confirm consumers really expect lower-case "s40".
    if datatype in ("DBR_STRING", "DBR_CHAR"):
        return "s40"

    if datatype == "DBR_NATIVE":
        raise ValueError("Don't support DBR_NATIVE yet")

    # NOTE(review): DBR_INT maps to Int32 here while DBR_SHORT maps to Int16;
    # confirm this is intended.
    dbr_to_ad = {
        "DBR_SHORT": ADBaseDataType.Int16,
        "DBR_ENUM": ADBaseDataType.Int16,
        "DBR_INT": ADBaseDataType.Int32,
        "DBR_LONG": ADBaseDataType.Int32,
        "DBR_FLOAT": ADBaseDataType.Float32,
        "DBR_DOUBLE": ADBaseDataType.Float64,
    }
    try:
        return convert_ad_dtype_to_np(dbr_to_ad[datatype])
    except KeyError as err:
        # Present an unknown name as a ValueError, keeping the KeyError as cause.
        raise ValueError(f"Invalid dbr type {datatype}") from err
def convert_param_dtype_to_np(datatype: str) -> str:
    """Translate a parameter datatype name into a numpy dtype string.

    Args:
        datatype: One of ``"INT"``, ``"INT64"``, ``"DOUBLE"`` or ``"STRING"``.

    Returns:
        ``"s40"`` for ``"STRING"``; otherwise the result of
        ``convert_ad_dtype_to_np`` for the matching ``ADBaseDataType``.

    Raises:
        ValueError: If ``datatype`` is not one of the names listed above.
    """
    # NOTE(review): numpy's documented code for fixed-width byte strings is
    # upper-case "S"; confirm consumers really expect lower-case "s40".
    if datatype == "STRING":
        return "s40"

    param_to_ad = {
        "INT": ADBaseDataType.Int32,
        "INT64": ADBaseDataType.Int64,
        "DOUBLE": ADBaseDataType.Float64,
    }
    try:
        return convert_ad_dtype_to_np(param_to_ad[datatype])
    except KeyError as err:
        # Present an unknown name as a ValueError, keeping the KeyError as cause.
        raise ValueError(f"Invalid datatype {datatype}") from err
class FileWriteMode(StrictEnum):
    """File write mode choices; each value is the capitalised member name."""

    single = "Single"
    capture = "Capture"
    stream = "Stream"
class ImageMode(StrictEnum):
    """Image mode choices; each value is the capitalised member name."""

    single = "Single"
    multiple = "Multiple"
    continuous = "Continuous"
class NDAttributeDataType(StrictEnum):
    """Datatype names for a parameter-backed attribute.

    This is the type of ``NDAttributeParam.datatype``; each value is the
    same string as the member name.
    """

    INT = "INT"
    DOUBLE = "DOUBLE"
    STRING = "STRING"
class NDAttributePvDbrType(StrictEnum):
    """DBR type names for a PV-backed attribute.

    This is the type of ``NDAttributePv.dbrtype``; each value is the same
    string as the member name.
    """

    # Integer-like types
    DBR_SHORT = "DBR_SHORT"
    DBR_ENUM = "DBR_ENUM"
    DBR_INT = "DBR_INT"
    DBR_LONG = "DBR_LONG"

    # Floating-point types
    DBR_FLOAT = "DBR_FLOAT"
    DBR_DOUBLE = "DBR_DOUBLE"

    # Types that convert_pv_dtype_to_np reports as "s40"
    DBR_STRING = "DBR_STRING"
    DBR_CHAR = "DBR_CHAR"
# [docs]
@dataclass
class NDAttributePv:
    """Definition of an attribute whose value is read from a PV."""

    # Name of the attribute stamped on the array; also a scientifically useful
    # name when appended to device.name
    name: str

    # caget the PV given by signal.source and attach it to each frame
    signal: SignalR

    # One of the NDAttributePvDbrType members
    dbrtype: NDAttributePvDbrType

    # A description that appears in the HDF file as an attribute
    description: str = ""
# [docs]
@dataclass
class NDAttributeParam:
    """Definition of an attribute whose value is read from a parameter."""

    # Name of the attribute stamped on the array; also a scientifically useful
    # name when appended to device.name
    name: str

    # The parameter string as seen in the INP link of the record
    param: str

    # The datatype of the parameter
    datatype: NDAttributeDataType

    # The address as seen in the INP link of the record
    addr: int = 0

    # A description that appears in the HDF file as an attribute
    description: str = ""
async def stop_busy_record(
    signal: SignalRW[SignalDatatypeT],
    value: SignalDatatypeT,
    timeout: float = DEFAULT_TIMEOUT,
    status_timeout: float | None = None,
) -> None:
    """Set ``signal`` to ``value`` and then wait for it to hold that value.

    Args:
        signal: The signal to write to and then monitor.
        value: The value to write and subsequently wait for.
        timeout: Passed to ``wait_for_value`` as its timeout.
        status_timeout: Passed to ``signal.set`` as its timeout.
    """
    # The set is issued with wait=False; presumably this avoids blocking on
    # put completion -- confirm against SignalRW.set.
    put_request = signal.set(value, wait=False, timeout=status_timeout)
    await put_request

    # Completion is judged by the signal reaching the requested value.
    await wait_for_value(signal, value, timeout=timeout)