Source code for ophyd_async.tango.core._converters

from collections.abc import Sequence
from typing import Any, Generic

import numpy as np
from numpy.typing import NDArray
from tango import DevState

from ophyd_async.core import SignalDatatypeT

from ._utils import DevStateEnum, TangoDoubleStringTable, TangoLongStringTable


class TangoConverter(Generic[SignalDatatypeT]):
    """Pass-through converter used as the base for the converters in this module.

    Both directions hand the value back untouched. Subclasses override
    ``write_value`` and/or ``value`` when the two sides use different
    representations.
    """

    def write_value(self, value: Any) -> Any:
        """Return ``value`` unchanged (write direction)."""
        return value

    def value(self, value: Any) -> Any:
        """Return ``value`` unchanged (read direction)."""
        return value
class TangoEnumConverter(TangoConverter):
    """Translate between a label string and its position in ``labels``."""

    def __init__(self, labels: list[str]):
        # Stored by reference; the list is not copied.
        self._labels = labels

    def write_value(self, value: str):
        """Return the index of ``value`` within the configured labels.

        Raises:
            TypeError: if ``value`` is not a ``str``.
            ValueError: if ``value`` is not one of the labels (from
                ``list.index``).
        """
        if isinstance(value, str):
            return self._labels.index(value)
        raise TypeError("TangoEnumConverter expects str value")

    def value(self, value: int):
        """Return the label stored at position ``value``."""
        return self._labels[value]
class TangoEnumArrayConverter(TangoConverter):
    """Element-wise translation between arrays of labels and arrays of indices."""

    def __init__(self, labels: list[str]):
        # Stored by reference; the list is not copied.
        self._labels = labels

    def write_value(self, value: NDArray[np.str_]) -> NDArray[np.integer]:
        """Map every label in ``value`` to its index in the configured labels.

        Raises:
            ValueError: if an element is not one of the labels (from
                ``list.index``).
        """
        # ``otypes`` is given explicitly because ``np.vectorize`` cannot infer
        # the output dtype from a size-0 input and would raise ValueError.
        # ``int`` is the dtype NumPy infers from ``list.index`` results anyway.
        to_index = np.vectorize(self._labels.index, otypes=[int])
        return to_index(value)

    def value(self, value: NDArray[np.integer]) -> NDArray[np.str_]:
        """Map every index in ``value`` to the label stored at that position.

        Raises:
            IndexError: if an element is outside the range of the labels.
        """
        # Explicit ``otypes`` for the same reason as in ``write_value``; the
        # string width is still derived from the produced labels.
        to_label = np.vectorize(self._labels.__getitem__, otypes=[str])
        return to_label(value)
class TangoDevStateConverter(TangoConverter):
    """Translate between a state label string and a ``DevState`` value."""

    # Label strings taken from DevStateEnum in iteration order. The position
    # of a label in this list is the integer handed to / read from DevState.
    # NOTE(review): assumes DevStateEnum is declared in the same order as the
    # DevState integer values - confirm against ._utils.
    _labels = [member.value for member in DevStateEnum]

    def write_value(self, value: str) -> DevState:
        """Return the ``DevState`` built from the list position of ``value``.

        Raises:
            ValueError: if ``value`` is not one of the labels (from
                ``list.index``).
        """
        return DevState(self._labels.index(value))

    def value(self, value: DevState) -> str:
        """Return the label found at position ``int(value)``."""
        return self._labels[int(value)]
class TangoDevStateArrayConverter(TangoConverter):
    """Element-wise translation between arrays of state labels and ``DevState``."""

    # Label strings taken from DevStateEnum in iteration order. The position
    # of a label in this list is the integer handed to / read from DevState.
    # NOTE(review): assumes DevStateEnum is declared in the same order as the
    # DevState integer values - confirm against ._utils.
    _labels = [member.value for member in DevStateEnum]

    def _write_convert(self, value):
        """Return the ``DevState`` built from the list position of one label."""
        return DevState(self._labels.index(value))

    def _convert(self, value):
        """Return the label found at position ``int(value)`` for one element."""
        return self._labels[int(value)]

    def write_value(self, value: NDArray[np.str_]) -> NDArray[np.int_]:
        """Convert every label in ``value`` with ``_write_convert``.

        Raises:
            ValueError: if an element is not one of the labels (from
                ``list.index``).
        """
        # NOTE(review): with ``otypes=[DevState]`` the result presumably is an
        # object array of DevState rather than an integer array - confirm and
        # adjust the return annotation if so.
        to_state = np.vectorize(self._write_convert, otypes=[DevState])
        return to_state(value)

    def value(self, value: NDArray[np.int_]) -> NDArray[np.str_]:
        """Convert every element of ``value`` to its label with ``_convert``.

        Raises:
            IndexError: if ``int(element)`` is outside the range of the labels.
        """
        # ``otypes`` is given explicitly, as ``write_value`` already does:
        # without it ``np.vectorize`` raises ValueError on a size-0 input
        # because it cannot infer the output dtype.
        to_label = np.vectorize(self._convert, otypes=[str])
        return to_label(value)
class TangoLongStringTableConverter(TangoConverter[TangoLongStringTable]):
    """Convert between ``TangoLongStringTable`` and a ``(long, string)`` pair."""

    def write_value(
        self, value: TangoLongStringTable
    ) -> tuple[NDArray[np.int32], Sequence[str]]:
        """Convert from TangoLongStringTable to DevVarLongStringArray format.

        Returns the table's ``long`` and ``string`` attributes as a 2-tuple,
        in that order.
        """
        longs = value.long
        strings = value.string
        return longs, strings

    def value(
        self, value: tuple[NDArray[np.int32], Sequence[str]]
    ) -> TangoLongStringTable:
        """Convert from DevVarLongStringArray format to TangoLongStringTable.

        Element 0 of ``value`` becomes ``long`` and element 1 becomes
        ``string``.
        """
        longs = value[0]
        strings = value[1]
        return TangoLongStringTable(long=longs, string=strings)
class TangoDoubleStringTableConverter(TangoConverter[TangoDoubleStringTable]):
    """Convert between ``TangoDoubleStringTable`` and a ``(double, string)`` pair."""

    def write_value(
        self, value: TangoDoubleStringTable
    ) -> tuple[NDArray[np.float64], Sequence[str]]:
        """Convert from TangoDoubleStringTable to DevVarDoubleStringArray format.

        Returns the table's ``double`` and ``string`` attributes as a 2-tuple,
        in that order.
        """
        doubles = value.double
        strings = value.string
        return doubles, strings

    def value(
        self, value: tuple[NDArray[np.float64], Sequence[str]]
    ) -> TangoDoubleStringTable:
        """Convert from DevVarDoubleStringArray format to TangoDoubleStringTable.

        Element 0 of ``value`` becomes ``double`` and element 1 becomes
        ``string``.
        """
        doubles = value[0]
        strings = value[1]
        return TangoDoubleStringTable(double=doubles, string=strings)