Source code for ophyd_async.epics.pmac._pmac_trajectory

import asyncio

import numpy as np
from scanspec.core import Path
from scanspec.specs import Spec

from ophyd_async.core import (
    DEFAULT_TIMEOUT,
    FlyerController,
    set_and_wait_for_value,
    wait_for_value,
)
from ophyd_async.epics.motor import Motor
from ophyd_async.epics.pmac import PmacIO
from ophyd_async.epics.pmac._pmac_io import CS_LETTERS
from ophyd_async.epics.pmac._utils import (
    _PmacMotorInfo,
    _Trajectory,
    calculate_ramp_position_and_duration,
)


[docs] class PmacTrajectoryTriggerLogic(FlyerController): def __init__(self, pmac: PmacIO) -> None: self.pmac = pmac self.scantime: float | None = None
[docs] async def prepare(self, value: Spec[Motor]): slice = Path(value.calculate()).consume() motor_info = await _PmacMotorInfo.from_motors(self.pmac, slice.axes()) ramp_up_pos, ramp_up_time = calculate_ramp_position_and_duration( slice, motor_info, True ) ramp_down_pos, ramp_down_time = calculate_ramp_position_and_duration( slice, motor_info, False ) trajectory = _Trajectory.from_slice(slice, ramp_up_time, motor_info) await asyncio.gather( self._build_trajectory( trajectory, motor_info, ramp_down_pos, ramp_down_time ), self._move_to_start(motor_info, ramp_up_pos), )
[docs] async def kickoff(self): if not self.scantime: raise RuntimeError("Cannot kickoff. Must call prepare first.") self.status = await self.pmac.trajectory.execute_profile.set( True, timeout=self.scantime + 1 )
[docs] async def complete(self): if not self.scantime: raise RuntimeError("Cannot complete. Must call prepare first.") await wait_for_value( self.pmac.trajectory.execute_profile, False, timeout=self.scantime + DEFAULT_TIMEOUT, )
[docs] async def stop(self): await self.pmac.trajectory.abort_profile.set(True)
async def _build_trajectory( self, trajectory: _Trajectory, motor_info: _PmacMotorInfo, ramp_down_pos: dict[Motor, np.float64], ramp_down_time: float, ): trajectory = trajectory.append_ramp_down(ramp_down_pos, ramp_down_time, 0) self.scantime = np.sum(trajectory.durations) use_axis = {axis + 1: False for axis in range(len(CS_LETTERS))} size = 0 for motor, number in motor_info.motor_cs_index.items(): use_axis[number + 1] = True await self.pmac.trajectory.positions[number + 1].set( trajectory.positions[motor] ) await self.pmac.trajectory.velocities[number + 1].set( trajectory.velocities[motor] ) size += len(trajectory.positions[motor]) coros = [ self.pmac.trajectory.profile_cs_name.set(motor_info.cs_port), self.pmac.trajectory.time_array.set(trajectory.durations), self.pmac.trajectory.user_array.set(trajectory.user_programs), self.pmac.trajectory.points_to_build.set(size), self.pmac.trajectory.calculate_velocities.set(False), ] + [ self.pmac.trajectory.use_axis[number].set(use) for number, use in use_axis.items() ] await asyncio.gather(*coros) await self.pmac.trajectory.build_profile.set(True) async def _move_to_start( self, motor_info: _PmacMotorInfo, ramp_up_position: dict[Motor, np.float64] ): coord = self.pmac.coord[motor_info.cs_number] coros = [] await coord.defer_moves.set(True) for motor, position in ramp_up_position.items(): coros.append( set_and_wait_for_value( coord.cs_axis_setpoint[motor_info.motor_cs_index[motor] + 1], position, set_timeout=10, wait_for_set_completion=False, ) ) statuses = await asyncio.gather(*coros) await coord.defer_moves.set(False) await asyncio.gather(*statuses)