import textwrap
from collections.abc import Sequence
from dataclasses import dataclass
from typing import Any, Generic, TypeVar
import numpy as np
from tango import AttrDataFormat, AttrWriteType, DevState
from tango.server import Device, attribute, command
from ophyd_async.core import (
Array1D,
DTypeScalar_co,
StrictEnum,
)
from ophyd_async.testing import float_array_value, int_array_value
# Type of the scalar value carried by an ``AttributeData`` entry.
T = TypeVar("T")
class ExampleStrEnum(StrictEnum):
    """Example string enum.

    Its member values are used as the ``enum_labels`` of the attributes
    created by ``OneOfEverythingTangoDevice``.
    """

    A = "AAA"
    B = "BBB"
    C = "CCC"
def int_image_value(
    dtype: type[DTypeScalar_co],
) -> np.ndarray:
    """Return an integer test image for *dtype*.

    The image is the array returned by ``int_array_value(dtype)`` stacked
    vertically on top of itself, i.e. two identical rows.

    Args:
        dtype: NumPy scalar type forwarded to ``int_array_value``.

    Returns:
        The stacked array produced by ``np.vstack``.
    """
    # NOTE(review): the return annotation is intentionally loose; it does not
    # encode the 2D shape or the element dtype. Tighten it if a suitable 2D
    # array alias becomes available.
    array_1d = int_array_value(dtype)
    return np.vstack((array_1d, array_1d))
def float_image_value(
    dtype: type[DTypeScalar_co],
) -> np.ndarray:
    """Return a floating-point test image for *dtype*.

    The image is the array returned by ``float_array_value(dtype)`` stacked
    vertically on top of itself, i.e. two identical rows.

    Args:
        dtype: NumPy scalar type forwarded to ``float_array_value``.

    Returns:
        The stacked array produced by ``np.vstack``.
    """
    # NOTE(review): the return annotation is intentionally loose; it does not
    # encode the 2D shape or the element dtype. Tighten it if a suitable 2D
    # array alias becomes available.
    array_1d = float_array_value(dtype)
    return np.vstack((array_1d, array_1d))
def _valid_command(dformat: AttrDataFormat, dtype: str):
    """Tell whether an echo command is generated for this format/type pair.

    Returns ``False`` for ``"DevUChar"`` in any format, and for ``"DevState"``
    or ``"DevEnum"`` in any format other than ``AttrDataFormat.SCALAR``.
    Every other combination returns ``True``.
    """
    # DevUChar never gets a command, whatever the data format.
    if dtype == "DevUChar":
        return False
    # DevState and DevEnum are only accepted in scalar form.
    scalar_only_types = ["DevState", "DevEnum"]
    non_scalar = dformat != AttrDataFormat.SCALAR
    return not (non_scalar and dtype in scalar_only_types)
@dataclass
class AttributeData(Generic[T]):
    """Description of one data type exposed by the example Tango device.

    Each instance supplies the base attribute name, the Tango type string and
    the initial values used for the scalar and spectrum attributes.
    """

    # Base name; "_spectrum", "_image", "_cmd" and "_spectrum_cmd" variants
    # are derived from it elsewhere in this module.
    name: str
    # Tango type name passed as ``dtype`` to attributes and commands,
    # e.g. "DevString" or "DevLong".
    tango_type: str
    # Initial value of the scalar attribute.
    initial_scalar: T
    # Initial value of the spectrum attribute (array or plain sequence).
    initial_spectrum: Array1D | Sequence[T]
