"""EPICS Signals over CA or PVA"""from__future__importannotationsfromtypingimportOptional,Tuple,Typefromophyd_async.coreimport(SignalBackend,SignalR,SignalRW,SignalW,SignalX,T,get_unique,)from._epics_transportimportEpicsTransport_default_epics_transport=EpicsTransport.cadef_transport_pv(pv:str)->Tuple[EpicsTransport,str]:split=pv.split("://",1)iflen(split)>1:# We got something like pva://mydevice, so use specified comms modetransport_str,pv=splittransport=EpicsTransport[transport_str]else:# No comms mode specified, use the defaulttransport=_default_epics_transportreturntransport,pvdef_make_backend(datatype:Optional[Type[T]],read_pv:str,write_pv:str)->SignalBackend[T]:r_transport,r_pv=_transport_pv(read_pv)w_transport,w_pv=_transport_pv(write_pv)transport=get_unique({read_pv:r_transport,write_pv:w_transport},"transports")returntransport.value(datatype,r_pv,w_pv)
def epics_signal_rw(
    datatype: Type[T], read_pv: str, write_pv: Optional[str] = None
) -> SignalRW[T]:
    """Create a `SignalRW` backed by 1 or 2 EPICS PVs

    Parameters
    ----------
    datatype:
        Check that the PV is of this type
    read_pv:
        The PV to read and monitor
    write_pv:
        The PV to write to. When omitted (or otherwise falsy, such as an
        empty string) read_pv is used for writing as well
    """
    effective_write_pv = write_pv or read_pv
    return SignalRW(_make_backend(datatype, read_pv, effective_write_pv))
def epics_signal_r(datatype: Type[T], read_pv: str) -> SignalR[T]:
    """Create a `SignalR` backed by 1 EPICS PV

    Parameters
    ----------
    datatype:
        Check that the PV is of this type
    read_pv:
        The PV to read and monitor
    """
    # The backend always takes a read/write pair, so the one PV fills both
    return SignalR(_make_backend(datatype, read_pv, read_pv))
def epics_signal_w(datatype: Type[T], write_pv: str) -> SignalW[T]:
    """Create a `SignalW` backed by 1 EPICS PV

    Parameters
    ----------
    datatype:
        Check that the PV is of this type
    write_pv:
        The PV to write to
    """
    # The backend always takes a read/write pair, so the one PV fills both
    return SignalW(_make_backend(datatype, write_pv, write_pv))
def epics_signal_x(write_pv: str) -> SignalX:
    """Create a `SignalX` backed by 1 EPICS PV

    Parameters
    ----------
    write_pv:
        The PV to write its initial value to on trigger
    """
    # No datatype is given for an executable signal, hence the None
    return SignalX(_make_backend(None, write_pv, write_pv))