"""
The key classes needed to use 0MQ for multiprocess document communication.
`Publisher` : subscribe this to the RE to emit the documents. Expects a server to
have a SUBSCRIBE port open to PUB to.
`RemoteDispatcher` : subscribe callbacks to this class in a remote process. Expects
a server to have a PUB port open to SUBSCRIBE to.
`Proxy` : server that binds ports for Pubslisher to push to and the Dispatcher
to listen to. Typically this is started with the cli tool ``bluesky-zmq-proxy``
"""
import asyncio
import copy
import pickle
import warnings
from typing import Union
from ..run_engine import Dispatcher, DocumentNames
def _normalize_address(inp: Union[str, tuple, int]):
if isinstance(inp, str):
if "://" in inp:
protocol, _, rest_str = inp.partition("://")
else:
protocol = "tcp"
rest_str = inp
elif isinstance(inp, tuple):
if inp[0] in ["tcp", "ipc"]:
protocol, *rest = inp
else:
protocol = "tcp"
rest = list(inp)
if protocol == "tcp":
if len(rest) == 2:
rest_str = ":".join(str(r) for r in rest)
else:
(rest_str,) = rest
else:
(rest_str,) = rest
elif isinstance(inp, int):
protocol = "tcp"
rest_str = f"0.0.0.0:{inp}"
else:
raise TypeError(f"Input expected to be int, str, or tuple, not {type(inp)}")
return f"{protocol}://{rest_str}"
class Bluesky0MQDecodeError(Exception):
"""Custom exception class for things that go wrong reading message from wire."""
...
[docs]
class Publisher:
"""
A callback that publishes documents to a 0MQ proxy.
Parameters
----------
address : string or tuple
Address of a running 0MQ proxy, given either as a string like
``'127.0.0.1:5567'`` or as a tuple like ``('127.0.0.1', 5567)``
prefix : bytes, optional
User-defined bytestring used to distinguish between multiple
Publishers. May not contain b' '.
RE : ``bluesky.RunEngine``, optional
DEPRECATED.
RunEngine to which the Publisher will be automatically subscribed
(and, more importantly, unsubscribed when it is closed).
zmq : object, optional
By default, the 'zmq' module is imported and used. Anything else
mocking its interface is accepted.
serializer: function, optional
optional function to serialize data. Default is pickle.dumps
Examples
--------
Publish from a RunEngine to a Proxy running on localhost on port 5567.
>>> publisher = Publisher('localhost:5567')
>>> RE = RunEngine({})
>>> RE.subscribe(publisher)
"""
def __init__(self, address, *, prefix=b"", RE=None, zmq=None, serializer=pickle.dumps):
if RE is not None:
warnings.warn( # noqa: B028
"The RE argument to Publisher is deprecated and "
"will be removed in a future release of bluesky. "
"Update your code to subscribe this Publisher "
"instance to (and, if needed, unsubscribe from) to "
"the RunEngine manually."
)
if isinstance(prefix, str):
raise ValueError("prefix must be bytes, not string")
if b" " in prefix:
raise ValueError(f"prefix {prefix!r} may not contain b' '")
if zmq is None:
import zmq
self.address = _normalize_address(address)
self.RE = RE
self._prefix = bytes(prefix)
self._context = zmq.Context()
self._socket = self._context.socket(zmq.PUB)
self._socket.connect(self.address)
if RE:
self._subscription_token = RE.subscribe(self)
self._serializer = serializer
def __call__(self, name, doc):
doc = copy.deepcopy(doc)
message = b" ".join([self._prefix, name.encode(), self._serializer(doc)])
self._socket.send(message)
def close(self):
if self.RE:
self.RE.unsubscribe(self._subscription_token)
self._socket.close()
self._context.destroy() # close Socket(s); terminate Context
[docs]
class Proxy:
"""
Start a 0MQ proxy on the local host.
The addresses can be specified flexibly. It is best to use
a domain_socket (available on unix):
- ``'icp:///tmp/domain_socket'``
- ``('ipc', '/tmp/domain_socket')``
tcp sockets are also supported:
- ``'tcp://localhost:6557'``
- ``6657`` (implicitly binds to ``'tcp://localhost:6557'``
- ``('tcp', 'localhost', 6657)``
- ``('localhost', 6657)``
Parameters
----------
in_address : str or tuple or int, optional
Address that RunEngines should broadcast to.
If None, a random tcp port on all interfaces is used.
out_address : str or tuple or int, optional
Address that subscribers should subscribe to.
If None, a random tcp port on all interfaces is used.
zmq : object, optional
By default, the 'zmq' module is imported and used. Anything else
mocking its interface is accepted.
Attributes
----------
in_address: int or str or tuple
Port that RunEngines should broadcast to.
out_address : int or str or tuple
Port that subscribers should subscribe to.
closed : boolean
True if the Proxy has already been started and subsequently
interrupted and is therefore unusable.
Examples
--------
Run on specific ports.
>>> proxy = Proxy(5567, 5568)
>>> proxy
Proxy(in_port=5567, out_port=5568)
>>> proxy.start() # runs until interrupted
Run on random ports, and access those ports before starting.
>>> proxy = Proxy()
>>> proxy
Proxy(in_port=56504, out_port=56505)
>>> proxy.in_port
56504
>>> proxy.out_port
56505
>>> proxy.start() # runs until interrupted
"""
def __init__(
self,
in_address=None,
out_address=None,
*,
zmq=None,
in_port=None,
out_port=None,
):
# Handle backward compatibility for in_port -> in_address
if in_port is not None and in_address is not None:
raise ValueError("Cannot specify both 'in_port' and 'in_address'. Use 'in_address' only.")
if in_port is not None:
warnings.warn(
"The 'in_port' parameter is deprecated and will be removed in a future release. "
"Use 'in_address' instead.",
DeprecationWarning,
stacklevel=2,
)
in_address = in_port
# Handle backward compatibility for out_port -> out_address
if out_port is not None and out_address is not None:
raise ValueError("Cannot specify both 'out_port' and 'out_address'. Use 'out_address' only.")
if out_port is not None:
warnings.warn(
"The 'out_port' parameter is deprecated and will be removed in a future release. "
"Use 'out_address' instead.",
DeprecationWarning,
stacklevel=2,
)
out_address = out_port
# Delete deprecated parameter names
del in_port, out_port
if zmq is None:
import zmq
self.zmq = zmq
self.closed = False
try:
context = zmq.Context(1)
# Socket facing clients
frontend = context.socket(zmq.SUB)
if in_address is None:
in_bind_result = frontend.bind_to_random_port("tcp://*")
else:
in_address = _normalize_address(in_address)
in_bind_result = frontend.bind(in_address)
frontend.setsockopt_string(zmq.SUBSCRIBE, "")
# Socket facing services
backend = context.socket(zmq.PUB)
if out_address is None:
out_bind_result = backend.bind_to_random_port("tcp://*")
else:
out_address = _normalize_address(out_address)
out_bind_result = backend.bind(out_address)
except BaseException:
# Clean up whichever components we have defined so far.
try:
frontend.close()
except NameError:
...
try:
backend.close()
except NameError:
...
context.destroy()
raise
else:
self.in_port = (
in_bind_result.addr if hasattr(in_bind_result, "addr") else _normalize_address(in_bind_result)
)
self.out_port = (
out_bind_result.addr if hasattr(out_bind_result, "addr") else _normalize_address(out_bind_result)
)
self._frontend = frontend
self._backend = backend
self._context = context
def start(self):
if self.closed:
raise RuntimeError(
f"This Proxy has already been started and interrupted. Create a fresh instance with {repr(self)}"
)
try:
self.zmq.device(self.zmq.FORWARDER, self._frontend, self._backend)
finally:
self.closed = True
self._frontend.close()
self._backend.close()
self._context.destroy()
def __repr__(self):
return "{}(in_port={in_port}, out_port={out_port})".format(type(self).__name__, **vars(self))
[docs]
class RemoteDispatcher(Dispatcher):
"""
Dispatch documents received over the network from a 0MQ proxy.
Parameters
----------
address : tuple
Address of a running 0MQ proxy, given either as a string like
``'127.0.0.1:5567'`` or as a tuple like ``('127.0.0.1', 5567)``
prefix : bytes, optional
User-defined bytestring used to distinguish between multiple
Publishers. If set, messages without this prefix will be ignored.
If unset, no mesages will be ignored.
loop : zmq.asyncio.ZMQEventLoop, optional
zmq : object, optional
By default, the 'zmq' module is imported and used. Anything else
mocking its interface is accepted.
zmq_asyncio : object, optional
By default, the 'zmq.asyncio' module is imported and used. Anything
else mocking its interface is accepted.
deserializer: function, optional
optional function to deserialize data. Default is pickle.loads
Examples
--------
Print all documents generated by remote RunEngines.
>>> d = RemoteDispatcher(('localhost', 5568))
>>> d.subscribe(print)
>>> d.start() # runs until interrupted
"""
def __init__(
self,
address,
*,
prefix=b"",
loop=None,
zmq=None,
zmq_asyncio=None,
deserializer=pickle.loads,
strict=False,
):
if isinstance(prefix, str):
raise ValueError("prefix must be bytes, not string")
if b" " in prefix:
raise ValueError(f"prefix {prefix!r} may not contain b' '")
self._prefix = prefix
if zmq is None:
import zmq
if zmq_asyncio is None:
import zmq.asyncio as zmq_asyncio
self._deserializer = deserializer
self.address = _normalize_address(address)
if loop is None:
loop = asyncio.new_event_loop()
self.loop = loop
self._context = None
self._socket = None
def __finish_setup():
asyncio.set_event_loop(self.loop)
self._context = zmq_asyncio.Context()
self._socket = self._context.socket(zmq.SUB)
self._socket.connect(self.address)
self._socket.setsockopt_string(zmq.SUBSCRIBE, "")
self.__factory = __finish_setup
self._task = None
self.closed = False
self._strict = strict
super().__init__()
async def _poll(self):
our_prefix = self._prefix # local var to save an attribute lookup
while True:
message = await self._socket.recv()
try:
prefix, name, doc = message.split(b" ", 2)
except ValueError as e:
if self._strict:
raise Bluesky0MQDecodeError from e
else:
print(
f"The message {message} could not be split into "
"three parts by b' '. Dropping message on floor "
"and continuing"
f"\n\n{e}"
)
continue
try:
name = name.decode()
except UnicodeDecodeError as e:
if self._strict:
raise Bluesky0MQDecodeError from e
else:
print(
f"The name {name} can not be decoded as utf-8. "
"Dropping message on the floor and continuing. "
f"\n\n{e}"
)
continue
if (not our_prefix) or prefix == our_prefix:
try:
doc = self._deserializer(doc)
except Exception as e:
if self._strict:
raise Bluesky0MQDecodeError from e
else:
if len(doc) > 1024:
msg_doc = doc[:1024] + b"--SNIPPED--"
else:
msg_doc = doc
print(
f"Failed to deserialize the {name} document "
f"{msg_doc} using {self._deserializer}. "
"Dropping on floor and continuing"
f"\n\n{e}"
)
continue
self.loop.call_soon(self.process, DocumentNames[name], doc)
def start(self):
if self.closed:
raise RuntimeError(
"This RemoteDispatcher has already been "
"started and interrupted. Create a fresh "
f"instance with {self!r}"
)
try:
self.__factory()
self._task = self.loop.create_task(self._poll())
self.loop.run_until_complete(self._task)
task_exception = self._task.exception()
if task_exception is not None:
raise task_exception
finally:
self.stop()
def stop(self):
if self._task is not None:
self._task.cancel()
if self._socket is not None:
self._socket.close()
if self._context is not None:
self._context.destroy()
self.loop.close()
self.closed = True