Source code for bluesky_widgets.jupyter.zmq_dispatcher

import pickle
import multiprocessing
from queue import Empty
import threading

from bluesky.run_engine import Dispatcher, DocumentNames
import zmq

class RemoteDispatcher:
    """
    Dispatch documents received over the network from a 0MQ proxy.

    This is designed to be run in a Jupyter kernel. The 0MQ socket lives in a
    subprocess; received documents are handed to a thread in this process
    through a ``multiprocessing.Queue`` and dispatched to subscribers there.

    Parameters
    ----------
    address : tuple | str
        Address of a running 0MQ proxy, given either as a string like
        ``'localhost:5567'`` or as a tuple like ``('localhost', 5567)``
    prefix : bytes, optional
        User-defined bytestring used to distinguish between multiple
        Publishers. If set, messages without this prefix will be ignored.
        If unset, no messages will be ignored.
    deserializer: function, optional
        optional function to deserialize data. Default is pickle.loads.
        NOTE: ``pickle.loads`` can execute arbitrary code, so only connect to
        a proxy you trust or supply a safer deserializer.

    Attributes
    ----------
    subscribe : callable
        The ``subscribe`` method of the underlying bluesky ``Dispatcher``.
    closed : bool
        True once ``stop()`` has been called; a closed instance cannot be
        started again.

    Examples
    --------

    Stream documents generated by remote RunEngines into a callback.

    >>> d = RemoteDispatcher(('localhost', 5568))
    >>> d.subscribe(stream_documents_into_runs(model.add_run))
    >>> d.start()  # starts a thread and a long-running subprocess
    >>> d.stop()  # stops them and blocks until they stop
    """

    # How long (milliseconds) the subprocess blocks in ``recv`` before it
    # re-checks whether it has been asked to stop.
    _RECV_TIMEOUT_MS = 100

    def __init__(self, address, *, prefix=b"", deserializer=pickle.loads):
        if isinstance(prefix, str):
            raise ValueError("prefix must be bytes, not string")
        if b" " in prefix:
            # A space is the field separator of the wire format.
            raise ValueError("prefix {!r} may not contain b' '".format(prefix))
        self._prefix = prefix
        if isinstance(address, str):
            address = address.split(":", maxsplit=1)
        self._deserializer = deserializer
        self.address = (address[0], int(address[1]))

        self.closed = False
        self._thread = None
        self._process = None
        # Carries (name, doc) pairs from the subprocess to the thread.
        self._result_queue = multiprocessing.Queue()
        # Any item placed here tells the subprocess to exit its loop.
        self._kill_worker = multiprocessing.Queue()

        self._dispatcher = Dispatcher()
        self.subscribe = self._dispatcher.subscribe
        self._waiting_for_start = True

    def _receive_data(self):
        """
        Receive one message from the socket and decode it.

        Returns
        -------
        tuple or None
            ``(name, doc)`` for a message that should be dispatched, or None
            when the message carries a foreign prefix or belongs to a run
            whose 'start' document was not seen.
        """
        our_prefix = self._prefix  # local var to save an attribute lookup
        # TODO Pull on the socket more than once here, until it blocks, to
        # ensure we do not get more and more behind over time.
        message = self._socket.recv()
        prefix, name, doc = message.split(b" ", 2)
        if our_prefix and prefix != our_prefix:
            # Not addressed to us. Skip it before decoding anything so that a
            # foreign publisher's payload cannot crash this worker.
            return None
        name = name.decode()
        if self._waiting_for_start:
            if name != "start":
                # We subscribed midstream and are seeing documents for
                # which we do not have the full run. Wait for a 'start'
                # doc.
                return None
            self._waiting_for_start = False
        # SECURITY: with the default deserializer this unpickles data taken
        # from the network; see the class docstring.
        doc = self._deserializer(doc)
        return name, doc

    def start(self):
        """
        Start the receiving subprocess and the dispatching thread.

        Raises
        ------
        RuntimeError
            If this instance has already been stopped.
        """
        if self.closed:
            raise RuntimeError(
                "This RemoteDispatcher has already been "
                "started and interrupted. Create a fresh "
                "instance with {}".format(repr(self))
            )
        self._process = multiprocessing.Process(
            target=self._zmq_worker, args=(self._kill_worker, self._result_queue)
        )
        self._process.start()
        self._thread = threading.Thread(target=self._dispatcher_worker)
        self._thread.start()

    def _zmq_worker(self, kill_worker, result_queue):
        "This runs in a subprocess to avoid stepping on Jupyter's use of zmq."
        self._context = zmq.Context()
        self._socket = self._context.socket(zmq.SUB)
        try:
            # Without a receive timeout ``recv`` blocks until a message
            # arrives, so a quiet publisher would keep this loop from ever
            # noticing the kill signal and ``stop()`` would hang.
            self._socket.setsockopt(zmq.RCVTIMEO, self._RECV_TIMEOUT_MS)  # milliseconds
            self._socket.setsockopt(zmq.LINGER, 0)
            url = "tcp://%s:%d" % self.address
            self._socket.connect(url)
            self._socket.setsockopt_string(zmq.SUBSCRIBE, "")
            while kill_worker.empty():
                try:
                    result = self._receive_data()
                except zmq.Again:
                    # Timed out with nothing to read; re-check kill_worker.
                    continue
                if result is not None:
                    result_queue.put(result)
        finally:
            self._socket.close()
            self._context.term()
            # Nobody reads result_queue once we are stopping. Do not let
            # process exit block on flushing buffered documents into the pipe.
            result_queue.cancel_join_thread()

    def _dispatcher_worker(self):
        "This runs in a thread."
        while not self.closed:
            try:
                # Use a timeout so that ``self.closed`` is re-checked
                # regularly even when no documents are arriving.
                result = self._result_queue.get(timeout=0.1)
            except Empty:
                continue
            name, doc = result
            self._dispatcher.process(DocumentNames[name], doc)

    def stop(self):
        """
        Stop the thread and the subprocess and block until both have exited.

        Safe to call on an instance that was never started; the instance is
        marked closed either way.
        """
        self.closed = True
        self._kill_worker.put(object())
        if self._thread is not None:
            self._thread.join()
        if self._process is not None:
            self._process.join()