New (v2) API

Core

class databroker.core.Document(*args, **kwargs)[source]

Document is an immutable dict subclass.

It is immutable to help consumer code avoid accidentally corrupting data that another part of the consumer code expects to use unchanged.

Subclasses of Document must define __dask_tokenize__. The tokenization schemes typically uniquely identify the document based on only a subset of its contents, and mutating the contents can thereby create situations where two unequal objects have colliding tokens. Immutability helps guard against this too.

Note that Documents are not recursively immutable. Just as it is possible to create a tuple (immutable) of lists (mutable) and mutate the lists, it is possible to mutate the internal contents of a Document, but this should not be done. It is safer to use the to_dict() method to create a mutable deep copy.

This is implemented as a dict subclass in order to satisfy certain consumers that expect an object that satisfies isinstance(obj, dict). This implementation detail may change in the future.
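As a hedged sketch (the subclass name and tokenization scheme below are illustrative, not part of the library), a subclass that tokenizes on 'uid' alone might look like this:

from databroker.core import Document

class IllustrativeStart(Document):
    # Hypothetical subclass: tokenize on a subset of the contents
    # ('uid' alone), as the docstring above describes.
    def __dask_tokenize__(self):
        return ('start', self['uid'])

doc = IllustrativeStart({'uid': 'abc123', 'time': 0.0})
try:
    doc['time'] = 1.0  # in-place mutation is rejected
except TypeError:
    pass
mutable = doc.to_dict()  # mutable deep copy; safe to edit
mutable['time'] = 1.0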

clear() → None. Remove all items from D.
pop(k[, d]) → v, remove specified key and return the corresponding value.

If the key is not found, d is returned if given; otherwise KeyError is raised.

popitem(*args, **kwargs)

Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.

setdefault(*args, **kwargs)

Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.

to_dict()[source]

Create a mutable deep copy.

update([E, ]**F) → None. Update D from dict/iterable E and F.

If E is present and has a .keys() method, then does: for k in E: D[k] = E[k].
If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v.
In either case, this is followed by: for k in F: D[k] = F[k].

class databroker.core.BlueskyRun(*args, **kwargs)[source]

Catalog representing one Run.

Parameters
get_run_start: callable

Expected signature get_run_start() -> RunStart

get_run_stop: callable

Expected signature get_run_stop() -> RunStop

get_event_descriptors: callable

Expected signature get_event_descriptors() -> List[EventDescriptors]

get_event_pages: callable

Expected signature get_event_pages(descriptor_uid) -> generator where generator yields event_page documents

get_event_count: callable

Expected signature get_event_count(descriptor_uid) -> int

get_resource: callable

Expected signature get_resource(resource_uid) -> Resource

get_resources: callable

Expected signature get_resources() -> Resources

lookup_resource_for_datum: callable

Expected signature lookup_resource_for_datum(datum_id) -> resource_uid

get_datum_pages: callable

Expected signature get_datum_pages(resource_uid) -> generator where generator yields Datum documents

get_filler: callable

Expected signature get_filler() -> event_model.Filler

transforms: Dict[str, Callable]

A dict that maps any subset of the keys {start, stop, resource, descriptor} to a function that accepts a document of the corresponding type and returns it, potentially modified. This feature is for patching up erroneous metadata. It is intended for quick, temporary fixes that may later be applied permanently to the data at rest (e.g., via a database migration).

**kwargs

Additional keyword arguments are passed through to the base class, Catalog.
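A BlueskyRun is rarely constructed directly; it is typically obtained from a catalog. A hedged usage sketch, assuming the catalog has already been populated with runs (e.g., via catalog.v1.insert during data acquisition):

from databroker.v2 import temp

catalog = temp()  # temporary catalog; assume runs have been inserted
run = catalog[-1]  # the most recent run, a BlueskyRun
print(run.metadata['start'])  # the RunStart document
ds = run.primary.read()  # 'primary' stream as an xarray.Dataset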

configure_new(**kwargs)[source]

Return self or, if args are provided, some new instance of type(self).

This is here so that the user does not have to remember whether a given variable is a BlueskyRun or an Entry wrapping one. In either case, obj() will return a BlueskyRun.
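A minimal sketch of that idiom, assuming catalog is a populated v2 catalog:

entry_or_run = catalog[-1]  # may be an Entry or a BlueskyRun
run = entry_or_run()  # either way, the call returns a BlueskyRun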

describe()[source]

Description from the entry spec

get(**kwargs)

Return self or, if args are provided, some new instance of type(self).

This is here so that the user does not have to remember whether a given variable is a BlueskyRun or an Entry wrapping one. In either case, obj() will return a BlueskyRun.

get_file_list(resource)[source]

Fetch filepaths of external files associated with this Run.

This method is not defined on RemoteBlueskyRun because the filepaths may not be meaningful on a remote machine.

This method should be considered experimental. It may be changed or removed in a future release.

property has_been_persisted

The base class does not interact with persistence

property is_persisted

The base class does not interact with persistence

persist(*args, **kwargs)[source]

Save data from this source to local persistent storage

Parameters
ttl: numeric, optional

Time to live in seconds. If provided, the original source will be accessed and a new persisted version written transparently when more than ttl seconds have passed since the old persisted version was written.

kwargs: passed to the _persist method on the base container.

read()[source]

Load entire dataset into a container and return it

to_dask()[source]

Return a dask container for this data source

class databroker.core.RemoteBlueskyRun(*args, **kwargs)[source]

Catalog representing one Run.

This is a client-side proxy to a BlueskyRun stored on a remote server.

Parameters
url: str

Address of the server

headers: dict

HTTP headers to use in calls

name: str

Handle to reference this data

parameters: dict

To pass to the server when it instantiates the data source

metadata: dict

Additional info

kwargs: ignored

property has_been_persisted

The base class does not interact with persistence

property is_persisted

The base class does not interact with persistence

persist(*args, **kwargs)[source]

Save data from this source to local persistent storage

Parameters
ttl: numeric, optional

Time to live in seconds. If provided, the original source will be accessed and a new persisted version written transparently when more than ttl seconds have passed since the old persisted version was written.

kwargs: passed to the _persist method on the base container.

read()[source]

Load entire dataset into a container and return it

to_dask()[source]

Return a dask container for this data source

class databroker.core.BlueskyEventStream(*args, **kwargs)[source]

Catalog representing one Event Stream from one Run.

Parameters
stream_name: string

Stream name, such as ‘primary’.

get_run_stop: callable

Expected signature get_run_stop() -> RunStop

get_event_descriptors: callable

Expected signature get_event_descriptors() -> List[EventDescriptors]

get_event_pages: callable

Expected signature get_event_pages(descriptor_uid) -> generator where generator yields event_page documents

get_event_count: callable

Expected signature get_event_count(descriptor_uid) -> int

get_resource: callable

Expected signature get_resource(resource_uid) -> Resource

get_resources: callable

Expected signature get_resources() -> Resources

lookup_resource_for_datum: callable

Expected signature lookup_resource_for_datum(datum_id) -> resource_uid

get_datum_pages: callable

Expected signature get_datum_pages(resource_uid) -> generator where generator yields datum_page documents

fillers: dict of Fillers
transforms: Dict[str, Callable]

A dict that maps any subset of the keys {start, stop, resource, descriptor} to a function that accepts a document of the corresponding type and returns it, potentially modified. This feature is for patching up erroneous metadata. It is intended for quick, temporary fixes that may later be applied permanently to the data at rest (e.g., via a database migration).

metadata: dict

Passed through to the base class.

include: list, optional

Fields (‘data keys’) to include. By default all are included. This parameter is mutually exclusive with exclude.

exclude: list, optional

Fields (‘data keys’) to exclude. By default none are excluded. This parameter is mutually exclusive with include.

sub_dict: {“data”, “timestamps”}, optional

Which sub-dict in the EventPage to use

configuration_for: str

The name of an object (e.g. device) whose configuration we want to read.

**kwargs

Additional keyword arguments are passed through to the base class.

configure_new(**kwargs)[source]

Return self or, if args are provided, some new instance of type(self).

This is here so that the user does not have to remember whether a given variable is a BlueskyRun or an Entry wrapping one. In either case, obj() will return a BlueskyRun.

property has_been_persisted

The base class does not interact with persistence

property is_persisted

The base class does not interact with persistence

persist(*args, **kwargs)[source]

Save data from this source to local persistent storage

Parameters
ttl: numeric, optional

Time to live in seconds. If provided, the original source will be accessed and a new persisted version written transparently when more than ttl seconds have passed since the old persisted version was written.

kwargs: passed to the _persist method on the base container.

read()[source]

Return data from this Event Stream as an xarray.Dataset.

This loads all of the data into memory. For delayed (“lazy”), chunked access to the data, see to_dask().

read_partition(partition)[source]

Fetch one chunk of documents.

to_dask()[source]

Return data from this Event Stream as an xarray.Dataset backed by dask.
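A hedged sketch of the eager vs. lazy access pattern, assuming run is a BlueskyRun with a 'primary' stream containing a data key named 'det' (both names are examples):

stream = run.primary  # a BlueskyEventStream

ds = stream.read()  # eager: the full xarray.Dataset, in memory
lazy = stream.to_dask()  # lazy: a dask-backed xarray.Dataset

# With the dask-backed version, work is deferred until .compute().
mean = lazy['det'].mean().compute()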

databroker.core.discover_handlers(entrypoint_group_name='databroker.handlers', skip_failures=True)[source]

Discover handlers via entrypoints.

Parameters
entrypoint_group_name: str

Default is ‘databroker.handlers’, the “official” databroker entrypoint for handlers.

skip_failures: boolean

True by default. Errors loading a handler class are converted to warnings if this is True.

Returns
handler_registry: dict

A suitable default handler registry
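For example, to build a default registry from whatever handler packages happen to be installed (the available spec names depend on the environment):

from databroker.core import discover_handlers

registry = discover_handlers()
print(sorted(registry))  # spec names mapped to handler classes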

databroker.core.parse_handler_registry(handler_registry)[source]

Parse mapping of spec name to ‘import path’ into mapping to the class itself.

Parameters
handler_registry: dict

Values may be string ‘import paths’ to classes or actual classes.

Examples

Pass in name; get back actual class.

>>> parse_handler_registry({'my_spec': 'package.module.ClassName'})
{'my_spec': <package.module.ClassName>}

databroker.core.parse_transforms(transforms)[source]

Parse mapping of spec name to ‘import path’ into mapping to the function itself.

Parameters
transforms: collections.abc.Mapping or None

A collections.abc.Mapping or subclass that maps any subset of the keys {start, stop, resource, descriptor} to a function (or a string import path) that accepts a document of the corresponding type and returns it, potentially modified. This feature is for patching up erroneous metadata. It is intended for quick, temporary fixes that may later be applied permanently to the data at rest (e.g., via a database migration).

Examples

Pass in name; get back actual function.

>>> parse_transforms({'descriptor': 'package.module.function_name'})
{'descriptor': <package.module.function_name>}

class databroker.v2.Broker(*args, **kwargs)[source]

This is a thin wrapper around intake.Catalog.

It includes an accessor to the databroker version 1 API.

Parameters
handler_registry: dict, optional

This is passed to the Filler or whatever class is given in the filler_class parameter below.

Maps each ‘spec’ (a string identifying a given type or external resource) to a handler class.

A ‘handler class’ may be any callable with the signature:

handler_class(resource_path, root, **resource_kwargs)

It is expected to return an object, a ‘handler instance’, which is also callable and has the following signature:

handler_instance(**datum_kwargs)

As the names ‘handler class’ and ‘handler instance’ suggest, this is typically implemented using a class that implements __init__ and __call__ with the respective signatures. But in general it may be any callable-that-returns-a-callable. (A sketch follows this parameter list.)

root_map: dict, optional

This is passed to Filler or whatever class is given in the filler_class parameter below.

str -> str mapping to account for temporarily moved/copied/remounted files. Any resources which have a root in root_map will be loaded using the mapped root.

filler_class: type

This is Filler by default. It can be a Filler subclass, functools.partial(Filler, ...), or any class that provides the same methods as DocumentRouter.

transforms: dict

A dict that maps any subset of the keys {start, stop, resource, descriptor} to a function that accepts a document of the corresponding type and returns it, potentially modified. This feature is for patching up erroneous metadata. It is intended for quick, temporary fixes that may later be applied permanently to the data at rest (e.g., via a database migration).

**kwargs

Additional keyword arguments are passed through to the base class, Catalog.
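A sketch of the handler protocol and a transforms entry described above. The handler class, its spec name, and the metadata patch are all hypothetical, for illustration only:

import numpy as np

class HypotheticalNpyHandler:
    # 'Handler class': instantiated once per Resource document, with the
    # signature given above.
    def __init__(self, resource_path, root, **resource_kwargs):
        self._path = resource_path

    # 'Handler instance': called once per Datum with its datum_kwargs.
    def __call__(self, index):
        return np.load(self._path, mmap_mode='r')[index]

def patch_start(start):
    # Hypothetical transform: fill in a metadata field that was missing
    # at acquisition time.
    doc = dict(start)
    doc.setdefault('beamline', 'XPD')
    return doc

handler_registry = {'NPY_SEQ_HYPOTHETICAL': HypotheticalNpyHandler}
transforms = {'start': patch_start}
# Both dicts would then be passed as keyword arguments to a catalog or
# Broker that accepts the handler_registry and transforms parameters.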

items()[source]

Get an iterator over (key, source) tuples for the catalog entries.

property v1

Accessor to the version 1 API.

property v2

A self-reference. This makes v1.Broker and v2.Broker symmetric.

Utils

databroker.utils.catalog_search_path()[source]

List directories that will be searched for catalog YAML files.

This is a convenience wrapper around functions used by intake to determine its search path.

Returns
directories: tuple
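For example:

from databroker.utils import catalog_search_path

for directory in catalog_search_path():
    print(directory)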
databroker.v2.temp()[source]

Generate a Catalog backed by a temporary directory of msgpack-encoded files.

databroker.v1.temp()[source]

Like databroker.v2.temp(), but returns the equivalent v1-style Broker.
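A quick sketch of spinning up a throwaway catalog, handy in tests and tutorials:

from databroker.v2 import temp

catalog = temp()  # v2 Catalog backed by a temporary msgpack directory
db = catalog.v1  # the equivalent v1-style Broker, via the accessor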

Backend-Specific Catalogs

Note

These drivers are currently being developed in databroker itself, but will eventually be split out into separate repositories to isolate dependencies and release cycles. This will be done once the internal interfaces are stable.

class databroker._drivers.jsonl.BlueskyJSONLCatalog(*args, **kwargs)[source]

search(query)[source]

Return a new Catalog with a subset of the entries in this Catalog.

Parameters
query: dict

class databroker._drivers.mongo_embedded.BlueskyMongoCatalog(*args, **kwargs)[source]

search(query)[source]

Return a new Catalog with a subset of the entries in this Catalog.

Parameters
query: dict

MongoDB query.

class databroker._drivers.mongo_normalized.BlueskyMongoCatalog(*args, **kwargs)[source]

search(query, **kwargs)[source]

Return a new Catalog with a subset of the entries in this Catalog.

Parameters
query: dict

MongoDB query.

**kwargs

Options passed through to the pymongo find() method
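A hedged usage sketch, assuming catalog is a BlueskyMongoCatalog and that 'plan_name' appears in the stored metadata:

results = catalog.search({'plan_name': 'count'}, limit=10)
print(list(results))  # uids of the matching runs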

stats()[source]

Access MongoDB storage statistics for this database.

class databroker._drivers.msgpack.BlueskyMsgpackCatalog(*args, **kwargs)[source]

search(query)[source]

Return a new Catalog with a subset of the entries in this Catalog.

Parameters
query: dict