Source code for ophyd_async.tango.core._utils
import inspect
import re
from collections.abc import Sequence
from typing import Any, ParamSpec, Protocol, TypeVar, runtime_checkable
import numpy as np
from tango import AttrDataFormat, AttributeAlarmInfo, CmdArgType, StdStringVector
from ophyd_async.core import Array1D, SignalDatatype, StrictEnum, Table
T = TypeVar("T")
P = ParamSpec("P")
[docs]
class DevStateEnum(StrictEnum):
ON = "ON"
OFF = "OFF"
CLOSE = "CLOSE"
OPEN = "OPEN"
INSERT = "INSERT"
EXTRACT = "EXTRACT"
MOVING = "MOVING"
STANDBY = "STANDBY"
FAULT = "FAULT"
INIT = "INIT"
RUNNING = "RUNNING"
ALARM = "ALARM"
DISABLE = "DISABLE"
UNKNOWN = "UNKNOWN"
[docs]
def get_full_attr_trl(device_trl: str, attr_name: str) -> str:
device_parts = device_trl.split("#", 1)
# my/device/name#dbase=no splits into my/device/name and
# dbase=no
full_trl = device_parts[0] + "/" + attr_name
if len(device_parts) > 1:
full_trl += "#" + device_parts[1]
return full_trl
[docs]
def get_device_trl_and_attr(name: str):
# trl can have form:
# <protocol://><server:host/>domain/family/member/attr_name<#dbase=no>
# e.g. tango://127.0.0.1:8888/test/nodb/test#dbase=no
re_str = (
r"([\.a-zA-Z0-9_-]*://)?([\.a-zA-Z0-9_-]+:[0-9]+/)?"
r"([^#/]+/[^#/]+/[^#/]+/)([^#/]+)(#dbase=[a-z]+)?"
)
search = re.search(re_str, name)
if not search:
raise ValueError(f"Could not parse device and attribute from trl {name}")
groups = [part if part else "" for part in search.groups()]
attr = groups.pop(3) # extract attr name from groups
groups[2] = groups[2].removesuffix("/") # remove trailing slash from device name
device = "".join(groups)
return device, attr
[docs]
def try_to_cast_as_float(value: Any) -> float | None:
"""Attempt to cast a value to float, returning None on failure."""
try:
return float(value)
except (ValueError, TypeError):
return None
[docs]
class TangoLongStringTable(Table):
long: Array1D[np.int32]
string: Sequence[str]
def __eq__(self, other):
if not isinstance(other, TangoLongStringTable):
return False
long_equal = np.array_equal(self.long, other.long)
string_equal = self.string == other.string
return long_equal and string_equal
[docs]
class TangoDoubleStringTable(Table):
double: Array1D[np.float64]
string: Sequence[str]
def __eq__(self, other):
if not isinstance(other, TangoDoubleStringTable):
return False
double_equal = np.array_equal(self.double, other.double)
string_equal = self.string == other.string
return double_equal and string_equal
[docs]
def sig_from_types(
in_type: type[SignalDatatype] | None, out_type: type[SignalDatatype] | None
):
params = (
[
inspect.Parameter(
"arg", inspect.Parameter.POSITIONAL_OR_KEYWORD, annotation=in_type
)
]
if in_type not in (None, type(None))
else []
)
return inspect.Signature(params, return_annotation=out_type)
[docs]
@runtime_checkable
class AttributeConfig(Protocol):
data_type: int
data_format: AttrDataFormat
enum_labels: StdStringVector | list[str]
format: str
min_value: str
max_value: str
alarms: AttributeAlarmInfo
unit: str
[docs]
@runtime_checkable
class CommandConfig(Protocol):
in_type: CmdArgType
out_type: CmdArgType