# Source code for ophyd_async.core._protocol

from __future__ import annotations

from abc import abstractmethod
from typing import (
    TYPE_CHECKING,
    Any,
    Generic,
    Protocol,
    TypeVar,
    runtime_checkable,
)

from bluesky.protocols import HasName, Reading
from event_model import DataKey

if TYPE_CHECKING:
    from ._status import AsyncStatus


# runtime_checkable permits structural isinstance() checks against this protocol.
@runtime_checkable
class AsyncReadable(HasName, Protocol):
    """Async implementation of the sync [](#bluesky.protocols.Readable)."""

    @abstractmethod
    async def read(self) -> dict[str, Reading]:
        """Return value, timestamp, optional per-point metadata for each field name.

        For example:

            {
                "channel1": {"value": 5, "timestamp": 1472493713.271991},
                "channel2": {"value": 16, "timestamp": 1472493713.539238},
            }

        :return: A mapping of field name to its `Reading`.
        """

    @abstractmethod
    async def describe(self) -> dict[str, DataKey]:
        """Return per-scan metadata for each field name in `read()`.

        For example:

            {
                "channel1": {"source": "SOME_PV1", "dtype": "number", "shape": []},
                "channel2": {"source": "SOME_PV2", "dtype": "number", "shape": []},
            }

        :return: A mapping of field name to its `DataKey`.
        """


# runtime_checkable permits structural isinstance() checks against this protocol.
@runtime_checkable
class AsyncConfigurable(HasName, Protocol):
    """Async implementation of the sync [](#bluesky.protocols.Configurable)."""

    @abstractmethod
    async def read_configuration(self) -> dict[str, Reading]:
        """Return value, timestamp, optional per-point metadata for each field name.

        Same API as [](#AsyncReadable.read) but for slow-changing fields related
        to configuration. e.g., exposure time. These will typically be read only
        once per run.

        :return: A mapping of field name to its `Reading`.
        """

    @abstractmethod
    async def describe_configuration(self) -> dict[str, DataKey]:
        """Return per-scan metadata for each field name in `read_configuration()`.

        :return: A mapping of field name to its `DataKey`.
        """


# runtime_checkable permits structural isinstance() checks against this protocol.
@runtime_checkable
class AsyncPausable(Protocol):
    """Async implementation of the sync [](#bluesky.protocols.Pausable)."""

    @abstractmethod
    async def pause(self) -> None:
        """Perform device-specific work when the RunEngine pauses."""

    @abstractmethod
    async def resume(self) -> None:
        """Perform device-specific work when the RunEngine resumes after a pause."""


# runtime_checkable permits structural isinstance() checks against this protocol.
@runtime_checkable
class AsyncStageable(Protocol):
    """Async implementation of the sync [](#bluesky.protocols.Stageable)."""

    # Both methods are plain `def`s: they hand back an `AsyncStatus` directly
    # rather than being coroutine functions themselves.
    @abstractmethod
    def stage(self) -> AsyncStatus:
        """Set up the device for acquisition.

        :return: An `AsyncStatus` that is marked done when the device is done
            staging.
        """

    @abstractmethod
    def unstage(self) -> AsyncStatus:
        """Clean up the device after acquisition.

        :return: An `AsyncStatus` that is marked done when the device is done
            unstaging.
        """


# Value type reported to a Watcher. Contravariant because it only ever appears
# in parameter positions of `Watcher.__call__`.
C = TypeVar("C", contravariant=True)


class Watcher(Protocol, Generic[C]):
    """Protocol for watching changes in values.

    A watcher is any callable accepting the arguments below. Every argument is
    optional and defaults to ``None``; `current`, `initial` and `target` share
    the value type ``C``.

    NOTE(review): the argument names presumably follow the bluesky status
    ``watch`` callback convention - confirm against the bluesky documentation
    before relying on specific semantics (e.g. of `fraction`).
    """

    def __call__(
        self,
        current: C | None = None,
        initial: C | None = None,
        target: C | None = None,
        name: str | None = None,
        unit: str | None = None,
        precision: int | None = None,
        fraction: float | None = None,
        time_elapsed: float | None = None,
        time_remaining: float | None = None,
    ) -> Any: ...