import textwrap
from dataclasses import dataclass
from typing import Any, Generic, TypeVar
import numpy as np
from ophyd_async.core import (
Array1D,
DTypeScalar_co,
StrictEnum,
)
from ophyd_async.testing import float_array_value, int_array_value
from tango import AttrDataFormat, AttrWriteType, DevState
from tango.server import Device, attribute, command
T = TypeVar("T")
[docs]
class ExampleStrEnum(StrictEnum):
A = "AAA"
B = "BBB"
C = "CCC"
def int_image_value(
dtype: type[DTypeScalar_co],
):
# how do we type this?
array_1d = int_array_value(dtype)
return np.vstack((array_1d, array_1d))
def float_image_value(
dtype: type[DTypeScalar_co],
):
# how do we type this?
array_1d = float_array_value(dtype)
return np.vstack((array_1d, array_1d))
def _valid_command(dformat: AttrDataFormat, dtype: str):
if dtype == "DevUChar":
return False
if dformat != AttrDataFormat.SCALAR and dtype in ["DevState", "DevEnum"]:
return False
return True
@dataclass
class AttributeData(Generic[T]):
name: str
tango_type: str
initial_scalar: T
initial_spectrum: Array1D
_all_attribute_definitions = [
AttributeData(
"str",
"DevString",
"test_string",
np.array(["one", "two", "three"], dtype=str),
),
AttributeData(
"bool",
"DevBoolean",
True,
np.array([False, True], dtype=bool),
),
AttributeData("strenum", "DevEnum", 1, np.array([0, 1, 2])),
AttributeData("int8", "DevShort", 1, int_array_value(np.int8)),
AttributeData("uint8", "DevUChar", 1, int_array_value(np.uint8)),
AttributeData("int16", "DevShort", 1, int_array_value(np.int16)),
AttributeData("uint16", "DevUShort", 1, int_array_value(np.uint16)),
AttributeData("int32", "DevLong", 1, int_array_value(np.int32)),
AttributeData("uint32", "DevULong", 1, int_array_value(np.uint32)),
AttributeData("int64", "DevLong64", 1, int_array_value(np.int64)),
AttributeData("uint64", "DevULong64", 1, int_array_value(np.uint64)),
AttributeData("float32", "DevFloat", 1.234, float_array_value(np.float32)),
AttributeData("float64", "DevDouble", 1.234, float_array_value(np.float64)),
AttributeData(
"my_state",
"DevState",
DevState.INIT,
np.array([DevState.INIT, DevState.ON, DevState.MOVING], dtype=DevState),
),
]
[docs]
class OneOfEverythingTangoDevice(Device):
attr_values = {}
initial_values = {}
def _add_attr(self, attr: attribute, initial_value):
self.attr_values[attr.name] = initial_value
self.initial_values[attr.name] = initial_value
self.add_attribute(attr)
self.set_change_event(attr.name, True, False)
[docs]
def add_scalar_attr(self, name: str, dtype: str, initial_value: Any):
attr = attribute(
name=name,
dtype=dtype,
dformat=AttrDataFormat.SCALAR,
access=AttrWriteType.READ_WRITE,
fget=self.read,
fset=self.write,
enum_labels=[e.value for e in ExampleStrEnum],
)
self._add_attr(attr, initial_value)
[docs]
def add_array_attrs(self, name: str, dtype: str, initial_value: np.ndarray):
spectrum_name = f"{name}_spectrum"
spectrum_attr = attribute(
name=spectrum_name,
dtype=dtype,
dformat=AttrDataFormat.SPECTRUM,
access=AttrWriteType.READ_WRITE,
fget=self.read,
fset=self.write,
max_dim_x=initial_value.shape[-1],
enum_labels=[e.value for e in ExampleStrEnum],
)
image_name = f"{name}_image"
image_attr = attribute(
name=image_name,
dtype=dtype,
dformat=AttrDataFormat.IMAGE,
access=AttrWriteType.READ_WRITE,
fget=self.read,
fset=self.write,
max_dim_x=initial_value.shape[-1],
max_dim_y=2,
enum_labels=[e.value for e in ExampleStrEnum],
)
self._add_attr(spectrum_attr, initial_value)
# have image just be 2 of the initial spectrum stacked
self._add_attr(image_attr, np.vstack((initial_value, initial_value)))
[docs]
def add_scalar_command(self, name: str, dtype: str):
if _valid_command(AttrDataFormat.SCALAR, dtype):
self.add_command(
command(
f=getattr(self, f"{name}_cmd"),
dtype_in=dtype,
dtype_out=dtype,
dformat_in=AttrDataFormat.SCALAR,
dformat_out=AttrDataFormat.SCALAR,
),
)
[docs]
def add_spectrum_command(self, name: str, dtype: str):
if _valid_command(AttrDataFormat.SPECTRUM, dtype):
self.add_command(
command(
f=getattr(self, f"{name}_spectrum_cmd"),
dtype_in=dtype,
dtype_out=dtype,
dformat_in=AttrDataFormat.SPECTRUM,
dformat_out=AttrDataFormat.SPECTRUM,
),
)
[docs]
def initialize_dynamic_attributes(self):
for attr_data in _all_attribute_definitions:
self.add_scalar_attr(
attr_data.name, attr_data.tango_type, attr_data.initial_scalar
)
self.add_array_attrs(
attr_data.name, attr_data.tango_type, attr_data.initial_spectrum
)
self.add_scalar_command(attr_data.name, attr_data.tango_type)
self.add_spectrum_command(attr_data.name, attr_data.tango_type)
[docs]
@command
def reset_values(self):
for attr_name in self.attr_values:
self.attr_values[attr_name] = self.initial_values[attr_name]
[docs]
def read(self, attr):
value = self.attr_values[attr.get_name()]
attr.set_value(value)
[docs]
def write(self, attr):
new_value = attr.get_write_value()
self.attr_values[attr.get_name()] = new_value
self.push_change_event(attr.get_name(), new_value)
echo_command_code = textwrap.dedent(
"""\
def {}(self, arg):
return arg
"""
)
for attr_data in _all_attribute_definitions:
if _valid_command(AttrDataFormat.SCALAR, attr_data.tango_type):
exec(echo_command_code.format(f"{attr_data.name}_cmd"))
if _valid_command(AttrDataFormat.SPECTRUM, attr_data.tango_type):
exec(echo_command_code.format(f"{attr_data.name}_spectrum_cmd"))