Source code for ophyd_async.core.utils
import asyncio
from typing import Awaitable, Callable, Dict, Iterable, List, Optional, Type, TypeVar
import numpy as np
from bluesky.protocols import Reading
T = TypeVar("T")
Callback = Callable[[T], None]
#: A function that will be called with the Reading and value when the
#: monitor updates
ReadingValueCallback = Callable[[Reading, T], None]
DEFAULT_TIMEOUT = 10.0
[docs]
class NotConnected(Exception):
"""Exception to be raised if a `Device.connect` is cancelled"""
def __init__(self, *lines: str):
self.lines = list(lines)
def __str__(self) -> str:
return "\n".join(self.lines)
[docs]
async def wait_for_connection(**coros: Awaitable[None]):
"""Call many underlying signals, accumulating `NotConnected` exceptions
Raises
------
`NotConnected` if cancelled
"""
ts = {k: asyncio.create_task(c) for (k, c) in coros.items()} # type: ignore
try:
done, pending = await asyncio.wait(ts.values())
except asyncio.CancelledError:
for t in ts.values():
t.cancel()
lines: List[str] = []
for k, t in ts.items():
try:
await t
except NotConnected as e:
if len(e.lines) == 1:
lines.append(f"{k}: {e.lines[0]}")
else:
lines.append(f"{k}:")
lines += [f" {line}" for line in e.lines]
raise NotConnected(*lines)
else:
# Wait for everything to foreground the exceptions
for f in list(done) + list(pending):
await f
[docs]
def get_dtype(typ: Type) -> Optional[np.dtype]:
"""Get the runtime dtype from a numpy ndarray type annotation
>>> import numpy.typing as npt
>>> import numpy as np
>>> get_dtype(npt.NDArray[np.int8])
dtype('int8')
"""
if getattr(typ, "__origin__", None) == np.ndarray:
# datatype = numpy.ndarray[typing.Any, numpy.dtype[numpy.float64]]
# so extract numpy.float64 from it
return np.dtype(typ.__args__[1].__args__[0]) # type: ignore
return None
[docs]
def get_unique(values: Dict[str, T], types: str) -> T:
"""If all values are the same, return that value, otherwise return TypeError
>>> get_unique({"a": 1, "b": 1}, "integers")
1
>>> get_unique({"a": 1, "b": 2}, "integers")
Traceback (most recent call last):
...
TypeError: Differing integers: a has 1, b has 2
"""
set_values = set(values.values())
if len(set_values) != 1:
diffs = ", ".join(f"{k} has {v}" for k, v in values.items())
raise TypeError(f"Differing {types}: {diffs}")
return set_values.pop()
[docs]
async def merge_gathered_dicts(
coros: Iterable[Awaitable[Dict[str, T]]]
) -> Dict[str, T]:
"""Merge dictionaries produced by a sequence of coroutines.
Can be used for merging ``read()`` or ``describe``. For instance::
combined_read = await merge_gathered_dicts(s.read() for s in signals)
"""
ret: Dict[str, T] = {}
for result in await asyncio.gather(*coros):
ret.update(result)
return ret