import event_model
import importlib
import tempfile
from .core import parse_handler_registry, discover_handlers, parse_transforms
from intake.catalog import Catalog
from event_model import DuplicateHandler
from functools import partial
from pathlib import Path
class Broker(Catalog):
    """
    This is a thin wrapper around intake.Catalog.

    It includes an accessor to the databroker API version 1.

    Parameters
    ----------
    handler_registry: dict, optional
        This is passed to the Filler or whatever class is given in the
        filler_class parameter below.

        Maps each 'spec' (a string identifying a given type or external
        resource) to a handler class.

        A 'handler class' may be any callable with the signature::

            handler_class(resource_path, root, **resource_kwargs)

        It is expected to return an object, a 'handler instance', which is also
        callable and has the following signature::

            handler_instance(**datum_kwargs)

        As the names 'handler class' and 'handler instance' suggest, this is
        typically implemented using a class that implements ``__init__`` and
        ``__call__``, with the respective signatures. But in general it may be
        any callable-that-returns-a-callable.

        If None (the default), the registry is obtained by calling
        ``discover_handlers()``.
    root_map: dict, optional
        This is passed to Filler or whatever class is given in the filler_class
        parameter below.

        str -> str mapping to account for temporarily moved/copied/remounted
        files. Any resources which have a ``root`` in ``root_map`` will be
        loaded using the mapped ``root``.

        Relative values are resolved against ``metadata['catalog_dir']``; the
        given dict is updated in place with the resolved paths.
    filler_class: type or str
        This is Filler by default. It can be a Filler subclass,
        ``functools.partial(Filler, ...)``, or any class that provides the same
        methods as ``DocumentRouter``. A string is interpreted as a dotted
        import path, ``'package.module.ClassName'``, and imported.
    transforms: dict
        A dict that maps any subset of the keys {start, stop, resource, descriptor}
        to a function that accepts a document of the corresponding type and
        returns it, potentially modified. This feature is for patching up
        erroneous metadata. It is intended for quick, temporary fixes that
        may later be applied permanently to the data at rest
        (e.g., via a database migration).
    **kwargs :
        Additional keyword arguments are passed through to the base class,
        Catalog.

    Raises
    ------
    ValueError
        If ``root_map`` contains a relative path and the catalog metadata has
        no ``catalog_dir`` to resolve it against.
    """

    # Work around
    # https://github.com/intake/intake/issues/545
    _container = None

    def __init__(self, *, handler_registry=None, root_map=None,
                 filler_class=event_model.Filler, transforms=None, **kwargs):
        # Work around https://github.com/intake/intake/issues/543
        self.auth = kwargs.pop("auth", None)

        if isinstance(filler_class, str):
            # A dotted path: everything before the last '.' is the module,
            # the remainder is the attribute to fetch from it.
            module_name, _, class_name = filler_class.rpartition('.')
            self._filler_class = getattr(
                importlib.import_module(module_name), class_name)
        else:
            self._filler_class = filler_class
        self._root_map = root_map or {}
        self._transforms = parse_transforms(transforms)
        if handler_registry is None:
            handler_registry = discover_handlers()
        self._handler_registry = parse_handler_registry(handler_registry)
        # Public attribute wraps the private registry; registration goes
        # through register_handler / deregister_handler below.
        self.handler_registry = event_model.HandlerRegistryView(
            self._handler_registry)
        self._get_filler = partial(self._filler_class,
                                   handler_registry=self.handler_registry,
                                   root_map=self._root_map,
                                   inplace=False)
        super().__init__(**kwargs)

        # The values in the root_map are allowed to be relative to the catalog
        # file in order to facilitate portable archives. Not all catalogs come
        # from catalog files, so relative paths are only allowed in root_map if
        # the root_map originates from an actual file. If not, we raise.
        # Iterate over a snapshot because values are reassigned in the loop.
        for k, v in list(self._root_map.items()):
            if not Path(v).is_absolute():
                catalog_dir = self.metadata.get('catalog_dir')
                if not catalog_dir:
                    raise ValueError(
                        f"Found relative path {v} in root_map. "
                        "Relative paths are only allowed when the catalog "
                        "is backed by a YAML file, so that paths can be "
                        "interpreted relative to the location of that file.")
                self._root_map[k] = str(Path(catalog_dir, v))

    @property
    def root_map(self):
        # This is mutable, so be advised that users *can* mutate it under us.
        # The property just prohibits them from setting it to an entirely
        # different dict instance.
        return self._root_map

    @property
    def v1(self):
        "Accessor to the version 1 API."
        # ``self.__v1`` is name-mangled to ``_Broker__v1``; the v1 wrapper is
        # built on first access and reused afterwards.
        if not hasattr(self, '_Broker__v1'):
            from .v1 import Broker
            self.__v1 = Broker(self)
        return self.__v1

    @property
    def v2(self):
        "A self-reference. This makes v1.Broker and v2.Broker symmetric."
        return self

    def register_handler(self, spec, handler, overwrite=False):
        """
        Register a handler for a spec.

        Parameters
        ----------
        spec: str
            Key under which the handler is stored in the registry.
        handler: callable
            The handler to associate with ``spec``.
        overwrite: bool, optional
            If False (default) and a *different* handler is already registered
            for ``spec``, raise. Registering the identical object again is a
            no-op.

        Raises
        ------
        DuplicateHandler
            If ``spec`` is already registered to another handler and
            ``overwrite`` is False.
        """
        if (not overwrite) and (spec in self._handler_registry):
            original = self._handler_registry[spec]
            if original is handler:
                return
            raise DuplicateHandler(
                f"There is already a handler registered for the spec {spec!r}. "
                f"Use overwrite=True to deregister the original.\n"
                f"Original: {original}\n"
                f"New: {handler}")
        self._handler_registry[spec] = handler

    def deregister_handler(self, spec):
        """
        Remove the handler registered for ``spec``, if any.

        An unknown ``spec`` is silently ignored.
        """
        self._handler_registry.pop(spec, None)

    def items(self):
        """
        Yield ``(key, value.get())`` for each pair from the base class's
        ``items()``.
        """
        # TEMP: Patch regression in intake 0.6.0.
        for key, value in super().items():
            yield key, value.get()
