Source code for ophyd_async.epics.motion.motor

import asyncio
from dataclasses import replace

from bluesky.protocols import Movable, Stoppable

from ophyd_async.core import (
    AsyncStatus,
    ConfigSignal,
    HintedSignal,
    StandardReadable,
    WatchableAsyncStatus,
)
from ophyd_async.core.signal import observe_value
from ophyd_async.core.utils import WatcherUpdate

from ..signal.signal import epics_signal_r, epics_signal_rw, epics_signal_x


[docs] class Motor(StandardReadable, Movable, Stoppable): """Device that moves a motor record""" def __init__(self, prefix: str, name="") -> None: # Define some signals with self.add_children_as_readables(ConfigSignal): self.motor_egu = epics_signal_r(str, prefix + ".EGU") self.velocity = epics_signal_rw(float, prefix + ".VELO") with self.add_children_as_readables(HintedSignal): self.user_readback = epics_signal_r(float, prefix + ".RBV") self.user_setpoint = epics_signal_rw(float, prefix + ".VAL") self.max_velocity = epics_signal_r(float, prefix + ".VMAX") self.acceleration_time = epics_signal_rw(float, prefix + ".ACCL") self.precision = epics_signal_r(int, prefix + ".PREC") self.deadband = epics_signal_r(float, prefix + ".RDBD") self.motor_done_move = epics_signal_r(int, prefix + ".DMOV") self.low_limit_travel = epics_signal_rw(float, prefix + ".LLM") self.high_limit_travel = epics_signal_rw(float, prefix + ".HLM") self.motor_stop = epics_signal_x(prefix + ".STOP") # Whether set() should complete successfully or not self._set_success = True super().__init__(name=name)
[docs] def set_name(self, name: str): super().set_name(name) # Readback should be named the same as its parent in read() self.user_readback.set_name(name)
async def _move( self, new_position: float ) -> tuple[WatcherUpdate[float], AsyncStatus]: self._set_success = True old_position, units, precision = await asyncio.gather( self.user_setpoint.get_value(), self.motor_egu.get_value(), self.precision.get_value(), ) move_status = self.user_setpoint.set(new_position, wait=True) if not self._set_success: raise RuntimeError("Motor was stopped") return ( WatcherUpdate( initial=old_position, current=old_position, target=new_position, unit=units, precision=precision, ), move_status, )
[docs] @WatchableAsyncStatus.wrap async def set(self, new_position: float, timeout: float | None = None): update, move_status = await self._move(new_position) async for current_position in observe_value( self.user_readback, done_status=move_status ): if not self._set_success: raise RuntimeError("Motor was stopped") yield replace( update, name=self.name, current=current_position, )
[docs] async def stop(self, success=False): self._set_success = success # Put with completion will never complete as we are waiting for completion on # the move above, so need to pass wait=False await self.motor_stop.trigger(wait=False) # Trigger any callbacks await self.user_readback._backend.put(await self.user_readback.get_value())