Source code for ophyd_async.core.signal

from __future__ import annotations

import asyncio
import functools
from typing import AsyncGenerator, Callable, Dict, Generic, Optional, Union

from bluesky.protocols import (
    Descriptor,
    Locatable,
    Location,
    Movable,
    Readable,
    Reading,
    Stageable,
    Subscribable,
)

from .async_status import AsyncStatus
from .device import Device
from .signal_backend import SignalBackend
from .sim_signal_backend import SimSignalBackend
from .utils import DEFAULT_TIMEOUT, Callback, ReadingValueCallback, T

_sim_backends: Dict[Signal, SimSignalBackend] = {}


def _add_timeout(func):
    @functools.wraps(func)
    async def wrapper(self: Signal, *args, **kwargs):
        return await asyncio.wait_for(func(self, *args, **kwargs), self._timeout)

    return wrapper


def _fail(self, other, *args, **kwargs):
    if isinstance(other, Signal):
        raise TypeError(
            "Can't compare two Signals, did you mean await signal.get_value() instead?"
        )
    else:
        return NotImplemented


[docs] class Signal(Device, Generic[T]): """A Device with the concept of a value, with R, RW, W and X flavours""" def __init__( self, backend: SignalBackend[T], timeout: Optional[float] = DEFAULT_TIMEOUT ) -> None: self._name = "" self._timeout = timeout self._init_backend = self._backend = backend @property def name(self) -> str: return self._name def set_name(self, name: str = ""): self._name = name async def connect(self, sim=False, timeout=DEFAULT_TIMEOUT): if sim: self._backend = SimSignalBackend( datatype=self._init_backend.datatype, source=self._init_backend.source ) _sim_backends[self] = self._backend else: self._backend = self._init_backend _sim_backends.pop(self, None) await self._backend.connect(timeout=timeout) @property def source(self) -> str: """Like ca://PV_PREFIX:SIGNAL, or "" if not set""" return self._backend.source __lt__ = __le__ = __eq__ = __ge__ = __gt__ = __ne__ = _fail def __hash__(self): # Restore the default implementation so we can use in a set or dict return hash(id(self))
class _SignalCache(Generic[T]): def __init__(self, backend: SignalBackend[T], signal: Signal): self._signal = signal self._staged = False self._listeners: Dict[Callback, bool] = {} self._valid = asyncio.Event() self._reading: Optional[Reading] = None self._value: Optional[T] = None self.backend = backend backend.set_callback(self._callback) def close(self): self.backend.set_callback(None) async def get_reading(self) -> Reading: await self._valid.wait() assert self._reading is not None, "Monitor not working" return self._reading async def get_value(self) -> T: await self._valid.wait() assert self._value is not None, "Monitor not working" return self._value def _callback(self, reading: Reading, value: T): self._reading = reading self._value = value self._valid.set() for function, want_value in self._listeners.items(): self._notify(function, want_value) def _notify(self, function: Callback, want_value: bool): if want_value: function(self._value) else: function({self._signal.name: self._reading}) def subscribe(self, function: Callback, want_value: bool) -> None: self._listeners[function] = want_value if self._valid.is_set(): self._notify(function, want_value) def unsubscribe(self, function: Callback) -> bool: self._listeners.pop(function) return self._staged or bool(self._listeners) def set_staged(self, staged: bool): self._staged = staged return self._staged or bool(self._listeners)
[docs] class SignalR(Signal[T], Readable, Stageable, Subscribable): """Signal that can be read from and monitored""" _cache: Optional[_SignalCache] = None def _backend_or_cache( self, cached: Optional[bool] ) -> Union[_SignalCache, SignalBackend]: # If cached is None then calculate it based on whether we already have a cache if cached is None: cached = self._cache is not None if cached: assert self._cache, f"{self.source} not being monitored" return self._cache else: return self._backend def _get_cache(self) -> _SignalCache: if not self._cache: self._cache = _SignalCache(self._backend, self) return self._cache def _del_cache(self, needed: bool): if self._cache and not needed: self._cache.close() self._cache = None
[docs] @_add_timeout async def read(self, cached: Optional[bool] = None) -> Dict[str, Reading]: """Return a single item dict with the reading in it""" return {self.name: await self._backend_or_cache(cached).get_reading()}
[docs] @_add_timeout async def describe(self) -> Dict[str, Descriptor]: """Return a single item dict with the descriptor in it""" return {self.name: await self._backend.get_descriptor()}
[docs] @_add_timeout async def get_value(self, cached: Optional[bool] = None) -> T: """The current value""" return await self._backend_or_cache(cached).get_value()
[docs] def subscribe_value(self, function: Callback[T]): """Subscribe to updates in value of a device""" self._get_cache().subscribe(function, want_value=True)
[docs] def subscribe(self, function: Callback[Dict[str, Reading]]) -> None: """Subscribe to updates in the reading""" self._get_cache().subscribe(function, want_value=False)
[docs] def clear_sub(self, function: Callback) -> None: """Remove a subscription.""" self._del_cache(self._get_cache().unsubscribe(function))
[docs] @AsyncStatus.wrap async def stage(self) -> None: """Start caching this signal""" self._get_cache().set_staged(True)
[docs] @AsyncStatus.wrap async def unstage(self) -> None: """Stop caching this signal""" self._del_cache(self._get_cache().set_staged(False))
USE_DEFAULT_TIMEOUT = "USE_DEFAULT_TIMEOUT"
[docs] class SignalW(Signal[T], Movable): """Signal that can be set"""
[docs] def set(self, value: T, wait=True, timeout=USE_DEFAULT_TIMEOUT) -> AsyncStatus: """Set the value and return a status saying when it's done""" if timeout is USE_DEFAULT_TIMEOUT: timeout = self._timeout coro = self._backend.put(value, wait=wait, timeout=timeout) return AsyncStatus(coro)
[docs] class SignalRW(SignalR[T], SignalW[T], Locatable): """Signal that can be both read and set""" async def locate(self) -> Location: location: Location = { "setpoint": await self._backend.get_setpoint(), "readback": await self.get_value(), } return location
[docs] class SignalX(Signal): """Signal that puts the default value"""
[docs] def trigger(self, wait=True, timeout=USE_DEFAULT_TIMEOUT) -> AsyncStatus: """Trigger the action and return a status saying when it's done""" if timeout is USE_DEFAULT_TIMEOUT: timeout = self._timeout coro = self._backend.put(None, wait=wait, timeout=timeout) return AsyncStatus(coro)
[docs] def set_sim_value(signal: Signal[T], value: T): """Set the value of a signal that is in sim mode.""" _sim_backends[signal]._set_value(value)
[docs] def set_sim_put_proceeds(signal: Signal[T], proceeds: bool): """Allow or block a put with wait=True from proceeding""" event = _sim_backends[signal].put_proceeds if proceeds: event.set() else: event.clear()
[docs] def set_sim_callback(signal: Signal[T], callback: ReadingValueCallback[T]) -> None: """Monitor the value of a signal that is in sim mode""" return _sim_backends[signal].set_callback(callback)
[docs] async def observe_value(signal: SignalR[T]) -> AsyncGenerator[T, None]: """Subscribe to the value of a signal so it can be iterated from. Parameters ---------- signal: Call subscribe_value on this at the start, and clear_sub on it at the end Notes ----- Example usage:: async for value in observe_value(sig): do_something_with(value) """ q: asyncio.Queue[T] = asyncio.Queue() signal.subscribe_value(q.put_nowait) try: while True: yield await q.get() finally: signal.clear_sub(q.put_nowait)
class _ValueChecker(Generic[T]): def __init__(self, matcher: Callable[[T], bool], matcher_name: str): self._last_value: Optional[T] = None self._matcher = matcher self._matcher_name = matcher_name async def _wait_for_value(self, signal: SignalR[T]): async for value in observe_value(signal): self._last_value = value if self._matcher(value): return async def wait_for_value(self, signal: SignalR[T], timeout: Optional[float]): try: await asyncio.wait_for(self._wait_for_value(signal), timeout) except asyncio.TimeoutError as e: raise TimeoutError( f"{signal.name} didn't match {self._matcher_name} in {timeout}s, " f"last value {self._last_value!r}" ) from e
[docs] async def wait_for_value( signal: SignalR[T], match: Union[T, Callable[[T], bool]], timeout: Optional[float] ): """Wait for a signal to have a matching value. Parameters ---------- signal: Call subscribe_value on this at the start, and clear_sub on it at the end match: If a callable, it should return True if the value matches. If not callable then value will be checked for equality with match. timeout: How long to wait for the value to match Notes ----- Example usage:: wait_for_value(device.acquiring, 1, timeout=1) Or:: wait_for_value(device.num_captured, lambda v: v > 45, timeout=1) """ if callable(match): checker = _ValueChecker(match, match.__name__) else: checker = _ValueChecker(lambda v: v == match, repr(match)) await checker.wait_for_value(signal, timeout)
[docs] async def set_and_wait_for_value( signal: SignalRW[T], value: T, timeout: float = DEFAULT_TIMEOUT, status_timeout: Optional[float] = None, ) -> AsyncStatus: """Set a signal and monitor it until it has that value. Useful for busy record, or other Signals with pattern: - Set Signal with wait=True and stash the Status - Read the same Signal to check the operation has started - Return the Status so calling code can wait for operation to complete This function sets a signal to a specified value, optionally with or without a ca/pv put callback, and waits for the readback value of the signal to match the value it was set to. Parameters ---------- signal: The signal to set and monitor value: The value to set it to timeout: How long to wait for the signal to have the value status_timeout: How long the returned Status will wait for the set to complete Notes ----- Example usage:: set_and_wait_for_value(device.acquire, 1) """ status = signal.set(value, timeout=status_timeout) await wait_for_value(signal, value, timeout=timeout) return status