Source code for databroker._drivers.msgpack

import event_model
import glob
import msgpack
import msgpack_numpy
import os
import pathlib

from ..in_memory import BlueskyInMemoryCatalog


UNPACK_OPTIONS = dict(object_hook=msgpack_numpy.decode,
                      raw=False,
                      max_buffer_size=1_000_000_000)
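
# A minimal sketch (not part of the original module) of what these options do:
# ``object_hook=msgpack_numpy.decode`` lets the Unpacker rebuild numpy arrays
# packed with ``msgpack_numpy.encode``, ``raw=False`` decodes msgpack strings
# to ``str`` rather than ``bytes``, and ``max_buffer_size`` raises the input
# buffer limit so large documents can be read. For example:
#
#     import numpy
#     packed = msgpack.packb(numpy.arange(3), default=msgpack_numpy.encode)
#     arr = msgpack.unpackb(packed, object_hook=msgpack_numpy.decode, raw=False)
#     assert (arr == numpy.arange(3)).all()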


def gen(filename):
    """
    A generator that yields (name, doc) pairs from a msgpack file.

    Parameters
    ----------
    filename: str
        msgpack file to load.
    """
    with open(filename, 'rb') as file:
        yield from msgpack.Unpacker(file, **UNPACK_OPTIONS)

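# Hypothetical usage sketch (the filename below is illustrative, not part of
# the original module). Each yielded item is a (name, doc) pair such as
# ('start', {...}), ('descriptor', {...}), ('event', {...}), ('stop', {...}):
#
#     for name, doc in gen('scan_0001.msgpack'):
#         print(name, doc.get('uid'))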

def get_stop(filename):
    """
    Returns the stop_doc of a Bluesky msgpack file.

    If present, the stop document is the last document in the file.

    Parameters
    ----------
    filename: str
        msgpack file to load.

    Returns
    -------
    stop_doc: dict or None
        A Bluesky run_stop document or None if one is not present.
    """
    with open(filename, 'rb') as file:
        for name, doc in msgpack.Unpacker(file, **UNPACK_OPTIONS):
            if name == 'stop':
                return doc

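# Hypothetical usage sketch (the filename below is illustrative, not part of
# the original module):
#
#     stop_doc = get_stop('scan_0001.msgpack')
#     if stop_doc is None:
#         print('No stop document; the run may be incomplete or in progress.')
#     else:
#         print(stop_doc['exit_status'])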

class BlueskyMsgpackCatalog(BlueskyInMemoryCatalog):
    name = 'bluesky-msgpack-catalog'  # noqa

    def __init__(self, paths, *,
                 handler_registry=None, root_map=None,
                 filler_class=event_model.Filler, query=None,
                 transforms=None, **kwargs):
        """
        This Catalog is backed by msgpack files.

        Each chunk of the file is expected to be a list with two elements,
        the document name (type) and the document itself. The documents are
        expected to be in chronological order.

        Parameters
        ----------
        paths : list
            list of filepaths
        handler_registry : dict, optional
            This is passed to the Filler or whatever class is given in the
            ``filler_class`` parameter below.

            Maps each 'spec' (a string identifying a given type or external
            resource) to a handler class.

            A 'handler class' may be any callable with the signature::

                handler_class(resource_path, root, **resource_kwargs)

            It is expected to return an object, a 'handler instance', which is
            also callable and has the following signature::

                handler_instance(**datum_kwargs)

            As the names 'handler class' and 'handler instance' suggest, this
            is typically implemented using a class that implements ``__init__``
            and ``__call__``, with the respective signatures. But in general it
            may be any callable-that-returns-a-callable.
        root_map : dict, optional
            This is passed to Filler or whatever class is given in the
            ``filler_class`` parameter below.

            str -> str mapping to account for temporarily
            moved/copied/remounted files. Any resources which have a ``root``
            in ``root_map`` will be loaded using the mapped ``root``.
        filler_class : type, optional
            This is Filler by default. It can be a Filler subclass,
            ``functools.partial(Filler, ...)``, or any class that provides the
            same methods as ``DocumentRouter``.
        query : dict, optional
            Mongo query that filters entries' RunStart documents
        transforms : Dict[str, Callable]
            A dict that maps any subset of the keys {start, stop, resource,
            descriptor} to a function that accepts a document of the
            corresponding type and returns it, potentially modified. This
            feature is for patching up erroneous metadata. It is intended for
            quick, temporary fixes that may later be applied permanently to
            the data at rest (e.g., via a database migration).
        **kwargs :
            Additional keyword arguments are passed through to the base class,
            Catalog.
        """
        # Tolerate a single path (as opposed to a list).
        if isinstance(paths, (str, pathlib.Path)):
            paths = [paths]
        self.paths = paths
        self._filename_to_mtime = {}
        super().__init__(handler_registry=handler_registry,
                         root_map=root_map,
                         filler_class=filler_class,
                         query=query,
                         transforms=transforms,
                         **kwargs)

    def _load(self):
        for path in self.paths:
            for filename in glob.glob(path):
                mtime = os.path.getmtime(filename)
                if mtime == self._filename_to_mtime.get(filename):
                    # This file has not changed since last time we loaded it.
                    continue
                self._filename_to_mtime[filename] = mtime
                with open(filename, 'rb') as file:
                    unpacker = msgpack.Unpacker(file, **UNPACK_OPTIONS)
                    try:
                        name, start_doc = next(unpacker)
                    except StopIteration:
                        # Empty file, maybe being written to currently
                        continue
                stop_doc = get_stop(filename)
                self.upsert(start_doc, stop_doc, gen, (filename,), {})
    def search(self, query):
        """
        Return a new Catalog with a subset of the entries in this Catalog.

        Parameters
        ----------
        query : dict
        """
        query = dict(query)
        if self._query:
            query = {'$and': [self._query, query]}
        cat = type(self)(
            paths=self.paths,
            query=query,
            handler_registry=self._handler_registry,
            transforms=self._transforms,
            root_map=self._root_map,
            name='search results',
            getenv=self.getenv,
            getshell=self.getshell,
            auth=self.auth,
            metadata=(self.metadata or {}).copy(),
            storage_options=self.storage_options)
        return cat
    def _get_serializer(self):
        "This is used internally by v1.Broker. It may be removed in future."
        from suitcase.msgpack import Serializer
        from event_model import RunRouter
        path, *_ = self.paths
        directory = os.path.dirname(path)

        def factory(name, doc):
            serializer = Serializer(directory)
            return [serializer], []

        return RunRouter([factory])
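
# Hypothetical usage sketch (the glob pattern and query below are illustrative,
# not part of the original module). A catalog is built from one or more msgpack
# file paths (globs are allowed) and can be narrowed with a Mongo-style query
# against RunStart documents:
#
#     catalog = BlueskyMsgpackCatalog(['/data/*.msgpack'])
#     results = catalog.search({'plan_name': 'count'})
#     for uid in results:
#         print(uid)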