bluesky_queueserver.ZMQCommSendThreads

class bluesky_queueserver.ZMQCommSendThreads(*, zmq_server_address=None, timeout_recv=2000, timeout_send=500, raise_exceptions=True, server_public_key=None)

Thread-based API for communication with RE Manager via ZMQ.

Parameters:
zmq_server_address : str or None

Address of the ZMQ server. If None, the default address tcp://localhost:60615 is used.

timeout_recv : int

Timeout (in ms) for ZMQ receive operations.

timeout_send : int

Timeout (in ms) for ZMQ send operations.

raise_exceptions : bool

Determines whether exceptions are raised on communication errors (mostly timeouts) when send_message() is called in blocking mode. This setting can be overridden with the raise_exceptions parameter of send_message(). If True, the exception CommTimeoutError is raised; otherwise send_message() returns an error message.

server_public_key : str or None

Server public key (Z85-encoded 40-character string). If encryption is enabled on the ZMQ server, the public key from the server's public/private key pair must be passed. Communication requests time out if the key is invalid, and an exception is raised if the key is improperly formatted. If None is passed, encryption is disabled.
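Because an invalid key only shows up as timeouts, it can help to check the key's shape before passing it in. The following is a minimal sketch, not the library's own validation (which may differ): the helper name `looks_like_z85_key` is hypothetical, and the check only confirms the length and the Z85 alphabet defined in ZeroMQ RFC 32. It cannot tell whether the key matches the server's key pair.

```python
import string

# Z85 alphabet as defined in ZeroMQ RFC 32 (85 printable characters).
Z85_ALPHABET = frozenset(
    string.digits
    + string.ascii_lowercase
    + string.ascii_uppercase
    + ".-:+=^!/*?&<>()[]{}@%$#"
)


def looks_like_z85_key(key):
    """Return True if 'key' is a 40-character string of Z85 characters.

    Hypothetical helper for illustration: it checks the format only and
    says nothing about whether the key belongs to the server.
    """
    return (
        isinstance(key, str)
        and len(key) == 40
        and all(ch in Z85_ALPHABET for ch in key)
    )
```

A caller could run this check on a key read from a configuration file and report a clear error early, instead of waiting for requests to time out.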

Examples

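import logging
import time

from bluesky_queueserver import CommTimeoutError, ZMQCommSendThreads

logger = logging.getLogger(__name__)
n = 10  # Placeholder parameter value for this example

zmq_comm = ZMQCommSendThreads()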

# Blocking call
try:
    msg = zmq_comm.send_message(method="some_method", params={"some_value": n})
    # Code that uses msg
except CommTimeoutError as ex:
    logger.exception("Exception occurred: %s", ex)

# Non-blocking call (trivial example)
msg_received, msg_err_received = [], []

def cb(msg, msg_err):
    # msg - dict of parameters ({} if communication failed)
    # msg_err - string ("" if communication is successful)

    # In a Qt application, 'cb' would typically emit a signal. Keep the
    #   processing done in the callback to a minimum, since it blocks
    #   ZMQ communication.

    msg_received.append(msg)
    msg_err_received.append(msg_err)

zmq_comm.send_message(method="some_method", params={"some_value": n}, cb=cb)

# Wait for the message.
while not msg_received:
    time.sleep(0.01)

# Code to process received 'msg'
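The polling loop above never gives up if no reply arrives. One possible alternative, sketched here under stated assumptions, is to have the callback set a `threading.Event` and wait on it with a timeout. The names `wait_for_reply` and `fake_send` are hypothetical and not part of bluesky_queueserver; `fake_send` only stands in for a `send_message(..., cb=cb)` call so that the sketch runs without a server.

```python
import threading


def wait_for_reply(send, timeout=5.0):
    """Call send(cb=...) and block until the callback fires.

    Returns (msg, msg_err). Raises TimeoutError if the callback is not
    invoked within 'timeout' seconds.
    """
    done = threading.Event()
    result = {}

    def cb(msg, msg_err):
        # Keep the callback minimal: store the reply and wake the waiter.
        result["msg"] = msg
        result["msg_err"] = msg_err
        done.set()

    send(cb=cb)
    if not done.wait(timeout):
        raise TimeoutError("no reply within %.1f s" % timeout)
    return result["msg"], result["msg_err"]


def fake_send(*, cb):
    # Hypothetical stand-in for zmq_comm.send_message(..., cb=cb):
    # delivers a reply from another thread after a short delay.
    threading.Timer(0.05, cb, args=({"success": True}, "")).start()
```

With a real connection, `send` could be built with `functools.partial(zmq_comm.send_message, method="some_method", params={"some_value": n})`, assuming the callback is invoked with `(msg, msg_err)` as in the example above.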
__init__(*, zmq_server_address=None, timeout_recv=2000, timeout_send=500, raise_exceptions=True, server_public_key=None)

Methods

__init__(*[, zmq_server_address, ...])

close()

Close ZMQ socket.

send_message(*, method[, params, timeout, ...])

Send message to ZMQ server and wait for the response.