import warnings
from import Awaitable, Callable, Generator, Sequence
from contextlib import contextmanager
from enum import Enum
from typing import Any, cast
from bluesky.protocols import HasHints, Hints, Reading
from event_model import DataKey
from ._device import Device, DeviceVector
from ._protocol import AsyncConfigurable, AsyncReadable, AsyncStageable
from ._signal import SignalR
from ._status import AsyncStatus
from ._utils import merge_gathered_dicts
# Back compat
class _WarningMatcher:
def __init__(self, name: str, target: StandardReadableFormat):
self._name = name
self._target = target
def __eq__(self, value: object) -> bool:
f"Use `StandardReadableFormat.{}` "
f"instead of `{self._name}`"
return value == self._target
def _compat_format(name: str, target: StandardReadableFormat) -> StandardReadableFormat:
return cast(StandardReadableFormat, _WarningMatcher(name, target))
ConfigSignal = _compat_format("ConfigSignal", StandardReadableFormat.CONFIG_SIGNAL)
HintedSignal: Any = _compat_format("HintedSignal", StandardReadableFormat.HINTED_SIGNAL)
HintedSignal.uncached = _compat_format(
"HintedSignal.uncached", StandardReadableFormat.HINTED_UNCACHED_SIGNAL
class StandardReadable(
Device, AsyncReadable, AsyncConfigurable, AsyncStageable, HasHints
"""Device that provides selected child Device values in `read()`.
Provides the ability for children to be registered to:
- Participate in `stage()` and `unstage()`
- Provide their value in `read()` and `describe()
- Provide their value in `read_configuration()` and `describe_configuration()
- Select a value to appear in `hints`
The behavior is customized with a [](#StandardReadableFormat)
# These must be immutable types to avoid accidental sharing between
# different instances of the class
_describe_config_funcs: tuple[Callable[[], Awaitable[dict[str, DataKey]]], ...] = ()
_read_config_funcs: tuple[Callable[[], Awaitable[dict[str, Reading]]], ...] = ()
_describe_funcs: tuple[Callable[[], Awaitable[dict[str, DataKey]]], ...] = ()
_read_funcs: tuple[Callable[[], Awaitable[dict[str, Reading]]], ...] = ()
_stageables: tuple[AsyncStageable, ...] = ()
_has_hints: tuple[HasHints, ...] = ()
async def stage(self) -> None:
for sig in self._stageables:
await sig.stage().task
async def unstage(self) -> None:
for sig in self._stageables:
await sig.unstage().task
async def describe_configuration(self) -> dict[str, DataKey]:
return await merge_gathered_dicts(
[func() for func in self._describe_config_funcs]
async def read_configuration(self) -> dict[str, Reading]:
return await merge_gathered_dicts([func() for func in self._read_config_funcs])
async def describe(self) -> dict[str, DataKey]:
return await merge_gathered_dicts([func() for func in self._describe_funcs])
async def read(self) -> dict[str, Reading]:
return await merge_gathered_dicts([func() for func in self._read_funcs])
def hints(self) -> Hints:
hints: Hints = {}
for new_hint in self._has_hints:
# Merge the existing and new hints, based on the type of the value.
# This avoids default dict merge behavior that overrides the values;
# we want to combine them when they are Sequences, and ensure they are
# identical when string values.
for key, value in new_hint.hints.items():
# fail early for unkwon types
if isinstance(value, str):
if key in hints:
if hints[key] != value:
msg = f"Hints key {key} value may not be overridden"
raise RuntimeError(msg)
hints[key] = value # type: ignore[literal-required]
elif isinstance(value, Sequence):
if key in hints:
for new_val in value:
if new_val in hints[key]:
msg = f"Hint {key} {new_val} overrides existing hint"
raise RuntimeError(msg)
hints[key] = ( # type: ignore[literal-required]
hints[key] + value # type: ignore[literal-required]
hints[key] = value # type: ignore[literal-required]
msg = (
f"{}: Unknown type for value '{value}'"
f" for key '{key}'"
raise TypeError(msg)
return hints
def add_children_as_readables(
format: StandardReadableFormat = StandardReadableFormat.CHILD,
) -> Generator[None, None, None]:
"""Context manager that calls [](#add_readables) on child Devices added within.
Scans `self.children()` on entry and exit to context manager, and calls
`add_readables()` on any that are added with the provided
dict_copy = dict(self.children())
# Set symmetric difference operator gives all newly added keys
new_dict = dict(self.children())
new_keys = dict_copy.keys() ^ new_dict.keys()
new_values = [new_dict[key] for key in new_keys]
flattened_values = []
for value in new_values:
if isinstance(value, DeviceVector):
new_devices = list(filter(lambda x: isinstance(x, Device), flattened_values))
self.add_readables(new_devices, format)
def add_readables(
devices: Sequence[Device],
format: StandardReadableFormat = StandardReadableFormat.CHILD,
) -> None:
"""Add devices to contribute to various bluesky verbs.
Use output from the given devices to contribute to the verbs of the following
- [](#bluesky.protocols.Readable)
- [](#bluesky.protocols.Configurable)
- [](#bluesky.protocols.Stageable)
- [](#bluesky.protocols.HasHints)
:param devices: The devices to be added
:param format:
Determines which of the devices functions are added to which verb as
per the [](#StandardReadableFormat) documentation
def assert_device_is_signalr(device: Device) -> SignalR:
if not isinstance(device, SignalR):
raise TypeError(f"{device} is not a SignalR")
return device
for device in devices:
match format:
case StandardReadableFormat.CHILD:
if isinstance(device, AsyncConfigurable):
self._describe_config_funcs += (device.describe_configuration,)
self._read_config_funcs += (device.read_configuration,)
if isinstance(device, AsyncReadable):
self._describe_funcs += (device.describe,)
self._read_funcs += (,)
if isinstance(device, AsyncStageable):
self._stageables += (device,)
if isinstance(device, HasHints):
self._has_hints += (device,)
case StandardReadableFormat.CONFIG_SIGNAL:
signalr_device = assert_device_is_signalr(device=device)
self._describe_config_funcs += (signalr_device.describe,)
self._read_config_funcs += (,)
case StandardReadableFormat.HINTED_SIGNAL:
signalr_device = assert_device_is_signalr(device=device)
self._describe_funcs += (signalr_device.describe,)
self._read_funcs += (,)
self._stageables += (signalr_device,)
self._has_hints += (_HintsFromName(signalr_device),)
case StandardReadableFormat.UNCACHED_SIGNAL:
signalr_device = assert_device_is_signalr(device=device)
self._describe_funcs += (signalr_device.describe,)
self._read_funcs += (_UncachedRead(signalr_device),)
case StandardReadableFormat.HINTED_UNCACHED_SIGNAL:
signalr_device = assert_device_is_signalr(device=device)
self._describe_funcs += (signalr_device.describe,)
self._read_funcs += (_UncachedRead(signalr_device),)
self._has_hints += (_HintsFromName(signalr_device),)
class _UncachedRead:
def __init__(self, signal: SignalR) -> None:
self.signal = signal
async def __call__(self) -> dict[str, Reading]:
return await
class _HintsFromName(HasHints):
def __init__(self, device: Device) -> None:
self.device = device
def name(self) -> str:
def hints(self) -> Hints:
fields = [] if else []
return {"fields": fields}