from collections.abc import Iterable
from collections import defaultdict
from datetime import datetime
import pandas
import re
import warnings
import time
import humanize
import jinja2
import os
import shutil
from types import SimpleNamespace
import tzlocal
import xarray
import event_model
import intake
import pymongo
# Toolz and CyToolz have identical APIs -- same test suite, docstrings.
try:
from cytoolz.dicttoolz import merge
except ImportError:
from toolz.dicttoolz import merge
from .utils import (ALL, format_time, get_fields, wrap_in_deprecated_doct,
ensure_path_exists, lookup_config,
transpose)
# The v2 API is expected to grow more options for filled than just True/False
# (e.g. 'delayed') so it expects a string instead of a boolean.
_FILL = {True: 'yes', False: 'no'}
def temp_config():
    """Removed helper kept only so old imports fail with a clear message."""
    raise NotImplementedError("Use temp() instead, which returns a v1.Broker.")
def temp():
    """Return a v1.Broker wrapping a temporary, throwaway v2 catalog."""
    from .v2 import temp as _v2_temp
    return Broker(_v2_temp())
class Registry:
    """
    An accessor that serves as a backward-compatible shim for Broker.reg
    """

    def __init__(self, catalog):
        # The v2 catalog actually owns handler registration, the root map,
        # and file access; every member below delegates to it.
        self._catalog = catalog

    @property
    def handler_reg(self):
        # Mapping of asset spec name -> handler class.
        return self._catalog.handler_registry

    @property
    def root_map(self):
        # Mapping used to translate resource 'root' paths at access time.
        return self._catalog.root_map

    def register_handler(self, key, handler, overwrite=False):
        return self._catalog.register_handler(
            key, handler, overwrite=overwrite)

    def deregister_handler(self, key):
        return self._catalog.deregister_handler(key)

    def copy_files(self, resource, new_root,
                   verify=False, file_rename_hook=None,
                   run_start_uid=None):
        """
        Copy files associated with a resource to a new directory.

        The registered handler must have a `get_file_list` method and the
        process running this method must have read/write access to both the
        source and destination file systems.

        This method does *not* update the assets dataregistry_template.

        Internally the resource level directory information is stored
        as two parts: the root and the resource_path. The 'root' is
        the non-semantic component (typically a mount point) and the
        'resource_path' is the 'semantic' part of the file path. For
        example, it is common to collect data into paths that look like
        ``/mnt/DATA/2016/04/28``. In this case we could split this as
        ``/mnt/DATA`` as the 'root' and ``2016/04/28`` as the resource_path.

        Parameters
        ----------
        resource : Document
            The resource to move the files of
        new_root : str
            The new 'root' to copy the files into
        verify : bool, optional (False)
            Verify that the move happened correctly. This currently
            is not implemented and will raise if ``verify == True``.
        file_rename_hook : callable, optional
            If provided, must be a callable with signature ::

                def hook(file_counter, total_number, old_name, new_name):
                    pass

            This will be run in the inner loop of the file copy step and is
            run inside of an unconditional try/except block.
        run_start_uid : str, optional
            RunStart uid to use when the resource document itself carries no
            'run_start' key.

        See Also
        --------
        `RegistryMoving.shift_root`
        `RegistryMoving.change_root`
        """
        if verify:
            raise NotImplementedError('Verification is not implemented yet')

        def rename_hook_wrapper(hook):
            # Normalize the user hook: substitute a no-op when absent, and
            # swallow any exception it raises so a broken hook cannot abort
            # the copy (the "unconditional try/except" promised above).
            if hook is None:
                def noop(n, total, old_name, new_name):
                    return
                return noop

            def safe_hook(n, total, old_name, new_name):
                try:
                    hook(n, total, old_name, new_name)
                except Exception:
                    pass
            return safe_hook

        file_rename_hook = rename_hook_wrapper(file_rename_hook)

        # Prefer the resource's own run_start; fall back to the caller's.
        run_start_uid = resource.get('run_start', run_start_uid)
        if run_start_uid is None:
            raise ValueError(
                "If the Resource document has no `run_start` key, the "
                "caller must provide run_start_uid.")

        file_list = self._catalog[run_start_uid].get_file_list(resource)

        # check that all files share the same root
        old_root = resource.get('root')
        if not old_root:
            warnings.warn("There is no 'root' in this resource which "
                          "is required to be able to change the root. "
                          "Please use `fs.shift_root` to move some of "
                          "the path from the 'resource_path' to the "
                          "'root'. For now assuming '/' as root")
            old_root = os.path.sep

        for f in file_list:
            if not f.startswith(old_root):
                raise RuntimeError('something is very wrong, the files '
                                   'do not all share the same root, ABORT')

        # sort out where new files should go
        new_file_list = [os.path.join(new_root,
                                      os.path.relpath(f, old_root))
                         for f in file_list]
        N = len(new_file_list)
        # copy the files to the new location
        for n, (fin, fout) in enumerate(zip(file_list, new_file_list)):
            # copy files
            file_rename_hook(n, N, fin, fout)
            ensure_path_exists(os.path.dirname(fout))
            shutil.copy2(fin, fout)

        return zip(file_list, new_file_list)
