# Source code for bluesky.callbacks.zmq

"""
The key classes needed to use 0MQ for multiprocess document communication.

`Publisher` : subscribe this to the RE to emit the documents.  Expects a server to
have a SUBSCRIBE port open to PUB to.

`RemoteDispatcher` : subscribe callbacks to this class in a remote process.  Expects
a server to have a PUB port open to SUBSCRIBE to.

`Proxy` : server that binds ports for the Publisher to push to and the
RemoteDispatcher to listen to.  Typically this is started with the CLI tool
``bluesky-zmq-proxy``.

"""

import asyncio
import copy
import pickle
import warnings

from ..run_engine import Dispatcher, DocumentNames


class Bluesky0MQDecodeError(Exception):
    """Raised when a message read from the wire cannot be decoded."""


class Publisher:
    """
    A callback that publishes documents to a 0MQ proxy.

    Parameters
    ----------
    address : string or tuple
        Address of a running 0MQ proxy, given either as a string like
        ``'127.0.0.1:5567'`` or as a tuple like ``('127.0.0.1', 5567)``
    prefix : bytes, optional
        User-defined bytestring used to distinguish between multiple
        Publishers. May not contain b' '.
    RE : ``bluesky.RunEngine``, optional
        DEPRECATED. RunEngine to which the Publisher will be automatically
        subscribed (and, more importantly, unsubscribed when it is closed).
    zmq : object, optional
        By default, the 'zmq' module is imported and used. Anything else
        mocking its interface is accepted.
    serializer: function, optional
        optional function to serialize data. Default is pickle.dumps

    Raises
    ------
    ValueError
        If ``prefix`` is a ``str`` or contains ``b' '``.

    Examples
    --------
    Publish from a RunEngine to a Proxy running on localhost on port 5567.

    >>> publisher = Publisher('localhost:5567')
    >>> RE = RunEngine({})
    >>> RE.subscribe(publisher)
    """

    def __init__(self, address, *, prefix=b"", RE=None, zmq=None, serializer=pickle.dumps):
        if RE is not None:
            # stacklevel=2 attributes the warning to the code that passed RE.
            warnings.warn(
                "The RE argument to Publisher is deprecated and "
                "will be removed in a future release of bluesky. "
                "Update your code to subscribe this Publisher "
                "instance to (and, if needed, unsubscribe from) to "
                "the RunEngine manually.",
                stacklevel=2,
            )
        if isinstance(prefix, str):
            raise ValueError("prefix must be bytes, not string")
        if b" " in prefix:
            # b" " is the field separator of the wire format used in __call__.
            raise ValueError(f"prefix {prefix!r} may not contain b' '")
        if zmq is None:
            import zmq
        if isinstance(address, str):
            address = address.split(":", maxsplit=1)
        self.address = (address[0], int(address[1]))
        self.RE = RE
        self._prefix = bytes(prefix)
        # Set before subscribing so the instance is fully usable as a callback
        # from the moment the RunEngine knows about it.
        self._serializer = serializer

        url = "tcp://%s:%d" % self.address
        self._context = zmq.Context()
        try:
            self._socket = self._context.socket(zmq.PUB)
            self._socket.connect(url)
        except BaseException:
            # Do not leak the context (and any socket it owns) on failure.
            self._context.destroy()
            raise
        if RE:
            self._subscription_token = RE.subscribe(self)

    def __call__(self, name, doc):
        """Serialize ``doc`` and send it as ``b"<prefix> <name> <payload>"``."""
        doc = copy.deepcopy(doc)
        message = b" ".join([self._prefix, name.encode(), self._serializer(doc)])
        self._socket.send(message)

    def close(self):
        """Unsubscribe from the RunEngine (if one was given) and release 0MQ resources."""
        if self.RE:
            self.RE.unsubscribe(self._subscription_token)
        self._socket.close()
        self._context.destroy()  # close Socket(s); terminate Context
