Adaptive Sampling#

In this tutorial you will learn:

  1. Basic acquisition using the BlueSky RunEngine

  2. Generating basic acquisition plans for multiple sample environments on a simulated diffraction beamline

  3. How to use the Bluesky Adaptive harness to readily integrate AI-agents with the beamline

  4. Demonstration of Reinforcement Learning (RL) being used to optimize data collection strategies

For more bluesky tutorials, go to

Bluesky flow diagram (image source)

Some of the software we use:#

  • Bluesky RunEngine for experiment orchestration (sequencing)

  • Bluesky Ophyd for device integration (for this demo, a simulated detector)

  • Bluesky Widgets components for live-updating (“streaming”) visualization

  • Matplotlib for visualization

  • Bluesky Adaptive, an adaptive “harness” for integrating an Agent in a feedback loop with the Bluesky RunEngine

  • TensorFlow for the model

%matplotlib widget

import matplotlib.pyplot as plt
from bluesky import RunEngine
from bluesky.plans import count
from utils.simulated_hardware import detector, sample_selector, select_sample
from utils.visualization import stream_to_figures
from utils.adaptive_recommendations import with_agent

detector.delay = 1

Make a “RunEngine”#

The RunEngine is responsible for orchestrating the operation of the beamline. It

  • Processes a set of instructions (a “plan”) from the user

  • Directs hardware, tracks what is moving when, and tries to clean up correctly in the event of success or failure

  • Emits metadata and data in a streaming fashion for consumers (plots, models, storage, etc.)

RE = RunEngine()

We can now use the RunEngine to move things and collect data.

Switch between samples#

For this tutorial, we will be moving a sample_selector ophyd device, which can switch between samples on our simulated beamline. We can read the current status of the motor to see which sample we are currently on.

sample_selector.read()
{'sample_selector': {'value': 0, 'timestamp': 1682197314.1015368}}

To move this motor, we could use the built-in bluesky mv plan…

from bluesky.plan_stubs import mv

RE(mv(sample_selector, 2))
{'sample_selector': {'value': 2, 'timestamp': 1682197314.7594862}}

Or, we can write our own custom plan with whatever language and extensions we wish.

def select_sample(sample_number, *, verbose=False):
    if verbose:
        print(f"Moving to sample {sample_number}")
    yield from mv(sample_selector, sample_number)
RE(select_sample(0, verbose=True))  # This moves a sample positioner to place Sample 0 in the beam.
Moving to sample 0


Acquire images#

In this tutorial, we are focused on live streaming data as published by the RunEngine and formatted in the event-model. As such, we’re going to set up our visualization first, and then stream data in as we measure.

Functionally, this involves creating a figure and axes with Matplotlib, and then wrapping them in a callback function. The callback function will receive the documents as they are emitted by the RunEngine, unpack them, and update the visualization.
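To make the callback mechanism concrete, here is a minimal sketch of a Bluesky-style callback (this is illustrative, not the actual stream_to_figures implementation): a callable that receives (name, document) pairs and reacts to each document type.

```python
def make_counting_callback():
    """A toy callback in the Bluesky style: a callable taking (name, doc)."""
    counts = {"start": 0, "descriptor": 0, "event": 0, "stop": 0}

    def callback(name, doc):
        # The RunEngine emits documents named "start", "descriptor",
        # "event", and "stop"; a real callback would unpack doc["data"]
        # from each "event" and update the plot.
        if name in counts:
            counts[name] += 1

    return callback, counts

# Feed the callback a fake document stream, as the RunEngine would:
cb, counts = make_counting_callback()
for name, doc in [("start", {}), ("descriptor", {}),
                  ("event", {"data": {"detector_image": None}}),
                  ("event", {"data": {"detector_image": None}}),
                  ("stop", {})]:
    cb(name, doc)
print(counts["event"])  # 2 events seen
```

Passing such a callable as the second argument to `RE(plan, callback)` subscribes it to every document the plan emits.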

When the cell below is first run, you should see a checkerboard pattern and some basic metadata in the title (sample number, and number of shots on the sample).

fig, axes = plt.subplots(squeeze=False, constrained_layout=True, figsize=(5, 5))
callback = stream_to_figures(fig, axes)

Now we can select the sample we want and pass a count plan into the run engine.

count is the most basic acquisition plan in Bluesky. It takes as an argument a list of detectors. In this case, we will pass it both the detector and the sample_selector ophyd objects. In this way, the emitted documents will automatically contain both the image data from the detector and the sample number associated with this data (important metadata).

RE(count([sample_selector, detector]), callback)

If everything worked, you should have seen a diffraction pattern appear on the visualization, with the associated sample number (“Sample 0”) in the image title. This data is pulled from the streaming documents emitted by the RunEngine - we didn’t have to go get them and fully process them later!

Note that if you look at the immediate output of the cell, you should see a long string of letters and numbers. This serves as a universally unique identifier (uuid) for that particular measurement that will never be repeated. These uuids can be used as book-keeping identifiers for later data lookup, but that is beyond the scope of this tutorial.

So, what happens if we measure the same sample again?

RE(count([sample_selector, detector]), callback)

The stream_to_figures callback we have set up here contains logic to average repeated measurements of the same sample together. As such, you may have noticed that the image quality improved slightly when this second measurement was performed. The “N_shots” quantity in the plot title also reflects the increasing number of shots on the sample.

You can try re-running that cell several times if you’d like, but we can expect little visual change. This is because sample 0 happens to be a ‘strong’ scatterer, and its signal-to-noise ratio is good over the background. In fact, even a single exposure produced an image with sufficient contrast to interpret scientifically.
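The averaging that stream_to_figures performs can be sketched as a running mean; as a rule of thumb, averaging N exposures of the same sample improves the signal-to-noise ratio by roughly a factor of sqrt(N), which is why repeated shots help a weak scatterer. A minimal sketch (the names here are illustrative, not from the tutorial code):

```python
import random

def running_mean(values):
    """Incrementally average values, the way an image-averaging
    callback can fold each new shot into the displayed image."""
    avg, n = 0.0, 0
    for x in values:
        n += 1
        avg += (x - avg) / n  # update the mean without storing every shot
    return avg

random.seed(0)
true_signal = 5.0
# Simulated noisy exposures of one sample (signal plus unit noise):
shots = [true_signal + random.gauss(0.0, 1.0) for _ in range(100)]
print(f"single shot: {shots[0]:.2f}, average of 100 shots: {running_mean(shots):.2f}")
```

The incremental form matters for streaming: each new exposure updates the displayed average without re-reading the earlier ones.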

Next, let’s set up a new figure, move to the next sample, and take a measurement.

fig, axes = plt.subplots(squeeze=False, constrained_layout=True, figsize=(5, 5))
callback = stream_to_figures(fig, axes, start_at=1)
RE(select_sample(1, verbose=True))
Moving to sample 1

RE(count([sample_selector, detector]), callback)

This sample doesn’t look nearly as good as the first one! It gives a weak signal (low signal-to-noise ratio). A single exposure does not produce sufficient contrast, but we can take additional exposures. The visualization above will display the average of all the exposures of this sample.

We could do that by hand, re-running the above cell several times. Or, we could write a custom plan to do this. Let’s go with the latter as it’s less tedious to manage. Plus, we can take advantage of the ability to use plans in other plans!

def multi_count(num_counts):
    for i in range(num_counts):
        print(f"Exposure number {i}")
        yield from count([sample_selector, detector])

Note that we didn’t have to know anything about the guts of how count works, and we chose to hardcode detector and sample_selector into the plan (though we didn’t need to; it’s up to how you want to write the plan).
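Plans compose like this because they are Python generators: `yield from` delegates to the inner plan, so the RunEngine sees one flat stream of instructions. A toy sketch, with strings standing in for real plan messages:

```python
def toy_count(detector_name):
    # Stand-in for a real plan: a generator of instruction "messages".
    yield f"trigger {detector_name}"
    yield f"read {detector_name}"

def toy_multi_count(num_counts):
    # Composing plans: `yield from` splices the inner plan's messages
    # into the outer plan, just as multi_count reuses count.
    for _ in range(num_counts):
        yield from toy_count("detector")

messages = list(toy_multi_count(2))
print(messages)  # ['trigger detector', 'read detector', 'trigger detector', 'read detector']
```

In real plans the yielded objects are bluesky Msg instructions rather than strings, but the composition mechanism is the same.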

unique_ids = RE(multi_count(5), callback)
Exposure number 0

Exposure number 1

Exposure number 2

Exposure number 3

Exposure number 4

Now the statistics on that weakly-scattering sample are starting to look better!

Acquire images for more than one sample#

We’ve already seen that samples 0 and 1 are a strong and a weak scatterer, respectively. Let’s say that, based on these initial observations, we decide that the weakly scattering sample needs to be measured some ratio (e.g. 4x) more often than the strongly scattering sample. We probably also want to be able to define how many total loops to complete over both samples at the desired measurement ratio.

Let’s write a plan!

def multi_sample_plan(measure_ratio, total_loops=1):
    for i in range(total_loops):
        print(f"* Now on big loop {i}")
        yield from select_sample(0, verbose=True)
        yield from count([sample_selector, detector])
        yield from select_sample(1, verbose=True)
        for _ in range(measure_ratio):
            yield from count([sample_selector, detector])

To make the visualization easier, let’s setup a callback that can display both samples at once as the data is streamed.

fig, axes = plt.subplots(1, 2, squeeze=False, constrained_layout=True, figsize=(5, 3))
callback = stream_to_figures(fig, axes)
unique_ids = RE(multi_sample_plan(4, total_loops=2), callback)
* Now on big loop 0

Moving to sample 0

Moving to sample 1

* Now on big loop 1

Moving to sample 0

Moving to sample 1

Dealing with many, many samples#

We have thus far been playing with just a couple of samples. However, on the real PDF beamline at NSLS-II, we often load hundreds of different samples at one time. And each of these samples could have very different scattering qualities!

PDF High Capacity Sample Changer

Let’s write a custom plan to ‘sweep’ all samples on the simulated bracket. For this example, we’ll pretend to have 9 samples. We would like our plan to take the total number of individual shots as an argument.

def sequential_sweep(total_shots):
    "Sweep over the samples in order. Take up to `total_shots` shots."
    for shot in range(total_shots):
        yield from select_sample(shot % 9)
        yield from count([sample_selector, detector])
fig, axes = plt.subplots(3, 3, constrained_layout=True, figsize=(5, 5))
callback = stream_to_figures(fig, axes)

Our simulated detector has a simulated “delay”, standing in for the exposure and readout time in a real detector. Here we’ll speed it up a bit so the following examples run faster.

detector.delay = 0.2
unique_ids = RE(sequential_sweep(total_shots=25), callback)

You no doubt have a combination of strong and weakly scattering samples. Note that this arrangement is random and will likely be different if you restart this notebook. You could sit down and write a custom plan based on the scattering strength of your particular simulated samples. However, this would get tedious if performed more than once or twice. This is where AI can help us out!

Use bluesky-adaptive to let an Agent drive the experiment#

Here, we introduce the bluesky-adaptive plan. This is a plan designed to readily interface with any sort of decision-making agent via an ask/tell interface. More details of this plan design and implementation can be found at

We will take advantage of a helper plan called with_agent, which simplifies the interface; you can easily look at the details by digging into with_agent??

Agent 1: Naive Agent#

To begin with, let’s use a naive sequential agent. The goal of this agent is to, within a defined “budget” of time (i.e., number of scans), measure each sample sequentially. This is effectively what our sequential_sweep plan above did, but expressed in the language of an agent in the bluesky-adaptive machinery.
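The actual NaiveAgent lives in utils.adaptive_recommendations, but the ask/tell pattern such an agent follows can be sketched like this (a hypothetical minimal version, not the tutorial’s implementation):

```python
class SequentialAgent:
    """Recommend samples 0, 1, ..., n-1 in a loop, ignoring the data."""

    def __init__(self, num_samples):
        self.num_samples = num_samples
        self.next_sample = 0

    def tell(self, sample_number, measurement):
        # A naive agent learns nothing from the data it is told about.
        pass

    def ask(self):
        # Recommend the next sample in sequence, wrapping around.
        recommendation = self.next_sample
        self.next_sample = (self.next_sample + 1) % self.num_samples
        return recommendation

agent = SequentialAgent(9)
print([agent.ask() for _ in range(12)])  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 0, 1, 2]
```

The harness alternates these two calls: after each measurement it tells the agent what was seen, then asks it which sample to measure next.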

fig, axes = plt.subplots(3, 3, constrained_layout=True, figsize=(5, 5))
callback = stream_to_figures(fig, axes)
from utils.adaptive_recommendations import NaiveAgent

unique_ids = RE(
    with_agent(NaiveAgent(9), max_shots=30),
    callback,
)

This works well functionally, but it’s not a very efficient use of our time. After all, we would really prefer to measure the weak scatterers many more times than the strong scatterers, in order to build up better statistics. However, hand-writing a plan for every set of samples is not tractable. Instead, let’s use an agent trained with Reinforcement Learning to accomplish this task.

Agent 2: Reinforcement Learning Agent#

The details of how Reinforcement Learning (RL) works, and how we implemented it, are beyond the scope of this short tutorial; interested readers can find details of the method here. Essentially, the language of RL allows us to easily “gamify” our scientific goals.

Gamifying the Beamline

Prior to this tutorial, we trained an RL agent to develop a policy that accomplishes the following goals:

  1. Measure each sample once initially.

  2. Repeat measurements on the samples, collecting on weak scatterers 9x and strong scatterers 1x.

  3. Having built up sufficient statistics on the weak scatterers, continue measuring each sample sequentially.
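If we assume we already know which samples are weak, the target behavior of this policy can be sketched as a shot schedule; this only illustrates the goals above, it is not the trained agent itself:

```python
def target_schedule(num_samples, weak, ratio=9, final_sweeps=1):
    """Illustrate the three-phase policy: an initial sweep, weighted
    repeats (weak scatterers `ratio`x, strong 1x), then sequential sweeps."""
    schedule = list(range(num_samples))            # 1. measure each sample once
    for s in range(num_samples):                   # 2. weighted repeats
        schedule += [s] * (ratio if s in weak else 1)
    for _ in range(final_sweeps):                  # 3. back to sequential sweeps
        schedule += list(range(num_samples))
    return schedule

sched = target_schedule(3, weak={1})
print(sched)  # [0, 1, 2, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 0, 1, 2]
```

The point of the RL agent is that it learns to approximate this behavior from the streamed data alone, without being handed the weak/strong labels.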

fig, axes = plt.subplots(3, 3, constrained_layout=True, figsize=(5, 5))
callback = stream_to_figures(fig, axes)

Note: Here we replace the RL agent with a lightweight Monte Carlo approximation. This lets the tutorial run quickly, avoids issues with outdated versions of the agent, and retains the Tensorforce agent code in this repository as an example. From the standpoint of interfacing with Bluesky, we are again using the bluesky-adaptive machinery and the with_agent helper plan.

from utils.adaptive_recommendations import RLAgent

detector.delay = 1
unique_ids = RE(
    with_agent(RLAgent(9, "tf_models/bluesky-tutorial/saved_models"), max_shots=90),
    callback,
)

Through the use of an RL agent, and Bluesky-adaptive, we have measured the data more efficiently than the naive sequential plan, spending more time on the samples that need extra attention!

Bonus Agents#

On your own, try out these agents, which sit at the two extremes: one knows exactly what to do, and one knows nothing at all.

Any callable that matches the signature

def recommend(sample_number, badness) -> next_sample_to_measure

can be used as the decision-making logic.

Agent 3: “Cheating” (Omniscient) Agent#

Just in case you want to try another agent, here is one to whom we have whispered the answer ahead of time.

fig, axes = plt.subplots(3, 3, constrained_layout=True, figsize=(5, 5))
callback = stream_to_figures(fig, axes)
from utils.adaptive_recommendations import CheatingAgent

unique_ids = RE(
    with_agent(CheatingAgent(9), max_shots=90),
    callback,
)

Note that the RL agent probably performed just as well as the agent that had the right answer, because it was trained ahead of time on how to “play the game” we defined through our scientific goals.

Agent 4: “Chaos” (random) Agent#

Given this API, we can plug any logic we want into the plan, but that does not mean the agent has to be good at its job! For example, we can write an agent that randomly picks the next sample to measure:

import random

def chaos_agent(x, y):
    # ignores input!
    return random.choice(range(9))
fig, axes = plt.subplots(3, 3, constrained_layout=True, figsize=(5, 5))
callback = stream_to_figures(fig, axes)
unique_ids = RE(
    with_agent(chaos_agent, max_shots=30),
    callback,
)

You should see that measurements jump around between the samples. Given that the samples are also randomly strong or weak this is not actually better or worse than a sequential scan in terms of over / under collecting on the samples, but if we take into account the time to move the motor maybe significantly worse!