import collections
import functools
import logging
import os
import pathlib
import secrets
import shutil
import event_model
import databroker.core
from tqdm import tqdm
import yaml
from ._utils import root_hash
from ._version import get_versions
__all__ = (
"copy_external_files",
"export_uids",
"export_catalog",
"export_run",
"write_documents_manifest",
"write_external_files_manifest",
"write_jsonl_catalog_file",
"write_msgpack_catalog_file",
)
logger = logging.getLogger(__name__)
# Write through tqdm to avoid overlapping with bars.
def print(*args):
tqdm.write(" ".join(str(arg) for arg in args))
def export_uids(
source_catalog,
uids,
directory,
*,
strict=False,
external=None,
no_documents=False,
handler_registry=None,
serializer_class=None,
salt=None,
):
"""
Export Runs from a Catalog, given a list of RunStart unique IDs.
Parameters
----------
source_catalog: Catalog
uids: List[Str]
List of RunStart unique IDs
directory: Union[Str, Manager]
Where files containing documents will be written, or a Manager for
writing to non-file buffers.
strict: Bool, optional
By default, swallow errors and return a list of them at the end.
Set to True to debug errors.
external: {None, 'fill', 'ignore'}
If None, return the paths to external files.
If 'fill', fill the external data into the Documents.
If 'ignore', do not locate external files.
no_documents: Bool, optional
If True, do not write any files. False by default.
handler_registry: Union[Dict, None]
If None, automatic handler discovery is used.
serializer_class: Serializer
Expected to be a lossless serializer that encodes a format for which
there is a corresponding databroker intake driver. Default (None) is
currently ``suitcase.msgpack.Serializer``, but this may change in the
future. If you want ``suitcase.msgpack.Serializer`` specifically, pass
it in explicitly.
salt: Union[bytes, None]
We want to make hashes unique to:
- a root
- a given batch of exported runs (i.e., a given call to this function)
so that we can use it as a key in root_map which is guaranteed not to
collide with keys from other batches. Thus, we create a "salt" unless
one is specified here. This does not need to be cryptographically
secure, just unique.
Returns
-------
artifacts, files, failures, file_uids
Notes
-----
* ``artifacts`` maps a human-readable string (typically just ``'all'`` in
this case) to a list of buffers or filepaths where the documents were
serialized.
* ``files`` is a dictionary mapping ``(root_in_document, root, unique_id)``
keys to sets of filepaths of all external files referenced by Resource
documents.
* ``failures`` is a list of uids of runs that raised Exceptions. (The
relevant tracebacks are logged.)
* ``file_uids`` is a dictionary mapping each RunStart unique ID to a list
of ``(root, filename)`` pairs.
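
Examples
--------
A minimal, hypothetical sketch. The catalog name, uid, and output
directory are placeholders; in practice the uids come from your own
catalog.

>>> import databroker
>>> from databroker_pack import export_uids
>>> catalog = databroker.catalog["example_catalog"]
>>> artifacts, files, failures, file_uids = export_uids(
...     catalog, ["<some-run-start-uid>"], "exported/"
... )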
"""
accumulated_files = collections.defaultdict(set)
file_uids = {}
accumulated_artifacts = collections.defaultdict(set)
failures = []
if salt is None:
salt = secrets.token_hex(32).encode()
root_hash_func = functools.partial(root_hash, salt)
with tqdm(total=len(uids), position=1, desc="Writing Documents", unit="runs") as progress:
for uid in uids:
try:
run = source_catalog[uid]
artifacts, files = export_run(
run,
directory,
root_hash_func,
external=external,
no_documents=no_documents,
handler_registry=handler_registry,
root_map=source_catalog.root_map,
serializer_class=serializer_class,
)
file_uids[uid] = []
for root, set_ in files.items():
accumulated_files[root].update(set_)
file_uids[uid].extend((root, filename) for filename in set_)
for name, list_ in artifacts.items():
accumulated_artifacts[name].update(list_)
except Exception:
logger.exception("Error while exporting Run %r", uid)
if strict:
raise
failures.append(uid)
progress.set_description(
f"Writing Documents ({len(failures)} failures)", refresh=False
)
progress.update()
return dict(accumulated_artifacts), dict(accumulated_files), failures, file_uids
def export_catalog(
source_catalog,
directory,
*,
strict=False,
external=None,
no_documents=False,
handler_registry=None,
serializer_class=None,
salt=None,
limit=None,
):
"""
Export all the Runs from a Catalog.
Parameters
----------
source_catalog: Catalog
directory: Union[Str, Manager]
Where files containing documents will be written, or a Manager for
writing to non-file buffers.
strict: Bool, optional
By default, swallow errors and return a list of them at the end.
Set to True to debug errors.
external: {None, 'fill', 'ignore'}
If None, return the paths to external files.
If 'fill', fill the external data into the Documents.
If 'ignore', do not locate external files.
no_documents: Bool, optional
If True, do not serialize documents. False by default.
handler_registry: Union[Dict, None]
If None, automatic handler discovery is used.
serializer_class: Serializer
Expected to be a lossless serializer that encodes a format for which
there is a corresponding databroker intake driver. Default (None) is
currently ``suitcase.msgpack.Serializer``, but this may change in the
future. If you want ``suitcase.msgpack.Serializer`` specifically, pass
it in explicitly.
salt: Union[bytes, None]
We want to make hashes unique to:
- a root
- a given batch of exported runs (i.e., a given call to this function)
so that we can use it as a key in root_map which is guaranteed not to
collide with keys from other batches. Thus, we create a "salt" unless
one is specified here. This does not need to be cryptographically
secure, just unique.
limit: Union[Integer, None]
Stop after exporting some number of Runs. Useful for testing a subset
before doing a lengthy export.
Returns
-------
artifacts, files, failures, file_uids
Notes
-----
* ``artifacts`` maps a human-readable string (typically just ``'all'`` in
this case) to a list of buffers or filepaths where the documents were
serialized.
* ``files`` is a dictionary mapping ``(root_in_document, root, unique_id)``
keys to sets of filepaths of all external files referenced by Resource
documents.
* ``failures`` is a list of uids of runs that raised Exceptions. (The
relevant tracebacks are logged.)
* ``file_uids`` is a dictionary mapping each RunStart unique ID to a list
of ``(root, filename)`` pairs.
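
Examples
--------
A hypothetical sketch: export everything from a named source catalog,
stopping after the first 10 Runs. The catalog name and directory are
placeholders.

>>> import databroker
>>> from databroker_pack import export_catalog
>>> catalog = databroker.catalog["example_catalog"]
>>> artifacts, files, failures, file_uids = export_catalog(
...     catalog, "exported/", limit=10
... )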
"""
if limit is not None:
if limit < 1:
raise ValueError("limit must be None or a number 1 or greater")
limit = int(limit)
accumulated_files = collections.defaultdict(set)
file_uids = {}
accumulated_artifacts = collections.defaultdict(set)
failures = []
if salt is None:
salt = secrets.token_hex(32).encode()
root_hash_func = functools.partial(root_hash, salt)
with tqdm(
total=limit or len(source_catalog), position=1, unit="runs", desc="Writing Documents"
) as progress:
for i, (uid, run) in enumerate(source_catalog.items()):
if i == limit:
break
try:
artifacts, files = export_run(
run,
directory,
root_hash_func,
external=external,
no_documents=no_documents,
handler_registry=handler_registry,
root_map=source_catalog.root_map,
serializer_class=serializer_class,
)
file_uids[uid] = []
for root, set_ in files.items():
accumulated_files[root].update(set_)
file_uids[uid].extend((root, filename) for filename in set_)
for name, list_ in artifacts.items():
accumulated_artifacts[name].update(list_)
except Exception:
logger.exception("Error while exporting Run %r", uid)
if strict:
raise
failures.append(uid)
progress.set_description(
f"Writing Documents ({len(failures)} failures)", refresh=False
)
progress.update()
return dict(accumulated_artifacts), dict(accumulated_files), failures, file_uids
def export_run(
run,
directory,
root_hash_func,
*,
external=None,
no_documents=False,
handler_registry=None,
root_map=None,
serializer_class=None,
):
"""
Export one Run.
Parameters
----------
run: BlueskyRun
directory: Union[Str, Manager]
Where files containing documents will be written, or a Manager for
writing to non-file buffers.
root_hash_func: Callable
Function that maps a 'root' to a unique ID, used to place external files
in a unique location and as a key in the exported catalog's root_map.
external: {None, 'fill', 'ignore'}
If None, return the paths to external files.
If 'fill', fill the external data into the Documents.
If 'ignore', do not locate external files.
no_documents: Bool, optional
If True, do not serialize documents. False by default.
handler_registry: Union[Dict, None]
If None, automatic handler discovery is used.
root_map: Union[Dict, None]
Mapping from the 'root' recorded in Resource documents to the location
where the data can currently be found, as configured on the source
catalog.
serializer_class: Serializer, optional
Expected to be a lossless serializer that encodes a format for which
there is a corresponding databroker intake driver. Default (None) is
currently ``suitcase.msgpack.Serializer``, but this may change in the
future. If you want ``suitcase.msgpack.Serializer`` specifically, pass
it in explicitly.
Returns
-------
artifacts, files
Notes
-----
* ``artifacts`` maps a human-readable string (typically just ``'all'`` in
this case) to a list of buffers or filepaths where the documents were
serialized.
* ``files`` is a dictionary mapping ``(root_in_document, root, unique_id)``
keys to sets of filepaths of all external files referenced by Resource
documents.
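
Examples
--------
A hypothetical sketch mirroring how ``export_uids`` calls this function,
assuming this module's names (``export_run`` and the ``root_hash`` helper
imported above) are in scope. The catalog name and directory are
placeholders.

>>> import functools, secrets
>>> import databroker
>>> catalog = databroker.catalog["example_catalog"]
>>> run = catalog[-1]
>>> salt = secrets.token_hex(32).encode()
>>> artifacts, files = export_run(
...     run, "exported/", functools.partial(root_hash, salt),
...     root_map=catalog.root_map,
... )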
"""
EXTERNAL_RELATED_DOCS = ("resource", "datum", "datum_page")
if serializer_class is None:
import suitcase.msgpack
serializer_class = suitcase.msgpack.Serializer
root_map = root_map or {}
files = collections.defaultdict(set)
if handler_registry is None:
handler_registry = databroker.core.discover_handlers()
with event_model.Filler(
handler_registry, inplace=False, root_map=root_map
) as filler:
with serializer_class(
directory, file_prefix="documents/{start[uid]}"
) as serializer:
with tqdm(position=0, unit="documents") as progress:
for name, doc in run.canonical(fill="no"):
if external == "fill":
name, doc = filler(name, doc)
# Omit Resource and Datum[Page] documents because their data
# has been filled into the Event documents.
if name in EXTERNAL_RELATED_DOCS:
progress.update()
continue
elif name == "resource":
root = root_map.get(doc["root"], doc["root"])
unique_id = root_hash_func(doc["root"])
if external is None:
if no_documents:
root_in_document = doc["root"]
else:
root_in_document = root
# - root_in_document is the 'root' actually in the
# resource_document
# - root may be different depending on the
# source_catalog configuration, which can map the
# recorded 'root' in the document to some other
# location. This is where we should go looking for
# the data if we plan to copy it.
# - unique_id is unique to this (root, salt)
# combination and used to place the data in a
# unique location.
key = (root_in_document, root, unique_id)
files[key].update(run.get_file_list(doc))
if not no_documents:
# Replace root with a unique ID before serialization.
# We are overriding the local variable name doc here
# (yuck!) so that serializer(name, doc) below works on
# all document types.
doc = doc.copy()
doc["root"] = unique_id
if not no_documents:
serializer(name, doc)
progress.update()
return serializer.artifacts, dict(files)
def write_external_files_manifest(manager, unique_id, files):
"""
Write a manifest of external files.
Parameters
----------
manager: suitcase Manager object
unique_id: Str
files: Iterable[Union[Str, Path]]
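
Examples
--------
Illustrative only; the directory, unique ID, and file paths are
hypothetical, and ``MultiFileManager`` is one suitcase Manager that could
be used here.

>>> from suitcase.utils import MultiFileManager
>>> manager = MultiFileManager("exported/")
>>> write_external_files_manifest(
...     manager, "abc123", ["/data/img_001.h5", "/data/img_002.h5"]
... )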
"""
name = f"external_files_manifest_{unique_id}.txt"
with manager.open("manifest", name, "xt") as file:
file.write("\n".join(sorted((str(f) for f in set(files)))))
def copy_external_files(target_directory, root, unique_id, files, strict=False):
"""
Make a filesystem copy of the external files.
A filesystem copy is not always applicable/desirable. Use the
external_files_manifest_*.txt files to feed other file transfer mechanisms,
such as rsync or globus.
This is a wrapper around shutil.copyfile.
Parameters
----------
target_directory: Union[Str, Path]
root: Str
unique_id: Str
Unique ID used as a subdirectory name under ``target_directory``.
files: Iterable[Str]
strict: Bool, optional
By default, swallow errors and return a list of them at the end.
Set to True to debug errors.
Returns
-------
new_root, new_files, failures
Notes
-----
* ``new_root`` is a Path to the new root directory
* ``new_files`` is the list of filepaths to the files that were created.
* ``failures`` is a list of filepaths of files that raised Exceptions while
being copied. (The relevant tracebacks are logged.)
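
Examples
--------
Illustrative only; all paths and the unique ID are hypothetical.

>>> new_root, new_files, failures = copy_external_files(
...     "exported/external_files", "/data/detector", "abc123",
...     ["/data/detector/2020/img_001.h5"],
... )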
"""
new_files = []
failures = []
for filename in tqdm(files, total=len(files), desc="Copying external files"):
relative_path = pathlib.Path(filename).relative_to(root)
new_root = pathlib.Path(target_directory, unique_id)
dest = new_root / relative_path
try:
os.makedirs(dest.parent, exist_ok=True)
new_files.append(shutil.copyfile(filename, dest))
except Exception:
logger.exception("Error while copying %r to %r", filename, dest)
if strict:
raise
failures.append(filename)
return new_root, new_files, failures
def write_msgpack_catalog_file(manager, directory, paths, root_map):
"""
Write a YAML file with configuration for an intake catalog.
Parameters
----------
manager: suitcase Manager object
directory: Str
Directory to which paths below are relative
paths: Union[Str, List[Str]]
Relative path(s) of msgpack files encoding Documents.
root_map: Dict
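
Examples
--------
Illustrative only; the directory, document paths, and root_map entries are
hypothetical.

>>> from suitcase.utils import MultiFileManager
>>> manager = MultiFileManager("exported/")
>>> write_msgpack_catalog_file(
...     manager, "exported/", ["documents/abc.msgpack"],
...     {"abc123": "external_files/abc123"},
... )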
"""
# Ideally, the drivers should be able to cope with relative paths,
# interpreting them as relative to the Catalog file. This requires changes
# to intake (I think), so as a short-term hack we make the paths absolute
# here but note the relative paths in a separate place.
abs_paths = [str(pathlib.Path(directory, path).absolute()) for path in paths]
metadata = {
"generated_by": {
"library": "databroker_pack",
"version": get_versions()["version"],
},
"relative_paths": [str(path) for path in paths],
}
source = {
"driver": "bluesky-msgpack-catalog",
"args": {"paths": abs_paths},
"metadata": metadata,
}
if root_map is not None:
source["args"]["root_map"] = {str(k): str(v) for k, v in root_map.items()}
sources = {"packed_catalog": source}
catalog = {"sources": sources}
FILENAME = "catalog.yml" # expected by unpack
with manager.open("catalog_file", FILENAME, "xt") as file:
yaml.dump(catalog, file)
def write_jsonl_catalog_file(manager, directory, paths, root_map):
"""
Write a YAML file with configuration for an intake catalog.
Parameters
----------
manager: suitcase Manager object
directory: Str
Directory to which paths below are relative
paths: Union[Str, List[Str]]
Relative path(s) of JSONL files encoding Documents.
root_map: Dict
"""
# There is clearly some code repetition here with respect to
# write_msgpack_catalog_file, but I expect they may diverge over time as
# the suitcase implementations pick up format-specific options.
# Ideally, the drivers should be able to cope with relative paths,
# interpreting them as relative to the Catalog file. This requires changes
# to intake (I think), so as a short-term hack we make the paths absolute
# here but note the relative paths in a separate place.
abs_paths = [str(pathlib.Path(directory, path).absolute()) for path in paths]
metadata = {
"generated_by": {
"library": "databroker_pack",
"version": get_versions()["version"],
},
"relative_paths": [str(path) for path in paths],
}
source = {
"driver": "bluesky-jsonl-catalog",
"args": {"paths": abs_paths},
"metadata": metadata,
}
if root_map is not None:
source["args"]["root_map"] = {str(k): str(v) for k, v in root_map.items()}
sources = {"packed_catalog": source}
catalog = {"sources": sources}
FILENAME = "catalog.yml"
with manager.open("catalog_file", FILENAME, "xt") as file:
yaml.dump(catalog, file)
def write_documents_manifest(manager, directory, artifacts):
"""
Write the paths to all the files of Documents, relative to the pack directory.
Parameters
----------
manager: suitcase Manager object
directory: Str
Pack directory
artifacts: List[Str]
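
Examples
--------
Illustrative only; the pack directory and artifact paths are hypothetical.
Artifact paths must lie under the pack directory.

>>> from suitcase.utils import MultiFileManager
>>> manager = MultiFileManager("/tmp/exported")
>>> write_documents_manifest(
...     manager, "/tmp/exported",
...     ["/tmp/exported/documents/abc.msgpack"],
... )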
"""
FILENAME = "documents_manifest.txt"
abs_directory = pathlib.Path(directory).absolute()
with manager.open("documents_manifest", FILENAME, "xt") as file:
for artifact in artifacts:
file.write(f"{pathlib.Path(artifact).relative_to(abs_directory)!s}\n")