from __future__ import annotations
import asyncio
import sys
from collections.abc import Awaitable, Callable, Iterator, Mapping, MutableMapping
from functools import cached_property
from logging import LoggerAdapter, getLogger
from typing import Any, TypeVar
from bluesky.protocols import HasName
from bluesky.run_engine import call_in_bluesky_event_loop, in_bluesky_event_loop
from ._utils import DEFAULT_TIMEOUT, LazyMock, NotConnected, wait_for_connection
[docs]
class DeviceConnector:
"""Defines how a `Device` should be connected and type hints processed."""
[docs]
def create_children_from_annotations(self, device: Device):
"""Used when children can be created from introspecting the hardware.
Some control systems allow introspection of a device to determine what
children it has. To allow this to work nicely with typing we add these
hints to the Device like so::
my_signal: SignalRW[int]
my_device: MyDevice
This method will be run during ``Device.__init__``, and is responsible
for turning all of those type hints into real Signal and Device instances.
Subsequent runs of this function should do nothing, to allow it to be
called early in Devices that need to pass references to their children
during ``__init__``.
"""
async def connect_mock(self, device: Device, mock: LazyMock):
# Connect serially, no errors to gather up as in mock mode
exceptions: dict[str, Exception] = {}
for name, child_device in device.children():
try:
await child_device.connect(mock=mock.child(name))
except Exception as e:
exceptions[name] = e
if exceptions:
raise NotConnected.with_other_exceptions_logged(exceptions)
[docs]
async def connect_real(self, device: Device, timeout: float, force_reconnect: bool):
"""Used during ``Device.connect``.
This is called when a previous connect has not been done, or has been
done in a different mock more. It should connect the Device and all its
children.
"""
# Connect in parallel, gathering up NotConnected errors
coros = {
name: child_device.connect(timeout=timeout, force_reconnect=force_reconnect)
for name, child_device in device.children()
}
await wait_for_connection(**coros)
[docs]
class Device(HasName):
"""Common base class for all Ophyd Async Devices."""
_name: str = ""
#: The parent Device if it exists
parent: Device | None = None
# None if connect hasn't started, a Task if it has
_connect_task: asyncio.Task | None = None
# The mock if we have connected in mock mode
_mock: LazyMock | None = None
# The separator to use when making child names
_child_name_separator: str = "-"
def __init__(
self, name: str = "", connector: DeviceConnector | None = None
) -> None:
self._connector = connector or DeviceConnector()
self._connector.create_children_from_annotations(self)
if name:
self.set_name(name)
@property
def name(self) -> str:
"""Return the name of the Device"""
return self._name
@cached_property
def _child_devices(self) -> dict[str, Device]:
return {}
def children(self) -> Iterator[tuple[str, Device]]:
yield from self._child_devices.items()
@cached_property
def log(self) -> LoggerAdapter:
return LoggerAdapter(
getLogger("ophyd_async.devices"), {"ophyd_async_device_name": self.name}
)
[docs]
def set_name(self, name: str, *, child_name_separator: str | None = None) -> None:
"""Set ``self.name=name`` and each ``self.child.name=name+"-child"``.
Parameters
----------
name:
New name to set
child_name_separator:
Use this as a separator instead of "-". Use "_" instead to make the same
names as the equivalent ophyd sync device.
"""
self._name = name
if child_name_separator:
self._child_name_separator = child_name_separator
# Ensure logger is recreated after a name change
if "log" in self.__dict__:
del self.log
for attr_name, child in self.children():
child_name = (
f"{self.name}{self._child_name_separator}{attr_name}"
if self.name
else ""
)
child.set_name(child_name, child_name_separator=self._child_name_separator)
def __setattr__(self, name: str, value: Any) -> None:
# Bear in mind that this function is called *a lot*, so
# we need to make sure nothing expensive happens in it...
if name == "parent":
if self.parent not in (value, None):
raise TypeError(
f"Cannot set the parent of {self} to be {value}: "
f"it is already a child of {self.parent}"
)
# ...hence not doing an isinstance check for attributes we
# know not to be Devices
elif name not in _not_device_attrs and isinstance(value, Device):
value.parent = self
self._child_devices[name] = value
# ...and avoiding the super call as we know it resolves to `object`
return object.__setattr__(self, name, value)
[docs]
async def connect(
self,
mock: bool | LazyMock = False,
timeout: float = DEFAULT_TIMEOUT,
force_reconnect: bool = False,
) -> None:
"""Connect self and all child Devices.
Contains a timeout that gets propagated to child.connect methods.
Parameters
----------
mock:
If True then use ``MockSignalBackend`` for all Signals
timeout:
Time to wait before failing with a TimeoutError.
"""
assert hasattr(self, "_connector"), (
f"{self}: doesn't have attribute `_connector`,"
" did you call `super().__init__` in your `__init__` method?"
)
if mock:
# Always connect in mock mode serially
if isinstance(mock, LazyMock):
# Use the provided mock
self._mock = mock
elif not self._mock:
# Make one
self._mock = LazyMock()
await self._connector.connect_mock(self, self._mock)
else:
# Try to cache the connect in real mode
can_use_previous_connect = (
self._mock is None
and self._connect_task
and not (self._connect_task.done() and self._connect_task.exception())
)
if force_reconnect or not can_use_previous_connect:
self._mock = None
coro = self._connector.connect_real(self, timeout, force_reconnect)
self._connect_task = asyncio.create_task(coro)
assert self._connect_task, "Connect task not created, this shouldn't happen"
# Wait for it to complete
await self._connect_task
_not_device_attrs = {
"_name",
"_children",
"_connector",
"_timeout",
"_mock",
"_connect_task",
}
DeviceT = TypeVar("DeviceT", bound=Device)
[docs]
class DeviceVector(MutableMapping[int, DeviceT], Device):
"""
Defines device components with indices.
In the below example, foos becomes a dictionary on the parent device
at runtime, so parent.foos[2] returns a FooDevice. For example usage see
:class:`~ophyd_async.epics.sim.DynamicSensorGroup`
"""
def __init__(
self,
children: Mapping[int, DeviceT],
name: str = "",
) -> None:
self._children: dict[int, DeviceT] = {}
self.update(children)
super().__init__(name=name)
def __setattr__(self, name: str, child: Any) -> None:
if name != "parent" and isinstance(child, Device):
raise AttributeError(
"DeviceVector can only have integer named children, "
"set via device_vector[i] = child"
)
super().__setattr__(name, child)
def __getitem__(self, key: int) -> DeviceT:
return self._children[key]
def __setitem__(self, key: int, value: DeviceT) -> None:
# Check the types on entry to dict to make sure we can't accidentally
# make a non-integer named child
assert isinstance(key, int), f"Expected int, got {key}"
assert isinstance(value, Device), f"Expected Device, got {value}"
self._children[key] = value
value.parent = self
def __delitem__(self, key: int) -> None:
del self._children[key]
def __iter__(self) -> Iterator[int]:
yield from self._children
def __len__(self) -> int:
return len(self._children)
def children(self) -> Iterator[tuple[str, Device]]:
for key, child in self._children.items():
yield str(key), child
def __hash__(self): # to allow DeviceVector to be used as dict keys and in sets
return hash(id(self))
class DeviceProcessor:
"""Sync/Async Context Manager that finds all the Devices declared within it.
Used in `init_devices`
"""
def __init__(self, process_devices: Callable[[dict[str, Device]], Awaitable[None]]):
self._process_devices = process_devices
self._locals_on_enter: dict[str, Any] = {}
self._locals_on_exit: dict[str, Any] = {}
def _caller_locals(self) -> dict[str, Any]:
"""Walk up until we find a stack frame that doesn't have us as self"""
try:
raise ValueError
except ValueError:
_, _, tb = sys.exc_info()
assert tb, "Can't get traceback, this shouldn't happen"
caller_frame = tb.tb_frame
while caller_frame.f_locals.get("self", None) is self:
caller_frame = caller_frame.f_back
assert (
caller_frame
), "No previous frame to the one with self in it, this shouldn't happen"
return caller_frame.f_locals.copy()
def __enter__(self) -> DeviceProcessor:
# Stash the names that were defined before we were called
self._locals_on_enter = self._caller_locals()
return self
async def __aenter__(self) -> DeviceProcessor:
return self.__enter__()
async def __aexit__(self, type, value, traceback):
self._locals_on_exit = self._caller_locals()
await self._on_exit()
def __exit__(self, type_, value, traceback):
if in_bluesky_event_loop():
raise RuntimeError(
"Cannot use DeviceConnector inside a plan, instead use "
"`yield from ophyd_async.plan_stubs.ensure_connected(device)`"
)
self._locals_on_exit = self._caller_locals()
try:
fut = call_in_bluesky_event_loop(self._on_exit())
except RuntimeError as e:
raise NotConnected(
"Could not connect devices. Is the bluesky event loop running? See "
"https://blueskyproject.io/ophyd-async/main/"
"user/explanations/event-loop-choice.html for more info."
) from e
return fut
async def _on_exit(self) -> None:
# Find all the devices
devices = {
name: obj
for name, obj in self._locals_on_exit.items()
if isinstance(obj, Device) and self._locals_on_enter.get(name) is not obj
}
# Call the provided process function on them
await self._process_devices(devices)
[docs]
def init_devices(
set_name=True,
child_name_separator: str = "-",
connect=True,
mock=False,
timeout: float = 10.0,
) -> DeviceProcessor:
"""Auto initialise top level Device instances to be used as a context manager
Parameters
----------
set_name:
If True, call ``device.set_name(variable_name)`` on all Devices
created within the context manager that have an empty ``name``
child_name_separator:
Use this as a separator if we call ``set_name``.
connect:
If True, call ``device.connect(mock, timeout)`` in parallel on all
Devices created within the context manager
mock:
If True, connect Signals in mock mode
timeout:
How long to wait for connect before logging an exception
Notes
-----
Example usage::
[async] with init_devices():
t1x = motor.Motor("BLxxI-MO-TABLE-01:X")
t1y = motor.Motor("pva://BLxxI-MO-TABLE-01:Y")
# Names and connects devices here
assert t1x.name == "t1x"
"""
async def process_devices(devices: dict[str, Device]):
if set_name:
for name, device in devices.items():
if not device.name:
device.set_name(name, child_name_separator=child_name_separator)
if connect:
coros = {
name: device.connect(mock, timeout) for name, device in devices.items()
}
await wait_for_connection(**coros)
return DeviceProcessor(process_devices)