import asyncio
from typing import Annotated as A
import numpy as np
from bluesky.protocols import Movable, Stoppable
from ophyd_async.core import (
CALCULATE_TIMEOUT,
DEFAULT_TIMEOUT,
CalculatableTimeout,
SignalR,
SignalRW,
SignalX,
StandardReadable,
WatchableAsyncStatus,
WatcherUpdate,
observe_value,
)
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.epics.core import EpicsDevice, PvSuffix
[docs]
class DemoMotor(EpicsDevice, StandardReadable, Movable, Stoppable):
"""A demo movable that moves based on velocity."""
# Whether set() should complete successfully or not
_set_success = True
# Define some signals
readback: A[SignalR[float], PvSuffix("Readback"), Format.HINTED_SIGNAL]
velocity: A[SignalRW[float], PvSuffix("Velocity"), Format.CONFIG_SIGNAL]
units: A[SignalR[str], PvSuffix("Readback.EGU"), Format.CONFIG_SIGNAL]
setpoint: A[SignalRW[float], PvSuffix("Setpoint")]
precision: A[SignalR[int], PvSuffix("Readback.PREC")]
# If a signal name clashes with a bluesky verb add _ to the attribute name
stop_: A[SignalX, PvSuffix("Stop.PROC")]
[docs]
def set_name(self, name: str, *, child_name_separator: str | None = None) -> None:
super().set_name(name, child_name_separator=child_name_separator)
# Readback should be named the same as its parent in read()
self.readback.set_name(name)
[docs]
@WatchableAsyncStatus.wrap
async def set( # type: ignore
self, new_position: float, timeout: CalculatableTimeout = CALCULATE_TIMEOUT
):
# The move should complete successfully unless stop(success=False) is called
self._set_success = True
# Get some variables for the progress bar reporting
old_position, units, precision, velocity = await asyncio.gather(
self.setpoint.get_value(),
self.units.get_value(),
self.precision.get_value(),
self.velocity.get_value(),
)
# If not supplied, calculate a suitable timeout for the move
if timeout == CALCULATE_TIMEOUT:
timeout = abs(new_position - old_position) / velocity + DEFAULT_TIMEOUT
# Wait for the value to set, but don't wait for put completion callback
await self.setpoint.set(new_position, wait=False)
# Observe the readback Signal, and on each new position...
async for current_position in observe_value(
self.readback, done_timeout=timeout
):
# Emit a progress bar update
yield WatcherUpdate(
current=current_position,
initial=old_position,
target=new_position,
name=self.name,
unit=units,
precision=precision,
)
# If we are at the desired position the break
if np.isclose(current_position, new_position):
break
# If we were told to stop and report an error then do so
if not self._set_success:
raise RuntimeError("Motor was stopped")
[docs]
async def stop(self, success=True):
self._set_success = success
await self.stop_.trigger()