Source code for ophyd_async.epics.core._util
from collections.abc import Mapping, Sequence
from typing import Any, TypeVar, get_args, get_origin
import numpy as np
from ophyd_async.core import (
DEFAULT_TIMEOUT,
SignalBackend,
SignalDatatypeT,
SignalRW,
StrictEnum,
SubsetEnum,
SupersetEnum,
get_dtype,
get_enum_cls,
wait_for_value,
)
T = TypeVar("T")
def get_pv_basename_and_field(pv: str) -> tuple[str, str | None]:
"""Split PV into record name and field."""
if "." in pv:
record, field = pv.split(".", maxsplit=1)
else:
record, field = pv, None
return (record, field)
def get_supported_values(
pv: str,
datatype: type[T],
pv_choices: Sequence[str],
) -> Mapping[str, T | str]:
enum_cls = get_enum_cls(datatype)
if not enum_cls:
raise TypeError(f"{datatype} is not an Enum")
choices = [v.value for v in enum_cls]
error_msg = f"{pv} has choices {pv_choices}, but {datatype} requested {choices} "
if issubclass(enum_cls, StrictEnum):
if set(choices) != set(pv_choices):
raise TypeError(error_msg + "to be strictly equal to them.")
elif issubclass(enum_cls, SubsetEnum):
if not set(choices).issubset(pv_choices):
raise TypeError(error_msg + "to be a subset of them.")
elif issubclass(enum_cls, SupersetEnum):
if not set(pv_choices).issubset(choices):
raise TypeError(error_msg + "to be a superset of them.")
else:
raise TypeError(f"{datatype} is not a StrictEnum, SubsetEnum, or SupersetEnum")
# Create a map from the string value to the enum instance
# For StrictEnum and SupersetEnum, all values here will be enum values
# For SubsetEnum, only the values in choices will be enum values, the rest will be
# strings
supported_values = {x: enum_cls(x) for x in pv_choices}
return supported_values
def format_datatype(datatype: Any) -> str:
if get_origin(datatype) is np.ndarray and get_args(datatype):
dtype = get_dtype(datatype)
return f"Array1D[np.{dtype.name}]"
elif get_origin(datatype) is Sequence:
return f"Sequence[{get_args(datatype)[0].__name__}]"
elif isinstance(datatype, type):
return datatype.__name__
else:
return str(datatype)
class EpicsSignalBackend(SignalBackend[SignalDatatypeT]):
def __init__(
self,
datatype: type[SignalDatatypeT] | None,
read_pv: str = "",
write_pv: str = "",
):
self.read_pv = read_pv
self.write_pv = write_pv
super().__init__(datatype)
[docs]
async def stop_busy_record(
signal: SignalRW[SignalDatatypeT],
value: SignalDatatypeT,
timeout: float = DEFAULT_TIMEOUT,
) -> None:
await signal.set(value, wait=False)
await wait_for_value(signal, value, timeout=timeout)