Source code for bluesky_queueserver_api.system_info_monitor

import asyncio
import json
import queue
import threading
import time as ttime

from bluesky_queueserver import ReceiveSystemInfo, ReceiveSystemInfoAsync

from .comm_base import RequestTimeoutError

_system_info_monitor_endpoint = "/api/info/ws"


_doc_SystemInfoMonitor_ZMQ = """
    System Info Monitor API (0MQ). The class implements a monitor for console output
    published by RE Manager over 0MQ. The asynchronous version of the class must
    be instantiated in the loop.

    Parameters
    ----------
    zmq_info_addr: str
        Address of 0MQ PUB socket. The SUB socket of the monitor subscribes to this address
        once the class is instantiated.
    poll_timeout: float
        Timeout used internally for polling 0MQ socket. The value does not influence performance.
        It may take longer to stop the background thread or task, if the value is too large.
    max_msgs: int
        Maximum number of messages in the buffer. New messages are ignored if the buffer
        is full. This could happen only if console monitoring is enabled, but messages
        are not read from the buffer. Setting the value to 0 disables collection of messages
        in the buffer.
"""

_doc_SystemInfoMonitor_HTTP = """
    Console Monitor API (HTTP). The class implements a monitor for console output
    published by RE Manager over HTTP. The asynchronous version of the class must
    be instantiated in the loop.

    Parameters
    ----------
    parent: class
        Reference to the parent class (or any class). The class must expose the attribute
        ``_client`` that references configured ``httpx`` client.
    poll_period: float
        Period between consecutive retries to connect to the websocket at the server.
        The value is also used as a timeout for websocket receive operation. It may take
        longer to stop background thread if the value is too large.
    max_msgs: int
        Maximum number of messages in the buffer. New messages are ignored if the buffer
        is full. This could happen only if console monitoring is enabled, but messages
        are not read from the buffer. Setting the value to 0 disables collection of messages
        in the buffer.
"""

_doc_SystemInfoMonitor_enabled = """
    Indicates if monitoring is enabled. Returns ``True`` if monitoring is enabled,
    ``False`` otherwise.

    Examples
    --------

    Synchronous and asyncronous API:

    .. code-block:: python

        is_enabled = RM.system_info_monitor.enabled
"""

_doc_SystemInfoMonitor_enable = """
    Enable monitoring of the system info. Received messages are accumulated in the buffer
    and need to be continuosly read using ``next_msg()`` to prevent buffer from overflowing.
    If the API is called when background thread or task is not running, the buffer is cleared
    and all old messages are discarded. Note, that disabling and then enabling the monitor in
    rapid sequence is unlikely to clear the buffer, because the background thread or task may
    still be running. Use ``clear()`` API to remove messages from the buffer.

    Examples
    --------

    Synchronous and asyncronous API:

    .. code-block:: python

        RM.system_info_monitor.enable()
        RM.system_info_monitor.disable()
"""

_doc_SystemInfoMonitor_disable = """
    Disable monitoring of the system info. The API does not immediately stop the background thread
    or task. If the monitor is quickly re-enabled, the background thread or task may continue running
    continously.

    Examples
    --------

    Synchronous and asyncronous API:

    .. code-block:: python

        RM.system_info_monitor.enable()
        RM.system_info_monitor.disable()
"""

_doc_SystemInfoMonitor_disable_wait = """
    Disable monitoring and wait for completion.

    Parameters
    ----------
    timeout: float
        Wait timeout.

    Raises
    ------
    TimeoutError
        Wait timeout (synchronous API)
    asyncio.TimeoutError
        Wait timeout (asynchronous API)

    Examples
    --------

    Syncronous API:

    .. code-block:: python

        RM.system_info_monitor.enable()
        RM.system_info_monitor.disable_wait()

    Asynchronous API:

    .. code-block:: python

        RM.system_info_monitor.enable()
        await RM.system_info_monitor.disable_wait()
"""

_doc_SystemInfoMonitor_clear = """
    Clear the message buffer. Removes all messages from the buffer.

    Examples
    --------

    Synchronous and asyncronous API:

    .. code-block:: python

        RM.system_info_monitor.clear()
"""

