# Source code for ophyd_async.epics.signal._p4p

import asyncio
import atexit
import inspect
import logging
import time
from abc import ABCMeta
from dataclasses import dataclass
from enum import Enum
from math import isnan, nan
from typing import Any, Dict, List, Optional, Sequence, Type, Union, get_origin

import numpy as np
from bluesky.protocols import DataKey, Dtype, Reading
from p4p import Value
from p4p.client.asyncio import Context, Subscription
from pydantic import BaseModel

from ophyd_async.core import (
    DEFAULT_TIMEOUT,
    NotConnected,
    ReadingValueCallback,
    RuntimeSubsetEnum,
    SignalBackend,
    T,
    get_dtype,
    get_unique,
    wait_for_connection,
)

from ._common import LimitPair, Limits, common_meta, get_supported_values

# https://mdavidsaver.github.io/p4p/values.html
# Maps a p4p value type-specifier character to the bluesky DataKey dtype.
specifier_to_dtype: Dict[str, Dtype] = {
    "?": "integer",  # bool
    "b": "integer",  # int8
    "B": "integer",  # uint8
    "h": "integer",  # int16
    "H": "integer",  # uint16
    "i": "integer",  # int32
    "I": "integer",  # uint32
    "l": "integer",  # int64
    "L": "integer",  # uint64
    "f": "number",  # float32
    "d": "number",  # float64
    "s": "string",
}

# Maps a p4p value type-specifier character to a numpy dtype string
# (byte order + kind + item size), used for the DataKey's dtype_numpy field.
specifier_to_np_dtype: Dict[str, str] = {
    "?": "<i2",  # bool (transported as int16, matching the boolean enum case below)
    "b": "|i1",  # int8
    "B": "|u1",  # uint8
    "h": "<i2",  # int16
    "H": "<u2",  # uint16
    "i": "<i4",  # int32
    "I": "<u4",  # uint32
    "l": "<i8",  # int64
    "L": "<u8",  # uint64
    "f": "<f4",  # float32
    "d": "<f8",  # float64
    "s": "|S40",  # fixed-width 40-byte string — presumably a conservative cap; confirm
}


def _data_key_from_value(
    source: str,
    value: Value,
    *,
    shape: Optional[list[int]] = None,
    choices: Optional[list[str]] = None,
    dtype: Optional[Dtype] = None,
) -> DataKey:
    """Build a rich DataKey describing a DB record.

    Args:
        source: Source string to record in the DataKey
        value (Value): Description of the return type of a DB record
        shape: Optional override shape when len(shape) > 1
        choices: Optional list of enum choices to pass as metadata in the datakey
        dtype: Optional override dtype when AugmentedValue is ambiguous, e.g. booleans

    Returns:
        DataKey: A rich DataKey describing the DB record
    """
    shape = shape or []
    type_code = value.type().aspy("value")
    dtype = dtype or specifier_to_dtype[type_code]

    def _numpy_spec() -> str:
        # Resolve the numpy dtype string for this type code; may raise KeyError
        # when the specifier is not in the lookup tables.
        if isinstance(type_code, tuple):
            spec = ""
            if type_code[1] == "enum_t":
                if dtype == "boolean":
                    spec = "<i2"
                else:
                    for item in type_code[2]:
                        if item[0] == "choices":
                            spec = specifier_to_np_dtype[item[1][1]]
            return spec
        if type_code.startswith("a"):
            # Array type, use typecode of the contained element
            return specifier_to_np_dtype[type_code[1]]
        return specifier_to_np_dtype[type_code]

    try:
        dtype_numpy = _numpy_spec()
    except KeyError:
        # Case where we can't determine dtype string from value
        dtype_numpy = ""

    datakey = DataKey(
        source=source,
        dtype=dtype,
        dtype_numpy=dtype_numpy,
        shape=shape,
    )

    display_data = getattr(value, "display", None)
    if display_data is not None:
        for key in common_meta:
            attr = getattr(display_data, key, nan)
            # Copy string metadata and any numeric metadata that is not NaN
            if isinstance(attr, str) or not isnan(attr):
                datakey[key] = attr

    if choices is not None:
        datakey["choices"] = choices

    if limits := _limits_from_value(value):
        datakey["limits"] = limits

    return datakey


