Source code for ophyd_async.core.soft_signal_backend

from __future__ import annotations

import inspect
import time
from collections import abc
from enum import Enum
from typing import (
    Dict,
    Generic,
    Optional,
    Tuple,
    Type,
    TypedDict,
    Union,
    cast,
    get_origin,
)

import numpy as np
from bluesky.protocols import DataKey, Dtype, Reading

from .signal_backend import RuntimeSubsetEnum, SignalBackend
from .utils import DEFAULT_TIMEOUT, ReadingValueCallback, T, get_dtype

# Lookup from Python primitive types to the dtype string placed in a DataKey.
# SoftConverter.get_datakey consults this table for scalar values (after
# folding numpy integer/floating types down to int/float) and rejects any
# type that is not a key here.
primitive_dtypes: Dict[type, Dtype] = {
    str: "string",
    int: "integer",
    float: "number",
    bool: "boolean",
}


class SignalMetadata(TypedDict, total=False):
    """Optional extra metadata attached to a soft signal.

    Both keys may be omitted. A TypedDict class body may only contain
    annotations: initialisers such as ``= None`` are rejected by type checkers
    and merely leave stray class attributes behind at runtime. "This key is
    optional" is therefore expressed with ``total=False`` instead of a default.
    An explicit ``None`` value is still accepted for either key.
    """

    # Units for the signal value (NOTE(review): exact convention not visible
    # in this module - confirm against callers).
    units: Optional[str]
    # Precision for the signal value (NOTE(review): presumably a number of
    # decimal places - confirm against callers).
    precision: Optional[int]


class SoftConverter(Generic[T]):
    """Default converter used for scalar soft-signal values.

    Values pass through unchanged in both directions; the converter also
    builds the ``Reading`` and ``DataKey`` dictionaries for a value and can
    produce an initial value for a datatype.
    """

    def value(self, value: T) -> T:
        """Return ``value`` unchanged."""
        return value

    def write_value(self, value: T) -> T:
        """Return ``value`` unchanged."""
        return value

    def reading(self, value: T, timestamp: float, severity: int) -> Reading:
        """Build a ``Reading`` from a value, its timestamp and a severity.

        Any severity greater than 2 is reported as ``-1``; other severities
        are passed through as given.
        """
        if severity > 2:
            alarm = -1
        else:
            alarm = severity
        return Reading(value=value, timestamp=timestamp, alarm_severity=alarm)

    def get_datakey(self, source: str, value, **metadata) -> DataKey:
        """Build the ``DataKey`` describing a scalar ``value``.

        The dtype string is looked up in ``primitive_dtypes`` from the type of
        ``value``; numpy integer and floating types are treated as ``int`` and
        ``float`` respectively. Extra ``metadata`` keywords are merged into
        the result. An assertion fails if the type has no entry in
        ``primitive_dtypes``.
        """
        value_type = type(value)
        # Fold numpy scalar types down to the matching Python primitive.
        if np.issubdtype(value_type, np.integer):
            python_type = int
        elif np.issubdtype(value_type, np.floating):
            python_type = float
        else:
            python_type = value_type
        assert (
            python_type in primitive_dtypes
        ), f"invalid converter for value of type {type(value)}"
        # "dtype" comes last so it wins over any "dtype" passed in metadata.
        return {
            "source": source,
            "shape": [],
            **metadata,
            "dtype": primitive_dtypes[python_type],
        }

    def make_initial_value(self, datatype: Optional[Type[T]]) -> T:
        """Return ``datatype()``, or ``None`` when no datatype was given."""
        return cast(T, None) if datatype is None else datatype()


class SoftArrayConverter(SoftConverter):
    """Converter for array-like and sequence soft-signal values."""

    def get_datakey(self, source: str, value, **metadata) -> DataKey:
        """Build an ``"array"`` DataKey whose shape is ``[len(value)]``.

        Extra ``metadata`` keywords are merged in last, so they override the
        generated entries when keys collide.
        """
        datakey = {"source": source, "dtype": "array", "shape": [len(value)]}
        datakey.update(metadata)
        return datakey

    def make_initial_value(self, datatype: Optional[Type[T]]) -> T:
        """Return an empty initial value for ``datatype``.

        ``None`` when no datatype is given, an empty list when the datatype's
        origin is ``collections.abc.Sequence``, otherwise the result of
        calling ``datatype(shape=0)``.
        """
        if datatype is None:
            empty = None
        elif get_origin(datatype) == abc.Sequence:
            empty = []
        else:
            empty = datatype(shape=0)  # type: ignore
        return cast(T, empty)


