Source code for ophyd_async.core.async_status

"""Equivalent of bluesky.protocols.Status for asynchronous tasks."""

import asyncio
import functools
from typing import Awaitable, Callable, Coroutine, List, Optional, cast

from bluesky.protocols import Status

from .utils import Callback, T


[docs] class AsyncStatus(Status): """Convert asyncio awaitable to bluesky Status interface""" def __init__( self, awaitable: Awaitable, watchers: Optional[List[Callable]] = None, ): if isinstance(awaitable, asyncio.Task): self.task = awaitable else: self.task = asyncio.create_task(awaitable) # type: ignore self.task.add_done_callback(self._run_callbacks) self._callbacks = cast(List[Callback[Status]], []) self._watchers = watchers def __await__(self): return self.task.__await__() def add_callback(self, callback: Callback[Status]): if self.done: callback(self) else: self._callbacks.append(callback) def _run_callbacks(self, task: asyncio.Task): if not task.cancelled(): for callback in self._callbacks: callback(self) # TODO: remove ignore and bump min version when bluesky v1.12.0 is released def exception( # type: ignore self, timeout: Optional[float] = 0.0 ) -> Optional[BaseException]: if timeout != 0.0: raise Exception( "cannot honour any timeout other than 0 in an asynchronous function" ) if self.task.done(): try: return self.task.exception() except asyncio.CancelledError as e: return e return None @property def done(self) -> bool: return self.task.done() @property def success(self) -> bool: return ( self.task.done() and not self.task.cancelled() and self.task.exception() is None )
[docs] def watch(self, watcher: Callable): """Add watcher to the list of interested parties. Arguments as per Bluesky :external+bluesky:meth:`watch` protocol. """ if self._watchers is not None: self._watchers.append(watcher)
@classmethod def wrap(cls, f: Callable[[T], Coroutine]) -> Callable[[T], "AsyncStatus"]: @functools.wraps(f) def wrap_f(self) -> AsyncStatus: return AsyncStatus(f(self)) return wrap_f def __repr__(self) -> str: if self.done: if e := self.exception(): status = f"errored: {repr(e)}" else: status = "done" else: status = "pending" return f"<{type(self).__name__}, task: {self.task.get_coro()}, {status}>" __str__ = __repr__