class Broker:
    """
    This supports the original Broker API but implemented on intake.Catalog.
    """

    def __init__(self, catalog, *, serializer=None,
                 external_fetchers=None):
        # The wrapped v2 catalog provides all actual data access.
        self._catalog = catalog
        # Name-mangled; lazily resolved via the _serializer property.
        self.__serializer = serializer
        self.external_fetchers = external_fetchers or {}
        self.prepare_hook = wrap_in_deprecated_doct
        self.aliases = {}
        self.filters = {}
        # Back-reference so that catalog.v1 returns this same facade.
        self.v2._Broker__v1 = self
        self._reg = Registry(catalog)
    @property
    def _serializer(self):
        # Lazily resolve (and cache) a serializer for Broker.insert().
        if self.__serializer is None:
            # The method _get_serializer is an optional method implemented on
            # some Broker subclasses to support the Broker.insert() method,
            # which is pending deprecation.
            if hasattr(self._catalog, '_get_serializer'):
                self.__serializer = self._catalog._get_serializer()
        return self.__serializer
    @property
    def reg(self):
        "Registry of externally-stored data"
        return self._reg

    @property
    def name(self):
        # Name of the underlying catalog.
        return self._catalog.name

    @property
    def v1(self):
        "A self-reference. This makes v1.Broker and v2.Broker symmetric."
        return self

    @property
    def v2(self):
        "Accessor to the version 2 API."
        return self._catalog
    @classmethod
    def from_config(cls, config, auto_register=True, name=None):
        """Build a Broker from a v0 config dict (see module-level ``from_config``)."""
        return from_config(
            config=config, auto_register=auto_register, name=name)
[docs] def get_config(self):
"""
Return the v0 config dict this was created from, or None if N/A.
"""
if hasattr(self, '_config'):
return self._config
    @classmethod
    def named(cls, name, auto_register=True):
        """
        Create a new Broker instance using a configuration file with this name.

        Configuration file search path:

        * ``~/.config/databroker/{name}.yml``
        * ``{python}/../etc/databroker/{name}.yml``
        * ``/etc/databroker/{name}.yml``

        where ``{python}`` is the location of the current Python binary, as
        reported by ``sys.executable``. It will use the first match it finds.

        Special Case: The name ``'temp'`` creates a new, temporary
        configuration. Subsequent calls to ``Broker.named('temp')`` will
        create separate configurations. Any data saved using this temporary
        configuration will not be accessible once the ``Broker`` instance has
        been deleted.

        Parameters
        ----------
        name : string
        auto_register : boolean, optional
            By default, automatically register built-in asset handlers (classes
            that handle I/O for externally stored data). Set this to ``False``
            to do all registration manually.

        Returns
        -------
        db : Broker
        """
        if name == 'temp':
            return temp()
        else:
            try:
                config = lookup_config(name)
            except FileNotFoundError:
                # Continue on to the v2 way.
                pass
            else:
                db = cls.from_config(config, auto_register=auto_register, name=name)
                return db
            # No v0/v1-style config file found; fall back to looking the
            # name up among intake's installed catalogs.
            catalog = getattr(intake.cat, name)
            return Broker(catalog)