_doc_SystemInfoMonitor_next_msg = """
    Returns the next message from the buffer. If ``timeout`` is ``None`` or zero, then
    the API returns the next available message. If the buffer contains no messages, the
    function waits for the next published message for ``timeout`` period and raises
    ``RequestTimeoutError`` if no messages were received. If ``timeout is ``None`` or zero
    and the buffer contains no messages, then the function immediately raises
    ``RequestTimeoutError``.

    The returned message is a dictionary with two keys: ``"time"`` (timestamp indicating time
    when the message was sent by RE Manager) and ``"msg"`` (dictionary with one key, the key
    indicates the message type, e.g. ``"status"``, and the value is the information itself).
    For example:

    .. code-block:: python

        {"time": 1764605683.1407075, "msg": {"status": {<status info of RE Manager>}}}

    Parameters
    ----------
    timeout: float or None
        If timeout is positive floating point number, zero or ``None``.

    Raises
    ------
    RequestTimeoutError
        No messages were no messages received during timeout period.

    Examples
    --------

    Synchronous API:

    .. code-block:: python

        # Make sure RE Manager is started with option '--zmq-publish-console=ON'

        RM = REManagerAPI()
        RM.system_info_monitor.enable()

        # Run any command that generates console output
        RM.environment_open()
        RM.wait_for_idle()

        try:
            msg = RM.system_info_monitor.next_msg(timeout=1)
            print(msg["msg"])
        except RM.RequestTimeoutError:
            pass

        RM.system_info_monitor.disable()
        RM.close()

    Asynchronous API:

    .. code-block:: python

        # Make sure RE Manager is started with option '--zmq-publish-console=ON'

        RM = REManagerAPI()
        RM.system_info_monitor.enable()

        # Run any command that generates console output
        await RM.environment_open()
        await RM.wait_for_idle()

        try:
            msg = await RM.system_info_monitor.next_msg(timeout=1)
            print(msg["msg"])
        except RM.RequestTimeoutError:
            pass

        RM.system_info_monitor.disable()
        await RM.close()
"""


def _websocket_uri(uri, endpoint):
    """
    Generate websocket URI based on the base URI used for http requests
    """
    n = uri.find("://")
    if n >= 0:
        uri_base = f"ws://{uri[n + 3 :]}"
    else:
        uri_base = f"ws://{uri}"
    return f"{uri_base}{endpoint}"


class _SystemInfoMonitor:
    def __init__(self):
        self._monitor_enabled = False
        self._monitor_init()

    def _monitor_init(self):
        raise NotImplementedError()

    def _clear(self):
        raise NotImplementedError()

    def _monitor_enable(self):
        raise NotImplementedError()

    @property
    def enabled(self):
        # Docstring is maintained separately
        return self._monitor_enabled

    def enable(self):
        # Docstring is maintained separately
        if not self._monitor_enabled:
            self._monitor_enable()

    def disable(self):
        # Docstring is maintained separately
        self._monitor_enabled = False

    def clear(self):
        # Docstring is maintained separately
        self._clear()

    def __del__(self):
        self.disable()


class _SystemInfoMonitor_Threads(_SystemInfoMonitor):
    def __init__(self, *, max_msgs):
        self._msg_queue_max = max(max_msgs, 0)
        self._msg_queue = queue.Queue(maxsize=max_msgs)

        self._monitor_enabled = False
        self._monitor_thread = None  # Thread or asyncio task
        self._monitor_thread_running = threading.Event()
        self._monitor_thread_running.set()

        self._monitor_thread_lock = threading.Lock()

        super().__init__()

    def _monitor_enable(self):
        self._monitor_thread = threading.Thread(
            target=self._thread_receive_msgs, name="QS API - Console monitoring", daemon=True
        )
        self._monitor_enabled = True
        self._monitor_thread.start()

    def _add_msg_to_queue(self, msg):
        if self._msg_queue_max:
            self._msg_queue.put_nowait(msg)

    def disable_wait(self, *, timeout=2):
        # Docstring is maintained separately
        self.disable()
        if not self._monitor_thread_running.wait(timeout=timeout):
            raise TimeoutError(f"Timeout occurred while disabling console monitor: timeout={timeout}")

    def next_msg(self, timeout=None):
        # Docstring is maintained separately
        block = bool(timeout)
        try:
            return self._msg_queue.get(block=block, timeout=timeout)
        except queue.Empty:
            raise RequestTimeoutError(f"No message was received (timeout={timeout})", request={})


