Source code for ophyd_async.tango.demo._mover

import asyncio
from typing import Annotated as A

from bluesky.protocols import Movable, Stoppable

from ophyd_async.core import (
    CALCULATE_TIMEOUT,
    DEFAULT_TIMEOUT,
    AsyncStatus,
    CalculatableTimeout,
    SignalR,
    SignalRW,
    SignalX,
    WatchableAsyncStatus,
    WatcherUpdate,
    observe_value,
    wait_for_value,
)
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.tango.core import TangoPolling, TangoReadable
from tango import DevState


class TangoMover(TangoReadable, Movable, Stoppable):
    """Tango moving device.

    Exposes a ``position`` signal (hinted readable), a ``velocity`` signal
    (configuration readable), a read-only ``state`` signal and a ``stop_``
    command signal. Moving is done through :meth:`set`, which reports progress
    as the position changes, and can be interrupted with :meth:`stop`.
    """

    # Enter the name and type of the signals you want to use
    # If the server doesn't support events, the TangoPolling annotation gives
    # the parameters for ophyd to poll instead
    position: A[SignalRW[float], TangoPolling(0.1, 0.1, 0.1)]
    velocity: A[SignalRW[float], TangoPolling(0.1, 0.1, 0.1)]
    state: A[SignalR[DevState], TangoPolling(0.1)]
    # If a tango name clashes with a bluesky verb, add a trailing underscore
    stop_: SignalX

    def __init__(self, trl: str | None = "", name=""):
        """Create the device and register its readable signals.

        :param trl: Tango resource locator, forwarded to the parent class.
        :param name: Device name, forwarded to the parent class.
        """
        super().__init__(trl, name=name)
        self.add_readables([self.position], Format.HINTED_SIGNAL)
        self.add_readables([self.velocity], Format.CONFIG_SIGNAL)
        # Cleared by stop(success=False) so that an in-flight set() fails.
        self._set_success = True

    @WatchableAsyncStatus.wrap
    async def set(self, value: float, timeout: CalculatableTimeout = CALCULATE_TIMEOUT):
        """Move to ``value``, yielding a progress update per position change.

        :param value: Target position.
        :param timeout: Seconds to wait for the state to become
            ``DevState.ON``. ``None`` is passed through unchanged. When left
            as ``CALCULATE_TIMEOUT`` it is derived from the travel distance
            and the current velocity, plus ``DEFAULT_TIMEOUT``.
        :raises ValueError: If the timeout must be calculated but the velocity
            is zero, or if ``timeout`` is neither a number nor ``None``.
        :raises RuntimeError: If the move was stopped with ``success=False``
            or observing the position raised ``RuntimeError``.
        """
        self._set_success = True
        (old_position, velocity) = await asyncio.gather(
            self.position.get_value(), self.velocity.get_value()
        )
        # Only derive a timeout when the caller did not supply one; an explicit
        # timeout (including None) must be honoured as given.
        if timeout is CALCULATE_TIMEOUT:
            if velocity == 0:
                msg = "Motor has zero velocity"
                raise ValueError(msg)
            # TODO: check whether Tango does work with negative velocity
            # abs() over the whole quotient keeps the duration non-negative
            # whatever the sign of the velocity.
            timeout = abs((value - old_position) / velocity) + DEFAULT_TIMEOUT
        elif isinstance(timeout, int):
            # Accept whole-number timeouts such as ``timeout=10``.
            timeout = float(timeout)

        if not (isinstance(timeout, float) or timeout is None):
            raise ValueError("Timeout must be a float or None")

        # For this server, set returns immediately so this status should not be awaited
        await self.position.set(value, wait=False, timeout=timeout)

        # The move is considered finished once the state reads DevState.ON.
        move_status = AsyncStatus(
            wait_for_value(self.state, DevState.ON, timeout=timeout)
        )

        try:
            async for current_position in observe_value(
                self.position, done_status=move_status
            ):
                yield WatcherUpdate(
                    current=current_position,
                    initial=old_position,
                    target=value,
                    name=self.name,
                )
        except RuntimeError as exc:
            self._set_success = False
            raise RuntimeError("Motor was stopped") from exc
        # stop(success=False) may have been called while the move was running.
        if not self._set_success:
            raise RuntimeError("Motor was stopped")

    def stop(self, success: bool = True) -> AsyncStatus:
        """Trigger the ``stop_`` signal and record the requested outcome.

        :param success: Stored in ``_set_success``; when ``False`` an in-flight
            :meth:`set` raises ``RuntimeError`` once it finishes observing.
        :return: The status returned by triggering ``stop_``.
        """
        self._set_success = success
        return self.stop_.trigger()