# Source code for tiled.adapters.mapping

import collections.abc
import copy
import itertools
import operator
import sys
from collections import Counter
from datetime import datetime, timedelta
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union, cast

from fastapi import APIRouter

from ..iterviews import ItemsView, KeysView, ValuesView
from ..queries import (
    Comparison,
    Contains,
    Eq,
    FullText,
    In,
    KeysFilter,
    NotEq,
    NotIn,
    Regex,
    SpecsQuery,
    StructureFamilyQuery,
)
from ..query_registration import QueryTranslationRegistry
from ..server.schemas import SortingItem
from ..structures.core import Spec, StructureFamily
from ..structures.table import TableStructure
from ..utils import UNCHANGED, Sentinel
from .protocols import AccessPolicy, AnyAdapter
from .type_alliases import JSON
from .utils import IndexersMixin

# Pick a Mapping base class that can be subscripted (it is used below as
# ``MappingType[str, AnyAdapter]``).  ``collections.abc.Mapping`` only supports
# subscripting from Python 3.9 on (PEP 585), so older interpreters fall back to
# the generic alias shipped by ``typing_extensions``.
if sys.version_info < (3, 9):
    from typing_extensions import Mapping

    MappingType = Mapping
else:
    # ``collections.abc`` itself is already imported at the top of the module;
    # this import only (re)binds the ``collections`` package name.
    import collections

    MappingType = collections.abc.Mapping