class Proxy:
    """
    Start a 0MQ proxy on the local host.

    Parameters
    ----------
    in_port : int, optional
        Port that RunEngines should broadcast to. If None, a random port is
        used.
    out_port : int, optional
        Port that subscribers should subscribe to. If None, a random port is
        used.
    zmq : object, optional
        By default, the 'zmq' module is imported and used. Anything else
        mocking its interface is accepted.

    Attributes
    ----------
    in_port : int
        Port that RunEngines should broadcast to.
    out_port : int
        Port that subscribers should subscribe to.
    closed : boolean
        True if the Proxy has already been started and subsequently
        interrupted and is therefore unusable.

    Examples
    --------
    Run on specific ports.

    >>> proxy = Proxy(5567, 5568)
    >>> proxy
    Proxy(in_port=5567, out_port=5568)
    >>> proxy.start()  # runs until interrupted

    Run on random ports, and access those ports before starting.

    >>> proxy = Proxy()
    >>> proxy
    Proxy(in_port=56504, out_port=56505)
    >>> proxy.in_port
    56504
    >>> proxy.out_port
    56505
    >>> proxy.start()  # runs until interrupted
    """

    def __init__(self, in_port=None, out_port=None, *, zmq=None):
        if zmq is None:
            import zmq
        self.zmq = zmq
        self.closed = False
        # Pre-bind the handles so the cleanup path can tell what was created;
        # this keeps a failure in Context() from being masked by a NameError.
        context = frontend = backend = None
        try:
            context = zmq.Context(1)
            # Socket facing clients
            frontend = context.socket(zmq.SUB)
            if in_port is None:
                in_port = frontend.bind_to_random_port("tcp://*")
            else:
                frontend.bind("tcp://*:%d" % in_port)
            frontend.setsockopt_string(zmq.SUBSCRIBE, "")
            # Socket facing services
            backend = context.socket(zmq.PUB)
            if out_port is None:
                out_port = backend.bind_to_random_port("tcp://*")
            else:
                backend.bind("tcp://*:%d" % out_port)
        except BaseException:
            # Clean up whichever components we have defined so far, then
            # re-raise the original error.
            for sock in (frontend, backend):
                if sock is not None:
                    sock.close()
            if context is not None:
                context.destroy()
            raise
        self.in_port = in_port
        self.out_port = out_port
        self._frontend = frontend
        self._backend = backend
        self._context = context

    def start(self):
        """
        Forward messages from the frontend to the backend until interrupted.

        Whatever way the forwarding call exits, the sockets and context are
        released and the Proxy is marked closed.

        Raises
        ------
        RuntimeError
            If this Proxy has already been started once.
        """
        if self.closed:
            raise RuntimeError(
                "This Proxy has already been started and "
                "interrupted. Create a fresh instance with "
                f"{repr(self)}"
            )
        try:
            self.zmq.device(self.zmq.FORWARDER, self._frontend, self._backend)
        finally:
            self.closed = True
            self._frontend.close()
            self._backend.close()
            self._context.destroy()

    def __repr__(self):
        return f"{type(self).__name__}(in_port={self.in_port}, out_port={self.out_port})"
