Source code for ophyd_async.core._signal_backend

from abc import abstractmethod
from collections.abc import Sequence
from typing import Generic, TypedDict, TypeVar, get_origin

import numpy as np
from bluesky.protocols import Reading
from event_model import DataKey, Dtype, Limits

from ._table import Table
from ._utils import Callback, StrictEnum, T

DTypeScalar_co = TypeVar("DTypeScalar_co", covariant=True, bound=np.generic)
Array1D = np.ndarray[tuple[int], np.dtype[DTypeScalar_co]]
Primitive = bool | int | float | str
# NOTE: if you change this union then update the docs to match
SignalDatatype = (
    Primitive
    | Array1D[np.bool_]
    | Array1D[np.int8]
    | Array1D[np.uint8]
    | Array1D[np.int16]
    | Array1D[np.uint16]
    | Array1D[np.int32]
    | Array1D[np.uint32]
    | Array1D[np.int64]
    | Array1D[np.uint64]
    | Array1D[np.float32]
    | Array1D[np.float64]
    | np.ndarray
    | StrictEnum
    | Sequence[str]
    | Sequence[StrictEnum]
    | Table
)
# TODO: These typevars will not be needed when we drop python 3.11
# as you can do MyConverter[SignalType: SignalTypeUnion]:
# rather than MyConverter(Generic[SignalType])
PrimitiveT = TypeVar("PrimitiveT", bound=Primitive)
SignalDatatypeT = TypeVar("SignalDatatypeT", bound=SignalDatatype)
SignalDatatypeV = TypeVar("SignalDatatypeV", bound=SignalDatatype)
EnumT = TypeVar("EnumT", bound=StrictEnum)
TableT = TypeVar("TableT", bound=Table)


[docs] class SignalBackend(Generic[SignalDatatypeT]): """A read/write/monitor backend for a Signals""" def __init__(self, datatype: type[SignalDatatypeT] | None): self.datatype = datatype
[docs] @abstractmethod def source(self, name: str, read: bool) -> str: """Return source of signal. Signals may pass a name to the backend, which can be used or discarded. """
[docs] @abstractmethod async def connect(self, timeout: float): """Connect to underlying hardware"""
[docs] @abstractmethod async def put(self, value: SignalDatatypeT | None, wait: bool): """Put a value to the PV, if wait then wait for completion"""
[docs] @abstractmethod async def get_datakey(self, source: str) -> DataKey: """Metadata like source, dtype, shape, precision, units"""
[docs] @abstractmethod async def get_reading(self) -> Reading[SignalDatatypeT]: """The current value, timestamp and severity"""
[docs] @abstractmethod async def get_value(self) -> SignalDatatypeT: """The current value"""
[docs] @abstractmethod async def get_setpoint(self) -> SignalDatatypeT: """The point that a signal was requested to move to."""
[docs] @abstractmethod def set_callback(self, callback: Callback[T] | None) -> None: """Observe changes to the current value, timestamp and severity"""
_primitive_dtype: dict[type[Primitive], Dtype] = { bool: "boolean", int: "integer", float: "number", str: "string", } class SignalMetadata(TypedDict, total=False): limits: Limits choices: list[str] precision: int units: str def _datakey_dtype(datatype: type[SignalDatatype]) -> Dtype: if ( datatype is np.ndarray or get_origin(datatype) in (Sequence, np.ndarray) or issubclass(datatype, Table) ): return "array" elif issubclass(datatype, StrictEnum): return "string" elif issubclass(datatype, Primitive): return _primitive_dtype[datatype] else: raise TypeError(f"Can't make dtype for {datatype}") def _datakey_dtype_numpy( datatype: type[SignalDatatypeT], value: SignalDatatypeT ) -> np.dtype: if isinstance(value, np.ndarray): # The value already has a dtype, use that return value.dtype elif ( get_origin(datatype) is Sequence or datatype is str or issubclass(datatype, StrictEnum) ): # TODO: use np.dtypes.StringDType when we can use in structured arrays # https://github.com/numpy/numpy/issues/25693 return np.dtype("S40") elif isinstance(value, Table): return value.numpy_dtype() elif issubclass(datatype, Primitive): return np.dtype(datatype) else: raise TypeError(f"Can't make dtype_numpy for {datatype}") def _datakey_shape(value: SignalDatatype) -> list[int]: if type(value) in _primitive_dtype or isinstance(value, StrictEnum): return [] elif isinstance(value, np.ndarray): return list(value.shape) elif isinstance(value, Sequence | Table): return [len(value)] else: raise TypeError(f"Can't make shape for {value}") def make_datakey( datatype: type[SignalDatatypeT], value: SignalDatatypeT, source: str, metadata: SignalMetadata, ) -> DataKey: dtn = _datakey_dtype_numpy(datatype, value) return DataKey( dtype=_datakey_dtype(datatype), shape=_datakey_shape(value), # Ignore until https://github.com/bluesky/event-model/issues/308 dtype_numpy=dtn.descr if len(dtn.descr) > 1 else dtn.str, # type: ignore source=source, **metadata, )