"""Source code for databroker._drivers.mongo_embedded."""

import collections.abc
import event_model
from sys import maxsize
from functools import partial
import logging
import cachetools
import pymongo
import pymongo.errors

from ..core import Entry
from ..v2 import Broker


logger = logging.getLogger(__name__)


class _Entries(collections.abc.Mapping):
    "Mock the dict interface around a MongoDB query result."
    def __init__(self, catalog):
        # Parent catalog (a BlueskyMongoCatalog) supplies the MongoDB handle
        # (_db), the active query, and the filler/transform hooks.
        self.catalog = catalog
        # Bounded cache of Entry objects keyed by run-start uid so repeated
        # lookups of the same run do not rebuild the Entry.
        self.__cache = cachetools.LRUCache(1024)

    def cache_clear(self):
        """Discard all cached Entry objects."""
        self.__cache.clear()

    def _doc_to_entry(self, run_start_doc):
        """Build an intake Entry for the run described by ``run_start_doc``.

        The associated 'header' document is fetched lazily, at most once,
        the first time any of the closures below needs a field from it.
        """

        header_doc = None
        uid = run_start_doc['uid']

        def get_header_field(field):
            # Lazily fetch and memoize this run's header document.
            nonlocal header_doc
            if header_doc is None:
                header_doc = self.catalog._db.header.find_one(
                                {'run_id': uid}, {'_id': False})
            if field in header_doc:
                if field in ['start', 'stop']:
                    # 'start' and 'stop' are stored as single-element arrays
                    # in the header document (see __iter__, which also reads
                    # doc['start'][0]).
                    return header_doc[field][0]
                else:
                    return header_doc[field]
            else:
                # Defaults for fields that may be absent (e.g. a run that
                # has not finished being written).
                if field == 'resources':
                    return []
                elif field[0:6] == 'count_':
                    return 0
                else:
                    return None

        def get_resource(uid):
            # Linear scan over this run's resource documents.
            resources = get_header_field('resources')
            for resource in resources:
                if resource['uid'] == uid:
                    return resource
            raise ValueError(f"Could not find Resource with uid={uid}")

        def lookup_resource_for_datum(datum_id):
            """ This method is likely very slow. """
            # Search the datum_pages of every resource belonging to this
            # run for the page containing datum_id, then unpack that page
            # to recover the owning resource uid.
            resources = [resource['uid']
                         for resource in get_header_field('resources')]
            datum_page = self.catalog._db.datum.find_one(
                    {'$and': [{'resource': {'$in': resources}},
                              {'datum_id': datum_id}]},
                    {'_id': False})
            for datum in event_model.unpack_datum_page(datum_page):
                if datum['datum_id'] == datum_id:
                    return datum['resource']
            raise ValueError(f"Could not find Datum with datum_id={datum_id}")

        def get_run_start():
            return run_start_doc

        def get_event_count(descriptor_uid):
            # Event counts are denormalized onto the header document as
            # 'count_<descriptor uid>' fields.
            return get_header_field(f'count_{descriptor_uid}')

        entry_metadata = {'start': get_header_field('start'),
                          'stop': get_header_field('stop')}

        args = dict(
            get_run_start=get_run_start,
            get_run_stop=partial(get_header_field, 'stop'),
            get_event_descriptors=partial(get_header_field, 'descriptors'),
            get_event_pages=self.catalog._get_event_pages,
            get_event_count=get_event_count,
            get_resource=get_resource,
            get_resources=partial(get_header_field, 'resources'),
            lookup_resource_for_datum=lookup_resource_for_datum,
            get_datum_pages=self.catalog._get_datum_pages,
            get_filler=self.catalog._get_filler,
            transforms=self.catalog._transforms)
        return Entry(
            name=run_start_doc['uid'],
            description={},  # TODO
            driver='databroker.core.BlueskyRun',
            direct_access='forbid',  # ???
            args=args,
            cache=None,  # ???
            parameters=[],
            metadata=entry_metadata,
            catalog_dir=None,
            getenv=True,
            getshell=True,
            catalog=self.catalog)

    def __iter__(self):
        """Yield run-start uids matching the catalog query, newest first."""
        cursor = self.catalog._db.header.find(
                    self.catalog._query,
                    {'start.uid': True, '_id': False},
                    sort=[('start.time', pymongo.DESCENDING)])

        for doc in cursor:
            yield doc['start'][0]['uid']

    def _find_header_doc(self, name):
        """Resolve ``name`` (full uid, partial uid, or integer-like string)
        to a header document.

        Raises KeyError (no match), ValueError (ambiguous partial uid), or
        IndexError (negative index past the start of the catalog).
        """
        # If this came from a client, we might be getting '-1'.
        try:
            N = int(name)
        except ValueError:
            # NOTE(review): this queries a top-level 'uid' field, but every
            # other lookup in this class addresses the uid as 'start.uid' or
            # 'run_id' — confirm header documents actually carry a top-level
            # 'uid'; otherwise this find_one can never match and only the
            # regex fallback below does the work.
            query = {'$and': [self.catalog._query, {'uid': name}]}
            header_doc = self.catalog._db.header.find_one(query)
            if header_doc is None:
                # Fall back to treating ``name`` as a partial-uid prefix.
                regex_query = {
                    '$and': [self.catalog._query,
                             {'start.uid': {'$regex': f'{name}.*'}}]}
                matches = list(
                    self.catalog._db.header.find(regex_query).limit(10))
                if not matches:
                    raise KeyError(name)
                elif len(matches) == 1:
                    header_doc, = matches
                else:
                    # NOTE(review): these matches are full header documents;
                    # if uid lives only under 'start', doc['uid'] raises
                    # KeyError here — confirm against the schema.
                    match_list = '\n'.join(doc['uid'] for doc in matches)
                    raise ValueError(
                        f"Multiple matches to partial uid {name!r}. "
                        f"Up to 10 listed here:\n"
                        f"{match_list}")
        else:
            if N < 0:
                # Interpret negative N as "the Nth from last entry".
                query = self.catalog._query
                cursor = (self.catalog._db.header.find(query)
                          .sort('start.time', pymongo.DESCENDING)
                          .skip(-N - 1)
                          .limit(1))
                try:
                    # Tuple unpacking raises ValueError on an empty cursor.
                    header_doc, = cursor
                except ValueError:
                    raise IndexError(
                        f"Catalog only contains {len(self.catalog)} "
                        f"runs.")
            else:
                # Interpret positive N as
                # "most recent entry with scan_id == N".
                query = {'$and': [self.catalog._query, {'start.scan_id': N}]}
                cursor = (self.catalog._db.header.find(query)
                          .sort('start.time', pymongo.DESCENDING)
                          .limit(1))
                try:
                    header_doc, = cursor
                except ValueError:
                    raise KeyError(f"No run with scan_id={N}")
        if header_doc is None:
            raise KeyError(name)
        return header_doc

    def __getitem__(self, name):
        """Return the BlueskyRun for ``name``, using the Entry cache."""
        header_doc = self._find_header_doc(name)
        uid = header_doc['start'][0]['uid']
        try:
            entry = self.__cache[uid]
            logger.debug('Mongo Entries cache found %r', uid)
        except KeyError:
            entry = self._doc_to_entry(header_doc['start'][0])
            self.__cache[uid] = entry
        # The user has requested one specific Entry. In order to give them a
        # more useful object, 'get' the Entry for them. Note that if they are
        # expecting an Entry and try to call ``()`` or ``.get()``, that will
        # still work because BlueskyRun supports those methods and will just
        # return itself.
        return entry.get()  # an instance of BlueskyRun

    def __contains__(self, key):
        # Try the fast path first.
        if key in self.__cache:
            return True
        # Avoid paying for creating the Entry yet. Do just enough work to
        # decide if we *can* create such an Entry.
        try:
            self._find_header_doc(key)
        except KeyError:
            return False
        else:
            return True

    def __len__(self):
        return len(self.catalog)


