import asyncio
import contextlib
import time
import numpy as np
from bluesky.protocols import Movable, Stoppable
from pydantic import BaseModel, ConfigDict, Field
from ophyd_async.core import (
AsyncStatus,
StandardReadable,
WatchableAsyncStatus,
WatcherUpdate,
observe_value,
soft_signal_r_and_setter,
soft_signal_rw,
)
from ophyd_async.core import StandardReadableFormat as Format
[docs]
class FlySimMotorInfo(BaseModel):
"""Minimal set of information required to fly a [](#SimMotor)."""
model_config = ConfigDict(frozen=True)
cv_start: float
"""Absolute position of the motor once it finishes accelerating to desired
velocity, in motor EGUs"""
cv_end: float
"""Absolute position of the motor once it begins decelerating from desired
velocity, in EGUs"""
cv_time: float = Field(gt=0)
"""Time taken for the motor to get from start_position to end_position, excluding
run-up and run-down, in seconds."""
@property
def velocity(self) -> float:
"""Calculate the velocity of the constant velocity phase."""
return (self.cv_end - self.cv_start) / self.cv_time
[docs]
def start_position(self, acceleration_time: float) -> float:
"""Calculate the start position with run-up distance added on."""
return self.cv_start - acceleration_time * self.velocity / 2
[docs]
def end_position(self, acceleration_time: float) -> float:
"""Calculate the end position with run-down distance added on."""
return self.cv_end + acceleration_time * self.velocity / 2
[docs]
class SimMotor(StandardReadable, Movable, Stoppable):
"""For usage when simulating a motor."""
def __init__(self, name="", instant=True) -> None:
"""Simulate a motor, with optional velocity.
:param name: name of device
:param instant: whether to move instantly or calculate move time using velocity
"""
# Define some signals
with self.add_children_as_readables(Format.HINTED_SIGNAL):
self.user_readback, self._user_readback_set = soft_signal_r_and_setter(
float, 0
)
with self.add_children_as_readables(Format.CONFIG_SIGNAL):
self.velocity = soft_signal_rw(float, 0 if instant else 1.0)
self.acceleration_time = soft_signal_rw(float, 0.5)
self.units = soft_signal_rw(str, "mm")
self.user_setpoint = soft_signal_rw(float, 0)
# Whether set() should complete successfully or not
self._set_success = True
self._move_status: AsyncStatus | None = None
# Stored in prepare
self._fly_info: FlySimMotorInfo | None = None
# Set on kickoff(), complete when motor reaches end position
self._fly_status: WatchableAsyncStatus | None = None
super().__init__(name=name)
[docs]
def set_name(self, name: str, *, child_name_separator: str | None = None) -> None:
super().set_name(name, child_name_separator=child_name_separator)
# Readback should be named the same as its parent in read()
self.user_readback.set_name(name)
[docs]
@AsyncStatus.wrap
async def prepare(self, value: FlySimMotorInfo):
"""Calculate run-up and move there, setting fly velocity when there."""
self._fly_info = value
# Move to start as fast as we can
await self.velocity.set(0)
await self.set(value.start_position(await self.acceleration_time.get_value()))
# Set the velocity for the actual move
await self.velocity.set(value.velocity)
[docs]
@AsyncStatus.wrap
async def kickoff(self):
"""Begin moving motor from prepared position to final position."""
if not self._fly_info:
msg = "Motor must be prepared before attempting to kickoff"
raise RuntimeError(msg)
acceleration_time = await self.acceleration_time.get_value()
self._fly_status = self.set(self._fly_info.end_position(acceleration_time))
[docs]
def complete(self) -> WatchableAsyncStatus:
"""Mark as complete once motor reaches completed position."""
if not self._fly_status:
msg = "kickoff not called"
raise RuntimeError(msg)
return self._fly_status
async def _move(self, old_position: float, new_position: float, velocity: float):
start = time.monotonic()
acceleration_time = abs(await self.acceleration_time.get_value())
sign = np.sign(new_position - old_position)
velocity = abs(velocity) * sign
# The total distance to move
total_distance = new_position - old_position
# The ramp distance is the distance taken to ramp up (the same distance
# is taken to ramp down). This is the area under the triangle of the
# velocity ramp up (base * height / 2)
ramp_distance = acceleration_time * velocity / 2
if abs(ramp_distance * 2) >= abs(total_distance):
# All time is ramp up and down, so recalculate the maximum velocity
# we get to. We know the area under the ramp up triangle is half the
# total distance, and we also know the ratio of velocity over
# acceleration_time is the same as the ration of max_velocity over
# ramp_time, so solve the simultaneous equations to get
# max_velocity and ramp_time.
max_velocity = np.sqrt(total_distance * velocity / acceleration_time) * sign
ramp_time = total_distance / max_velocity
# So move time is just the ramp up and ramp down with no constant
# velocity section
move_time = 2 * ramp_time
else:
# Middle segments of constant velocity
max_velocity = velocity
# Ramp up and down time is exactly the requested acceleration time
ramp_time = acceleration_time
# So move time is twice this, plus the time taken to move the
# remaining distance at constant velocity
move_time = ramp_time * 2 + (total_distance - ramp_distance * 2) / velocity
# Make an array of relative update times at 10Hz intervals
update_times = list(np.arange(0.1, move_time, 0.1, dtype=float))
# With the end position appended
if update_times and np.isclose(update_times[-1], move_time):
update_times[-1] = move_time
else:
update_times.append(move_time)
# Iterate through the update times, calculating new position for each
for t in update_times:
if t <= ramp_time:
# Ramp up phase, calculate area under the ramp up triangle
current_velocity = t / ramp_time * max_velocity
position = old_position + current_velocity * t / 2
elif t >= move_time - ramp_time:
# Ramp down phase, subtract area under the ramp down triangle
time_left = move_time - t
current_velocity = time_left / ramp_time * max_velocity
position = new_position - current_velocity * time_left / 2
else:
# Constant velocity phase
position = old_position + ramp_distance + (t - ramp_time) * max_velocity
# Calculate how long to wait to get there
relative_time = time.monotonic() - start
await asyncio.sleep(t - relative_time)
# Update the readback position
self._user_readback_set(position)
[docs]
@WatchableAsyncStatus.wrap
async def set(self, value: float):
"""Asynchronously move the motor to a new position."""
new_position = value
# Make sure any existing move tasks are stopped
await self.stop()
old_position, units, velocity = await asyncio.gather(
self.user_setpoint.get_value(),
self.units.get_value(),
self.velocity.get_value(),
)
# If zero velocity, do instant move
if velocity == 0:
self._user_readback_set(new_position)
else:
self._move_status = AsyncStatus(
self._move(old_position, new_position, velocity)
)
# If stop is called then this will raise a CancelledError, ignore it
with contextlib.suppress(asyncio.CancelledError):
async for current_position in observe_value(
self.user_readback, done_status=self._move_status
):
yield WatcherUpdate(
current=current_position,
initial=old_position,
target=new_position,
name=self.name,
unit=units,
)
if not self._set_success:
raise RuntimeError("Motor was stopped")
[docs]
async def stop(self, success=True):
"""Stop the motor if it is moving."""
self._set_success = success
if self._move_status:
self._move_status.task.cancel()
self._move_status = None
await self.user_setpoint.set(await self.user_readback.get_value())