Source code for ophyd_async.epics.motion.motor

import asyncio

from bluesky.protocols import Movable, Stoppable

from ophyd_async.core import (
    ConfigSignal,
    HintedSignal,
    StandardReadable,
    WatchableAsyncStatus,
)
from ophyd_async.core.signal import observe_value
from ophyd_async.core.utils import (
    DEFAULT_TIMEOUT,
    CalculatableTimeout,
    CalculateTimeout,
    WatcherUpdate,
)

from ..signal.signal import epics_signal_r, epics_signal_rw, epics_signal_x


[docs] class Motor(StandardReadable, Movable, Stoppable): """Device that moves a motor record""" def __init__(self, prefix: str, name="") -> None: # Define some signals with self.add_children_as_readables(ConfigSignal): self.motor_egu = epics_signal_r(str, prefix + ".EGU") self.velocity = epics_signal_rw(float, prefix + ".VELO") with self.add_children_as_readables(HintedSignal): self.user_readback = epics_signal_r(float, prefix + ".RBV") self.user_setpoint = epics_signal_rw(float, prefix + ".VAL") self.max_velocity = epics_signal_r(float, prefix + ".VMAX") self.acceleration_time = epics_signal_rw(float, prefix + ".ACCL") self.precision = epics_signal_r(int, prefix + ".PREC") self.deadband = epics_signal_r(float, prefix + ".RDBD") self.motor_done_move = epics_signal_r(int, prefix + ".DMOV") self.low_limit_travel = epics_signal_rw(float, prefix + ".LLM") self.high_limit_travel = epics_signal_rw(float, prefix + ".HLM") self.motor_stop = epics_signal_x(prefix + ".STOP") # Whether set() should complete successfully or not self._set_success = True super().__init__(name=name)
[docs] def set_name(self, name: str): super().set_name(name) # Readback should be named the same as its parent in read() self.user_readback.set_name(name)
[docs] @WatchableAsyncStatus.wrap async def set( self, new_position: float, timeout: CalculatableTimeout = CalculateTimeout ): self._set_success = True ( old_position, units, precision, velocity, acceleration_time, ) = await asyncio.gather( self.user_setpoint.get_value(), self.motor_egu.get_value(), self.precision.get_value(), self.velocity.get_value(), self.acceleration_time.get_value(), ) if timeout is CalculateTimeout: assert velocity > 0, "Motor has zero velocity" timeout = ( abs(new_position - old_position) / velocity + 2 * acceleration_time + DEFAULT_TIMEOUT ) move_status = self.user_setpoint.set(new_position, wait=True, timeout=timeout) async for current_position in observe_value( self.user_readback, done_status=move_status ): yield WatcherUpdate( current=current_position, initial=old_position, target=new_position, name=self.name, unit=units, precision=precision, ) if not self._set_success: raise RuntimeError("Motor was stopped")
[docs] async def stop(self, success=False): self._set_success = success # Put with completion will never complete as we are waiting for completion on # the move above, so need to pass wait=False await self.motor_stop.trigger(wait=False)