Source code for ophyd_async.tango.testing._one_of_everything

import textwrap
from collections.abc import Sequence
from dataclasses import dataclass
from typing import Any, Generic, TypeVar

import numpy as np
from tango import AttrDataFormat, AttrWriteType, CmdArgType, DevState
from tango.server import Device, attribute, command

from ophyd_async.core import (
    Array1D,
    DTypeScalar_co,
    StrictEnum,
)
from ophyd_async.testing import float_array_value, int_array_value

T = TypeVar("T")


class ExampleStrEnum(StrictEnum):
    """String enum whose values are used as the Tango ``enum_labels``.

    The test device below builds its label list by iterating this enum
    and taking each member's ``value``.
    """

    A = "AAA"
    B = "BBB"
    C = "CCC"
def int_image_value(
    dtype: type[DTypeScalar_co],
):  # how do we type this?
    """Return a two-row image made by stacking ``int_array_value(dtype)`` twice."""
    row = int_array_value(dtype)
    return np.vstack((row, row))


def float_image_value(
    dtype: type[DTypeScalar_co],
):  # how do we type this?
    """Return a two-row image made by stacking ``float_array_value(dtype)`` twice."""
    row = float_array_value(dtype)
    return np.vstack((row, row))


def _valid_command(dformat: AttrDataFormat, dtype: str):
    """Tell whether an echo command is generated for this format/type pair.

    Every scalar type is accepted; for any non-scalar format the
    ``DevState`` and ``DevEnum`` types are rejected.
    """
    is_scalar = dformat == AttrDataFormat.SCALAR
    return is_scalar or dtype not in ("DevState", "DevEnum")


@dataclass
class AttributeData(Generic[T]):
    """One row of the attribute table used to populate the test device."""

    # Base name; the device derives "<name>_spectrum", "<name>_image" and
    # "<name>_cmd" style identifiers from it.
    name: str
    # Tango type name passed as ``dtype`` when creating attributes/commands.
    tango_type: str
    # Starting value of the scalar attribute.
    initial_scalar: T
    # Starting value of the spectrum attribute (also stacked to form the image).
    initial_spectrum: Array1D | Sequence[T]


