# Source code for ophyd_async.core._soft_signal_backend

from __future__ import annotations

import time
from abc import abstractmethod
from collections.abc import Sequence
from dataclasses import dataclass
from functools import lru_cache
from typing import Any, Generic, get_args, get_origin

import numpy as np
from bluesky.protocols import Reading
from event_model import DataKey

from ._signal_backend import (
    Array1D,
    EnumT,
    Primitive,
    PrimitiveT,
    SignalBackend,
    SignalDatatype,
    SignalDatatypeT,
    SignalMetadata,
    TableT,
    make_datakey,
)
from ._table import Table
from ._utils import Callback, get_dtype, get_enum_cls


class SoftConverter(Generic[SignalDatatypeT]):
    """Interface for objects that coerce written values to a signal datatype."""

    # The input is typed Any rather than SignalDatatypeT on purpose: values are
    # coerced to the SignalDatatype, which lets callers do things like
    # SignalRW[Enum].set("enum value")
    @abstractmethod
    def write_value(self, value: Any) -> SignalDatatypeT:
        """Return ``value`` coerced to the datatype this converter handles."""


@dataclass
class PrimitiveSoftConverter(SoftConverter[PrimitiveT]):
    """Converter that coerces written values by calling ``datatype`` on them."""

    # Type that is called to build the converted value
    datatype: type[PrimitiveT]

    def write_value(self, value: Any) -> PrimitiveT:
        """Return ``datatype(value)``, or ``datatype()`` when ``value`` is falsy.

        ``None`` is falsy, so it also produces the no-argument default.
        """
        if not value:
            return self.datatype()
        return self.datatype(value)


class SequenceStrSoftConverter(SoftConverter[Sequence[str]]):
    """Converter that turns an iterable of values into a list of strings."""

    def write_value(self, value: Any) -> Sequence[str]:
        """Return ``str()`` of every item in ``value`` as a new list.

        A falsy ``value`` (``None``, an empty sequence, ...) gives an empty list.
        """
        if not value:
            return []
        return list(map(str, value))


@dataclass
class SequenceEnumSoftConverter(SoftConverter[Sequence[EnumT]]):
    """Converter that turns an iterable of values into a list of enum members."""

    # Enum class that each item of the written value is passed to
    datatype: type[EnumT]

    def write_value(self, value: Any) -> Sequence[EnumT]:
        """Return ``datatype(item)`` for every item in ``value`` as a new list.

        A falsy ``value`` (``None``, an empty sequence, ...) gives an empty list.
        """
        if not value:
            return []
        return list(map(self.datatype, value))


@dataclass
class NDArraySoftConverter(SoftConverter[Array1D]):
    """Converter that builds a numpy array from the written value."""

    # dtype handed to ``np.array``; None leaves the dtype choice to numpy
    datatype: np.dtype | None = None

    def write_value(self, value: Any) -> Array1D:
        """Return ``value`` as a new numpy array of the configured dtype.

        ``None`` is replaced by an empty tuple, so it produces an empty array.
        """
        contents = () if value is None else value
        return np.array(contents, dtype=self.datatype)


@dataclass
class EnumSoftConverter(SoftConverter[EnumT]):
    """Converter that coerces written values to a member of ``datatype``."""

    # Enum class used to look up members from written values
    datatype: type[EnumT]

    def write_value(self, value: Any) -> EnumT:
        """Return the member of ``datatype`` that corresponds to ``value``.

        ``None`` selects the first declared member. Every other value is
        looked up with ``datatype(value)``; this includes falsy values such as
        ``""`` or ``0`` so that a member whose value is falsy can actually be
        selected rather than being silently replaced by the first member.

        A falsy value that is not a valid member still falls back to the first
        declared member, which keeps the previous "falsy means default"
        behaviour for such inputs.

        Raises:
            ValueError: if a truthy ``value`` is rejected by the enum lookup.
        """
        if value is not None:
            try:
                return self.datatype(value)
            except ValueError:
                # Only falsy non-members are treated as "use the default";
                # a truthy value that is not a member is a genuine error.
                if value:
                    raise
        return list(self.datatype.__members__.values())[0]


@dataclass
class TableSoftConverter(SoftConverter[TableT]):
    """Converter that coerces written values to an instance of ``datatype``."""

    # Table class that written values are converted to
    datatype: type[TableT]

    def write_value(self, value: Any) -> TableT:
        """Return ``value`` as an instance of ``datatype``.

        A dict is expanded into keyword arguments for ``datatype``, an existing
        ``datatype`` instance is returned unchanged, and ``None`` produces
        ``datatype()``.

        Raises:
            TypeError: if ``value`` is none of the above.
        """
        table_cls = self.datatype
        if isinstance(value, dict):
            return table_cls(**value)
        if isinstance(value, table_cls):
            return value
        if value is None:
            return table_cls()
        raise TypeError(f"Cannot convert {value} to {self.datatype}")