class MapAdapter(MappingType[str, AnyAdapter], IndexersMixin):
    """
    Adapt any mapping (dictionary-like object) to Tiled.

    The adapter is a read-only ``Mapping`` over the wrapped object.  Searching
    and sorting never mutate it; they return a new adapter built with
    ``new_variation``.
    """

    __slots__ = (
        "_access_policy",
        "_mapping",
        "_metadata",
        "_sorting",
        "_must_revalidate",
        "background_tasks",
        "entries_stale_after",
        "include_routers",
        "metadata_stale_after",
        "specs",
    )

    structure_family = StructureFamily.container

    # Define classmethods for managing what queries this Adapter knows.
    query_registry = QueryTranslationRegistry()
    register_query = query_registry.register
    register_query_lazy = query_registry.register_lazy

    def __init__(
        self,
        mapping: Dict[str, Any],
        *,
        structure: Optional[TableStructure] = None,
        metadata: Optional[JSON] = None,
        sorting: Optional[List[SortingItem]] = None,
        specs: Optional[List[Spec]] = None,
        access_policy: Optional[AccessPolicy] = None,
        entries_stale_after: Optional[timedelta] = None,
        metadata_stale_after: Optional[timedelta] = None,
        must_revalidate: bool = True,
    ) -> None:
        """
        Create a simple Adapter from any mapping (e.g. dict, OneShotCachedMap).

        Parameters
        ----------
        mapping : dict-like
        structure : None
            Accepted only for signature compatibility; must be None.
        metadata : dict, optional
        sorting : List[Tuple[str, int]], optional
        specs : List[str], optional
        access_policy : AccessPolicy, optional
        entries_stale_after: timedelta
            This server uses this to communicate to the client how long
            it should rely on a local cache before checking back for changes.
        metadata_stale_after: timedelta
            This server uses this to communicate to the client how long
            it should rely on a local cache before checking back for changes.
        must_revalidate : bool
            Whether the client should strictly refresh stale cache items.

        Raises
        ------
        ValueError
            If ``structure`` is not None.
        """
        if structure is not None:
            raise ValueError(
                f"structure is expected to be None for containers, not {structure}"
            )
        self._mapping = mapping
        if sorting is None:
            # This is a special case that means, "the given ordering".
            # By giving that a name ("_") we enable requests to asking for the
            # last N by requesting the sorting ("_", -1).
            sorting = [SortingItem(key="_", direction=1)]
        self._sorting = sorting
        self._metadata = metadata or {}
        self.specs = specs or []
        self._access_policy = access_policy
        self._must_revalidate = must_revalidate
        self.include_routers: List[APIRouter] = []
        self.background_tasks: List[Any] = []
        self.entries_stale_after = entries_stale_after
        self.metadata_stale_after = metadata_stale_after
        super().__init__()

    @property
    def must_revalidate(self) -> bool:
        """
        Whether the client should strictly refresh stale cache items.
        """
        return self._must_revalidate

    @must_revalidate.setter
    def must_revalidate(self, value: bool) -> None:
        """
        Set the ``must_revalidate`` flag.

        Parameters
        ----------
        value : bool
        """
        self._must_revalidate = value

    @property
    def access_policy(self) -> Optional[AccessPolicy]:
        """
        The access policy attached to this adapter, or None.
        """
        return self._access_policy

    @access_policy.setter
    def access_policy(self, value: AccessPolicy) -> None:
        """
        Replace the access policy attached to this adapter.

        Parameters
        ----------
        value : AccessPolicy
        """
        self._access_policy = value

    def metadata(self) -> JSON:
        "Metadata about this Adapter."
        # NOTE: the stored object is returned as-is (no copy, no read-only
        # wrapper).  Callers should treat it as read-only: editing it does not
        # update anything persistent.
        return self._metadata

    @property
    def sorting(self) -> List[SortingItem]:
        """
        A copy of the sorting currently applied to this adapter.
        """
        return list(self._sorting)

    def __repr__(self) -> str:
        """
        Show the type name and the keys of the wrapped mapping.
        """
        return (
            f"<{type(self).__name__}({{{', '.join(repr(k) for k in self._mapping)}}})>"
        )

    def __getitem__(self, key: str) -> Any:
        """
        Look up ``key`` in the wrapped mapping.

        Parameters
        ----------
        key : str

        Returns
        -------
        The value stored under ``key``.
        """
        return self._mapping[key]

    def __iter__(self) -> Iterator[str]:
        """
        Iterate over the keys of the wrapped mapping.
        """
        yield from self._mapping

    def __len__(self) -> int:
        """
        Number of entries in the wrapped mapping.
        """
        return len(self._mapping)

    def keys(self) -> KeysView:  # type: ignore
        """
        A sliceable view of the keys.
        """
        return KeysView(lambda: len(self), self._keys_slice)

    def values(self) -> ValuesView:  # type: ignore
        """
        A sliceable view of the values (built on the item slicer).
        """
        return ValuesView(lambda: len(self), self._items_slice)

    def items(self) -> ItemsView:  # type: ignore
        """
        A sliceable view of the (key, value) pairs.
        """
        return ItemsView(lambda: len(self), self._items_slice)

    def structure(self) -> None:
        """
        Containers have no structure; always returns None.
        """
        return None

    @property
    def metadata_stale_at(self) -> Optional[datetime]:
        """
        Naive UTC time at which the metadata goes stale, or None if
        ``metadata_stale_after`` is not set.
        """
        if self.metadata_stale_after is None:
            return None
        return self.metadata_stale_after + datetime.utcnow()

    @property
    def entries_stale_at(self) -> Optional[datetime]:
        """
        Naive UTC time at which the entries go stale, or None if
        ``entries_stale_after`` is not set.
        """
        if self.entries_stale_after is None:
            return None
        return self.entries_stale_after + datetime.utcnow()

    def new_variation(
        self,
        *args: Any,
        mapping: Union[Sentinel, Dict[str, Any]] = UNCHANGED,
        metadata: Union[Sentinel, JSON] = UNCHANGED,
        sorting: Union[Sentinel, List[SortingItem]] = UNCHANGED,
        must_revalidate: Union[Sentinel, bool] = UNCHANGED,
        **kwargs: Any,
    ) -> "MapAdapter":
        """
        Build a new adapter of the same type, overriding selected attributes.

        Parameters
        ----------
        args :
            Accepted for signature compatibility; not forwarded to the
            constructor.
        mapping : dict-like, optional
            Replacement mapping; defaults to the current one.
        metadata : dict, optional
            Replacement metadata; defaults to the current metadata.
        sorting : list, optional
            Replacement sorting; defaults to the current sorting.
        must_revalidate : bool, optional
            Replacement flag; defaults to the current value.
        kwargs :
            Extra keyword arguments forwarded to the constructor.

        Returns
        -------
        MapAdapter
        """
        if mapping is UNCHANGED:
            mapping = self._mapping
        if metadata is UNCHANGED:
            metadata = self._metadata
        if sorting is UNCHANGED:
            sorting = self._sorting
        if must_revalidate is UNCHANGED:
            must_revalidate = self.must_revalidate
        return type(self)(
            # *args are intentionally not forwarded: ``mapping`` is already
            # passed by keyword and all other parameters are keyword-only.
            mapping=cast(Dict[str, Any], mapping),
            sorting=cast(List[SortingItem], sorting),
            # Honor the caller-supplied metadata (falls back to the current
            # metadata above when UNCHANGED).
            metadata=cast(JSON, metadata),
            specs=self.specs,
            access_policy=self.access_policy,
            entries_stale_after=self.entries_stale_after,
            metadata_stale_after=self.metadata_stale_after,
            must_revalidate=cast(bool, must_revalidate),
            **kwargs,
        )

    def read(self, fields: Optional[List[str]] = None) -> "MapAdapter":
        """
        Return this adapter, optionally restricted to some keys.

        Parameters
        ----------
        fields : list of str, optional
            Keys to keep.  When None, the adapter itself is returned.

        Returns
        -------
        MapAdapter

        Raises
        ------
        KeyError
            If a requested field is not a key of the mapping.
        """
        if fields is None:
            return self
        return self.new_variation(
            mapping={field: self._mapping[field] for field in fields}
        )

    def search(self, query: Any) -> Any:
        """
        Apply a registered query to this adapter.

        Parameters
        ----------
        query :
            An instance of a query type registered on ``query_registry``.

        Returns
        -------
        Return a Adapter with a subset of the mapping.
        """
        return self.query_registry(query, self)

    def get_distinct(
        self,
        metadata: JSON,
        structure_families: StructureFamily,
        specs: List[Spec],
        counts: int,
    ) -> Dict[str, Any]:
        """
        Summarize the distinct values found among the child entries.

        Parameters
        ----------
        metadata :
            Iterable of (dotted) metadata keys to summarize; skipped if falsy.
        structure_families :
            If truthy, summarize the children's ``structure_family``.
        specs :
            If truthy, summarize the children's ``specs`` (as tuples).
        counts :
            If truthy, include the number of occurrences of each value.

        Returns
        -------
        dict
            Up to three keys ("metadata", "structure_families", "specs"),
            each holding the output of ``counter_to_dict``.
        """
        data: Dict[str, Any] = {}
        # data: dict[str, list[dict[str, Any]]] = {}

        if metadata:
            data["metadata"] = {}
            for metadata_key in metadata:
                counter = Counter(
                    term for _, _, term in iter_child_metadata(metadata_key, self)
                )
                data["metadata"][metadata_key] = counter_to_dict(counter, counts)

        if structure_families:
            counter = Counter(value.structure_family for _, value in self.items())
            data["structure_families"] = counter_to_dict(counter, counts)

        if specs:
            counter = Counter(tuple(value.specs) for _, value in self.items())
            data["specs"] = counter_to_dict(counter, counts)

        return data

    def sort(self, sorting: List[Tuple[str, int]]) -> "MapAdapter":
        """
        Return a new adapter whose entries are ordered by metadata values.

        Parameters
        ----------
        sorting : list of (key, direction) pairs
            Applied from last to first so that the first pair is the primary
            sort key.  ``key`` is a top-level metadata key, or "_" for the
            existing order; a negative ``direction`` reverses the order.
            Entries lacking ``key`` sort after all others.

        Returns
        -------
        MapAdapter
        """
        mapping = copy.copy(self._mapping)
        for key, direction in reversed(sorting):
            if key == "_":
                # Special case to enable reversing the given/default ordering.
                # Leave mapping as is, and possibly reserve it below.
                pass
            else:
                mapping = dict(
                    sorted(
                        mapping.items(),
                        key=lambda item: item[1].metadata().get(key, _HIGH_SORTER),  # type: ignore
                    )
                )
            if direction < 0:
                # TODO In Python 3.8 dict items should be reservible
                # but I have seen errors in the wild that I could not
                # quickly resolve so for now we convert to list in the middle.
                to_reverse = list(mapping.items())
                mapping = dict(reversed(to_reverse))

        return self.new_variation(mapping=mapping, sorting=sorting)  # type: ignore

    # The following two methods are used by keys(), values(), items().

    def _keys_slice(self, start: int, stop: int, direction: int) -> Iterator[str]:
        """
        Yield the keys in positions ``[start:stop]``.

        Parameters
        ----------
        start : int
        stop : int
        direction : int
            Positive: positions count from the first key.  Otherwise:
            positions count from the last key, walking backwards.

        Yields
        ------
        str
        """
        if direction > 0:
            yield from itertools.islice(self._mapping.keys(), start, stop)
        else:
            # This function is a generator, so the result must be yielded;
            # a plain ``return keys`` would silently produce nothing.
            reversed_keys = list(self._mapping.keys())
            reversed_keys.reverse()
            # Apply the offsets exactly once, on the reversed ordering.
            yield from reversed_keys[start:stop]

    def _items_slice(
        self, start: int, stop: int, direction: int
    ) -> Iterator[Tuple[str, Any]]:
        """
        Yield the (key, value) pairs in positions ``[start:stop]``.

        Parameters
        ----------
        start : int
        stop : int
        direction : int
            Same meaning as in ``_keys_slice``.

        Yields
        ------
        (str, Any)
        """
        # A goal of this implementation is to avoid iterating over
        # self._mapping.values() because self._mapping may be a OneShotCachedMap which
        # only constructs its values at access time. With this in mind, we
        # identify the key(s) of interest and then only access those values.
        yield from (
            (key, self._mapping[key])
            for key in self._keys_slice(start, stop, direction)
        )