# One entry per data type served by the device. The order is: string, bool,
# enum, the integer family, the float family, then the Tango state type.
_all_attribute_definitions = [
    AttributeData(
        name="str",
        tango_type="DevString",
        initial_scalar="test_string",
        initial_spectrum=["one", "two", "three"],
    ),
    AttributeData(
        name="bool",
        tango_type="DevBoolean",
        initial_scalar=True,
        initial_spectrum=np.array([False, True], dtype=bool),
    ),
    AttributeData(
        name="strenum",
        tango_type="DevEnum",
        initial_scalar=1,
        initial_spectrum=np.array([0, 1, 2]),
    ),
    # Integer types all start at scalar 1 with an array from int_array_value.
    # NOTE(review): "int8" is served as DevShort, the same Tango type as
    # "int16" — presumably because Tango has no signed 8-bit type; confirm.
    *(
        AttributeData(int_name, int_tango_type, 1, int_array_value(int_np_type))
        for int_name, int_tango_type, int_np_type in (
            ("int8", "DevShort", np.int8),
            ("uint8", "DevUChar", np.uint8),
            ("int16", "DevShort", np.int16),
            ("uint16", "DevUShort", np.uint16),
            ("int32", "DevLong", np.int32),
            ("uint32", "DevULong", np.uint32),
            ("int64", "DevLong64", np.int64),
            ("uint64", "DevULong64", np.uint64),
        )
    ),
    # Float types start at scalar 1.234 with an array from float_array_value.
    *(
        AttributeData(
            float_name, float_tango_type, 1.234, float_array_value(float_np_type)
        )
        for float_name, float_tango_type, float_np_type in (
            ("float32", "DevFloat", np.float32),
            ("float64", "DevDouble", np.float64),
        )
    ),
    AttributeData(
        name="my_state",
        tango_type="DevState",
        initial_scalar=DevState.INIT,
        initial_spectrum=np.array(list(DevState.names.values()), dtype=DevState),
    ),
]
class OneOfEverythingTangoDevice(Device):
    """Tango device exposing one attribute set per supported data type.

    For every entry in ``_all_attribute_definitions`` the device creates a
    read/write scalar attribute, a ``<name>_spectrum`` attribute and (for most
    types) a ``<name>_image`` attribute, plus ``<name>_cmd`` and
    ``<name>_spectrum_cmd`` commands that return their argument unchanged.
    Attribute values are kept in ``attr_values`` and can be restored to their
    starting values with the ``reset_values`` command.
    """

    # Current and initial value of every attribute, keyed by attribute name.
    # NOTE(review): these dicts are class attributes, so all instances of this
    # device living in the same process share one set of values — confirm
    # that is acceptable before running several instances side by side.
    attr_values = {}
    initial_values = {}

    def _add_attr(self, attr: attribute, initial_value):
        """Register *attr* on the device and record its initial value."""
        self.attr_values[attr.name] = initial_value
        self.initial_values[attr.name] = initial_value
        self.add_attribute(attr)
        # Change events are pushed by hand from ``write``.
        self.set_change_event(attr.name, True, False)

    def _make_attr(
        self, name: str, dtype: str, dformat: AttrDataFormat, **dims: int
    ) -> attribute:
        """Build a read/write attribute served by ``read`` and ``write``.

        Args:
            name: Attribute name.
            dtype: Tango type name, e.g. ``"DevLong"``.
            dformat: Scalar, spectrum or image data format.
            **dims: Optional ``max_dim_x`` / ``max_dim_y`` limits.
        """
        return attribute(
            name=name,
            dtype=dtype,
            dformat=dformat,
            access=AttrWriteType.READ_WRITE,
            fget=self.read,
            fset=self.write,
            # Labels are supplied for every attribute, not only DevEnum ones.
            enum_labels=[e.value for e in ExampleStrEnum],
            **dims,
        )

    def _add_echo_command(
        self, method_name: str, dtype: str, dformat: AttrDataFormat
    ) -> None:
        """Register the method *method_name* as a command if the pair is valid.

        The command takes and returns *dtype* in format *dformat*. Nothing is
        registered when ``_valid_command`` rejects the combination.
        """
        if not _valid_command(dformat, dtype):
            return
        self.add_command(
            command(
                f=getattr(self, method_name),
                dtype_in=dtype,
                dtype_out=dtype,
                dformat_in=dformat,
                dformat_out=dformat,
            ),
        )

    def add_scalar_attr(self, name: str, dtype: str, initial_value: Any):
        """Add a read/write scalar attribute called *name*."""
        attr = self._make_attr(name, dtype, AttrDataFormat.SCALAR)
        self._add_attr(attr, initial_value)

    def add_array_attrs(
        self, name: str, dtype: str, initial_value: np.ndarray | Sequence[Any]
    ):
        """Add ``<name>_spectrum`` and, where supported, ``<name>_image``.

        Args:
            name: Base attribute name.
            dtype: Tango type name.
            initial_value: Initial spectrum value; either an array (its last
                dimension gives the maximum length) or a plain sequence.
        """
        if hasattr(initial_value, "shape"):
            max_dim_x = initial_value.shape[-1]
        else:
            max_dim_x = len(initial_value)

        spectrum_attr = self._make_attr(
            f"{name}_spectrum",
            dtype,
            AttrDataFormat.SPECTRUM,
            max_dim_x=max_dim_x,
        )
        self._add_attr(spectrum_attr, initial_value)

        # String images are not supported; no image attribute is created for
        # the "str", "strenum" and "my_state" entries.
        if name in ("str", "strenum", "my_state"):
            return

        image_attr = self._make_attr(
            f"{name}_image",
            dtype,
            AttrDataFormat.IMAGE,
            max_dim_x=max_dim_x,
            max_dim_y=2,
        )
        # The image is simply two copies of the initial spectrum stacked.
        self._add_attr(image_attr, np.vstack((initial_value, initial_value)))

    def add_scalar_command(self, name: str, dtype: str):
        """Register ``<name>_cmd`` as a scalar echo command when valid."""
        self._add_echo_command(f"{name}_cmd", dtype, AttrDataFormat.SCALAR)

    def add_spectrum_command(self, name: str, dtype: str):
        """Register ``<name>_spectrum_cmd`` as a spectrum echo command when valid."""
        self._add_echo_command(
            f"{name}_spectrum_cmd", dtype, AttrDataFormat.SPECTRUM
        )

    def initialize_dynamic_attributes(self):
        """Create all attributes and commands from ``_all_attribute_definitions``.

        NOTE(review): presumably invoked by the Tango server framework when
        the device is created — confirm against the PyTango documentation.
        """
        for attr_data in _all_attribute_definitions:
            self.add_scalar_attr(
                attr_data.name, attr_data.tango_type, attr_data.initial_scalar
            )
            self.add_array_attrs(
                attr_data.name, attr_data.tango_type, attr_data.initial_spectrum
            )
            self.add_scalar_command(attr_data.name, attr_data.tango_type)
            self.add_spectrum_command(attr_data.name, attr_data.tango_type)

    @command
    def reset_values(self):
        """Restore every attribute to the value it was created with."""
        # Both dicts are only ever filled together in ``_add_attr``, so they
        # hold the same keys and a plain update restores every entry.
        self.attr_values.update(self.initial_values)

    def read(self, attr):
        """Serve a read request by handing the stored value to *attr*."""
        value = self.attr_values[attr.get_name()]
        attr.set_value(value)

    def write(self, attr):
        """Store the value written to *attr* and push a change event for it."""
        new_value = attr.get_write_value()
        self.attr_values[attr.get_name()] = new_value
        self.push_change_event(attr.get_name(), new_value)

    # Source template for an echo method; ``{}`` is replaced by the method name.
    echo_command_code = textwrap.dedent(
        """\
        def {}(self, arg):
            return arg
        """
    )

    # Define one echo method per valid command directly in the class
    # namespace, so ``getattr(self, "<name>_cmd")`` finds it later. ``exec``
    # is used so each method is defined under its own name; the template is
    # only ever formatted with names from this module's own definitions.
    for attr_data in _all_attribute_definitions:
        if _valid_command(AttrDataFormat.SCALAR, attr_data.tango_type):
            exec(echo_command_code.format(f"{attr_data.name}_cmd"))
        if _valid_command(AttrDataFormat.SPECTRUM, attr_data.tango_type):
            exec(echo_command_code.format(f"{attr_data.name}_spectrum_cmd"))