from __future__ import annotations
import asyncio
import functools
import inspect
import time
from collections.abc import AsyncGenerator, Awaitable, Callable
from typing import Any, Generic, TypeVar, cast
from bluesky.protocols import (
Configurable,
Locatable,
Location,
Movable,
Reading,
Status,
Subscribable,
)
from event_model import DataKey
from ._device import Device, DeviceConnector
from ._mock_signal_backend import MockSignalBackend
from ._protocol import AsyncReadable, AsyncStageable
from ._signal_backend import SignalBackend, SignalDatatypeT, SignalDatatypeV
from ._soft_signal_backend import SoftSignalBackend
from ._status import AsyncStatus
from ._utils import (
CALCULATE_TIMEOUT,
DEFAULT_TIMEOUT,
CalculatableTimeout,
Callback,
LazyMock,
T,
)
async def _wait_for(coro: Awaitable[T], timeout: float | None, source: str) -> T:
try:
return await asyncio.wait_for(coro, timeout)
except asyncio.TimeoutError as e:
raise asyncio.TimeoutError(source) from e
def _add_timeout(func):
@functools.wraps(func)
async def wrapper(self: Signal, *args, **kwargs):
return await _wait_for(func(self, *args, **kwargs), self._timeout, self.source)
return wrapper
[docs]
class SignalConnector(DeviceConnector):
"""Used for connecting signals with a given backend."""
def __init__(self, backend: SignalBackend):
self.backend = self._init_backend = backend
[docs]
async def connect_mock(self, device: Device, mock: LazyMock):
self.backend = MockSignalBackend(self._init_backend, mock)
[docs]
async def connect_real(self, device: Device, timeout: float, force_reconnect: bool):
self.backend = self._init_backend
device.log.debug(f"Connecting to {self.backend.source(device.name, read=True)}")
await self.backend.connect(timeout)
class _ChildrenNotAllowed(dict[str, Device]):
def __setitem__(self, key: str, value: Device) -> None:
raise KeyError(
f"Cannot add Device or Signal child {key}={value} of Signal, "
"make a subclass of Device instead"
)
[docs]
class Signal(Device, Generic[SignalDatatypeT]):
"""A Device with the concept of a value, with R, RW, W and X flavours.
:param backend: The backend for providing Signal values.
:param timeout: The default timeout for operations on the Signal.
:param name: The name of the signal.
"""
_connector: SignalConnector
_child_devices = _ChildrenNotAllowed() # type: ignore
def __init__(
self,
backend: SignalBackend[SignalDatatypeT],
timeout: float | None = DEFAULT_TIMEOUT,
name: str = "",
) -> None:
super().__init__(name=name, connector=SignalConnector(backend))
self._timeout = timeout
@property
def source(self) -> str:
"""Returns the source of the signal.
E.g. "ca://PV_PREFIX:SIGNAL", or "" if not available until connection.
"""
return self._connector.backend.source(self.name, read=True)
SignalT = TypeVar("SignalT", bound=Signal)
class _SignalCache(Generic[SignalDatatypeT]):
def __init__(self, backend: SignalBackend[SignalDatatypeT], signal: Signal) -> None:
self._signal: Signal[Any] = signal
self._staged = False
self._listeners: dict[Callback, bool] = {}
self._valid = asyncio.Event()
self._reading: Reading[SignalDatatypeT] | None = None
self.backend: SignalBackend[SignalDatatypeT] = backend
signal.log.debug(f"Making subscription on source {signal.source}")
backend.set_callback(self._callback)
def close(self) -> None:
self.backend.set_callback(None)
self._signal.log.debug(f"Closing subscription on source {self._signal.source}")
def _ensure_reading(self) -> Reading[SignalDatatypeT]:
if not self._reading:
msg = "Monitor not working"
raise RuntimeError(msg)
return self._reading
async def get_reading(self) -> Reading[SignalDatatypeT]:
await self._valid.wait()
return self._ensure_reading()
async def get_value(self) -> SignalDatatypeT:
reading: Reading[SignalDatatypeT] = await self.get_reading()
return reading["value"]
def _callback(self, reading: Reading[SignalDatatypeT]) -> None:
self._signal.log.debug(
f"Updated subscription: reading of source {self._signal.source} changed "
f"from {self._reading} to {reading}"
)
self._reading = reading
self._valid.set()
for function, want_value in self._listeners.items():
self._notify(function, want_value)
def _notify(
self,
function: Callback[dict[str, Reading[SignalDatatypeT]] | SignalDatatypeT],
want_value: bool,
) -> None:
function(self._ensure_reading()["value"]) if want_value else function(
{self._signal.name: self._ensure_reading()}
)
def subscribe(self, function: Callback, want_value: bool) -> None:
self._listeners[function] = want_value
if self._valid.is_set():
self._notify(function, want_value)
def unsubscribe(self, function: Callback) -> bool:
self._listeners.pop(function)
return self._staged or bool(self._listeners)
def set_staged(self, staged: bool) -> bool:
self._staged = staged
return self._staged or bool(self._listeners)
[docs]
class SignalR(Signal[SignalDatatypeT], AsyncReadable, AsyncStageable, Subscribable):
"""Signal that can be read from and monitored."""
_cache: _SignalCache | None = None
def _backend_or_cache(
self, cached: bool | None = None
) -> _SignalCache | SignalBackend:
# If cached is None then calculate it based on whether we already have a cache
if cached is None:
cached = self._cache is not None
if cached:
if not self._cache:
msg = f"{self.source} not being monitored"
raise RuntimeError(msg)
# assert self._cache, f"{self.source} not being monitored"
return self._cache
else:
return self._connector.backend
def _get_cache(self) -> _SignalCache:
if not self._cache:
self._cache = _SignalCache(self._connector.backend, self)
return self._cache
def _del_cache(self, needed: bool):
if self._cache and not needed:
self._cache.close()
self._cache = None
[docs]
@_add_timeout
async def read(self, cached: bool | None = None) -> dict[str, Reading]:
"""Return a single item dict with the reading in it.
:param cached:
Whether to use the cached monitored value:
- If None, use the cache if it exists.
- If False, do an explicit get.
- If True, explicitly use the cache and raise an error if it doesn't exist.
"""
return {self.name: await self._backend_or_cache(cached).get_reading()}
[docs]
@_add_timeout
async def describe(self) -> dict[str, DataKey]:
"""Return a single item dict describing the signal value."""
return {self.name: await self._connector.backend.get_datakey(self.source)}
[docs]
@_add_timeout
async def get_value(self, cached: bool | None = None) -> SignalDatatypeT:
"""Return the current value.
:param cached:
Whether to use the cached monitored value:
- If None, use the cache if it exists.
- If False, do an explicit get.
- If True, explicitly use the cache and raise an error if it doesn't exist.
"""
value = await self._backend_or_cache(cached).get_value()
self.log.debug(f"get_value() on source {self.source} returned {value}")
return value
[docs]
def subscribe_value(self, function: Callback[SignalDatatypeT]):
"""Subscribe to updates in value of a device.
:param function: The callback function to call when the value changes.
"""
self._get_cache().subscribe(function, want_value=True)
[docs]
def subscribe(
self, function: Callback[dict[str, Reading[SignalDatatypeT]]]
) -> None:
"""Subscribe to updates in the reading.
:param function: The callback function to call when the reading changes.
"""
self._get_cache().subscribe(function, want_value=False)
[docs]
def clear_sub(self, function: Callback) -> None:
"""Remove a subscription passed to `subscribe` or `subscribe_value`.
:param function: The callback function to remove.
"""
self._del_cache(self._get_cache().unsubscribe(function))
[docs]
@AsyncStatus.wrap
async def stage(self) -> None:
"""Start caching this signal."""
self._get_cache().set_staged(True)
[docs]
@AsyncStatus.wrap
async def unstage(self) -> None:
"""Stop caching this signal."""
self._del_cache(self._get_cache().set_staged(False))
[docs]
class SignalW(Signal[SignalDatatypeT], Movable):
"""Signal that can be set."""
[docs]
@AsyncStatus.wrap
async def set(
self,
value: SignalDatatypeT,
wait=True,
timeout: CalculatableTimeout = CALCULATE_TIMEOUT,
) -> None:
"""Set the value and return a status saying when it's done.
:param value: The value to set.
:param wait: If True, wait for the set to complete.
:param timeout: The timeout for the set.
"""
if timeout == CALCULATE_TIMEOUT:
timeout = self._timeout
source = self._connector.backend.source(self.name, read=False)
self.log.debug(f"Putting value {value} to backend at source {source}")
await _wait_for(self._connector.backend.put(value, wait=wait), timeout, source)
self.log.debug(f"Successfully put value {value} to backend at source {source}")
[docs]
class SignalRW(SignalR[SignalDatatypeT], SignalW[SignalDatatypeT], Locatable):
"""Signal that can be both read and set."""
[docs]
@_add_timeout
async def locate(self) -> Location:
"""Return the setpoint and readback."""
setpoint, readback = await asyncio.gather(
self._connector.backend.get_setpoint(), self._backend_or_cache().get_value()
)
return Location(setpoint=setpoint, readback=readback)
[docs]
class SignalX(Signal):
"""Signal that puts the default value."""
[docs]
@AsyncStatus.wrap
async def trigger(
self, wait=True, timeout: CalculatableTimeout = CALCULATE_TIMEOUT
) -> None:
"""Trigger the action and return a status saying when it's done.
:param wait: If True, wait for the trigger to complete.
:param timeout: The timeout for the trigger.
"""
if timeout == CALCULATE_TIMEOUT:
timeout = self._timeout
source = self._connector.backend.source(self.name, read=False)
self.log.debug(f"Putting default value to backend at source {source}")
await _wait_for(self._connector.backend.put(None, wait=wait), timeout, source)
self.log.debug(f"Successfully put default value to backend at source {source}")
[docs]
def soft_signal_rw(
datatype: type[SignalDatatypeT],
initial_value: SignalDatatypeT | None = None,
name: str = "",
units: str | None = None,
precision: int | None = None,
) -> SignalRW[SignalDatatypeT]:
"""Create a read-writable Signal with a [](#SoftSignalBackend).
May pass metadata, which are propagated into describe.
:param datatype: The datatype of the signal.
:param initial_value: The initial value of the signal.
:param name: The name of the signal.
:param units: The units of the signal.
:param precision: The precision of the signal.
"""
backend = SoftSignalBackend(datatype, initial_value, units, precision)
signal = SignalRW(backend=backend, name=name)
return signal
[docs]
def soft_signal_r_and_setter(
datatype: type[SignalDatatypeT],
initial_value: SignalDatatypeT | None = None,
name: str = "",
units: str | None = None,
precision: int | None = None,
) -> tuple[SignalR[SignalDatatypeT], Callable[[SignalDatatypeT], None]]:
"""Create a read-only Signal with a [](#SoftSignalBackend).
May pass metadata, which are propagated into describe.
Use soft_signal_rw if you want a device that is externally modifiable.
:param datatype: The datatype of the signal.
:param initial_value: The initial value of the signal.
:param name: The name of the signal.
:param units: The units of the signal.
:param precision: The precision of the signal.
:return: A tuple of the created SignalR and a callable to set its value.
"""
backend = SoftSignalBackend(datatype, initial_value, units, precision)
signal = SignalR(backend=backend, name=name)
return (signal, backend.set_value)
[docs]
async def observe_value(
signal: SignalR[SignalDatatypeT],
timeout: float | None = None,
done_status: Status | None = None,
done_timeout: float | None = None,
) -> AsyncGenerator[SignalDatatypeT, None]:
"""Subscribe to the value of a signal so it can be iterated from.
The first value yielded in the iterator will be the current value of the
Signal, and subsequent updates from the control system will result in that
value being yielded, even if it is the same as the previous value.
:param signal:
Call subscribe_value on this at the start, and clear_sub on it at the end.
:param timeout:
If given, how long to wait for each updated value in seconds. If an
update is not produced in this time then raise asyncio.TimeoutError.
:param done_status:
If this status is complete, stop observing and make the iterator return.
If it raises an exception then this exception will be raised by the
iterator.
:param done_timeout:
If given, the maximum time to watch a signal, in seconds. If the loop is
still being watched after this length, raise asyncio.TimeoutError. This
should be used instead of on an 'asyncio.wait_for' timeout.
Due to a rare condition with busy signals, it is not recommended to use this
function with asyncio.timeout, including in an `asyncio.wait_for` loop.
Instead, this timeout should be given to the done_timeout parameter.
:example:
```python
async for value in observe_value(sig):
do_something_with(value)
```
"""
async for _, value in observe_signals_value(
signal,
timeout=timeout,
done_status=done_status,
done_timeout=done_timeout,
):
yield value
def _get_iteration_timeout(
timeout: float | None, overall_deadline: float | None
) -> float | None:
overall_deadline = overall_deadline - time.monotonic() if overall_deadline else None
return min([x for x in [overall_deadline, timeout] if x is not None], default=None)
[docs]
async def observe_signals_value(
*signals: SignalR[SignalDatatypeT],
timeout: float | None = None,
done_status: Status | None = None,
done_timeout: float | None = None,
) -> AsyncGenerator[tuple[SignalR[SignalDatatypeT], SignalDatatypeT], None]:
"""Subscribe to a set of signals so they can be iterated from.
The first values yielded in the iterator will be the current values of the
Signals, and subsequent updates from the control system will result in that
value being yielded, even if it is the same as the previous value.
:param signals:
Call subscribe_value on all the signals at the start, and clear_sub on
it at the end.
:param timeout:
If given, how long to wait for each updated value in seconds. If an
update is not produced in this time then raise asyncio.TimeoutError.
:param done_status:
If this status is complete, stop observing and make the iterator return.
If it raises an exception then this exception will be raised by the
iterator.
:param done_timeout:
If given, the maximum time to watch a signal, in seconds. If the loop is
still being watched after this length, raise asyncio.TimeoutError. This
should be used instead of on an `asyncio.wait_for` timeout.
:example:
```python
async for signal, value in observe_signals_values(sig1, sig2, ..):
if signal is sig1:
do_something_with(value)
elif signal is sig2:
do_something_else_with(value)
```
"""
q: asyncio.Queue[tuple[SignalR[SignalDatatypeT], SignalDatatypeT] | Status] = (
asyncio.Queue()
)
cbs: dict[SignalR, Callback] = {}
for signal in signals:
def queue_value(value: SignalDatatypeT, signal=signal):
q.put_nowait((signal, value))
cbs[signal] = queue_value
signal.subscribe_value(queue_value)
if done_status is not None:
done_status.add_callback(q.put_nowait)
overall_deadline = time.monotonic() + done_timeout if done_timeout else None
try:
while True:
if overall_deadline and time.monotonic() >= overall_deadline:
raise asyncio.TimeoutError(
f"observe_value was still observing signals "
f"{[signal.source for signal in signals]} after "
f"timeout {done_timeout}s"
)
iteration_timeout = _get_iteration_timeout(timeout, overall_deadline)
item = await asyncio.wait_for(q.get(), iteration_timeout)
if done_status and item is done_status:
if exc := done_status.exception():
raise exc
else:
break
else:
yield cast(tuple[SignalR[SignalDatatypeT], SignalDatatypeT], item)
finally:
for signal, cb in cbs.items():
signal.clear_sub(cb)
class _ValueChecker(Generic[SignalDatatypeT]):
def __init__(self, matcher: Callable[[SignalDatatypeT], bool], matcher_name: str):
self._last_value: SignalDatatypeT | None = None
self._matcher = matcher
self._matcher_name = matcher_name
async def _wait_for_value(self, signal: SignalR[SignalDatatypeT]):
async for value in observe_value(signal):
self._last_value = value
if self._matcher(value):
return
async def wait_for_value(
self, signal: SignalR[SignalDatatypeT], timeout: float | None
):
try:
await asyncio.wait_for(self._wait_for_value(signal), timeout)
except asyncio.TimeoutError as e:
raise asyncio.TimeoutError(
f"{signal.name} didn't match {self._matcher_name} in {timeout}s, "
f"last value {self._last_value!r}"
) from e
[docs]
async def wait_for_value(
signal: SignalR[SignalDatatypeT],
match: SignalDatatypeT | Callable[[SignalDatatypeT], bool],
timeout: float | None,
) -> None:
"""Wait for a signal to have a matching value.
:param signal:
Call subscribe_value on this at the start, and clear_sub on it at the
end.
:param match:
If a callable, it should return True if the value matches. If not
callable then value will be checked for equality with match.
:param timeout: How long to wait for the value to match.
:example:
```python
await wait_for_value(device.acquiring, 1, timeout=1)
# or
await wait_for_value(device.num_captured, lambda v: v > 45, timeout=1)
```
"""
if callable(match):
checker = _ValueChecker(match, match.__name__) # type: ignore
else:
checker = _ValueChecker(lambda v: v == match, repr(match))
await checker.wait_for_value(signal, timeout)
[docs]
async def set_and_wait_for_other_value(
set_signal: SignalW[SignalDatatypeT],
set_value: SignalDatatypeT,
match_signal: SignalR[SignalDatatypeV],
match_value: SignalDatatypeV | Callable[[SignalDatatypeV], bool],
timeout: float = DEFAULT_TIMEOUT,
set_timeout: float | None = None,
wait_for_set_completion: bool = True,
) -> AsyncStatus:
"""Set a signal and monitor another signal until it has the specified value.
This function sets a set_signal to a specified set_value and waits for
a match_signal to have the match_value.
:param set_signal: The signal to set.
:param set_value: The value to set it to.
:param match_signal: The signal to monitor.
:param match_value:
The value (or callable that says if the value matches) to wait for.
:param timeout: How long to wait for the signal to have the value.
:param set_timeout: How long to wait for the set to complete.
:param wait_for_set_completion:
If False then return as soon as the match_signal matches match_value. If
True then also wait for the set operation to complete before returning.
:seealso:
[](#interact-with-signals)
:example:
To set the setpoint and wait for the readback to match:
```python
await set_and_wait_for_value(device.setpoint, 1, device.readback, 1)
```
"""
# Start monitoring before the set to avoid a race condition
values_gen = observe_value(match_signal)
# Get the initial value from the monitor to make sure we've created it
current_value = await anext(values_gen)
status = set_signal.set(set_value, timeout=set_timeout)
if callable(match_value):
matcher: Callable[[SignalDatatypeV], bool] = match_value # type: ignore
else:
def matcher(value):
return value == match_value
matcher.__name__ = f"equals_{match_value}"
# If the value was the same as before no need to wait for it to change
if not matcher(current_value):
async def _wait_for_value():
async for value in values_gen:
if matcher(value):
break
try:
await asyncio.wait_for(_wait_for_value(), timeout)
if wait_for_set_completion:
await status
except asyncio.TimeoutError as e:
raise asyncio.TimeoutError(
f"{match_signal.name} value didn't match value from"
f" {matcher.__name__}() in {timeout}s"
) from e
return status
[docs]
async def set_and_wait_for_value(
signal: SignalRW[SignalDatatypeT],
value: SignalDatatypeT,
match_value: SignalDatatypeT | Callable[[SignalDatatypeT], bool] | None = None,
timeout: float = DEFAULT_TIMEOUT,
set_timeout: float | None = None,
wait_for_set_completion: bool = True,
) -> AsyncStatus:
"""Set a signal and monitor that same signal until it has the specified value.
This function sets a set_signal to a specified set_value and waits for
a match_signal to have the match_value.
:param signal: The signal to set.
:param value: The value to set it to.
:param match_value:
The value (or callable that says if the value matches) to wait for.
:param timeout: How long to wait for the signal to have the value.
:param set_timeout: How long to wait for the set to complete.
:param wait_for_set_completion:
If False then return as soon as the match_signal matches match_value. If
True then also wait for the set operation to complete before returning.
:seealso:
[](#interact-with-signals)
:examples:
To set a parameter and wait for it's value to change:
```python
await set_and_wait_for_value(device.parameter, 1)
```
For busy record, or other Signals with pattern:
- Set Signal with `wait=True` and stash the Status
- Read the same Signal to check the operation has started
- Return the Status so calling code can wait for operation to complete
```python
status = await set_and_wait_for_value(
device.acquire, 1, wait_for_set_completion=False
)
# device is now acquiring
await status
# device has finished acquiring
```
"""
if match_value is None:
match_value = value
return await set_and_wait_for_other_value(
signal,
value,
signal,
match_value,
timeout,
set_timeout,
wait_for_set_completion,
)
[docs]
def walk_rw_signals(device: Device, path_prefix: str = "") -> dict[str, SignalRW[Any]]:
"""Retrieve all SignalRWs from a device.
Stores retrieved signals with their dotted attribute paths in a dictionary. Used as
part of saving and loading a device.
:param device: Device to retrieve read-write signals from.
:param path_prefix: For internal use, leave blank when calling the method.
:return:
A dictionary matching the string attribute path of a SignalRW with the
signal itself.
"""
signals: dict[str, SignalRW[Any]] = {}
for attr_name, attr in device.children():
dot_path = f"{path_prefix}{attr_name}"
if type(attr) is SignalRW:
signals[dot_path] = attr
attr_signals = walk_rw_signals(attr, path_prefix=dot_path + ".")
signals.update(attr_signals)
return signals
[docs]
async def walk_config_signals(
device: Device, path_prefix: str = ""
) -> dict[str, SignalRW[Any]]:
"""Retrieve all configuration signals from a device.
Stores retrieved signals with their dotted attribute paths in a dictionary. Used as
part of saving and loading a device.
:param device: Device to retrieve configuration signals from.
:param path_prefix: For internal use, leave blank when calling the method.
:return:
A dictionary matching the string attribute path of a SignalRW with the
signal itself.
"""
signals: dict[str, SignalRW[Any]] = {}
config_names: list[str] = []
if isinstance(device, Configurable):
configuration = device.read_configuration()
if inspect.isawaitable(configuration):
configuration = await configuration
config_names = list(configuration.keys())
for attr_name, attr in device.children():
dot_path = f"{path_prefix}{attr_name}"
if isinstance(attr, SignalRW) and attr.name in config_names:
signals[dot_path] = attr
signals.update(await walk_config_signals(attr, path_prefix=dot_path + "."))
return signals
[docs]
class Ignore:
"""Annotation to ignore a signal when connecting a device."""
pass