def walk_string_values(tree: Any, node: Optional[Any] = None) -> Iterator[str]:
    """
    Yield every string value found in a nested mapping.

    >>> list(
    ...     walk_string_values(
    ...         {'a': {'b': {'c': 'apple', 'd': 'banana'},
    ...          'e': ['cat', 'dog']}, 'f': 'elephant'}
    ...     )
    ... )
    ['apple', 'banana', 'cat', 'dog', 'elephant']

    Parameters
    ----------
    tree : mapping
        The container being walked.
    node : optional
        Key of ``tree`` to inspect.  When None, every key of ``tree`` is
        visited.

    Yields
    ------
    str
        String values, strings inside nested mappings, and strings that are
        direct items of a non-mapping iterable value.
    """
    if node is None:
        for child in tree:
            yield from walk_string_values(tree, child)
        return

    value = tree[node]
    if isinstance(value, str):
        yield value
    elif hasattr(value, "items"):
        for subkey, _ in value.items():
            yield from walk_string_values(value, subkey)
    elif isinstance(value, collections.abc.Iterable):
        # Only direct string items are reported; nested containers inside a
        # list are not descended into.
        yield from (item for item in value if isinstance(item, str))


def counter_to_dict(counter: Dict[str, Any], counts: Any) -> List[Dict[str, Any]]:
    """
    Convert a value -> count mapping into a list of records.

    Parameters
    ----------
    counter : mapping
        Maps each distinct value to its number of occurrences.
    counts :
        If truthy, each record also carries its "count".

    Returns
    -------
    list of dict
        One ``{"value": ...}`` (optionally with ``"count"``) per key that is
        not None.
    """
    if counts:
        return [
            {"value": value, "count": count}
            for value, count in counter.items()
            if value is not None
        ]
    return [{"value": value} for value in counter if value is not None]


