Source code for ophyd_async.core._flyer

from abc import ABC, abstractmethod
from typing import Any, Generic

from bluesky.protocols import Flyable, Preparable, Stageable
from pydantic import Field

from ._device import Device
from ._status import AsyncStatus
from ._utils import CALCULATE_TIMEOUT, CalculatableTimeout, ConfinedModel, T


[docs] class FlyerController(ABC, Generic[T]): """Base class for controlling 'flyable' devices. [`bluesky.protocols.Flyable`](#bluesky.protocols.Flyable). """
[docs] @abstractmethod async def prepare(self, value: T) -> Any: """Move to the start of the flyscan."""
[docs] @abstractmethod async def kickoff(self): """Start the flyscan."""
[docs] @abstractmethod async def complete(self): """Block until the flyscan is done."""
[docs] @abstractmethod async def stop(self): """Stop flying and wait everything to be stopped."""
[docs] class FlyMotorInfo(ConfinedModel): """Minimal set of information required to fly a motor.""" start_position: float = Field(frozen=True) """Absolute position of the motor once it finishes accelerating to desired velocity, in motor EGUs""" end_position: float = Field(frozen=True) """Absolute position of the motor once it begins decelerating from desired velocity, in EGUs""" time_for_move: float = Field(frozen=True, gt=0) """Time taken for the motor to get from start_position to end_position, excluding run-up and run-down, in seconds.""" timeout: CalculatableTimeout = Field(frozen=True, default=CALCULATE_TIMEOUT) """Maximum time for the complete motor move, including run up and run down. Defaults to `time_for_move` + run up and run down times + 10s.""" @property def velocity(self) -> float: """Calculate the velocity of the constant velocity phase.""" return (self.end_position - self.start_position) / self.time_for_move
[docs] def ramp_up_start_pos(self, acceleration_time: float) -> float: """Calculate the start position with run-up distance added on.""" return self.start_position - acceleration_time * self.velocity / 2
[docs] def ramp_down_end_pos(self, acceleration_time: float) -> float: """Calculate the end position with run-down distance added on.""" return self.end_position + acceleration_time * self.velocity / 2
[docs] class StandardFlyer( Device, Stageable, Preparable, Flyable, Generic[T], ): """Base class for 'flyable' devices. [`bluesky.protocols.Flyable`](#bluesky.protocols.Flyable). """ def __init__( self, trigger_logic: FlyerController[T], name: str = "", ): self._trigger_logic = trigger_logic super().__init__(name=name) @property def trigger_logic(self) -> FlyerController[T]: return self._trigger_logic
[docs] @AsyncStatus.wrap async def stage(self) -> None: await self.unstage()
[docs] @AsyncStatus.wrap async def unstage(self) -> None: await self._trigger_logic.stop()
[docs] def prepare(self, value: T) -> AsyncStatus: return AsyncStatus(self._prepare(value))
async def _prepare(self, value: T) -> None: # Move to start and setup the flyscan await self._trigger_logic.prepare(value)
[docs] @AsyncStatus.wrap async def kickoff(self) -> None: await self._trigger_logic.kickoff()
[docs] @AsyncStatus.wrap async def complete(self) -> None: await self._trigger_logic.complete()