@lru_cache
def make_converter(datatype: type[SignalDatatype]) -> SoftConverter:
    """Return the ``SoftConverter`` that matches ``datatype``.

    Calls are memoised with ``lru_cache``, so repeated requests for the same
    datatype reuse the converter created the first time.

    Raises:
        TypeError: if ``datatype`` is a parametrised ``np.ndarray`` that is not
            one of the arguments of ``SignalDatatype``, or if no converter
            matches ``datatype``.
    """
    enum_type = get_enum_cls(datatype)
    origin = get_origin(datatype)

    # Sequence flavours are checked first, then arrays, then scalar-like types
    if datatype == Sequence[str]:
        return SequenceStrSoftConverter()
    if origin == Sequence and enum_type:
        return SequenceEnumSoftConverter(enum_type)
    if datatype is np.ndarray:
        return NDArraySoftConverter()
    if origin == np.ndarray:
        if datatype not in get_args(SignalDatatype):
            raise TypeError(f"Expected Array1D[dtype], got {datatype}")
        return NDArraySoftConverter(get_dtype(datatype))
    if enum_type:
        return EnumSoftConverter(enum_type)
    if issubclass(datatype, Table):
        return TableSoftConverter(datatype)
    if issubclass(datatype, Primitive):
        return PrimitiveSoftConverter(datatype)
    raise TypeError(f"Can't make converter for {datatype}")


class SoftSignalBackend(SignalBackend[SignalDatatypeT]):
    """A backend to a soft Signal, for test signals see ``MockSignalBackend``."""

    def __init__(
        self,
        datatype: type[SignalDatatypeT] | None,
        initial_value: SignalDatatypeT | None = None,
        units: str | None = None,
        precision: int | None = None,
    ):
        """Create the converter, static metadata and first reading.

        Args:
            datatype: Type of the signal value; ``float`` is used to pick the
                converter when this is ``None``.
            initial_value: Starting value, passed through the converter before
                being stored. It is also the value written by ``put(None)``.
            units: Stored under ``"units"`` in the metadata when not ``None``.
            precision: Stored under ``"precision"`` in the metadata when not
                ``None``.
        """
        # Create the right converter for the datatype
        self.converter = make_converter(datatype or float)
        # Add the extra static metadata to the dictionary
        self.metadata: SignalMetadata = {}
        if units is not None:
            self.metadata["units"] = units
        if precision is not None:
            self.metadata["precision"] = precision
        if enum_cls := get_enum_cls(datatype):
            self.metadata["choices"] = [v.value for v in enum_cls]
        # Create and set the initial value
        self.initial_value = self.converter.write_value(initial_value)
        self.reading: Reading[SignalDatatypeT]
        self.callback: Callback[Reading[SignalDatatypeT]] | None = None
        self.set_value(self.initial_value)
        super().__init__(datatype)

    def set_value(self, value: SignalDatatypeT):
        """Store a new reading for ``value`` and pass it to the callback, if any.

        The value is run through the converter, stamped with
        ``time.monotonic()`` and given an alarm severity of 0.
        """
        self.reading = Reading(
            value=self.converter.write_value(value),
            timestamp=time.monotonic(),
            alarm_severity=0,
        )
        if self.callback:
            self.callback(self.reading)

    def source(self, name: str, read: bool) -> str:
        """Return ``soft://<name>``; ``read`` is not used."""
        return f"soft://{name}"

    async def connect(self, timeout: float):
        """Do nothing; ``timeout`` is not used."""
        pass

    async def put(self, value: SignalDatatypeT | None, wait: bool) -> None:
        """Set the signal to ``value``, or to the initial value if it is ``None``.

        ``wait`` is not used.
        """
        write_value = self.initial_value if value is None else value
        self.set_value(write_value)

    async def get_datakey(self, source: str) -> DataKey:
        """Build a ``DataKey`` from the datatype, current value and metadata."""
        return make_datakey(
            self.datatype or float, self.reading["value"], source, self.metadata
        )

    async def get_reading(self) -> Reading[SignalDatatypeT]:
        """Return the most recently stored reading."""
        return self.reading

    async def get_value(self) -> SignalDatatypeT:
        """Return the value of the most recently stored reading."""
        return self.reading["value"]

    async def get_setpoint(self) -> SignalDatatypeT:
        """Return the setpoint, which is the same as the current value."""
        # For a soft signal, the setpoint and readback values are the same.
        return self.reading["value"]

    def set_callback(
        self, callback: Callback[Reading[SignalDatatypeT]] | None
    ) -> None:
        """Install ``callback`` to receive readings, or clear it with ``None``.

        A new callback is called straight away with the current reading.
        Installing a callback while one is already set fails the assertion.
        """
        if callback:
            assert not self.callback, "Cannot set a callback when one is already set"
            callback(self.reading)
        self.callback = callback