def _limits_from_value(value: Value) -> Limits:
    """Extract alarm/control/display/warning limit pairs from a p4p Value,
    mapping any absent (NaN) bound to None.
    """

    def pair(
        section: str, low_name: str = "limitLow", high_name: str = "limitHigh"
    ) -> LimitPair:
        # Missing substructures/fields fall back to nan via getattr defaults
        section_struct = getattr(value, section, None)
        low = getattr(section_struct, low_name, nan)
        high = getattr(section_struct, high_name, nan)
        return LimitPair(
            low=None if isnan(low) else low,
            high=None if isnan(high) else high,
        )

    return Limits(
        alarm=pair("valueAlarm", "lowAlarmLimit", "highAlarmLimit"),
        control=pair("control"),
        display=pair("display"),
        warning=pair("valueAlarm", "lowWarningLimit", "highWarningLimit"),
    )


class PvaConverter:
    """Default conversion between p4p structures and plain Python values."""

    def write_value(self, value):
        # By default values are written to PVA unchanged.
        return value

    def value(self, value):
        return value["value"]

    def reading(self, value):
        timestamp = value["timeStamp"]
        severity = value["alarm"]["severity"]
        seconds = timestamp["secondsPastEpoch"] + timestamp["nanoseconds"] * 1e-9
        return {
            "value": self.value(value),
            "timestamp": seconds,
            # Severities above 2 are collapsed to -1
            "alarm_severity": severity if severity <= 2 else -1,
        }

    def get_datakey(self, source: str, value) -> DataKey:
        return _data_key_from_value(source, value)

    def metadata_fields(self) -> List[str]:
        """
        Fields to request from PVA for metadata.
        """
        return ["alarm", "timeStamp"]

    def value_fields(self) -> List[str]:
        """
        Fields to request from PVA for the value.
        """
        return ["value"]


class PvaArrayConverter(PvaConverter):
    """Converter for 1-D scalar-array records; shape is the array length."""

    def get_datakey(self, source: str, value) -> DataKey:
        length = len(value["value"])
        return _data_key_from_value(source, value, dtype="array", shape=[length])


class PvaNDArrayConverter(PvaConverter):
    """Converter for NTNDArray records: reshapes the flat data to N-D."""

    def metadata_fields(self) -> List[str]:
        return [*super().metadata_fields(), "dimension"]

    def _get_dimensions(self, value) -> List[int]:
        # Note: dimensions in NTNDArray are in fortran-like order
        # with first index changing fastest.
        #
        # Therefore we need to reverse the order of the dimensions
        # here to get back to a more usual C-like order with the
        # last index changing fastest.
        dimensions: List[Value] = value["dimension"]
        return [dim.size for dim in reversed(dimensions)]

    def value(self, value):
        return value["value"].reshape(self._get_dimensions(value))

    def get_datakey(self, source: str, value) -> DataKey:
        return _data_key_from_value(
            source, value, dtype="array", shape=self._get_dimensions(value)
        )

    def write_value(self, value):
        # No clear use-case for writing directly to an NDArray, and some
        # complexities around flattening to 1-D - e.g. dimension-order.
        # Don't support this for now.
        raise TypeError("Writing to NDArray not supported")


@dataclass
class PvaEnumConverter(PvaConverter):
    """Treat an Enum signal as a string and cache its choices on this class,
    to prevent issues when a signal is restarted and returns with different
    enum values or orders.
    """

    # NOTE(review): the @dataclass decorator declares no fields and the
    # explicit __init__ below takes precedence over the generated one —
    # kept as-is for compatibility (it still supplies __repr__/__eq__).

    def __init__(self, choices: dict[str, str]):
        self.choices = tuple(choices.values())

    def write_value(self, value: Union[Enum, str]):
        # Enum members are written by their value; strings pass through.
        return value.value if isinstance(value, Enum) else value

    def value(self, value):
        index = value["value"]["index"]
        return self.choices[index]

    def get_datakey(self, source: str, value) -> DataKey:
        return _data_key_from_value(
            source, value, choices=list(self.choices), dtype="string"
        )