[docs] class SystemInfoMonitor_ZMQ_Threads(_SystemInfoMonitor_Threads): # Docstring is maintained separately
[docs] def __init__(self, *, zmq_info_addr, zmq_encoding, poll_timeout, max_msgs): self._zmq_subscribe_addr = zmq_info_addr self._zmq_encoding = zmq_encoding self._monitor_poll_timeout = poll_timeout super().__init__(max_msgs=max_msgs)
def _monitor_init(self): self._rco = ReceiveSystemInfo( zmq_subscribe_addr=self._zmq_subscribe_addr, encoding=self._zmq_encoding, timeout=int(self._monitor_poll_timeout * 1000), ) def _thread_receive_msgs(self): with self._monitor_thread_lock: if not self._monitor_thread_running.is_set(): return self._monitor_thread_running.clear() self.clear() self._rco.subscribe() while True: with self._monitor_thread_lock: if not self._monitor_enabled: self._rco.unsubscribe() self._monitor_thread_running.set() break try: msg = self._rco.recv() self._add_msg_to_queue(msg) except TimeoutError: # No published messages are detected pass except queue.Full: # Queue is full, ignore the new messages pass def _clear(self): self._msg_queue.queue.clear()
[docs] class SystemInfoMonitor_HTTP_Threads(_SystemInfoMonitor_Threads): # Docstring is maintained separately
[docs] def __init__(self, *, parent, poll_period, max_msgs): # The parent class is must have ``_client`` attribute with # active httpx client. self._parent = parent # Reference to the parent class self._monitor_poll_period = poll_period super().__init__(max_msgs=max_msgs)
def _monitor_init(self): ... def _thread_receive_msgs(self): with self._monitor_thread_lock: if not self._monitor_thread_running.is_set(): return self._monitor_thread_running.clear() self.clear() while True: with self._monitor_thread_lock: if not self._monitor_enabled: self._monitor_thread_running.set() break websocket_uri = _websocket_uri(self._parent._http_server_uri, _system_info_monitor_endpoint) try: from websockets.sync.client import connect with connect(websocket_uri) as websocket: while self._monitor_enabled: try: msg_json = websocket.recv(timeout=self._monitor_poll_period, decode=False) try: msg = json.loads(msg_json) self._add_msg_to_queue(msg) except json.JSONDecodeError: pass except queue.Full: # Queue is full, ignore the new messages pass except TimeoutError: pass except Exception: # Ignore communication errors. More detailed processing may be added later. pass ttime.sleep(self._monitor_poll_period) def _clear(self): self._msg_queue.queue.clear()
class _SystemInfoMonitor_Async(_SystemInfoMonitor): def __init__(self, *, max_msgs): self._msg_queue_max = max_msgs self._msg_queue = asyncio.Queue(maxsize=max_msgs) self._monitor_task = None # Thread or asyncio task self._monitor_task_running = asyncio.Event() self._monitor_task_running.set() self._monitor_task_lock = asyncio.Lock() super().__init__() def _add_msg_to_queue(self, msg): if self._msg_queue_max: self._msg_queue.put_nowait(msg) def _monitor_enable(self): self._monitor_task = asyncio.create_task(self._task_receive_msgs()) self._monitor_enabled = True async def disable_wait(self, *, timeout=2): # Docstring is maintained separately self.disable() await asyncio.wait_for(self._monitor_task_running.wait(), timeout=timeout) async def next_msg(self, timeout=None): # Docstring is maintained separately try: if timeout: return await asyncio.wait_for(self._msg_queue.get(), timeout=timeout) else: return self._msg_queue.get_nowait() except (asyncio.QueueEmpty, asyncio.TimeoutError): raise RequestTimeoutError(f"No message was received (timeout={timeout})", request={})
[docs] class SystemInfoMonitor_ZMQ_Async(_SystemInfoMonitor_Async): # Docstring is maintained separately
[docs] def __init__(self, *, zmq_info_addr, zmq_encoding, poll_timeout, max_msgs): self._zmq_subscribe_addr = zmq_info_addr self._zmq_encoding = zmq_encoding self._monitor_poll_timeout = poll_timeout super().__init__(max_msgs=max_msgs)
def _monitor_init(self): self._rco = ReceiveSystemInfoAsync( zmq_subscribe_addr=self._zmq_subscribe_addr, encoding=self._zmq_encoding, timeout=int(self._monitor_poll_timeout * 1000), ) async def _task_receive_msgs(self): async with self._monitor_task_lock: if not self._monitor_task_running.is_set(): return self._monitor_task_running.clear() self.clear() self._rco.subscribe() while True: async with self._monitor_task_lock: if not self._monitor_enabled: self._rco.unsubscribe() self._monitor_task_running.set() break try: msg = await self._rco.recv() self._add_msg_to_queue(msg) except TimeoutError: # No published messages are detected pass except asyncio.QueueFull: # Queue is full, ignore the new messages pass def _clear(self): try: while True: self._msg_queue.get_nowait() except asyncio.QueueEmpty: pass
[docs] class SystemInfoMonitor_HTTP_Async(_SystemInfoMonitor_Async): # Docstring is maintained separately
[docs] def __init__(self, *, parent, poll_period, max_msgs): # The parent class is must have ``_client`` attribute with # active httpx client. self._parent = parent # Reference to the parent class self._monitor_poll_period = poll_period super().__init__(max_msgs=max_msgs)
def _monitor_init(self): ... async def _task_receive_msgs(self): async with self._monitor_task_lock: if not self._monitor_task_running.is_set(): return self._monitor_task_running.clear() self.clear() while True: async with self._monitor_task_lock: if not self._monitor_enabled: self._monitor_task_running.set() break websocket_uri = _websocket_uri(self._parent._http_server_uri, _system_info_monitor_endpoint) try: from websockets.asyncio.client import connect async with connect(websocket_uri) as websocket: while self._monitor_enabled: try: msg_json = await asyncio.wait_for( websocket.recv(decode=False), timeout=self._monitor_poll_period ) try: msg = json.loads(msg_json) self._add_msg_to_queue(msg) except json.JSONDecodeError: pass except asyncio.QueueFull: # Queue is full, ignore the new messages pass except asyncio.TimeoutError: pass except Exception: # Ignore communication errors. More detailed processing may be added later. pass await asyncio.sleep(self._monitor_poll_period) def _clear(self): try: while True: self._msg_queue.get_nowait() except asyncio.QueueEmpty: pass
_SystemInfoMonitor.enabled.__doc__ = _doc_SystemInfoMonitor_enabled _SystemInfoMonitor.enable.__doc__ = _doc_SystemInfoMonitor_enable _SystemInfoMonitor.disable.__doc__ = _doc_SystemInfoMonitor_disable _SystemInfoMonitor.clear.__doc__ = _doc_SystemInfoMonitor_clear _SystemInfoMonitor_Threads.disable_wait.__doc__ = _doc_SystemInfoMonitor_disable_wait _SystemInfoMonitor_Threads.next_msg.__doc__ = _doc_SystemInfoMonitor_next_msg SystemInfoMonitor_ZMQ_Threads.__doc__ = _doc_SystemInfoMonitor_ZMQ SystemInfoMonitor_HTTP_Threads.__doc__ = _doc_SystemInfoMonitor_HTTP _SystemInfoMonitor_Async.disable_wait.__doc__ = _doc_SystemInfoMonitor_disable_wait _SystemInfoMonitor_Async.next_msg.__doc__ = _doc_SystemInfoMonitor_next_msg SystemInfoMonitor_ZMQ_Async.__doc__ = _doc_SystemInfoMonitor_ZMQ SystemInfoMonitor_HTTP_Async.__doc__ = _doc_SystemInfoMonitor_HTTP