# Source code for ophyd_async.testing._one_of_everything

from collections.abc import Sequence
from typing import Any

import numpy as np

from ophyd_async.core import (
    Array1D,
    Device,
    DTypeScalar_co,
    SignalRW,
    StandardReadable,
    StrictEnum,
    Table,
    soft_signal_r_and_setter,
    soft_signal_rw,
)
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.core._device import DeviceVector


class ExampleEnum(StrictEnum):
    """Three-member strict enum serving as the example enum datatype."""

    # Each member's value is its name's letter capitalised and tripled.
    A = "Aaa"
    B = "Bbb"
    C = "Ccc"
class ExampleTable(Table):
    """Example table with bool, int32, float64, str and enum columns."""

    # Numeric and boolean columns are typed 1-D numpy arrays.
    a_bool: Array1D[np.bool_]
    a_int: Array1D[np.int32]
    a_float: Array1D[np.float64]
    # String and enum columns are plain sequences.
    a_str: Sequence[str]
    a_enum: Sequence[ExampleEnum]
def int_array_value(dtype: type[DTypeScalar_co]):
    """Build a sample integer array covering the extremes of ``dtype``.

    Args:
        dtype: Integer numpy scalar type (anything ``np.iinfo`` accepts).

    Returns:
        A numpy array of ``dtype`` holding, in order, the type's minimum,
        its maximum, and then the integers 0 through 4.
    """
    limits = np.iinfo(dtype)  # type: ignore
    # Extremes first, then a short run of small ordinary values.
    samples = [limits.min, limits.max, *range(5)]
    return np.array(samples, dtype=dtype)
def int_array_signal(
    dtype: type[DTypeScalar_co], name: str = ""
) -> SignalRW[Array1D[DTypeScalar_co]]:
    """Create a soft read-write signal preloaded with ``int_array_value(dtype)``.

    Args:
        dtype: Integer numpy scalar type of the array elements.
        name: Name passed through to ``soft_signal_rw``.

    Returns:
        The signal returned by ``soft_signal_rw`` for datatype
        ``Array1D[dtype]``.
    """
    initial = int_array_value(dtype)
    datatype = Array1D[dtype]
    return soft_signal_rw(datatype, initial, name)
def float_array_value(dtype: type[DTypeScalar_co]):
    """Build a sample floating-point array covering the extremes of ``dtype``.

    Args:
        dtype: Floating-point numpy scalar type (anything ``np.finfo``
            accepts).

    Returns:
        A numpy array of ``dtype`` holding, in order: the type's minimum,
        maximum, smallest normal and smallest subnormal values, followed by
        0, 1.234, 2.34e5 and 3.45e-6.
    """
    limits = np.finfo(dtype)  # type: ignore
    samples = [
        # Representable extremes of the type.
        limits.min,
        limits.max,
        limits.smallest_normal,
        limits.smallest_subnormal,
        # Ordinary values: zero, a plain decimal, a large and a small one.
        0,
        1.234,
        2.34e5,
        3.45e-6,
    ]
    return np.array(samples, dtype=dtype)
def float_array_signal(
    dtype: type[DTypeScalar_co], name: str = ""
) -> SignalRW[Array1D[DTypeScalar_co]]:
    """Create a soft read-write signal preloaded with ``float_array_value(dtype)``.

    Args:
        dtype: Floating-point numpy scalar type of the array elements.
        name: Name passed through to ``soft_signal_rw``.

    Returns:
        The signal returned by ``soft_signal_rw`` for datatype
        ``Array1D[dtype]``.
    """
    initial = float_array_value(dtype)
    datatype = Array1D[dtype]
    return soft_signal_rw(datatype, initial, name)
