Queueserver

These classes implement the distributed optimization backend that connects Blop to a remote Bluesky Queueserver. See the Asynchronous Optimization with Bluesky Queueserver tutorial for a full worked example.

OptimizationResult

class blop.queueserver.OptimizationResult(iterations_completed, num_points, uids)

The result of a completed or stopped optimization run.

Parameters:
iterations_completed : int

The number of suggest -> acquire -> ingest cycles that finished successfully. For a run stopped early via QueueserverOptimizationRunner.stop(), this reflects however many iterations completed before the stop.

num_points : int

The number of points suggested per iteration.

uids : tuple[str, ...]

The Bluesky run UIDs for each completed acquisition, in order. These can be used to retrieve raw data from a Tiled or databroker catalog for post-hoc analysis.

iterations_completed: int
num_points: int
uids: tuple[str, ...]
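As a minimal sketch of how these fields might be consumed, the snippet below uses a hypothetical stand-in dataclass, ResultStandIn, that mirrors the three documented fields. It is not the Blop class itself, and summarize() is an illustrative helper, not part of the library.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class ResultStandIn:
    """Hypothetical stand-in mirroring the documented OptimizationResult fields."""

    iterations_completed: int
    num_points: int
    uids: Tuple[str, ...]


def summarize(result) -> str:
    """Build a one-line summary from any object exposing the three documented fields."""
    return (
        f"{result.iterations_completed} iteration(s), "
        f"{result.num_points} point(s) per iteration, "
        f"{len(result.uids)} run UID(s)"
    )


# Placeholder UIDs; real values would come from completed Bluesky runs.
example = ResultStandIn(iterations_completed=2, num_points=1, uids=("uid-a", "uid-b"))
print(summarize(example))
```

Because summarize() only reads attributes, it should work unchanged on a real OptimizationResult obtained from a resolved future.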

QueueserverClient

class blop.queueserver.QueueserverClient(re_manager_api, zmq_consumer_addr)

Bases: object

Handles communication with a Bluesky queueserver.

This class encapsulates all ZMQ and HTTP communication with the queueserver, including plan submission and event listening.

Parameters:
re_manager_api : bluesky_queueserver_api.zmq.REManagerAPI

Manager instance for communicating with the Bluesky Queueserver.

zmq_consumer_addr : str

Address for the ZMQ document consumer (e.g., "localhost:5578").

check_environment()

Verify that the queueserver environment is ready.

Raises:
RuntimeError

If the queueserver environment is not open.

check_devices_available(device_names)

Verify that all specified devices are available in the queueserver.

Parameters:
device_names : Sequence[str]

Names of devices to check.

Raises:
ValueError

If any device is not available.

check_plan_available(plan_name)

Verify that a plan is available in the queueserver.

Parameters:
plan_name : str

Name of the plan to check.

Raises:
ValueError

If the plan is not available.
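The three checks above can be combined into a single pre-flight step. The sketch below is one possible arrangement, not part of Blop: preflight() and StubClient are hypothetical names, the stub exists only so the example runs without a queueserver, and the device and plan names are placeholders.

```python
from typing import List, Sequence


def preflight(client, device_names: Sequence[str], plan_name: str) -> List[str]:
    """Run the three documented checks and return a list of problems found."""
    try:
        client.check_environment()
    except RuntimeError as exc:
        # Design choice of this sketch: skip the other checks if the environment is closed.
        return [f"environment: {exc}"]
    problems: List[str] = []
    checks = (
        ("devices", lambda: client.check_devices_available(device_names)),
        ("plan", lambda: client.check_plan_available(plan_name)),
    )
    for label, check in checks:
        try:
            check()
        except ValueError as exc:
            problems.append(f"{label}: {exc}")
    return problems


class StubClient:
    """Hypothetical stand-in for QueueserverClient, used only to exercise preflight()."""

    def __init__(self, devices, plans):
        self._devices, self._plans = set(devices), set(plans)

    def check_environment(self):
        pass  # pretend the environment is open

    def check_devices_available(self, device_names):
        missing = [name for name in device_names if name not in self._devices]
        if missing:
            raise ValueError(f"unavailable devices: {missing}")

    def check_plan_available(self, plan_name):
        if plan_name not in self._plans:
            raise ValueError(f"unavailable plan: {plan_name}")


stub = StubClient(devices=["motor_x", "det1"], plans=["acquire"])
print(preflight(stub, ["motor_x", "det2"], "acquire"))
```

Collecting the ValueError messages instead of stopping at the first one lets a caller report every missing device or plan in one pass.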

submit_plan(plan, autostart=True, timeout=600)

Submit a plan to the queueserver queue.

Parameters:
plan : BPlan

The plan to submit.

autostart : bool, optional

If True, start the queue after adding the plan.

timeout : float, optional

Timeout in seconds when waiting for queue to be idle.

start_listener(on_stop)

Start listening for document events from the queueserver.

Parameters:
on_stop : callable

Callback invoked when a stop document is received. Signature: on_stop(start_doc, stop_doc).
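A callback with the documented on_stop(start_doc, stop_doc) signature might look like the sketch below. It assumes the documents follow the usual Bluesky layout, where the start document carries "uid" and the stop document carries "exit_status"; the two lists and the hand-written documents are illustrative only.

```python
from typing import Dict, List

completed_uids: List[str] = []
failed_uids: List[str] = []


def on_stop(start_doc: Dict, stop_doc: Dict) -> None:
    """Record the run UID, sorted by the stop document's exit status."""
    uid = start_doc["uid"]
    if stop_doc.get("exit_status") == "success":
        completed_uids.append(uid)
    else:
        failed_uids.append(uid)


# Hand-written documents for illustration; real ones arrive from the queueserver.
on_stop({"uid": "run-1"}, {"run_start": "run-1", "exit_status": "success"})
on_stop({"uid": "run-2"}, {"run_start": "run-2", "exit_status": "abort"})
print(completed_uids, failed_uids)
```

With a real client the callback would be registered via start_listener(on_stop) and the listener shut down later with stop_listener(). Since the listener is described as running in its own thread, a callback that touches shared state may need its own locking.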

stop_listener()

Stop the ZMQ listener thread.

QueueserverOptimizationRunner

class blop.queueserver.QueueserverOptimizationRunner(optimization_problem, queueserver_client)

Bases: object

Runs optimization loops through a Bluesky queueserver.

This class coordinates the optimization workflow by getting suggestions from the optimizer, submitting acquisition plans to the queueserver, and ingesting results when plans complete.

Parameters:
optimization_problem : QueueserverOptimizationProblem

The optimization problem to solve, containing the optimizer, actuators, sensors, and evaluation function.

queueserver_client : QueueserverClient

Client for communicating with the queueserver.

property optimization_problem: QueueserverOptimizationProblem

The optimization problem being solved.

property current_iteration: int

The current iteration number (0 if not running).

run(iterations=1, num_points=1)

Run the optimization loop.

Validates the queueserver state, then begins the suggest -> acquire -> ingest cycle. This method returns immediately; the optimization runs asynchronously via callbacks on the Bluesky document stream.

Parameters:
iterations : int

Number of optimization iterations to run.

num_points : int

Number of points to suggest per iteration.

Returns:
concurrent.futures.Future[OptimizationResult]

A future that resolves to an OptimizationResult when all iterations complete, or when stop() is called. If the optimization loop raises an unhandled exception the future will hold that exception and re-raise it on .result().

Raises:
RuntimeError

If the queueserver environment is not ready.

ValueError

If required devices or plans are not available.
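Because run() returns a standard concurrent.futures.Future, the outcome can be handled without blocking by attaching a done-callback. The sketch below simulates that with futures resolved by hand; FakeResult is a hypothetical stand-in for OptimizationResult, and report() is an illustrative helper, not part of Blop.

```python
from collections import namedtuple
from concurrent.futures import Future

# Hypothetical stand-in for OptimizationResult with the documented fields.
FakeResult = namedtuple("FakeResult", "iterations_completed num_points uids")

messages = []


def report(future: Future) -> None:
    """Done-callback: record either the failure or the number of finished iterations."""
    exc = future.exception()  # returns immediately because the future is done
    if exc is not None:
        messages.append(f"optimization failed: {exc!r}")
    else:
        result = future.result()
        messages.append(f"finished {result.iterations_completed} iteration(s)")


# With a real runner this would be: future = runner.run(iterations=2, num_points=1)
future = Future()
future.add_done_callback(report)
future.set_result(FakeResult(2, 1, ("uid-1", "uid-2")))  # simulates normal completion

failed = Future()
failed.add_done_callback(report)
failed.set_exception(RuntimeError("plan failed"))  # simulates an unhandled exception
print(messages)
```

A caller that prefers to block can call future.result(timeout=...) instead, which re-raises any stored exception.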

submit_suggestions(suggestions)

Manually submit suggestions to the queue. This method returns immediately; the optimization runs asynchronously via callbacks on the Bluesky document stream.

Parameters:
suggestions : list[dict]

Parameter combinations to evaluate. Can be:

  • Optimizer suggestions (with "_id" keys from suggest())

  • Manual points (without "_id", requires CanRegisterSuggestions protocol)

Returns:
concurrent.futures.Future[OptimizationResult]

A future that resolves to an OptimizationResult when the acquisition completes. If an unhandled exception occurs the future will hold it and re-raise on .result().
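To illustrate the two accepted shapes of suggestions, the sketch below builds one entry of each kind and separates them by the presence of the "_id" key. The parameter names "motor_x" and "motor_y", the integer "_id" value, and the helper split_suggestions() are all assumptions made for illustration.

```python
from typing import Dict, List, Tuple


def split_suggestions(suggestions: List[Dict]) -> Tuple[List[Dict], List[Dict]]:
    """Separate optimizer suggestions (with "_id") from manual points (without)."""
    from_optimizer = [s for s in suggestions if "_id" in s]
    manual = [s for s in suggestions if "_id" not in s]
    return from_optimizer, manual


# "motor_x" and "motor_y" are hypothetical parameter names.
suggestions = [
    {"_id": 0, "motor_x": 0.1, "motor_y": -0.3},  # assumed shape of an optimizer suggestion
    {"motor_x": 0.5, "motor_y": 0.5},  # hand-written manual point, no "_id"
]
from_optimizer, manual = split_suggestions(suggestions)
print(len(from_optimizer), len(manual))
```

A check like this can flag manual points up front when the optimizer in use does not implement the CanRegisterSuggestions protocol.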

stop()

Stop the optimization loop gracefully.

The future returned by run() or submit_suggestions() will be resolved with a partial OptimizationResult containing however many iterations completed before the stop.
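One way to use stop() is to enforce a wall-clock deadline: wait on the future, and if it has not resolved in time, stop the runner and collect the partial result. The sketch below assumes only the documented run()/stop() contract; StubRunner, FakeResult, and run_with_deadline() are hypothetical, and the stub resolves its future immediately on stop(), which a real runner may not do.

```python
from collections import namedtuple
from concurrent.futures import Future, TimeoutError as FutureTimeout

# Hypothetical stand-in for OptimizationResult with the documented fields.
FakeResult = namedtuple("FakeResult", "iterations_completed num_points uids")


class StubRunner:
    """Hypothetical stand-in that mimics only the documented run()/stop() contract."""

    def __init__(self):
        self._future = Future()
        self._num_points = 1

    def run(self, iterations=1, num_points=1):
        self._num_points = num_points
        return self._future  # in this sketch the loop never finishes on its own

    def stop(self):
        # Resolve the future with a partial result, as the documentation describes.
        self._future.set_result(FakeResult(1, self._num_points, ("uid-1",)))


def run_with_deadline(runner, seconds, **run_kwargs):
    """Start a run; if it is not done within `seconds`, stop it and return the partial result."""
    future = runner.run(**run_kwargs)
    try:
        return future.result(timeout=seconds)
    except FutureTimeout:
        runner.stop()
        # Allow the same grace period for the stop to take effect.
        return future.result(timeout=seconds)


partial = run_with_deadline(StubRunner(), 0.05, iterations=10, num_points=2)
print(partial.iterations_completed, partial.num_points)
```

The second result() call keeps a timeout so that a runner which fails to stop promptly surfaces as an error instead of hanging the caller.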