@property
def fs(self):
warnings.warn("fs is deprecated, use `db.reg` instead", stacklevel=2)
return self.reg
[docs] def fetch_external(self, start, stop):
return {k: func(start, stop) for
k, func in self.external_fetchers.items()}
def _patch_state(self, catalog):
"Copy references to v1 state."
catalog.v1.aliases = self.aliases
catalog.v1.filters = self.filters
catalog.v1.prepare_hook = self.prepare_hook
    def __call__(self, text_search=None, **kwargs):
        """Search for runs matching this query (combined with any stored filters)."""
        data_key = kwargs.pop('data_key', None)
        tz = tzlocal.get_localzone()
        # tzlocal >= 3 returns a zoneinfo object (.key); older versions
        # return a pytz zone (.zone).
        try:
            tz = tz.key
        except AttributeError:
            tz = tz.zone
        if self.filters:
            filters = self.filters.copy()
            format_time(filters, tz)  # mutates in place
            catalog = self._catalog.search(filters)
            self._patch_state(catalog)
        else:
            catalog = self._catalog
        if text_search:
            kwargs.update({'$text': {'$search': text_search}})
        format_time(kwargs, tz)  # mutates in place
        result_catalog = catalog.search(kwargs)
        self._patch_state(result_catalog)
        return Results(self, result_catalog,
                       data_key)
    def __getitem__(self, key):
        """Look up run(s) by uid/scan id, a list of keys, or a recency slice."""
        # If this came from a client, we might be getting '-1'.
        if not isinstance(key, str) and isinstance(key, Iterable):
            return [self[item] for item in key]
        if isinstance(key, slice):
            if key.start is not None and key.start > -1:
                raise ValueError("slice.start must be negative. You gave me "
                                 "key=%s The offending part is key.start=%s"
                                 % (key, key.start))
            if key.stop is not None and key.stop > 0:
                raise ValueError("slice.stop must be <= 0. You gave me key=%s. "
                                 "The offending part is key.stop = %s"
                                 % (key, key.stop))
            if key.start is None:
                raise ValueError("slice.start cannot be None because we do not "
                                 "support slicing infinitely into the past; "
                                 "the size of the result is non-deterministic "
                                 "and could become too large.")
            # e.g. db[-5:] -> [db[-1], db[-2], ..., db[-5]] (newest first).
            return [self[index]
                    for index in reversed(range(key.start, key.stop or 0, key.step or 1))]
        datasource = self._catalog[key]
        return Header(datasource)

    # Expose utils.get_fields here for backward compatibility.
    get_fields = staticmethod(get_fields)
    def get_documents(self,
                      headers, stream_name=ALL, fields=None, fill=False,
                      handler_registry=None):
        """
        Get all documents from one or more runs.

        Parameters
        ----------
        headers : Header or iterable of Headers
            The headers to fetch the events for
        stream_name : str, optional
            Get events from only "event stream" with this name.
            Default is `ALL` which yields documents for all streams.
        fields : List[str], optional
            whitelist of field names of interest; if None, all are returned
            Default is None
        fill : bool or Iterable[str], optional
            Which fields to fill. If `True`, fill all
            possible fields.
            Each event will have the data filled for the intersection
            of it's external keys and the fields requested filled.
            Default is False
        handler_registry : dict, optional
            mapping asset specs (strings) to handlers (callable classes)

        Yields
        ------
        name : str
            The name of the kind of document
        doc : dict
            The payload, may be RunStart, RunStop, EventDescriptor, or Event.

        Raises
        ------
        ValueError if any key in `fields` is not in at least one descriptor
        pre header.
        """
        if handler_registry is not None:
            raise NotImplementedError("The handler_registry must be set when "
                                      "the Broker is initialized, usually specified "
                                      "in a configuration file.")
        headers = _ensure_list(headers)
        no_fields_filter = False
        if fields is None:
            no_fields_filter = True
            fields = []
        fields = set(fields)
        comp_re = _compile_re(fields)
        for header in headers:
            uid = header.start['uid']
            descs = header.descriptors
            # Per-descriptor bookkeeping: which fields to drop, and which
            # start/stop/configuration values to fold into each event.
            per_desc_discards = {}
            per_desc_extra_data = {}
            per_desc_extra_ts = {}
            for d in descs:
                (all_extra_dk, all_extra_data,
                 all_extra_ts, discard_fields) = _extract_extra_data(
                    header.start, header.stop, d, fields, comp_re,
                    no_fields_filter)
                per_desc_discards[d['uid']] = discard_fields
                per_desc_extra_data[d['uid']] = all_extra_data
                per_desc_extra_ts[d['uid']] = all_extra_ts
                d = d.copy()
                # NOTE(review): descriptors appear to be a read-only dict
                # subclass; dict.__setitem__ bypasses that — confirm.
                dict.__setitem__(d, 'data_keys', d['data_keys'].copy())
                for k in discard_fields:
                    del d['data_keys'][k]
                d['data_keys'].update(all_extra_dk)
                if not len(d['data_keys']) and not len(all_extra_data):
                    continue

            def merge_config_into_event(event):
                # Mutate event in place, adding in data and timestamps from the
                # descriptor's 'configuration' key.
                event_data = event['data']  # cache for perf
                desc = event['descriptor']
                event_timestamps = event['timestamps']
                event_data.update(per_desc_extra_data[desc])
                event_timestamps.update(per_desc_extra_ts[desc])
                discard_fields = per_desc_discards[desc]
                for field in discard_fields:
                    del event_data[field]
                    del event_timestamps[field]

            get_documents_router = _GetDocumentsRouter(self.prepare_hook,
                                                       merge_config_into_event,
                                                       stream_name=stream_name)
            for name, doc in self._catalog[uid].documents(fill=_FILL[bool(fill)],
                                                          strict_order=True):
                yield from get_documents_router(name, doc)
    def get_events(self,
                   headers, stream_name='primary', fields=None, fill=False,
                   handler_registry=None):
        """
        Get Event documents from one or more runs.

        Parameters
        ----------
        headers : Header or iterable of Headers
            The headers to fetch the events for
        stream_name : str, optional
            Get events from only "event stream" with this name.
            Default is 'primary'
        fields : List[str], optional
            whitelist of field names of interest; if None, all are returned
            Default is None
        fill : bool or Iterable[str], optional
            Which fields to fill. If `True`, fill all
            possible fields.
            Each event will have the data filled for the intersection
            of it's external keys and the fields requested filled.
            Default is False
        handler_registry : dict, optional
            mapping asset specs (strings) to handlers (callable classes)

        Yields
        ------
        event : Event
            The event, optionally with non-scalar data filled in

        Raises
        ------
        ValueError if any key in `fields` is not in at least one descriptor
        pre header.
        """
        if handler_registry is not None:
            raise NotImplementedError("The handler_registry must be set when "
                                      "the Broker is initialized, usually specified "
                                      "in a configuration file.")
        # Delegate to get_documents and keep only the Event documents.
        for name, doc in self.get_documents(headers,
                                            fields=fields,
                                            stream_name=stream_name,
                                            fill=fill,
                                            handler_registry=handler_registry):
            if name == 'event':
                yield doc
    def get_table(self,
                  headers, stream_name='primary', fields=None, fill=False,
                  handler_registry=None,
                  convert_times=True, timezone=None, localize_times=True):
        """
        Load the data from one or more runs as a table (``pandas.DataFrame``).

        Parameters
        ----------
        headers : Header or iterable of Headers
            The headers to fetch the events for
        stream_name : str, optional
            Get events from only "event stream" with this name.
            Default is 'primary'
        fields : List[str], optional
            whitelist of field names of interest; if None, all are returned
            Default is None
        fill : bool or Iterable[str], optional
            Which fields to fill. If `True`, fill all
            possible fields.
            Each event will have the data filled for the intersection
            of it's external keys and the fields requested filled.
            Default is False
        handler_registry : dict, optional
            mapping asset specs (strings) to handlers (callable classes)
        convert_times : bool, optional
            Whether to convert times from float (seconds since 1970) to
            numpy datetime64, using pandas. True by default.
        timezone : str, optional
            e.g., 'US/Eastern'; if None, use metadatastore configuration in
            `self.mds.config['timezone']`
        localize_times : bool, optional
            If the times should be localized to the 'local' time zone. If
            True (the default) the time stamps are converted to the localtime
            zone (as configure in mds).

            This is problematic for several reasons:

            - apparent gaps or duplicate times around DST transitions
            - incompatibility with every other time stamp (which is in UTC)

            however, this makes the dataframe repr look nicer

            This implies convert_times.

            Defaults to True to preserve back-compatibility.

        Returns
        -------
        table : pandas.DataFrame
        """
        if handler_registry is not None:
            raise NotImplementedError(
                "The handler_registry must be set when "
                "the Broker is initialized, usually specified "
                "in a configuration file.")
        headers = _ensure_list(headers)
        # TODO --- Use local time I guess.
        # if timezone is None:
        #     timezone = self.mds.config['timezone']
        no_fields_filter = False
        if fields is None:
            no_fields_filter = True
            fields = []
        fields = set(fields)
        comp_re = _compile_re(fields)
        dfs = []
        for header in headers:
            descs = header.descriptors
            start = header.start
            stop = header.stop
            # Only descriptors belonging to the requested stream contribute.
            descs = [desc for desc in descs if desc.get('name') == stream_name]
            for descriptor in descs:
                (all_extra_dk, all_extra_data,
                 all_extra_ts, discard_fields) = _extract_extra_data(
                    start, stop, descriptor, fields, comp_re, no_fields_filter)
                all_events = [
                    doc for name, doc in
                    self.get_documents(header, stream_name=stream_name, fill=fill)
                    if name == 'event' and
                    doc['descriptor'] == descriptor['uid']]
                seq_nums = [ev['seq_num'] for ev in all_events]
                times = [ev['time'] for ev in all_events]
                keys = list(descriptor['data_keys'])
                data = transpose(all_events, keys, 'data')
                # timestamps = transpose(all_events, keys, 'timestamps')
                df = pandas.DataFrame(index=seq_nums)
                # if converting to datetime64 (in utc or 'local' tz)
                if convert_times or localize_times:
                    times = pandas.to_datetime(times, unit='s')
                # make sure this is a series
                times = pandas.Series(times, index=seq_nums)
                # if localizing to 'local' time
                if localize_times:
                    times = (times
                             .dt.tz_localize('UTC')  # first make tz aware
                             # .dt.tz_convert(timezone)  # convert to 'local'
                             .dt.tz_localize(None)  # make naive again
                             )
                df['time'] = times
                for field, values in data.items():
                    if field in discard_fields:
                        continue
                    df[field] = values
                if list(df.columns) == ['time']:
                    # no content
                    continue
                for field, v in all_extra_data.items():
                    df[field] = v
                dfs.append(df)
        if dfs:
            result = pandas.concat(dfs)
        else:
            # edge case: no data
            result = pandas.DataFrame()
        result.index.name = 'seq_num'
        return result
