"""
This module includes objects inspired by https://github.com/dask/cachey/
We opted for an independent implementation because reusing cachey would have required:
* An invasive subclass that could be a bit fragile
* Composition as well, in order to get the public API we want
* Carrying around some complexity/features that we do not use here
The original cachey license (which, like Tiled's, is 3-clause BSD) is included in
the same source directory as this module.
"""
import collections.abc
import enum
import functools
import hashlib
import threading
import time
import types
import warnings
from collections import defaultdict
from datetime import datetime, timedelta
from math import log
from pathlib import Path
from urllib.parse import quote_plus
from heapdict import heapdict
from httpx import Headers
if __debug__:
from .utils import logger
class Revalidate(enum.Enum):
FORCE = "force"
IF_EXPIRED = "if_expired"
IF_WE_MUST = "if_we_must"
    @classmethod
    def _missing_(cls, value):
        # Accept case-insensitive strings, e.g. Revalidate("FORCE").
        if isinstance(value, str):
            lowered = value.lower()
            if lowered != value:
                return cls(lowered)
        return None
class WhenFull(enum.Enum):
EVICT = "evict"
ERROR = "error"
WARN = "warn"
    @classmethod
    def _missing_(cls, value):
        # Accept case-insensitive strings, e.g. WhenFull("EVICT").
        if isinstance(value, str):
            lowered = value.lower()
            if lowered != value:
                return cls(lowered)
        return None
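# Both enums accept case-insensitive strings via the ``_missing_`` hook above,
# for example:
#
#     >>> WhenFull("EVICT") is WhenFull.EVICT
#     True
#     >>> Revalidate("Force") is Revalidate.FORCE
#     True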
class UrlItem(
collections.namedtuple(
"UrlItem",
[
"size",
"media_type",
"encoding",
"etag",
"must_revalidate",
"expires",
],
)
):
"""
    An item in the cache mapping URLs to ETags (along with other header info)
"""
@classmethod
def from_headers(cls, headers):
expires_str = headers.get("expires")
if expires_str is not None:
expires = datetime.strptime(expires_str, HTTP_EXPIRES_HEADER_FORMAT)
else:
expires = None
return cls(
size=int(headers["content-length"]),
media_type=headers["content-type"],
encoding=None,
# Record encoding when we start writing compressed data.
# We currently always write it uncompressed, so the encoding is None.
# encoding=headers.get("content-encoding"),
etag=headers["etag"],
must_revalidate="must-revalidate" in headers.get("cache-control", ""),
expires=expires,
)
@classmethod
def from_text(cls, text):
headers = Headers()
for line in text.splitlines():
k, v = line.split(": ", 1)
headers[k] = v
return cls.from_headers(headers)
def to_text(self):
headers = {
"content-length": str(self.size),
"content-type": self.media_type,
"etag": self.etag,
}
if self.must_revalidate:
headers["cache-control"] = "must-revalidate"
if self.expires is not None:
headers["expires"] = self.expires.strftime(HTTP_EXPIRES_HEADER_FORMAT)
if self.encoding is not None:
headers["content-encoding"] = self.encoding
return "\n".join(f"{k}: {v}" for k, v in headers.items())
def download(*entries):
"""
Download a local cache for Tiled so access is fast and can work offline.
Parameters
----------
*entries : Node(s) or structure client(s)
Examples
--------
    Connect to a tree and download it in its entirety.
>>> from tiled.client import from_uri
>>> from tiled.client.cache import download, Cache
>>> client = from_uri("http://...", cache=Cache.on_disk("path/to/directory"))
>>> download(client)
    Alternatively, this can be done from the command line via the tiled CLI:
    $ tiled download "http://..." my_cache_directory
    Use the local copy for faster data access. Tiled will connect to the server
    just to verify that the local copy is current, and will only download data
    if there have been changes on the server.
>>> from tiled.client import from_uri
>>> from tiled.client.cache import Cache
>>> client = from_uri("http://...", cache=Cache.on_disk("my_cache_directory"))
    If the network is unavailable or very slow, rely on the local copy entirely.
    Tiled will not connect to the server. (Note that you still need to provide
    a URL, but it is only used to construct the names of files in the local
    directory.)
>>> from tiled.client import from_uri
>>> from tiled.client.cache import Cache
>>> client = from_uri("http://...", cache=Cache.on_disk("my_cache_directory"), offline=True)
"""
for entry in entries:
entry.download()
# Protect explicitly downloaded data from being silently evicted.
entry.context.cache.when_full = WhenFull.ERROR
# This is a silly time format, but it is the HTTP standard.
HTTP_EXPIRES_HEADER_FORMAT = "%a, %d %b %Y %H:%M:%S GMT"
UNIT_SECOND = timedelta(seconds=1)
ZERO_SECONDS = timedelta(seconds=0)
def _round_seconds(dt):
return round(dt / UNIT_SECOND)
class Reservation:
"""
This represents a reservation on a cached piece of content.
The content will not be evicted from the cache or updated
until the content is loaded using `load_content()` or released
using `ensure_released()`.
"""
def __init__(self, url, item, renew, lock, load_content):
self.url = url
self.item = item
self._renew = renew
self._lock = lock
self._lock_held = True
self._load_content = load_content
lock.acquire()
def load_content(self):
"Return the content and release the reservation."
start = time.perf_counter()
content = self._load_content()
duration = 1000 * (time.perf_counter() - start) # units: ms
        self._lock.release()
        self._lock_held = False
if __debug__:
# Use _ for thousands separator in bytes.
logger.debug(
"Cache read (%s B in %.1f ms) %s",
f"{len(content):_}",
duration,
self.url,
)
return content
    def is_stale(self):
        if self.item.expires is None:
            return True
        # This difference is positive once the expiry time has passed.
        time_past_expiry = datetime.utcnow() - self.item.expires
        return time_past_expiry > ZERO_SECONDS
def renew(self, expires):
self._renew(expires=expires)
def ensure_released(self):
"Release the reservation. This is idempotent."
import locket
if self._lock_held:
try:
self._lock.release()
# TODO Investigate why this is sometimes released twice.
except (
locket.LockError, # for locket 1.0.0
AttributeError, # for locket <1.0.0
RuntimeError, # for locket <1.0.0
):
pass
self._lock_held = False
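# A usage sketch of the reservation protocol (``cache`` and ``url`` are assumed
# to exist; ``Cache.get_reservation`` returns None on a cache miss):
#
#     reservation = cache.get_reservation(url)
#     if reservation is not None:
#         try:
#             if not reservation.is_stale():
#                 # load_content() also releases the reservation's lock.
#                 content = reservation.load_content()
#         finally:
#             reservation.ensure_released()  # idempotent, safe to always call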
class Cache:
"""
A client-side cache of data from the server.
The __init__ is to be used internally and by authors of custom caches.
See ``Cache.in_memory()`` and ``Cache.on_disk()`` for user-facing methods.
This is used by the function ``tiled.client.utils.get_content_with_cache``.
"""
    @classmethod
def in_memory(cls, capacity, *, scorer=None):
"""
An in-memory cache of data from the server
This is useful to ensure that data is not downloaded repeatedly
unless it has been updated since the last download.
        Because it is in memory, it only applies to a given Python process,
        i.e. a given working session. See ``Cache.on_disk()`` for a cache
        that can be shared across processes and persists across sessions.
Parameters
----------
capacity : integer
e.g. 2e9 to use up to 2 GB of RAM
scorer : Scorer
Determines which items to evict from the cache when it grows full.
See tiled.client.cache.Scorer for example.
"""
return cls(
capacity,
url_to_headers_cache={},
etag_to_content_cache={},
global_lock=threading.Lock(),
lock_factory=lambda etag: threading.Lock(),
state=types.SimpleNamespace(),
scorer=scorer,
)
    @classmethod
def on_disk(
cls,
path,
capacity=None,
*,
scorer=None,
):
"""
An on-disk cache of data from the server
This is useful to ensure that data is not downloaded repeatedly
unless it has been updated since the last download.
This uses file-based locking to ensure consistency when the cache
is shared by multiple processes.
Parameters
----------
path : Path or str
A directory will be created at this path if it does not yet exist.
It is safe to reuse an existing cache directory and to share a cache
directory between multiple processes.
capacity : integer, optional
e.g. 2e9 to use up to 2 GB of disk space. If None, this will consume
up to (X - 1 GB) where X is the free space remaining on the volume
containing `path`.
scorer : Scorer
Determines which items to evict from the cache when it grows full.
See tiled.client.cache.Scorer for example.
"""
import locket
path = Path(path)
path.mkdir(parents=True, exist_ok=True)
if capacity is None:
# By default, use (X - 1 GB) where X is the current free space
# on the volume containing `path`.
import shutil
capacity = shutil.disk_usage(path).free - 1e9
etag_to_content_cache = FileBasedCache(path / "etag_to_content_cache")
instance = cls(
capacity,
url_to_headers_cache=FileBasedUrlCache(path / "url_to_headers_cache"),
etag_to_content_cache=etag_to_content_cache,
global_lock=locket.lock_file(path / "global.lock"),
lock_factory=lambda etag: locket.lock_file(
path / "etag_to_content_cache" / f"{etag}.lock"
),
state=OnDiskState(path),
scorer=scorer,
)
return instance
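    # The resulting on-disk layout is sketched below (names are illustrative;
    # see _normalize for the two-character bucketing of content file names):
    #
    #     path/
    #         client_library_version
    #         when_full
    #         global.lock
    #         url_to_headers_cache/
    #             <urlsafe_host>/<md5[:2]>/<md5>   # UrlItem serialized by to_text()
    #         etag_to_content_cache/
    #             <etag>.lock
    #             <etag[:2]>/<etag>                # raw content bytes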
    def __init__(
self,
capacity,
*,
url_to_headers_cache,
etag_to_content_cache,
global_lock,
lock_factory,
state,
scorer=None,
):
"""
Parameters
----------
capacity : int
The number of bytes of data to keep in the cache
url_to_headers_cache : MutableMapping
Dict-like object to use for cache
etag_to_content_cache : MutableMapping
Dict-like object to use for cache
global_lock : Lock
A lock used for the url_to_headers_cache
lock_factory : callable
Expected signature: ``f(etag) -> Lock``
state : object
Namespace used for storing general-purpose cache state
        scorer : Scorer, optional
A Scorer object that controls how we decide what to retire when space
is low.
"""
if scorer is None:
scorer = Scorer(halflife=1000)
self.scorer = scorer
self._state = state
if not hasattr(state, "when_full"):
state.when_full = b"evict"
self.capacity = capacity
self.heap = heapdict()
self.nbytes = dict()
self.total_bytes = 0
self.url_to_headers_cache = url_to_headers_cache
self.etag_to_content_cache = etag_to_content_cache
self.etag_refcount = defaultdict(lambda: 0)
self.etag_lock = LockDict.from_lock_factory(lock_factory)
self.url_to_headers_lock = global_lock
# If the cache has data in it, initialize the internal caches.
for etag in etag_to_content_cache:
# This tells us the content size without actually reading in the data.
nbytes = etag_to_content_cache.sizeof(etag)
score = self.scorer.touch(etag, nbytes)
self.heap[etag] = score
self.nbytes[etag] = nbytes
self.total_bytes += nbytes
@property
def when_full(self):
"""
        Controls what happens when the cache is at capacity
* EVICT - Make a cost/benefit judgement about what (if anything) to evict to make room.
* ERROR - Raise an error.
* WARN - Leave the cache content alone and warn that nothing new will be cached.
This is not recommended.
"""
return WhenFull(self._state.when_full.decode())
@when_full.setter
def when_full(self, value):
self._state.when_full = WhenFull(value).value.encode()
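    # The setter normalizes its input, so enum members and case-insensitive
    # strings are both accepted, e.g. (a usage sketch):
    #
    #     cache.when_full = WhenFull.ERROR
    #     cache.when_full = "evict"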
def renew(self, url, etag, expires):
cache_key = tokenize_url(url)
if expires is None:
# Do not renew.
return
with self.url_to_headers_lock:
item = self.url_to_headers_cache[cache_key]
assert item.etag == etag
# TO DO We end up going str -> datetime -> str here.
# It may be worth adding a fast path.
expires_dt = datetime.strptime(expires, HTTP_EXPIRES_HEADER_FORMAT)
updated_item = item._replace(expires=expires_dt)
self.url_to_headers_cache[cache_key] = updated_item
if __debug__:
logger.debug(
"Cache renewed %s for %d secs.",
url,
_round_seconds(expires_dt - datetime.utcnow()),
)
def put(self, url, headers, content):
cache_key = tokenize_url(url)
cached = self.url_to_headers_cache.get(cache_key)
if cached:
previous_etag = cached.etag
self.etag_refcount[previous_etag] -= 1
if self.etag_refcount[previous_etag] == 0:
# All URLs that referred to this content have since
# changed their ETags, so we can forget about this content.
self.retire(previous_etag)
item = UrlItem.from_headers(headers)
start = time.perf_counter()
with self.url_to_headers_lock:
self.url_to_headers_cache[cache_key] = item
nbytes = self._put_content(item.etag, content)
duration = 1000 * (time.perf_counter() - start) # units: ms
if __debug__:
if nbytes:
if item.expires is not None:
logger.debug(
"Cache stored (%s B in %.1f ms) %s. Stale in %d secs.",
f"{nbytes:_}", # Use _ for thousands separator.
duration,
url,
_round_seconds(item.expires - datetime.utcnow()),
)
else:
logger.debug(
"Cache stored (%s B in %.1f ms) %s. Immediately stale.",
f"{nbytes:_}", # Use _ for thousands separator.
duration,
url,
)
else:
logger.debug(
"Cache delined to store %s given its cost/benefit score.", url
)
def _put_content(self, etag, content):
nbytes = len(content)
if nbytes > self.capacity:
msg = (
f"A single item of size {nbytes} is too large for the "
f"cache of capacity {self.capacity}."
)
if self.when_full == WhenFull.ERROR:
raise TooLargeForCache(msg)
elif self.when_full == WhenFull.WARN:
warnings.warn(msg)
# If we are set to EVICT, just do not bother storing items too large for the cache.
return
        if nbytes + self.total_bytes > self.capacity:
            msg = f"""All {self.capacity} bytes of the cache's capacity are used. Options:
1. Set a larger capacity (and, if necessary, a different storage volume with more room).
2. Choose a smaller set of entries to cache.
3. Allow the cache to evict items that do not fit by setting cache.when_full = "evict".
4. Let the cache sit as it is, keeping the items it has but not adding any more.
   This is not recommended, but it is doable by setting cache.when_full = "warn"."""
            if self.when_full == WhenFull.ERROR:
                raise CacheIsFull(msg)
            elif self.when_full == WhenFull.WARN:
                warnings.warn(msg)
                return
# If we reach here, either the data fits or we are going to
# make a cost/benefit assessment of whether to evict things to make room for it.
score = self.scorer.touch(etag, nbytes)
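        # Note: heapdict is a min-heap mapping, so peekitem() returns the
        # (key, score) pair with the *lowest* score. The new item must
        # out-score the least-valuable cached item to displace anything.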
if (
nbytes + self.total_bytes < self.capacity
or not self.heap
or score > self.heap.peekitem()[1]
):
self.shrink(target=self.capacity - nbytes)
self.etag_to_content_cache[etag] = content
self.heap[etag] = score
self.nbytes[etag] = nbytes
self.total_bytes += nbytes
return nbytes
def get_reservation(self, url):
# Hold the global lock.
with self.url_to_headers_lock:
cached = self.url_to_headers_cache.get(tokenize_url(url))
if cached is None:
# We have nothing for this URL.
return None
            # Acquire a targeted per-ETag lock; the global lock is released
            # when we exit this block.
lock = self.etag_lock[cached.etag]
return Reservation(
url,
cached,
functools.partial(self.renew, url, cached.etag),
lock,
functools.partial(self._get_content_for_etag, cached.etag),
)
def _get_content_for_etag(self, etag):
try:
content = self.etag_to_content_cache[etag]
            # Accessing this item increases its score.
score = self.scorer.touch(etag, len(content))
self.heap[etag] = score
return content
except KeyError:
return None
def retire(self, etag):
"""Retire/remove a etag from the cache
See Also:
shrink
"""
lock = self.etag_lock[etag]
with lock:
self.etag_to_content_cache.pop(etag)
self.total_bytes -= self.nbytes.pop(etag)
self.etag_lock.pop(etag)
    def _shrink_one(self):
        # Retire the lowest-priority item that isn't locked.
        # (self.heap.heap is heapdict's internal heap-ordered list of
        # [score, key, position] entries; its first entry has the lowest score.)
        for score, etag, _ in self.heap.heap:
            lock = self.etag_lock[etag]
            if lock.acquire(blocking=False):
                try:
                    self.heap.pop(etag)
                    self.etag_to_content_cache.pop(etag)
                    nbytes = self.nbytes.pop(etag)
                    self.total_bytes -= nbytes
                    self.etag_lock.pop(etag)
                finally:
                    lock.release()
                if __debug__:
                    # Use _ for thousands separator in bytes.
                    logger.debug("Cache evicted %s B with ETag %s", f"{nbytes:_}", etag)
                break
def resize(self, capacity):
"""Resize the cache.
Will fit the cache into capacity by calling `shrink()`.
"""
self.capacity = capacity
self.shrink()
def shrink(self, target=None):
"""Retire keys from the cache until we're under bytes budget
See Also:
retire
"""
if target is None:
target = self.capacity
if self.total_bytes <= target:
return
while self.total_bytes > target:
self._shrink_one()
def clear(self):
while self.etag_to_content_cache:
self._shrink_one()
class Scorer:
    """
    Object to track scores of cache entries
    Prefers items that have the following properties:
    1. Expensive to download (bytes)
    2. Frequently used
    3. Recently used
    This object tracks both stated costs of keys and a separate score related
    to how frequently/recently they have been accessed. It uses these to
    provide a score for the key used by the ``Cache`` object, which is the
    main user-facing object.
Examples
--------
>>> s = Scorer(halflife=10)
>>> s.touch('x', cost=2) # score is similar to cost
2
>>> s.touch('x') # scores increase on every touch
4.138629436111989
"""
    def __init__(self, halflife):
self.cost = dict()
self.time = defaultdict(lambda: 0)
self._base_multiplier = 1 + log(2) / float(halflife)
self.tick = 1
self._base = 1
    def touch(self, key, cost=None):
        """Update score for key
        Provide a cost the first time and optionally thereafter.
        """
        if cost is not None:
            self.cost[key] = cost
        else:
            try:
                cost = self.cost[key]
            except KeyError:
                # We never learned this key's cost, so we cannot score it.
                return
        self.time[key] += self._base
        time = self.time[key]
        self._base *= self._base_multiplier
        return cost * time
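    # Worked arithmetic behind the docstring example above (halflife=10):
    #     multiplier = 1 + log(2) / 10 ≈ 1.0693147
    #     first touch:  time = 1.0, score = 2 * 1.0 = 2
    #     second touch: time = 1.0 + 1.0693147 ≈ 2.0693147
    #                   score = 2 * 2.0693147 ≈ 4.138629436111989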
def tokenize_url(url):
"""
>>> tokenize_url(httpx_URL)
("urlsafe_host_and_port", "md5_hash_of_the_rest")
"""
url_as_tuple = url.raw
address = url_as_tuple[1].decode()
if url_as_tuple[2]:
address += f":{url_as_tuple[2]}"
return (quote_plus(address), hashlib.md5(b"".join(url_as_tuple[3:])).hexdigest())
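# For example (a sketch, assuming an httpx version in which ``URL.raw`` is
# available; the hash is elided here):
#
#     >>> import httpx
#     >>> tokenize_url(httpx.URL("http://localhost:8000/api/metadata/"))
#     ('localhost%3A8000', '...')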
class OnDiskState:
def __init__(self, directory):
self._directory = Path(directory)
self._ttl_cache = {}
if self._directory.exists():
import packaging.version
version_filepath = self._directory / "client_library_version"
if version_filepath.is_file():
with open(version_filepath, "rt") as file:
# Check to see if this cache has an internal layout that we
# understand.
version = packaging.version.parse(file.read())
if version <= packaging.version.parse("0.1.0a43"):
raise ValueError(
f"Cache directory {directory} was used by an older version of Tiled. "
"It cannot be used by this version. Choose a different directory, "
"or delete all the cached data in that directory. (In the future "
"Tiled caching will be backward compatible, but not during alpha.)"
)
self._directory.mkdir(parents=True, exist_ok=True)
# Record the version which may be useful to future code that needs
# to deal with a change in internal format or layout.
with open(self._directory / "client_library_version", "wt") as file:
from tiled import __version__
file.write(__version__)
def __getattr__(self, key):
try:
deadline, value = self._ttl_cache[key]
            if time.monotonic() < deadline:
                # The memoized value is still fresh.
                return value
except KeyError:
pass
try:
with open(self._directory / key, "rb") as file:
value = file.read()
except FileNotFoundError:
raise AttributeError(key)
TIMEOUT = 1 # second
deadline = TIMEOUT + time.monotonic()
self._ttl_cache[key] = deadline, value
return value
def __setattr__(self, key, value):
if key.startswith("_"):
return super().__setattr__(key, value)
TIMEOUT = 1 # second
deadline = TIMEOUT + time.monotonic()
self._ttl_cache[key] = deadline, value
with open(self._directory / key, "wb") as file:
return file.write(value)
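# Attribute access on OnDiskState round-trips through files in the cache
# directory, memoized for one second. A usage sketch (the path is hypothetical):
#
#     state = OnDiskState("path/to/cache")
#     state.when_full = b"evict"  # writes bytes to the file "when_full"
#     state.when_full             # reads the memo, or the file once the memo expires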
class FileBasedCache(collections.abc.MutableMapping):
"Locking is handled in the other layer, by Cache."
def __init__(self, directory, mode="w"):
self._directory = Path(directory)
self._directory.mkdir(parents=True, exist_ok=True)
def __repr__(self):
return repr(dict(self))
@property
def directory(self):
return self._directory
def sizeof(self, key):
        # Report the content size in bytes without reading the data itself.
        # (Used by Cache.__init__ to warm its internal state.)
path = Path(self._directory, *_normalize(key))
return path.stat().st_size
def __getitem__(self, key):
path = Path(self._directory, *_normalize(key))
if not path.is_file():
raise KeyError(key)
with open(path, "rb") as file:
return file.read()
def __setitem__(self, key, value):
path = Path(self._directory, *_normalize(key))
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "wb") as file:
file.write(value)
def __delitem__(self, key):
path = Path(self._directory, *_normalize(key))
path.unlink()
    def __len__(self):
        return sum(1 for _ in self)
    def __iter__(self):
        # Content files are stored one directory level down (see _normalize),
        # so walk recursively rather than listing only the top level.
        for path in self._directory.rglob("*"):
            if not path.is_file():
                continue
            parts = path.relative_to(self._directory).parts
            if len(parts) == 1:
                # Top-level files (e.g. per-ETag lock files) are not content.
                continue
            yield _unnormalize(parts)
def __contains__(self, key):
path = Path(self._directory, *_normalize(key))
return path.is_file()
def _normalize(key):
if isinstance(key, str):
key = (key,)
# To avoid an overly large directory (which leads to slow performance)
# divide into subdirectories beginning with the first two characters of
# the contents' name.
return (*key[:-1], key[-1][:2], key[-1])
def _unnormalize(key):
    key = (*key[:-2], key[-1])
    if len(key) == 1:
        # A plain string key was bucketed by _normalize; return the string.
        return key[0]
    return key
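# For example:
#
#     >>> _normalize("abcdef")
#     ('ab', 'abcdef')
#     >>> _unnormalize(("ab", "abcdef"))
#     'abcdef'
#     >>> _normalize(("host", "d41d8cd98f00b204e9800998ecf8427e"))
#     ('host', 'd4', 'd41d8cd98f00b204e9800998ecf8427e')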
class LockDict(dict):
@classmethod
def from_lock_factory(cls, lock_factory):
instance = cls()
instance._lock_factory = lock_factory
return instance
def __missing__(self, key):
value = self._lock_factory(key)
self[key] = value
return value
class FileBasedUrlCache(FileBasedCache):
def __getitem__(self, key):
data = super().__getitem__(key)
return UrlItem.from_text(data.decode())
def __setitem__(self, key, value):
data = value.to_text().encode()
super().__setitem__(key, data)
class AlreadyTooLarge(Exception):
pass
class CacheIsFull(Exception):
pass
class TooLargeForCache(Exception):
pass
class NoCache(Exception):
pass
def verify_cache(cache):
if cache is None:
raise NoCache(
"""To download, the Tiled client needs to be configured
with a cache, such as:
from tiled.client import from_uri
from tiled.client.cache import Cache
    c = from_uri("http://tiled-demo.blueskyproject.io/api", cache=Cache.on_disk("data"))
See https://blueskyproject.io/tiled/tutorials/caching.html for details."""
)