Queueserver#
These classes implement the distributed optimization backend that connects Blop to a remote Bluesky Queueserver. See the Asynchronous Optimization with Bluesky Queueserver tutorial for a full worked example.
OptimizationResult#
- class blop.queueserver.OptimizationResult(iterations_completed, num_points, uids)[source]#
The result of a completed or stopped optimization run.
- Parameters:
- iterations_completedint
The number of suggest -> acquire -> ingest cycles that finished successfully. For a run stopped early via
QueueserverOptimizationRunner.stop(), this reflects however many iterations completed before the stop.- num_pointsint
The number of points suggested per iteration.
- uidstuple[str, …]
The Bluesky run UIDs for each completed acquisition, in order. These can be used to retrieve raw data from a Tiled or databroker catalog for post-hoc analysis.
QueueserverClient#
- class blop.queueserver.QueueserverClient(re_manager_api, zmq_consumer_addr)[source]#
Bases:
objectHandles communication with a Bluesky queueserver.
This class encapsulates all ZMQ and HTTP communication with the queueserver, including plan submission and event listening.
- Parameters:
- re_manager_apibluesky_queueserver_api.zmq.REManagerAPI
Manager instance for communication with Bluesky Queueserver
- zmq_consumer_addrstr
Address for ZMQ document consumer (e.g., “localhost:5578”).
- check_environment()[source]#
Verify that the queueserver environment is ready.
- Raises:
- RuntimeError
If the queueserver environment is not open.
- check_devices_available(device_names)[source]#
Verify that all specified devices are available in the queueserver.
- Parameters:
- device_namesSequence[str]
Names of devices to check.
- Raises:
- ValueError
If any device is not available.
- check_plan_available(plan_name)[source]#
Verify that a plan is available in the queueserver.
- Parameters:
- plan_namestr
Name of the plan to check.
- Raises:
- ValueError
If the plan is not available.
- submit_plan(plan, autostart=True, timeout=600)[source]#
Submit a plan to the queueserver queue.
- Parameters:
- planBPlan
The plan to submit.
- autostartbool, optional
If True, start the queue after adding the plan.
- timeoutfloat, optional
Timeout in seconds when waiting for queue to be idle.
QueueserverOptimizationRunner#
- class blop.queueserver.QueueserverOptimizationRunner(optimization_problem, queueserver_client)[source]#
Bases:
objectRuns optimization loops through a Bluesky queueserver.
This class coordinates the optimization workflow by getting suggestions from the optimizer, submitting acquisition plans to the queueserver, and ingesting results when plans complete.
- Parameters:
- optimization_problemQueueserverOptimizationProblem
The optimization problem to solve, containing the optimizer, actuators, sensors, and evaluation function.
- queueserver_clientQueueserverClient
Client for communicating with the queueserver.
- property optimization_problem: QueueserverOptimizationProblem#
The optimization problem being solved.
- run(iterations=1, num_points=1)[source]#
Run the optimization loop.
Validates the queueserver state, then begins the suggest -> acquire -> ingest cycle. This method returns immediately; the optimization runs asynchronously via callbacks on the Bluesky document stream.
- Parameters:
- iterationsint
Number of optimization iterations to run.
- num_pointsint
Number of points to suggest per iteration.
- Returns:
- concurrent.futures.Future[OptimizationResult]
A future that resolves to an
OptimizationResultwhen all iterations complete, or whenstop()is called. If the optimization loop raises an unhandled exception the future will hold that exception and re-raise it on.result().
- Raises:
- RuntimeError
If the queueserver environment is not ready.
- ValueError
If required devices or plans are not available.
- submit_suggestions(suggestions)[source]#
Manually submit suggestions to the queue. This method returns immediately; the optimization runs asynchronously via callbacks on the Bluesky document stream.
- Parameters:
- suggestionslist[dict]
Parameter combinations to evaluate. Can be:
Optimizer suggestions (with “_id” keys from suggest())
Manual points (without “_id”, requires CanRegisterSuggestions protocol)
- Returns:
- concurrent.futures.Future[OptimizationResult]
A future that resolves to an
OptimizationResultwhen the acquisition completes. If an unhandled exception occurs the future will hold it and re-raise on.result().
- stop()[source]#
Stop the optimization loop gracefully.
The future returned by
run()orsubmit_suggestions()will be resolved with a partialOptimizationResultcontaining however many iterations completed before the stop.