import multiprocessing
import pickle
import threading
from queue import Empty

import zmq
from bluesky.run_engine import Dispatcher, DocumentNames


class RemoteDispatcher:
"""
Dispatch documents received over the network from a 0MQ proxy.
This is designed to be run in a Jupyter kernel.
Parameters
----------
address : tuple | str
Address of a running 0MQ proxy, given either as a string like
``'127.0.0.1:5567'`` or as a tuple like ``('127.0.0.1', 5567)``
prefix : bytes, optional
User-defined bytestring used to distinguish between multiple
Publishers. If set, messages without this prefix will be ignored.
If unset, no mesages will be ignored.
deserializer: function, optional
optional function to deserialize data. Default is pickle.loads
Examples
--------
Print all documents generated by remote RunEngines.
>>> d = RemoteDispatcher(('localhost', 5568))
>>> d.subscribe(stream_documents_into_runs(model.add_run))
>>> d.start() # starts a thread and a long-running subprocess
>>> d.stop() # stops them and blocks until they stop
"""
def __init__(self, address, *, prefix=b"", deserializer=pickle.loads):
if isinstance(prefix, str):
raise ValueError("prefix must be bytes, not string")
if b" " in prefix:
raise ValueError("prefix {!r} may not contain b' '".format(prefix))
self._prefix = prefix
if isinstance(address, str):
address = address.split(":", maxsplit=1)
self._deserializer = deserializer
self.address = (address[0], int(address[1]))
self.closed = False
self._thread = None
self._process = None
self._result_queue = multiprocessing.Queue()
self._kill_worker = multiprocessing.Queue()
self._dispatcher = Dispatcher()
self.subscribe = self._dispatcher.subscribe
self._waiting_for_start = True

    def _receive_data(self):
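        # Each message is a single 0MQ frame laid out as b"<prefix> <name> <doc>":
        # the (possibly empty) publisher prefix, the document name, and the
        # serialized document, separated by single spaces.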
        our_prefix = self._prefix  # local var to save an attribute lookup
        # TODO Pull on the socket more than once here, until it blocks, to
        # ensure we do not get more and more behind over time.
        message = self._socket.recv()
        prefix, name, doc = message.split(b" ", 2)
        name = name.decode()
        if (not our_prefix) or prefix == our_prefix:
            if self._waiting_for_start:
                if name == "start":
                    self._waiting_for_start = False
                else:
                    # We subscribed midstream and are seeing documents for
                    # which we do not have the full run. Wait for a 'start'
                    # doc.
                    return
            doc = self._deserializer(doc)
            return name, doc

    def start(self):
        if self.closed:
            raise RuntimeError(
                "This RemoteDispatcher has already been "
                "started and interrupted. Create a fresh "
                "instance with {}".format(repr(self))
            )
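        # The 0MQ subscriber runs in a child process to avoid interfering with
        # Jupyter's own use of zmq; documents flow back through
        # self._result_queue and are dispatched by a thread in this process.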
        self._process = multiprocessing.Process(
            target=self._zmq_worker, args=(self._kill_worker, self._result_queue)
        )
        self._process.start()
        self._thread = threading.Thread(target=self._dispatcher_worker)
        self._thread.start()

    def _zmq_worker(self, kill_worker, result_queue):
        "This runs in a subprocess to avoid stepping on Jupyter's use of zmq."
        self._context = zmq.Context()
        self._socket = self._context.socket(zmq.SUB)
        url = "tcp://%s:%d" % self.address
        self._socket.connect(url)
        self._socket.setsockopt_string(zmq.SUBSCRIBE, "")
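        # Subscribe to all topics; prefix filtering happens in Python inside
        # _receive_data rather than through 0MQ subscription matching.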
        # http://stackoverflow.com/questions/7538988/zeromq-how-to-prevent-infinite-wait#comment68021160_7540299
        # self._socket.setsockopt(zmq.RCVTIMEO, 100)  # milliseconds
        # self._socket.setsockopt(zmq.LINGER, 0)
        # Note: recv() blocks, so a kill request is only noticed after the
        # next message arrives (see the commented-out RCVTIMEO above).
        while kill_worker.empty():
            result = self._receive_data()
            if result is not None:
                result_queue.put(result)

    def _dispatcher_worker(self):
        "This runs in a thread."
        while not self.closed:
            try:
                # Poll with a short timeout so this loop can notice
                # self.closed being set by stop().
                result = self._result_queue.get(timeout=0.1)
            except Empty:
                continue
            name, doc = result
            self._dispatcher.process(DocumentNames[name], doc)

    def stop(self):
        self.closed = True
        self._kill_worker.put(object())
        self._thread.join()
        self._process.join()
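

# A minimal usage sketch (illustrative, not part of the module): connect to a
# proxy assumed to be publishing on localhost:5568, print every document as it
# arrives, and shut down cleanly. The address and duration are assumptions.
if __name__ == "__main__":
    import time

    dispatcher = RemoteDispatcher("localhost:5568")
    dispatcher.subscribe(print)  # called as print(name, doc) for each document
    dispatcher.start()
    try:
        time.sleep(60)  # let documents stream in for a minute
    finally:
        dispatcher.stop()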