# Source code for databroker._drivers.jsonl

import errno
import glob
import json
import os
import pathlib
import event_model

from ..in_memory import BlueskyInMemoryCatalog
from ..core import retry


def gen(filename):
    """
    Lazily iterate over the documents stored in a JSONL file.

    Every line of the file is decoded as JSON and unpacked into exactly two
    items, the document name and the document itself.

    Parameters
    ----------
    filename: str
        JSONL file to load.

    Yields
    ------
    (name, doc) : tuple
        The two items decoded from each line, in file order.
    """
    with open(filename, 'r') as jsonl_file:
        # map() is lazy, so each line is only decoded when it is requested.
        for record in map(json.loads, jsonl_file):
            name, doc = record
            yield (name, doc)


def get_stop(filename):
    """
    Returns the stop_doc of a Bluesky JSONL file.

    The stop_doc is always the last line of the file.

    Parameters
    ----------
    filename: str
        JSONL file to load.

    Returns
    -------
    stop_doc: dict or None
        A Bluesky run_stop document or None if one is not present. None is
        also returned when the file is empty or when its last line cannot be
        decoded into a ``(name, doc)`` pair, e.g. because it is still being
        written.
    """
    # Keep only the final line that tail() yields. Iterating (rather than
    # unpacking a single item) copes with tail() yielding nothing for an
    # empty file, which would otherwise raise ValueError here.
    lastline = None
    for lastline in tail(filename):
        pass

    if not lastline:
        return None

    try:
        name, doc = json.loads(lastline)
    except (ValueError, TypeError):
        # json.JSONDecodeError is a ValueError: the line is not valid JSON.
        # The unpacking itself raises ValueError/TypeError when the line is
        # valid JSON but not a two-item sequence. Either way there is no
        # usable stop document.
        return None

    return doc if name == 'stop' else None


class BlueskyJSONLCatalog(BlueskyInMemoryCatalog):
    """
    Catalog of Bluesky runs backed by newline-delimited JSON (jsonl) files.
    """
    name = 'bluesky-jsonl-catalog'  # noqa

    def __init__(self, paths, *, handler_registry=None, root_map=None,
                 filler_class=event_model.Filler, query=None,
                 transforms=None, **kwargs):
        """
        This Catalog is backed by a newline-delimited JSON (jsonl) file.

        Each line of the file is expected to be a JSON list with two elements,
        the document name (type) and the document itself. The documents are
        expected to be in chronological order.

        Parameters
        ----------
        paths : str, pathlib.Path, or list
            list of filepaths (each one may be a glob pattern). A single
            path is tolerated and treated as a one-element list.
        handler_registry : dict, optional
            This is passed to the Filler or whatever class is given in the
            ``filler_class`` parameter below.

            Maps each 'spec' (a string identifying a given type or external
            resource) to a handler class.

            A 'handler class' may be any callable with the signature::

                handler_class(resource_path, root, **resource_kwargs)

            It is expected to return an object, a 'handler instance', which is
            also callable and has the following signature::

                handler_instance(**datum_kwargs)

            As the names 'handler class' and 'handler instance' suggest, this
            is typically implemented using a class that implements
            ``__init__`` and ``__call__``, with the respective signatures. But
            in general it may be any callable-that-returns-a-callable.
        root_map: dict, optional
            This is passed to Filler or whatever class is given in the
            ``filler_class`` parameter below.

            str -> str mapping to account for temporarily moved/copied/remounted
            files.  Any resources which have a ``root`` in ``root_map`` will be
            loaded using the mapped ``root``.
        filler_class: type, optional
            This is Filler by default. It can be a Filler subclass,
            ``functools.partial(Filler, ...)``, or any class that provides the
            same methods as ``DocumentRouter``.
        query : dict, optional
            Mongo query that filters entries' RunStart documents
        transforms : Dict[str, Callable]
            A dict that maps any subset of the keys {start, stop, resource,
            descriptor} to a function that accepts a document of the
            corresponding type and returns it, potentially modified. This
            feature is for patching up erroneous metadata. It is intended for
            quick, temporary fixes that may later be applied permanently to the
            data at rest (e.g., via a database migration).
        **kwargs :
            Additional keyword arguments are passed through to the base class,
            Catalog.
        """
        # Tolerate a single path (as opposed to a list).
        if isinstance(paths, (str, pathlib.Path)):
            paths = [paths]
        self.paths = paths
        # Maps each loaded filename to the mtime it had when last loaded, so
        # that unchanged files are not parsed again by _load().
        self._filename_to_mtime = {}
        super().__init__(handler_registry=handler_registry,
                         root_map=root_map, filler_class=filler_class,
                         query=query, transforms=transforms, **kwargs)

    def _load(self):
        """
        Scan every path/glob in ``self.paths`` and upsert new or changed files.

        A file whose modification time matches the one recorded at the
        previous load is skipped. For every other file, the first line is
        read as the start document, the stop document is looked up with
        ``get_stop`` and both are handed to ``self.upsert`` together with
        ``gen`` and the filename.

        Raises
        ------
        json.JSONDecodeError
            If the first line of a file cannot be decoded and the file has
            further content after it.
        """
        for path in self.paths:
            # glob.glob() only accepts str/bytes patterns, while __init__
            # explicitly tolerates pathlib.Path; normalise here so that Path
            # entries do not raise TypeError.
            for filename in glob.glob(os.fspath(path)):
                mtime = os.path.getmtime(filename)
                if mtime == self._filename_to_mtime.get(filename):
                    # This file has not changed since last time we loaded it.
                    continue
                self._filename_to_mtime[filename] = mtime
                with open(filename, 'r') as file:
                    try:
                        name, start_doc = json.loads(file.readline())
                    except json.JSONDecodeError:
                        if not file.readline():
                            # Empty file, maybe being written to currently
                            continue
                        raise
                stop_doc = get_stop(filename)
                self.upsert(start_doc, stop_doc, gen, (filename,), {})

    @retry
    def search(self, query):
        """
        Return a new Catalog with a subset of the entries in this Catalog.

        Parameters
        ----------
        query : dict
            Query to apply. If this Catalog already has a query, the two are
            combined with ``$and``.

        Returns
        -------
        cat : BlueskyJSONLCatalog
            A new instance of this Catalog's type over the same paths.
        """
        query = dict(query)
        if self._query:
            query = {'$and': [self._query, query]}
        cat = type(self)(
            paths=self.paths,
            query=query,
            handler_registry=self._handler_registry,
            transforms=self._transforms,
            root_map=self._root_map,
            name='search results',
            getenv=self.getenv,
            getshell=self.getshell,
            auth=self.auth,
            metadata=(self.metadata or {}).copy(),
            storage_options=self.storage_options)
        return cat

    def _get_serializer(self):
        "This is used internally by v1.Broker. It may be removed in future."
        from suitcase.jsonl import Serializer
        from event_model import RunRouter
        # Only the first configured path decides where new files are written.
        path, *_ = self.paths
        directory = os.path.dirname(path)

        def factory(name, doc):
            serializer = Serializer(directory)
            return [serializer], []

        return RunRouter([factory])
