Source code for ophyd_async.core.signal_backend
from abc import abstractmethod
from typing import Generic, Optional, Type
from bluesky.protocols import Descriptor, Reading
from .utils import DEFAULT_TIMEOUT, ReadingValueCallback, T
[docs]
class SignalBackend(Generic[T]):
"""A read/write/monitor backend for a Signals"""
#: Datatype of the signal value
datatype: Optional[Type[T]] = None
#: Like ca://PV_PREFIX:SIGNAL
source: str = ""
[docs]
@abstractmethod
async def connect(self, timeout: float = DEFAULT_TIMEOUT):
"""Connect to underlying hardware"""
[docs]
@abstractmethod
async def put(self, value: Optional[T], wait=True, timeout=None):
"""Put a value to the PV, if wait then wait for completion for up to timeout"""
[docs]
@abstractmethod
async def get_descriptor(self) -> Descriptor:
"""Metadata like source, dtype, shape, precision, units"""
[docs]
@abstractmethod
async def get_reading(self) -> Reading:
"""The current value, timestamp and severity"""
[docs]
@abstractmethod
async def get_value(self) -> T:
"""The current value"""
[docs]
@abstractmethod
async def get_setpoint(self) -> T:
"""The point that a signal was requested to move to."""
[docs]
@abstractmethod
def set_callback(self, callback: Optional[ReadingValueCallback[T]]) -> None:
"""Observe changes to the current value, timestamp and severity"""