Source code for ophyd_async.core.signal_backend

from abc import abstractmethod
from typing import Generic, Optional, Type

from bluesky.protocols import Descriptor, Reading

from .utils import DEFAULT_TIMEOUT, ReadingValueCallback, T


class SignalBackend(Generic[T]):
    """Interface a Signal uses to read, write and monitor its value.

    Every method here is marked ``@abstractmethod`` and carries no
    implementation of its own; the bodies consist of docstrings only.
    """

    # NOTE(review): the class derives from Generic[T] only, not abc.ABC, so
    # the @abstractmethod markers are presumably advisory rather than
    # enforced when the class is instantiated -- confirm this is intended.

    #: Type of the value carried by the signal (None unless overridden)
    datatype: Optional[Type[T]] = None

    #: Source string of the signal, of the form ca://PV_PREFIX:SIGNAL
    source: str = ""

    @abstractmethod
    async def connect(self, timeout: float = DEFAULT_TIMEOUT):
        """Establish the connection to the underlying hardware.

        ``timeout`` defaults to ``DEFAULT_TIMEOUT``.
        """

    @abstractmethod
    async def put(self, value: Optional[T], wait=True, timeout=None):
        """Write ``value`` to the PV.

        When ``wait`` is true, wait for the put to complete, for at most
        ``timeout``.
        """

    @abstractmethod
    async def get_descriptor(self) -> Descriptor:
        """Return metadata such as source, dtype, shape, precision and units."""

    @abstractmethod
    async def get_reading(self) -> Reading:
        """Return the current value together with its timestamp and severity."""

    @abstractmethod
    async def get_value(self) -> T:
        """Return the current value."""

    @abstractmethod
    async def get_setpoint(self) -> T:
        """Return the point the signal was last requested to move to."""

    @abstractmethod
    def set_callback(self, callback: Optional[ReadingValueCallback[T]]) -> None:
        """Register ``callback`` to observe value, timestamp and severity changes.

        ``callback`` may be None; presumably that clears a previously set
        callback -- verify against the concrete backends.
        """