[docs] def get_images(self, headers, name,
stream_name='primary',
handler_registry=None,):
"""
This method is deprecated. Use Broker.get_documents instead.
Load image data from one or more runs into a lazy array-like object.
Parameters
----------
headers : Header or list of Headers
name : string
field name (data key) of a detector
handler_registry : dict, optional
mapping spec names (strings) to handlers (callable classes)
Examples
--------
>>> header = db[-1]
>>> images = Images(header, 'my_detector_lightfield')
>>> for image in images:
# do something
"""
# Defer this import so that pims is an optional dependency.
from ._legacy_images import Images
headers = _ensure_list(headers)
datasets = [header.xarray_dask(stream_name=stream_name)
for header in headers]
if handler_registry is not None:
raise NotImplementedError(
"The handler_registry parameter is no longer supported "
"and must be None.")
dataset = xarray.merge(datasets)
data_array = dataset[name]
return Images(data_array=data_array)
    def alias(self, key, **query):
        """
        Create an alias for a query.

        Parameters
        ----------
        key : string
            must be a valid Python identifier
        query :
            keyword argument comprising a query

        Examples
        --------
        Define an alias that searches for headers with purpose='calibration'.

        >>> db.alias('cal', purpose='calibration')

        Use it.

        >>> headers = db.cal  # -> db(purpose='calibration')

        Review defined aliases.

        >>> db.aliases
        {'cal': {'purpose': 'calibration'}}
        """
        # Refuse names that would shadow a real attribute/method, unless the
        # name is an existing alias being redefined.
        if hasattr(self, key) and key not in self.aliases:
            raise ValueError("'%s' is not a legal alias." % key)
        self.aliases[key] = query
    def dynamic_alias(self, key, func):
        """
        Create an alias for a "dynamic" query, a function that returns a query.

        Parameters
        ----------
        key : string
            must be a valid Python identifier
        func : callable
            When called with no arguments, must return a dict that is a valid
            query.

        Examples
        --------
        Define an alias to get headers from the last 24 hours.

        >>> import time
        >>> db.dynamic_alias('today',
        ...                  lambda: {'since': time.time() - 24*60*60})

        Use it.

        >>> headers = db.today

        Define an alias to get headers with the 'user' field in metadata
        matches the current logged-in user.

        >>> import getpass
        >>> db.dynamic_alias('mine', lambda: {'user': getpass.getuser()})

        Use it

        >>> headers = db.mine
        """
        # Same legality check as alias(): do not shadow real attributes.
        if hasattr(self, key) and key not in self.aliases:
            raise ValueError("'%s' is not a legal alias." % key)
        # Stored callables are invoked at lookup time by __getattr__.
        self.aliases[key] = func
    def add_filter(self, **kwargs):
        """
        Add query to the list of 'filter' queries.

        Any query passed to ``db.add_filter()`` is stashed and "AND-ed" with
        all future queries.

        ``db.add_filter(**kwargs)`` is just a convenient way to spell
        ``db.filters.update(**kwargs)``.

        Examples
        --------
        Filter all searches to restrict results to a specific user after a
        March 2017.

        >>> db.add_filter(user='Dan')
        >>> db.add_filter(since='2017-3')

        The following query is equivalent to
        ``db(user='Dan', plan_name='scan')``.

        >>> db(plan_name='scan')

        Review current filters.

        >>> db.filters
        {'user': 'Dan', 'since': '2017-3'}

        Clear filters.

        >>> db.clear_filters()

        See Also
        --------
        :meth:`Broker.clear_filters`
        """
        # The filters dict is shared with any Results catalogs produced by
        # __call__, so updating in place propagates everywhere.
        self.filters.update(**kwargs)
    def clear_filters(self, **kwargs):
        """
        Clear all 'filter' queries.

        Filter queries are combined with every given query using '$and',
        acting as a filter to restrict the results.

        ``Broker.clear_filters()`` is just a convenient way to spell
        ``Broker.filters.clear()``.

        See Also
        --------
        :meth:`Broker.add_filter`
        """
        # NOTE(review): **kwargs is accepted but ignored — presumably kept
        # for backward compatibility with old call sites; confirm.
        self.filters.clear()