def tail(filename, n=1, bsize=2048):
    """
    Returns a generator with the last n lines of a file.

    The file is read backwards from its end in binary mode, in a window
    that starts at ``bsize`` bytes and doubles until it holds more than
    ``n`` lines or reaches the start of the file. Lines are split on
    ``\\n``, ``\\r`` and ``\\r\\n``, so mixed newline styles are accepted and a
    final line without a trailing newline counts as a line. Only complete
    lines are decoded, using the same default text encoding as ``open()``.

    Based on an approach by Martijn Pieters:
    https://stackoverflow.com/a/12295054/6513183

    Parameters
    ----------
    filename : string
    n: int
        number of lines
    bsize: int
        initial size in bytes of the window read from the end of the file

    Returns
    -------
    line : generator
        Yields up to ``n`` lines, oldest first, each with trailing
        whitespace (including the newline) stripped. Yields nothing for an
        empty file or when ``n`` is not positive.
    """
    import io  # local import: only needed to decode the selected lines

    if n <= 0:
        return

    with open(filename, 'rb') as hfile:
        size = hfile.seek(0, os.SEEK_END)
        window = max(bsize, 1)
        while True:
            start = max(size - window, 0)
            hfile.seek(start, os.SEEK_SET)
            lines = hfile.read(size - start).splitlines()
            # When the window does not begin at the start of the file, its
            # first entry may be a partial line (or even begin in the middle
            # of a multi-byte character), so we need strictly more than n
            # entries to be sure that the last n are complete.
            if start == 0 or len(lines) > n:
                break
            window *= 2

    # Re-terminate every selected line so that empty lines survive decoding,
    # then decode through TextIOWrapper to use open()'s default encoding.
    payload = b''.join(line + b'\n' for line in lines[-n:])
    with io.TextIOWrapper(io.BytesIO(payload)) as text:
        for line in text:
            yield line.rstrip()