Source code for ophyd_async.epics.motion.motor
import asyncio
import time
from typing import Callable, List, Optional
from bluesky.protocols import Movable, Stoppable
from ophyd_async.core import AsyncStatus, StandardReadable
from ..signal.signal import epics_signal_r, epics_signal_rw, epics_signal_x
[docs]
class Motor(StandardReadable, Movable, Stoppable):
"""Device that moves a motor record"""
def __init__(self, prefix: str, name="") -> None:
# Define some signals
self.setpoint = epics_signal_rw(float, prefix + ".VAL")
self.readback = epics_signal_r(float, prefix + ".RBV")
self.velocity = epics_signal_rw(float, prefix + ".VELO")
self.units = epics_signal_r(str, prefix + ".EGU")
self.precision = epics_signal_r(int, prefix + ".PREC")
# Signals that collide with standard methods should have a trailing underscore
self.stop_ = epics_signal_x(prefix + ".STOP")
# Whether set() should complete successfully or not
self._set_success = True
# Set name and signals for read() and read_configuration()
self.set_readable_signals(
read=[self.readback],
config=[self.velocity, self.units],
)
super().__init__(name=name)
def set_name(self, name: str):
super().set_name(name)
# Readback should be named the same as its parent in read()
self.readback.set_name(name)
async def _move(self, new_position: float, watchers: List[Callable] = []):
self._set_success = True
start = time.monotonic()
old_position, units, precision = await asyncio.gather(
self.setpoint.get_value(),
self.units.get_value(),
self.precision.get_value(),
)
def update_watchers(current_position: float):
for watcher in watchers:
watcher(
name=self.name,
current=current_position,
initial=old_position,
target=new_position,
unit=units,
precision=precision,
time_elapsed=time.monotonic() - start,
)
self.readback.subscribe_value(update_watchers)
try:
await self.setpoint.set(new_position)
finally:
self.readback.clear_sub(update_watchers)
if not self._set_success:
raise RuntimeError("Motor was stopped")
[docs]
def move(self, new_position: float, timeout: Optional[float] = None):
"""Commandline only synchronous move of a Motor"""
from bluesky.run_engine import call_in_bluesky_event_loop, in_bluesky_event_loop
if in_bluesky_event_loop():
raise RuntimeError("Will deadlock run engine if run in a plan")
call_in_bluesky_event_loop(self._move(new_position), timeout) # type: ignore
def set(self, new_position: float, timeout: Optional[float] = None) -> AsyncStatus:
watchers: List[Callable] = []
coro = asyncio.wait_for(self._move(new_position, watchers), timeout=timeout)
return AsyncStatus(coro, watchers)
async def stop(self, success=False):
self._set_success = success
# Put with completion will never complete as we are waiting for completion on
# the move above, so need to pass wait=False
status = self.stop_.trigger(wait=False)
await status