def __getattr__(self, key):
try:
query = self.aliases[key]
except KeyError:
raise AttributeError(key)
if callable(query):
query = query()
return self(**query)
    def restream(self, headers, fields=None, fill=False):
        """
        Get all Documents from given run(s).

        This output can be used as a drop-in replacement for the output of the
        bluesky Run Engine.

        Parameters
        ----------
        headers : Header or iterable of Headers
            header or headers to fetch the documents for
        fields : list, optional
            whitelist of field names of interest; if None, all are returned
        fill : bool, optional
            Whether externally-stored data should be filled in. Defaults to
            False.

        Yields
        ------
        name, doc : tuple
            string name of the Document type and the Document itself.
            Example: ('start', {'time': ..., ...})

        Examples
        --------
        >>> def f(name, doc):
        ...     # do something
        ...
        >>> h = db[-1]  # most recent header
        >>> for name, doc in restream(h):
        ...     f(name, doc)

        See Also
        --------
        :meth:`Broker.process`
        """
        # Thin pass-through over get_documents, kept for RunEngine parity.
        for payload in self.get_documents(headers, fields=fields, fill=fill):
            yield payload

    # Backward-compatible name for restream.
    stream = restream  # compat
    def process(self, headers, func, fields=None, fill=False):
        """
        Pass all the documents from one or more runs into a callback.

        This output can be used as a drop-in replacement for the output of the
        bluesky Run Engine.

        Parameters
        ----------
        headers : Header or iterable of Headers
            header or headers to process documents from
        func : callable
            function with the signature `f(name, doc)`
            where `name` is a string and `doc` is a dict
        fields : list, optional
            whitelist of field names of interest; if None, all are returned
        fill : bool, optional
            Whether externally-stored data should be filled in. Defaults to
            False.

        Examples
        --------
        >>> def f(name, doc):
        ...     # do something
        ...
        >>> h = db[-1]  # most recent header
        >>> process(h, f)

        See Also
        --------
        :meth:`Broker.restream`
        """
        # Eagerly drive the document generator, invoking the callback per doc.
        for name, doc in self.get_documents(headers, fields=fields, fill=fill):
            func(name, doc)
    def export(self, headers, db, new_root=None, copy_kwargs=None):
        """
        Serialize a list of runs.

        If a new_root is passed files associated with the run will be moved to
        this new location, and the corresponding resource document will be
        updated with the new_root.

        Parameters
        ----------
        headers : databroker.header
            one or more run headers that are going to be exported
        db : databroker.Broker
            an instance of databroker.Broker class that will be the target to
            export info
        new_root : str
            optional. root directory of files that are going to
            be exported
        copy_kwargs : dict or None
            passed through to the ``copy_files`` method on Registry;
            None by default

        Returns
        -------
        file_pairs : list
            list of (old_file_path, new_file_path) pairs generated by
            ``copy_files`` method on Registry.
        """
        if copy_kwargs is None:
            copy_kwargs = {}
        if isinstance(headers, Header):
            headers = [headers]
        file_pairs = []
        for header in headers:
            # fill='no' keeps external references intact; the raw documents
            # are inserted into the target broker one by one.
            for name, doc in self._catalog[header.start['uid']].documents(fill='no'):
                if name == 'event_page':
                    # The target insert() expects single events, not pages.
                    for event in event_model.unpack_event_page(doc):
                        db.insert('event', event)
                elif name == 'resource' and new_root:
                    # Physically copy the files, then point the resource at
                    # the new root before inserting it.
                    copy_kwargs.setdefault('run_start_uid', header.start['uid'])
                    file_pairs.extend(self.reg.copy_files(doc, new_root, **copy_kwargs))
                    new_resource = doc.to_dict()
                    new_resource['root'] = new_root
                    db.insert(name, new_resource)
                else:
                    db.insert(name, doc)
        return file_pairs
