Source code for databroker_pack._unpack

import copy
import os
import pathlib

import databroker
import intake
from tqdm import tqdm
import yaml

from ._utils import CatalogNameExists

__all__ = ("unpack_inplace", "unpack_mongo_normalized")


[docs]def unpack_inplace(path, catalog_name, merge=False): """ Place a catalog configuration file in the user configuration area. Parameters ---------- path: Path Path to output from pack catalog_name: Str A unique name for the catalog merge: Boolean, optional Unpack into an existing catalog Returns ------- config_path: Path Location of new catalog configuration file """ # Validate user input. if not os.path.isdir(path): raise ValueError(f"{path} is not a directory") source_catalog_file_path = pathlib.Path(path, "catalog.yml") if not os.path.isfile(source_catalog_file_path): raise ValueError(f"Could not find 'catalog.yml' in {path}") dest_catalog_file_name = f"databroker_unpack_{catalog_name}.yml" config_dir = databroker.catalog_search_path()[0] dest_catalog_file_path = pathlib.Path(config_dir, dest_catalog_file_name) exists = catalog_name in databroker.catalog if exists: if not merge: raise CatalogNameExists(catalog_name) if not os.path.isfile(dest_catalog_file_path): raise ValueError( "The catalog exists but not in the user-writable location. " "Pick a different catalog name." ) with open(dest_catalog_file_path) as file: existing_catalog = yaml.safe_load(file) existing_source = existing_catalog["sources"][catalog_name] else: existing_source = None with open(source_catalog_file_path) as file: catalog = yaml.safe_load(file) source = catalog["sources"].pop("packed_catalog") # Check that the drivers match. For example, we cannot merge a # msgpack-backed catalog into an existing JSONL-backed one. if existing_source is not None: source_driver = source.get("driver") existing_source_driver = existing_source.get("driver") if existing_source_driver != source_driver: raise ValueError( f"Cannot merge source with driver {source_driver} into source " f"with driver {existing_source_driver}" ) # Handle temporary condition where 'pack' puts absolute paths in "args" # and puts relative paths off to the side. relative_paths = source.get("metadata", {}).get("relative_paths") if relative_paths: new_paths = [ str(pathlib.Path(path, rel_path).absolute()) for rel_path in relative_paths ] source["args"]["paths"] = sorted(new_paths) # Merge paths and relative_paths from existing source, if applicable. if existing_source is not None: source["args"]["paths"] = sorted( set(source["args"]["paths"]) | set(existing_source["args"]["paths"]) ) if "metadata" not in source: source["metadata"] = {} source["metadata"]["relative_paths"] = sorted( set(source["metadata"].get("relative_paths", [])) | set(existing_source.get("metadata", {}).get("relative_paths", [])) ) # The root_map values may be relative inside a pack, given relative to the # catalog file. Now that we are going to use a catalog file in a config # directory, we need to make these paths absolute. for k, v in source["args"].get("root_map", {}).items(): if not pathlib.Path(v).is_absolute(): source["args"]["root_map"][k] = str(pathlib.Path(path, v).absolute()) # Merge root_map from existing source, if applicable. if existing_source is not None: collisions = set(source["args"].get("root_map", {})).intersection( set(existing_source["args"].get("root_map", {})) ) if collisions: raise ValueError( "root_map between existing source and new source have " f"colliding keys {collisions}" ) source["args"]["root_map"].update(existing_source["args"].get("root_map", {})) catalog["sources"][catalog_name] = source os.makedirs(config_dir, exist_ok=True) with open(dest_catalog_file_path, "w") as file: yaml.dump(catalog, file) return dest_catalog_file_path
[docs]def unpack_mongo_normalized(path, uri, catalog_name, merge=False): """ Place a catalog configuration file in the user configuration area. Parameters ---------- path: Path Path to output from pack uri: Str MongoDB URI. Must include a database name. Example: ``mongodb://localhost:27017/databroker_unpack_my_catalog`` catalog_name: Str A unique name for the catalog merge: Boolean, optional Unpack into an existing catalog Returns ------- config_path: Path Location of new catalog configuration file """ import pymongo import suitcase.mongo_normalized # Validate user input. if not os.path.isdir(path): raise ValueError(f"{path} is not a directory") source_catalog_file_path = pathlib.Path(path, "catalog.yml") if not os.path.isfile(source_catalog_file_path): raise ValueError(f"Could not find 'catalog.yml' in {path}") dest_catalog_file_name = f"databroker_unpack_{catalog_name}.yml" config_dir = databroker.catalog_search_path()[0] dest_catalog_file_path = pathlib.Path(config_dir, dest_catalog_file_name) exists = catalog_name in databroker.catalog if exists: if not merge: raise CatalogNameExists(catalog_name) if not os.path.isfile(dest_catalog_file_path): raise ValueError( "The catalog exists but not in the user-writable location. " "Pick a different catalog name." ) with open(dest_catalog_file_path) as file: existing_catalog = yaml.safe_load(file) existing_source = existing_catalog["sources"][catalog_name] else: existing_source = None with open(source_catalog_file_path) as file: catalog = yaml.safe_load(file) source = catalog["sources"].pop("packed_catalog") # Ensure that the URI has a database name in it. if not pymongo.uri_parser.parse_uri(uri)["database"]: raise ValueError("Mongo URI must include a database name.") database = pymongo.MongoClient(uri).get_database() # Check that the target catalog is the right driver and URIs. if existing_source is not None: existing_source_driver = existing_source.get("driver") if existing_source_driver != "bluesky-mongo-normalized-catalog": raise ValueError( f"Existing catalog has driver {existing_source_driver} " "so we cannot make a MongoDB-backed catalog with that name." ) if existing_source["args"]["metadatastore_db"] != uri: raise ValueError( "Existing catalog has metadatastore_db" f"{existing_source['args']['metadatastore_db']} " r"which does not match requested uri {uri}." ) if existing_source["args"]["asset_registry_db"] != uri: raise ValueError( "Existing catalog has asset_registry_db" f"{existing_source['args']['asset_registry_db']} " r"which does not match requested uri {uri}." ) # Handle temporary condition where 'pack' puts absolute paths in "args" # and puts relative paths off to the side. relative_paths = source.get("metadata", {}).get("relative_paths") if relative_paths: new_paths = [ str(pathlib.Path(path, rel_path).absolute()) for rel_path in relative_paths ] source["args"]["paths"] = sorted(new_paths) # The root_map values may be relative inside a pack, given relative to the # catalog file. Now that we are going to use a catalog file in a config # directory, we need to make these paths absolute. for k, v in source["args"].get("root_map", {}).items(): if not pathlib.Path(v).is_absolute(): source["args"]["root_map"][k] = str(pathlib.Path(path, v).absolute()) # Copy data into MongoDB. catalog_class = intake.registry[source["driver"]] source_catalog = catalog_class(**source["args"]) serializer = suitcase.mongo_normalized.Serializer( metadatastore_db=database, asset_registry_db=database, ) with tqdm( total=len(source_catalog), desc="Copying Documents into MongoDB" ) as progress: for uid, run in source_catalog.items(): for name, doc in run.canonical(fill="no"): serializer(name, doc) progress.update() # Modify a copy of the original, file-based source, to make it a # mongo_normalized one. mongo_source = copy.deepcopy(source) mongo_source["driver"] = "bluesky-mongo-normalized-catalog" mongo_source["args"] = { "metadatastore_db": uri, "asset_registry_db": uri, "root_map": source["args"]["root_map"], } # Merge root_map from existing source, if applicable. if existing_source is not None: collisions = set(mongo_source["args"].get("root_map", {})).intersection( set(existing_source["args"].get("root_map", {})) ) if collisions: raise ValueError( "root_map between existing source and new source have " f"colliding keys {collisions}" ) mongo_source["args"]["root_map"].update( existing_source["args"].get("root_map", {}) ) catalog["sources"][catalog_name] = mongo_source os.makedirs(config_dir, exist_ok=True) with open(dest_catalog_file_path, "w") as file: yaml.dump(catalog, file) return dest_catalog_file_path