Source code for ophyd_async.epics.core._epics_connector

from __future__ import annotations

from dataclasses import dataclass
from typing import Any

from ophyd_async.core import Device, DeviceConnector, DeviceFiller

from ._signal import EpicsSignalBackend, get_signal_backend_type, split_protocol_from_pv


@dataclass
class PvSuffix:
    """Suffix (or read/write pair of suffixes) appended to a device PV prefix.

    ``read_suffix`` is always required. ``write_suffix`` is optional and
    defaults to ``None`` when the same suffix serves for both directions.
    """

    # Suffix joined to the device prefix to build the PV that is read.
    read_suffix: str
    # Suffix joined to the device prefix to build the PV that is written;
    # ``None`` means no distinct write suffix was given.
    write_suffix: str | None = None

    @classmethod
    def rbv(cls, write_suffix: str, rbv_suffix: str = "_RBV") -> PvSuffix:
        """Build a `PvSuffix` whose read suffix is the write suffix plus a tag.

        Args:
            write_suffix: Suffix used for the write (setpoint) PV.
            rbv_suffix: Text appended to ``write_suffix`` to form the read
                suffix. Defaults to ``"_RBV"``.

        Returns:
            A `PvSuffix` with ``read_suffix == write_suffix + rbv_suffix`` and
            the given ``write_suffix``.
        """
        readback_suffix = write_suffix + rbv_suffix
        return cls(read_suffix=readback_suffix, write_suffix=write_suffix)
def fill_backend_with_prefix(
    prefix: str, backend: EpicsSignalBackend, annotations: list[Any]
):
    """Apply every `PvSuffix` in ``annotations`` to ``backend`` and consume it.

    For each `PvSuffix` found, ``backend.read_pv`` and ``backend.write_pv`` are
    set to ``prefix`` joined with the corresponding suffix. When the
    annotation's ``write_suffix`` is falsy, the read suffix is used for the
    write PV as well. If several `PvSuffix` entries are present, the last one
    wins because each overwrites the backend attributes.

    Args:
        prefix: PV prefix that the suffixes are appended to.
        backend: Signal backend whose ``read_pv`` / ``write_pv`` are assigned.
        annotations: Annotation objects for the signal. Mutated in place:
            on return it contains only the entries that were not `PvSuffix`,
            in their original order.
    """
    leftover = []
    # Iterate over a snapshot so the caller's list can be rewritten afterwards.
    for item in list(annotations):
        if not isinstance(item, PvSuffix):
            leftover.append(item)
            continue
        backend.read_pv = prefix + item.read_suffix
        effective_write_suffix = item.write_suffix or item.read_suffix
        backend.write_pv = prefix + effective_write_suffix
    # Rewrite the caller's list in place so that only the unhandled
    # annotations remain; these are left for the iterator that supplied the
    # list to deal with.
    annotations[:] = leftover
class EpicsDeviceConnector(DeviceConnector):
    """Connector that wires a device's signals to static EPICS PVs.

    The PVs are derived from a fixed ``prefix`` given at construction time.
    """

    def __init__(self, prefix: str) -> None:
        # Stored verbatim; it is split into protocol and bare prefix when the
        # children are created.
        self.prefix = prefix

    def create_children_from_annotations(self, device: Device):
        """Create the child signals and devices of ``device`` on first call.

        A `DeviceFiller` is built once and cached on ``self.filler``; if that
        attribute already exists the call does nothing.

        Args:
            device: The device whose annotated children should be created.
        """
        if hasattr(self, "filler"):
            # Children were already created by an earlier call.
            return

        protocol, pv_prefix = split_protocol_from_pv(self.prefix)
        backend_factory = get_signal_backend_type(protocol)
        self.filler = DeviceFiller(
            device,
            signal_backend_factory=backend_factory,
            device_connector_factory=DeviceConnector,
        )
        signal_entries = self.filler.create_signals_from_annotations()
        for signal_backend, signal_annotations in signal_entries:
            fill_backend_with_prefix(pv_prefix, signal_backend, signal_annotations)

        # Exhaust the iterator purely for its side effects; the yielded
        # values are not needed here.
        for _ in self.filler.create_devices_from_annotations():
            pass