class SoftEnumConverter(SoftConverter):
    """Converter for ``Enum`` and ``RuntimeSubsetEnum`` soft-signal values."""

    choices: Tuple[str, ...]

    def __init__(self, datatype: Union[RuntimeSubsetEnum, Enum]):
        """Record the allowed choices for ``datatype``.

        For an ``Enum`` subclass the choices are the member values; otherwise
        they are read from ``datatype.choices``.
        """
        if not issubclass(datatype, Enum):
            self.choices = datatype.choices
        else:
            self.choices = tuple(member.value for member in datatype)

    def write_value(self, value: Union[Enum, str]) -> str:
        """Return the ``.value`` of an ``Enum`` member; pass anything else through."""
        # Non-Enum input is the runtime-enum case and is already a plain value.
        return value.value if isinstance(value, Enum) else value

    def get_datakey(self, source: str, value, **metadata) -> DataKey:
        """Build a ``"string"`` DataKey that also lists the allowed choices.

        Extra ``metadata`` keywords are merged in last, so they override the
        generated entries when keys collide.
        """
        datakey = {
            "source": source,
            "dtype": "string",
            "shape": [],
            "choices": self.choices,
        }
        datakey.update(metadata)
        return datakey

    def make_initial_value(self, datatype: Optional[Type[T]]) -> T:
        """Return the first choice of ``datatype`` as the initial value.

        ``None`` when no datatype is given, the first member for an ``Enum``
        subclass, otherwise the first entry of ``self.choices``.
        """
        if datatype is None:
            return cast(T, None)

        if not issubclass(datatype, Enum):
            return cast(T, self.choices[0])

        members = list(datatype.__members__.values())
        return cast(T, members[0])  # type: ignore


def make_converter(datatype):
    """Pick the converter instance that matches ``datatype``.

    Returns a ``SoftArrayConverter`` when ``get_dtype(datatype)`` is not
    ``None`` or the datatype's origin is ``collections.abc.Sequence``, a
    ``SoftEnumConverter`` when the datatype is a class derived from ``Enum``
    or ``RuntimeSubsetEnum``, and a plain ``SoftConverter`` otherwise.
    """
    # The array/sequence check takes priority over the enum check.
    if get_dtype(datatype) is not None or get_origin(datatype) == abc.Sequence:
        return SoftArrayConverter()

    if inspect.isclass(datatype) and issubclass(
        datatype, (Enum, RuntimeSubsetEnum)
    ):
        return SoftEnumConverter(datatype)

    return SoftConverter()


class SoftSignalBackend(SignalBackend[T]):
    """A backend to a soft Signal, for test signals see ``MockSignalBackend``."""

    _value: T
    _initial_value: Optional[T]
    _timestamp: float
    _severity: int

    def __init__(
        self,
        datatype: Optional[Type[T]],
        initial_value: Optional[T] = None,
        metadata: SignalMetadata = None,
    ) -> None:
        """Create the backend and store its initial value.

        When ``initial_value`` is ``None`` the converter chosen for
        ``datatype`` supplies one; otherwise the given value is passed through
        the converter's ``write_value``. ``metadata`` (an empty dict when
        falsy) is later merged into the DataKey.
        """
        self.datatype = datatype
        self._metadata = metadata or {}
        self.converter: SoftConverter = make_converter(datatype)

        if initial_value is None:
            self._initial_value = self.converter.make_initial_value(datatype)
        else:
            self._initial_value = self.converter.write_value(initial_value)

        # set_value reads both callback and _severity, so set them first.
        self.callback: Optional[ReadingValueCallback[T]] = None
        self._severity = 0
        self.set_value(self._initial_value)

    def source(self, name: str) -> str:
        """Return the ``soft://`` source string for ``name``."""
        return f"soft://{name}"

    async def connect(self, timeout: float = DEFAULT_TIMEOUT) -> None:
        """Connection isn't required for soft signals."""

    async def put(self, value: Optional[T], wait=True, timeout=None):
        """Store ``value``; ``None`` resets to the initial value.

        ``wait`` and ``timeout`` are accepted but not used here.
        """
        if value is None:
            write_value = self._initial_value
        else:
            write_value = self.converter.write_value(value)
        self.set_value(write_value)

    def set_value(self, value: T):
        """Method to bypass asynchronous logic.

        Stores ``value`` with a ``time.monotonic()`` timestamp and, if a
        callback is registered, calls it with the new reading and value.
        """
        self._value = value
        self._timestamp = time.monotonic()
        # The reading is built whether or not a callback is registered.
        reading: Reading = self.converter.reading(
            self._value, self._timestamp, self._severity
        )
        if not self.callback:
            return
        self.callback(reading, self._value)

    async def get_datakey(self, source: str) -> DataKey:
        """Return the DataKey for the current value, including stored metadata."""
        return self.converter.get_datakey(source, self._value, **self._metadata)

    async def get_reading(self) -> Reading:
        """Return a Reading built from the stored value, timestamp and severity."""
        return self.converter.reading(self._value, self._timestamp, self._severity)

    async def get_value(self) -> T:
        """Return the stored value as presented by the converter."""
        return self.converter.value(self._value)

    async def get_setpoint(self) -> T:
        """For a soft signal, the setpoint and readback values are the same."""
        return await self.get_value()

    def set_callback(self, callback: Optional[ReadingValueCallback[T]]) -> None:
        """Register ``callback``, or clear it when given a falsy value.

        A new callback is called immediately with the current reading and
        value. An assertion fails if a callback is already registered.
        """
        if callback:
            assert not self.callback, "Cannot set a callback when one is already set"
            reading: Reading = self.converter.reading(
                self._value, self._timestamp, self._severity
            )
            callback(reading, self._value)
        self.callback = callback