[docs] def export_size(self, headers):
"""
Get the size of files associated with a list of headers.
Parameters
----------
headers : :class:databroker.Header:
one or more headers that are going to be exported
Returns
-------
total_size : float
total size of all the files associated with the ``headers`` in Gb
"""
headers = _ensure_list(headers)
total_size = 0
for header in headers:
run = self._catalog[header.start['uid']]
for name, doc in self._catalog[header.start['uid']].documents(fill='no'):
if name == 'resource':
for filepath in run.get_file_list(doc):
total_size += os.path.getsize(filepath)
return total_size * 1e-9
    def insert(self, name, doc):
        """Insert a (name, doc) pair via the configured serializer.

        Pending deprecation; requires the catalog to supply a serializer.
        """
        if self._serializer is None:
            raise RuntimeError("No Serializer was configured for this.")
        warnings.warn(
            "The method Broker.insert may be removed in a future release of "
            "databroker.", PendingDeprecationWarning)
        self._serializer(name, doc)
        # Make a reasonable effort to keep the Catalog in sync with new data.
        if name == 'stop':
            self._catalog.force_reload()
[docs] def fill_event(*args, **kwargs):
raise NotImplementedError("This method is no longer supported. If you "
"need this please contact the developers by "
"opening an issue here: "
"https://github.com/bluesky/databroker/issues/new ")
[docs] def fill_events(*args, **kwargs):
raise NotImplementedError("This method is no longer supported. If you "
"need this please contact the developers by "
"opening an issue here: "
"https://github.com/bluesky/databroker/issues/new ")
[docs] def stats(self):
"Access MongoDB storage statistics for this database."
return self.v2.stats()
class Results:
    """
    Iterable object encapsulating a results set of Headers

    Parameters
    ----------
    broker : Broker
        the v1 Broker that produced this result set
    catalog : Catalog
        search results
    data_key : string or None
        Special query parameter that filters results
    """

    def __init__(self, broker, catalog, data_key):
        self._broker = broker
        self._catalog = catalog
        self._data_key = data_key

    def __iter__(self):
        # TODO Catalog.walk() fails. We should probably support Catalog.items().
        for _uid, entry in self._catalog._entries.items():
            header = Header(entry())
            if self._data_key is None:
                yield header
                continue
            # Only include this header if `data_key` appears in at least one
            # of its descriptors' data_keys.
            if any(self._data_key in descriptor['data_keys']
                   for descriptor in header.descriptors):
                yield header
