Asynchronous Optimization with Bluesky Queueserver#
Warning
The queueserver integration is experimental. The API is not yet stable and may change in future releases without a deprecation period. It is not recommended for production use.
In this tutorial, you will learn how to run Blop optimization against a remote Bluesky Queueserver. This architecture is used when:
The experiment hardware is controlled by a shared instrument server
You want the optimizer to run in a separate process from the RunEngine
You need asynchronous, non-blocking optimization (the agent submits plans and reacts to completions)
We will optimize the same Himmelblau function from the simple experiment tutorial, but now the devices live inside a remote queueserver process rather than in the same Python session as the agent.
Architecture#
The distributed system has six components: five services in a Docker Compose stack, plus the Blop agent running outside it:
flowchart
subgraph docker["Docker Compose Stack"]
redis["Redis"]
rem["RE Manager<br/>(devices + plans)"]
zmqp["ZMQ Proxy<br/>(pub/sub)"]
bridge["ZMQ-Tiled Bridge<br/>(persists docs to Tiled)"]
tiled["Tiled Server<br/>(data storage)"]
redis <-->|state| rem
rem -->|publishes documents| zmqp
zmqp -->|document stream| bridge
bridge -->|writes| tiled
end
agent["Blop QueueserverAgent<br/>(suggests points, listens for completions, evaluates data)"]
agent -->|submit plans via REManagerAPI| rem
zmqp -->|document stream| agent
tiled -->|read data| agent
Data flow:
The agent suggests parameter values and submits an acquisition plan to the RE Manager
The RE Manager executes the plan (moves motors, reads detectors)
Bluesky documents are published via ZMQ to the proxy
The ZMQ-Tiled bridge persists documents to the Tiled server (using the TiledWriter callback from bluesky-tiled-plugins)
The agent’s ZMQ listener detects plan completion
The agent’s evaluation function reads results from Tiled and computes objectives
The optimizer ingests outcomes and suggests the next point
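The data flow above can be sketched as a single-process simulation. This is only an illustration under loud assumptions: random search stands in for the Ax optimizer, a plain dict stands in for Tiled, and every name here (`simulate_loop`, `fake_tiled`) is hypothetical rather than part of Blop or the queueserver.

```python
import random
import uuid


def himmelblau(x: float, y: float) -> float:
    """Himmelblau test function, the same objective used in this tutorial."""
    return (x**2 + y - 11) ** 2 + (x + y**2 - 7) ** 2


def simulate_loop(iterations: int, seed: int = 0) -> list[dict]:
    """Walk through the suggest -> acquire -> persist -> evaluate -> ingest cycle."""
    rng = random.Random(seed)
    fake_tiled: dict[str, dict] = {}  # hypothetical stand-in for the Tiled server
    ingested: list[dict] = []         # hypothetical stand-in for the optimizer's history

    for trial in range(iterations):
        # Step 1: the agent suggests parameter values (random search here, not Ax).
        suggestion = {
            "_id": trial,
            "motor1": rng.uniform(-5.0, 5.0),
            "motor2": rng.uniform(-5.0, 5.0),
        }

        # Steps 2-4: the "RE Manager" runs the plan and the "bridge" persists the data.
        uid = str(uuid.uuid4())
        reading = himmelblau(suggestion["motor1"], suggestion["motor2"])
        fake_tiled[uid] = {"himmel_det": [reading]}

        # Step 5: plan completion hands the run UID to the evaluation step.
        # Step 6: the evaluation reads the stored data back and builds the outcome.
        outcome = {
            "_id": suggestion["_id"],
            "himmelblau": fake_tiled[uid]["himmel_det"][0],
        }

        # Step 7: the optimizer ingests the outcome before suggesting again.
        ingested.append(outcome)

    return ingested


outcomes = simulate_loop(5)
best = min(outcomes, key=lambda o: o["himmelblau"])
print(f"best of {len(outcomes)} trials: {best}")
```

In the real system these steps run in different processes, which is why the agent has to wait for a completion event and then read from Tiled instead of reading a return value.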
Prerequisites#
Docker and Docker Compose installed
The blop Python package installed (with bluesky-queueserver-api and tiled)
The blop GitHub repository cloned (for the service definitions under docs/source/tutorials/queueserver/)
Starting the Infrastructure#
All services are defined in a docker-compose.yml in the docs/source/tutorials/queueserver/ directory. Before running this tutorial, start the stack in a separate terminal:
cd docs/source/tutorials/queueserver
docker compose up -d --build
Wait until all containers are healthy:
docker compose ps
You should see all services in a “healthy” or “running” state. The services expose the following ports on localhost:
| Service | Port | Purpose |
|---|---|---|
| RE Manager | 60615 | ZMQ control channel (REManagerAPI connects here) |
| ZMQ Proxy (out) | 5578 | Document stream (agent listens for plan completions) |
| Tiled | 8000 | Data access (evaluation function reads results) |
| Redis | 6379 | Internal message broker for queueserver |
Once the containers are up, proceed with the tutorial below.
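Before connecting, it can help to confirm that the ports listed above accept TCP connections. The following is a minimal sketch using only the standard library; `port_is_open` and `SERVICE_PORTS` are hypothetical helper names, and a successful TCP connect only shows that something is listening, not that the service behind it is healthy.

```python
import socket

# Port numbers copied from the table above; the dict itself is a hypothetical helper.
SERVICE_PORTS = {
    "RE Manager": 60615,
    "ZMQ Proxy (out)": 5578,
    "Tiled": 8000,
    "Redis": 6379,
}


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    for name, port in SERVICE_PORTS.items():
        state = "reachable" if port_is_open("localhost", port) else "NOT reachable"
        print(f"{name:<16} port {port:<5} {state}")
```

If a port is reported as not reachable, `docker compose ps` and `docker compose logs` are the next places to look.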
import time
from bluesky.callbacks.zmq import RemoteDispatcher
from bluesky_queueserver_api.zmq import REManagerAPI
RM = REManagerAPI(zmq_control_addr="tcp://localhost:60615")
RM.environment_open()
RM.wait_for_idle(timeout=30)
status = RM.status()
print(f"RE Manager state: {status['manager_state']}")
print(f"Worker environment exists: {status['worker_environment_exists']}")
assert status["worker_environment_exists"], "Open the RE environment before continuing (see instructions above)"
RE Manager state: idle
Worker environment exists: True
The Queueserver Environment#
The queueserver startup script (shown below for reference) defines the devices and plans available in the remote environment. This script runs inside the RE Manager process — not in your notebook:
# startup.py (runs inside the RE Manager)
from bluesky import RunEngine
from bluesky_queueserver import is_re_worker_active
from ophyd.sim import SynAxis, SynSignal
RE = RunEngine({})
# Publish documents to ZMQ so external subscribers can react
if is_re_worker_active():
    import os
    from bluesky.callbacks.zmq import Publisher as ZmqPublisher
    addr = os.environ.get("BLUESKY_ZMQ_PROXY_IN_ADDR", "tcp://zmq-proxy:5577")
    host, port = addr.replace("tcp://", "").split(":")
    publisher = ZmqPublisher((host, int(port)))
    RE.subscribe(publisher)
# Simulated motors
motor1 = SynAxis(name="motor1", labels={"motors"})
motor2 = SynAxis(name="motor2", labels={"motors"})
# Simulated detector that computes the Himmelblau function
def _compute_himmelblau():
    x = motor1.read()["motor1"]["value"]
    y = motor2.read()["motor2"]["value"]
    return float((x**2 + y - 11) ** 2 + (x + y**2 - 7) ** 2)
himmel_det = SynSignal(name="himmel_det", func=_compute_himmelblau, labels={"detectors"})
# Acquisition plan: moves actuators to suggested positions and reads sensors
import bluesky.plans as bp
def default_acquire(suggestions, actuators, sensors, *, md=None):
    plan_args = []
    for actuator in actuators:
        values = [s[actuator.name] for s in suggestions]
        plan_args.append(actuator)
        plan_args.append(values)
    _md = {"blop_suggestions": suggestions}
    if md:
        _md.update(md)
    yield from bp.list_scan(list(sensors), *plan_args, md=_md)
Key points:
is_re_worker_active() gates code that should only run inside the queueserver worker
The ZMQ publisher sends all run documents to the proxy for external consumption
default_acquire is a simple plan that wraps Bluesky’s list_scan: it moves actuators to each suggested position and reads sensors
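To make the argument layout concrete, the sketch below reproduces only the list-building part of `default_acquire`, without Bluesky. `FakeActuator` and `build_list_scan_args` are hypothetical stand-ins introduced for illustration; the point is that a list of per-point suggestion dicts is turned into interleaved `actuator, [values...]` pairs, which is the positional layout `list_scan` expects after the detector list.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeActuator:
    """Hypothetical stand-in for an ophyd device; only the name is needed here."""
    name: str


def build_list_scan_args(suggestions: list[dict], actuators: list[FakeActuator]) -> list:
    """Interleave each actuator with the list of positions it should visit."""
    plan_args = []
    for actuator in actuators:
        # One value per suggestion, looked up by the actuator's name.
        values = [s[actuator.name] for s in suggestions]
        plan_args.append(actuator)
        plan_args.append(values)
    return plan_args


m1, m2 = FakeActuator("motor1"), FakeActuator("motor2")
suggestions = [
    {"_id": 0, "motor1": 1.0, "motor2": 3.0},
    {"_id": 1, "motor1": 2.0, "motor2": 4.0},
]
args = build_list_scan_args(suggestions, [m1, m2])
print(args)
```

Each actuator ends up paired with one position per suggestion, so a batch of N suggestions becomes a single N-point scan rather than N separate runs.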
Connecting to Tiled#
The evaluation function will read experimental data from the Tiled server:
from tiled.client import from_uri
tiled_client = from_uri("http://localhost:8000", api_key="tutorialkey")
Defining the Optimization Problem#
Just as in the simple experiment tutorial, we define DOFs and objectives. The key difference: since devices exist only in the remote queueserver environment, DOFs reference device names as strings (no actuator objects).
from blop.ax import RangeDOF, Objective
from blop.ax.queueserver_agent import QueueserverAgent
dofs = [
    RangeDOF(actuator="motor1", bounds=(-5.0, 5.0), parameter_type="float"),
    RangeDOF(actuator="motor2", bounds=(-5.0, 5.0), parameter_type="float"),
]
objectives = [
    Objective(name="himmelblau", minimize=True),
]
# Sensors are referenced by name — these are the detectors in the queueserver environment
sensors = ["himmel_det"]
[INFO 06-08 19:44:33] ax.storage.sqa_store.with_db_settings_base: Ax SQL storage initialized with SQLAlchemy 2.0.50
Writing the Evaluation Function#
The evaluation function is called each time a plan completes. It reads data from Tiled and computes the objective values. The function receives:
uid: the Bluesky run UID (use this to look up data in Tiled)
suggestions: the list of parameter dicts that were evaluated
It must return a list of outcome dicts, each containing the objective value(s) and an _id matching the suggestion.
Because the agent and the ZMQ-Tiled bridge are separate subscribers to the same ZMQ stream, there is a race condition: the agent may receive the stop document before the bridge has finished writing data to Tiled. The evaluation function should poll Tiled until both the run and the detector data are available.
import numpy as np
from tiled.client.container import Container
class HimmelblauEvaluation:
    """Reads detector data from Tiled and computes the Himmelblau objective."""
    def __init__(self, tiled_client: Container, timeout: float = 30.0, poll_interval: float = 0.5):
        self.tiled_client = tiled_client
        self.timeout = timeout
        self.poll_interval = poll_interval
    def _wait_for_run(self, uid: str):
        """Poll Tiled until the run with the given UID is available."""
        deadline = time.time() + self.timeout
        while time.time() < deadline:
            try:
                return self.tiled_client[uid]
            except KeyError:
                time.sleep(self.poll_interval)
        raise TimeoutError(
            f"Run '{uid}' not found in Tiled after {self.timeout}s. "
            "The ZMQ-Tiled bridge may not be running."
        )
    def _wait_for_detector_data(self, run, path: str):
        """Poll Tiled until the detector data path is readable."""
        deadline = time.time() + self.timeout
        while time.time() < deadline:
            try:
                return run[path].read()
            except KeyError:
                time.sleep(self.poll_interval)
        raise TimeoutError(
            f"Data path '{path}' for run '{run.metadata['start']['uid']}' was not readable after {self.timeout}s. "
            "The ZMQ-Tiled bridge may still be writing the run."
        )
    def __call__(self, uid: str, suggestions: list[dict]) -> list[dict]:
        run = self._wait_for_run(uid)
        # Read the detector values from the primary data stream
        himmel_values = self._wait_for_detector_data(run, "primary/himmel_det")
        outcomes = []
        for idx, suggestion in enumerate(suggestions):
            outcomes.append({
                "_id": suggestion["_id"],
                "himmelblau": float(himmel_values[idx]),
            })
        return outcomes
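The two `_wait_for_*` helpers above share one pattern: retry a lookup on `KeyError` until a deadline passes. As a sketch of how that could be factored out, the hypothetical `poll_until` helper below does the same thing with `time.monotonic()`, which is unaffected by system clock adjustments. A dict filled in by a timer thread stands in for the bridge writing to Tiled; none of these names come from Blop or Tiled.

```python
import threading
import time


def poll_until(fetch, *, timeout: float = 30.0, poll_interval: float = 0.5,
               retry_on: tuple = (KeyError,)):
    """Call fetch() until it stops raising one of retry_on, or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            return fetch()
        except retry_on:
            if time.monotonic() >= deadline:
                raise TimeoutError(f"fetch() did not succeed within {timeout}s") from None
            time.sleep(poll_interval)


# Hypothetical stand-in for Tiled: the run appears about 0.2 s after we start polling,
# mimicking the bridge finishing its write after the agent saw the stop document.
store: dict[str, dict] = {}


def _late_write() -> None:
    store["run-1"] = {"himmel_det": [170.0]}


threading.Timer(0.2, _late_write).start()
run = poll_until(lambda: store["run-1"], timeout=5.0, poll_interval=0.05)
print(run)
```

Unlike the loops above, this variant always attempts the lookup at least once, even with a zero timeout; whether that matters depends on how the evaluation function is used.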
Creating the Queueserver Agent#
Now we bring everything together. The QueueserverAgent needs:
re_manager_api: how to communicate with the queueserver (submit plans, check status)
document_dispatcher: a RemoteDispatcher that subscribes to the Bluesky document stream
The DOFs, objectives, sensors, and evaluation function
document_dispatcher = RemoteDispatcher(("localhost", 5578))
agent = QueueserverAgent(
    re_manager_api=RM,
    document_dispatcher=document_dispatcher,
    sensors=sensors,
    dofs=dofs,
    objectives=objectives,
    evaluation_function=HimmelblauEvaluation(tiled_client),
    acquisition_plan="default_acquire",
)
Note
The re_manager_api argument also accepts an HTTP-based client
(bluesky_queueserver_api.http.REManagerAPI) for deployments that expose the
queueserver over HTTP rather than ZMQ. Construct the RemoteDispatcher with
whatever arguments your document stream requires, such as a ZMQ address or
message prefix.
Running the Optimization#
The run() method is non-blocking — it submits the first plan and returns immediately. The agent reacts asynchronously to plan completions via ZMQ callbacks.
future = agent.run(iterations=10, n_points=1)
[INFO 06-08 19:44:33] ax.api.client: GenerationStrategy(name='Center+Sobol+MBM:fast', nodes=[CenterGenerationNode(next_node_name='Sobol', use_existing_trials_for_initialization=True), GenerationNode(name='Sobol', generator_specs=[GeneratorSpec(generator_enum=Sobol, generator_key_override=None)], transition_criteria=[MinTrials(transition_to='MBM'), MinTrials(transition_to='MBM')], suggested_experiment_status=ExperimentStatus.INITIALIZATION, pausing_criteria=[MaxTrialsAwaitingData(threshold=5)]), GenerationNode(name='MBM', generator_specs=[GeneratorSpec(generator_enum=BoTorch, generator_key_override=None)], transition_criteria=None, suggested_experiment_status=ExperimentStatus.OPTIMIZATION, pausing_criteria=None)]) chosen based on user input and problem structure.
[INFO 06-08 19:44:33] ax.api.client: Generated new trial 0 with parameters {'motor1': 0.0, 'motor2': 0.0} using GenerationNode CenterOfSearchSpace.
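The non-blocking behavior described above follows a common pattern: start the work in the background, hand the caller a future, and resolve it when the last step completes. The sketch below illustrates that pattern with the standard library only. It is an assumption that the object returned by `agent.run()` behaves like `concurrent.futures.Future`; the tutorial only shows that it has a `result()` method, and `run_in_background` is a hypothetical name.

```python
import threading
import time
from concurrent.futures import Future


def run_in_background(iterations: int) -> Future:
    """Start a worker thread and return immediately with a Future for its result."""
    future: Future = Future()

    def worker() -> None:
        try:
            uids = []
            for i in range(iterations):
                time.sleep(0.01)          # stands in for waiting on a plan completion
                uids.append(f"run-{i}")   # hypothetical run identifiers
            future.set_result(uids)
        except Exception as exc:          # surface failures to whoever calls result()
            future.set_exception(exc)

    threading.Thread(target=worker, daemon=True).start()
    return future


future = run_in_background(3)
# The caller is free to do other work here; result() blocks until the worker finishes.
uids = future.result(timeout=5)
print(uids)
```

Passing a timeout to `result()` is a useful safeguard in this style of code, since a stalled worker would otherwise block the caller indefinitely.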
Wait for the optimization to complete and inspect the result:
result = future.result()
print(f"Iterations completed : {result.iterations_completed}")
print(f"Points per iteration : {result.num_points}")
print(f"Total acquisitions : {len(result.uids)}")
print()
print("Run UIDs:")
for uid in result.uids:
    print(f" {uid}")
Iterations completed : 10
Points per iteration : 1
Total acquisitions : 10
Run UIDs:
4a3f4079-7d82-45d9-a5ba-58c27b94ec2a
2c7f1f70-d56a-4d98-9891-376c1f2981a4
d912b396-28a0-44f9-9bb9-8c02252c2402
01b20339-cd3b-4501-92e9-82884c6cb5c8
58048e2a-6d99-49b6-8be5-d5a48bca5169
aa15facf-4472-4a46-9981-021b3ab40986
e5556056-38c9-43f7-acd1-a173527548d3
9ccd5062-b39a-4106-933e-b8ae04cd392d
ce5f8a96-fc03-4f48-94b0-240625484c2f
518849f7-793f-4f34-adcb-d2214583b9d1
[INFO 06-08 19:44:35] ax.api.client: Trial 0 marked COMPLETED.
[INFO 06-08 19:44:35] ax.api.client: Generated new trial 1 with parameters {'motor1': 2.921755, 'motor2': 4.142677} using GenerationNode Sobol.
[INFO 06-08 19:44:36] ax.api.client: Trial 1 marked COMPLETED.
[INFO 06-08 19:44:36] ax.api.client: Generated new trial 2 with parameters {'motor1': -0.852056, 'motor2': -2.757492} using GenerationNode Sobol.
[INFO 06-08 19:44:37] ax.api.client: Trial 2 marked COMPLETED.
[INFO 06-08 19:44:37] ax.api.client: Generated new trial 3 with parameters {'motor1': -3.361364, 'motor2': 0.25036} using GenerationNode Sobol.
[INFO 06-08 19:44:38] ax.api.client: Trial 3 marked COMPLETED.
[INFO 06-08 19:44:38] ax.api.client: Generated new trial 4 with parameters {'motor1': 0.354158, 'motor2': -1.677581} using GenerationNode Sobol.
[INFO 06-08 19:44:39] ax.api.client: Trial 4 marked COMPLETED.
[INFO 06-08 19:44:40] ax.api.client: Generated new trial 5 with parameters {'motor1': -4.313711, 'motor2': 1.939101} using GenerationNode MBM.
[INFO 06-08 19:44:41] ax.api.client: Trial 5 marked COMPLETED.
[INFO 06-08 19:44:42] ax.api.client: Generated new trial 6 with parameters {'motor1': -2.964313, 'motor2': -1.448735} using GenerationNode MBM.
[INFO 06-08 19:44:42] ax.api.client: Trial 6 marked COMPLETED.
[INFO 06-08 19:44:43] ax.api.client: Generated new trial 7 with parameters {'motor1': -2.995139, 'motor2': -3.669876} using GenerationNode MBM.
[INFO 06-08 19:44:43] ax.api.client: Trial 7 marked COMPLETED.
[INFO 06-08 19:44:44] ax.api.client: Generated new trial 8 with parameters {'motor1': -3.307866, 'motor2': -5.0} using GenerationNode MBM.
[INFO 06-08 19:44:44] ax.api.client: Trial 8 marked COMPLETED.
[INFO 06-08 19:44:45] ax.api.client: Generated new trial 9 with parameters {'motor1': -3.648596, 'motor2': -3.494265} using GenerationNode MBM.
[INFO 06-08 19:44:46] ax.api.client: Trial 9 marked COMPLETED.
Viewing Results#
Since QueueserverAgent uses the same Ax optimizer backend as the local Agent, all the familiar analysis methods are available:
agent.ax_client.summarize()
| | trial_index | arm_name | trial_status | generation_node | himmelblau | motor1 | motor2 |
|---|---|---|---|---|---|---|---|
| 0 | 0 | 0_0 | COMPLETED | CenterOfSearchSpace | 170.000000 | 0.000000 | 0.000000 |
| 1 | 1 | 1_0 | COMPLETED | Sobol | 173.998873 | 2.921755 | 4.142677 |
| 2 | 2 | 2_0 | COMPLETED | Sobol | 169.881434 | -0.852056 | -2.757492 |
| 3 | 3 | 3_0 | COMPLETED | Sobol | 106.364431 | -3.361364 | 0.250360 |
| 4 | 4 | 4_0 | COMPLETED | Sobol | 172.237422 | 0.354158 | -1.677581 |
| 5 | 5 | 5_0 | COMPLETED | MBM | 148.205969 | -4.313711 | 1.939101 |
| 6 | 6 | 6_0 | COMPLETED | MBM | 75.272964 | -2.964313 | -1.448735 |
| 7 | 7 | 7_0 | COMPLETED | MBM | 44.539539 | -2.995139 | -3.669876 |
| 8 | 8 | 8_0 | COMPLETED | MBM | 241.442370 | -3.307866 | -5.000000 |
| 9 | 9 | 9_0 | COMPLETED | MBM | 3.834796 | -3.648596 | -3.494265 |
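A quick way to read this table is to track the best objective value seen so far. The sketch below hard-codes the `himmelblau` column from the run shown above and computes a running minimum with the standard library; if the summary is available as a pandas DataFrame (an assumption about the return type), a cumulative minimum on that column would give the same trace.

```python
from itertools import accumulate

# The himmelblau column copied from the summary table above, in trial order.
himmelblau_by_trial = [
    170.000000, 173.998873, 169.881434, 106.364431, 172.237422,
    148.205969, 75.272964, 44.539539, 241.442370, 3.834796,
]

# Running minimum: the best value observed up to and including each trial.
best_so_far = list(accumulate(himmelblau_by_trial, min))

# Index of the single best trial.
best_trial = min(range(len(himmelblau_by_trial)), key=himmelblau_by_trial.__getitem__)

for trial, (value, best) in enumerate(zip(himmelblau_by_trial, best_so_far)):
    print(f"trial {trial}: value={value:10.6f}  best so far={best:10.6f}")
print(f"best trial: {best_trial}")
```

The trace stays flat through most of the Sobol phase and drops once the model-based (MBM) node takes over, although trial 8 shows that individual model-based suggestions can still be poor.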
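The Himmelblau function has four global minima, all with value 0:
(3.0, 2.0)
(-2.805, 3.131)
(-3.779, -3.283)
(3.584, -1.848)
In the run shown above, the best trial (trial 9, objective 3.83) sits at (-3.649, -3.494), close to the third of these minima, so ten iterations were enough to make clear progress toward an optimum.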
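These values can be checked directly. The sketch below evaluates the Himmelblau function at the four listed minima (which are rounded, so the results are small but not exactly zero), at the center point used by trial 0, and at the best trial from the run above, and then finds which minimum that trial is closest to. The helper names are hypothetical.

```python
import math


def himmelblau(x: float, y: float) -> float:
    """Same formula as the simulated detector in the startup script."""
    return (x**2 + y - 11) ** 2 + (x + y**2 - 7) ** 2


# The four global minima as listed above (rounded to three decimals).
MINIMA = [(3.0, 2.0), (-2.805, 3.131), (-3.779, -3.283), (3.584, -1.848)]

for x, y in MINIMA:
    print(f"f({x}, {y}) = {himmelblau(x, y):.6f}")

# Trial 0 evaluated the center of the search space.
center_value = himmelblau(0.0, 0.0)

# Trial 9 was the best point in the run shown above.
best_point = (-3.648596, -3.494265)
best_value = himmelblau(*best_point)
nearest = min(MINIMA, key=lambda m: math.dist(m, best_point))

print(f"center value: {center_value}")
print(f"best trial value: {best_value:.6f}, nearest minimum: {nearest}")
```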
Cleanup#
When you’re done, close the RE environment and stop the Docker services:
RM.environment_close()
RM.wait_for_idle(timeout=30)
RM.close()
cd docs/source/tutorials/queueserver
docker compose down
What You Learned#
Distributed architecture: The queueserver separates experiment execution from optimization logic, connected via ZMQ and Tiled
String-based device references: Since devices live in the remote process, DOFs, sensors, and plans are referenced by name
Asynchronous operation: agent.run() is non-blocking; the agent reacts to events via ZMQ callbacks
Evaluation function: Reads from Tiled (not direct device access) to compute objectives after each plan completes
Next Steps#
Add multiple objectives for multi-objective optimization (see KB Mirrors tutorial)
Use agent.submit_suggestions() to manually evaluate specific parameter combinations (see Manual Point Injection)
Implement outcome_constraints to constrain the optimization (see Set outcome constraints)
Add a checkpoint_path to persist optimizer state across restarts (see Agent)