def temp():
    """
    Generate a Catalog backed by a temporary directory of msgpack-encoded files.

    Returns
    -------
    catalog : BlueskyMsgpackCatalog
        A catalog named 'temp' that globs ``<tmp_dir>/data/*.msgpack``, where
        ``<tmp_dir>`` is a fresh directory from ``tempfile.mkdtemp()``. This
        function does not remove that directory.
    """
    from databroker._drivers.msgpack import BlueskyMsgpackCatalog
    from databroker.core import discover_handlers

    handler_registry = discover_handlers()
    # The temp databroker is often (but not exclusively) used in the context of
    # a demo or tutorial with the simulated devices in ophyd.sim, some of which
    # require a handler registered for the spec 'NPY_SEQ'.
    # With ophyd >= 1.4.0 this handler will be discovered in the normal way
    # via discover_handlers() above. The following special case is here to
    # support older versions of ophyd, which do not declare a
    # 'databroker.handlers' entrypoint.
    if 'NPY_SEQ' not in handler_registry:
        try:
            import ophyd.sim
        except ImportError:
            # ophyd is optional; without it there is nothing to register.
            pass
        else:
            handler_registry['NPY_SEQ'] = ophyd.sim.NumpySeqHandler

    tmp_dir = tempfile.mkdtemp()
    tmp_data_dir = Path(tmp_dir) / 'data'
    catalog = BlueskyMsgpackCatalog(
        f"{tmp_data_dir}/*.msgpack",
        name='temp',
        handler_registry=handler_registry)
    return catalog