def iter_child_metadata(
    query_key: Any, tree: MapAdapter
) -> Iterator[Tuple[str, Any, Any]]:
    """
    Look up a dotted metadata key on every child of ``tree``.

    Parameters
    ----------
    query_key : str
        Metadata key; dots separate levels of nesting (e.g. "a.b.c").
    tree :
        Container whose ``items()`` yields (key, child) pairs where each
        child exposes ``metadata()``.

    Yields
    ------
    (key, child, term)
        Only for children where the full dotted path resolves; ``term`` is
        the value found at that path.
    """
    subkeys = query_key.split(".")
    for key, value in tree.items():
        term = value.metadata()
        for subkey in subkeys:
            try:
                term = term[subkey]
            except (KeyError, TypeError, IndexError):
                # Path is absent, or it runs through a value that cannot be
                # indexed by a string key (scalar, list, None, ...): this
                # child simply has no such metadata.
                break
        else:
            yield key, value, term


def full_text_search(query: Any, tree: MapAdapter) -> MapAdapter:
    """
    Keep children whose metadata contains any word of ``query.text``.

    Matching is on whole whitespace-separated words and ignores case.

    Parameters
    ----------
    query :
        Object with a ``text`` attribute.
    tree : MapAdapter

    Returns
    -------
    MapAdapter
        A variation of ``tree`` limited to the matching children.
    """
    matches = {}
    # Metadata words are lower-cased below, so the query must be lower-cased
    # as well or any query containing an uppercase letter could never match.
    query_words = set(query.text.lower().split())
    for key, value in tree.items():
        words = {
            word
            for s in walk_string_values(value.metadata())
            for word in s.lower().split()
        }
        # Note that `not set.isdisjoint` is faster than `set.intersection`. At
        # the C level, `isdisjoint` loops over the set until it finds one match,
        # and then bails, whereas `intersection` proceeds to find all matches.
        if not words.isdisjoint(query_words):
            matches[key] = value
    return tree.new_variation(mapping=matches)


