Queueserver#
Warning
The queueserver integration is experimental. The API is not yet stable and may change in future releases without a deprecation period. It is not recommended for production use.
These classes implement the distributed optimization backend that connects Blop to a remote Bluesky Queueserver. See the Asynchronous Optimization with Bluesky Queueserver tutorial for a full worked example.
OptimizationResult#
- class blop.queueserver.OptimizationResult(iterations_completed, num_points, uids)[source]#
The result of a completed or stopped optimization run.
Warning
This class is part of the experimental queueserver integration. The API may change in future releases without a deprecation period.
- Parameters:
- iterations_completed : int
The number of suggest -> acquire -> ingest cycles that finished successfully. For a run stopped early via QueueserverOptimizationRunner.stop(), this reflects however many iterations completed before the stop.
- num_points : int
The number of points suggested per iteration.
- uids : tuple[str, ...]
The Bluesky run UIDs for each completed acquisition, in order. These can be used to retrieve raw data from a Tiled or databroker catalog for post-hoc analysis.
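As a rough sketch of how the three fields relate, the snippet below uses a stand-in `NamedTuple` that mirrors the documented parameters. It is not the real `blop.queueserver.OptimizationResult`, and it assumes the constructor parameters are exposed as attributes of the same name. The helper `summarize` and the UID strings are hypothetical.

```python
from typing import NamedTuple


class ResultStandIn(NamedTuple):
    """Stand-in mirroring the documented fields; not the real class."""

    iterations_completed: int
    num_points: int
    uids: tuple[str, ...]


def summarize(result) -> str:
    """Build a one-line summary from the three documented attributes."""
    return (
        f"{result.iterations_completed} iteration(s), "
        f"{result.num_points} point(s) each, "
        f"{len(result.uids)} run UID(s)"
    )


# Made-up UIDs, for illustration only.
example = ResultStandIn(iterations_completed=2, num_points=3, uids=("uid-a", "uid-b"))
print(summarize(example))
```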
QueueserverClient#
- class blop.queueserver.QueueserverClient(re_manager_api, document_dispatcher)[source]#
Bases: object
Handles communication with a Bluesky queueserver.
This class encapsulates queueserver communication, including plan submission and event listening.
Warning
This class is part of the experimental queueserver integration. The API may change in future releases without a deprecation period.
- Parameters:
- re_manager_api : bluesky_queueserver_api.zmq.REManagerAPI
Manager instance for communication with the Bluesky Queueserver.
- document_dispatcher : bluesky.callbacks.zmq.RemoteDispatcher
Dispatcher for the Bluesky document stream.
- check_environment()[source]#
Verify that the queueserver environment is ready.
- Raises:
- RuntimeError
If the queueserver environment is not open.
- check_devices_available(device_names)[source]#
Verify that all specified devices are available in the queueserver.
- Parameters:
- device_names : Sequence[str]
Names of devices to check.
- Raises:
- ValueError
If any device is not available.
- check_plan_available(plan_name)[source]#
Verify that a plan is available in the queueserver.
- Parameters:
- plan_name : str
Name of the plan to check.
- Raises:
- ValueError
If the plan is not available.
- submit_plan(plan, autostart=True, timeout=600)[source]#
Submit a plan to the queueserver queue.
- Parameters:
- plan : BPlan
The plan to submit.
- autostart : bool, optional
If True, start the queue after adding the plan.
- timeout : float, optional
Timeout in seconds when waiting for the queue to be idle.
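The three `check_*` methods can be combined into a single pre-flight step before any plan is submitted. The sketch below is one possible way to do that, relying only on the documented method names and exception types. `preflight` and `FakeClient` are hypothetical names introduced here, and the device and plan names are made up; `FakeClient` only mimics the documented exceptions so the example runs without a queueserver.

```python
def preflight(client, device_names, plan_name):
    """Run the three documented checks and collect any failure messages."""
    problems = []
    checks = (
        lambda: client.check_environment(),
        lambda: client.check_devices_available(device_names),
        lambda: client.check_plan_available(plan_name),
    )
    for check in checks:
        try:
            check()
        except (RuntimeError, ValueError) as exc:
            # RuntimeError: environment not open; ValueError: device or plan missing.
            problems.append(str(exc))
    return problems


class FakeClient:
    """Hypothetical stand-in that only mimics the documented exceptions."""

    def __init__(self, devices, plans, environment_open=True):
        self._devices = set(devices)
        self._plans = set(plans)
        self._environment_open = environment_open

    def check_environment(self):
        if not self._environment_open:
            raise RuntimeError("environment is not open")

    def check_devices_available(self, device_names):
        missing = [name for name in device_names if name not in self._devices]
        if missing:
            raise ValueError(f"devices not available: {missing}")

    def check_plan_available(self, plan_name):
        if plan_name not in self._plans:
            raise ValueError(f"plan not available: {plan_name}")


# Made-up device and plan names, for illustration only.
client = FakeClient(devices=["motor1"], plans=["acquire_plan"])
problems = preflight(client, ["motor1", "det1"], "acquire_plan")
print(problems)
```

Collecting the messages instead of stopping at the first failure lets a caller report every missing device or plan in one pass.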
QueueserverOptimizationRunner#
- class blop.queueserver.QueueserverOptimizationRunner(optimization_problem, queueserver_client)[source]#
Bases: object
Runs optimization loops through a Bluesky queueserver.
This class coordinates the optimization workflow by getting suggestions from the optimizer, submitting acquisition plans to the queueserver, and ingesting results when plans complete.
Warning
This class is part of the experimental queueserver integration. The API may change in future releases without a deprecation period.
- Parameters:
- optimization_problem : QueueserverOptimizationProblem
The optimization problem to solve, containing the optimizer, actuators, sensors, and evaluation function.
- queueserver_client : QueueserverClient
Client for communicating with the queueserver. The document listener is started once during runner construction so it is ready before any submitted plan can emit documents.
- property optimization_problem: QueueserverOptimizationProblem#
The optimization problem being solved.
- run(iterations=1, num_points=1)[source]#
Run the optimization loop.
Validates the queueserver state, then begins the suggest -> acquire -> ingest cycle. This method returns immediately; the optimization runs asynchronously via callbacks on the Bluesky document stream.
- Parameters:
- iterations : int
Number of optimization iterations to run.
- num_points : int
Number of points to suggest per iteration.
- Returns:
- concurrent.futures.Future[OptimizationResult]
A future that resolves to an OptimizationResult when all iterations complete, or when stop() is called. If the optimization loop raises an unhandled exception, the future holds that exception and re-raises it on .result().
- Raises:
- RuntimeError
If the queueserver environment is not ready.
- ValueError
If required devices or plans are not available.
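Because `run()` returns a standard `concurrent.futures.Future`, the caller decides when to block and how to handle a failure. The sketch below shows one way to consume such a future. It resolves stand-in futures by hand instead of calling `run()`, so it needs no queueserver. `describe_outcome` is a hypothetical helper, and the `SimpleNamespace` only imitates the documented `OptimizationResult` fields.

```python
from concurrent.futures import Future, TimeoutError as FutureTimeoutError
from types import SimpleNamespace


def describe_outcome(future, timeout=None):
    """Wait for the future and turn its result or exception into a message."""
    try:
        result = future.result(timeout=timeout)
    except FutureTimeoutError:
        # The future has not resolved yet; the optimization may still be running.
        return "still running"
    except Exception as exc:
        # An exception held by the future is re-raised by .result().
        return f"optimization failed: {exc!r}"
    return f"completed {result.iterations_completed} iteration(s)"


# Stand-in futures resolved by hand; with Blop these would come from run().
ok = Future()
ok.set_result(SimpleNamespace(iterations_completed=5, num_points=1, uids=()))

failed = Future()
failed.set_exception(RuntimeError("plan failed"))

print(describe_outcome(ok))
print(describe_outcome(failed))
```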
- submit_suggestions(suggestions)[source]#
Manually submit suggestions to the queue. This method returns immediately; the optimization runs asynchronously via callbacks on the Bluesky document stream.
- Parameters:
- suggestions : list[dict]
Parameter combinations to evaluate. Each entry can be either:
  - An optimizer suggestion (with an “_id” key, from suggest())
  - A manual point (without “_id”; requires the CanRegisterSuggestions protocol)
- Returns:
- concurrent.futures.Future[OptimizationResult]
A future that resolves to an OptimizationResult when the acquisition completes. If an unhandled exception occurs, the future holds it and re-raises it on .result().
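The two kinds of entry accepted by `submit_suggestions()` differ only in whether the `"_id"` key is present. The sketch below illustrates that distinction on plain dictionaries. `split_suggestions` is a hypothetical helper, and the parameter names (`x1`, `x2`), their values, and the integer `"_id"` are assumptions made for illustration, not values taken from Blop.

```python
def split_suggestions(suggestions):
    """Separate optimizer-issued suggestions from manual points by "_id"."""
    from_optimizer = [s for s in suggestions if "_id" in s]
    manual = [s for s in suggestions if "_id" not in s]
    return from_optimizer, manual


# Hypothetical parameter names and values, for illustration only.
suggestions = [
    {"_id": 0, "x1": 0.5, "x2": -1.0},  # shaped like an optimizer suggestion
    {"x1": 0.0, "x2": 0.0},  # shaped like a manual point
]
from_optimizer, manual = split_suggestions(suggestions)
print(len(from_optimizer), len(manual))
```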
- stop()[source]#
Stop the optimization loop gracefully.
The future returned by run() or submit_suggestions() will be resolved with a partial OptimizationResult containing however many iterations completed before the stop.
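One pattern this enables is a time budget: wait on the future for a fixed period, then call `stop()` and keep whatever partial result is available. The sketch below is a minimal illustration under stated assumptions. `wait_or_stop` and `FakeRunner` are hypothetical, and `FakeRunner.stop()` resolves the future immediately, whereas how soon the real runner resolves it after `stop()` is not specified here.

```python
from concurrent.futures import Future, TimeoutError as FutureTimeoutError
from types import SimpleNamespace


def wait_or_stop(runner, future, timeout, stop_timeout=None):
    """Wait up to `timeout` seconds, then request a stop and take the partial result."""
    try:
        return future.result(timeout=timeout)
    except FutureTimeoutError:
        runner.stop()
        # Assumption: the future resolves after stop(); stop_timeout bounds that wait.
        return future.result(timeout=stop_timeout)


class FakeRunner:
    """Hypothetical stand-in: stop() resolves the future with a partial result."""

    def __init__(self):
        self.future = Future()

    def stop(self):
        partial = SimpleNamespace(
            iterations_completed=2, num_points=1, uids=("uid-a", "uid-b")
        )
        self.future.set_result(partial)


runner = FakeRunner()
result = wait_or_stop(runner, runner.future, timeout=0.05)
print(result.iterations_completed)
```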