class OneOfEverythingDevice(StandardReadable):
    """Device carrying one soft signal for every datatype signals allow.

    It holds scalar signals (int, float, str, bool, enum), 1-D array signals
    for bool, each fixed-width integer type and both float widths, string and
    enum sequences, an ``ExampleTable`` and a 2-D ``np.ndarray``.
    """

    # make a detector to test assert_configuration
    def __init__(self, name=""):
        # Initial values that need no access to ``self`` are built up front
        # so the registration block below reads as a flat list of signals.
        bool_array = np.array([False, False, True])
        string_list = ["one", "two", "three"]
        enum_list = [ExampleEnum.A, ExampleEnum.C]
        table_value = ExampleTable(
            a_bool=np.array([False, False, True, True], np.bool_),
            a_int=np.array([1, 8, -9, 32], np.int32),
            a_float=np.array([1.8, 8.2, -6, 32.9887], np.float64),
            a_str=["Hello", "World", "Foo", "Bar"],
            a_enum=[ExampleEnum.A, ExampleEnum.B, ExampleEnum.A, ExampleEnum.C],
        )
        matrix = np.array(([1, 2, 3], [4, 5, 6]))

        # Every signal created inside this block is added as a
        # configuration signal.
        with self.add_children_as_readables(Format.CONFIG_SIGNAL):
            # Scalars
            self.a_int = soft_signal_rw(int, 1)
            self.a_float = soft_signal_rw(float, 1.234)
            self.a_str = soft_signal_rw(str, "test_string")
            self.a_bool = soft_signal_rw(bool, True)
            self.a_enum = soft_signal_rw(ExampleEnum, ExampleEnum.B)

            # 1-D numeric arrays
            self.boola = soft_signal_rw(Array1D[np.bool_], bool_array)
            self.int8a = int_array_signal(np.int8)
            self.uint8a = int_array_signal(np.uint8)
            self.int16a = int_array_signal(np.int16)
            self.uint16a = int_array_signal(np.uint16)
            self.int32a = int_array_signal(np.int32)
            self.uint32a = int_array_signal(np.uint32)
            self.int64a = int_array_signal(np.int64)
            self.uint64a = int_array_signal(np.uint64)
            self.float32a = float_array_signal(np.float32)
            self.float64a = float_array_signal(np.float64)

            # Sequences, table and N-D array
            self.stra = soft_signal_rw(Sequence[str], string_list)
            self.enuma = soft_signal_rw(Sequence[ExampleEnum], enum_list)
            self.table = soft_signal_rw(ExampleTable, table_value)
            self.ndarray = soft_signal_rw(np.ndarray, matrix)
        super().__init__(name)

    async def get_signal_values(self):
        """Return the current value of every ``SignalRW`` under this device.

        Returns:
            The dict produced by ``_get_signal_values(self)``, keyed by
            signal object.
        """
        values = await _get_signal_values(self)
        return values
async def _get_signal_values(child: Device) -> dict[SignalRW, Any]:
    """Recursively collect the value of every ``SignalRW`` at or below ``child``.

    Args:
        child: Device (or signal) to start from.

    Returns:
        A dict mapping each ``SignalRW`` found to the result of its
        ``get_value()``. If ``child`` is itself a ``SignalRW`` the dict has
        that single entry; otherwise it merges the results for each of
        ``child.children()``.
    """
    if not isinstance(child, SignalRW):
        collected: dict[SignalRW, Any] = {}
        # Children are visited in the order ``children()`` yields them and
        # awaited one at a time.
        for _child_name, sub_device in child.children():
            collected |= await _get_signal_values(sub_device)
        return collected
    return {child: await child.get_value()}
class ParentOfEverythingDevice(Device):
    """Device whose subdevices each carry one of every signal datatype.

    It holds a single ``OneOfEverythingDevice`` child, a ``DeviceVector`` of
    two more at indices 1 and 3, and three top-level string signals: a
    read-write one, a read-only one, and a private read-write one.
    """

    def __init__(self, name=""):
        self.child = OneOfEverythingDevice()
        # Indices 1 and 3 are used, so the vector keys are non-contiguous.
        self.vector = DeviceVector(
            {index: OneOfEverythingDevice() for index in (1, 3)}
        )
        self.sig_rw = soft_signal_rw(str, "Top level SignalRW")
        # Only the read-only signal is kept; its setter is discarded.
        read_only_signal, _setter = soft_signal_r_and_setter(
            str, "Top level SignalR"
        )
        self.sig_r = read_only_signal
        self._sig_rw = soft_signal_rw(str, "Top level private SignalRW")
        super().__init__(name=name)

    async def get_signal_values(self):
        """Return the current value of every ``SignalRW`` under this device.

        Returns:
            The dict produced by ``_get_signal_values(self)``, keyed by
            signal object.
        """
        values = await _get_signal_values(self)
        return values