Source code for ophyd_async.core.flyer
from abc import ABC, abstractmethod
from typing import Dict, Generic, Optional, Sequence, TypeVar
from bluesky.protocols import Descriptor, Flyable, Preparable, Reading, Stageable
from .async_status import AsyncStatus
from .detector import TriggerInfo
from .device import Device
from .signal import SignalR
from .utils import merge_gathered_dicts
T = TypeVar("T")
[docs]
class TriggerLogic(ABC, Generic[T]):
[docs]
@abstractmethod
def trigger_info(self, value: T) -> TriggerInfo:
"""Return info about triggers that will be produced for a given value"""
[docs]
@abstractmethod
async def prepare(self, value: T):
"""Move to the start of the flyscan"""
[docs]
@abstractmethod
async def start(self):
"""Start the flyscan"""
[docs]
@abstractmethod
async def stop(self):
"""Stop flying and wait everything to be stopped"""
[docs]
class HardwareTriggeredFlyable(
Device,
Stageable,
Preparable,
Flyable,
Generic[T],
):
def __init__(
self,
trigger_logic: TriggerLogic[T],
configuration_signals: Sequence[SignalR],
name: str = "",
):
self._trigger_logic = trigger_logic
self._configuration_signals = tuple(configuration_signals)
self._describe: Dict[str, Descriptor] = {}
self._fly_status: Optional[AsyncStatus] = None
self._trigger_info: Optional[TriggerInfo] = None
super().__init__(name=name)
@property
def trigger_logic(self) -> TriggerLogic[T]:
return self._trigger_logic
@property
def trigger_info(self) -> Optional[TriggerInfo]:
return self._trigger_info
@AsyncStatus.wrap
async def stage(self) -> None:
await self.unstage()
@AsyncStatus.wrap
async def unstage(self) -> None:
await self._trigger_logic.stop()
[docs]
def prepare(self, value: T) -> AsyncStatus:
"""Setup trajectories"""
return AsyncStatus(self._prepare(value))
async def _prepare(self, value: T) -> None:
self._trigger_info = self._trigger_logic.trigger_info(value)
# Move to start and setup the flyscan
await self._trigger_logic.prepare(value)
@AsyncStatus.wrap
async def kickoff(self) -> None:
self._fly_status = AsyncStatus(self._trigger_logic.start())
def complete(self) -> AsyncStatus:
assert self._fly_status, "Kickoff not run"
return self._fly_status
async def describe_configuration(self) -> Dict[str, Descriptor]:
return await merge_gathered_dicts(
[sig.describe() for sig in self._configuration_signals]
)
async def read_configuration(self) -> Dict[str, Reading]:
return await merge_gathered_dicts(
[sig.read() for sig in self._configuration_signals]
)