bluesky_queueserver.ZMQCommSendThreads
- class bluesky_queueserver.ZMQCommSendThreads(*, zmq_server_address=None, timeout_recv=2000, timeout_send=500, raise_exceptions=True, server_public_key=None)[source]
Thread-based API for communication with RE Manager via ZMQ.
- Parameters:
- zmq_server_addressstr or None
Address of ZMQ server. If None, then the default address is
tcp://localhost:60615
is used.- timeout_recvint
Timeout (in ms) for ZMQ receive operations.
- timeout_sendint
Timeout (in ms) for ZMQ send operations.
- raise_exceptionsbool
Tells if exceptions should be raised in case of communication errors (mostly timeouts) when
send_message()
is called in blocking mode. This setting can be overridden by specifyingraise_exceptions
parameter ofsend_message()
function. The exceptionCommTimeoutError
is raised if the parameter isTrue
, otherwise error message is returned bysend_message()
.- server_public_keystr or None
Server public key (z85-encoded 40 character string). The valid public key from the server public/private key pair must be passed if encryption is enabled at the 0MQ server side. Communication requests will time out if the key is invalid. Exception will be raised if the key is improperly formatted. Encryption will be disabled if
None
is passed.
Examples
zmq_comm = ZMQCommSendThreads() # Blocking call try: msg = zmq_comm.send_message(method="some_method", params={"some_value": n}) # Code that uses msg except CommTimeoutError as ex: logger.exception("Exception occurred: %s", ex) # Non-blocking call (trivial example) msg_received, msg_err_received = [], [] def cb(msg, msg_err): # msg - dict of parameters ({} if communication failed, # msg_err - string ("" if communication is successful) # In QT application, 'cb' would typically send a signal. There should be # very limited amount of processing done in the callback, since it would # block ZMQ communication. msg_received.append(msg) msg_err_received.append(msg_err) zmq_comm.send_message(method="some_method", params={"some_value": n}, cb=cb) # Wait for the message. while not msg_received: time.sleep(0.01) # Code to process received 'msg'
- __init__(*, zmq_server_address=None, timeout_recv=2000, timeout_send=500, raise_exceptions=True, server_public_key=None)[source]
Methods
__init__
(*[, zmq_server_address, ...])close
()Close ZMQ socket.
send_message
(*, method[, params, timeout, ...])Send message to ZMQ server and wait for the response.