class PvaEmumBoolConverter(PvaConverter):
    """Converter for a two-choice enum record read and described as a bool."""

    # Note: the "Emum" typo in the class name is preserved — it is referenced
    # by make_converter and possibly by external callers.

    def value(self, value):
        return value["value"]["index"] != 0

    def get_datakey(self, source: str, value) -> DataKey:
        return _data_key_from_value(source, value, dtype="boolean")


class PvaTableConverter(PvaConverter):
    """Converter for NTTable records, read as a plain dict of columns."""

    def value(self, value):
        return value["value"].todict()

    def get_datakey(self, source: str, value) -> DataKey:
        # This is wrong, but defer until we know how to actually describe a table
        return _data_key_from_value(source, value, dtype="object")


class PvaPydanticModelConverter(PvaConverter):
    """Converter for structured records modelled by a pydantic BaseModel."""

    def __init__(self, datatype: Type[BaseModel]):
        # Fix: the parameter is the model *class* (it is called as a
        # constructor in value() and used with isinstance in write_value()),
        # so the annotation is Type[BaseModel], not BaseModel.
        self.datatype = datatype

    def value(self, value: Value):
        # Validate/construct a model instance from the raw structure.
        return self.datatype(**value.todict())

    def write_value(self, value: Union[BaseModel, Dict[str, Any]]):
        # Model instances are dumped back to plain Python; dicts pass through.
        if isinstance(value, self.datatype):
            return value.model_dump(mode="python")
        return value


class PvaDictConverter(PvaConverter):
    """Converter for free-form structures, read as plain dicts; requests the
    whole structure (no value/metadata field filtering)."""

    def reading(self, value):
        ts = time.time()
        # Alarm severity is vacuously 0 for a table
        return {
            "value": value.todict(),
            "timestamp": ts,
            "alarm_severity": 0,
        }

    def value(self, value: Value):
        return value.todict()

    def get_datakey(self, source: str, value) -> DataKey:
        raise NotImplementedError("Describing Dict signals not currently supported")

    def metadata_fields(self) -> List[str]:
        """
        Fields to request from PVA for metadata.
        """
        return []

    def value_fields(self) -> List[str]:
        """
        Fields to request from PVA for the value.
        """
        return []


class DisconnectedPvaConverter(PvaConverter):
    """Placeholder converter installed before connect() runs: any attribute
    access on an instance (including inherited methods) raises immediately."""

    def __getattribute__(self, __name: str) -> Any:
        raise NotImplementedError("No PV has been set as connect() has not been called")