def _ensure_list(headers):
try:
headers.items()
except AttributeError:
return headers
else:
return [headers]
def _compile_re(fields=[]):
"""
Return a regular expression object based on a list of regular expressions.
Parameters
----------
fields : list, optional
List of regular expressions. If fields is empty returns a general RE.
Returns
-------
comp_re : regular expression object
"""
if len(fields) == 0:
fields = ['.*']
f = ["(?:" + regex + r")\Z" for regex in fields]
comp_re = re.compile('|'.join(f))
return comp_re
def _extract_extra_data(start, stop, d, fields, comp_re,
no_fields_filter):
def _project_header_data(source_data, source_ts,
selected_fields, comp_re):
"""Extract values from a header for merging into events
Parameters
----------
source : dict
selected_fields : set
comp_re : SRE_Pattern
Returns
-------
data_keys : dict
data : dict
timestamps : dict
"""
fields = (set(filter(comp_re.match, source_data)) - selected_fields)
data = {k: source_data[k] for k in fields}
timestamps = {k: source_ts[k] for k in fields}
return {}, data, timestamps
if fields:
event_fields = set(d['data_keys'])
selected_fields = set(filter(comp_re.match, event_fields))
discard_fields = event_fields - selected_fields
else:
discard_fields = set()
selected_fields = set(d['data_keys'])
objs_config = d.get('configuration', {}).values()
config_data = merge(obj_conf['data'] for obj_conf in objs_config)
config_ts = merge(obj_conf['timestamps']
for obj_conf in objs_config)
all_extra_data = {}
all_extra_ts = {}
all_extra_dk = {}
if not no_fields_filter:
for dt, ts in [(config_data, config_ts),
(start, defaultdict(lambda: start['time'])),
(stop, defaultdict(lambda: stop['time']))]:
# Look in the descriptor, then start, then stop.
l_dk, l_data, l_ts = _project_header_data(
dt, ts, selected_fields, comp_re)
all_extra_data.update(l_data)
all_extra_ts.update(l_ts)
selected_fields.update(l_data)
all_extra_dk.update(l_dk)
return (all_extra_dk, all_extra_data, all_extra_ts,
discard_fields)
_HTML_TEMPLATE = """
{% macro rtable(doc, cap) -%}
<table>
<caption> {{ cap }} </caption>
{%- for key, value in doc | dictsort recursive -%}
<tr>
<th> {{ key }} </th>
<td>
{%- if value.items -%}
<table>
{{ loop(value | dictsort) }}
</table>
{%- elif value is iterable and value is not string -%}
<table>
{%- set outer_loop = loop -%}
{%- for stuff in value -%}
{%- if stuff.items -%}
{{ outer_loop(stuff | dictsort) }}
{%- else -%}
<tr><td>{{ stuff }}</td></tr>
{%- endif -%}
{%- endfor -%}
</table>
{%- else -%}
{%- if key == 'time' -%}
{{ value | human_time }}
{%- else -%}
{{ value }}
{%- endif -%}
{%- endif -%}
</td>
</tr>
{%- endfor -%}
</table>
{%- endmacro %}
<table>
<tr>
<td>{{ rtable(document.start, 'Start') }}</td>
</tr
<tr>
<td>{{ rtable(document.stop, 'Stop') }}</td>
</tr>
<tr>
<td>
<table>
<caption>Descriptors</caption>
{%- for d in document.descriptors -%}
<tr>
<td> {{ rtable(d, d.get('name')) }} </td>
</tr>
{%- endfor -%}
</table>
</td>
</tr>
</table>
"""
def _pretty_print_time(timestamp):
    """Render a POSIX timestamp as "<natural age> (<ISO datetime>)"."""
    # timestamp needs to be a float or fromtimestamp() will barf
    ts = float(timestamp)
    iso = datetime.fromtimestamp(ts).isoformat()
    ago = humanize.naturaltime(time.time() - ts)
    return '{ago} ({date})'.format(ago=ago, date=iso)
class InvalidConfig(Exception):
    """Raised when the configuration file is invalid."""

    pass
def from_config(config, auto_register=True, name=None):
    """
    Build (some version of) a Broker instance from a v0 configuration dict.

    This method accepts v1 config files.

    This can return a ``v0.Broker``, ``v1.Broker``, or ``v2.Broker`` depending
    on the contents of ``config``.

    If config contains the key 'api_version', it should be set to a value 0,
    1, or 2. That setting will be respected until there is an error, in which
    case a warning will be issued and we will fall back to v0. If no
    'api_version' is explicitly set by the configuration file, version 1 will
    be used.
    """
    forced_version = config.get('api_version')
    if forced_version == 0:
        from . import v0
        return v0.Broker.from_config(config, auto_register, name)
    try:
        catalog = _from_v0_config(config, auto_register, name)
    except InvalidConfig:
        # A structurally invalid config should surface, not be masked by the
        # v0 fallback below.
        raise
    except Exception as exc:
        # Bug fix: the two sentences of this warning previously ran together
        # with no separator ("...v0.Exception was: ...").
        warnings.warn(
            f"Failed to load config. Falling back to v0. "
            f"Exception was: {exc}")
        from . import v0
        return v0.Broker.from_config(config, auto_register, name)
    if forced_version == 2:
        return catalog
    elif forced_version is None or forced_version == 1:
        broker = Broker(catalog)
        broker._config = config  # HACK to support Broker.get_config()
        return broker
    else:
        raise ValueError(f"Cannot handle api_version {forced_version}")
