Source code for ophyd_async.panda._trigger
import asyncio
from typing import Optional
from pydantic import BaseModel, Field
from ophyd_async.core import TriggerLogic, wait_for_value
from ophyd_async.panda import (
PcompBlock,
PcompDirectionOptions,
SeqBlock,
SeqTable,
TimeUnits,
)
class SeqTableInfo(BaseModel):
sequence_table: SeqTable = Field(strict=True)
repeats: int = Field(ge=0)
prescale_as_us: float = Field(default=1, ge=0) # microseconds
[docs]
class StaticSeqTableTriggerLogic(TriggerLogic[SeqTableInfo]):
def __init__(self, seq: SeqBlock) -> None:
self.seq = seq
[docs]
async def prepare(self, value: SeqTableInfo):
await asyncio.gather(
self.seq.prescale_units.set(TimeUnits.us),
self.seq.enable.set("ZERO"),
)
await asyncio.gather(
self.seq.prescale.set(value.prescale_as_us),
self.seq.repeats.set(value.repeats),
self.seq.table.set(value.sequence_table),
)
[docs]
async def kickoff(self) -> None:
await self.seq.enable.set("ONE")
await wait_for_value(self.seq.active, True, timeout=1)
[docs]
async def complete(self) -> None:
await wait_for_value(self.seq.active, False, timeout=None)
[docs]
async def stop(self):
await self.seq.enable.set("ZERO")
await wait_for_value(self.seq.active, False, timeout=1)
class PcompInfo(BaseModel):
start_postion: int = Field(description="start position in counts")
pulse_width: int = Field(description="width of a single pulse in counts", gt=0)
rising_edge_step: int = Field(
description="step between rising edges of pulses in counts", gt=0
) #
number_of_pulses: int = Field(
description=(
"Number of pulses to send before the PCOMP block is disarmed. "
"0 means infinite."
),
ge=0,
)
direction: PcompDirectionOptions = Field(
description=(
"Specifies which direction the motor counts should be "
"moving. Pulses won't be sent unless the values are moving in "
"this direction."
)
)
[docs]
class StaticPcompTriggerLogic(TriggerLogic[PcompInfo]):
def __init__(self, pcomp: PcompBlock) -> None:
self.pcomp = pcomp
[docs]
async def prepare(self, value: PcompInfo):
await self.pcomp.enable.set("ZERO")
await asyncio.gather(
self.pcomp.start.set(value.start_postion),
self.pcomp.width.set(value.pulse_width),
self.pcomp.step.set(value.rising_edge_step),
self.pcomp.pulses.set(value.number_of_pulses),
self.pcomp.dir.set(value.direction),
)
[docs]
async def kickoff(self) -> None:
await self.pcomp.enable.set("ONE")
await wait_for_value(self.pcomp.active, True, timeout=1)
[docs]
async def complete(self, timeout: Optional[float] = None) -> None:
await wait_for_value(self.pcomp.active, False, timeout=timeout)
[docs]
async def stop(self):
await self.pcomp.enable.set("ZERO")
await wait_for_value(self.pcomp.active, False, timeout=1)