import textwrap
from collections.abc import Sequence
from dataclasses import dataclass
from typing import Any, Generic, TypeVar
import numpy as np
from tango import AttrDataFormat, AttrWriteType, CmdArgType, DevState
from tango.server import Device, attribute, command
from ophyd_async.core import (
Array1D,
DTypeScalar_co,
StrictEnum,
)
from ophyd_async.testing import float_array_value, int_array_value
T = TypeVar("T")
[docs]
class ExampleStrEnum(StrictEnum):
A = "AAA"
B = "BBB"
C = "CCC"
def int_image_value(
dtype: type[DTypeScalar_co],
):
# how do we type this?
array_1d = int_array_value(dtype)
return np.vstack((array_1d, array_1d))
def float_image_value(
dtype: type[DTypeScalar_co],
):
# how do we type this?
array_1d = float_array_value(dtype)
return np.vstack((array_1d, array_1d))
def _valid_command(dformat: AttrDataFormat, dtype: str):
if dformat != AttrDataFormat.SCALAR and dtype in ["DevState", "DevEnum"]:
return False
return True
@dataclass
class AttributeData(Generic[T]):
name: str
tango_type: str
initial_scalar: T
initial_spectrum: Array1D | Sequence[T]
_all_attribute_definitions = [
AttributeData(
"str",
"DevString",
"test_string",
["one", "two", "three"],
),
AttributeData(
"bool",
"DevBoolean",
True,
np.array([False, True], dtype=bool),
),
AttributeData("strenum", "DevEnum", 1, np.array([0, 1, 2])),
AttributeData("int8", "DevShort", 1, int_array_value(np.int8)),
AttributeData("uint8", "DevUChar", 1, int_array_value(np.uint8)),
AttributeData("int16", "DevShort", 1, int_array_value(np.int16)),
AttributeData("uint16", "DevUShort", 1, int_array_value(np.uint16)),
AttributeData("int32", "DevLong", 1, int_array_value(np.int32)),
AttributeData("uint32", "DevULong", 1, int_array_value(np.uint32)),
AttributeData("int64", "DevLong64", 1, int_array_value(np.int64)),
AttributeData("uint64", "DevULong64", 1, int_array_value(np.uint64)),
AttributeData("float32", "DevFloat", 1.234, float_array_value(np.float32)),
AttributeData("float64", "DevDouble", 1.234, float_array_value(np.float64)),
AttributeData(
"my_state",
"DevState",
DevState.INIT,
np.array(list(DevState.names.values()), dtype=DevState), # type: ignore
),
]
[docs]
class OneOfEverythingTangoDevice(Device):
attr_values = {}
initial_values = {}
def _add_attr(self, attr: attribute, initial_value):
self.attr_values[attr.name] = initial_value
self.initial_values[attr.name] = initial_value
self.add_attribute(attr)
self.set_change_event(attr.name, True, False)
[docs]
def add_scalar_attr(self, name: str, dtype: str, initial_value: Any):
attr = attribute(
name=name,
dtype=dtype,
dformat=AttrDataFormat.SCALAR,
access=AttrWriteType.READ_WRITE,
fget=self.read,
fset=self.write,
enum_labels=[e.value for e in ExampleStrEnum],
)
self._add_attr(attr, initial_value)
[docs]
def long_string_cmd(self, arg):
"""Echo command for DevVarLongStringArray."""
return arg
[docs]
def double_string_cmd(self, arg):
"""Echo command for DevVarDoubleStringArray."""
return arg
[docs]
def void_cmd(self):
"""Command for DevVoid."""
return
[docs]
def add_array_attrs(self, name: str, dtype: str, initial_value: np.ndarray):
spectrum_name = f"{name}_spectrum"
if hasattr(initial_value, "shape"):
max_dim_x = initial_value.shape[-1]
else:
max_dim_x = len(initial_value)
spectrum_attr = attribute(
name=spectrum_name,
dtype=dtype,
dformat=AttrDataFormat.SPECTRUM,
access=AttrWriteType.READ_WRITE,
fget=self.read,
fset=self.write,
max_dim_x=max_dim_x,
enum_labels=[e.value for e in ExampleStrEnum],
)
image_name = f"{name}_image"
image_attr = attribute(
name=image_name,
dtype=dtype,
dformat=AttrDataFormat.IMAGE,
access=AttrWriteType.READ_WRITE,
fget=self.read,
fset=self.write,
max_dim_x=max_dim_x,
max_dim_y=2,
enum_labels=[e.value for e in ExampleStrEnum],
)
# Arrays of enums are not supported, do not add their attribute data
if name in ["strenum", "my_state"]:
return
self._add_attr(spectrum_attr, initial_value)
# have image just be 2 of the initial spectrum stacked
# String images are not supported, do not add their attribute data
if name in ["str"]:
return
self._add_attr(image_attr, np.vstack((initial_value, initial_value)))
void_cmd = command(
f=self.void_cmd,
dtype_in=CmdArgType.DevVoid,
dtype_out=CmdArgType.DevVoid,
)
long_string_table_cmd = command(
f=self.long_string_cmd,
dtype_in=CmdArgType.DevVarLongStringArray,
dtype_out=CmdArgType.DevVarLongStringArray,
)
double_string_table_cmd = command(
f=self.double_string_cmd,
dtype_in=CmdArgType.DevVarDoubleStringArray,
dtype_out=CmdArgType.DevVarDoubleStringArray,
)
self.add_command(void_cmd)
self.add_command(long_string_table_cmd)
self.add_command(double_string_table_cmd)
[docs]
def add_scalar_command(self, name: str, dtype: str):
if _valid_command(AttrDataFormat.SCALAR, dtype):
self.add_command(
command(
f=getattr(self, f"{name}_cmd"),
dtype_in=dtype,
dtype_out=dtype,
dformat_in=AttrDataFormat.SCALAR,
dformat_out=AttrDataFormat.SCALAR,
),
)
[docs]
def add_spectrum_command(self, name: str, dtype: str):
if _valid_command(AttrDataFormat.SPECTRUM, dtype):
if name in ["int8", "uint8"]:
self.add_command(
command(
f=getattr(self, f"{name}_spectrum_cmd"),
dtype_in=CmdArgType.DevVarCharArray,
dtype_out=CmdArgType.DevVarCharArray,
),
)
else:
self.add_command(
command(
f=getattr(self, f"{name}_spectrum_cmd"),
dtype_in=dtype,
dtype_out=dtype,
dformat_in=AttrDataFormat.SPECTRUM,
dformat_out=AttrDataFormat.SPECTRUM,
),
)
[docs]
def initialize_dynamic_attributes(self):
for attr_data in _all_attribute_definitions:
self.add_scalar_attr(
attr_data.name, attr_data.tango_type, attr_data.initial_scalar
)
self.add_array_attrs(
attr_data.name, attr_data.tango_type, attr_data.initial_spectrum
)
self.add_scalar_command(attr_data.name, attr_data.tango_type)
self.add_spectrum_command(attr_data.name, attr_data.tango_type)
[docs]
@command(dtype_in=float, dtype_out=bool)
def float_to_bool_cmd(self, value: float) -> bool:
"""Command with float input and bool output (different in/out types)."""
return value > 0
[docs]
@command
def reset_values(self):
for attr_name in self.attr_values:
self.attr_values[attr_name] = self.initial_values[attr_name]
[docs]
def read(self, attr):
value = self.attr_values[attr.get_name()]
attr.set_value(value)
[docs]
def write(self, attr):
new_value = attr.get_write_value()
self.attr_values[attr.get_name()] = new_value
self.push_change_event(attr.get_name(), new_value)
echo_command_code = textwrap.dedent(
"""\
def {}(self, arg):
return arg
"""
)
for attr_data in _all_attribute_definitions:
if _valid_command(AttrDataFormat.SCALAR, attr_data.tango_type):
exec(echo_command_code.format(f"{attr_data.name}_cmd"))
if _valid_command(AttrDataFormat.SPECTRUM, attr_data.tango_type):
exec(echo_command_code.format(f"{attr_data.name}_spectrum_cmd"))