Source code for ophyd_async.epics.core._util

from collections.abc import Callable, Mapping, Sequence
from dataclasses import dataclass
from typing import Any, Generic, TypeVar, get_args, get_origin

import numpy as np

from ophyd_async.core import (
    DEFAULT_TIMEOUT,
    SignalBackend,
    SignalDatatypeT,
    SignalRW,
    StrictEnum,
    SubsetEnum,
    SupersetEnum,
    get_dtype,
    get_enum_cls,
    wait_for_value,
)

T = TypeVar("T")


@dataclass
class EpicsOptions(Generic[SignalDatatypeT]):
    """Options for EPICS Signals."""

    wait: bool | Callable[[SignalDatatypeT], bool] = True
    """Whether to wait for server-side completion of the operation:

    - `True`: Return when server-side operation has completed
    - `False`: Return when server-side operation has started
    - `callable`: Call with the value being put to decide whether to wait

    For example, use `EpicsOptions(wait=non_zero)` for busy records like
    areaDetector acquire PVs that should not wait when being set to zero as it
    causes a deadlock.
    """
def get_pv_basename_and_field(pv: str) -> tuple[str, str | None]:
    """Split a PV name at its first "." into ``(record, field)``.

    The field is `None` when the PV contains no "." at all; a trailing "."
    yields an empty-string field.
    """
    record, dot, field = pv.partition(".")
    return (record, field if dot else None)


def get_supported_values(
    pv: str,
    datatype: type[T],
    pv_choices: Sequence[str],
) -> Mapping[str, T | str]:
    """Check an enum datatype against the PV's choices and build a value map.

    :param pv: PV name, used only in error messages.
    :param datatype: Requested datatype; `get_enum_cls` must find an enum in it.
    :param pv_choices: The choice strings reported for the PV.
    :returns: A dict keyed by each entry of ``pv_choices``, mapped to the
        result of calling the enum class with that entry.
    :raises TypeError: If ``datatype`` is not an enum, is not one of
        StrictEnum / SubsetEnum / SupersetEnum, or its values do not relate
        to ``pv_choices`` in the way that enum kind requires.
    """
    enum_cls = get_enum_cls(datatype)
    if not enum_cls:
        raise TypeError(f"{datatype} is not an Enum")
    requested = [member.value for member in enum_cls]
    prefix = f"{pv} has choices {pv_choices}, but {datatype} requested {requested} "
    # Work out whether the requested values fit, and how to describe the rule
    # that applies to this kind of enum if they do not.
    if issubclass(enum_cls, StrictEnum):
        compatible = set(requested) == set(pv_choices)
        expectation = "to be strictly equal to them."
    elif issubclass(enum_cls, SubsetEnum):
        compatible = set(requested).issubset(pv_choices)
        expectation = "to be a subset of them."
    elif issubclass(enum_cls, SupersetEnum):
        compatible = set(pv_choices).issubset(requested)
        expectation = "to be a superset of them."
    else:
        raise TypeError(f"{datatype} is not a StrictEnum, SubsetEnum, or SupersetEnum")
    if not compatible:
        raise TypeError(prefix + expectation)
    # Map each PV choice string to whatever the enum class returns for it.
    # For StrictEnum and SupersetEnum every PV choice is an enum value; for
    # SubsetEnum only the requested values become enum members and the
    # remaining PV choices stay as plain strings.
    return {choice: enum_cls(choice) for choice in pv_choices}


def format_datatype(datatype: Any) -> str:
    """Return a short human-readable name for ``datatype``.

    Parameterised numpy arrays become ``Array1D[np.<dtype>]``, parameterised
    `Sequence` types become ``Sequence[<element name>]``, plain classes use
    their ``__name__`` and anything else falls back to `str`.
    """
    origin = get_origin(datatype)
    if origin is np.ndarray and get_args(datatype):
        return f"Array1D[np.{get_dtype(datatype).name}]"
    if origin is Sequence:
        element_type = get_args(datatype)[0]
        return f"Sequence[{element_type.__name__}]"
    if isinstance(datatype, type):
        return datatype.__name__
    return str(datatype)


class EpicsSignalBackend(SignalBackend[SignalDatatypeT]):
    """Signal backend that records its read PV, write PV and EPICS options."""

    def __init__(
        self,
        datatype: type[SignalDatatypeT] | None,
        read_pv: str = "",
        write_pv: str = "",
        options: EpicsOptions | None = None,
    ):
        """Store the PV names and options, then initialise the base backend.

        :param datatype: Datatype handed to the `SignalBackend` initialiser.
        :param read_pv: PV name stored as ``self.read_pv``.
        :param write_pv: PV name stored as ``self.write_pv``.
        :param options: Options to store; a default `EpicsOptions()` is
            created when this is falsy.
        """
        self.options = options if options else EpicsOptions()
        self.write_pv = write_pv
        self.read_pv = read_pv
        # Attributes are assigned before the base initialiser runs, as in the
        # original ordering.
        super().__init__(datatype)
async def stop_busy_record(
    signal: SignalRW[SignalDatatypeT],
    value: SignalDatatypeT,
    timeout: float = DEFAULT_TIMEOUT,
) -> None:
    """Set ``signal`` to ``value``, then wait on that value with `wait_for_value`.

    :param signal: Read/write signal to set.
    :param value: Value to set, and the value passed to `wait_for_value`.
    :param timeout: Forwarded to `wait_for_value` only; it is not passed to
        the initial ``signal.set`` call.
    """
    put_status = signal.set(value)
    await put_status
    # Only after the set has been awaited do we wait on the signal's value.
    await wait_for_value(signal, value, timeout=timeout)