MapAdapter.register_query(FullText, full_text_search)


def regex(query: Any, tree: MapAdapter) -> MapAdapter:
    """
    Keep children whose string metadata value matches a regular expression.

    Parameters
    ----------
    query :
        Object with ``key``, ``pattern`` and ``case_sensitive`` attributes.
    tree : MapAdapter

    Returns
    -------
    MapAdapter
        A variation of ``tree`` limited to children where the value at
        ``query.key`` is a string in which the pattern is found.
    """
    import re

    matches = {}
    flags = 0 if query.case_sensitive else re.IGNORECASE
    pattern = re.compile(query.pattern, flags=flags)
    for key, value, term in iter_child_metadata(query.key, tree):
        if isinstance(term, str) and pattern.search(term):
            matches[key] = value
    return tree.new_variation(mapping=matches)


MapAdapter.register_query(Regex, regex)


def eq(query: Any, tree: MapAdapter) -> MapAdapter:
    """
    Keep children whose metadata value at ``query.key`` equals ``query.value``.

    Parameters
    ----------
    query :
        Object with ``key`` and ``value`` attributes.
    tree : MapAdapter

    Returns
    -------
    MapAdapter
    """
    matches = {}
    for key, value, term in iter_child_metadata(query.key, tree):
        if term == query.value:
            matches[key] = value
    return tree.new_variation(mapping=matches)


MapAdapter.register_query(Eq, eq)


def noteq(query: Any, tree: MapAdapter) -> MapAdapter:
    """
    Keep children whose metadata value at ``query.key`` differs from
    ``query.value``.  Children lacking the key are not included.

    Parameters
    ----------
    query :
        Object with ``key`` and ``value`` attributes.
    tree : MapAdapter

    Returns
    -------
    MapAdapter
    """
    matches = {}
    for key, value, term in iter_child_metadata(query.key, tree):
        if term != query.value:
            matches[key] = value
    return tree.new_variation(mapping=matches)


MapAdapter.register_query(NotEq, noteq)


def contains(query: Any, tree: MapAdapter) -> MapAdapter:
    """
    Keep children whose metadata value at ``query.key`` is a non-string
    iterable containing ``query.value``.

    Parameters
    ----------
    query :
        Object with ``key`` and ``value`` attributes.
    tree : MapAdapter

    Returns
    -------
    MapAdapter
    """
    matches = {}
    for key, value, term in iter_child_metadata(query.key, tree):
        if (
            isinstance(term, collections.abc.Iterable)
            and (not isinstance(term, str))
            and (query.value in term)
        ):
            matches[key] = value
    return tree.new_variation(mapping=matches)


MapAdapter.register_query(Contains, contains)


def comparison(query: Any, tree: MapAdapter) -> MapAdapter:
    """
    Keep children whose metadata value compares as requested to
    ``query.value``.

    Parameters
    ----------
    query :
        Object with ``key``, ``value`` and ``operator`` attributes;
        ``operator`` is one of "le", "lt", "ge", "gt".
    tree : MapAdapter

    Returns
    -------
    MapAdapter

    Raises
    ------
    ValueError
        If ``query.operator`` is not one of the supported names.
    """
    # Validate and resolve once, up front, so a bad operator is reported even
    # when no child carries the key.
    if query.operator not in {"le", "lt", "ge", "gt"}:
        raise ValueError(f"Unexpected operator {query.operator}.")
    comparison_func = getattr(operator, query.operator)

    matches = {}
    for key, value, term in iter_child_metadata(query.key, tree):
        try:
            matched = comparison_func(term, query.value)
        except TypeError:
            # Values that cannot be ordered against the query value (e.g.
            # None, or a string compared to a number) are non-matches rather
            # than a reason to abort the whole search.
            continue
        if matched:
            matches[key] = value
    return tree.new_variation(mapping=matches)


