Source code for ophyd_async.panda._trigger

import asyncio
from typing import Optional

from pydantic import BaseModel, Field

from ophyd_async.core import TriggerLogic, wait_for_value
from ophyd_async.panda import (
    PcompBlock,
    PcompDirectionOptions,
    SeqBlock,
    SeqTable,
    TimeUnits,
)


class SeqTableInfo(BaseModel):
    sequence_table: SeqTable = Field(strict=True)
    repeats: int = Field(ge=0)
    prescale_as_us: float = Field(default=1, ge=0)  # microseconds


[docs] class StaticSeqTableTriggerLogic(TriggerLogic[SeqTableInfo]): def __init__(self, seq: SeqBlock) -> None: self.seq = seq
[docs] async def prepare(self, value: SeqTableInfo): await asyncio.gather( self.seq.prescale_units.set(TimeUnits.us), self.seq.enable.set("ZERO"), ) await asyncio.gather( self.seq.prescale.set(value.prescale_as_us), self.seq.repeats.set(value.repeats), self.seq.table.set(value.sequence_table), )
[docs] async def kickoff(self) -> None: await self.seq.enable.set("ONE") await wait_for_value(self.seq.active, True, timeout=1)
[docs] async def complete(self) -> None: await wait_for_value(self.seq.active, False, timeout=None)
[docs] async def stop(self): await self.seq.enable.set("ZERO") await wait_for_value(self.seq.active, False, timeout=1)
class PcompInfo(BaseModel): start_postion: int = Field(description="start position in counts") pulse_width: int = Field(description="width of a single pulse in counts", gt=0) rising_edge_step: int = Field( description="step between rising edges of pulses in counts", gt=0 ) # number_of_pulses: int = Field( description=( "Number of pulses to send before the PCOMP block is disarmed. " "0 means infinite." ), ge=0, ) direction: PcompDirectionOptions = Field( description=( "Specifies which direction the motor counts should be " "moving. Pulses won't be sent unless the values are moving in " "this direction." ) )
[docs] class StaticPcompTriggerLogic(TriggerLogic[PcompInfo]): def __init__(self, pcomp: PcompBlock) -> None: self.pcomp = pcomp
[docs] async def prepare(self, value: PcompInfo): await self.pcomp.enable.set("ZERO") await asyncio.gather( self.pcomp.start.set(value.start_postion), self.pcomp.width.set(value.pulse_width), self.pcomp.step.set(value.rising_edge_step), self.pcomp.pulses.set(value.number_of_pulses), self.pcomp.dir.set(value.direction), )
[docs] async def kickoff(self) -> None: await self.pcomp.enable.set("ONE") await wait_for_value(self.pcomp.active, True, timeout=1)
[docs] async def complete(self, timeout: Optional[float] = None) -> None: await wait_for_value(self.pcomp.active, False, timeout=timeout)
[docs] async def stop(self): await self.pcomp.enable.set("ZERO") await wait_for_value(self.pcomp.active, False, timeout=1)