Source code for ophyd_async.core._protocol
from __future__ import annotations
from abc import abstractmethod
from typing import (
TYPE_CHECKING,
Any,
Generic,
Protocol,
TypeVar,
runtime_checkable,
)
from bluesky.protocols import HasName, Reading
from event_model import DataKey
if TYPE_CHECKING:
from ._status import AsyncStatus
[docs]
@runtime_checkable
class AsyncReadable(HasName, Protocol):
"""Async implementations of the sync [](#bluesky.protocols.Readable)."""
[docs]
@abstractmethod
async def read(self) -> dict[str, Reading]:
"""Return value, timestamp, optional per-point metadata for each field name.
For example:
{
"channel1": {"value": 5, "timestamp": 1472493713.271991},
"channel2": {"value": 16, "timestamp": 1472493713.539238},
}
"""
[docs]
@abstractmethod
async def describe(self) -> dict[str, DataKey]:
"""Return per-scan metadata for each field name in `read()`.
For example:
{
"channel1": {"source": "SOME_PV1", "dtype": "number", "shape": []},
"channel2": {"source": "SOME_PV2", "dtype": "number", "shape": []},
}
"""
[docs]
@runtime_checkable
class AsyncConfigurable(HasName, Protocol):
"""Async implementation of the sync [](#bluesky.protocols.Configurable)."""
[docs]
@abstractmethod
async def read_configuration(self) -> dict[str, Reading]:
"""Return value, timestamp, optional per-point metadata for each field name.
Same API as [](#AsyncReadable.read) but for slow-changing fields related to
configuration. e.g., exposure time. These will typically be read only
once per run.
"""
[docs]
@abstractmethod
async def describe_configuration(self) -> dict[str, DataKey]:
"""Return per-scan metadata for each field name in `read_configuration()`."""
@runtime_checkable
class AsyncPausable(Protocol):
"""Async implementation of the sync [](#bluesky.protocols.Pausable)."""
@abstractmethod
async def pause(self) -> None:
"""Perform device-specific work when the RunEngine pauses."""
@abstractmethod
async def resume(self) -> None:
"""Perform device-specific work when the RunEngine resumes after a pause."""
[docs]
@runtime_checkable
class AsyncStageable(Protocol):
"""Async implementation of the sync [](#bluesky.protocols.Stageable)."""
[docs]
@abstractmethod
def stage(self) -> AsyncStatus:
"""Set up the device for acquisition.
:return: An `AsyncStatus` that is marked done when the device is done staging.
"""
[docs]
@abstractmethod
def unstage(self) -> AsyncStatus:
"""Clean up the device after acquisition.
:return: An `AsyncStatus` that is marked done when the device is done unstaging.
"""
C = TypeVar("C", contravariant=True)
[docs]
class Watcher(Protocol, Generic[C]):
"""Protocol for watching changes in values."""
def __call__(
self,
current: C | None = None,
initial: C | None = None,
target: C | None = None,
name: str | None = None,
unit: str | None = None,
precision: int | None = None,
fraction: float | None = None,
time_elapsed: float | None = None,
time_remaining: float | None = None,
) -> Any: ...