Source code for ophyd_async.epics.core._pvi_connector

from __future__ import annotations

from typing import Literal, cast

from ophyd_async.core import (
    Device,
    DeviceConnector,
    DeviceFiller,
    LazyMock,
    Signal,
    SignalR,
    SignalRW,
    SignalW,
    SignalX,
)

from ._epics_connector import fill_backend_with_prefix
from ._signal import PvaSignalBackend, pvget_with_timeout

Entry = dict[str, str]

OldPVIVector = list[Entry | None]
# The older PVI structure has vectors of the form
# structure[] ttlout
#     (none)
#     structure
#         string d PANDABLOCKS_IOC:TTLOUT1:PVI
#     structure
#         string d PANDABLOCKS_IOC:TTLOUT2:PVI
#     structure
#         string d PANDABLOCKS_IOC:TTLOUT3:PVI


FastCSPVIVector = dict[Literal["d"], Entry]
# The newer pva FastCS PVI structure has vectors of the form
# structure ttlout
#     structure d
#         string v1 FASTCS_PANDA:Ttlout1:PVI
#         string v2 FASTCS_PANDA:Ttlout2:PVI
#         string v3 FASTCS_PANDA:Ttlout3:PVI
#         string v4 FASTCS_PANDA:Ttlout4:PVI


def _get_signal_details(entry: Entry) -> tuple[type[Signal], str, str]:
    match entry:
        case {"r": read_pv, "w": write_pv}:
            return SignalRW, read_pv, write_pv
        case {"r": read_pv}:
            return SignalR, read_pv, read_pv
        case {"w": write_pv}:
            return SignalW, write_pv, write_pv
        case {"rw": read_write_pv}:
            return SignalRW, read_write_pv, read_write_pv
        case {"x": execute_pv}:
            return SignalX, execute_pv, execute_pv
        case _:
            raise TypeError(f"Can't process entry {entry}")


def _is_device_vector_entry(entry: Entry | OldPVIVector | FastCSPVIVector) -> bool:
    return isinstance(entry, list) or (
        entry.keys() == {"d"} and isinstance(entry["d"], dict)
    )


[docs] class PviDeviceConnector(DeviceConnector): """Connect to PVI structure served over PVA. At init, fill in all the type hinted signals. At connection check their types and fill in any extra signals. :param prefix: The PV prefix of the device, "PVI" will be appended to it to get the PVI PV. :param error_hint: If given, this will be appended to the error message if any of they type hinted Signals are not present. """ mock_device_vector_len: int = 2 def __init__(self, prefix: str = "", error_hint: str = "") -> None: # TODO: what happens if we get a leading "pva://" here? self.prefix = prefix self.pvi_pv = prefix + "PVI" self.error_hint = error_hint
[docs] def create_children_from_annotations(self, device: Device): if not hasattr(self, "filler"): self.filler = DeviceFiller( device=device, signal_backend_factory=PvaSignalBackend, device_connector_factory=PviDeviceConnector, ) # Devices will be created with unfilled PviDeviceConnectors list(self.filler.create_devices_from_annotations(filled=False)) # Signals can be filled in with EpicsSignalSuffix and checked at runtime for backend, annotations in self.filler.create_signals_from_annotations( filled=False ): fill_backend_with_prefix(self.prefix, backend, annotations) self.filler.check_created()
def _fill_child(self, name: str, entry: Entry, vector_index: int | None = None): if set(entry) == {"d"}: connector = self.filler.fill_child_device(name, vector_index=vector_index) connector.pvi_pv = entry["d"] else: signal_type, read_pv, write_pv = _get_signal_details(entry) backend = self.filler.fill_child_signal(name, signal_type, vector_index) backend.read_pv = read_pv backend.write_pv = write_pv
[docs] async def connect_mock(self, device: Device, mock: LazyMock): self.filler.create_device_vector_entries_to_mock(self.mock_device_vector_len) # Set the name of the device to name all children device.set_name(device.name) return await super().connect_mock(device, mock)
def _fill_vector_child(self, name: str, entry: OldPVIVector | FastCSPVIVector): if isinstance(entry, list): for i, e in enumerate(entry): if e: self._fill_child(name, e, i) else: for i_string, e in entry["d"].items(): self._fill_child(name, {"d": e}, int(i_string.lstrip("v")))
[docs] async def connect_real( self, device: Device, timeout: float, force_reconnect: bool ) -> None: pvi_structure = await pvget_with_timeout(self.pvi_pv, timeout) entries: dict[str, Entry | OldPVIVector | FastCSPVIVector] = pvi_structure[ "value" ].todict() # Fill based on what PVI gives us for name, entry in entries.items(): if _is_device_vector_entry(entry): self._fill_vector_child( name, cast(OldPVIVector | FastCSPVIVector, entry) ) else: # This is a child self._fill_child(name, cast(Entry, entry)) # Check that all the requested children have been filled suffix = f"\n{self.error_hint}" if self.error_hint else "" self.filler.check_filled(f"{self.pvi_pv}: {entries}{suffix}") # Set the name of the device to name all children device.set_name(device.name) return await super().connect_real(device, timeout, force_reconnect)