MapAdapter.register_query(Comparison, comparison)


def _in(query: Any, tree: MapAdapter) -> MapAdapter:
    """
    Keep children whose metadata value at ``query.key`` is one of
    ``query.value``.

    Parameters
    ----------
    query :
        Object with ``key`` and ``value`` (a container) attributes.
    tree : MapAdapter

    Returns
    -------
    MapAdapter
    """
    matches = {}
    for key, value, term in iter_child_metadata(query.key, tree):
        if term in query.value:
            matches[key] = value
    return tree.new_variation(mapping=matches)


MapAdapter.register_query(In, _in)


def notin(query: Any, tree: MapAdapter) -> MapAdapter:
    """
    Keep children whose metadata value at ``query.key`` is not one of
    ``query.value``.  Children lacking the key are not included.

    Parameters
    ----------
    query :
        Object with ``key`` and ``value`` (a container) attributes.
    tree : MapAdapter

    Returns
    -------
    MapAdapter
    """
    matches = {}
    for key, value, term in iter_child_metadata(query.key, tree):
        if term not in query.value:
            matches[key] = value
    return tree.new_variation(mapping=matches)


MapAdapter.register_query(NotIn, notin)


def specs(query: Any, tree: MapAdapter) -> MapAdapter:
    """
    Keep children that have all of ``query.include`` and none of
    ``query.exclude`` among their specs.

    Parameters
    ----------
    query :
        Object with ``include`` and ``exclude`` iterables.
    tree : MapAdapter

    Returns
    -------
    MapAdapter
    """
    matches = {}
    include = set(query.include)
    exclude = set(query.exclude)
    for key, value in tree.items():
        # Named ``entry_specs`` so the local does not shadow this function.
        entry_specs = set(value.specs)
        if include.issubset(entry_specs) and exclude.isdisjoint(entry_specs):
            matches[key] = value
    return tree.new_variation(mapping=matches)


MapAdapter.register_query(SpecsQuery, specs)


def structure_family(query: Any, tree: MapAdapter) -> MapAdapter:
    """
    Keep children whose ``structure_family`` equals ``query.value``.

    Parameters
    ----------
    query :
        Object with a ``value`` attribute.
    tree : MapAdapter

    Returns
    -------
    MapAdapter
    """
    matches = {}
    for key, value in tree.items():
        if value.structure_family == query.value:
            matches[key] = value
    return tree.new_variation(mapping=matches)


MapAdapter.register_query(StructureFamilyQuery, structure_family)


def keys_filter(query: Any, tree: MapAdapter) -> MapAdapter:
    """
    Keep children whose key is listed in ``query.keys``.

    The order of ``tree`` is preserved.

    Parameters
    ----------
    query :
        Object with a ``keys`` iterable of (hashable) keys.
    tree : MapAdapter

    Returns
    -------
    MapAdapter
    """
    # Build the lookup set once instead of scanning ``query.keys`` per child.
    wanted = set(query.keys)
    matches = {key: value for key, value in tree.items() if key in wanted}
    return tree.new_variation(mapping=matches)


MapAdapter.register_query(KeysFilter, keys_filter)


class _HIGH_SORTER_CLASS:
    """
    Enables sort to work when metadata is sparse

    An instance is never less than, and always greater than, whatever it is
    compared with, so entries that lack the sort key end up last.
    """

    def __lt__(self, other: Any) -> bool:
        """
        Never sorts before ``other``.

        Parameters
        ----------
        other :
            Any value.

        Returns
        -------
        bool
            Always False.
        """
        return False

    def __gt__(self, other: Any) -> bool:
        """
        Always sorts after ``other``.

        Parameters
        ----------
        other :
            Any value.

        Returns
        -------
        bool
            Always True.
        """
        return True


_HIGH_SORTER = _HIGH_SORTER_CLASS()