bluesky_queueserver.ReceiveConsoleOutputAsync
- class bluesky_queueserver.ReceiveConsoleOutputAsync(*, zmq_subscribe_addr=None, zmq_topic='QS_Console', timeout=1000)
Async version of the ReceiveConsoleOutput class. There are two ways to use the class: explicitly awaiting the recv function (same as in ReceiveConsoleOutput) or setting up a callback function (plain function or coroutine).

The subscribe() and unsubscribe() methods explicitly subscribe the socket to the topic and unsubscribe it. Messages published while the socket is unsubscribed are discarded. Calls to the recv() and start() methods always subscribe the socket; the stop() method unsubscribes the socket unless called with unsubscribe=False.

Explicitly awaiting the
recv function:

    import asyncio
    import sys

    from bluesky_queueserver import ReceiveConsoleOutputAsync

    zmq_subscribe_addr = "tcp://localhost:60625"
    rco = ReceiveConsoleOutputAsync(zmq_subscribe_addr=zmq_subscribe_addr)

    async def run_acquisition():
        while True:
            try:
                payload = await rco.recv()
                time, msg = payload.get("time", None), payload.get("msg", None)
                # In this example the messages are printed in the terminal.
                sys.stdout.write(msg)
                sys.stdout.flush()
            except TimeoutError:
                # Timeout does not mean communication error!!!
                # Insert the code that needs to be executed on timeout (if any).
                pass
            # Place for the code that should be executed after receiving each
            # message or after timeout (e.g. check a condition and exit
            # the loop once the condition is satisfied).

    # Subscribe to start caching messages. Calling 'recv()' also subscribes the socket.
    rco.subscribe()
    asyncio.run(run_acquisition())
    # Unsubscribe to discard all new messages
    rco.unsubscribe()
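The loop in the example above never exits on its own. The following is a minimal, self-contained sketch of one way to leave the loop once a condition is met. StubReceiver is a hypothetical stand-in for ReceiveConsoleOutputAsync, so the sketch runs without a live RE Manager; it only imitates the behavior described here (recv() returns a dictionary with "time" and "msg" keys and raises TimeoutError when no message arrives in time). The exit condition (a message containing a marker string) and the limit on timeouts are assumptions chosen for illustration.

```python
import asyncio


class StubReceiver:
    """Hypothetical stand-in for ReceiveConsoleOutputAsync (not the real class)."""

    def __init__(self, items):
        # A 'None' entry simulates a receive timeout.
        self._items = list(items)

    async def recv(self):
        await asyncio.sleep(0)  # Yield to the event loop, as a real receive would
        if not self._items:
            raise TimeoutError("no more messages")
        item = self._items.pop(0)
        if item is None:
            raise TimeoutError("no message within the timeout")
        return {"time": 0.0, "msg": item}


async def collect_until(rco, marker, max_timeouts=3):
    """Collect messages until one contains 'marker' or too many timeouts occur."""
    collected, n_timeouts = [], 0
    while True:
        try:
            payload = await rco.recv()
            msg = payload.get("msg", None)
            if msg is not None:
                collected.append(msg)
                if marker in msg:
                    break  # Exit condition is satisfied
        except TimeoutError:
            # Timeout is not a communication error: count it and keep waiting.
            n_timeouts += 1
            if n_timeouts >= max_timeouts:
                break
    return collected


messages = asyncio.run(
    collect_until(StubReceiver(["starting\n", None, "plan done\n", "ignored\n"]), "done")
)
```

With the real class, the same loop body could be used with rco.recv() in place of the stub, followed by rco.unsubscribe() once the loop exits.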
Setting up a callback function or coroutine (awaitable function):

    import asyncio
    import sys

    from bluesky_queueserver import ReceiveConsoleOutputAsync

    zmq_subscribe_addr = "tcp://localhost:60625"
    rco = ReceiveConsoleOutputAsync(zmq_subscribe_addr=zmq_subscribe_addr)

    async def cb_coro(payload):
        time, msg = payload.get("time", None), payload.get("msg", None)
        # In this example the messages are printed in the terminal.
        sys.stdout.write(msg)
        sys.stdout.flush()

    rco.set_callback(cb_coro)

    async def run_acquisition():
        rco.start()
        # Do something useful here, e.g. sleep
        await asyncio.sleep(60)
        rco.stop()

        # Acquisition can be started and stopped multiple times if necessary
        rco.start()
        await asyncio.sleep(60)
        rco.stop()

    asyncio.run(run_acquisition())
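A common pattern with either kind of callback is to keep the callback cheap: it only places the payload in a queue, and the processing happens elsewhere. The sketch below is self-contained and does not use the library. The dispatch helper is hypothetical; it only imitates the behavior this page describes for callbacks (a plain function is called directly, a coroutine is scheduled with ensure_future and not awaited). The payloads are made-up examples.

```python
import asyncio
import inspect


def dispatch(cb, payload):
    """Hypothetical dispatcher imitating the callback behavior described on this page."""
    if inspect.iscoroutinefunction(cb):
        # Coroutine: schedule it on the running loop without awaiting it.
        return asyncio.ensure_future(cb(payload))
    # Plain function: runs immediately and blocks the loop until it returns.
    cb(payload)
    return None


async def main():
    queue = asyncio.Queue()

    def cb_plain(payload):
        # Only enqueue the payload; process it elsewhere.
        queue.put_nowait(payload)

    async def cb_coro(payload):
        await queue.put(payload)

    dispatch(cb_plain, {"time": 1.0, "msg": "first\n"})
    task = dispatch(cb_coro, {"time": 2.0, "msg": "second\n"})
    size_before = queue.qsize()  # The scheduled coroutine has not run yet
    await task  # Let the scheduled coroutine finish
    received = [queue.get_nowait()["msg"] for _ in range(queue.qsize())]
    return size_before, received


size_before, received = asyncio.run(main())
```

The plain callback has already delivered its payload when dispatch returns, while the coroutine callback delivers only after the loop gets a chance to run it.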
Note
If the callback is a plain function, it is executed immediately after the message is received and may block the loop if it takes too long to complete (even occasionally). If the callback is a coroutine, it is not awaited, but instead placed in the loop (with ensure_future), so acquisition of messages will continue. Typically the callback will do a simple operation such as adding the received message to a queue.

Parameters:
- zmq_subscribe_addr : str or None
  Address of ZMQ server (PUB socket). If None, the default address tcp://localhost:60625 is used.
- zmq_topic : str
  0MQ topic for console output. Only messages from this topic are received.
- timeout : int, float or None
  Timeout for the receive operation in milliseconds. If None, wait for the message indefinitely.
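The timeout is given in milliseconds, whereas most asyncio functions take seconds. The following is a minimal sketch of the documented semantics only (a numeric timeout in milliseconds, None meaning wait indefinitely). It is not the library's implementation; recv_with_timeout is a hypothetical helper and an asyncio.Queue stands in for the 0MQ socket.

```python
import asyncio


async def recv_with_timeout(queue, timeout=1000):
    """Hypothetical helper: 'timeout' is in milliseconds, None waits indefinitely."""
    timeout_s = None if timeout is None else timeout / 1000
    try:
        return await asyncio.wait_for(queue.get(), timeout=timeout_s)
    except asyncio.TimeoutError:
        # On Python < 3.11, asyncio.TimeoutError is distinct from the builtin,
        # so convert it to the builtin TimeoutError.
        raise TimeoutError(f"no message received in {timeout} ms") from None


async def main():
    queue = asyncio.Queue()  # Stand-in for the socket
    try:
        await recv_with_timeout(queue, timeout=50)  # Nothing published yet
        timed_out = False
    except TimeoutError:
        timed_out = True
    queue.put_nowait({"time": 0.0, "msg": "hello\n"})
    payload = await recv_with_timeout(queue, timeout=None)
    return timed_out, payload["msg"]


timed_out, msg = asyncio.run(main())
```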
Methods
- __init__(*[, zmq_subscribe_addr, zmq_topic, ...])
- recv([timeout]): Get the next published message.
- set_callback(cb): Set callback function, which is called once for each received message.
- start(): Start collection of messages published by RE Manager.
- stop(*[, unsubscribe]): Stop collection of messages published by RE Manager.
- subscribe(): Subscribe 0MQ socket to the console output topic.
- unsubscribe(): Unsubscribe 0MQ socket from the console output topic.