Source code for tiled.client.catalog

import collections
import collections.abc
from dataclasses import fields
import importlib
import itertools
import time
import warnings

import entrypoints

from ..query_registration import query_type_to_name
from ..queries import KeyLookup
from ..utils import (
    import_object,
    OneShotCachedMap,
    Sentinel,
)
from .authentication import DEFAULT_TOKEN_CACHE, reauthenticate_client
from .base import BaseClient
from .utils import (
    client_and_path_from_uri,
    client_from_catalog,
    NEEDS_INITIALIZATION,
)
from ..catalogs.utils import (
    catalog_repr,
    IndexersMixin,
    UNCHANGED,
)


class Catalog(BaseClient, collections.abc.Mapping, IndexersMixin):

    # This maps the structure_family sent by the server to a client-side object that
    # can interpret the structure_family's structure and content. OneShotCachedMap is used to
    # defer imports.
    DEFAULT_STRUCTURE_CLIENT_DISPATCH = {
        "numpy": OneShotCachedMap(
            {
                "array": lambda: importlib.import_module(
                    "..array", Catalog.__module__
                ).ArrayClient,
                "dataframe": lambda: importlib.import_module(
                    "..dataframe", Catalog.__module__
                ).DataFrameClient,
                "variable": lambda: importlib.import_module(
                    "..xarray", Catalog.__module__
                ).VariableClient,
                "data_array": lambda: importlib.import_module(
                    "..xarray", Catalog.__module__
                ).DataArrayClient,
                "dataset": lambda: importlib.import_module(
                    "..xarray", Catalog.__module__
                ).DatasetClient,
            }
        ),
        "dask": OneShotCachedMap(
            {
                "array": lambda: importlib.import_module(
                    "..array", Catalog.__module__
                ).DaskArrayClient,
                "dataframe": lambda: importlib.import_module(
                    "..dataframe", Catalog.__module__
                ).DaskDataFrameClient,
                "variable": lambda: importlib.import_module(
                    "..xarray", Catalog.__module__
                ).DaskVariableClient,
                "data_array": lambda: importlib.import_module(
                    "..xarray", Catalog.__module__
                ).DaskDataArrayClient,
                "dataset": lambda: importlib.import_module(
                    "..xarray", Catalog.__module__
                ).DaskDatasetClient,
            }
        ),
    }

    # This is populated when the first instance is created. To populate or
    # refresh it manually, call classmethod discover_special_clients().
    DEFAULT_SPECIAL_CLIENT_DISPATCH = None

    @classmethod
    def _discover_entrypoints(cls, entrypoint_name):
        return OneShotCachedMap(
            {
                name: entrypoint.load
                for name, entrypoint in entrypoints.get_group_named(
                    entrypoint_name
                ).items()
            }
        )

    @classmethod
    def discover_special_clients(cls):
        """
        Search the software environment for libraries that register special clients.

        This is called once automatically the first time Catalog.from_uri
        is called. You may call it again manually to refresh, and it will
        reflect any changes to the environment since it was first populated.
        """
        # The modules associated with these entrypoints will be imported
        # lazily, only when the item is first accessed.
        cls.DEFAULT_SPECIAL_CLIENT_DISPATCH = cls._discover_entrypoints(
            "tiled.special_client"
        )
        # Note: We could use entrypoints to discover custom structure_family types as
        # well, and in fact we did do this in an early draft. It was removed
        # for simplicity, at least for now.

    @classmethod
    def from_client(cls, *args, **kwargs):
        warnings.warn(
            "The classmethod Catalog.from_client is deperecated and will be removed. "
            "Use the function tiled.client.from_client instead."
        )
        return from_client(*args, **kwargs)

    @classmethod
    def from_catalog(cls, *args, **kwargs):
        warnings.warn(
            "The classmethod Catalog.from_catalog is being considered "
            "for deperecation and may be removed. "
            "The function tiled.client.from_catalog may be used instead.",
            PendingDeprecationWarning,
        )
        return from_catalog(*args, **kwargs)

    @classmethod
    def from_uri(cls, *args, **kwargs):
        warnings.warn(
            "The classmethod Catalog.from_uri is being considered "
            "for deperecation and may be removed. "
            "The function tiled.client.from_uri may be used instead.",
            PendingDeprecationWarning,
        )
        return from_uri(*args, **kwargs)

    @classmethod
    def from_profile(cls, *args, **kwargs):
        warnings.warn(
            "The classmethod Catalog.from_profile is being considered "
            "for deperecation and may be removed. "
            "The function tiled.client.from_profile may be used instead.",
            PendingDeprecationWarning,
        )
        return from_profile(*args, **kwargs)

    @classmethod
    def from_config(cls, *args, **kwargs):
        warnings.warn(
            "The classmethod Catalog.from_config is being considered "
            "for deperecation and may be removed. "
            "The function tiled.client.from_config may be used instead.",
            PendingDeprecationWarning,
        )
        return from_profile(*args, **kwargs)

    def __init__(
        self,
        client,
        *,
        username,
        offline,
        path,
        item,
        metadata,
        root_client_type,
        structure_clients,
        cache,
        special_clients,
        token_cache,
        params=None,
        queries=None,
        sorting=None,
    ):
        "This is not user-facing. Use Catalog.from_uri."

        self.structure_clients = structure_clients
        self.special_clients = special_clients
        self._root_client_type = root_client_type
        self._queries = list(queries or [])
        self._queries_as_params = _queries_to_params(*self._queries)
        self._sorting = [(name, int(direction)) for name, direction in (sorting or [])]
        self._sorting_params = {
            "sort": ",".join(
                f"{'-' if item[1] < 0 else ''}{item[0]}" for item in self._sorting
            )
        }
        self._reversed_sorting_params = {
            "sort": ",".join(
                f"{'-' if item[1] > 0 else ''}{item[0]}" for item in self._sorting
            )
        }
        super().__init__(
            client=client,
            item=item,
            username=username,
            token_cache=token_cache,
            cache=cache,
            offline=offline,
            path=path,
            metadata=metadata,
            params=params,
        )
        if metadata is NEEDS_INITIALIZATION:
            # TO DO: This is a big wart, a side-effect refactor to support
            # refresh tokens. Needs rethinking.
            content = self._get_json_with_cache(f"/metadata/{'/'.join(path)}")
            item = content["data"]
            self._metadata.update(item["attributes"]["metadata"])
            self._item = item

    def __repr__(self):
        # Display up to the first N keys to avoid making a giant service
        # request. Use _keys_slicer because it is unauthenticated.
        N = 10
        return catalog_repr(self, self._keys_slice(0, N, direction=1))

    @property
    def sorting(self):
        """
        The current sorting of this Catalog

        Given as a list of tuples where the first entry is the sorting key
        and the second entry indicates ASCENDING (or 1) or DESCENDING (or -1).
        """
        return list(self._sorting)