def make_converter(datatype: Optional[Type], values: Dict[str, Any]) -> PvaConverter:
    """Pick the PvaConverter subclass matching the PVs' initial values.

    Args:
        datatype: The Python type the caller requested, or None to accept
            whatever the records provide.
        values: Mapping of PV name -> initial p4p value, one entry per PV
            backing the signal. All entries must agree on typeid and value
            type (enforced via get_unique).

    Raises:
        TypeError: If the records don't match ``datatype``, or the typeid is
            unsupported.
    """
    # NOTE: branch order below matters — "NTScalar" is a substring of
    # "NTScalarArray" typeids, so the array branches must be tested first.
    pv = list(values)[0]
    typeid = get_unique({k: v.getID() for k, v in values.items()}, "typeids")
    typ = get_unique(
        {k: type(v.get("value")) for k, v in values.items()}, "value types"
    )
    if "NTScalarArray" in typeid and typ is list:
        # Waveform of strings, check we wanted this
        if datatype and datatype != Sequence[str]:
            raise TypeError(f"{pv} has type [str] not {datatype.__name__}")
        return PvaArrayConverter()
    elif "NTScalarArray" in typeid or "NTNDArray" in typeid:
        pv_dtype = get_unique(
            {k: v["value"].dtype for k, v in values.items()}, "dtypes"
        )
        # This is an array
        if datatype:
            # Check we wanted an array of this type
            dtype = get_dtype(datatype)
            if not dtype:
                raise TypeError(f"{pv} has type [{pv_dtype}] not {datatype.__name__}")
            if dtype != pv_dtype:
                raise TypeError(f"{pv} has type [{pv_dtype}] not [{dtype}]")
        if "NTNDArray" in typeid:
            return PvaNDArrayConverter()
        else:
            return PvaArrayConverter()
    elif "NTEnum" in typeid and datatype is bool:
        # Wanted a bool, but database represents as an enum
        pv_choices_len = get_unique(
            {k: len(v["value"]["choices"]) for k, v in values.items()},
            "number of choices",
        )
        if pv_choices_len != 2:
            raise TypeError(f"{pv} has {pv_choices_len} choices, can't map to bool")
        return PvaEmumBoolConverter()
    elif "NTEnum" in typeid:
        # This is an Enum
        pv_choices = get_unique(
            {k: tuple(v["value"]["choices"]) for k, v in values.items()}, "choices"
        )
        return PvaEnumConverter(get_supported_values(pv, datatype, pv_choices))
    elif "NTScalar" in typeid:
        # A string record can also back a subset-enum datatype
        if (
            typ is str
            and inspect.isclass(datatype)
            and issubclass(datatype, RuntimeSubsetEnum)
        ):
            return PvaEnumConverter(
                get_supported_values(pv, datatype, datatype.choices)
            )
        elif datatype and not issubclass(typ, datatype):
            # Allow int signals to represent float records when prec is 0
            is_prec_zero_float = typ is float and (
                get_unique(
                    {k: v["display"]["precision"] for k, v in values.items()},
                    "precision",
                )
                == 0
            )
            if not (datatype is int and is_prec_zero_float):
                raise TypeError(f"{pv} has type {typ.__name__} not {datatype.__name__}")
        return PvaConverter()
    elif "NTTable" in typeid:
        if (
            datatype
            and inspect.isclass(datatype)
            and
            # Necessary to avoid weirdness in ABCMeta.__subclasscheck__
            isinstance(datatype, ABCMeta)
            and issubclass(datatype, BaseModel)
        ):
            return PvaPydanticModelConverter(datatype)
        return PvaTableConverter()
    elif "structure" in typeid:
        return PvaDictConverter()
    else:
        raise TypeError(f"{pv}: Unsupported typeid {typeid}")


