Source code for ophyd_async.epics.demo._motor

from dataclasses import dataclass
from functools import cached_property
from typing import Annotated as A

import numpy as np

from ophyd_async.core import (
    DEFAULT_TIMEOUT,
    MovableLogic,
    SignalR,
    SignalRW,
    StandardMovable,
    StandardReadable,
    TriggerableCommand,
    set_and_wait_for_other_value,
)
from ophyd_async.core import StandardReadableFormat as Format
from ophyd_async.epics.core import EpicsDevice, PvSuffix


@dataclass
class DemoMotorMoveLogic(MovableLogic[float]):
    """Movement logic for the demo motor.

    Extends ``MovableLogic[float]`` with a velocity signal (used to size the
    move timeout) and a stop command. ``self.setpoint`` and ``self.readback``
    are not declared here; presumably they are fields inherited from
    ``MovableLogic`` -- confirm against the base class.
    """

    # Signal whose value is read when estimating how long a move will take.
    velocity: SignalRW[float]
    # Command triggered by stop(); the trailing underscore keeps the field
    # name distinct from the stop() method below.
    stop_: TriggerableCommand

    async def stop(self):
        """Trigger the ``stop_`` command."""
        await self.stop_.trigger()

    async def calculate_timeout(
        self, old_position: float, new_position: float
    ) -> float:
        """Return a timeout for moving from ``old_position`` to ``new_position``.

        The result is the distance divided by the current velocity, plus
        ``DEFAULT_TIMEOUT`` as a fixed margin.
        """
        # NOTE(review): a velocity of zero makes the division below fail --
        # confirm whether the velocity signal can ever report zero.
        speed = await self.velocity.get_value()
        travel_time = abs(new_position - old_position) / speed
        return travel_time + DEFAULT_TIMEOUT

    async def move(self, new_position: float, timeout: float | None) -> None:
        """Set the setpoint to ``new_position`` and wait on the readback.

        Delegates to ``set_and_wait_for_other_value``, passing a matcher that
        accepts a readback value once ``np.isclose`` considers it equal to
        ``new_position``.
        """
        await set_and_wait_for_other_value(
            self.setpoint,
            new_position,
            self.readback,
            # Tolerant float comparison rather than exact equality
            lambda current: bool(np.isclose(current, new_position)),
            timeout=timeout,
        )


class DemoMotor(EpicsDevice, StandardReadable, StandardMovable):
    """A demo movable that moves based on velocity.

    The signals are declared as annotated attributes: each ``PvSuffix`` names
    the PV suffix for that signal, and the optional ``Format`` entry marks the
    readback as a hinted signal and the velocity as a config signal.
    """

    # Define some signals
    readback: A[SignalR[float], PvSuffix("Readback"), Format.HINTED_SIGNAL]
    velocity: A[SignalRW[float], PvSuffix("Velocity"), Format.CONFIG_SIGNAL]
    setpoint: A[SignalRW[float], PvSuffix("Setpoint")]
    # If a signal name clashes with a bluesky verb add _ to the attribute name
    stop_: A[TriggerableCommand, PvSuffix("Stop.PROC")]

    @cached_property
    def movable_logic(self) -> MovableLogic:
        """Return the move logic wired to this device's signals.

        Wrapped in ``cached_property``, so the ``DemoMotorMoveLogic`` instance
        is created on first access and reused afterwards.
        """
        return DemoMotorMoveLogic(
            readback=self.readback,
            setpoint=self.setpoint,
            velocity=self.velocity,
            stop_=self.stop_,
        )