"""Tango Signals over Pytango."""
from __future__ import annotations
import logging
from enum import IntEnum
import numpy.typing as npt
from tango import (
AttrWriteType,
DeviceProxy,
DevState,
)
from tango.asyncio import DeviceProxy as AsyncDeviceProxy
from ophyd_async.core import (
DEFAULT_TIMEOUT,
Signal,
SignalDatatypeT,
SignalR,
SignalRW,
SignalW,
SignalX,
)
from ._tango_transport import (
CommandProxyReadCharacter,
TangoSignalBackend,
get_command_character,
get_python_type,
)
from ._utils import get_device_trl_and_attr
logger = logging.getLogger("ophyd_async")
[docs]
def make_backend(
datatype: type[SignalDatatypeT] | None,
read_trl: str = "",
write_trl: str = "",
) -> TangoSignalBackend:
return TangoSignalBackend(datatype, read_trl, write_trl)
[docs]
def tango_signal_rw(
datatype: type[SignalDatatypeT],
read_trl: str,
write_trl: str = "",
timeout: float = DEFAULT_TIMEOUT,
name: str = "",
) -> SignalRW[SignalDatatypeT]:
"""Create a `SignalRW` backed by 1 or 2 Tango Attribute/Command.
Parameters
----------
datatype:
Check that the Attribute/Command is of this type
read_trl:
The Attribute/Command to read and monitor
write_trl:
If given, use this Attribute/Command to write to, otherwise use read_trl
timeout:
The timeout for the read and write operations
name:
The name of the Signal
"""
backend = make_backend(datatype, read_trl, write_trl or read_trl)
return SignalRW(backend, timeout=timeout, name=name)
[docs]
def tango_signal_r(
datatype: type[SignalDatatypeT],
read_trl: str,
timeout: float = DEFAULT_TIMEOUT,
name: str = "",
) -> SignalR[SignalDatatypeT]:
"""Create a `SignalR` backed by 1 Tango Attribute/Command.
Parameters
----------
datatype:
Check that the Attribute/Command is of this type
read_trl:
The Attribute/Command to read and monitor
timeout:
The timeout for the read operation
name:
The name of the Signal
"""
backend = make_backend(datatype, read_trl, read_trl)
return SignalR(backend, timeout=timeout, name=name)
[docs]
def tango_signal_w(
datatype: type[SignalDatatypeT],
write_trl: str,
timeout: float = DEFAULT_TIMEOUT,
name: str = "",
) -> SignalW[SignalDatatypeT]:
"""Create a `SignalW` backed by 1 Tango Attribute/Command.
Parameters
----------
datatype:
Check that the Attribute/Command is of this type
write_trl:
The Attribute/Command to write to
timeout:
The timeout for the write operation
name:
The name of the Signal
"""
backend = make_backend(datatype, write_trl, write_trl)
return SignalW(backend, timeout=timeout, name=name)
[docs]
def tango_signal_x(
write_trl: str,
timeout: float = DEFAULT_TIMEOUT,
name: str = "",
) -> SignalX:
"""Create a `SignalX` backed by 1 Tango Attribute/Command.
Parameters
----------
write_trl:
The Attribute/Command to write its initial value to on execute
timeout:
The timeout for the command operation
name:
The name of the Signal
"""
backend = make_backend(None, write_trl, write_trl)
return SignalX(backend, timeout=timeout, name=name)
[docs]
async def infer_python_type(
trl: str = "", proxy: DeviceProxy | None = None
) -> object | npt.NDArray | type[DevState] | IntEnum:
"""Infers the python type from the TRL."""
# TODO: work out if this is still needed
device_trl, tr_name = get_device_trl_and_attr(trl)
if proxy is None:
dev_proxy = await AsyncDeviceProxy(device_trl)
else:
dev_proxy = proxy
if tr_name in dev_proxy.get_command_list():
config = await dev_proxy.get_command_config(tr_name)
py_type = get_python_type(config)
elif tr_name in dev_proxy.get_attribute_list():
config = await dev_proxy.get_attribute_config(tr_name)
py_type = get_python_type(config)
else:
raise RuntimeError(f"Cannot find {tr_name} in {device_trl}")
return py_type
[docs]
async def infer_signal_type(
trl, proxy: DeviceProxy | None = None
) -> type[Signal] | None:
device_trl, tr_name = get_device_trl_and_attr(trl)
if proxy is None:
dev_proxy = await AsyncDeviceProxy(device_trl)
else:
dev_proxy = proxy
if tr_name not in dev_proxy.get_attribute_list():
if tr_name not in dev_proxy.get_command_list():
raise RuntimeError(f"Cannot find {tr_name} in {device_trl}")
if tr_name in dev_proxy.get_attribute_list():
config = await dev_proxy.get_attribute_config(tr_name)
if config.writable in [AttrWriteType.READ_WRITE, AttrWriteType.READ_WITH_WRITE]:
return SignalRW
elif config.writable == AttrWriteType.READ:
return SignalR
else:
return SignalW
if tr_name in dev_proxy.get_command_list():
config = await dev_proxy.get_command_config(tr_name)
command_character = get_command_character(config)
if command_character == CommandProxyReadCharacter.READ:
return SignalR
elif command_character == CommandProxyReadCharacter.WRITE:
return SignalW
elif command_character == CommandProxyReadCharacter.READ_WRITE:
return SignalRW
elif command_character == CommandProxyReadCharacter.EXECUTE:
return SignalX
raise RuntimeError(f"Unable to infer signal character for {trl}")