class PvaSignalBackend(SignalBackend[T]):
    """SignalBackend implementation over PVAccess using p4p.

    Reconstructed formatting: the scraped source had ``[docs]`` markers and
    whole method bodies collapsed onto single lines, which is not valid
    Python; tokens are preserved, only layout/docs are restored.
    """

    # Single shared p4p Context for all backends in the process
    _ctxt: Optional[Context] = None

    _ALLOWED_DATATYPES = (
        bool,
        int,
        float,
        str,
        Sequence,
        np.ndarray,
        Enum,
        RuntimeSubsetEnum,
        BaseModel,
        dict,
    )

    @classmethod
    def datatype_allowed(cls, datatype: Optional[Type]) -> bool:
        """Return True if ``datatype`` (or its generic origin) is supported."""
        stripped_origin = get_origin(datatype) or datatype
        if datatype is None:
            return True
        return inspect.isclass(stripped_origin) and issubclass(
            stripped_origin, cls._ALLOWED_DATATYPES
        )

    def __init__(self, datatype: Optional[Type[T]], read_pv: str, write_pv: str):
        self.datatype = datatype
        if not PvaSignalBackend.datatype_allowed(self.datatype):
            raise TypeError(f"Given datatype {self.datatype} unsupported in PVA.")
        self.read_pv = read_pv
        self.write_pv = write_pv
        # Initial value per PV, captured at connect() time
        self.initial_values: Dict[str, Any] = {}
        # Raises on use until connect() installs a real converter
        self.converter: PvaConverter = DisconnectedPvaConverter()
        self.subscription: Optional[Subscription] = None

    def source(self, name: str):
        # `name` is unused; the source string is derived from the read PV
        return f"pva://{self.read_pv}"

    @property
    def ctxt(self) -> Context:
        """Lazily create and return the process-wide p4p Context."""
        if PvaSignalBackend._ctxt is None:
            PvaSignalBackend._ctxt = Context("pva", nt=False)

            @atexit.register
            def _del_ctxt():
                # If we don't do this we get messages like this on close:
                #   Error in sys.excepthook:
                #   Original exception was:
                PvaSignalBackend._ctxt = None

        return PvaSignalBackend._ctxt

    async def _store_initial_value(self, pv, timeout: float = DEFAULT_TIMEOUT):
        # Fetch and cache the PV's current value; NotConnected on timeout
        try:
            self.initial_values[pv] = await asyncio.wait_for(
                self.ctxt.get(pv), timeout=timeout
            )
        except asyncio.TimeoutError as exc:
            logging.debug(f"signal pva://{pv} timed out", exc_info=True)
            raise NotConnected(f"pva://{pv}") from exc

    async def connect(self, timeout: float = DEFAULT_TIMEOUT):
        """Connect to the PV(s) and pick a converter from the initial values."""
        if self.read_pv != self.write_pv:
            # Different, need to connect both
            await wait_for_connection(
                read_pv=self._store_initial_value(self.read_pv, timeout=timeout),
                write_pv=self._store_initial_value(self.write_pv, timeout=timeout),
            )
        else:
            # The same, so only need to connect one
            await self._store_initial_value(self.read_pv, timeout=timeout)
        self.converter = make_converter(self.datatype, self.initial_values)

    async def put(self, value: Optional[T], wait=True, timeout=None):
        """Write ``value`` (or the cached initial value when None) to write_pv."""
        if value is None:
            write_value = self.initial_values[self.write_pv]
        else:
            write_value = self.converter.write_value(value)
        coro = self.ctxt.put(self.write_pv, {"value": write_value}, wait=wait)
        try:
            await asyncio.wait_for(coro, timeout)
        except asyncio.TimeoutError as exc:
            logging.debug(
                f"signal pva://{self.write_pv} timed out put value: {write_value}",
                exc_info=True,
            )
            raise NotConnected(f"pva://{self.write_pv}") from exc

    async def get_datakey(self, source: str) -> DataKey:
        value = await self.ctxt.get(self.read_pv)
        return self.converter.get_datakey(source, value)

    def _pva_request_string(self, fields: List[str]) -> str:
        """
        Converts a list of requested fields into a PVA request string which can
        be passed to p4p.
        """
        return f"field({','.join(fields)})"

    async def get_reading(self) -> Reading:
        request: str = self._pva_request_string(
            self.converter.value_fields() + self.converter.metadata_fields()
        )
        value = await self.ctxt.get(self.read_pv, request=request)
        return self.converter.reading(value)

    async def get_value(self) -> T:
        request: str = self._pva_request_string(self.converter.value_fields())
        value = await self.ctxt.get(self.read_pv, request=request)
        return self.converter.value(value)

    async def get_setpoint(self) -> T:
        value = await self.ctxt.get(self.write_pv, "field(value)")
        return self.converter.value(value)

    def set_callback(self, callback: Optional[ReadingValueCallback[T]]) -> None:
        """Install (or clear, when None) a monitor delivering reading+value."""
        if callback:
            assert (
                not self.subscription
            ), "Cannot set a callback when one is already set"

            async def async_callback(v):
                callback(self.converter.reading(v), self.converter.value(v))

            request: str = self._pva_request_string(
                self.converter.value_fields() + self.converter.metadata_fields()
            )
            self.subscription = self.ctxt.monitor(
                self.read_pv, async_callback, request=request
            )
        else:
            if self.subscription:
                self.subscription.close()
            self.subscription = None