# Source code for ophyd_async.tango.testing._one_of_everything

import textwrap
from collections.abc import Sequence
from dataclasses import dataclass
from typing import Any, Generic, TypeVar

import numpy as np
from tango import AttrDataFormat, AttrWriteType, DevState
from tango.server import Device, attribute, command

from ophyd_async.core import (
    Array1D,
    DTypeScalar_co,
    StrictEnum,
)
from ophyd_async.testing import float_array_value, int_array_value

T = TypeVar("T")


class ExampleStrEnum(StrictEnum):
    """Three-member string enum whose values are used as Tango enum labels."""

    A = "AAA"
    B = "BBB"
    C = "CCC"
def int_image_value(
    dtype: type[DTypeScalar_co],
):  # how do we type this?
    """Return a two-row image built from ``int_array_value(dtype)``.

    The 1-D array produced by ``int_array_value`` is stacked on top of itself
    with ``np.vstack``.
    """
    row = int_array_value(dtype)
    return np.vstack((row, row))


def float_image_value(
    dtype: type[DTypeScalar_co],
):  # how do we type this?
    """Return a two-row image built from ``float_array_value(dtype)``.

    The 1-D array produced by ``float_array_value`` is stacked on top of
    itself with ``np.vstack``.
    """
    row = float_array_value(dtype)
    return np.vstack((row, row))


def _valid_command(dformat: AttrDataFormat, dtype: str):
    """Tell whether an echo command is generated for this format/type pair.

    Returns ``False`` for ``"DevUChar"`` in any format, and for ``"DevState"``
    or ``"DevEnum"`` in any format other than ``AttrDataFormat.SCALAR``;
    ``True`` otherwise.
    """
    if dtype == "DevUChar":
        return False
    non_scalar = dformat != AttrDataFormat.SCALAR
    return not (non_scalar and dtype in ["DevState", "DevEnum"])


@dataclass
class AttributeData(Generic[T]):
    """One row of the attribute table: a name, a Tango type and initial values."""

    # Base name; the device derives "<name>_spectrum", "<name>_image" and the
    # command names from it.
    name: str
    # Tango type name as a string, e.g. "DevDouble".
    tango_type: str
    # Starting value of the scalar attribute.
    initial_scalar: T
    # Starting value of the spectrum attribute.
    initial_spectrum: Array1D | Sequence[T]


