Source code for ophyd_async.epics.demo._mover

import asyncio

import numpy as np
from bluesky.protocols import Movable, Stoppable

from ophyd_async.core import (
    CALCULATE_TIMEOUT,
    DEFAULT_TIMEOUT,
    AsyncStatus,
    CalculatableTimeout,
    Device,
    StandardReadable,
    WatchableAsyncStatus,
    WatcherUpdate,
    observe_value,
)
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.epics.core import epics_signal_r, epics_signal_rw, epics_signal_x


[docs] class Mover(StandardReadable, Movable, Stoppable): """A demo movable that moves based on velocity""" def __init__(self, prefix: str, name="") -> None: # Define some signals with self.add_children_as_readables(Format.HINTED_SIGNAL): self.readback = epics_signal_r(float, prefix + "Readback") with self.add_children_as_readables(Format.CONFIG_SIGNAL): self.velocity = epics_signal_rw(float, prefix + "Velocity") self.units = epics_signal_r(str, prefix + "Readback.EGU") self.setpoint = epics_signal_rw(float, prefix + "Setpoint") self.precision = epics_signal_r(int, prefix + "Readback.PREC") # Signals that collide with standard methods should have a trailing underscore self.stop_ = epics_signal_x(prefix + "Stop.PROC") # Whether set() should complete successfully or not self._set_success = True super().__init__(name=name) def set_name(self, name: str, *, child_name_separator: str | None = None) -> None: super().set_name(name, child_name_separator=child_name_separator) # Readback should be named the same as its parent in read() self.readback.set_name(name) @WatchableAsyncStatus.wrap async def set(self, value: float, timeout: CalculatableTimeout = CALCULATE_TIMEOUT): new_position = value self._set_success = True old_position, units, precision, velocity = await asyncio.gather( self.setpoint.get_value(), self.units.get_value(), self.precision.get_value(), self.velocity.get_value(), ) if timeout == CALCULATE_TIMEOUT: assert velocity > 0, "Mover has zero velocity" timeout = abs(new_position - old_position) / velocity + DEFAULT_TIMEOUT # Make an Event that will be set on completion, and a Status that will # error if not done in time done = asyncio.Event() done_status = AsyncStatus(asyncio.wait_for(done.wait(), timeout)) # Wait for the value to set, but don't wait for put completion callback await self.setpoint.set(new_position, wait=False) async for current_position in observe_value( self.readback, done_status=done_status ): yield WatcherUpdate( current=current_position, initial=old_position, target=new_position, name=self.name, unit=units, precision=precision, ) if np.isclose(current_position, new_position): done.set() break if not self._set_success: raise RuntimeError("Motor was stopped") async def stop(self, success=True): self._set_success = success status = self.stop_.trigger() await status
[docs] class SampleStage(Device): """A demo sample stage with X and Y movables""" def __init__(self, prefix: str, name="") -> None: # Define some child Devices self.x = Mover(prefix + "X:") self.y = Mover(prefix + "Y:") # Set name of device and child devices super().__init__(name=name)