Queueserver

Warning

The queueserver integration is experimental. The API is not yet stable and may change in future releases without a deprecation period. It is not recommended for production use.

These classes implement the distributed optimization backend that connects Blop to a remote Bluesky Queueserver. See the Asynchronous Optimization with Bluesky Queueserver tutorial for a full worked example.

OptimizationResult

class blop.queueserver.OptimizationResult(iterations_completed, num_points, uids)

The result of a completed or stopped optimization run.

Warning

This class is part of the experimental queueserver integration. The API may change in future releases without a deprecation period.

Parameters:
iterations_completed : int

The number of suggest -> acquire -> ingest cycles that finished successfully. For a run stopped early via QueueserverOptimizationRunner.stop(), this reflects however many iterations completed before the stop.

num_points : int

The number of points suggested per iteration.

uids : tuple[str, ...]

The Bluesky run UIDs for each completed acquisition, in order. These can be used to retrieve raw data from a Tiled or databroker catalog for post-hoc analysis.

iterations_completed: int
num_points: int
uids: tuple[str, ...]
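As a rough illustration of how the three fields relate, the sketch below uses a stand-in dataclass with the same field names. It is not the blop class itself, `total_points` is a hypothetical helper, and the values are made up.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class ResultStandIn:
    """Stand-in mirroring the documented fields; NOT blop's OptimizationResult."""

    iterations_completed: int
    num_points: int
    uids: tuple[str, ...]


def total_points(result: ResultStandIn) -> int:
    """Hypothetical helper: points suggested across all completed iterations."""
    return result.iterations_completed * result.num_points


# Example values are invented for illustration only.
result = ResultStandIn(iterations_completed=3, num_points=2, uids=("uid-a", "uid-b", "uid-c"))
summary = (
    f"{result.iterations_completed} iterations, "
    f"{total_points(result)} points, {len(result.uids)} runs"
)
print(summary)
```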

QueueserverClient

class blop.queueserver.QueueserverClient(re_manager_api, document_dispatcher)

Bases: object

Handles communication with a Bluesky queueserver.

This class encapsulates queueserver communication, including plan submission and event listening.

Warning

This class is part of the experimental queueserver integration. The API may change in future releases without a deprecation period.

Parameters:
re_manager_api : bluesky_queueserver_api.zmq.REManagerAPI

Manager API instance used to communicate with the Bluesky Queueserver.

document_dispatcher : bluesky.callbacks.zmq.RemoteDispatcher

Dispatcher for the Bluesky document stream.

check_environment()

Verify that the queueserver environment is ready.

Raises:
RuntimeError

If the queueserver environment is not open.

check_devices_available(device_names)

Verify that all specified devices are available in the queueserver.

Parameters:
device_names : Sequence[str]

Names of devices to check.

Raises:
ValueError

If any device is not available.

check_plan_available(plan_name)

Verify that a plan is available in the queueserver.

Parameters:
plan_name : str

Name of the plan to check.

Raises:
ValueError

If the plan is not available.
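The three checks above can be combined into a single pre-flight step before any plan is submitted. The sketch below is a hypothetical helper, `preflight`, that relies only on the documented method names and exception types. `FakeClient` is a made-up stand-in so the example runs without a queueserver; its error messages are not the ones blop produces.

```python
from __future__ import annotations

from typing import Sequence


def preflight(client, device_names: Sequence[str], plan_name: str) -> list[str]:
    """Hypothetical helper: run the documented checks and collect error messages."""
    problems: list[str] = []
    checks = (
        lambda: client.check_environment(),
        lambda: client.check_devices_available(device_names),
        lambda: client.check_plan_available(plan_name),
    )
    for check in checks:
        try:
            check()
        except (RuntimeError, ValueError) as exc:
            problems.append(str(exc))
    return problems


class FakeClient:
    """Made-up stand-in exposing the same method names as QueueserverClient."""

    def __init__(self, devices, plans, env_open=True):
        self.devices, self.plans, self.env_open = set(devices), set(plans), env_open

    def check_environment(self):
        if not self.env_open:
            raise RuntimeError("environment is not open")

    def check_devices_available(self, device_names):
        missing = [name for name in device_names if name not in self.devices]
        if missing:
            raise ValueError(f"devices not available: {missing}")

    def check_plan_available(self, plan_name):
        if plan_name not in self.plans:
            raise ValueError(f"plan not available: {plan_name}")


client = FakeClient(devices={"motor1", "det1"}, plans={"acquire"})
print(preflight(client, ["motor1", "det2"], "acquire"))
```

Collecting every failure, rather than stopping at the first, lets an operator fix a missing device and a missing plan in one pass.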

submit_plan(plan, autostart=True, timeout=600)

Submit a plan to the queueserver queue.

Parameters:
plan : BPlan

The plan to submit.

autostart : bool, optional

If True, start the queue after adding the plan. Defaults to True.

timeout : float, optional

Timeout in seconds when waiting for the queue to become idle. Defaults to 600.

start_listener(on_stop)

Start listening for document events from the queueserver.

Parameters:
on_stop : callable

Callback invoked when a stop document is received. Signature: on_stop(start_doc, stop_doc)
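A callback matching the documented `on_stop(start_doc, stop_doc)` signature might look like the sketch below. It assumes the documents are ordinary Bluesky start and stop documents, where the start document carries a `uid` and the stop document carries an `exit_status`. The `make_on_stop` factory and the sample documents are hypothetical.

```python
def make_on_stop(successful_uids: list):
    """Hypothetical factory: build a callback that records UIDs of successful runs."""

    def on_stop(start_doc: dict, stop_doc: dict) -> None:
        # Bluesky stop documents report "success", "abort", or "fail".
        if stop_doc.get("exit_status") == "success":
            successful_uids.append(start_doc["uid"])

    return on_stop


collected = []
on_stop = make_on_stop(collected)

# Made-up documents, trimmed to the keys the callback reads.
on_stop({"uid": "run-1"}, {"run_start": "run-1", "exit_status": "success"})
on_stop({"uid": "run-2"}, {"run_start": "run-2", "exit_status": "fail"})
print(collected)
```

Such a callback would be passed as `client.start_listener(on_stop)` when the client is used on its own.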

stop_listener()

Stop the document listener thread.

QueueserverOptimizationRunner

class blop.queueserver.QueueserverOptimizationRunner(optimization_problem, queueserver_client)

Bases: object

Runs optimization loops through a Bluesky queueserver.

This class coordinates the optimization workflow by getting suggestions from the optimizer, submitting acquisition plans to the queueserver, and ingesting results when plans complete.

Warning

This class is part of the experimental queueserver integration. The API may change in future releases without a deprecation period.

Parameters:
optimization_problem : QueueserverOptimizationProblem

The optimization problem to solve, containing the optimizer, actuators, sensors, and evaluation function.

queueserver_client : QueueserverClient

Client for communicating with the queueserver. The document listener is started once during runner construction so it is ready before any submitted plan can emit documents.

property optimization_problem: QueueserverOptimizationProblem

The optimization problem being solved.

property current_iteration: int

The current iteration number (0 if not running).

run(iterations=1, num_points=1)

Run the optimization loop.

Validates the queueserver state, then begins the suggest -> acquire -> ingest cycle. This method returns immediately; the optimization runs asynchronously via callbacks on the Bluesky document stream.

Parameters:
iterations : int

Number of optimization iterations to run.

num_points : int

Number of points to suggest per iteration.

Returns:
concurrent.futures.Future[OptimizationResult]

A future that resolves to an OptimizationResult when all iterations complete or when stop() is called. If the optimization loop raises an unhandled exception, the future holds that exception and re-raises it on .result().

Raises:
RuntimeError

If the queueserver environment is not ready.

ValueError

If required devices or plans are not available.
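Because run() returns a standard `concurrent.futures.Future`, the caller can block on it, poll it, or attach a callback. The sketch below resolves futures by hand to stand in for the runner. No queueserver is involved, and the dictionary payload is a made-up placeholder for an OptimizationResult. It shows both the success path and the exception path.

```python
from concurrent.futures import Future

# Stand-in for the future returned by run(); here it is resolved by hand.
ok_future: Future = Future()
ok_future.add_done_callback(lambda f: print("done:", f.result()))
ok_future.set_result({"iterations_completed": 5, "num_points": 1})  # placeholder payload

# result() blocks until resolution; a timeout avoids waiting indefinitely.
outcome = ok_future.result(timeout=1.0)

# A failed loop stores its exception, which result() re-raises.
bad_future: Future = Future()
bad_future.set_exception(RuntimeError("acquisition failed"))
try:
    bad_future.result(timeout=1.0)
    error_message = None
except RuntimeError as exc:
    error_message = str(exc)

print(outcome["iterations_completed"], error_message)
```

Passing a timeout to `result()` is a defensive choice for interactive sessions, where a stalled queue would otherwise block the caller.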

submit_suggestions(suggestions)

Manually submit suggestions to the queue. This method returns immediately; the optimization runs asynchronously via callbacks on the Bluesky document stream.

Parameters:
suggestions : list[dict]

Parameter combinations to evaluate. Can be:

  • Optimizer suggestions (with "_id" keys from suggest())

  • Manual points (without "_id", requires CanRegisterSuggestions protocol)

Returns:
concurrent.futures.Future[OptimizationResult]

A future that resolves to an OptimizationResult when the acquisition completes. If an unhandled exception occurs the future will hold it and re-raise on .result().
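The two accepted suggestion shapes differ only in the presence of an "_id" key. A small sketch of telling them apart before submission follows; the `split_suggestions` helper, the parameter names `x1` and `x2`, and the "_id" value are all hypothetical.

```python
def split_suggestions(suggestions: list) -> tuple:
    """Hypothetical helper: separate optimizer suggestions from manual points."""
    from_optimizer = [s for s in suggestions if "_id" in s]
    manual = [s for s in suggestions if "_id" not in s]
    return from_optimizer, manual


suggestions = [
    {"_id": 0, "x1": 0.1, "x2": 2.0},  # shaped like an optimizer suggestion
    {"x1": 0.5, "x2": 1.5},            # manual point, no "_id"
]
from_optimizer, manual = split_suggestions(suggestions)
print(len(from_optimizer), len(manual))
```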

stop()

Stop the optimization loop gracefully.

The future returned by run() or submit_suggestions() will be resolved with a partial OptimizationResult containing however many iterations completed before the stop.
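The stop semantics can be pictured with a toy loop that is unrelated to blop's actual implementation: a worker checks a stop flag between iterations and resolves the future with however many iterations finished. All names below are hypothetical.

```python
import threading
from concurrent.futures import Future


def toy_loop(iterations: int, stop_flag: threading.Event, stop_after: int) -> Future:
    """Toy model only: count iterations until done or until the stop flag is set."""
    future: Future = Future()

    def worker() -> None:
        completed = 0
        for _ in range(iterations):
            if stop_flag.is_set():
                break
            completed += 1  # stands in for one suggest -> acquire -> ingest cycle
            if completed == stop_after:
                stop_flag.set()  # simulate a stop request arriving mid-run
        future.set_result(completed)  # partial count if stopped early

    threading.Thread(target=worker, daemon=True).start()
    return future


completed = toy_loop(iterations=10, stop_flag=threading.Event(), stop_after=4).result(timeout=5.0)
print(completed)
```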