"""Support for EPICS motor record.
https://github.com/epics-modules/motor
"""
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from functools import cached_property
from bluesky.protocols import Flyable, Preparable
from ophyd_async.core import (
DEFAULT_TIMEOUT,
AsyncStatus,
DeviceMock,
FlyMotorInfo,
MovableLogic,
SignalRW,
SignalW,
StandardMovable,
StandardReadable,
StrictEnum,
WatchableAsyncStatus,
callback_on_mock_put,
default_mock_class,
error_if_none,
set_mock_value,
)
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.epics.core import epics_signal_r, epics_signal_rw, epics_signal_w
__all__ = ["MotorLimitsError", "Motor", "InstantMotorMock", "OffsetMode", "UseSetMode"]
[docs]
class MotorLimitsError(Exception):
"""Exception for invalid motor limits."""
pass
# Back compat - delete before 1.0
def __getattr__(name):
import warnings
renames = {
"MotorLimitsException": MotorLimitsError,
}
rename = renames.get(name)
if rename is not None:
warnings.warn(
DeprecationWarning(
f"{name!r} is deprecated, use {rename.__name__!r} instead"
),
stacklevel=2,
)
return rename
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
[docs]
class OffsetMode(StrictEnum):
"""In Set mode, determine what to do when the motor setpoint is written."""
VARIABLE = "Variable"
"""Change the offset so the readback matches the setpoint."""
FROZEN = "Frozen"
"""Tell the controller to change the readback without changing the offset."""
[docs]
class UseSetMode(StrictEnum):
"""Determine what to do when the motor setpoint is written."""
USE = "Use"
"""Tell the controller to move to the setpoint."""
SET = "Set"
"""Change offset (in record or in controller) when setpoint is written."""
@dataclass
class MotorMoveLogic(MovableLogic[float]):
"""Add the specific logic for moving a motor."""
motor_stop: SignalW[int]
low_limit_travel: SignalRW[float]
high_limit_travel: SignalRW[float]
dial_low_limit_travel: SignalRW[float]
dial_high_limit_travel: SignalRW[float]
velocity: SignalRW[float]
acceleration_time: SignalRW[float]
async def stop(self):
"""Request to stop moving."""
await self.motor_stop.set(1)
async def check_move(self, new_position: float):
"""Check the positions are within limits.
Will raise a MotorLimitsException if the given absolute positions will be
outside the motor soft limits.
"""
(
motor_lower_limit,
motor_upper_limit,
(units, _),
dial_lower_limit,
dial_upper_limit,
) = await asyncio.gather(
self.low_limit_travel.get_value(),
self.high_limit_travel.get_value(),
self.get_units_precision(),
self.dial_low_limit_travel.get_value(),
self.dial_high_limit_travel.get_value(),
)
# EPICS motor record treats dial limits of 0, 0 as no limit
# Use DLLM and DHLM to check
if dial_lower_limit == 0 and dial_upper_limit == 0:
return
old_position = await self.readback.get_value()
# Use real motor limit(i.e. HLM and LLM) to check if the move is permissible
if (
not motor_upper_limit >= old_position >= motor_lower_limit
or not motor_upper_limit >= new_position >= motor_lower_limit
):
name = self.readback.name
raise MotorLimitsError(
f"{name} motor trajectory for requested fly/move is from "
f"{old_position}{units} to "
f"{new_position}{units} but motor limits are "
f"{motor_lower_limit}{units} <= x <= {motor_upper_limit}{units} "
f"dial limits are "
f"{dial_lower_limit}{units} <= x <= {dial_upper_limit}."
)
async def calculate_timeout(
self, old_position: float, new_position: float
) -> float:
(
velocity,
acceleration_time,
) = await asyncio.gather(
self.velocity.get_value(),
self.acceleration_time.get_value(),
)
try:
return (
abs((new_position - old_position) / velocity)
+ 2 * acceleration_time
+ DEFAULT_TIMEOUT
)
except ZeroDivisionError as error:
msg = f"Motor {self.readback.name} has zero velocity."
raise ValueError(msg) from error
async def move(self, new_position: float, timeout: float | None) -> None:
"""Move by setting the setpoint and waiting for put completion."""
await self.setpoint.set(new_position, timeout)
[docs]
class InstantMotorMock(DeviceMock["Motor"]):
"""Mock behaviour that instantly moves readback to setpoint."""
[docs]
async def connect(self, device: Motor) -> None:
"""Mock signals to do an instant move on setpoint write."""
# Set sensible defaults to avoid runtime errors
set_mock_value(device.velocity, 1000) # Prevent ZeroDivisionError
set_mock_value(device.max_velocity, 1000) # Prevent ZeroDivisionError
# Motor starts in "done" state (not moving)
set_mock_value(device.motor_done_move, 1)
# When setpoint is written to, immediately update readback and done flag
def _instant_move(value):
set_mock_value(device.motor_done_move, 0) # Moving
set_mock_value(device.user_readback, value) # Arrive instantly
set_mock_value(device.motor_done_move, 1) # Done
callback_on_mock_put(device.user_setpoint, _instant_move)
[docs]
@default_mock_class(InstantMotorMock)
class Motor(StandardMovable, StandardReadable, Flyable, Preparable):
"""Device that moves a motor record."""
def __init__(self, prefix: str, name="") -> None:
# Define some signals
with self.add_children_as_readables(Format.CONFIG_SIGNAL):
self.motor_egu = epics_signal_r(str, prefix + ".EGU")
self.velocity = epics_signal_rw(float, prefix + ".VELO")
self.offset = epics_signal_rw(float, prefix + ".OFF")
with self.add_children_as_readables(Format.HINTED_SIGNAL):
self.user_readback = epics_signal_r(float, prefix + ".RBV")
self.user_setpoint = epics_signal_rw(float, prefix + ".VAL")
self.max_velocity = epics_signal_r(float, prefix + ".VMAX")
self.acceleration_time = epics_signal_rw(float, prefix + ".ACCL")
self.precision = epics_signal_r(int, prefix + ".PREC")
self.deadband = epics_signal_r(float, prefix + ".RDBD")
self.motor_done_move = epics_signal_r(int, prefix + ".DMOV")
self.low_limit_travel = epics_signal_rw(float, prefix + ".LLM")
self.high_limit_travel = epics_signal_rw(float, prefix + ".HLM")
self.dial_low_limit_travel = epics_signal_rw(float, prefix + ".DLLM")
self.dial_high_limit_travel = epics_signal_rw(float, prefix + ".DHLM")
self.offset_freeze_switch = epics_signal_rw(OffsetMode, prefix + ".FOFF")
self.high_limit_switch = epics_signal_r(int, prefix + ".HLS")
self.low_limit_switch = epics_signal_r(int, prefix + ".LLS")
self.output_link = epics_signal_r(str, prefix + ".OUT")
self.set_use_switch = epics_signal_rw(UseSetMode, prefix + ".SET")
# Note:cannot use epics_signal_x here, as the motor record specifies that
# we must write 1 to stop the motor. Simply processing the record is not
# sufficient.
# Put with completion will never complete as we are waiting for completion on
# the move in set, so need to pass wait=False
self.motor_stop = epics_signal_w(int, prefix + ".STOP", wait=False)
# Currently requested fly info, stored in prepare
self._fly_info: FlyMotorInfo | None = None
# Set on kickoff(), complete when motor reaches self._fly_completed_position
self._fly_status: WatchableAsyncStatus | None = None
super().__init__(name)
@cached_property
def movable_logic(self) -> MovableLogic:
"""Return MotorMoveLogic for this motor."""
return MotorMoveLogic(
readback=self.user_readback,
setpoint=self.user_setpoint,
motor_stop=self.motor_stop,
low_limit_travel=self.low_limit_travel,
high_limit_travel=self.high_limit_travel,
dial_low_limit_travel=self.dial_low_limit_travel,
dial_high_limit_travel=self.dial_high_limit_travel,
velocity=self.velocity,
acceleration_time=self.acceleration_time,
)
[docs]
@AsyncStatus.wrap
async def prepare(self, value: FlyMotorInfo):
"""Move to the beginning of a suitable run-up distance ready for a fly scan."""
self._fly_info = value
# Velocity, at which motor travels from start_position to end_position, in motor
# egu/s.
max_speed, egu = await asyncio.gather(
self.max_velocity.get_value(), self.motor_egu.get_value()
)
if abs(value.velocity) > max_speed:
raise MotorLimitsError(
f"Velocity {abs(value.velocity)} {egu}/s was requested for motor "
f"{self.name} with max speed of {max_speed} {egu}/s."
)
# Check limits are okay
acceleration_time = await self.acceleration_time.get_value()
ramp_up_start_pos = value.ramp_up_start_pos(acceleration_time)
ramp_down_end_pos = value.ramp_down_end_pos(acceleration_time)
await asyncio.gather(
self.check_value(ramp_up_start_pos), self.check_value(ramp_down_end_pos)
)
# move to prepare position at maximum velocity
await self.velocity.set(abs(max_speed))
await self.set(ramp_up_start_pos)
# Set velocity we will be using for the fly scan
await self.velocity.set(abs(value.velocity))
[docs]
@AsyncStatus.wrap
async def kickoff(self):
"""Begin moving motor from prepared position to final position."""
fly_info = error_if_none(
self._fly_info,
f"Motor {self.name} must be prepared before attempting to kickoff.",
)
acceleration_time = await self.acceleration_time.get_value()
self._fly_status = self.set(
fly_info.ramp_down_end_pos(acceleration_time),
timeout=fly_info.timeout,
)
[docs]
def complete(self) -> WatchableAsyncStatus:
"""Mark as complete once motor reaches completed position."""
fly_status = error_if_none(
self._fly_status, f"kickoff for motor {self.name} not called."
)
return fly_status