from __future__ import annotations
import asyncio
import functools
from collections.abc import AsyncGenerator, Awaitable, Callable, Mapping
from typing import Any, Generic, cast
from bluesky.protocols import (
Locatable,
Location,
Movable,
Status,
Subscribable,
)
from event_model import DataKey
from ._device import Device, DeviceConnector
from ._mock_signal_backend import MockSignalBackend
from ._protocol import (
AsyncConfigurable,
AsyncReadable,
AsyncStageable,
Reading,
)
from ._signal_backend import (
SignalBackend,
SignalDatatypeT,
SignalDatatypeV,
)
from ._soft_signal_backend import SoftSignalBackend
from ._status import AsyncStatus
from ._utils import (
CALCULATE_TIMEOUT,
DEFAULT_TIMEOUT,
CalculatableTimeout,
Callback,
LazyMock,
T,
)
async def _wait_for(coro: Awaitable[T], timeout: float | None, source: str) -> T:
try:
return await asyncio.wait_for(coro, timeout)
except asyncio.TimeoutError as e:
raise asyncio.TimeoutError(source) from e
def _add_timeout(func):
@functools.wraps(func)
async def wrapper(self: Signal, *args, **kwargs):
return await _wait_for(func(self, *args, **kwargs), self._timeout, self.source)
return wrapper
class SignalConnector(DeviceConnector):
def __init__(self, backend: SignalBackend):
self.backend = self._init_backend = backend
async def connect_mock(self, device: Device, mock: LazyMock):
self.backend = MockSignalBackend(self._init_backend, mock)
async def connect_real(self, device: Device, timeout: float, force_reconnect: bool):
self.backend = self._init_backend
device.log.debug(f"Connecting to {self.backend.source(device.name, read=True)}")
await self.backend.connect(timeout)
class _ChildrenNotAllowed(dict[str, Device]):
def __setitem__(self, key: str, value: Device) -> None:
raise AttributeError(
f"Cannot add Device or Signal child {key}={value} of Signal, "
"make a subclass of Device instead"
)
[docs]
class Signal(Device, Generic[SignalDatatypeT]):
"""A Device with the concept of a value, with R, RW, W and X flavours"""
_connector: SignalConnector
_child_devices = _ChildrenNotAllowed() # type: ignore
def __init__(
self,
backend: SignalBackend[SignalDatatypeT],
timeout: float | None = DEFAULT_TIMEOUT,
name: str = "",
) -> None:
super().__init__(name=name, connector=SignalConnector(backend))
self._timeout = timeout
@property
def source(self) -> str:
"""Like ca://PV_PREFIX:SIGNAL, or "" if not set"""
return self._connector.backend.source(self.name, read=True)
class _SignalCache(Generic[SignalDatatypeT]):
def __init__(self, backend: SignalBackend[SignalDatatypeT], signal: Signal):
self._signal = signal
self._staged = False
self._listeners: dict[Callback, bool] = {}
self._valid = asyncio.Event()
self._reading: Reading[SignalDatatypeT] | None = None
self.backend = backend
signal.log.debug(f"Making subscription on source {signal.source}")
backend.set_callback(self._callback)
def close(self):
self.backend.set_callback(None)
self._signal.log.debug(f"Closing subscription on source {self._signal.source}")
async def get_reading(self) -> Reading[SignalDatatypeT]:
await self._valid.wait()
assert self._reading is not None, "Monitor not working"
return self._reading
async def get_value(self) -> SignalDatatypeT:
reading = await self.get_reading()
return reading["value"]
def _callback(self, reading: Reading[SignalDatatypeT]):
self._signal.log.debug(
f"Updated subscription: reading of source {self._signal.source} changed"
f"from {self._reading} to {reading}"
)
self._reading = reading
self._valid.set()
for function, want_value in self._listeners.items():
self._notify(function, want_value)
def _notify(
self,
function: Callback[dict[str, Reading[SignalDatatypeT]] | SignalDatatypeT],
want_value: bool,
):
assert self._reading, "Monitor not working"
if want_value:
function(self._reading["value"])
else:
function({self._signal.name: self._reading})
def subscribe(self, function: Callback, want_value: bool) -> None:
self._listeners[function] = want_value
if self._valid.is_set():
self._notify(function, want_value)
def unsubscribe(self, function: Callback) -> bool:
self._listeners.pop(function)
return self._staged or bool(self._listeners)
def set_staged(self, staged: bool):
self._staged = staged
return self._staged or bool(self._listeners)
[docs]
class SignalR(Signal[SignalDatatypeT], AsyncReadable, AsyncStageable, Subscribable):
"""Signal that can be read from and monitored"""
_cache: _SignalCache | None = None
def _backend_or_cache(
self, cached: bool | None = None
) -> _SignalCache | SignalBackend:
# If cached is None then calculate it based on whether we already have a cache
if cached is None:
cached = self._cache is not None
if cached:
assert self._cache, f"{self.source} not being monitored"
return self._cache
else:
return self._connector.backend
def _get_cache(self) -> _SignalCache:
if not self._cache:
self._cache = _SignalCache(self._connector.backend, self)
return self._cache
def _del_cache(self, needed: bool):
if self._cache and not needed:
self._cache.close()
self._cache = None
[docs]
@_add_timeout
async def read(self, cached: bool | None = None) -> dict[str, Reading]:
"""Return a single item dict with the reading in it"""
return {self.name: await self._backend_or_cache(cached).get_reading()}
[docs]
@_add_timeout
async def describe(self) -> dict[str, DataKey]:
"""Return a single item dict with the descriptor in it"""
return {self.name: await self._connector.backend.get_datakey(self.source)}
[docs]
@_add_timeout
async def get_value(self, cached: bool | None = None) -> SignalDatatypeT:
"""The current value"""
value = await self._backend_or_cache(cached).get_value()
self.log.debug(f"get_value() on source {self.source} returned {value}")
return value
[docs]
def subscribe_value(self, function: Callback[SignalDatatypeT]):
"""Subscribe to updates in value of a device"""
self._get_cache().subscribe(function, want_value=True)
[docs]
def subscribe(self, function: Callback[dict[str, Reading]]) -> None:
"""Subscribe to updates in the reading"""
self._get_cache().subscribe(function, want_value=False)
[docs]
def clear_sub(self, function: Callback) -> None:
"""Remove a subscription."""
self._del_cache(self._get_cache().unsubscribe(function))
[docs]
@AsyncStatus.wrap
async def stage(self) -> None:
"""Start caching this signal"""
self._get_cache().set_staged(True)
[docs]
@AsyncStatus.wrap
async def unstage(self) -> None:
"""Stop caching this signal"""
self._del_cache(self._get_cache().set_staged(False))
[docs]
class SignalW(Signal[SignalDatatypeT], Movable):
"""Signal that can be set"""
[docs]
@AsyncStatus.wrap
async def set(
self,
value: SignalDatatypeT,
wait=True,
timeout: CalculatableTimeout = CALCULATE_TIMEOUT,
) -> None:
"""Set the value and return a status saying when it's done"""
if timeout == CALCULATE_TIMEOUT:
timeout = self._timeout
source = self._connector.backend.source(self.name, read=False)
self.log.debug(f"Putting value {value} to backend at source {source}")
await _wait_for(self._connector.backend.put(value, wait=wait), timeout, source)
self.log.debug(f"Successfully put value {value} to backend at source {source}")
[docs]
class SignalRW(SignalR[SignalDatatypeT], SignalW[SignalDatatypeT], Locatable):
"""Signal that can be both read and set"""
[docs]
@_add_timeout
async def locate(self) -> Location:
"""Return the setpoint and readback."""
setpoint, readback = await asyncio.gather(
self._connector.backend.get_setpoint(), self._backend_or_cache().get_value()
)
return Location(setpoint=setpoint, readback=readback)
[docs]
class SignalX(Signal):
"""Signal that puts the default value"""
[docs]
@AsyncStatus.wrap
async def trigger(
self, wait=True, timeout: CalculatableTimeout = CALCULATE_TIMEOUT
) -> None:
"""Trigger the action and return a status saying when it's done"""
if timeout == CALCULATE_TIMEOUT:
timeout = self._timeout
source = self._connector.backend.source(self.name, read=False)
self.log.debug(f"Putting default value to backend at source {source}")
await _wait_for(self._connector.backend.put(None, wait=wait), timeout, source)
self.log.debug(f"Successfully put default value to backend at source {source}")
[docs]
def soft_signal_rw(
datatype: type[SignalDatatypeT],
initial_value: SignalDatatypeT | None = None,
name: str = "",
units: str | None = None,
precision: int | None = None,
) -> SignalRW[SignalDatatypeT]:
"""Creates a read-writable Signal with a SoftSignalBackend.
May pass metadata, which are propagated into describe.
"""
backend = SoftSignalBackend(datatype, initial_value, units, precision)
signal = SignalRW(backend=backend, name=name)
return signal
[docs]
def soft_signal_r_and_setter(
datatype: type[SignalDatatypeT],
initial_value: SignalDatatypeT | None = None,
name: str = "",
units: str | None = None,
precision: int | None = None,
) -> tuple[SignalR[SignalDatatypeT], Callable[[SignalDatatypeT], None]]:
"""Returns a tuple of a read-only Signal and a callable through
which the signal can be internally modified within the device.
May pass metadata, which are propagated into describe.
Use soft_signal_rw if you want a device that is externally modifiable
"""
backend = SoftSignalBackend(datatype, initial_value, units, precision)
signal = SignalR(backend=backend, name=name)
return (signal, backend.set_value)
def _generate_assert_error_msg(name: str, expected_result, actual_result) -> str:
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
return (
f"Expected {WARNING}{name}{ENDC} to produce"
+ f"\n{FAIL}{expected_result}{ENDC}"
+ f"\nbut actually got \n{FAIL}{actual_result}{ENDC}"
)
[docs]
async def assert_value(signal: SignalR[SignalDatatypeT], value: Any) -> None:
"""Assert a signal's value and compare it an expected signal.
Parameters
----------
signal:
signal with get_value.
value:
The expected value from the signal.
Notes
-----
Example usage::
await assert_value(signal, value)
"""
actual_value = await signal.get_value()
assert actual_value == value, _generate_assert_error_msg(
name=signal.name,
expected_result=value,
actual_result=actual_value,
)
[docs]
async def assert_reading(
readable: AsyncReadable, expected_reading: Mapping[str, Reading]
) -> None:
"""Assert readings from readable.
Parameters
----------
readable:
Callable with readable.read function that generate readings.
reading:
The expected readings from the readable.
Notes
-----
Example usage::
await assert_reading(readable, reading)
"""
actual_reading = await readable.read()
assert expected_reading == actual_reading, _generate_assert_error_msg(
name=readable.name,
expected_result=expected_reading,
actual_result=actual_reading,
)
[docs]
async def assert_configuration(
configurable: AsyncConfigurable,
configuration: Mapping[str, Reading],
) -> None:
"""Assert readings from Configurable.
Parameters
----------
configurable:
Configurable with Configurable.read function that generate readings.
configuration:
The expected readings from configurable.
Notes
-----
Example usage::
await assert_configuration(configurable configuration)
"""
actual_configurable = await configurable.read_configuration()
assert configuration == actual_configurable, _generate_assert_error_msg(
name=configurable.name,
expected_result=configuration,
actual_result=actual_configurable,
)
[docs]
def assert_emitted(docs: Mapping[str, list[dict]], **numbers: int):
"""Assert emitted document generated by running a Bluesky plan
Parameters
----------
Doc:
A dictionary
numbers:
expected emission in kwarg from
Notes
-----
Example usage::
assert_emitted(docs, start=1, descriptor=1,
resource=1, datum=1, event=1, stop=1)
"""
assert list(docs) == list(numbers), _generate_assert_error_msg(
name="documents",
expected_result=list(numbers),
actual_result=list(docs),
)
actual_numbers = {name: len(d) for name, d in docs.items()}
assert actual_numbers == numbers, _generate_assert_error_msg(
name="emitted",
expected_result=numbers,
actual_result=actual_numbers,
)
[docs]
async def observe_value(
signal: SignalR[SignalDatatypeT],
timeout: float | None = None,
done_status: Status | None = None,
) -> AsyncGenerator[SignalDatatypeT, None]:
"""Subscribe to the value of a signal so it can be iterated from.
Parameters
----------
signal:
Call subscribe_value on this at the start, and clear_sub on it at the
end
timeout:
If given, how long to wait for each updated value in seconds. If an update
is not produced in this time then raise asyncio.TimeoutError
done_status:
If this status is complete, stop observing and make the iterator return.
If it raises an exception then this exception will be raised by the iterator.
Notes
-----
Example usage::
async for value in observe_value(sig):
do_something_with(value)
"""
q: asyncio.Queue[SignalDatatypeT | Status] = asyncio.Queue()
if done_status is not None:
done_status.add_callback(q.put_nowait)
signal.subscribe_value(q.put_nowait)
try:
while True:
# yield here in case something else is filling the queue
# like in test_observe_value_times_out_with_no_external_task()
await asyncio.sleep(0)
item = await asyncio.wait_for(q.get(), timeout)
if done_status and item is done_status:
if exc := done_status.exception():
raise exc
else:
break
else:
yield cast(SignalDatatypeT, item)
finally:
signal.clear_sub(q.put_nowait)
class _ValueChecker(Generic[SignalDatatypeT]):
def __init__(self, matcher: Callable[[SignalDatatypeT], bool], matcher_name: str):
self._last_value: SignalDatatypeT | None = None
self._matcher = matcher
self._matcher_name = matcher_name
async def _wait_for_value(self, signal: SignalR[SignalDatatypeT]):
async for value in observe_value(signal):
self._last_value = value
if self._matcher(value):
return
async def wait_for_value(
self, signal: SignalR[SignalDatatypeT], timeout: float | None
):
try:
await asyncio.wait_for(self._wait_for_value(signal), timeout)
except asyncio.TimeoutError as e:
raise asyncio.TimeoutError(
f"{signal.name} didn't match {self._matcher_name} in {timeout}s, "
f"last value {self._last_value!r}"
) from e
[docs]
async def wait_for_value(
signal: SignalR[SignalDatatypeT],
match: SignalDatatypeT | Callable[[SignalDatatypeT], bool],
timeout: float | None,
):
"""Wait for a signal to have a matching value.
Parameters
----------
signal:
Call subscribe_value on this at the start, and clear_sub on it at the
end
match:
If a callable, it should return True if the value matches. If not
callable then value will be checked for equality with match.
timeout:
How long to wait for the value to match
Notes
-----
Example usage::
wait_for_value(device.acquiring, 1, timeout=1)
Or::
wait_for_value(device.num_captured, lambda v: v > 45, timeout=1)
"""
if callable(match):
checker = _ValueChecker(match, match.__name__) # type: ignore
else:
checker = _ValueChecker(lambda v: v == match, repr(match))
await checker.wait_for_value(signal, timeout)
[docs]
async def set_and_wait_for_other_value(
set_signal: SignalW[SignalDatatypeT],
set_value: SignalDatatypeT,
read_signal: SignalR[SignalDatatypeV],
read_value: SignalDatatypeV,
timeout: float = DEFAULT_TIMEOUT,
set_timeout: float | None = None,
) -> AsyncStatus:
"""Set a signal and monitor another signal until it has the specified value.
This function sets a set_signal to a specified set_value and waits for
a read_signal to have the read_value.
Parameters
----------
signal:
The signal to set
set_value:
The value to set it to
read_signal:
The signal to monitor
read_value:
The value to wait for
timeout:
How long to wait for the signal to have the value
set_timeout:
How long to wait for the set to complete
Notes
-----
Example usage::
set_and_wait_for_value(device.acquire, 1, device.acquire_rbv, 1)
"""
# Start monitoring before the set to avoid a race condition
values_gen = observe_value(read_signal)
# Get the initial value from the monitor to make sure we've created it
current_value = await anext(values_gen)
status = set_signal.set(set_value, timeout=set_timeout)
# If the value was the same as before no need to wait for it to change
if current_value != read_value:
async def _wait_for_value():
async for value in values_gen:
if value == read_value:
break
try:
await asyncio.wait_for(_wait_for_value(), timeout)
except asyncio.TimeoutError as e:
raise TimeoutError(
f"{read_signal.name} didn't match {read_value} in {timeout}s"
) from e
return status
[docs]
async def set_and_wait_for_value(
signal: SignalRW[SignalDatatypeT],
value: SignalDatatypeT,
timeout: float = DEFAULT_TIMEOUT,
status_timeout: float | None = None,
) -> AsyncStatus:
"""Set a signal and monitor it until it has that value.
Useful for busy record, or other Signals with pattern:
- Set Signal with wait=True and stash the Status
- Read the same Signal to check the operation has started
- Return the Status so calling code can wait for operation to complete
Parameters
----------
signal:
The signal to set
value:
The value to set it to
timeout:
How long to wait for the signal to have the value
status_timeout:
How long the returned Status will wait for the set to complete
Notes
-----
Example usage::
set_and_wait_for_value(device.acquire, 1)
"""
return await set_and_wait_for_other_value(
signal, value, signal, value, timeout, status_timeout
)