# One entry per data type the test device exposes.
_all_attribute_definitions = [
    AttributeData(
        "str",
        "DevString",
        "test_string",
        ["one", "two", "three"],
    ),
    AttributeData(
        "bool",
        "DevBoolean",
        True,
        np.array([False, True], dtype=bool),
    ),
    # Enum values are given as integer indices.
    AttributeData("strenum", "DevEnum", 1, np.array([0, 1, 2])),
    # NOTE(review): int8 is declared as DevShort, the same Tango type as int16 —
    # presumably because Tango has no signed 8-bit type; confirm.
    AttributeData("int8", "DevShort", 1, int_array_value(np.int8)),
    AttributeData("uint8", "DevUChar", 1, int_array_value(np.uint8)),
    AttributeData("int16", "DevShort", 1, int_array_value(np.int16)),
    AttributeData("uint16", "DevUShort", 1, int_array_value(np.uint16)),
    AttributeData("int32", "DevLong", 1, int_array_value(np.int32)),
    AttributeData("uint32", "DevULong", 1, int_array_value(np.uint32)),
    AttributeData("int64", "DevLong64", 1, int_array_value(np.int64)),
    AttributeData("uint64", "DevULong64", 1, int_array_value(np.uint64)),
    AttributeData("float32", "DevFloat", 1.234, float_array_value(np.float32)),
    AttributeData("float64", "DevDouble", 1.234, float_array_value(np.float64)),
    AttributeData(
        "my_state",
        "DevState",
        DevState.INIT,
        # Spectrum holding every DevState member.
        np.array(list(DevState.names.values()), dtype=DevState),
    ),
]
class OneOfEverythingTangoDevice(Device):
    # Tango test device that, for every entry in _all_attribute_definitions,
    # exposes a read/write scalar, spectrum and (where supported) image
    # attribute, plus scalar and spectrum "echo" commands.
    #
    # NOTE(review): the class and the read/write/reset_values callbacks are
    # documented with comments rather than docstrings on purpose — PyTango
    # appears to use a getter's __doc__ as the attribute description, so adding
    # docstrings there could change runtime metadata. Confirm before converting.

    # Current and initial value of every dynamic attribute, keyed by attribute
    # name.
    # NOTE(review): both dicts are class attributes, so they are shared by all
    # instances of this device class — confirm that is intended.
    attr_values = {}
    initial_values = {}

    def _add_attr(self, attr: attribute, initial_value):
        """Register ``attr`` on the device and record its starting value."""
        for store in (self.attr_values, self.initial_values):
            store[attr.name] = initial_value
        self.add_attribute(attr)
        # Positional flags are (implemented, detect) per the PyTango API; this
        # device pushes its change events itself from write().
        self.set_change_event(attr.name, True, False)

    def add_scalar_attr(self, name: str, dtype: str, initial_value: Any):
        """Create and register a read/write scalar attribute called ``name``."""
        scalar_attr = attribute(
            name=name,
            dtype=dtype,
            dformat=AttrDataFormat.SCALAR,
            access=AttrWriteType.READ_WRITE,
            fget=self.read,
            fset=self.write,
            # Labels are passed for every dtype, not only DevEnum.
            enum_labels=[member.value for member in ExampleStrEnum],
        )
        self._add_attr(scalar_attr, initial_value)

    def add_array_attrs(self, name: str, dtype: str, initial_value: np.ndarray):
        """Create ``<name>_spectrum`` and ``<name>_image`` attributes.

        The spectrum is always registered with ``initial_value``. The image is
        registered with two stacked copies of ``initial_value``, except for the
        names "str", "strenum" and "my_state", for which it is skipped.
        """
        # Arrays report their length through .shape; plain sequences via len().
        width = (
            initial_value.shape[-1]
            if hasattr(initial_value, "shape")
            else len(initial_value)
        )

        def make_attr(suffix, dformat, **dims):
            # Build a read/write attribute that differs only in name suffix,
            # data format and dimension limits.
            return attribute(
                name=f"{name}_{suffix}",
                dtype=dtype,
                dformat=dformat,
                access=AttrWriteType.READ_WRITE,
                fget=self.read,
                fset=self.write,
                **dims,
                enum_labels=[member.value for member in ExampleStrEnum],
            )

        spectrum_attr = make_attr("spectrum", AttrDataFormat.SPECTRUM, max_dim_x=width)
        image_attr = make_attr(
            "image", AttrDataFormat.IMAGE, max_dim_x=width, max_dim_y=2
        )

        self._add_attr(spectrum_attr, initial_value)

        # String images are not supported, do not add their attribute data
        if name in ["str", "strenum", "my_state"]:
            return
        # have image just be 2 of the initial spectrum stacked
        self._add_attr(image_attr, np.vstack((initial_value, initial_value)))

    def add_scalar_command(self, name: str, dtype: str):
        """Register the ``<name>_cmd`` echo command if ``dtype`` allows one."""
        scalar = AttrDataFormat.SCALAR
        if not _valid_command(scalar, dtype):
            return
        self.add_command(
            command(
                # The method itself is generated by the exec loop at the end of
                # the class body.
                f=getattr(self, f"{name}_cmd"),
                dtype_in=dtype,
                dtype_out=dtype,
                dformat_in=scalar,
                dformat_out=scalar,
            ),
        )

    def add_spectrum_command(self, name: str, dtype: str):
        """Register the ``<name>_spectrum_cmd`` echo command if ``dtype`` allows one."""
        spectrum = AttrDataFormat.SPECTRUM
        if not _valid_command(spectrum, dtype):
            return
        self.add_command(
            command(
                # The method itself is generated by the exec loop at the end of
                # the class body.
                f=getattr(self, f"{name}_spectrum_cmd"),
                dtype_in=dtype,
                dtype_out=dtype,
                dformat_in=spectrum,
                dformat_out=spectrum,
            ),
        )

    def initialize_dynamic_attributes(self):
        """Add every attribute and command described by the definitions table.

        Nothing in this file calls this method; presumably the PyTango server
        invokes it by name when the device is created.
        """
        for definition in _all_attribute_definitions:
            name, tango_type = definition.name, definition.tango_type
            self.add_scalar_attr(name, tango_type, definition.initial_scalar)
            self.add_array_attrs(name, tango_type, definition.initial_spectrum)
            self.add_scalar_command(name, tango_type)
            self.add_spectrum_command(name, tango_type)

    @command
    def reset_values(self):
        # Tango command: put every attribute back to the value it was
        # registered with. No change events are pushed here.
        for key in self.attr_values:
            self.attr_values[key] = self.initial_values[key]

    def read(self, attr):
        # Shared getter for all dynamic attributes: serve the stored value.
        attr.set_value(self.attr_values[attr.get_name()])

    def write(self, attr):
        # Shared setter for all dynamic attributes: store the written value and
        # notify subscribers with a change event.
        written = attr.get_write_value()
        attr_name = attr.get_name()
        self.attr_values[attr_name] = written
        self.push_change_event(attr_name, written)

    # Template for an echo method; "{}" is filled in with the method name.
    echo_command_code = textwrap.dedent(
        """\
        def {}(self, arg):
            return arg
        """
    )

    # exec runs in the class namespace, so each generated function becomes a
    # method that add_scalar_command / add_spectrum_command later look up with
    # getattr. The names come only from the static definitions table.
    for attr_data in _all_attribute_definitions:
        if _valid_command(AttrDataFormat.SCALAR, attr_data.tango_type):
            exec(echo_command_code.format(f"{attr_data.name}_cmd"))
        if _valid_command(AttrDataFormat.SPECTRUM, attr_data.tango_type):
            exec(echo_command_code.format(f"{attr_data.name}_spectrum_cmd"))