Source code for tiled.client.base

import importlib
import time
import warnings
from dataclasses import asdict
from pathlib import Path

from httpx import URL

from ..structures.core import Spec, StructureFamily
from ..utils import UNCHANGED, DictView, ListView, OneShotCachedMap, safe_json_dump
from .utils import MSGPACK_MIME_TYPE, handle_error


class MetadataRevisions:
    def __init__(self, context, link):
        self._cached_len = None
        self.context = context
        self._link = link

    def __len__(self):
        LENGTH_CACHE_TTL = 1  # second

        now = time.monotonic()
        if self._cached_len is not None:
            length, deadline = self._cached_len
            if now < deadline:
                # Used the cached value and do not make any request.
                return length

        content = handle_error(
            self.context.http_client.get(
                self._link,
                headers={"Accept": MSGPACK_MIME_TYPE},
                params={"page[offset]": 0, "page[limit]": 0},
            )
        ).json()
        length = content["meta"]["count"]
        self._cached_len = (length, now + LENGTH_CACHE_TTL)
        return length

    def __getitem__(self, item_):
        self._cached_len = None

        if isinstance(item_, int):
            offset = item_
            limit = 1

            content = handle_error(
                self.context.http_client.get(
                    self._link,
                    headers={"Accept": MSGPACK_MIME_TYPE},
                    params={"page[offset]": offset, "page[limit]": limit},
                )
            ).json()
            (result,) = content["data"]
            return result

        elif isinstance(item_, slice):
            offset = item_.start
            if offset is None:
                offset = 0
            if item_.stop is None:
                params = f"?page[offset]={offset}"
            else:
                limit = item_.stop - offset
                params = f"?page[offset]={offset}&page[limit]={limit}"

            next_page = self._link + params
            result = []
            while next_page is not None:
                content = handle_error(
                    self.context.http_client.get(
                        next_page,
                        headers={"Accept": MSGPACK_MIME_TYPE},
                    )
                ).json()
                if len(result) == 0:
                    result = content.copy()
                else:
                    result["data"].append(content["data"])
                next_page = content["links"]["next"]

            return result["data"]

    def delete_revision(self, n):
        handle_error(self.context.http_client.delete(self._link, params={"number": n}))


