Flyer Basics

We want to understand how to build a Flyer in Bluesky to support various types of fly scans and remote data loggers. Information about Flyers is scattered across the standard documentation. We need some clarity and a few examples that build complexity incrementally.

The basic notion of a Flyer is that it directs an external controller (we’ll call it the controller here) to perform some data collection process. Usually, a controller is used to collect data at rates beyond the capabilities of Bluesky plans and the RunEngine. Examples could be waveforms from a storage oscilloscope or a continuous motion scan of a diffractometer.

This notebook will show the basic requirements for a Flyer and build a simple working example you can use as a template for your own work. (Our inspiration to create this basic flyer tutorial was the goal of operating various fly scans at the APS, such as the USAXS fly scan, from Bluesky. The examples we found before we started this project quickly became too instrument-specific to serve as tutorials.)

Python imports and definitions

Here is the full set of packages to import. The first block is Python standard packages, followed by the ophyd, Bluesky, and databroker packages, limited to the parts we plan to use here. Since this is also a tutorial, we will not rename imports or use other such shortcuts in the documentation (the online code has some shortcuts).

  • Create a logger instance in case we want to investigate internal details as our code runs.

  • Create an instance of the Bluesky RunEngine.

  • Create an instance of the databroker using databroker.temp(), establishing a temporary, disposable yet fully-functional databroker for just this tutorial session.

  • Subscribe that databroker instance to the RunEngine so it receives the document stream each time we run a plan.
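One detail the list above leaves open is how the logger.info(...) messages become visible: Python's root logger defaults to the WARNING level, so INFO messages are discarded unless the level is lowered. A minimal sketch using only the standard library (the format string is an arbitrary choice, not something taken from this notebook):

```python
import logging

# basicConfig() attaches a stream handler to the root logger
# (it does nothing if the root logger already has a handler)
logging.basicConfig(format="%(asctime)s %(levelname)s %(message)s")

logger = logging.getLogger()
# the root logger defaults to WARNING, which would hide logger.info(...)
logger.setLevel(logging.INFO)

logger.info("logger is ready")
```

Setting the level back to logging.WARNING silences the diagnostic messages again without touching the Flyer code.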

[1]:
import logging
import threading
import time

import ophyd
import bluesky
import bluesky.plans
import databroker

logger = logging.getLogger()
RE = bluesky.RunEngine({})
db = databroker.temp()
RE.subscribe(db.insert)
/home/travis/virtualenv/python3.7.1/lib/python3.7/site-packages/pims/image_reader.py:26: RuntimeWarning: PIMS image_reader.py could not find scikit-image. Falling back to matplotlib's imread(), which uses floats instead of integers. This may break your scripts.
(To ignore this warning, include the line "warnings.simplefilter("ignore", RuntimeWarning)" in your script.)
  warnings.warn(RuntimeWarning(ski_preferred))
[1]:
0

Bare Minimum Requirements for a Flyer

In Bluesky, a Flyer is an ophyd.Device that meets the Fly-able interface, which has three methods:

  1. Kickoff - begin accumulating data

  2. Complete - Bluesky tells the Flyer that Bluesky is ready to receive data

  3. Collect - the device provides the data to Bluesky

The first two methods must return an instance of ophyd.status.Status (a.k.a. a status object). For our code, we choose a specialized version of Status(), the DeviceStatus() object, based on the explanation provided in discussion.

The collect() method requires a companion describe_collect() that informs the RunEngine what kind of data to expect from collect(). If no timestamp information is provided from the controller, then do as we show here and use the workstation’s clock to provide a timestamp for the event.

This example (which does absolutely nothing) meets the bare minimum requirement.

[2]:
class BareMinimumFlyer(ophyd.Device):

    def kickoff(self):
        kickoff_status = ophyd.DeviceStatus(self)
        kickoff_status._finished(success=True)
        return kickoff_status

    def complete(self):
        complete_status = ophyd.DeviceStatus(self)
        complete_status._finished(success=True)
        return complete_status

    def collect(self):
        yield {'data':{}, 'timestamps':{}, 'time':time.time()}

    def describe_collect(self):
        return {self.name: {}}


flyer = BareMinimumFlyer(name="flyer")
print(flyer.complete())
print(list(flyer.collect()))

# if this next step succeeds, it's proof that we did this right!
RE(bluesky.plans.fly([flyer]))
DeviceStatus(device=flyer, done=False, success=False)
[{'data': {}, 'timestamps': {}, 'time': 1594844781.3913615}]
/home/travis/virtualenv/python3.7.1/lib/python3.7/site-packages/event_model/__init__.py:172: UserWarning: The document type 'bulk_events' has been deprecated in favor of 'event_page', whose structure is a transpose of 'bulk_events'.
  "The document type 'bulk_events' has been deprecated in favor of "
[2]:
('9f536542-6128-4519-8f8c-d283aedfe1b7',)

Only three documents were emitted from the RunEngine (start, descriptor, and stop) since we generated no data content.

[3]:
list(db[-1].documents())
[3]:
[('start',
  {'uid': '9f536542-6128-4519-8f8c-d283aedfe1b7',
   'time': 1594844781.3921616,
   'versions': {'ophyd': '1.5.2', 'bluesky': '1.6.4'},
   'scan_id': 1,
   'plan_type': 'generator',
   'plan_name': 'fly'}),
 ('descriptor',
  {'run_start': '9f536542-6128-4519-8f8c-d283aedfe1b7',
   'time': 1594844781.3951018,
   'data_keys': {},
   'uid': 'ae3ceacd-5ca7-44dc-a9ba-6b6497c5b176',
   'name': 'flyer',
   'configuration': {'flyer': {'data': {}, 'timestamps': {}, 'data_keys': {}}},
   'hints': {'flyer': {'fields': []}},
   'object_keys': {'flyer': []}}),
 ('stop',
  {'run_start': '9f536542-6128-4519-8f8c-d283aedfe1b7',
   'time': 1594844781.396851,
   'uid': 'ffe6e859-b03d-4e55-9da4-cfa6467f3a13',
   'exit_status': 'success',
   'reason': '',
   'num_events': {'flyer': 1}})]

Flyer : a starting template

The BareMinimumFlyer is a good start, but we’ll need to add a few more things to make a good template. The first thing to do is to make the status objects known to every method of the class by storing both as attributes (prefixing them with self.); the code comments describe what is happening. In the constructor (__init__()), we set both to None, the value we expect when not kicked off or flying, respectively. Since we define a constructor, we must remember to call the constructor of the superclass as well, or our ophyd.Device will not work correctly.

def __init__(self, *args, **kwargs):
    super().__init__('', parent=None, **kwargs)
    self.kickoff_status = None
    self.complete_status = None

Our controller could take some time to signal that it is finished (seconds to minutes, at least). We need a way to detect this completion. We can do that either by polling the PV or by subscribing to a callback on the completion event. We’ll make that choice when we implement the actual activity. Here, we intend to wait in a polling loop. Since the polling loop does not return until the controller is done, we must do that waiting in a thread separate from that of the RunEngine (consider the alternative suggestion to use epics.ca.CAThread). We do not want to block the RunEngine thread, so that it stays free to respond to other activities, such as data from other streams or the user interface. So, we run my_activity() in a separate thread that is started from kickoff():

thread = threading.Thread(target=self.my_activity, daemon=True)
thread.start()

The basic outline of my_activity() is:

def my_activity(self):
    # tell the *controller* to do its work
    # once started, we notify by updating the status object
    self.kickoff_status._finished(success=True)

    # wait for it to tell us it is done
    # after the wait, we declare victory
    self.complete_status._finished(success=True)

The waiting step will block the thread in which my_activity() is running but that’s OK since this is separate from the RunEngine’s thread.
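The hand-off between kickoff(), the worker thread, and the two status objects can be tried without ophyd or Bluesky installed. The following is a minimal sketch using only the standard library. SketchStatus and SketchFlyer are hypothetical stand-ins (they are not ophyd classes), with a threading.Event playing the role of the status object and a short sleep playing the role of the controller.

```python
import threading
import time


class SketchStatus:
    """Hypothetical stand-in for a status object (not ophyd.DeviceStatus)."""

    def __init__(self):
        self._event = threading.Event()

    def mark_finished(self):
        self._event.set()

    @property
    def done(self):
        return self._event.is_set()

    def wait(self, timeout=None):
        # returns True once finished, False if the timeout expired first
        return self._event.wait(timeout)


class SketchFlyer:
    """Hypothetical stand-in that shows only the threading pattern."""

    def __init__(self):
        self.kickoff_status = None
        self.complete_status = None

    def my_activity(self):
        # the "controller" has started, so kickoff is finished
        self.kickoff_status.mark_finished()
        # blocking here is fine: this is not the caller's thread
        time.sleep(0.05)
        # the "controller" is done, so flying is complete
        self.complete_status.mark_finished()

    def kickoff(self):
        self.kickoff_status = SketchStatus()
        self.complete_status = SketchStatus()
        thread = threading.Thread(target=self.my_activity, daemon=True)
        thread.start()
        # returned right away, without waiting for the sleep above
        return self.kickoff_status


sketch = SketchFlyer()
started = sketch.kickoff()
kicked_off = started.wait(timeout=1.0)
finished = sketch.complete_status.wait(timeout=1.0)
print(kicked_off, finished)
```

The caller's thread is only blocked where it chooses to call wait(), which is the property the RunEngine relies on.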

Within my_activity(), we’ve left two comments starting with # TODO: marking where we need to add code that will complete the specifics of our final project. Since this tutorial develops a starting template for that project (and others), we leave these comments as-is.
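For a later project, one possible way to fill in the "wait for completion" step is a polling loop with a timeout. This is a sketch under assumptions: wait_until() and the is_done callable are hypothetical names, and in a real Flyer the callable would read whatever signal the controller uses to report that it is finished.

```python
import time


def wait_until(is_done, timeout=10.0, poll_interval=0.01):
    """
    Poll ``is_done()`` until it returns True or ``timeout`` seconds elapse.

    Returns True if the condition was met, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if is_done():
            return True
        time.sleep(poll_interval)
    return bool(is_done())  # one last check at the deadline


# simulate a controller that reports "done" 0.05 s after it starts
t_start = time.monotonic()
met = wait_until(lambda: time.monotonic() - t_start >= 0.05, timeout=2.0)

# simulate a controller that never finishes
timed_out = not wait_until(lambda: False, timeout=0.05)
```

Inside my_activity(), the return value could decide whether the completion status is marked as a success or as a failure.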

We’ve also added some diagnostic reporting (calls to logger.info(...)) to build out the next example:

[4]:
class MyFlyer(ophyd.Device):
    """
    starting template for a Flyer that we understand
    """

    def __init__(self, *args, **kwargs):
        super().__init__('', parent=None, **kwargs)
        self.kickoff_status = None
        self.complete_status = None

    def my_activity(self):
        """
        start the "fly scan" here, could wait for completion

        It's OK to use blocking calls here
        since this is called in a separate thread
        from the Bluesky RunEngine.
        """
        logger.info("activity()")
        if self.complete_status is None:
            logger.info("leaving activity() - not complete")
            return

        # TODO: do the activity here

        # once started, we notify by updating the status object
        self.kickoff_status._finished(success=True)

        # TODO: wait for completion

        # after the wait, we declare victory
        self.complete_status._finished(success=True)
        logger.info("activity() complete. status = " + str(self.complete_status))

    def kickoff(self):
        """
        Start this Flyer
        """
        logger.info("kickoff()")
        self.kickoff_status = ophyd.DeviceStatus(self)
        self.complete_status = ophyd.DeviceStatus(self)

        thread = threading.Thread(target=self.my_activity, daemon=True)
        thread.start()

        return self.kickoff_status

    def complete(self):
        """
        Wait for flying to be complete
        """
        logger.info("complete()")
        if self.complete_status is None:
            raise RuntimeError("No collection in progress")

        return self.complete_status

    def collect(self):
        """
        Retrieve the data
        """
        logger.info("collect()")
        self.complete_status = None
        yield {'data':{}, 'timestamps':{}, 'time':time.time()}

    def describe_collect(self):
        """
        Describe details for ``collect()`` method
        """
        logger.info("describe_collect()")
        return {self.name: {}}

[5]:
ifly = MyFlyer(name="ifly")

Diagnostics

When building a Flyer, it is useful to have some diagnostics in place. Already, we have been using some of these, including printing interim messages via calls to logger.info(...). Another useful diagnostic step is to call each of the methods individually to make sure they are acting as expected.

  1. create an instance of the Flyer

    flyer = MyFlyer(name="flyer")

  2. verify that kickoff() returns a status that is “Done”

    status = flyer.kickoff()
    status.done

  3. verify that complete() returns a status that is “Done”

    status = flyer.complete()
    status.done

  4. verify that describe_collect() returns a dictionary

    d = flyer.describe_collect()
    d

  5. verify that collect() returns a generator

    g = flyer.collect()
    g

  6. verify that the generator yields data dictionaries (shown here as a list)

    list(g)
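The last three checks can also be automated. The sketch below compares the dictionary from describe_collect() with the dictionaries from collect(), using plain dictionaries so it runs without ophyd. check_flyer_output() is a hypothetical helper, not part of Bluesky, and it only tests the fields used in this tutorial.

```python
def check_flyer_output(description, events):
    """
    Compare describe_collect() output with the events from collect().

    Returns a list of problem descriptions (empty if none were found).
    """
    problems = []
    # describe_collect() returns {stream_name: {data_key: {...}}}
    expected_keys = set()
    for stream_name, data_keys in description.items():
        for key, info in data_keys.items():
            expected_keys.add(key)
            for field in ("source", "dtype", "shape"):
                if field not in info:
                    problems.append(f"{stream_name}.{key}: missing '{field}'")

    for i, event in enumerate(events):
        for field in ("time", "data", "timestamps"):
            if field not in event:
                problems.append(f"event {i}: missing '{field}'")
        found_keys = set(event.get("data", {}))
        if found_keys != expected_keys:
            problems.append(
                f"event {i}: data keys {sorted(found_keys)} "
                f"do not match described keys {sorted(expected_keys)}"
            )
        if set(event.get("timestamps", {})) != found_keys:
            problems.append(f"event {i}: timestamps keys differ from data keys")
    return problems


# same structure as the output of the template Flyer (no data yet)
description = {"ifly": {}}
matching = [{"data": {}, "timestamps": {}, "time": 1594844781.46}]
# hypothetical event that reports a key that was never described
mismatched = [{"data": {"x": 1.0}, "timestamps": {"x": 2.0}, "time": 2.0}]

print(check_flyer_output(description, matching))
print(check_flyer_output(description, mismatched))
```

With a real Flyer, the two arguments would be flyer.describe_collect() and list(flyer.collect()).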

Apply some of those steps here (we’ll skip testing the ifly.complete() method when not flying since it raises a RuntimeError exception if data collection is not in progress):

[6]:
ifly.describe_collect()
[6]:
{'ifly': {}}
[7]:
list(ifly.collect())
[7]:
[{'data': {}, 'timestamps': {}, 'time': 1594844781.460254}]

Now, run this fly scan:

[8]:
RE(bluesky.plans.fly([ifly]))
[8]:
('a25bf41e-ec96-4cd5-b32c-2690e27b11f4',)
[9]:
db[-1].stream_names
[9]:
['ifly']
[10]:
db[-1].table("ifly")
[10]:
seq_num

Still only three documents were emitted from the RunEngine: start, descriptor, and stop.

[11]:
list(db[-1].documents())
[11]:
[('start',
  {'uid': 'a25bf41e-ec96-4cd5-b32c-2690e27b11f4',
   'time': 1594844781.4690773,
   'versions': {'ophyd': '1.5.2', 'bluesky': '1.6.4'},
   'scan_id': 2,
   'plan_type': 'generator',
   'plan_name': 'fly'}),
 ('descriptor',
  {'run_start': 'a25bf41e-ec96-4cd5-b32c-2690e27b11f4',
   'time': 1594844781.471447,
   'data_keys': {},
   'uid': '05cb92de-ba80-456a-9205-8cebc145d1b1',
   'name': 'ifly',
   'configuration': {'ifly': {'data': {}, 'timestamps': {}, 'data_keys': {}}},
   'hints': {'ifly': {'fields': []}},
   'object_keys': {'ifly': []}}),
 ('stop',
  {'run_start': 'a25bf41e-ec96-4cd5-b32c-2690e27b11f4',
   'time': 1594844781.4721828,
   'uid': '69085030-7539-407a-baca-9992a87146ef',
   'exit_status': 'success',
   'reason': '',
   'num_events': {'ifly': 1}})]

First working Flyer - trivial data

To collect data, we need to modify both the collect() and the describe_collect() methods. Bluesky needs to know what kind of data to expect from this Flyer, so that it can generate the correct descriptor document.

For the most trivial case, we’ll return a single number (1.2345) as the result of the first working Flyer.

In the describe_collect() method, we create a dictionary that describes the data to be collected:

d = dict(
    source = "fictional",
    dtype = "number",
    shape = []
)
return {
    self.name: {
        "x": d
    }
}

Then, in the collect() method, add the actual data collection code:

t = time.time()
d = dict(
    time=t,
    data=dict(x=1.2345),
    timestamps=dict(x=t)
)
yield d
[12]:
class MyFlyer(ophyd.Device):
    """
    build a Flyer that we understand
    """

    def __init__(self, *args, **kwargs):
        super().__init__('', parent=None, **kwargs)
        self.kickoff_status = None
        self.complete_status = None

    def my_activity(self):
        """
        start the "fly scan" here, could wait for completion

        It's OK to use blocking calls here
        since this is called in a separate thread
        from the Bluesky RunEngine.
        """
        logger.info("activity()")
        if self.complete_status is None:
            logger.info("leaving activity() - not complete")
            return

        # TODO: do the activity here

        # once started, we notify by updating the status object
        self.kickoff_status._finished(success=True)

        # TODO: wait for completion

        # after the wait, we declare victory
        self.complete_status._finished(success=True)
        logger.info("activity() complete. status = " + str(self.complete_status))

    def kickoff(self):
        """
        Start this Flyer
        """
        logger.info("kickoff()")
        self.kickoff_status = ophyd.DeviceStatus(self)
        self.complete_status = ophyd.DeviceStatus(self)

        thread = threading.Thread(target=self.my_activity, daemon=True)
        thread.start()

        return self.kickoff_status

    def complete(self):
        """
        Wait for flying to be complete
        """
        logger.info("complete()")
        if self.complete_status is None:
            raise RuntimeError("No collection in progress")

        return self.complete_status

    def describe_collect(self):
        """
        Describe details for ``collect()`` method
        """
        logger.info("describe_collect()")
        d = dict(
            source = "fictional",
            dtype = "number",
            shape = []
        )
        return {
            self.name: {
                "x": d
            }
        }

    def collect(self):
        """
        Retrieve the data
        """
        logger.info("collect()")
        self.complete_status = None
        t = time.time()
        d = dict(
            time=t,
            data=dict(x=1.2345),
            timestamps=dict(x=t)
        )
        yield d

As before, create a new instance of the revised MyFlyer class.

[13]:
ifly = MyFlyer(name="ifly")
[14]:
print('output from describe_collect() : ', ifly.describe_collect())
print("list output from collect() : ", list(ifly.collect()))
output from describe_collect() :  {'ifly': {'x': {'source': 'fictional', 'dtype': 'number', 'shape': []}}}
list output from collect() :  [{'time': 1594844781.5480998, 'data': {'x': 1.2345}, 'timestamps': {'x': 1594844781.5480998}}]

Running this flyer with the RunEngine seems anticlimactic, but the lack of exceptions tells us that it ran, and we get a UUID at the end.

[15]:
RE(bluesky.plans.fly([ifly]))
/home/travis/virtualenv/python3.7.1/lib/python3.7/site-packages/event_model/__init__.py:172: UserWarning: The document type 'bulk_events' has been deprecated in favor of 'event_page', whose structure is a transpose of 'bulk_events'.
  "The document type 'bulk_events' has been deprecated in favor of "
[15]:
('4f1316c6-1a7f-499b-af9d-7fb70edd6154',)

We next query the last scan in the databroker and show a table of the stream from collect():

[16]:
h = db[-1]
h.table(h.stream_names[0])
[16]:
time x
seq_num
1 2020-07-15 20:26:21.559512377 1.2345

Now one more document type is emitted: event. There is only one event since there is only one data point.

[17]:
list(db[-1].documents())
[17]:
[('start',
  {'uid': '4f1316c6-1a7f-499b-af9d-7fb70edd6154',
   'time': 1594844781.5564368,
   'versions': {'ophyd': '1.5.2', 'bluesky': '1.6.4'},
   'scan_id': 3,
   'plan_type': 'generator',
   'plan_name': 'fly'}),
 ('descriptor',
  {'run_start': '4f1316c6-1a7f-499b-af9d-7fb70edd6154',
   'time': 1594844781.5588233,
   'data_keys': {'x': {'source': 'fictional', 'dtype': 'number', 'shape': []}},
   'uid': '79a1d032-a631-449f-b416-a08c577c31b2',
   'name': 'ifly',
   'configuration': {'ifly': {'data': {}, 'timestamps': {}, 'data_keys': {}}},
   'hints': {'ifly': {'fields': []}},
   'object_keys': {'ifly': ['x']}}),
 ('event',
  {'descriptor': '79a1d032-a631-449f-b416-a08c577c31b2',
   'uid': 'e08ac1b6-313d-48f5-b034-c4f968d8095c',
   'time': 1594844781.5595124,
   'seq_num': 1,
   'data': {'x': 1.2345},
   'timestamps': {'x': 1594844781.5595124},
   'filled': {}}),
 ('stop',
  {'run_start': '4f1316c6-1a7f-499b-af9d-7fb70edd6154',
   'time': 1594844781.5598881,
   'uid': '15453d36-f67c-453b-8775-f04768ac553a',
   'exit_status': 'success',
   'reason': '',
   'num_events': {'ifly': 1}})]

Flyer that “collects” 1-D array data

We continue to expand the capabilities of our working example of a Flyer. Now, we wish to “collect” a 1-D array of data. We’ll manufacture that data ourselves to keep the code simple.

Here, we generate an array of 5 numbers, each representing the elapsed time (in seconds) since the scan started (in kickoff()). We record the start time with self.t0 = time.time(). Also, time.time() is used to generate the timestamps in the data events. In a real case, we might get timestamps from the controller; otherwise we’d have to make them up ourselves, just like this example.

[18]:
class MyFlyer(ophyd.Device):
    """
    a Flyer that we understand that reports 1-D array of data
    """

    def __init__(self, *args, **kwargs):
        super().__init__('', parent=None, **kwargs)
        self.kickoff_status = None
        self.complete_status = None
        self.t0 = 0

    def my_activity(self):
        """
        start the "fly scan" here, could wait for completion

        It's OK to use blocking calls here
        since this is called in a separate thread
        from the Bluesky RunEngine.
        """
        logger.info("activity()")
        if self.complete_status is None:
            logger.info("leaving activity() - not complete")
            return

        # TODO: do the activity here

        # once started, we notify by updating the status object
        self.kickoff_status._finished(success=True)

        # TODO: wait for completion

        # after the wait, we declare victory
        self.complete_status._finished(success=True)
        logger.info("activity() complete. status = " + str(self.complete_status))

    def kickoff(self):
        """
        Start this Flyer
        """
        logger.info("kickoff()")
        self.kickoff_status = ophyd.DeviceStatus(self)
        self.complete_status = ophyd.DeviceStatus(self)
        self.t0 = time.time()

        thread = threading.Thread(target=self.my_activity, daemon=True)
        thread.start()

        return self.kickoff_status

    def complete(self):
        """
        Wait for flying to be complete
        """
        logger.info("complete()")
        if self.complete_status is None:
            raise RuntimeError("No collection in progress")

        return self.complete_status

    def describe_collect(self):
        """
        Describe details for ``collect()`` method
        """
        logger.info("describe_collect()")
        d = dict(
            source = "elapsed time, s",
            dtype = "number",
            shape = (1,)
        )
        return {
            self.name: {
                "x": d
            }
        }

    def collect(self):
        """
        Retrieve the data
        """
        logger.info("collect()")
        self.complete_status = None
        for _ in range(5):
            t = time.time()
            x = t - self.t0 # data is elapsed time since kickoff()
            d = dict(
                time=t,
                data=dict(x=x),
                timestamps=dict(x=t)
            )
            yield d

[19]:
ifly = MyFlyer(name="ifly")
[20]:
print(ifly.describe_collect())
{'ifly': {'x': {'source': 'elapsed time, s', 'dtype': 'number', 'shape': (1,)}}}
[21]:
list(ifly.collect())
[21]:
[{'time': 1594844781.635777,
  'data': {'x': 1594844781.635777},
  'timestamps': {'x': 1594844781.635777}},
 {'time': 1594844781.63578,
  'data': {'x': 1594844781.63578},
  'timestamps': {'x': 1594844781.63578}},
 {'time': 1594844781.6357815,
  'data': {'x': 1594844781.6357815},
  'timestamps': {'x': 1594844781.6357815}},
 {'time': 1594844781.6357837,
  'data': {'x': 1594844781.6357837},
  'timestamps': {'x': 1594844781.6357837}},
 {'time': 1594844781.635785,
  'data': {'x': 1594844781.635785},
  'timestamps': {'x': 1594844781.635785}}]

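In the collect() output above, x equals the raw timestamp rather than an elapsed time because kickoff() was never called on this new instance, so self.t0 still holds its initial value of 0. Again, running this flyer with the RunEngine gives little information, except that it succeeds and a UUID is returned.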

[22]:
RE(bluesky.plans.fly([ifly]))
/home/travis/virtualenv/python3.7.1/lib/python3.7/site-packages/event_model/__init__.py:172: UserWarning: The document type 'bulk_events' has been deprecated in favor of 'event_page', whose structure is a transpose of 'bulk_events'.
  "The document type 'bulk_events' has been deprecated in favor of "
[22]:
('6d1d5a8e-04af-4b67-b07d-a990ef9da80e',)

More information is obtained by asking the databroker about the most recent scan. Here’s the data table from our stream of data events. (The stream is named “ifly” as set from self.name in the describe_collect() method.) We’ll reference that last scan as h to reduce the amount of typing.

[23]:
h = db[-1]
h.stream_names[0]
[23]:
'ifly'
[24]:
h.table(h.stream_names[0])
[24]:
time x
seq_num
1 2020-07-15 20:26:21.648188114 0.001629
2 2020-07-15 20:26:21.648211479 0.001653
3 2020-07-15 20:26:21.648221254 0.001662
4 2020-07-15 20:26:21.648228884 0.001670
5 2020-07-15 20:26:21.648236275 0.001678

We generated 5 data points, so there are 5 event documents to match.

[25]:
list(db[-1].documents())
[25]:
[('start',
  {'uid': '6d1d5a8e-04af-4b67-b07d-a990ef9da80e',
   'time': 1594844781.6451385,
   'versions': {'ophyd': '1.5.2', 'bluesky': '1.6.4'},
   'scan_id': 4,
   'plan_type': 'generator',
   'plan_name': 'fly'}),
 ('descriptor',
  {'run_start': '6d1d5a8e-04af-4b67-b07d-a990ef9da80e',
   'time': 1594844781.6475658,
   'data_keys': {'x': {'source': 'elapsed time, s',
     'dtype': 'number',
     'shape': [1]}},
   'uid': 'aace9903-902d-4ddd-9474-1fa6246ff2e4',
   'name': 'ifly',
   'configuration': {'ifly': {'data': {}, 'timestamps': {}, 'data_keys': {}}},
   'hints': {'ifly': {'fields': []}},
   'object_keys': {'ifly': ['x']}}),
 ('event',
  {'descriptor': 'aace9903-902d-4ddd-9474-1fa6246ff2e4',
   'uid': '6e1831d6-0a40-437d-a2fc-0ae4979786f4',
   'time': 1594844781.648188,
   'seq_num': 1,
   'data': {'x': 0.0016293525695800781},
   'timestamps': {'x': 1594844781.648188},
   'filled': {}}),
 ('event',
  {'descriptor': 'aace9903-902d-4ddd-9474-1fa6246ff2e4',
   'uid': '3011f8ca-bc88-4a7d-8fb6-43b39d964eb6',
   'time': 1594844781.6482115,
   'seq_num': 2,
   'data': {'x': 0.0016527175903320312},
   'timestamps': {'x': 1594844781.6482115},
   'filled': {}}),
 ('event',
  {'descriptor': 'aace9903-902d-4ddd-9474-1fa6246ff2e4',
   'uid': '8cfe5c46-41bb-46c1-a25e-cd868d4d3aff',
   'time': 1594844781.6482213,
   'seq_num': 3,
   'data': {'x': 0.0016624927520751953},
   'timestamps': {'x': 1594844781.6482213},
   'filled': {}}),
 ('event',
  {'descriptor': 'aace9903-902d-4ddd-9474-1fa6246ff2e4',
   'uid': 'eed3e167-d4ec-44af-a072-d367e5587241',
   'time': 1594844781.648229,
   'seq_num': 4,
   'data': {'x': 0.0016701221466064453},
   'timestamps': {'x': 1594844781.648229},
   'filled': {}}),
 ('event',
  {'descriptor': 'aace9903-902d-4ddd-9474-1fa6246ff2e4',
   'uid': '5c465d99-de3e-4261-8782-603f8587dde8',
   'time': 1594844781.6482363,
   'seq_num': 5,
   'data': {'x': 0.0016775131225585938},
   'timestamps': {'x': 1594844781.6482363},
   'filled': {}}),
 ('stop',
  {'run_start': '6d1d5a8e-04af-4b67-b07d-a990ef9da80e',
   'time': 1594844781.6487951,
   'uid': 'c0196afe-5c8f-4745-8c42-1917be4a877b',
   'exit_status': 'success',
   'reason': '',
   'num_events': {'ifly': 5}})]
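A list of (name, document) pairs like the one above can be reduced to the 1-D array we set out to collect. The following is a sketch with abbreviated, hypothetical documents that follow the same layout; event_column() is an illustrative helper, not a databroker function.

```python
import collections

# abbreviated, hypothetical documents in the same (name, document) layout
documents = [
    ("start", {"uid": "run-1"}),
    ("descriptor", {"uid": "desc-1", "name": "ifly"}),
    ("event", {"descriptor": "desc-1", "seq_num": 2, "data": {"x": 0.2}}),
    ("event", {"descriptor": "desc-1", "seq_num": 1, "data": {"x": 0.1}}),
    ("event", {"descriptor": "desc-1", "seq_num": 3, "data": {"x": 0.3}}),
    ("stop", {"run_start": "run-1", "exit_status": "success"}),
]


def event_column(documents, key):
    """Return the values of ``key`` from all event documents, by seq_num."""
    events = [doc for name, doc in documents if name == "event"]
    events.sort(key=lambda doc: doc["seq_num"])
    return [doc["data"][key] for doc in events]


# how many documents of each type were emitted
counts = collections.Counter(name for name, _ in documents)
# the collected values as one list, ordered by sequence number
xs = event_column(documents, "x")
print(dict(counts))
print(xs)
```

For the run in this notebook, the input would be list(db[-1].documents()) and the key would be "x".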

Conclusion

Now you have seen the basic steps in creating a Flyer. You have also been shown a few diagnostic tools to help you investigate your code development.