class BlueskyMongoCatalog(Broker):
    def __init__(self, datastore_db, *, handler_registry=None,
                 root_map=None, filler_class=event_model.Filler, query=None,
                 transforms=None, **kwargs):
        """
        This Catalog is backed by a MongoDB with an embedded data model.

        This embedded data model has three collections: header, event,
        datum. The header collection includes start, stop, descriptor,
        and resource documents. The event_pages are stored in the event
        collection, and datum_pages are stored in the datum collection.

        Parameters
        ----------
        datastore_db : pymongo.database.Database or string
            Must be a Database or a URI string that includes a database name.
        handler_registry : dict, optional
            This is passed to the Filler or whatever class is given in the
            filler_class parameter below.

            Maps each 'spec' (a string identifying a given type or external
            resource) to a handler class.

            A 'handler class' may be any callable with the signature::

                handler_class(resource_path, root, **resource_kwargs)

            It is expected to return an object, a 'handler instance', which
            is also callable and has the following signature::

                handler_instance(**datum_kwargs)

            As the names 'handler class' and 'handler instance' suggest,
            this is typically implemented using a class that implements
            ``__init__`` and ``__call__``, with the respective signatures.
            But in general it may be any callable-that-returns-a-callable.
        root_map: dict, optional
            This is passed to Filler or whatever class is given in the
            filler_class parameter below.

            str -> str mapping to account for temporarily
            moved/copied/remounted files. Any resources which have a
            ``root`` in ``root_map`` will be loaded using the mapped
            ``root``.
        filler_class: type, optional
            This is Filler by default. It can be a Filler subclass,
            ``functools.partial(Filler, ...)``, or any class that provides
            the same methods as ``DocumentRouter``.
        query : dict, optional
            MongoDB query. Used internally by the ``search()`` method.
        transforms : Dict[str, Callable]
            A dict that maps any subset of the keys {start, stop, resource,
            descriptor} to a function that accepts a document of the
            corresponding type and returns it, potentially modified. This
            feature is for patching up erroneous metadata. It is intended
            for quick, temporary fixes that may later be applied permanently
            to the data at rest (e.g., via a database migration).
        **kwargs :
            Additional keyword arguments are passed through to the base
            class, Catalog.
        """
        name = 'bluesky-mongo-embedded-catalog'  # noqa
        if isinstance(datastore_db, str):
            # A URI was given; it must name a database.
            self._db = _get_database(datastore_db)
        else:
            self._db = datastore_db
        self._query = query or {}
        self._root_map = root_map
        self._filler_class = filler_class
        super().__init__(handler_registry=handler_registry,
                         root_map=root_map, filler_class=filler_class,
                         transforms=transforms, **kwargs)

    def _get_event_pages(self, descriptor_uid, skip=0, limit=None):
        """Return a cursor over event_pages for one descriptor.

        Pages are selected by overlap with the half-open window
        [skip, skip + limit] using their first_index/last_index fields,
        and returned in ascending last_index order.
        """
        if limit is None:
            limit = maxsize
        page_cursor = self._db.event.find(
            {'$and': [
                {'descriptor': descriptor_uid},
                {'last_index': {'$gte': skip}},
                {'first_index': {'$lte': skip + limit}}]},
            {'_id': False},
            sort=[('last_index', pymongo.ASCENDING)])
        return page_cursor

    def _get_datum_pages(self, resource_uid, skip=0, limit=None):
        """Return a cursor over datum_pages for one resource.

        Same windowing scheme as ``_get_event_pages``.
        """
        if limit is None:
            limit = maxsize
        page_cursor = self._db.datum.find(
            {'$and': [
                {'resource': resource_uid},
                {'last_index': {'$gte': skip}},
                {'first_index': {'$lte': skip + limit}}]},
            {'_id': False},
            sort=[('last_index', pymongo.ASCENDING)])
        return page_cursor

    def _make_entries_container(self):
        return _Entries(self)

    def _close(self):
        # BUG FIX: no ``self._client`` attribute is ever assigned in this
        # class (``__init__`` stores only ``self._db``), so the original
        # ``self._client.close()`` raised AttributeError. Close the
        # underlying MongoClient via the Database's ``client`` property.
        self._db.client.close()

    def __len__(self):
        # Number of runs matching the current query.
        return self._db.header.count_documents(self._query)

    def search(self, query):
        """
        Return a new Catalog with a subset of the entries in this Catalog.

        Parameters
        ----------
        query : dict
            MongoDB query.
        """
        query = dict(query)
        if query:
            # User-facing queries address run start fields; the header
            # document nests those under 'start'.
            query = {f"start.{key}": val for key, val in query.items()}
        if self._query:
            # Compose with any query already restricting this catalog.
            query = {'$and': [self._query, query]}
        cat = type(self)(
            datastore_db=self._db,
            query=query,
            handler_registry=self._handler_registry,
            transforms=self._transforms,
            root_map=self._root_map,
            filler_class=self._filler_class,
            name='search results',
            getenv=self.getenv,
            getshell=self.getshell,
            auth=self.auth,
            metadata=(self.metadata or {}).copy(),
            storage_options=self.storage_options)
        return cat
def _get_database(uri):
    """Return the default database named in *uri*, or raise ValueError."""
    client = pymongo.MongoClient(uri)
    try:
        # get_database() with no arguments resolves the database named in
        # the client's URI and raises ConfigurationError when the URI names
        # none; pymongo offers no public way to check this ahead of time,
        # so the error is caught instead.
        database = client.get_database()
    except pymongo.errors.ConfigurationError as exc:
        raise ValueError(
            f"Invalid client: {client} "
            f"Did you forget to include a database?") from exc
    return database