class RemoteDispatcher(Dispatcher):
    """
    Dispatch documents received over the network from a 0MQ proxy.

    Parameters
    ----------
    address : string or tuple
        Address of a running 0MQ proxy, given either as a string like
        ``'127.0.0.1:5567'`` or as a tuple like ``('127.0.0.1', 5567)``
    prefix : bytes, optional
        User-defined bytestring used to distinguish between multiple
        Publishers. If set, messages without this prefix will be ignored.
        If unset, no messages will be ignored.
    loop : zmq.asyncio.ZMQEventLoop, optional
        Event loop to run on. If None, a new asyncio event loop is created.
    zmq : object, optional
        By default, the 'zmq' module is imported and used. Anything else
        mocking its interface is accepted.
    zmq_asyncio : object, optional
        By default, the 'zmq.asyncio' module is imported and used. Anything
        else mocking its interface is accepted.
    deserializer: function, optional
        optional function to deserialize data. Default is pickle.loads
    strict : bool, optional
        If True, a message that cannot be decoded raises
        ``Bluesky0MQDecodeError`` and ends polling. If False (default), the
        problem is printed and the message is dropped.

    Notes
    -----
    SECURITY: the default deserializer is ``pickle.loads``. Unpickling data
    from an untrusted source can execute arbitrary code, so only connect to
    proxies you trust or pass a safer ``deserializer``.

    Examples
    --------
    Print all documents generated by remote RunEngines.

    >>> d = RemoteDispatcher(('localhost', 5568))
    >>> d.subscribe(print)
    >>> d.start()  # runs until interrupted
    """

    def __init__(
        self,
        address,
        *,
        prefix=b"",
        loop=None,
        zmq=None,
        zmq_asyncio=None,
        deserializer=pickle.loads,
        strict=False,
    ):
        if isinstance(prefix, str):
            raise ValueError("prefix must be bytes, not string")
        if b" " in prefix:
            raise ValueError(f"prefix {prefix!r} may not contain b' '")
        self._prefix = prefix
        if zmq is None:
            import zmq
        if zmq_asyncio is None:
            import zmq.asyncio as zmq_asyncio
        if isinstance(address, str):
            address = address.split(":", maxsplit=1)
        self._deserializer = deserializer
        self.address = (address[0], int(address[1]))

        if loop is None:
            loop = asyncio.new_event_loop()
        self.loop = loop

        self._context = None
        self._socket = None

        def __finish_setup():
            # Deferred until start(): creates the context and socket and
            # installs self.loop as the current event loop.
            asyncio.set_event_loop(self.loop)
            self._context = zmq_asyncio.Context()
            self._socket = self._context.socket(zmq.SUB)
            url = "tcp://%s:%d" % self.address
            self._socket.connect(url)
            self._socket.setsockopt_string(zmq.SUBSCRIBE, "")

        self.__factory = __finish_setup

        self._task = None
        self.closed = False
        self._strict = strict

        super().__init__()

    async def _poll(self):
        """Receive messages forever, decode them and schedule ``self.process``."""
        our_prefix = self._prefix  # local var to save an attribute lookup
        while True:
            message = await self._socket.recv()
            # Wire format: b"<prefix> <name> <serialized doc>".
            try:
                prefix, name, doc = message.split(b" ", 2)
            except ValueError as e:
                if self._strict:
                    raise Bluesky0MQDecodeError from e
                else:
                    print(
                        f"The message {message} could not be split into "
                        "three parts by b' '.  Dropping message on floor "
                        "and continuing"
                        f"\n\n{e}"
                    )
                    continue
            try:
                name = name.decode()
            except UnicodeDecodeError as e:
                if self._strict:
                    raise Bluesky0MQDecodeError from e
                else:
                    print(
                        f"The name {name} can not be decoded as utf-8.  "
                        "Dropping message on the floor and continuing.  "
                        f"\n\n{e}"
                    )
                    continue
            if (not our_prefix) or prefix == our_prefix:
                try:
                    # NOTE(security): with the default pickle.loads this
                    # executes whatever the sender pickled.
                    doc = self._deserializer(doc)
                except Exception as e:
                    if self._strict:
                        raise Bluesky0MQDecodeError from e
                    else:
                        # Keep the log readable for large payloads.
                        if len(doc) > 1024:
                            msg_doc = doc[:1024] + b"--SNIPPED--"
                        else:
                            msg_doc = doc
                        print(
                            f"Failed to deserialize the {name} document "
                            f"{msg_doc} using {self._deserializer}.  "
                            "Dropping on floor and continuing"
                            f"\n\n{e}"
                        )
                        continue
                # An unrecognized name is handled like every other decode
                # failure instead of escaping as a bare KeyError.
                # NOTE(review): assumes DocumentNames[...] raises KeyError for
                # unknown names (true for an Enum) -- confirm.
                try:
                    doc_name = DocumentNames[name]
                except KeyError as e:
                    if self._strict:
                        raise Bluesky0MQDecodeError from e
                    else:
                        print(
                            f"The name {name!r} is not a recognized document "
                            "name.  Dropping message on the floor and "
                            "continuing."
                            f"\n\n{e}"
                        )
                        continue
                self.loop.call_soon(self.process, doc_name, doc)

    def start(self):
        """
        Connect to the proxy and process messages until interrupted.

        Always calls ``stop()`` on the way out, so an instance can only be
        started once.

        Raises
        ------
        RuntimeError
            If this RemoteDispatcher has already been started once.
        """
        if self.closed:
            raise RuntimeError(
                "This RemoteDispatcher has already been "
                "started and interrupted. Create a fresh "
                f"instance with {self!r}"
            )
        try:
            self.__factory()
            self._task = self.loop.create_task(self._poll())
            self.loop.run_until_complete(self._task)
            task_exception = self._task.exception()
            if task_exception is not None:
                raise task_exception
        finally:
            self.stop()

    def stop(self):
        """Cancel polling, release the socket and context, and close the loop."""
        if self._task is not None:
            self._task.cancel()
        if self._socket is not None:
            self._socket.close()
        if self._context is not None:
            self._context.destroy()
        self.loop.close()
        self.closed = True