Source code for ophyd_async.plan_stubs._settings
from __future__ import annotations
import asyncio
from collections.abc import Callable, Mapping
from typing import Any
import bluesky.plan_stubs as bps
import numpy as np
from bluesky.utils import MsgGenerator, plan
from ophyd_async.core import (
Device,
Settings,
SettingsProvider,
SignalRW,
T,
walk_rw_signals,
)
from ophyd_async.core._table import Table
from ._wait_for_awaitable import wait_for_awaitable
@plan
def _get_values_of_signals(
signals: Mapping[T, SignalRW],
) -> MsgGenerator[dict[T, Any]]:
coros = [sig.get_value() for sig in signals.values()]
values = yield from wait_for_awaitable(asyncio.gather(*coros))
named_values = dict(zip(signals, values, strict=True))
return named_values
@plan
def get_current_settings(device: Device) -> MsgGenerator[Settings]:
signals = walk_rw_signals(device)
named_values = yield from _get_values_of_signals(signals)
signal_values = {signals[name]: value for name, value in named_values.items()}
return Settings(device, signal_values)
[docs]
@plan
def store_settings(
provider: SettingsProvider, name: str, device: Device
) -> MsgGenerator[None]:
"""Walk a Device for SignalRWs and store their values with a provider associated
with the given name.
"""
signals = walk_rw_signals(device)
named_values = yield from _get_values_of_signals(signals)
yield from wait_for_awaitable(provider.store(name, named_values))
[docs]
@plan
def retrieve_settings(
provider: SettingsProvider, name: str, device: Device
) -> MsgGenerator[Settings]:
"""Retrieve named Settings for a Device from a provider."""
named_values = yield from wait_for_awaitable(provider.retrieve(name))
signals = walk_rw_signals(device)
unknown_names = set(named_values) - set(signals)
if unknown_names:
raise NameError(f"Unknown signal names {sorted(unknown_names)}")
signal_values = {signals[name]: value for name, value in named_values.items()}
return Settings(device, signal_values)
[docs]
@plan
def apply_settings(settings: Settings) -> MsgGenerator[None]:
"""Set every SignalRW to the given value in Settings. If value is None ignore it."""
signal_values = {
signal: value for signal, value in settings.items() if value is not None
}
if signal_values:
for signal, value in signal_values.items():
yield from bps.abs_set(signal, value, group="apply_settings")
yield from bps.wait("apply_settings")
[docs]
@plan
def apply_settings_if_different(
settings: Settings,
apply_plan: Callable[[Settings], MsgGenerator[None]],
current_settings: Settings | None = None,
) -> MsgGenerator[None]:
"""Set every SignalRW in settings to its given value if it is different to the
current value.
Parameters
----------
apply_plan:
A device specific plan which takes the Settings to apply and applies them to
the Device. Used to add device specific ordering to setting the signals.
current_settings:
If given, should be a superset of settings containing the current value of
the Settings in the Device. If not given it will be created by reading just
the signals given in settings.
"""
if current_settings is None:
# If we aren't give the current settings, then get the
# values of just the signals we were asked to change.
# This allows us to use this plan with Settings for a subset
# of signals in the Device without retrieving them all
signal_values = yield from _get_values_of_signals(
{sig: sig for sig in settings}
)
current_settings = Settings(settings.device, signal_values)
def _is_different(current, required) -> bool:
if isinstance(current, Table):
current = current.model_dump()
if isinstance(required, Table):
required = required.model_dump()
return current.keys() != required.keys() or any(
_is_different(current[k], required[k]) for k in current
)
elif isinstance(current, np.ndarray):
return not np.array_equal(current, required)
else:
return current != required
settings_to_change, _ = settings.partition(
lambda sig: _is_different(current_settings[sig], settings[sig])
)
yield from apply_plan(settings_to_change)