bluesky_queueserver.ReceiveConsoleOutputAsync

class bluesky_queueserver.ReceiveConsoleOutputAsync(*, zmq_subscribe_addr=None, zmq_topic='QS_Console', timeout=1000)

Async version of the ReceiveConsoleOutput class. There are two ways to use the class: explicitly awaiting the recv() function (same as in ReceiveConsoleOutput) or setting up a callback (a plain function or a coroutine).

The subscribe() and unsubscribe() methods explicitly subscribe the socket to the topic and unsubscribe it from the topic. Messages published while the socket is unsubscribed are discarded. Calls to the recv() and start() methods always subscribe the socket. The stop() method unsubscribes the socket unless it is called with unsubscribe=False.

Explicitly awaiting recv function:

import asyncio
import sys

from bluesky_queueserver import ReceiveConsoleOutputAsync

zmq_subscribe_addr = "tcp://localhost:60625"
rco = ReceiveConsoleOutputAsync(zmq_subscribe_addr=zmq_subscribe_addr)

async def run_acquisition():
    while True:
        try:
            payload = await rco.recv()
            time, msg = payload.get("time", None), payload.get("msg", None)
            # In this example the messages are printed in the terminal.
            sys.stdout.write(msg)
            sys.stdout.flush()
        except TimeoutError:
            # Timeout does not mean communication error!!!
            # Insert the code that needs to be executed on timeout (if any).
            pass
        # Place for the code that should be executed after receiving each
        #   message or after timeout (e.g. check a condition and exit
        #   the loop once the condition is satisfied).

# Subscribe to start caching messages. Calling 'recv()' also subscribes the socket.
rco.subscribe()

asyncio.run(run_acquisition())

# Unsubscribe to discard all new messages
rco.unsubscribe()
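
The comments in the loop above mention checking a condition and exiting the loop once it is satisfied. A minimal sketch of such a loop follows. FakeReceiver is a hypothetical stand-in, used only so that the example runs without a running RE Manager; it assumes, as in the example above, that recv() returns a dictionary with "time" and "msg" keys and raises TimeoutError on timeout. The names collect_messages and max_messages are also illustrative and are not part of the library.

```python
import asyncio


class FakeReceiver:
    """Hypothetical stand-in for ReceiveConsoleOutputAsync (not part of the library)."""

    def __init__(self, items):
        # Each item is either a payload dictionary or the string "timeout".
        self._items = list(items)

    async def recv(self):
        await asyncio.sleep(0)  # Yield to the event loop, as a real receive would
        if not self._items:
            raise TimeoutError("No message received")
        item = self._items.pop(0)
        if item == "timeout":
            raise TimeoutError("No message received")
        return item


async def collect_messages(receiver, max_messages):
    """Collect message text until 'max_messages' messages are received."""
    collected = []
    while len(collected) < max_messages:
        try:
            payload = await receiver.recv()
            collected.append(payload.get("msg", ""))
        except TimeoutError:
            # A timeout only means that nothing was published; keep waiting.
            continue
    return collected


receiver = FakeReceiver(
    [
        {"time": 1.0, "msg": "Line 1\n"},
        "timeout",
        {"time": 2.0, "msg": "Line 2\n"},
    ]
)
result = asyncio.run(collect_messages(receiver, max_messages=2))
print("".join(result), end="")
```

With a real receiver, the same collect_messages coroutine could presumably be passed the rco object instead of the stand-in, since it relies only on recv().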

Setting up callback function or coroutine (awaitable function):

import asyncio
import sys

from bluesky_queueserver import ReceiveConsoleOutputAsync

zmq_subscribe_addr = "tcp://localhost:60625"
rco = ReceiveConsoleOutputAsync(zmq_subscribe_addr=zmq_subscribe_addr)

async def cb_coro(payload):
    time, msg = payload.get("time", None), payload.get("msg", None)
    # In this example the messages are printed in the terminal.
    sys.stdout.write(msg)
    sys.stdout.flush()

rco.set_callback(cb_coro)

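async def run_acquisition():
    rco.start()
    # Do something useful here, e.g. sleep. The sleep must be awaited,
    #   otherwise the coroutine never runs and 'stop()' is called immediately.
    await asyncio.sleep(60)
    rco.stop()

    # Acquisition can be started and stopped multiple times if necessary
    rco.start()
    await asyncio.sleep(60)
    rco.stop()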

asyncio.run(run_acquisition())

Note

If the callback is a plain function, it is executed immediately after the message is received and may block the loop if it takes too long to complete (even occasionally). If the callback is a coroutine, it is not awaited, but instead scheduled in the loop (with ensure_future), so acquisition of messages continues. Typically the callback performs a simple operation, such as adding the received message to a queue.
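
The difference between the two callback types can be illustrated with the standard library alone. The sketch below is not the library's implementation: dispatch is a hypothetical helper that mimics the behavior described in the note, calling a plain function directly and scheduling a coroutine with asyncio.ensure_future. Both callbacks add the message to an asyncio.Queue, and the order list records when each callback actually runs.

```python
import asyncio
import inspect


def dispatch(cb, payload):
    """Call a plain function directly; schedule a coroutine function without awaiting it."""
    if inspect.iscoroutinefunction(cb):
        # Returns a Task immediately; the coroutine body runs later in the loop.
        return asyncio.ensure_future(cb(payload))
    cb(payload)  # Runs to completion before dispatch() returns
    return None


async def main():
    queue = asyncio.Queue()
    order = []

    def cb_plain(payload):
        order.append("plain")
        queue.put_nowait(payload["msg"])

    async def cb_coro(payload):
        order.append("coro")
        await queue.put(payload["msg"])

    dispatch(cb_plain, {"time": 1.0, "msg": "first"})
    task = dispatch(cb_coro, {"time": 2.0, "msg": "second"})
    order.append("after dispatch")  # Reached before cb_coro has started
    await task  # Let the scheduled coroutine finish

    messages = [queue.get_nowait(), queue.get_nowait()]
    return order, messages


order, messages = asyncio.run(main())
print(order)
print(messages)
```

The plain callback finishes before dispatch returns, while the coroutine runs only once control goes back to the event loop, which is why a slow plain callback can delay the processing of subsequent messages.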

Parameters:
zmq_subscribe_addr : str or None

Address of the ZMQ server (PUB socket). If None, then the default address tcp://localhost:60625 is used.

zmq_topic : str

0MQ topic for console output. Only messages from this topic are going to be received.

timeout : int, float or None

Timeout for the receive operation in milliseconds. If None, then wait for the message indefinitely.

__init__(*, zmq_subscribe_addr=None, zmq_topic='QS_Console', timeout=1000)

Methods

__init__(*[, zmq_subscribe_addr, zmq_topic, ...])

recv([timeout])

Get the next published message.

set_callback(cb)

Set callback function, which is called once for each received message.

start()

Start collection of messages published by RE Manager.

stop(*[, unsubscribe])

Stop collection of messages published by RE Manager.

subscribe()

Subscribe 0MQ socket to the console output topic.

unsubscribe()

Unsubscribe 0MQ socket from the console output topic.