# Source code for ophyd_async.tango.demo._motor

from dataclasses import dataclass
from functools import cached_property
from typing import Annotated as A

from ophyd_async.core import (
    DEFAULT_TIMEOUT,
    MovableLogic,
    SignalR,
    SignalRW,
    StandardMovable,
    StandardReadable,
    TriggerableCommand,
    wait_for_value,
)
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.tango.core import DevStateEnum, TangoDevice, TangoPolling


@dataclass
class DemoMotorMoveLogic(MovableLogic[float]):
    """Move logic for the Tango demo motor.

    Extends ``MovableLogic[float]`` with the extra signals this motor needs to
    estimate a move timeout, stop a move, and detect that a move has finished.
    ``setpoint`` (used in ``move``) is not declared here; it is expected to be
    a field supplied by the ``MovableLogic`` base class.
    """

    # Read in calculate_timeout to estimate how long a move will take.
    velocity: SignalRW[float]
    # Command triggered by stop(); the trailing underscore keeps the field
    # from clashing with the stop() method below.
    stop_: TriggerableCommand
    # Device state; move() waits for this to read DevStateEnum.ON.
    state: SignalR[DevStateEnum]

    async def stop(self):
        """Trigger the device's stop command."""
        await self.stop_.trigger()

    async def calculate_timeout(
        self, old_position: float, new_position: float
    ) -> float:
        """Estimate how long a move between two positions may take.

        Args:
            old_position: Position the move starts from.
            new_position: Position the move ends at.

        Returns:
            The travel distance divided by the magnitude of the current
            velocity, plus ``DEFAULT_TIMEOUT`` as headroom.

        Raises:
            ZeroDivisionError: If the velocity currently reads zero, since no
                finite move time can be estimated.
        """
        velocity = await self.velocity.get_value()
        if velocity == 0:
            # Same exception type the bare division would raise, so existing
            # handlers still match, but with a message that names the cause.
            raise ZeroDivisionError(
                "Cannot calculate a move timeout: motor velocity is zero"
            )
        # Use the magnitude: a negative velocity reading must not shrink the
        # timeout below DEFAULT_TIMEOUT (or make it negative).
        travel_time = abs(new_position - old_position) / abs(velocity)
        return travel_time + DEFAULT_TIMEOUT

    async def move(self, new_position: float, timeout: float | None) -> None:
        """Write the setpoint, then wait for the state to read ON.

        Args:
            new_position: Value written to the setpoint.
            timeout: Passed unchanged to both the setpoint write and the wait
                on the state signal.
        """
        # Write the setpoint and wait for the motor state to return to ON,
        # which happens whether the move completes normally or is stopped.
        await self.setpoint.set(new_position, timeout=timeout)
        await wait_for_value(self.state, DevStateEnum.ON, timeout=timeout)


class DemoMotor(TangoDevice, StandardReadable, StandardMovable):
    """A demo movable that moves based on velocity."""

    # If the server doesn't support events, the TangoPolling annotation gives
    # the parameters for ophyd to poll instead
    readback: A[SignalR[float], TangoPolling(0.1, 0.001, 0.001), Format.HINTED_SIGNAL]
    velocity: A[SignalRW[float], TangoPolling(0.1, 0.001, 0.001), Format.CONFIG_SIGNAL]
    setpoint: A[SignalRW[float], TangoPolling(0.1, 0.001, 0.001)]
    state: A[SignalR[DevStateEnum], TangoPolling(0.1)]
    # If a tango name clashes with a bluesky verb, add a trailing underscore
    stop_: TriggerableCommand

    @cached_property
    def movable_logic(self) -> MovableLogic:
        """Return the move logic wired to this device's own signals.

        Being a ``cached_property``, the ``DemoMotorMoveLogic`` instance is
        built on first access and the same object is returned afterwards.
        """
        return DemoMotorMoveLogic(
            readback=self.readback,
            setpoint=self.setpoint,
            velocity=self.velocity,
            stop_=self.stop_,
            state=self.state,
        )