import collections.abc
import copy
import itertools
import operator
import sys
from collections import Counter
from datetime import datetime, timedelta
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union, cast
from fastapi import APIRouter
from ..iterviews import ItemsView, KeysView, ValuesView
from ..queries import (
Comparison,
Contains,
Eq,
FullText,
In,
KeysFilter,
NotEq,
NotIn,
Regex,
SpecsQuery,
StructureFamilyQuery,
)
from ..query_registration import QueryTranslationRegistry
from ..server.schemas import SortingItem
from ..structures.core import Spec, StructureFamily
from ..structures.table import TableStructure
from ..utils import UNCHANGED, Sentinel
from .protocols import AccessPolicy, AnyAdapter
from .type_alliases import JSON
from .utils import IndexersMixin
if sys.version_info < (3, 9):
from typing_extensions import Mapping
MappingType = Mapping
else:
import collections
MappingType = collections.abc.Mapping
[docs]
class MapAdapter(MappingType[str, AnyAdapter], IndexersMixin):
"""
Adapt any mapping (dictionary-like object) to Tiled.
"""
__slots__ = (
"_access_policy",
"_mapping",
"_metadata",
"_sorting",
"_must_revalidate",
"background_tasks",
"entries_stale_after",
"include_routers",
"metadata_stale_after",
"specs",
)
structure_family = StructureFamily.container
# Define classmethods for managing what queries this Adapter knows.
query_registry = QueryTranslationRegistry()
register_query = query_registry.register
register_query_lazy = query_registry.register_lazy
[docs]
def __init__(
self,
mapping: Dict[str, Any],
*,
structure: Optional[TableStructure] = None,
metadata: Optional[JSON] = None,
sorting: Optional[List[SortingItem]] = None,
specs: Optional[List[Spec]] = None,
access_policy: Optional[AccessPolicy] = None,
entries_stale_after: Optional[timedelta] = None,
metadata_stale_after: Optional[timedelta] = None,
must_revalidate: bool = True,
) -> None:
"""
Create a simple Adapter from any mapping (e.g. dict, OneShotCachedMap).
Parameters
----------
mapping : dict-like
metadata : dict, optional
specs : List[str], optional
sorting : List[Tuple[str, int]], optional
specs : List[str], optional
access_policy : AccessPolicy, optional
entries_stale_after: timedelta
This server uses this to communicate to the client how long
it should rely on a local cache before checking back for changes.
metadata_stale_after: timedelta
This server uses this to communicate to the client how long
it should rely on a local cache before checking back for changes.
must_revalidate : bool
Whether the client should strictly refresh stale cache items.
"""
if structure is not None:
raise ValueError(
f"structure is expected to be None for containers, not {structure}"
)
self._mapping = mapping
if sorting is None:
# This is a special case that means, "the given ordering".
# By giving that a name ("_") we enable requests to asking for the
# last N by requesting the sorting ("_", -1).
sorting = [SortingItem(key="_", direction=1)]
self._sorting = sorting
self._metadata = metadata or {}
self.specs = specs or []
self._access_policy = access_policy
self._must_revalidate = must_revalidate
self.include_routers: List[APIRouter] = []
self.background_tasks: List[Any] = []
self.entries_stale_after = entries_stale_after
self.metadata_stale_after = metadata_stale_after
self.specs = specs or []
super().__init__()
@property
def must_revalidate(self) -> bool:
"""
Returns
-------
"""
return self._must_revalidate
@must_revalidate.setter
def must_revalidate(self, value: bool) -> None:
"""
Parameters
----------
value :
Returns
-------
"""
self._must_revalidate = value
@property
def access_policy(self) -> Optional[AccessPolicy]:
"""
Returns
-------
"""
return self._access_policy
@access_policy.setter
def access_policy(self, value: AccessPolicy) -> None:
"""
Parameters
----------
value :
Returns
-------
"""
self._access_policy = value
def metadata(self) -> JSON:
"Metadata about this Adapter."
# Ensure this is immutable (at the top level) to help the user avoid
# getting the wrong impression that editing this would update anything
# persistent.
return self._metadata
@property
def sorting(self) -> List[SortingItem]:
"""
Returns
-------
"""
return list(self._sorting)
def __repr__(self) -> str:
"""
Returns
-------
"""
return (
f"<{type(self).__name__}({{{', '.join(repr(k) for k in self._mapping)}}})>"
)
def __getitem__(self, key: str) -> Any:
"""
Parameters
----------
key :
Returns
-------
"""
return self._mapping[key]
def __iter__(self) -> Iterator[str]:
"""
Returns
-------
"""
yield from self._mapping
def __len__(self) -> int:
"""
Returns
-------
"""
return len(self._mapping)
def keys(self) -> KeysView: # type: ignore
"""
Returns
-------
"""
return KeysView(lambda: len(self), self._keys_slice)
def values(self) -> ValuesView: # type: ignore
"""
Returns
-------
"""
return ValuesView(lambda: len(self), self._items_slice)
def items(self) -> ItemsView: # type: ignore
"""
Returns
-------
"""
return ItemsView(lambda: len(self), self._items_slice)
def structure(self) -> None:
"""
Returns
-------
"""
return None
@property
def metadata_stale_at(self) -> Optional[datetime]:
"""
Returns
-------
"""
if self.metadata_stale_after is None:
return None
return self.metadata_stale_after + datetime.utcnow()
@property
def entries_stale_at(self) -> Optional[datetime]:
"""
Returns
-------
"""
if self.entries_stale_after is None:
return None
return self.entries_stale_after + datetime.utcnow()
def new_variation(
self,
*args: Any,
mapping: Union[Sentinel, Dict[str, Any]] = UNCHANGED,
metadata: Union[Sentinel, JSON] = UNCHANGED,
sorting: Union[Sentinel, List[SortingItem]] = UNCHANGED,
must_revalidate: Union[Sentinel, bool] = UNCHANGED,
**kwargs: Any,
) -> "MapAdapter":
"""
Parameters
----------
args :
mapping :
metadata :
sorting :
must_revalidate :
kwargs :
Returns
-------
"""
if mapping is UNCHANGED:
mapping = self._mapping
if metadata is UNCHANGED:
metadata = self._metadata
if sorting is UNCHANGED:
sorting = self._sorting
if must_revalidate is UNCHANGED:
must_revalidate = self.must_revalidate
return type(self)(
# *args,
mapping=cast(Dict[str, Any], mapping),
sorting=cast(List[SortingItem], sorting),
metadata=cast(JSON, self._metadata),
specs=self.specs,
access_policy=self.access_policy,
entries_stale_after=self.entries_stale_after,
metadata_stale_after=self.entries_stale_after,
must_revalidate=cast(bool, must_revalidate),
**kwargs,
)
def read(self, fields: Optional[str] = None) -> "MapAdapter":
"""
Parameters
----------
fields :
Returns
-------
"""
if fields is not None:
new_mapping = {}
for field in fields:
new_mapping[field] = self._mapping[field]
return self.new_variation(mapping=new_mapping)
return self
def search(self, query: Any) -> Any:
"""
Parameters
----------
query :
Returns
-------
Return a Adapter with a subset of the mapping.
"""
return self.query_registry(query, self)
def get_distinct(
self,
metadata: JSON,
structure_families: StructureFamily,
specs: List[Spec],
counts: int,
) -> Dict[str, Any]:
"""
Parameters
----------
metadata :
structure_families :
specs :
counts :
Returns
-------
"""
data: Dict[str, Any] = {}
# data: dict[str, list[dict[str, Any]]] = {}
if metadata:
data["metadata"] = {}
for metadata_key in metadata:
counter = Counter(
term for key, value, term in iter_child_metadata(metadata_key, self)
)
data["metadata"][metadata_key] = counter_to_dict(counter, counts)
if structure_families:
counter = Counter(value.structure_family for key, value in self.items())
data["structure_families"] = counter_to_dict(counter, counts)
if specs:
counter = Counter(tuple(value.specs) for key, value in self.items())
data["specs"] = counter_to_dict(counter, counts)
return data
def sort(self, sorting: SortingItem) -> "MapAdapter":
"""
Parameters
----------
sorting :
Returns
-------
"""
mapping = copy.copy(self._mapping)
for key, direction in reversed(sorting):
if key == "_":
# Special case to enable reversing the given/default ordering.
# Leave mapping as is, and possibly reserve it below.
pass
else:
mapping = dict(
sorted(
mapping.items(),
key=lambda item: item[1].metadata().get(key, _HIGH_SORTER), # type: ignore
)
)
if direction < 0:
# TODO In Python 3.8 dict items should be reservible
# but I have seen errors in the wild that I could not
# quickly resolve so for now we convert to list in the middle.
to_reverse = list(mapping.items())
mapping = dict(reversed(to_reverse))
return self.new_variation(mapping=mapping, sorting=sorting)
# The following two methods are used by keys(), values(), items().
def _keys_slice(
self, start: int, stop: int, direction: int
) -> Union[Iterator[str], List[str]]:
"""
Parameters
----------
start :
stop :
direction :
Returns
-------
"""
if direction > 0:
yield from itertools.islice(self._mapping.keys(), start, stop)
else:
keys_to_slice = list(
reversed(
list(
itertools.islice(
self._mapping.keys(), 0, len(self._mapping) - start
)
)
)
)
keys = keys_to_slice[start:stop]
return keys
def _items_slice(
self, start: int, stop: int, direction: int
) -> Iterator[Tuple[str, Any]]:
"""
Parameters
----------
start :
stop :
direction :
Returns
-------
"""
# A goal of this implementation is to avoid iterating over
# self._mapping.values() because self._mapping may be a OneShotCachedMap which
# only constructs its values at access time. With this in mind, we
# identify the key(s) of interest and then only access those values.
yield from (
(key, self._mapping[key])
for key in self._keys_slice(start, stop, direction)
)
def walk_string_values(tree: MapAdapter, node: Optional[Any] = None) -> Iterator[str]:
"""
>>> list(
... walk_string_values(
... {'a': {'b': {'c': 'apple', 'd': 'banana'},
... 'e': ['cat', 'dog']}, 'f': 'elephant'}
... )
... )
['apple', 'banana', 'cat', 'dog', 'elephant']
Parameters
----------
tree :
node :
Returns
-------
"""
if node is None:
for node in tree:
yield from walk_string_values(tree, node)
else:
value = tree[node]
if isinstance(value, str):
yield value
elif hasattr(value, "items"):
for k, v in value.items():
yield from walk_string_values(value, k)
elif isinstance(value, collections.abc.Iterable):
for item in value:
if isinstance(item, str):
yield item
def counter_to_dict(counter: Dict[str, Any], counts: Any) -> List[Dict[str, Any]]:
"""
Parameters
----------
counter :
counts :
Returns
-------
"""
if counts:
data = [{"value": k, "count": v} for k, v in counter.items() if k is not None]
else:
data = [{"value": k} for k in counter if k is not None]
return data
def iter_child_metadata(
query_key: Any, tree: MapAdapter
) -> Iterator[Tuple[str, Any, Any]]:
"""
Parameters
----------
query_key :
tree :
Returns
-------
"""
for key, value in tree.items():
term = value.metadata()
for subkey in query_key.split("."):
if subkey not in term:
term = None
break
term = term[subkey]
else:
yield key, value, term
def full_text_search(query: Any, tree: MapAdapter) -> MapAdapter:
"""
Parameters
----------
query :
tree :
Returns
-------
"""
matches = {}
text = query.text
query_words = set(text.split())
for key, value in tree.items():
words = set(
word
for s in walk_string_values(value.metadata())
for word in s.lower().split()
)
# Note that `not set.isdisjoint` is faster than `set.intersection`. At
# the C level, `isdisjoint` loops over the set until it finds one match,
# and then bails, whereas `intersection` proceeds to find all matches.
if not words.isdisjoint(query_words):
matches[key] = value
return tree.new_variation(mapping=matches)
MapAdapter.register_query(FullText, full_text_search)
def regex(query: Any, tree: MapAdapter) -> MapAdapter:
"""
Parameters
----------
query :
tree :
Returns
-------
"""
import re
matches = {}
flags = 0 if query.case_sensitive else re.IGNORECASE
pattern = re.compile(query.pattern, flags=flags)
for key, value, term in iter_child_metadata(query.key, tree):
if isinstance(term, str) and pattern.search(term):
matches[key] = value
return tree.new_variation(mapping=matches)
MapAdapter.register_query(Regex, regex)
def eq(query: Any, tree: MapAdapter) -> MapAdapter:
"""
Parameters
----------
query :
tree :
Returns
-------
"""
matches = {}
for key, value, term in iter_child_metadata(query.key, tree):
if term == query.value:
matches[key] = value
return tree.new_variation(mapping=matches)
MapAdapter.register_query(Eq, eq)
def noteq(query: Any, tree: MapAdapter) -> MapAdapter:
"""
Parameters
----------
query :
tree :
Returns
-------
"""
matches = {}
for key, value, term in iter_child_metadata(query.key, tree):
if term != query.value:
matches[key] = value
return tree.new_variation(mapping=matches)
MapAdapter.register_query(NotEq, noteq)
def contains(query: Any, tree: MapAdapter) -> MapAdapter:
"""
Parameters
----------
query :
tree :
Returns
-------
"""
matches = {}
for key, value, term in iter_child_metadata(query.key, tree):
if (
isinstance(term, collections.abc.Iterable)
and (not isinstance(term, str))
and (query.value in term)
):
matches[key] = value
return tree.new_variation(mapping=matches)
MapAdapter.register_query(Contains, contains)
def comparison(query: Any, tree: MapAdapter) -> MapAdapter:
"""
Parameters
----------
query :
tree :
Returns
-------
"""
matches = {}
for key, value, term in iter_child_metadata(query.key, tree):
if query.operator not in {"le", "lt", "ge", "gt"}:
raise ValueError(f"Unexpected operator {query.operator}.")
comparison_func = getattr(operator, query.operator)
if comparison_func(term, query.value):
matches[key] = value
return tree.new_variation(mapping=matches)
MapAdapter.register_query(Comparison, comparison)
def _in(query: Any, tree: MapAdapter) -> MapAdapter:
"""
Parameters
----------
query :
tree :
Returns
-------
"""
matches = {}
for key, value, term in iter_child_metadata(query.key, tree):
if term in query.value:
matches[key] = value
return tree.new_variation(mapping=matches)
MapAdapter.register_query(In, _in)
def notin(query: Any, tree: MapAdapter) -> MapAdapter:
"""
Parameters
----------
query :
tree :
Returns
-------
"""
matches = {}
for key, value, term in iter_child_metadata(query.key, tree):
if term not in query.value:
matches[key] = value
return tree.new_variation(mapping=matches)
MapAdapter.register_query(NotIn, notin)
def specs(query: Any, tree: MapAdapter) -> MapAdapter:
"""
Parameters
----------
query :
tree :
Returns
-------
"""
matches = {}
include = set(query.include)
exclude = set(query.exclude)
for key, value in tree.items():
specs = set(value.specs)
if include.issubset(specs) and exclude.isdisjoint(specs):
matches[key] = value
return tree.new_variation(mapping=matches)
MapAdapter.register_query(SpecsQuery, specs)
def structure_family(query: Any, tree: MapAdapter) -> MapAdapter:
"""
Parameters
----------
query :
tree :
Returns
-------
"""
matches = {}
for key, value in tree.items():
if value.structure_family == query.value:
matches[key] = value
return tree.new_variation(mapping=matches)
MapAdapter.register_query(StructureFamilyQuery, structure_family)
def keys_filter(query: Any, tree: MapAdapter) -> MapAdapter:
"""
Parameters
----------
query :
tree :
Returns
-------
"""
matches = {}
for key, value in tree.items():
if key in query.keys:
matches[key] = value
return tree.new_variation(mapping=matches)
MapAdapter.register_query(KeysFilter, keys_filter)
class _HIGH_SORTER_CLASS:
"""
Enables sort to work when metadata is sparse
"""
def __lt__(self, other: "_HIGH_SORTER_CLASS") -> bool:
"""
Parameters
----------
other :
Returns
-------
"""
return False
def __gt__(self, other: "_HIGH_SORTER_CLASS") -> bool:
"""
Parameters
----------
other :
Returns
-------
"""
return True
_HIGH_SORTER = _HIGH_SORTER_CLASS()