Source code for ophyd_async.epics.motor

import asyncio

from bluesky.protocols import (
    Flyable,
    Locatable,
    Location,
    Preparable,
    Stoppable,
)
from pydantic import BaseModel, Field

from ophyd_async.core import (
    CALCULATE_TIMEOUT,
    DEFAULT_TIMEOUT,
    AsyncStatus,
    CalculatableTimeout,
    StandardReadable,
    WatchableAsyncStatus,
    WatcherUpdate,
    observe_value,
)
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.epics.core import epics_signal_r, epics_signal_rw, epics_signal_w


class MotorLimitsException(Exception):
    pass


class InvalidFlyMotorException(Exception):
    pass


DEFAULT_MOTOR_FLY_TIMEOUT = 60
DEFAULT_WATCHER_UPDATE_FREQUENCY = 0.2


[docs] class FlyMotorInfo(BaseModel): """Minimal set of information required to fly a motor:""" #: Absolute position of the motor once it finishes accelerating to desired #: velocity, in motor EGUs start_position: float = Field(frozen=True) #: Absolute position of the motor once it begins decelerating from desired #: velocity, in EGUs end_position: float = Field(frozen=True) #: Time taken for the motor to get from start_position to end_position, excluding #: run-up and run-down, in seconds. time_for_move: float = Field(frozen=True, gt=0) #: Maximum time for the complete motor move, including run up and run down. #: Defaults to `time_for_move` + run up and run down times + 10s. timeout: CalculatableTimeout = Field(frozen=True, default=CALCULATE_TIMEOUT)
[docs] class Motor(StandardReadable, Locatable, Stoppable, Flyable, Preparable): """Device that moves a motor record""" def __init__(self, prefix: str, name="") -> None: # Define some signals with self.add_children_as_readables(Format.CONFIG_SIGNAL): self.motor_egu = epics_signal_r(str, prefix + ".EGU") self.velocity = epics_signal_rw(float, prefix + ".VELO") with self.add_children_as_readables(Format.HINTED_SIGNAL): self.user_readback = epics_signal_r(float, prefix + ".RBV") self.user_setpoint = epics_signal_rw(float, prefix + ".VAL") self.max_velocity = epics_signal_r(float, prefix + ".VMAX") self.acceleration_time = epics_signal_rw(float, prefix + ".ACCL") self.precision = epics_signal_r(int, prefix + ".PREC") self.deadband = epics_signal_r(float, prefix + ".RDBD") self.motor_done_move = epics_signal_r(int, prefix + ".DMOV") self.low_limit_travel = epics_signal_rw(float, prefix + ".LLM") self.high_limit_travel = epics_signal_rw(float, prefix + ".HLM") # Note:cannot use epics_signal_x here, as the motor record specifies that # we must write 1 to stop the motor. Simply processing the record is not # sufficient. self.motor_stop = epics_signal_w(int, prefix + ".STOP") # Whether set() should complete successfully or not self._set_success = True # end_position of a fly move, with run_up_distance added on. self._fly_completed_position: float | None = None # Set on kickoff(), complete when motor reaches self._fly_completed_position self._fly_status: WatchableAsyncStatus | None = None # Set during prepare self._fly_timeout: CalculatableTimeout | None = CALCULATE_TIMEOUT super().__init__(name=name) def set_name(self, name: str, *, child_name_separator: str | None = None) -> None: super().set_name(name, child_name_separator=child_name_separator) # Readback should be named the same as its parent in read() self.user_readback.set_name(name)
[docs] @AsyncStatus.wrap async def prepare(self, value: FlyMotorInfo): """Calculate required velocity and run-up distance, then if motor limits aren't breached, move to start position minus run-up distance""" self._fly_timeout = value.timeout # Velocity, at which motor travels from start_position to end_position, in motor # egu/s. fly_velocity = await self._prepare_velocity( value.start_position, value.end_position, value.time_for_move, ) # start_position with run_up_distance added on. fly_prepared_position = await self._prepare_motor_path( abs(fly_velocity), value.start_position, value.end_position ) await self.set(fly_prepared_position) await self.velocity.set(abs(fly_velocity))
[docs] @AsyncStatus.wrap async def kickoff(self): """Begin moving motor from prepared position to final position.""" assert ( self._fly_completed_position ), "Motor must be prepared before attempting to kickoff" self._fly_status = self.set( self._fly_completed_position, timeout=self._fly_timeout )
[docs] def complete(self) -> WatchableAsyncStatus: """Mark as complete once motor reaches completed position.""" assert self._fly_status, "kickoff not called" return self._fly_status
@WatchableAsyncStatus.wrap async def set(self, value: float, timeout: CalculatableTimeout = CALCULATE_TIMEOUT): new_position = value self._set_success = True ( old_position, units, precision, velocity, acceleration_time, ) = await asyncio.gather( self.user_setpoint.get_value(), self.motor_egu.get_value(), self.precision.get_value(), self.velocity.get_value(), self.acceleration_time.get_value(), ) if timeout is CALCULATE_TIMEOUT: assert velocity > 0, "Motor has zero velocity" timeout = ( abs(new_position - old_position) / velocity + 2 * acceleration_time + DEFAULT_TIMEOUT ) move_status = self.user_setpoint.set(new_position, wait=True, timeout=timeout) async for current_position in observe_value( self.user_readback, done_status=move_status ): yield WatcherUpdate( current=current_position, initial=old_position, target=new_position, name=self.name, unit=units, precision=precision, ) if not self._set_success: raise RuntimeError("Motor was stopped") async def stop(self, success=False): self._set_success = success # Put with completion will never complete as we are waiting for completion on # the move above, so need to pass wait=False await self.motor_stop.set(1, wait=False) async def _prepare_velocity( self, start_position: float, end_position: float, time_for_move: float ) -> float: fly_velocity = (end_position - start_position) / time_for_move max_speed, egu = await asyncio.gather( self.max_velocity.get_value(), self.motor_egu.get_value() ) if abs(fly_velocity) > max_speed: raise MotorLimitsException( f"Motor speed of {abs(fly_velocity)} {egu}/s was requested for a motor " f" with max speed of {max_speed} {egu}/s" ) # move to prepare position at maximum velocity await self.velocity.set(abs(max_speed)) return fly_velocity async def locate(self) -> Location[float]: location: Location = { "setpoint": await self.user_setpoint.get_value(), "readback": await self.user_readback.get_value(), } return location async def _prepare_motor_path( self, fly_velocity: float, start_position: float, end_position: float ) -> float: # Distance required for motor to accelerate from stationary to fly_velocity, and # distance required for motor to decelerate from fly_velocity to stationary run_up_distance = ( (await self.acceleration_time.get_value()) * fly_velocity * 0.5 ) self._fly_completed_position = end_position + run_up_distance # Prepared position not used after prepare, so no need to store in self fly_prepared_position = start_position - run_up_distance motor_lower_limit, motor_upper_limit, egu = await asyncio.gather( self.low_limit_travel.get_value(), self.high_limit_travel.get_value(), self.motor_egu.get_value(), ) if ( not motor_upper_limit >= fly_prepared_position >= motor_lower_limit or not motor_upper_limit >= self._fly_completed_position >= motor_lower_limit ): raise MotorLimitsException( f"Motor trajectory for requested fly is from " f"{fly_prepared_position}{egu} to " f"{self._fly_completed_position}{egu} but motor limits are " f"{motor_lower_limit}{egu} <= x <= {motor_upper_limit}{egu} " ) return fly_prepared_position