[docs] def touch(self): """ Access all the data in this Catalog. This causes it to be cached if the client is configured with a cache. """ self._get_json_with_cache(self.uri) repr(self) for key in self: entry = self[key] entry.touch()
def _get_class(self, item): # The basic structure of the response is either one of the structure_clients # we know about or a sub-Catalog. if item["type"] == "reader": structure_family = item["attributes"]["structure_family"] try: return self.structure_clients[structure_family] except KeyError: raise UnknownStructureFamily(structure_family) from None # If a catalog, server can hint that we should use a special variant # that might have a special __repr__, or extra methods for usability, # etc. client_type_hint = item["attributes"].get("client_type_hint") if client_type_hint is not None: class_ = self.special_clients.get(client_type_hint) if class_ is None: warnings.warn( "The server suggested to use a special client with the " f"hint {client_type_hint!r} but nothing matching the " "description could be discovered in the current software " "environment. We will fall back back to a default that " "should be functional but may lack some usability " "features." ) else: return class_ # This is generally just Catalog, but if the original # user-created catalog was a subclass of Catalog, this will # repsect that. return self._root_client_type
[docs] def client_for_item(self, item, path, metadata, sorting): """ Create an instance of the appropriate client class for an item. This is intended primarily for internal use and use by subclasses. """ class_ = self._get_class(item) if item["type"] == "catalog": return class_( client=self._client, username=self._username, item=item, offline=self._offline, cache=self._cache, path=path, metadata=metadata, structure_clients=self.structure_clients, special_clients=self.special_clients, token_cache=self._token_cache, params=self._params, queries=None, sorting=sorting, root_client_type=self._root_client_type, ) else: # item["type"] == "reader" return class_( client=self._client, item=item, offline=self._offline, cache=self._cache, path=path, metadata=metadata, params=self._params, username=self._username, token_cache=self._token_cache, )
[docs] def new_variation( self, *, structure_clients=UNCHANGED, special_clients=UNCHANGED, queries=UNCHANGED, sorting=UNCHANGED, **kwargs, ): """ Create a copy of this Catalog, optionally varying some parameters. This is intended primarily for intenal use and use by subclasses. """ if isinstance(structure_clients, str): structure_clients = Catalog.DEFAULT_STRUCTURE_CLIENT_DISPATCH[ structure_clients ] if structure_clients is UNCHANGED: structure_clients = self.structure_clients if special_clients is UNCHANGED: special_clients = self.special_clients if queries is UNCHANGED: queries = self._queries if sorting is UNCHANGED: sorting = self._sorting return super().new_variation( structure_clients=structure_clients, special_clients=special_clients, queries=queries, sorting=sorting, root_client_type=self._root_client_type, **kwargs, )
def __len__(self): now = time.monotonic() if self._cached_len is not None: length, deadline = self._cached_len if now < deadline: # Used the cached value and do not make any request. return length content = self._get_json_with_cache( self.item["links"]["search"], params={ "fields": "", **self._queries_as_params, **self._sorting_params, **self._params, }, ) length = content["meta"]["count"] self._cached_len = (length, now + LENGTH_CACHE_TTL) return length def __length_hint__(self): # TODO The server should provide an estimated count. # https://www.python.org/dev/peps/pep-0424/ return len(self) def __iter__(self): next_page_url = self.item["links"]["search"] while next_page_url is not None: content = self._get_json_with_cache( next_page_url, params={ "fields": "", **self._queries_as_params, **self._sorting_params, **self._params, }, ) self._cached_len = ( content["meta"]["count"], time.monotonic() + LENGTH_CACHE_TTL, ) for item in content["data"]: yield item["id"] next_page_url = content["links"]["next"] def __getitem__(self, key): # Lookup this key *within the search results* of this Catalog. content = self._get_json_with_cache( self.item["links"]["search"], params={ **_queries_to_params(KeyLookup(key)), **self._queries_as_params, **self._sorting_params, **self._params, }, ) self._cached_len = ( content["meta"]["count"], time.monotonic() + LENGTH_CACHE_TTL, ) data = content["data"] if not data: raise KeyError(key) assert ( len(data) == 1 ), "The key lookup query must never result more than one result." (item,) = data return self.client_for_item( item, path=self._path + (item["id"],), metadata=item["attributes"]["metadata"], sorting=item["attributes"].get("sorting"), )
[docs] def items(self): # The base implementation would use __iter__ and __getitem__, making # one HTTP request per item. Pull pages instead. next_page_url = self.item["links"]["search"] while next_page_url is not None: content = self._get_json_with_cache( next_page_url, params={ **self._queries_as_params, **self._sorting_params, **self._params, }, ) self._cached_len = ( content["meta"]["count"], time.monotonic() + LENGTH_CACHE_TTL, ) for item in content["data"]: key = item["id"] value = self.client_for_item( item, path=self._path + (item["id"],), metadata=item["attributes"]["metadata"], sorting=item["attributes"].get("sorting"), ) yield key, value next_page_url = content["links"]["next"]
[docs] def values(self): # The base implementation would use __iter__ and __getitem__, making # one HTTP request per item. Pull pages instead. for _, value in self.items(): yield value
# The following three methods are used by IndexersMixin # to define keys_indexer, items_indexer, and values_indexer. def _keys_slice(self, start, stop, direction): if direction > 0: sorting_params = self._sorting_params else: sorting_params = self._reversed_sorting_params assert start >= 0 assert stop >= 0 next_page_url = f"{self.item['links']['search']}?page[offset]={start}" item_counter = itertools.count(start) while next_page_url is not None: content = self._get_json_with_cache( next_page_url, params={ "fields": "", **self._queries_as_params, **sorting_params, **self._params, }, ) self._cached_len = ( content["meta"]["count"], time.monotonic() + LENGTH_CACHE_TTL, ) for item in content["data"]: if stop is not None and next(item_counter) == stop: return yield item["id"] next_page_url = content["links"]["next"] def _items_slice(self, start, stop, direction): if direction > 0: sorting_params = self._sorting_params else: sorting_params = self._reversed_sorting_params assert start >= 0 assert stop >= 0 next_page_url = f"{self.item['links']['search']}?page[offset]={start}" item_counter = itertools.count(start) while next_page_url is not None: content = self._get_json_with_cache( next_page_url, params={ **self._queries_as_params, **sorting_params, **self._params, }, ) self._cached_len = ( content["meta"]["count"], time.monotonic() + LENGTH_CACHE_TTL, ) for item in content["data"]: if stop is not None and next(item_counter) == stop: return key = item["id"] yield key, self.client_for_item( item, path=self._path + (item["id"],), metadata=item["attributes"]["metadata"], sorting=item["attributes"].get("sorting"), ) next_page_url = content["links"]["next"] def _item_by_index(self, index, direction): if direction > 0: sorting_params = self._sorting_params else: sorting_params = self._reversed_sorting_params assert index >= 0 next_page_url = ( f"{self.item['links']['search']}?page[offset]={index}&page[limit]=1" ) content = self._get_json_with_cache( next_page_url, params={ **self._queries_as_params, **sorting_params, **self._params, }, ) self._cached_len = ( content["meta"]["count"], time.monotonic() + LENGTH_CACHE_TTL, ) (item,) = content["data"] key = item["id"] value = self.client_for_item( item, path=self._path + (item["id"],), metadata=item["attributes"]["metadata"], sorting=item["attributes"].get("sorting"), ) return (key, value)
[docs] def search(self, query): """ Make a Catalog with a subset of this Catalog's entries, filtered by query. Examples -------- >>> from tiled.queries import FullText >>> catalog.search(FullText("hello")) """ return self.new_variation( queries=self._queries + [query], )
[docs] def sort(self, sorting): """ Make a Catalog with the same entries but sorted according to `sorting`. Examples -------- Sort by "color" in ascending order, and then by "height" in descending order. >>> from tiled.client import ASCENDING, DESCENDING >>> catalog.sort([("color", ASCENDING), ("height", DESCENDING)]) Note that ``1`` may be used as a synonym for ``ASCENDING``, and ``-1`` may be used as a synonym for ``DESCENDING``. """ return self.new_variation( sorting=sorting, )
def _ipython_key_completions_(self): """ Provide method for the key-autocompletions in IPython. See http://ipython.readthedocs.io/en/stable/config/integrating.html#tab-completion """ MAX_ENTRIES_SUPPORTED = 40 try: if len(self) > MAX_ENTRIES_SUPPORTED: MSG = ( "Tab-completition is not supported on this particular Catalog " "because it has a large number of entries." ) warnings.warn(MSG) return [] else: return list(self) except Exception: # Do not print messy traceback from thread. Just fail silently. return [] def _queries_to_params(*queries): "Compute GET params from the queries." params = collections.defaultdict(list) for query in queries: name = query_type_to_name[type(query)] for field in fields(query): value = getattr(query, field.name) if isinstance(value, (list, tuple)): for item_as_str in map(str, value): if "," in item_as_str: raise ValueError( "Items in list- or tuple-type parameters may not contain commas." ) value = ",".join(map(str, value)) if value is not None: params[f"filter[{name}][condition][{field.name}]"].append(value) return dict(params) class UnknownStructureFamily(KeyError): pass LENGTH_CACHE_TTL = 1 # second
[docs]def from_uri( uri, structure_clients="numpy", *, cache=None, offline=False, username=None, token_cache=DEFAULT_TOKEN_CACHE, special_clients=None, verify=True, ): """ Connect to a Catalog on a local or remote server. Parameters ---------- uri : str e.g. "http://localhost:8000" structure_clients : str or dict, optional Use "dask" for delayed data loading and "numpy" for immediate in-memory structures (e.g. normal numpy arrays, pandas DataFrames). For advanced use, provide dict mapping structure_family names ("array", "dataframe", "variable", "data_array", "dataset") to client objects. See ``Catalog.DEFAULT_STRUCTURE_CLIENT_DISPATCH``. cache : Cache, optional offline : bool, optional False by default. If True, rely on cache only. username : str, optional Username for authenticated access. token_cache : str, optional Path to directory for storing refresh tokens. special_clients : dict, optional Advanced: Map client_type_hint from the server to special client catalog objects. See also ``Catalog.discover_special_clients()`` and ``Catalog.DEFAULT_SPECIAL_CLIENT_DISPATCH``. verify: bool, optional Verify SSL certifications. True by default. False is insecure, intended for development and testing only. """ client, path = client_and_path_from_uri(uri, verify=verify) return from_client( client, structure_clients=structure_clients, username=username, path=path, cache=cache, offline=offline, special_clients=special_clients, token_cache=token_cache, )
[docs]def from_catalog( catalog, authentication=None, server_settings=None, structure_clients="numpy", *, cache=None, offline=False, username=None, special_clients=None, token_cache=DEFAULT_TOKEN_CACHE, ): """ Connect to a Catalog directly, running the app in this same process. NOTE: This is experimental. It may need to be re-designed or even removed. In this configuration, we are using the server, but we are communicating with it directly within this process, not over a local network. It is generally faster. Specifically, we are using HTTP over ASGI rather than HTTP over TCP. There are no sockets or network-related syscalls. Parameters ---------- catalog : Catalog authentication : dict, optional Dict of authentication configuration. username : str, optional Username for authenticated access. structure_clients : str or dict, optional Use "dask" for delayed data loading and "numpy" for immediate in-memory structures (e.g. normal numpy arrays, pandas DataFrames). For advanced use, provide dict mapping structure_family names ("array", "dataframe", "variable", "data_array", "dataset") to client objects. See ``Catalog.DEFAULT_STRUCTURE_CLIENT_DISPATCH``. cache : Cache, optional offline : bool, optional False by default. If True, rely on cache only. special_clients : dict, optional Advanced: Map client_type_hint from the server to special client catalog objects. See also ``Catalog.discover_special_clients()`` and ``Catalog.DEFAULT_SPECIAL_CLIENT_DISPATCH``. token_cache : str, optional Path to directory for storing refresh tokens. """ client = client_from_catalog( catalog=catalog, authentication=authentication, server_settings=server_settings, ) return from_client( client, structure_clients=structure_clients, username=username, # The cache and "offline" mode do not make much sense when we have an # in-process connection, but we support it for the sake of testing and # making direct access a drop in replacement for the normal service. cache=cache, offline=offline, special_clients=special_clients, token_cache=token_cache, )
[docs]def from_client( client, structure_clients="numpy", *, username=None, cache=None, offline=False, path=None, special_clients=None, token_cache=DEFAULT_TOKEN_CACHE, ): """ Advanced: Connect to a Catalog using a custom instance of httpx.Client or httpx.AsyncClient. Parameters ---------- client : httpx.Client Should be pre-configured with a base_url and any auth-related headers. structure_clients : str or dict, optional Use "dask" for delayed data loading and "numpy" for immediate in-memory structures (e.g. normal numpy arrays, pandas DataFrames). For advanced use, provide dict mapping structure_family names ("array", "dataframe", "variable", "data_array", "dataset") to client objects. See ``Catalog.DEFAULT_STRUCTURE_CLIENT_DISPATCH``. username : str, optional Username for authenticated access. cache : Cache, optional offline : bool, optional False by default. If True, rely on cache only. special_clients : dict, optional Advanced: Map client_type_hint from the server to special client catalog objects. See also ``Catalog.discover_special_clients()`` and ``Catalog.DEFAULT_SPECIAL_CLIENT_DISPATCH``. token_cache : str, optional Path to directory for storing refresh tokens. """ # Interpret structure_clients="numpy" and structure_clients="dask" shortcuts. if isinstance(structure_clients, str): structure_clients = Catalog.DEFAULT_STRUCTURE_CLIENT_DISPATCH[structure_clients] path = path or [] # Do entrypoint discovery if it hasn't yet been done. if Catalog.DEFAULT_SPECIAL_CLIENT_DISPATCH is None: Catalog.discover_special_clients() special_clients = collections.ChainMap( special_clients or {}, Catalog.DEFAULT_SPECIAL_CLIENT_DISPATCH, ) if username is not None: reauthenticate_client(client, username, token_cache=token_cache) instance = Catalog( client, item=NEEDS_INITIALIZATION, username=username, offline=offline, path=path, metadata=NEEDS_INITIALIZATION, structure_clients=structure_clients, cache=cache, special_clients=special_clients, root_client_type=Catalog, token_cache=token_cache, ) item = instance.item metadata = item["attributes"]["metadata"] return instance.client_for_item( item, path=path, metadata=metadata, sorting=item["attributes"].get("sorting") )
[docs]def from_profile(name, structure_clients=None, **kwargs): """ Build a Catalog based a 'profile' (a named configuration). List available profiles and the source filepaths from Python like: >>> from tiled.client.profiles import list_profiles >>> list_profiles() or from a CLI like: $ tiled profile list Or show the file contents like: >>> from tiled.client.profiles import load_profiles >>> load_profiles() or from a CLI like: $ tiled profile show PROFILE_NAME Any additional parameters override profile content. See from_uri for details. """ from ..profiles import load_profiles, paths, ProfileNotFound profiles = load_profiles() try: filepath, profile_content = profiles[name] except KeyError as err: raise ProfileNotFound( f"Profile {name!r} not found. Found profiles {list(profiles)} " f"from directories {paths}." ) from err merged = {**profile_content, **kwargs} if structure_clients is not None: merged["structure_clients"] = structure_clients cache_config = merged.pop("cache", None) if cache_config is not None: from tiled.client.cache import Cache if isinstance(cache_config, collections.abc.Mapping): # All necessary validation has already been performed # in load_profiles(). ((key, value),) = cache_config.items() if key == "memory": cache = Cache.in_memory(**value) elif key == "disk": cache = Cache.on_disk(**value) else: # Interpret this as a Cache object passed in directly. cache = cache_config merged["cache"] = cache structure_clients_ = merged.pop("structure_clients", None) if structure_clients_ is not None: if isinstance(structure_clients_, str): # Nothing to do. merged["structure_clients"] = structure_clients_ else: # This is a dict mapping structure families like "array" and "dataframe" # to values. The values may be client objects or importable strings. result = {} for key, value in structure_clients_.items(): if isinstance(value, str): class_ = import_object(value) else: class_ = value result[key] = class_ merged["structure_clients"] = result if "direct" in merged: # The profiles specifies that there is no server. We should create # an app ourselves and use it directly via ASGI. from ..config import construct_serve_catalog_kwargs serve_catalog_kwargs = construct_serve_catalog_kwargs( merged.pop("direct", None), source_filepath=filepath ) return from_catalog(**serve_catalog_kwargs, **merged) else: return from_uri(**merged)
[docs]def from_config(config): """ Build Catalogs directly, running the app in this same process. NOTE: This is experimental. It may need to be re-designed or even removed. Parameters ---------- config : str or dict May be: * Path to config file * Path to directory of config files * Dict of config Examples -------- From config file: >>> from_config("path/to/file.yml") From directory of config files: >>> from_config("path/to/directory") From configuration given directly, as dict: >>> from_config( { "catalogs": [ "path": "/", "catalog": "tiled.files.Catalog.from_files", "args": {"diretory": "path/to/files"} ] } ) """ from ..config import direct_access catalog = direct_access(config) return from_catalog(catalog)
class Ascending(Sentinel): "Intended for more readable sorting operations. An alias for 1." def __index__(self): return 1 class Descending(Sentinel): "Intended for more readable sorting operations. An alias for -1." def __index__(self): return -1 ASCENDING = Ascending("ASCENDING") "Ascending sort order. An alias for 1." DESCENDING = Descending("DESCENDING") "Decending sort order. An alias for -1."