from __future__ import annotations
import asyncio
import inspect
from abc import abstractmethod
from collections.abc import Awaitable, Callable
from functools import cached_property
from typing import Generic, cast
from unittest.mock import AsyncMock
from ._device import Device, DeviceConnector, LazyMock
from ._soft_signal_backend import SoftConverter, make_converter
from ._status import AsyncStatus
from ._utils import (
CALCULATE_TIMEOUT,
DEFAULT_TIMEOUT,
CalculatableTimeout,
NotConnectedError,
P,
T,
_wait_for,
)
# Canonical signature for no-arg void commands. Hardware backends (e.g. EPICS)
# pass this instead of None so that mock mode always has a concrete signature to
# work with. `None` is reserved for "not yet known until connect time".
NO_ARG_VOID_SIGNATURE = inspect.Signature([], return_annotation=None)
MockExecuteCallback = Callable[P, T] | Callable[P, Awaitable[T]]
[docs]
class CommandBackend(Generic[P, T]):
"""A backend for a Command.
:param signature: The Python signature of the command, or `None` if the
signature is not yet known until connect time (analogous to `datatype=None`
for signals). Hardware backends that are unambiguously void/void should
pass `NO_ARG_VOID_SIGNATURE` instead.
"""
def __init__(self, signature: inspect.Signature | None):
self.signature = signature
[docs]
@abstractmethod
def source(self, name: str) -> str:
"""Return source of command."""
[docs]
@abstractmethod
async def connect(self, timeout: float) -> None:
"""Connect to underlying hardware."""
[docs]
@abstractmethod
async def execute(self, *args: P.args, **kwargs: P.kwargs) -> T:
"""Execute the command and return its result."""
[docs]
class CommandConnector(DeviceConnector, Generic[P, T]):
"""A connector for a Command."""
def __init__(self, backend: CommandBackend[P, T]):
self.backend = self._init_backend = backend
[docs]
async def connect_mock(self, device: Device, mock: LazyMock):
"""Connect the backend in mock mode."""
self.backend = MockCommandBackend(self._init_backend, mock)
[docs]
async def connect_real(self, device: Device, timeout: float, force_reconnect: bool):
"""Connect the backend to real hardware."""
self.backend = self._init_backend
device.log.debug(f"Connecting to {self.backend.source(device.name)}")
await self.backend.connect(timeout)
[docs]
class Command(Device, Generic[P, T]):
"""A Device that can execute a typed remote-procedure command.
`Command[P, T]` wraps a remote procedure call where `P` is the
`ParamSpec` for the arguments and `T` is the return type.
Use [](#TriggerableCommand) when the command takes no arguments and
returns nothing.
The return value is stored on the resulting status object and is available
via `status.value` once the status completes:
```python
status = device.move_to.execute(0.5)
await status
result = status.value # T
```
"""
_connector: CommandConnector[P, T]
def __init__(
self,
backend: CommandBackend[P, T],
timeout: float | None = DEFAULT_TIMEOUT,
name: str = "",
):
super().__init__(name=name, connector=CommandConnector(backend))
self._timeout = timeout
@property
def signature(self) -> inspect.Signature | None:
return self._connector.backend.signature
@property
def source(self) -> str:
"""Returns the source of the command."""
return self._connector.backend.source(self.name)
[docs]
@AsyncStatus.wrap
async def execute(self, *args: P.args, **kwargs: P.kwargs) -> T:
"""Execute the command, returning an [](#AsyncStatus).
The status resolves to the result `T`.
After the status completes the return value is available via `status.value`:
```python
status = device.cmd.execute(arg)
await status
result = status.value
```
"""
self.log.debug(f"Executing command {self.name}")
result = await _wait_for(
self._connector.backend.execute(*args, **kwargs), self._timeout, self.source
)
self.log.debug(f"Command {self.name} returned {result}")
return result
[docs]
class TriggerableCommand(Command[[], None]):
"""A Command that can be triggered without arguments and returns nothing.
Use this when the control system exposes a no-argument, no-return action —
for example an EPICS `.PROC` field or a void/void Tango command. Unlike
a [](#Signal) there is no readable value: the hardware fires an action
when written to.
Declarative EPICS usage:
```python
from typing import Annotated as A
from ophyd_async.epics.core import EpicsDevice, PvSuffix
class MyDevice(EpicsDevice):
reset: A[TriggerableCommand, PvSuffix("Reset.PROC")]
```
Procedural EPICS usage:
```python
reset = epics_triggerable_command("PREFIX:Reset.PROC")
```
Satisfies the [](#bluesky.protocols.Triggerable) protocol so
bluesky plans can call `.trigger()` on it directly. It is the replacement
for the deprecated [](#SignalX).
"""
[docs]
@AsyncStatus.wrap
async def trigger(self, timeout: CalculatableTimeout = CALCULATE_TIMEOUT) -> None:
"""Trigger the action and return a status saying when it's done."""
if timeout == CALCULATE_TIMEOUT:
timeout = self._timeout
source = self._connector.backend.source(self.name)
self.log.debug(f"Putting default value to backend at source {source}")
await _wait_for(self._connector.backend.execute(), timeout, source)
self.log.debug(f"Successfully put default value to backend at source {source}")
[docs]
class SoftCommandBackend(CommandBackend[P, T]):
"""A backend for a Command that uses a Python callback.
Concurrent calls to `execute()` are serialised by an internal lock: a second
caller blocks until the first finishes. This is intentional — hardware
commands should not run concurrently, and it prevents re-entrant callback
invocations.
"""
signature: inspect.Signature
def __init__(
self,
command_cb: Callable[P, T] | Callable[P, Awaitable[T]],
sig: inspect.Signature,
):
self.command_cb = command_cb
self._lock = asyncio.Lock()
params = list(sig.parameters.values())
for p in params:
if p.kind in (
inspect.Parameter.VAR_POSITIONAL,
inspect.Parameter.VAR_KEYWORD,
):
raise TypeError(
f"{command_cb.__name__}() must not use *args/**kwargs; "
f"got parameter {p.name!r}"
)
missing = [p.name for p in params if p.annotation is inspect.Parameter.empty]
if missing:
raise TypeError(
f"{command_cb.__name__}() missing type annotations for parameter(s) "
f"{missing}. All parameters must be annotated."
)
if sig.return_annotation is inspect.Parameter.empty:
raise TypeError(
f"{command_cb.__name__}() missing a return type annotation. "
"The return type must be annotated."
)
self._expected_param_types: dict[str, object] = {
p.name: p.annotation for p in params
}
self._converters: dict[str, SoftConverter] = {}
for name, expected_type in self._expected_param_types.items():
try:
self._converters[name] = make_converter(expected_type)
except TypeError as exc:
raise TypeError(
f"Cannot create converter for parameter '{name}' of type"
f" {expected_type}: {exc}"
) from exc
super().__init__(signature=sig)
[docs]
def source(self, name: str) -> str:
"""Return the source of the command."""
return f"softcmd://{name}"
[docs]
async def connect(self, timeout: float) -> None:
"""No-op for SoftCommandBackend."""
pass
[docs]
async def execute(self, *args: P.args, **kwargs: P.kwargs) -> T:
"""Execute the configured callback and return its result."""
try:
bound = self.signature.bind(*args, **kwargs)
except TypeError as exc:
raise TypeError(str(exc)) from exc
bound.apply_defaults()
for name, value in bound.arguments.items():
try:
bound.arguments[name] = self._converters[name].write_value(value)
except (TypeError, ValueError) as exc:
expected_type = self._expected_param_types[name]
raise TypeError(
f"Argument '{name}' with value {value!r} is not compatible"
f" with expected type {expected_type}: {exc}"
) from exc
async with self._lock:
result = self.command_cb(*bound.args, **bound.kwargs)
if inspect.isawaitable(result):
return await result
else:
return result
[docs]
class MockCommandBackend(CommandBackend[P, T]):
"""A backend for a Command that uses a mock for testing."""
def __init__(self, initial_backend: CommandBackend[P, T], mock: LazyMock):
self._initial_backend = initial_backend
self._mock = mock
self._mock_execute_callback: MockExecuteCallback[P, T] | None = None
sig = initial_backend.signature or NO_ARG_VOID_SIGNATURE
# Build a SoftCommandBackend from the signature and a closure that
# forwards converted calls to execute_mock — same pattern as
# MockSignalBackend wrapping a SoftSignalBackend for type conversion.
# The lambda defers access to execute_mock to keep it lazy-initialised.
self._soft_backend: SoftCommandBackend[P, T] = SoftCommandBackend(
command_cb=lambda *args, **kwargs: self.execute_mock(*args, **kwargs),
sig=sig,
)
self._return_converter: SoftConverter | None = (
make_converter(sig.return_annotation)
if sig.return_annotation not in (None, inspect.Parameter.empty)
else None
)
super().__init__(signature=sig)
[docs]
def source(self, name: str) -> str:
return f"mock+{self._initial_backend.source(name)}"
[docs]
def set_mock_execute_callback(self, callback: MockExecuteCallback[P, T] | None):
"""Set a callback that will be called when the command is executed.
Pass `None` to restore the default side effect (the original callable for
`SoftCommandBackend`, or a manufactured default for hardware backends).
"""
self._mock_execute_callback = callback
if "execute_mock" in self.__dict__:
self.execute_mock.side_effect = self._make_side_effect()
def _make_side_effect(self) -> Callable[P, T] | Callable[P, Awaitable[T]]:
if self._mock_execute_callback is not None:
return self._mock_execute_callback
elif isinstance(self._initial_backend, SoftCommandBackend):
# Args arrive already converted by _soft_backend, so call _command_cb
# directly rather than going through execute() again (which would
# convert a second time and re-acquire the lock).
return self._initial_backend.command_cb
elif self._return_converter is None:
return lambda *args, **kwargs: cast(T, None)
else:
rc = self._return_converter
return lambda *args, **kwargs: rc.write_value(None)
@cached_property
def execute_mock(self) -> AsyncMock:
"""Return the mock that will track calls to the command execution."""
execute_mock = AsyncMock(name="execute", side_effect=self._make_side_effect())
self._mock().attach_mock(execute_mock, "execute")
return execute_mock
[docs]
async def execute(self, *args: P.args, **kwargs: P.kwargs) -> T:
"""Execute the mock command converting arguments as SoftCommandBackend would."""
return await self._soft_backend.execute(*args, **kwargs)
[docs]
async def connect(self, timeout: float) -> None:
"""Mock backend does not support real connection."""
raise NotConnectedError("It is not possible to connect a MockCommandBackend")
[docs]
def soft_command(
command_cb: Callable[P, T] | Callable[P, Awaitable[T]],
name: str = "",
timeout: float | None = DEFAULT_TIMEOUT,
) -> Command[P, T]:
"""Create a [](#Command) backed by a Python callable.
:param command_cb: Callable to wrap — may be sync or async.
:param name: Optional name for the command node.
:param timeout: Timeout in seconds, or `None` for no timeout.
Unlike hardware-backed commands, [](#soft_command) accepts positional,
keyword, and mixed arguments:
```python
async def move(x: float, speed: float = 1.0) -> bool: ...
cmd = soft_command(move, name="move")
await cmd.execute(0.5) # positional
await cmd.execute(x=0.5) # keyword
await cmd.execute(0.5, speed=2.0) # mixed
```
"""
# eval_str=True resolves forward-reference string annotations created by
# ``from __future__ import annotations`` in the caller's module.
sig = inspect.signature(command_cb, eval_str=True)
backend = SoftCommandBackend(command_cb, sig)
return Command(backend, timeout, name)