Asynchronous Optimization with Bluesky Queueserver#

Warning

The queueserver integration is experimental. The API is not yet stable and may change in future releases without a deprecation period. It is not recommended for production use.

In this tutorial, you will learn how to run Blop optimization against a remote Bluesky Queueserver. This architecture is used when:

  • The experiment hardware is controlled by a shared instrument server

  • You want the optimizer to run in a separate process from the RunEngine

  • You need asynchronous, non-blocking optimization (the agent submits plans and reacts to completions)

We will optimize the same Himmelblau function from the simple experiment tutorial, but now the devices live inside a remote queueserver process rather than in the same Python session as the agent.

Architecture#

The distributed system has five services in a Docker Compose stack, plus the Blop agent running outside it:

        flowchart
    subgraph docker["Docker Compose Stack"]
        redis["Redis"]
        rem["RE Manager<br/>(devices + plans)"]
        zmqp["ZMQ Proxy<br/>(pub/sub)"]
        bridge["ZMQ-Tiled Bridge<br/>(persists docs to Tiled)"]
        tiled["Tiled Server<br/>(data storage)"]

        redis <-->|state| rem
        rem -->|publishes documents| zmqp
        zmqp -->|document stream| bridge
        bridge -->|writes| tiled
    end

    agent["Blop QueueserverAgent<br/>(suggests points, listens for completions, evaluates data)"]

    agent -->|submit plans via REManagerAPI| rem
    zmqp -->|document stream| agent
    tiled -->|read data| agent
    

Data flow:

  1. The agent suggests parameter values and submits an acquisition plan to the RE Manager

  2. The RE Manager executes the plan (moves motors, reads detectors)

  3. Bluesky documents are published via ZMQ to the proxy

  4. The ZMQ-Tiled bridge persists documents to the Tiled server (using the TiledWriter callback from bluesky-tiled-plugins)

  5. The agent’s ZMQ listener detects plan completion

  6. The agent’s evaluation function reads results from Tiled and computes objectives

  7. The optimizer ingests outcomes and suggests the next point
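The seven steps can be mimicked in a single process to make the control flow concrete. The sketch below is a toy model, not the Blop implementation: a queue.Queue stands in for the ZMQ proxy, a dict for the Tiled server, and random search for the optimizer. The names execute_plan, optimize, doc_stream, and storage are hypothetical.

```python
import queue
import random
import uuid


def himmelblau(x, y):
    return (x**2 + y - 11) ** 2 + (x + y**2 - 7) ** 2


doc_stream = queue.Queue()  # stand-in for the ZMQ proxy
storage = {}                # stand-in for the Tiled server


def execute_plan(point):
    """Stand-in for steps 2-4: run the plan, persist data, publish a stop document."""
    uid = str(uuid.uuid4())
    storage[uid] = himmelblau(point["motor1"], point["motor2"])
    doc_stream.put({"name": "stop", "run_start": uid})
    return uid


def optimize(n_iterations, seed=0):
    rng = random.Random(seed)  # random search stands in for the real optimizer
    history = []
    for _ in range(n_iterations):
        # Step 1: suggest a point and submit it
        point = {"motor1": rng.uniform(-5, 5), "motor2": rng.uniform(-5, 5)}
        execute_plan(point)
        # Step 5: wait for the completion event
        stop_doc = doc_stream.get(timeout=1)
        # Step 6: read the result back from storage
        value = storage[stop_doc["run_start"]]
        # Step 7: record the outcome before suggesting the next point
        history.append((point, value))
    return history


history = optimize(5)
best_point, best_value = min(history, key=lambda item: item[1])
print(f"best of {len(history)} toy trials: {best_value:.2f} at {best_point}")
```

In this toy the data is always stored before the stop document is published. In the real system those two events are not ordered, which is why the evaluation function later in this tutorial polls Tiled.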

Prerequisites#

  • Docker and Docker Compose installed

  • The blop Python package installed (with bluesky-queueserver-api and tiled)

  • The blop GitHub repository cloned (for the service definitions under docs/source/tutorials/queueserver/)

Starting the Infrastructure#

All services are defined in a docker-compose.yml in the docs/source/tutorials/queueserver/ directory. Before running this tutorial, start the stack in a separate terminal:

cd docs/source/tutorials/queueserver
docker compose up -d --build

Wait until all containers are healthy:

docker compose ps

You should see all services in a “healthy” or “running” state. The services expose the following ports on localhost:

Service          Port   Purpose
RE Manager       60615  ZMQ control channel (REManagerAPI connects here)
ZMQ Proxy (out)  5578   Document stream (agent listens for plan completions)
Tiled            8000   Data access (evaluation function reads results)
Redis            6379   Internal message broker for queueserver
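If you want a quick check that the ports are reachable from your Python session, a plain TCP connection test is enough. This is a minimal sketch using only the standard library; port_is_open and SERVICES are hypothetical names, and a successful connection only shows that something is listening, not that the service is healthy.

```python
import socket

# Ports taken from the table above; adjust if your compose file maps them differently.
SERVICES = {
    "RE Manager": 60615,
    "ZMQ Proxy (out)": 5578,
    "Tiled": 8000,
    "Redis": 6379,
}


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


for name, port in SERVICES.items():
    state = "reachable" if port_is_open("localhost", port) else "NOT reachable"
    print(f"{name:<16} localhost:{port:<6} {state}")
```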

Once the containers are up, proceed with the tutorial below.

import time

from bluesky.callbacks.zmq import RemoteDispatcher
from bluesky_queueserver_api.zmq import REManagerAPI

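# Connect to the RE Manager control channel exposed by the Docker stack
RM = REManagerAPI(zmq_control_addr="tcp://localhost:60615")

# Open the worker environment only if it is not already open, so this cell can be re-run
if not RM.status()["worker_environment_exists"]:
    RM.environment_open()
    RM.wait_for_idle(timeout=30)

status = RM.status()
print(f"RE Manager state: {status['manager_state']}")
print(f"Worker environment exists: {status['worker_environment_exists']}")
assert status["worker_environment_exists"], "The RE worker environment did not open; check the RE Manager container logs"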
RE Manager state: idle
Worker environment exists: True

The Queueserver Environment#

The queueserver startup script (shown below for reference) defines the devices and plans available in the remote environment. This script runs inside the RE Manager process — not in your notebook:

# startup.py (runs inside the RE Manager)
from bluesky import RunEngine
from bluesky_queueserver import is_re_worker_active
from ophyd.sim import SynAxis, SynSignal

RE = RunEngine({})

# Publish documents to ZMQ so external subscribers can react
if is_re_worker_active():
    import os
    from bluesky.callbacks.zmq import Publisher as ZmqPublisher

    addr = os.environ.get("BLUESKY_ZMQ_PROXY_IN_ADDR", "tcp://zmq-proxy:5577")
    host, port = addr.replace("tcp://", "").split(":")
    publisher = ZmqPublisher((host, int(port)))
    RE.subscribe(publisher)

# Simulated motors
motor1 = SynAxis(name="motor1", labels={"motors"})
motor2 = SynAxis(name="motor2", labels={"motors"})


# Simulated detector that computes the Himmelblau function
def _compute_himmelblau():
    x = motor1.read()["motor1"]["value"]
    y = motor2.read()["motor2"]["value"]
    return float((x**2 + y - 11) ** 2 + (x + y**2 - 7) ** 2)


himmel_det = SynSignal(name="himmel_det", func=_compute_himmelblau, labels={"detectors"})

# Acquisition plan — moves actuators to suggested positions and reads sensors
import bluesky.plans as bp


def default_acquire(suggestions, actuators, sensors, *, md=None):
    plan_args = []
    for actuator in actuators:
        values = [s[actuator.name] for s in suggestions]
        plan_args.append(actuator)
        plan_args.append(values)

    _md = {"blop_suggestions": suggestions}
    if md:
        _md.update(md)

    yield from bp.list_scan(list(sensors), *plan_args, md=_md)

Key points:

  • is_re_worker_active() gates code that should only run inside the queueserver worker

  • The ZMQ publisher sends all run documents to the proxy for external consumption

  • default_acquire is a simple plan that wraps Bluesky’s list_scan — it moves actuators to each suggested position and reads sensors
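To see what default_acquire hands to list_scan, the argument-building loop can be run on its own. The sketch below uses device names as strings in place of the ophyd objects, and build_list_scan_args is a hypothetical helper. The shape of the suggestion dicts (one key per actuator plus an _id) is an assumption inferred from the evaluation function later in this tutorial.

```python
def build_list_scan_args(suggestions, actuator_names):
    """Interleave each actuator with its list of positions, as list_scan expects."""
    plan_args = []
    for name in actuator_names:
        plan_args.append(name)
        plan_args.append([s[name] for s in suggestions])
    return plan_args


# Two suggested points; the "_id" key is ignored when building the scan arguments
suggestions = [
    {"_id": 0, "motor1": 0.0, "motor2": 1.0},
    {"_id": 1, "motor1": 3.0, "motor2": 2.0},
]
args = build_list_scan_args(suggestions, ["motor1", "motor2"])
print(args)  # ['motor1', [0.0, 3.0], 'motor2', [1.0, 2.0]]
```

Each actuator gets one position per suggestion, so a single run visits the suggested points in order and the primary stream holds one detector reading per suggestion.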

Connecting to Tiled#

The evaluation function will read experimental data from the Tiled server:

from tiled.client import from_uri

tiled_client = from_uri("http://localhost:8000", api_key="tutorialkey")

Defining the Optimization Problem#

Just as in the simple experiment tutorial, we define DOFs and objectives. The key difference: since devices exist only in the remote queueserver environment, DOFs reference device names as strings (no actuator objects).

from blop.ax import RangeDOF, Objective
from blop.ax.queueserver_agent import QueueserverAgent

dofs = [
    RangeDOF(actuator="motor1", bounds=(-5.0, 5.0), parameter_type="float"),
    RangeDOF(actuator="motor2", bounds=(-5.0, 5.0), parameter_type="float"),
]

objectives = [
    Objective(name="himmelblau", minimize=True),
]

# Sensors are referenced by name — these are the detectors in the queueserver environment
sensors = ["himmel_det"]
[INFO 06-08 19:44:33] ax.storage.sqa_store.with_db_settings_base: Ax SQL storage initialized with SQLAlchemy 2.0.50

Writing the Evaluation Function#

The evaluation function is called each time a plan completes. It reads data from Tiled and computes the objective values. The function receives:

  • uid: the Bluesky run UID (use this to look up data in Tiled)

  • suggestions: the list of parameter dicts that were evaluated

It must return a list of outcome dicts, each containing the objective value(s) and an _id matching the suggestion.

Because the agent and the ZMQ-Tiled bridge are separate subscribers to the same ZMQ stream, there is a race condition: the agent may receive the stop document before the bridge has finished writing data to Tiled. The evaluation function should poll Tiled until both the run and the detector data are available.

import numpy as np
from tiled.client.container import Container


class HimmelblauEvaluation:
    """Reads detector data from Tiled and computes the Himmelblau objective."""

    def __init__(self, tiled_client: Container, timeout: float = 30.0, poll_interval: float = 0.5):
        self.tiled_client = tiled_client
        self.timeout = timeout
        self.poll_interval = poll_interval

    def _wait_for_run(self, uid: str):
        """Poll Tiled until the run with the given UID is available."""
        deadline = time.time() + self.timeout
        while time.time() < deadline:
            try:
                return self.tiled_client[uid]
            except KeyError:
                time.sleep(self.poll_interval)
        raise TimeoutError(
            f"Run '{uid}' not found in Tiled after {self.timeout}s. "
            "The ZMQ-Tiled bridge may not be running."
        )

    def _wait_for_detector_data(self, run, path: str):
        """Poll Tiled until the detector data path is readable."""
        deadline = time.time() + self.timeout
        while time.time() < deadline:
            try:
                return run[path].read()
            except KeyError:
                time.sleep(self.poll_interval)
        raise TimeoutError(
            f"Data path '{path}' for run '{run.metadata['start']['uid']}' was not readable after {self.timeout}s. "
            "The ZMQ-Tiled bridge may still be writing the run."
        )

    def __call__(self, uid: str, suggestions: list[dict]) -> list[dict]:
        run = self._wait_for_run(uid)

        # Read the detector values from the primary data stream
        himmel_values = self._wait_for_detector_data(run, "primary/himmel_det")

        outcomes = []
        for idx, suggestion in enumerate(suggestions):
            outcomes.append({
                "_id": suggestion["_id"],
                "himmelblau": float(himmel_values[idx]),
            })

        return outcomes
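The two polling helpers above share one structure: try to fetch, sleep on KeyError, give up at a deadline. A generic version might look like the following sketch. poll_until is a hypothetical helper, not part of Blop or Tiled, and a dict filled by a timer thread stands in for the Tiled client.

```python
import threading
import time


def poll_until(fetch, timeout=30.0, poll_interval=0.5, retry_on=(KeyError,)):
    """Call fetch() repeatedly until it stops raising one of `retry_on`."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            return fetch()
        except retry_on:
            if time.monotonic() >= deadline:
                raise TimeoutError(f"Gave up after {timeout}s") from None
            time.sleep(poll_interval)


# Stand-in for Tiled: a dict that another thread fills in a little later.
store = {}
threading.Timer(0.2, store.__setitem__, args=("run-1", [170.0])).start()

values = poll_until(lambda: store["run-1"], timeout=5.0, poll_interval=0.05)
print(values)  # [170.0]
```

This variant uses time.monotonic() so that system clock adjustments cannot shorten or extend the wait, and it always attempts at least one fetch even when the timeout is zero.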

Creating the Queueserver Agent#

Now we bring everything together. The QueueserverAgent needs:

  • re_manager_api: how to communicate with the queueserver (submit plans, check status)

  • document_dispatcher: a RemoteDispatcher that subscribes to the Bluesky document stream

  • The DOFs, objectives, sensors, and evaluation function

document_dispatcher = RemoteDispatcher(("localhost", 5578))

agent = QueueserverAgent(
    re_manager_api=RM,
    document_dispatcher=document_dispatcher,
    sensors=sensors,
    dofs=dofs,
    objectives=objectives,
    evaluation_function=HimmelblauEvaluation(tiled_client),
    acquisition_plan="default_acquire",
)

Note

The re_manager_api argument also accepts an HTTP-based client (bluesky_queueserver_api.http.REManagerAPI) for deployments that expose the queueserver over HTTP rather than ZMQ. Construct the RemoteDispatcher with whatever arguments your document stream requires, such as a ZMQ address or message prefix.

Running the Optimization#

The run() method is non-blocking — it submits the first plan and returns immediately. The agent reacts asynchronously to plan completions via ZMQ callbacks.

future = agent.run(iterations=10, n_points=1)
[INFO 06-08 19:44:33] ax.api.client: GenerationStrategy(name='Center+Sobol+MBM:fast', nodes=[CenterGenerationNode(next_node_name='Sobol', use_existing_trials_for_initialization=True), GenerationNode(name='Sobol', generator_specs=[GeneratorSpec(generator_enum=Sobol, generator_key_override=None)], transition_criteria=[MinTrials(transition_to='MBM'), MinTrials(transition_to='MBM')], suggested_experiment_status=ExperimentStatus.INITIALIZATION, pausing_criteria=[MaxTrialsAwaitingData(threshold=5)]), GenerationNode(name='MBM', generator_specs=[GeneratorSpec(generator_enum=BoTorch, generator_key_override=None)], transition_criteria=None, suggested_experiment_status=ExperimentStatus.OPTIMIZATION, pausing_criteria=None)]) chosen based on user input and problem structure.
[INFO 06-08 19:44:33] ax.api.client: Generated new trial 0 with parameters {'motor1': 0.0, 'motor2': 0.0} using GenerationNode CenterOfSearchSpace.

Wait for the optimization to complete and inspect the result:

result = future.result()

print(f"Iterations completed : {result.iterations_completed}")
print(f"Points per iteration : {result.num_points}")
print(f"Total acquisitions   : {len(result.uids)}")
print()
print("Run UIDs:")
for uid in result.uids:
    print(f"  {uid}")
Iterations completed : 10
Points per iteration : 1
Total acquisitions   : 10
Run UIDs:
  4a3f4079-7d82-45d9-a5ba-58c27b94ec2a
  2c7f1f70-d56a-4d98-9891-376c1f2981a4
  d912b396-28a0-44f9-9bb9-8c02252c2402
  01b20339-cd3b-4501-92e9-82884c6cb5c8
  58048e2a-6d99-49b6-8be5-d5a48bca5169
  aa15facf-4472-4a46-9981-021b3ab40986
  e5556056-38c9-43f7-acd1-a173527548d3
  9ccd5062-b39a-4106-933e-b8ae04cd392d
  ce5f8a96-fc03-4f48-94b0-240625484c2f
  518849f7-793f-4f34-adcb-d2214583b9d1
[INFO 06-08 19:44:35] ax.api.client: Trial 0 marked COMPLETED.
[INFO 06-08 19:44:35] ax.api.client: Generated new trial 1 with parameters {'motor1': 2.921755, 'motor2': 4.142677} using GenerationNode Sobol.
[INFO 06-08 19:44:36] ax.api.client: Trial 1 marked COMPLETED.
[INFO 06-08 19:44:36] ax.api.client: Generated new trial 2 with parameters {'motor1': -0.852056, 'motor2': -2.757492} using GenerationNode Sobol.
[INFO 06-08 19:44:37] ax.api.client: Trial 2 marked COMPLETED.
[INFO 06-08 19:44:37] ax.api.client: Generated new trial 3 with parameters {'motor1': -3.361364, 'motor2': 0.25036} using GenerationNode Sobol.
[INFO 06-08 19:44:38] ax.api.client: Trial 3 marked COMPLETED.
[INFO 06-08 19:44:38] ax.api.client: Generated new trial 4 with parameters {'motor1': 0.354158, 'motor2': -1.677581} using GenerationNode Sobol.
[INFO 06-08 19:44:39] ax.api.client: Trial 4 marked COMPLETED.
[INFO 06-08 19:44:40] ax.api.client: Generated new trial 5 with parameters {'motor1': -4.313711, 'motor2': 1.939101} using GenerationNode MBM.
[INFO 06-08 19:44:41] ax.api.client: Trial 5 marked COMPLETED.
[INFO 06-08 19:44:42] ax.api.client: Generated new trial 6 with parameters {'motor1': -2.964313, 'motor2': -1.448735} using GenerationNode MBM.
[INFO 06-08 19:44:42] ax.api.client: Trial 6 marked COMPLETED.
[INFO 06-08 19:44:43] ax.api.client: Generated new trial 7 with parameters {'motor1': -2.995139, 'motor2': -3.669876} using GenerationNode MBM.
[INFO 06-08 19:44:43] ax.api.client: Trial 7 marked COMPLETED.
[INFO 06-08 19:44:44] ax.api.client: Generated new trial 8 with parameters {'motor1': -3.307866, 'motor2': -5.0} using GenerationNode MBM.
[INFO 06-08 19:44:44] ax.api.client: Trial 8 marked COMPLETED.
[INFO 06-08 19:44:45] ax.api.client: Generated new trial 9 with parameters {'motor1': -3.648596, 'motor2': -3.494265} using GenerationNode MBM.
[INFO 06-08 19:44:46] ax.api.client: Trial 9 marked COMPLETED.
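The tutorial only shows that run() returns an object with a result() method. Assuming that object behaves like a standard concurrent.futures.Future (an assumption, not something confirmed here), the submit-now, wait-later pattern can be illustrated with the standard library alone. start_background_job is a hypothetical stand-in for the agent.

```python
import threading
import time
from concurrent.futures import Future


def start_background_job(n_steps: int) -> Future:
    """Hypothetical stand-in for a non-blocking run(): returns at once."""
    future = Future()

    def worker():
        try:
            completed = 0
            for _ in range(n_steps):
                time.sleep(0.01)  # pretend to wait for a plan to finish
                completed += 1
            future.set_result(completed)
        except Exception as exc:  # surface failures to whoever calls result()
            future.set_exception(exc)

    threading.Thread(target=worker, daemon=True).start()
    return future


job = start_background_job(10)
print("returned immediately; done yet?", job.done())
job.add_done_callback(lambda f: print("callback saw", f.result(), "steps"))
print("steps completed:", job.result(timeout=5))
```

Passing a timeout to result() keeps a notebook cell from hanging indefinitely if the background work never finishes.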

Viewing Results#

Since QueueserverAgent uses the same Ax optimizer backend as the local Agent, all the familiar analysis methods are available:

agent.ax_client.summarize()
   trial_index  arm_name  trial_status  generation_node      himmelblau     motor1     motor2
0            0  0_0       COMPLETED     CenterOfSearchSpace  170.000000   0.000000   0.000000
1            1  1_0       COMPLETED     Sobol                173.998873   2.921755   4.142677
2            2  2_0       COMPLETED     Sobol                169.881434  -0.852056  -2.757492
3            3  3_0       COMPLETED     Sobol                106.364431  -3.361364   0.250360
4            4  4_0       COMPLETED     Sobol                172.237422   0.354158  -1.677581
5            5  5_0       COMPLETED     MBM                  148.205969  -4.313711   1.939101
6            6  6_0       COMPLETED     MBM                   75.272964  -2.964313  -1.448735
7            7  7_0       COMPLETED     MBM                   44.539539  -2.995139  -3.669876
8            8  8_0       COMPLETED     MBM                  241.442370  -3.307866  -5.000000
9            9  9_0       COMPLETED     MBM                    3.834796  -3.648596  -3.494265

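The Himmelblau function has four global minima, all with value 0:

  • (3.0, 2.0)

  • (-2.805, 3.131)

  • (-3.779, -3.283)

  • (3.584, -1.848)

Ten trials are not enough to converge, but the optimizer has made clear progress: the best trial in the run above (trial 9, himmelblau ≈ 3.83 at (-3.649, -3.494)) lies close to the minimum at (-3.779, -3.283).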
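These coordinates, and the distance between the best trial and its nearest minimum, can be checked with a few lines of plain Python. The sketch below reuses the Himmelblau formula from the startup script; the best-trial coordinates are copied from the summary table above, and the MINIMA and best names are illustrative only.

```python
import math


def himmelblau(x: float, y: float) -> float:
    return (x**2 + y - 11) ** 2 + (x + y**2 - 7) ** 2


MINIMA = [(3.0, 2.0), (-2.805, 3.131), (-3.779, -3.283), (3.584, -1.848)]

for x, y in MINIMA:
    # Coordinates are rounded to three decimals, so the values are
    # close to zero rather than exactly zero (except at (3, 2)).
    print(f"f({x:+.3f}, {y:+.3f}) = {himmelblau(x, y):.2e}")

# Best trial from the summary table (trial 9)
best = (-3.648596, -3.494265)
nearest = min(MINIMA, key=lambda m: math.dist(m, best))
print(
    f"best trial value {himmelblau(*best):.3f}, "
    f"nearest minimum {nearest}, distance {math.dist(nearest, best):.3f}"
)
```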

Cleanup#

When you’re done, close the RE environment and stop the Docker services:

RM.environment_close()
RM.wait_for_idle(timeout=30)
RM.close()

# Then, in a terminal:
cd docs/source/tutorials/queueserver
docker compose down

What You Learned#

  • Distributed architecture: The queueserver separates experiment execution from optimization logic, connected via ZMQ and Tiled

  • String-based device references: Since devices live in the remote process, DOFs, sensors, and plans are referenced by name

  • Asynchronous operation: agent.run() is non-blocking; the agent reacts to events via ZMQ callbacks

  • Evaluation function: Reads from Tiled (not direct device access) to compute objectives after each plan completes

Next Steps#

  • Add multiple objectives for multi-objective optimization (see KB Mirrors tutorial)

  • Use agent.submit_suggestions() to manually evaluate specific parameter combinations (see Manual Point Injection)

  • Implement outcome_constraints to constrain the optimization (see Set outcome constraints)

  • Add a checkpoint_path to persist optimizer state across restarts (see Agent)