# The integer and float rows only differ by name, Tango type and numpy dtype,
# so they are generated from small tables; the resulting order is fixed.
_all_attribute_definitions = [
    AttributeData("str", "DevString", "test_string", ["one", "two", "three"]),
    AttributeData("bool", "DevBoolean", True, np.array([False, True], dtype=bool)),
    AttributeData("strenum", "DevEnum", 1, np.array([0, 1, 2])),
    *(
        AttributeData(type_name, tango_name, 1, int_array_value(numpy_type))
        for type_name, tango_name, numpy_type in (
            ("int8", "DevShort", np.int8),
            ("uint8", "DevUChar", np.uint8),
            ("int16", "DevShort", np.int16),
            ("uint16", "DevUShort", np.uint16),
            ("int32", "DevLong", np.int32),
            ("uint32", "DevULong", np.uint32),
            ("int64", "DevLong64", np.int64),
            ("uint64", "DevULong64", np.uint64),
        )
    ),
    *(
        AttributeData(type_name, tango_name, 1.234, float_array_value(numpy_type))
        for type_name, tango_name, numpy_type in (
            ("float32", "DevFloat", np.float32),
            ("float64", "DevDouble", np.float64),
        )
    ),
    AttributeData(
        "my_state",
        "DevState",
        DevState.INIT,
        np.array(list(DevState.names.values()), dtype=DevState),  # type: ignore
    ),
]
class OneOfEverythingTangoDevice(Device):
    """Tango device exposing attributes and echo commands for many data types.

    For every entry of ``_all_attribute_definitions`` the device creates a
    read/write scalar attribute, spectrum and image attributes where
    supported, and echo commands that return their argument unchanged.
    Attribute values live in plain dictionaries keyed by attribute name.
    """

    # Class-level fallbacks kept for backward compatibility. They are shared
    # by every instance, so ``initialize_dynamic_attributes`` replaces them
    # with per-device dictionaries before any attribute is registered.
    attr_values = {}
    initial_values = {}

    def _add_attr(self, attr: attribute, initial_value):
        """Register ``attr`` and record ``initial_value`` as current and initial."""
        self.attr_values[attr.name] = initial_value
        self.initial_values[attr.name] = initial_value
        self.add_attribute(attr)
        # Change events are pushed by hand from ``write``.
        self.set_change_event(attr.name, True, False)

    def add_scalar_attr(self, name: str, dtype: str, initial_value: Any):
        """Create the read/write scalar attribute ``name`` of Tango type ``dtype``."""
        attr = attribute(
            name=name,
            dtype=dtype,
            dformat=AttrDataFormat.SCALAR,
            access=AttrWriteType.READ_WRITE,
            fget=self.read,
            fset=self.write,
            # Labels are passed for every dtype, not only for DevEnum.
            enum_labels=[e.value for e in ExampleStrEnum],
        )
        self._add_attr(attr, initial_value)

    def long_string_cmd(self, arg):
        """Echo command for DevVarLongStringArray."""
        return arg

    def double_string_cmd(self, arg):
        """Echo command for DevVarDoubleStringArray."""
        return arg

    def void_cmd(self):
        """Command for DevVoid."""
        return

    def add_array_attrs(self, name: str, dtype: str, initial_value: np.ndarray):
        """Create ``<name>_spectrum`` and ``<name>_image`` where supported.

        Nothing is registered for ``strenum`` and ``my_state``; ``str`` gets a
        spectrum but no image. When the image attribute has been added, the
        fixed ``void_cmd``, ``long_string_cmd`` and ``double_string_cmd``
        commands are registered as well (on every such call).
        """
        spectrum_name = f"{name}_spectrum"
        # Plain sequences (e.g. the list of strings) have no ``shape``.
        if hasattr(initial_value, "shape"):
            max_dim_x = initial_value.shape[-1]
        else:
            max_dim_x = len(initial_value)
        spectrum_attr = attribute(
            name=spectrum_name,
            dtype=dtype,
            dformat=AttrDataFormat.SPECTRUM,
            access=AttrWriteType.READ_WRITE,
            fget=self.read,
            fset=self.write,
            max_dim_x=max_dim_x,
            enum_labels=[e.value for e in ExampleStrEnum],
        )
        image_name = f"{name}_image"
        image_attr = attribute(
            name=image_name,
            dtype=dtype,
            dformat=AttrDataFormat.IMAGE,
            access=AttrWriteType.READ_WRITE,
            fget=self.read,
            fset=self.write,
            max_dim_x=max_dim_x,
            max_dim_y=2,
            enum_labels=[e.value for e in ExampleStrEnum],
        )
        # Arrays of enums are not supported, do not add their attribute data
        if name in ["strenum", "my_state"]:
            return
        self._add_attr(spectrum_attr, initial_value)
        # have image just be 2 of the initial spectrum stacked
        # String images are not supported, do not add their attribute data
        if name in ["str"]:
            return
        self._add_attr(image_attr, np.vstack((initial_value, initial_value)))

        void_cmd = command(
            f=self.void_cmd,
            dtype_in=CmdArgType.DevVoid,
            dtype_out=CmdArgType.DevVoid,
        )
        long_string_table_cmd = command(
            f=self.long_string_cmd,
            dtype_in=CmdArgType.DevVarLongStringArray,
            dtype_out=CmdArgType.DevVarLongStringArray,
        )
        double_string_table_cmd = command(
            f=self.double_string_cmd,
            dtype_in=CmdArgType.DevVarDoubleStringArray,
            dtype_out=CmdArgType.DevVarDoubleStringArray,
        )
        self.add_command(void_cmd)
        self.add_command(long_string_table_cmd)
        self.add_command(double_string_table_cmd)

    def add_scalar_command(self, name: str, dtype: str):
        """Register the generated ``<name>_cmd`` echo method as a scalar command."""
        if _valid_command(AttrDataFormat.SCALAR, dtype):
            self.add_command(
                command(
                    f=getattr(self, f"{name}_cmd"),
                    dtype_in=dtype,
                    dtype_out=dtype,
                    dformat_in=AttrDataFormat.SCALAR,
                    dformat_out=AttrDataFormat.SCALAR,
                ),
            )

    def add_spectrum_command(self, name: str, dtype: str):
        """Register the generated ``<name>_spectrum_cmd`` echo method.

        ``int8`` and ``uint8`` are registered with ``DevVarCharArray`` for both
        input and output; every other accepted type uses ``dtype`` with the
        SPECTRUM data format.
        """
        if _valid_command(AttrDataFormat.SPECTRUM, dtype):
            if name in ["int8", "uint8"]:
                self.add_command(
                    command(
                        f=getattr(self, f"{name}_spectrum_cmd"),
                        dtype_in=CmdArgType.DevVarCharArray,
                        dtype_out=CmdArgType.DevVarCharArray,
                    ),
                )
            else:
                self.add_command(
                    command(
                        f=getattr(self, f"{name}_spectrum_cmd"),
                        dtype_in=dtype,
                        dtype_out=dtype,
                        dformat_in=AttrDataFormat.SPECTRUM,
                        dformat_out=AttrDataFormat.SPECTRUM,
                    ),
                )

    def initialize_dynamic_attributes(self):
        """Create all attributes and commands listed in the definition table."""
        # Rebind the stores on the instance so that several devices running in
        # the same process do not read and overwrite each other's values
        # through the shared class-level dictionaries.
        self.attr_values = {}
        self.initial_values = {}
        for attr_data in _all_attribute_definitions:
            self.add_scalar_attr(
                attr_data.name, attr_data.tango_type, attr_data.initial_scalar
            )
            self.add_array_attrs(
                attr_data.name, attr_data.tango_type, attr_data.initial_spectrum
            )
            self.add_scalar_command(attr_data.name, attr_data.tango_type)
            self.add_spectrum_command(attr_data.name, attr_data.tango_type)

    @command(dtype_in=float, dtype_out=bool)
    def float_to_bool_cmd(self, value: float) -> bool:
        """Command with float input and bool output (different in/out types)."""
        return value > 0

    @command
    def reset_values(self):
        # Restore every stored value to the one recorded at registration time.
        for attr_name in self.attr_values:
            self.attr_values[attr_name] = self.initial_values[attr_name]

    def read(self, attr):
        # Shared getter: serve the stored value for whichever attribute is read.
        value = self.attr_values[attr.get_name()]
        attr.set_value(value)

    def write(self, attr):
        # Shared setter: store the written value, then notify subscribers.
        new_value = attr.get_write_value()
        self.attr_values[attr.get_name()] = new_value
        self.push_change_event(attr.get_name(), new_value)

    # Template for the generated echo methods. ``exec`` runs inside the class
    # body, so each generated function becomes a method carrying the exact
    # name that add_scalar_command / add_spectrum_command look up with
    # ``getattr``. Only names from the static definition table are formatted in.
    echo_command_code = textwrap.dedent(
        """\
        def {}(self, arg):
            return arg
        """
    )

    for attr_data in _all_attribute_definitions:
        if _valid_command(AttrDataFormat.SCALAR, attr_data.tango_type):
            exec(echo_command_code.format(f"{attr_data.name}_cmd"))
        if _valid_command(AttrDataFormat.SPECTRUM, attr_data.tango_type):
            exec(echo_command_code.format(f"{attr_data.name}_spectrum_cmd"))