import pickle
from bluesky.run_engine import Dispatcher, DocumentNames
import zmq
from ..qt.threading import create_worker
from qtpy.QtCore import QTimer, QObject
# Delay used by RemoteDispatcher._work_loop between one receive worker
# finishing and the next one being scheduled (it is the value handed to
# QTimer.singleShot).
# NOTE(review): the magnitude suggests seconds, whereas QTimer.singleShot is
# specified in integer milliseconds -- confirm the intended unit.
LOADING_LATENCY = 0.01
class RemoteDispatcher(QObject):
    """
    Dispatch documents received over the network from a 0MQ proxy.

    This is designed to be run in a Qt application.

    Parameters
    ----------
    address : tuple | str
        Address of a running 0MQ proxy, given either as a string like
        ``'127.0.0.1:5567'`` or as a tuple like ``('127.0.0.1', 5567)``
    prefix : bytes, optional
        User-defined bytestring used to distinguish between multiple
        Publishers. If set, messages without this prefix will be ignored.
        If unset, no messages will be ignored.
    deserializer: function, optional
        optional function to deserialize data. Default is pickle.loads
    parent : QObject, optional
        Passed through to the ``QObject`` constructor.

    Raises
    ------
    ValueError
        If ``prefix`` is a ``str`` instead of ``bytes``, or contains ``b' '``
        (the space is the field separator of the wire format).

    Notes
    -----
    SECURITY: the default deserializer is ``pickle.loads``, which can execute
    arbitrary code while loading. Only connect to a proxy you trust, or pass
    a safer ``deserializer``.

    Examples
    --------
    Subscribe a callback to all documents generated by remote RunEngines.

    >>> d = RemoteDispatcher(('localhost', 5568))
    >>> d.subscribe(stream_documents_into_runs(model.add_run))
    >>> d.start()  # launches periodic workers on background threads
    >>> d.stop()  # stop launching workers
    """

    def __init__(self, address, *, prefix=b"", deserializer=pickle.loads, parent=None):
        super().__init__(parent)
        if isinstance(prefix, str):
            raise ValueError("prefix must be bytes, not string")
        if b" " in prefix:
            raise ValueError("prefix {!r} may not contain b' '".format(prefix))
        self._prefix = prefix
        if isinstance(address, str):
            # "host:port" -> ["host", "port"]
            address = address.split(":", maxsplit=1)
        self._deserializer = deserializer
        self.address = (address[0], int(address[1]))

        self._context = zmq.Context()
        self._socket = self._context.socket(zmq.SUB)
        url = "tcp://%s:%d" % self.address
        self._socket.connect(url)
        # Subscribe to every topic; prefix filtering is done in
        # _receive_data after the message has been split.
        self._socket.setsockopt_string(zmq.SUBSCRIBE, "")

        self._task = None
        self.closed = False
        self._timer = QTimer(self)
        self._dispatcher = Dispatcher()
        # Expose the wrapped Dispatcher's subscribe() as our own.
        self.subscribe = self._dispatcher.subscribe
        # Documents are dropped until the first 'start' document is seen.
        self._waiting_for_start = True

    def _receive_data(self):
        """
        Receive one message from the socket and decode it.

        Returns
        -------
        tuple or None
            ``(name, doc)`` for a message that should be dispatched, or
            ``None`` if the message carries a different prefix or arrived
            before the first 'start' document.
        """
        our_prefix = self._prefix  # local var to save an attribute lookup
        # TODO Pull on the socket more than once here, until it blocks, to
        # ensure we do not get more and more behind over time.
        message = self._socket.recv()
        # Wire format: b"<prefix> <name> <serialized doc>". Only the first
        # two spaces are separators; the payload may itself contain spaces.
        prefix, name, doc = message.split(b" ", 2)
        name = name.decode()
        if our_prefix and prefix != our_prefix:
            # Message from a Publisher we were asked to ignore.
            return None
        if self._waiting_for_start:
            if name != "start":
                # We subscribed midstream and are seeing documents for
                # which we do not have the full run. Wait for a 'start'
                # doc.
                return None
            self._waiting_for_start = False
        doc = self._deserializer(doc)
        return name, doc

    def start(self):
        """
        Begin receiving and dispatching documents.

        Raises
        ------
        RuntimeError
            If ``stop()`` has already been called on this instance.
        """
        if self.closed:
            raise RuntimeError(
                "This RemoteDispatcher has already been "
                "started and interrupted. Create a fresh "
                "instance with {}".format(repr(self))
            )
        self._work_loop()

    def _work_loop(self):
        """Launch one receive worker and arrange to run again when it ends."""
        if self.closed:
            # stop() was called: break the self-rescheduling chain instead
            # of spawning workers forever.
            return
        worker = create_worker(
            self._receive_data,
        )
        # QTimer.singleShot expects an integer number of milliseconds.
        # NOTE(review): assumes LOADING_LATENCY is expressed in seconds --
        # confirm.
        delay_ms = max(0, round(LOADING_LATENCY * 1000))
        # Schedule this method to be run again after a brief wait.
        worker.finished.connect(
            lambda: self._timer.singleShot(delay_ms, self._work_loop)
        )
        worker.returned.connect(self._process_result)
        worker.start()

    def _process_result(self, result):
        """Dispatch a ``(name, doc)`` pair to subscribers; ignore ``None``."""
        if result is None:
            return
        name, doc = result
        self._dispatcher.process(DocumentNames[name], doc)

    def stop(self):
        """
        Stop launching new receive workers.

        Only sets the ``closed`` flag, which ``_work_loop`` checks before
        scheduling more work; a worker that is already running is not
        interrupted here. The instance cannot be started again afterwards.
        """
        self.closed = True