def _from_v0_config(config, auto_register, name):
    """
    Construct a BlueskyMongoCatalog from a v0-style configuration dict.

    Raises NotImplementedError for any v0 backend other than the
    MongoDB-backed metadatastore/assets combination.
    """
    # Only the mongo-backed v0 configuration can be translated to v1/v2.
    mds_module = config['metadatastore']['module']
    if mds_module != 'databroker.headersource.mongo':
        raise NotImplementedError(
            f"Unable to handle metadatastore.module {mds_module!r}")
    mds_class = config['metadatastore']['class']
    if mds_class not in ('MDS', 'MDSRO'):
        raise NotImplementedError(
            f"Unable to handle metadatastore.class {mds_class!r}")
    assets_module = config['assets']['module']
    if assets_module != 'databroker.assets.mongo':
        raise NotImplementedError(
            f"Unable to handle assets.module {assets_module!r}")
    assets_class = config['assets']['class']
    if assets_class not in ('Registry', 'RegistryRO'):
        raise NotImplementedError(
            f"Unable to handle assets.class {assets_class!r}")
    # Get the mongo databases.
    metadatastore_db = _get_mongo_database(config['metadatastore']['config'])
    asset_registry_db = _get_mongo_database(config['assets']['config'])
    # Deferred imports: these pull in heavier optional machinery.
    from ._drivers.mongo_normalized import BlueskyMongoCatalog
    from .core import discover_handlers
    # Update the handler registry.
    handler_registry = {}
    if auto_register:
        handler_registry.update(discover_handlers())
    # In v0, config-specified handlers are *added* to any default ones.
    for spec, contents in config.get('handlers', {}).items():
        dotted_object = '.'.join((contents['module'], contents['class']))
        handler_registry[spec] = dotted_object
    root_map = config.get('root_map')
    transforms = config.get('transforms')
    return BlueskyMongoCatalog(metadatastore_db, asset_registry_db,
                               handler_registry=handler_registry,
                               root_map=root_map,
                               name=name,
                               transforms=transforms)
_mongo_clients = {}  # cache of pymongo.MongoClient instances


def _get_mongo_database(config):
    """
    Return a MongoClient.database. Use a cache in order to reuse the
    MongoClient.
    """
    # Check that config contains either uri, or host/port, but not both.
    if {'uri', 'host'} <= set(config) or {'uri', 'port'} <= set(config):
        raise InvalidConfig(
            "The config file must define either uri, or host/port, but not both.")
    uri = config.get('uri')
    database = config['database']
    # If the config has username and password, turn it into a uri.
    # This is only here for backward compatibility.
    if {'mongo_user', 'mongo_pwd', 'host', 'port'} <= set(config):
        uri = (f"mongodb://{config['mongo_user']}:{config['mongo_pwd']}@"
               f"{config['host']}:{config['port']}/")
    if uri:
        if 'authsource' in config:
            # NOTE(review): this assumes `uri` has no query string yet;
            # appending '?authsource=...' to a uri that already contains
            # '?' would produce an invalid uri — confirm upstream inputs.
            uri += f'?authsource={config["authsource"]}'
        # Cache keyed by the full uri so different auth settings get
        # different clients.
        try:
            client = _mongo_clients[uri]
        except KeyError:
            client = pymongo.MongoClient(uri)
            _mongo_clients[uri] = client
    else:
        host = config.get('host')
        port = config.get('port')
        # Cache keyed by (host, port) for non-uri configs.
        try:
            client = _mongo_clients[(host, port)]
        except KeyError:
            client = pymongo.MongoClient(host, port)
            _mongo_clients[(host, port)] = client
    return client[database]
class _GetDocumentsRouter:
    """
    This is used by Broker.get_documents.

    It employs a pattern similar to event_model.DocumentRouter, but the
    methods are generators instead of functions: each incoming document is
    dispatched to the method named after its type, which yields zero or more
    (name, doc) pairs.
    """

    def __init__(self, prepare_hook, merge_config_into_event, stream_name):
        self.prepare_hook = prepare_hook
        self.merge_config_into_event = merge_config_into_event
        self.stream_name = stream_name
        # uids of descriptors belonging to the stream of interest; events
        # referencing any other descriptor are silently dropped.
        self._descriptors = set()

    def __call__(self, name, doc):
        # Special case when there is no Run Stop doc.
        # In v0, we returned an empty dict here. We now think better of it.
        if name == 'stop' and doc is None:
            doc = {}
        # Dispatch to the handler method named after the document type.
        for new_name, new_doc in getattr(self, name)(doc):
            yield new_name, self.prepare_hook(new_name, new_doc)

    def descriptor(self, doc):
        "Cache descriptor uid and pass it through if it is stream of interest."
        if self.stream_name is ALL or doc.get('name', 'primary') == self.stream_name:
            self._descriptors.add(doc['uid'])
            yield 'descriptor', doc

    def event_page(self, doc):
        "Unpack into events and pass them to event method for more processing."
        if doc['descriptor'] in self._descriptors:
            for event in event_model.unpack_event_page(doc):
                yield from self.event(event)

    def event(self, doc):
        "Apply merge_config_into_event."
        if doc['descriptor'] in self._descriptors:
            # Mutate event in place, merging in content from other documents
            # and discarding fields excluded by the user.
            self.merge_config_into_event(doc)
            # If the mutation above leaves event['data'] empty, omit it.
            if doc['data']:
                yield 'event', doc

    def datum_page(self, doc):
        "Unpack into datum."
        for datum in event_model.unpack_datum_page(doc):
            yield 'datum', datum

    def datum(self, doc):
        # Pass through unchanged.
        yield 'datum', doc

    def start(self, doc):
        yield 'start', doc

    def stop(self, doc):
        yield 'stop', doc

    def resource(self, doc):
        yield 'resource', doc