from __future__ import annotations
from abc import abstractmethod
from typing import (
TYPE_CHECKING,
Any,
Generic,
Protocol,
TypeVar,
runtime_checkable,
)
from bluesky.protocols import HasName, Location, Reading, T_co
from event_model import DataKey
from ._utils import T
if TYPE_CHECKING:
from ._status import AsyncStatus
[docs]
@runtime_checkable
class AsyncReadable(HasName, Protocol):
"""Async implementations of the sync [](#bluesky.protocols.Readable)."""
[docs]
@abstractmethod
async def read(self) -> dict[str, Reading]:
"""Return value, timestamp, optional per-point metadata for each field name.
For example:
{
"channel1": {"value": 5, "timestamp": 1472493713.271991},
"channel2": {"value": 16, "timestamp": 1472493713.539238},
}
"""
[docs]
@abstractmethod
async def describe(self) -> dict[str, DataKey]:
"""Return per-scan metadata for each field name in `read()`.
For example:
{
"channel1": {"source": "SOME_PV1", "dtype": "number", "shape": []},
"channel2": {"source": "SOME_PV2", "dtype": "number", "shape": []},
}
"""
[docs]
@runtime_checkable
class AsyncConfigurable(HasName, Protocol):
"""Async implementation of the sync [](#bluesky.protocols.Configurable)."""
[docs]
@abstractmethod
async def read_configuration(self) -> dict[str, Reading]:
"""Return value, timestamp, optional per-point metadata for each field name.
Same API as [](#AsyncReadable.read) but for slow-changing fields related to
configuration. e.g., exposure time. These will typically be read only
once per run.
"""
[docs]
@abstractmethod
async def describe_configuration(self) -> dict[str, DataKey]:
"""Return per-scan metadata for each field name in `read_configuration()`."""
@runtime_checkable
class AsyncPausable(Protocol):
"""Async implementation of the sync [](#bluesky.protocols.Pausable)."""
@abstractmethod
async def pause(self) -> None:
"""Perform device-specific work when the RunEngine pauses."""
@abstractmethod
async def resume(self) -> None:
"""Perform device-specific work when the RunEngine resumes after a pause."""
[docs]
@runtime_checkable
class AsyncStageable(Protocol):
"""Async implementation of the sync [](#bluesky.protocols.Stageable)."""
[docs]
@abstractmethod
def stage(self) -> AsyncStatus:
"""Set up the device for acquisition.
:return: An `AsyncStatus` that is marked done when the device is done staging.
"""
[docs]
@abstractmethod
def unstage(self) -> AsyncStatus:
"""Clean up the device after acquisition.
:return: An `AsyncStatus` that is marked done when the device is done unstaging.
"""
@runtime_checkable
class AsyncMovable(Protocol[T_co]):
@abstractmethod
def set(self, value: T_co) -> AsyncStatus:
"""Return a ``Status`` that is marked done when the device is done moving."""
@runtime_checkable
class AsyncLocatable(AsyncMovable[T], Protocol):
@abstractmethod
async def locate(self) -> Location[T]:
"""Return the current location of a Device.
While a ``Readable`` reports many values, a ``Movable`` will have the
concept of location. This is where the Device currently is, and where it
was last requested to move to. This protocol formalizes how to get the
location from a ``Movable``.
"""
C = TypeVar("C", contravariant=True)
[docs]
class Watcher(Protocol, Generic[C]):
"""Protocol for watching changes in values."""
def __call__(
self,
current: C | None = None,
initial: C | None = None,
target: C | None = None,
name: str | None = None,
unit: str | None = None,
precision: int | None = None,
fraction: float | None = None,
time_elapsed: float | None = None,
time_remaining: float | None = None,
) -> Any: ...