[docs]class BaseClient: # The HTTP spec does not define a size limit for URIs, # but a common setting is 4K or 8K (for all the headers together). # As another reference point, Internet Explorer imposes a # 2048-character limit on URLs. URL_CHARACTER_LIMIT = 2_000 # number of characters
[docs] def __init__( self, context, *, item, structure_clients, structure=None, include_data_sources=False, ): self._context = context self._item = item self._cached_len = None # a cache just for __len__ self.structure_clients = structure_clients self._metadata_revisions = None self._include_data_sources = include_data_sources attributes = self.item["attributes"] structure_family = attributes["structure_family"] if structure is not None: # Allow the caller to optionally hand us a structure that is already # parsed from a dict into a structure dataclass. self._structure = structure elif structure_family == StructureFamily.container: self._structure = None else: structure_type = STRUCTURE_TYPES[attributes["structure_family"]] self._structure = structure_type.from_json(attributes["structure"]) super().__init__()
[docs] def structure(self): """ Return a dataclass describing the structure of the data. """ if getattr(self._structure, "resizable", None): # In the future, conditionally fetch updated information. raise NotImplementedError( "The server has indicated that this has a dynamic, resizable " "structure and this version of the Tiled Python client cannot " "cope with that." ) return self._structure
[docs] def login(self, username=None, provider=None): """ Depending on the server's authentication method, this will prompt for username/password: >>> c.login() Username: USERNAME Password: <input is hidden> or prompt you to open a link in a web browser to login with a third party: >>> c.login() You have ... minutes visit this URL https://... and enter the code: XXXX-XXXX """ self.context.login(username=username, provider=provider)
[docs] def logout(self): """ Log out. This method is idempotent: if you are already logged out, it will do nothing. """ self.context.logout()
def __repr__(self): return f"<{type(self).__name__}>" @property def context(self): return self._context def refresh(self): content = handle_error( self.context.http_client.get( self.uri, headers={"Accept": MSGPACK_MIME_TYPE}, params={"include_data_sources": self._include_data_sources}, ) ).json() self._item = content["data"] return self @property def item(self): "JSON payload describing this item. Mostly for internal use." return self._item @property def metadata(self): "Metadata about this data source." # Ensure this is immutable (at the top level) to help the user avoid # getting the wrong impression that editing this would update anything # persistent. return DictView(self._item["attributes"]["metadata"]) @property def specs(self): "List of specifications describing the structure of the metadata and/or data." return ListView([Spec(**spec) for spec in self._item["attributes"]["specs"]]) @property def uri(self): "Direct link to this entry" return self.item["links"]["self"] @property def structure_family(self): "Quick access to this entry" return StructureFamily[self.item["attributes"]["structure_family"]] def data_sources(self): if not self._include_data_sources: warnings.warn( """Calling include_data_sources().refresh(). To fetch the data sources up front, call include_data_sources() on the client or pass the optional parameter `include_data_sources=True` to `from_uri(...)` or similar.""" ) return self.include_data_sources().item["attributes"].get("data_sources") def include_data_sources(self): """ Ensure that data source and asset information is fetched. If it has already been fetched, this is a no-op. """ if self._include_data_sources: return self # no op return self.new_variation(include_data_sources=True).refresh()
[docs] def new_variation( self, structure_clients=UNCHANGED, include_data_sources=UNCHANGED, **kwargs, ): """ This is intended primarily for internal use and use by subclasses. """ if structure_clients is UNCHANGED: structure_clients = self.structure_clients if include_data_sources is UNCHANGED: include_data_sources = self._include_data_sources return type(self)( self.context, item=self._item, structure_clients=structure_clients, include_data_sources=include_data_sources, **kwargs, )
def asset_manifest(self, data_sources): """ Return a manifest of the relative paths of the contents in each asset. This return a dictionary keyed on asset ID. Assets backed by a single file are mapped to None (no manifest). Asset backed by a directory of files are mapped to a list of relative paths. Parameters ---------- data_sources : dict The value returned by ``.data_sources()``. This is passed in explicitly to avoid fetching it twice in common usages. It also enables passing in a subset of the data_sources of interest. """ manifests = {} for data_source in data_sources: manifest_link = self.item["links"]["self"].replace( "/metadata", "/asset/manifest", 1 ) for asset in data_source["assets"]: if asset["is_directory"]: manifest = handle_error( self.context.http_client.get( manifest_link, params={"id": asset["id"]} ) ).json()["manifest"] else: manifest = None manifests[asset["id"]] = manifest return manifests def raw_export(self, destination_directory=None, max_workers=4): """ Download the raw assets backing this node. This may produce a single file or a directory. Parameters ---------- destination_directory : Path, optional Destination for downloaded assets. Default is current working directory max_workers : int, optional Number of parallel workers downloading data. Default is 4. Returns ------- paths : List[Path] Filepaths of exported files """ if destination_directory is None: destination_directory = Path.cwd() else: destination_directory = Path(destination_directory) # Import here to defer the import of rich (for progress bar). from .download import ATTACHMENT_FILENAME_PLACEHOLDER, download urls = [] paths = [] data_sources = self.include_data_sources().data_sources() asset_manifest = self.asset_manifest(data_sources) if len(data_sources) != 1: raise NotImplementedError( "Export of multiple data sources not yet supported" ) for data_source in data_sources: bytes_link = self.item["links"]["self"].replace( "/metadata", "/asset/bytes", 1 ) for asset in data_source["assets"]: if len(data_source["assets"]) == 1: # Only one asset: keep the name simple. base_path = destination_directory else: # Multiple assets: Add a subdirectory named for the asset # id to namespace each asset. base_path = Path(destination_directory, str(asset["id"])) if asset["is_directory"]: relative_paths = asset_manifest[asset["id"]] urls.extend( [ URL( bytes_link, params={ "id": asset["id"], "relative_path": relative_path, }, ) for relative_path in relative_paths ] ) paths.extend( [ Path(base_path, relative_path) for relative_path in relative_paths ] ) else: urls.append(URL(bytes_link, params={"id": asset["id"]})) paths.append(Path(base_path, ATTACHMENT_FILENAME_PLACEHOLDER)) return download(self.context.http_client, urls, paths, max_workers=max_workers) @property def formats(self): "List formats that the server can export this data as." formats = set() for spec in self.item["attributes"]["specs"]: formats.update(self.context.server_info["formats"].get(spec, [])) formats.update( self.context.server_info["formats"][ self.item["attributes"]["structure_family"] ] ) return sorted(formats) def update_metadata(self, metadata=None, specs=None): """ EXPERIMENTAL: Update metadata. This is subject to change or removal without notice Parameters ---------- metadata : dict, optional User metadata. May be nested. Must contain only basic types (e.g. numbers, strings, lists, dicts) that are JSON-serializable. specs : List[str], optional List of names that are used to label that the data and/or metadata conform to some named standard specification. """ self._cached_len = None if specs is None: normalized_specs = None else: normalized_specs = [] for spec in specs: if isinstance(spec, str): spec = Spec(spec) normalized_specs.append(asdict(spec)) data = { "metadata": metadata, "specs": normalized_specs, } content = handle_error( self.context.http_client.put( self.item["links"]["self"], content=safe_json_dump(data) ) ).json() if metadata is not None: if "metadata" in content: # Metadata was accepted and modified by the specs validator on the server side. # It is updated locally using the new version. self._item["attributes"]["metadata"] = content["metadata"] else: # Metadata was accepted as it si by the server. # It is updated locally with the version submitted buy the client. self._item["attributes"]["metadata"] = metadata if specs is not None: self._item["attributes"]["specs"] = normalized_specs @property def metadata_revisions(self): if self._metadata_revisions is None: link = self.item["links"]["self"].replace("/metadata", "/revisions", 1) self._metadata_revisions = MetadataRevisions(self.context, link) return self._metadata_revisions def delete_tree(self): endpoint = self.uri.replace("/metadata/", "/nodes/", 1) handle_error(self.context.http_client.delete(endpoint))
STRUCTURE_TYPES = OneShotCachedMap( { StructureFamily.array: lambda: importlib.import_module( "...structures.array", BaseClient.__module__ ).ArrayStructure, StructureFamily.awkward: lambda: importlib.import_module( "...structures.awkward", BaseClient.__module__ ).AwkwardStructure, StructureFamily.table: lambda: importlib.import_module( "...structures.table", BaseClient.__module__ ).TableStructure, StructureFamily.sparse: lambda: importlib.import_module( "...structures.sparse", BaseClient.__module__ ).SparseStructure, } )