import importlib
import time
import warnings
from dataclasses import asdict
from pathlib import Path
from httpx import URL
from ..structures.core import Spec, StructureFamily
from ..utils import UNCHANGED, DictView, ListView, OneShotCachedMap, safe_json_dump
from .utils import MSGPACK_MIME_TYPE, handle_error
class MetadataRevisions:
    """
    Access the server-side revision history of an item's metadata.

    Supports ``len()``, integer and slice indexing (server-side pagination),
    and deletion of individual revisions by revision number.
    """

    def __init__(self, context, link):
        # (length, deadline) pair used to briefly cache __len__ results.
        self._cached_len = None
        self.context = context
        self._link = link

    def __len__(self):
        """
        Return the total number of revisions, caching the count briefly.
        """
        LENGTH_CACHE_TTL = 1  # second
        now = time.monotonic()
        if self._cached_len is not None:
            length, deadline = self._cached_len
            if now < deadline:
                # Use the cached value and do not make any request.
                return length
        # Request zero items: we only need the total count from the
        # response's "meta" section, not any revision payloads.
        content = handle_error(
            self.context.http_client.get(
                self._link,
                headers={"Accept": MSGPACK_MIME_TYPE},
                params={"page[offset]": 0, "page[limit]": 0},
            )
        ).json()
        length = content["meta"]["count"]
        self._cached_len = (length, now + LENGTH_CACHE_TTL)
        return length

    def __getitem__(self, item_):
        """
        Fetch one revision (int index) or a list of revisions (slice).
        """
        # Any access invalidates the cached length; revisions may have changed.
        self._cached_len = None
        if isinstance(item_, int):
            offset = item_
            limit = 1
            content = handle_error(
                self.context.http_client.get(
                    self._link,
                    headers={"Accept": MSGPACK_MIME_TYPE},
                    params={"page[offset]": offset, "page[limit]": limit},
                )
            ).json()
            # Exactly one item is expected; unpacking raises if not.
            (result,) = content["data"]
            return result
        elif isinstance(item_, slice):
            # NOTE(review): slice.step is ignored and negative indices are
            # not supported by the server-side pagination.
            offset = item_.start
            if offset is None:
                offset = 0
            if item_.stop is None:
                params = f"?page[offset]={offset}"
            else:
                limit = item_.stop - offset
                params = f"?page[offset]={offset}&page[limit]={limit}"
            next_page = self._link + params
            result = []
            while next_page is not None:
                content = handle_error(
                    self.context.http_client.get(
                        next_page,
                        headers={"Accept": MSGPACK_MIME_TYPE},
                    )
                ).json()
                if len(result) == 0:
                    result = content.copy()
                else:
                    # Bug fix: use extend (not append) so items from
                    # subsequent pages are merged into a flat list instead
                    # of each page's list nesting as a single element.
                    result["data"].extend(content["data"])
                # Follow pagination until the server reports no next page.
                next_page = content["links"]["next"]
            return result["data"]

    def delete_revision(self, n):
        """Delete revision number *n* on the server."""
        handle_error(self.context.http_client.delete(self._link, params={"number": n}))
class BaseClient:
    """
    Base class for Tiled client objects.

    Holds the server Context and the JSON "item" payload describing one
    entry, and provides access to its metadata, structure, specs, data
    sources, and metadata revisions.
    """

    # The HTTP spec does not define a size limit for URIs,
    # but a common setting is 4K or 8K (for all the headers together).
    # As another reference point, Internet Explorer imposes a
    # 2048-character limit on URLs.
    URL_CHARACTER_LIMIT = 2_000  # number of characters

    def __init__(
        self,
        context,
        *,
        item,
        structure_clients,
        structure=None,
        include_data_sources=False,
    ):
        """
        Parameters
        ----------
        context : Context
            Holds the HTTP client and server connection state.
        item : dict
            JSON payload describing this entry, as returned by the server.
        structure_clients :
            Mapping used to choose client classes by structure family.
        structure : optional
            A structure dataclass already parsed from a dict; if None, it is
            parsed here from the item's attributes.
        include_data_sources : bool, optional
            Whether data source/asset details were requested from the server.
        """
        self._context = context
        self._item = item
        self._cached_len = None  # a cache just for __len__
        self.structure_clients = structure_clients
        self._metadata_revisions = None
        self._include_data_sources = include_data_sources
        attributes = self.item["attributes"]
        structure_family = attributes["structure_family"]
        if structure is not None:
            # Allow the caller to optionally hand us a structure that is already
            # parsed from a dict into a structure dataclass.
            self._structure = structure
        elif structure_family == StructureFamily.container:
            # Containers carry no structure payload to parse.
            self._structure = None
        else:
            # Look up the structure dataclass for this family and parse the
            # server-provided JSON into it.
            structure_type = STRUCTURE_TYPES[attributes["structure_family"]]
            self._structure = structure_type.from_json(attributes["structure"])
        super().__init__()

    def structure(self):
        """
        Return a dataclass describing the structure of the data.
        """
        # A "resizable" structure may change on the server after we cached
        # it here; rather than return stale information, refuse.
        if getattr(self._structure, "resizable", None):
            # In the future, conditionally fetch updated information.
            raise NotImplementedError(
                "The server has indicated that this has a dynamic, resizable "
                "structure and this version of the Tiled Python client cannot "
                "cope with that."
            )
        return self._structure

    def login(self, username=None, provider=None):
        """
        Depending on the server's authentication method, this will prompt for username/password:

        >>> c.login()
        Username: USERNAME
        Password: <input is hidden>

        or prompt you to open a link in a web browser to login with a third party:

        >>> c.login()
        You have ... minutes to visit this URL

        https://...

        and enter the code: XXXX-XXXX
        """
        self.context.login(username=username, provider=provider)

    def logout(self):
        """
        Log out.

        This method is idempotent: if you are already logged out, it will do nothing.
        """
        self.context.logout()

    def __repr__(self):
        return f"<{type(self).__name__}>"

    @property
    def context(self):
        # The Context holding the HTTP client and authentication state.
        return self._context

    def refresh(self):
        """
        Refetch this item's payload from the server and update it in place.

        Returns self so that calls can be chained.
        """
        content = handle_error(
            self.context.http_client.get(
                self.uri,
                headers={"Accept": MSGPACK_MIME_TYPE},
                params={"include_data_sources": self._include_data_sources},
            )
        ).json()
        self._item = content["data"]
        return self

    @property
    def item(self):
        "JSON payload describing this item. Mostly for internal use."
        return self._item

    @property
    def metadata(self):
        "Metadata about this data source."
        # Ensure this is immutable (at the top level) to help the user avoid
        # getting the wrong impression that editing this would update anything
        # persistent.
        return DictView(self._item["attributes"]["metadata"])

    @property
    def specs(self):
        "List of specifications describing the structure of the metadata and/or data."
        return ListView([Spec(**spec) for spec in self._item["attributes"]["specs"]])

    @property
    def uri(self):
        "Direct link to this entry"
        return self.item["links"]["self"]

    @property
    def structure_family(self):
        "Structure family (array, table, container, ...) of this entry"
        return StructureFamily[self.item["attributes"]["structure_family"]]

    def data_sources(self):
        """
        Return data source information for this node, fetching it if needed.
        """
        if not self._include_data_sources:
            # Warn because the call below triggers an extra HTTP request
            # (include_data_sources().refresh()) that could have been
            # avoided by requesting the data sources up front.
            warnings.warn(
                """Calling include_data_sources().refresh().
                To fetch the data sources up front, call include_data_sources() on the
                client or pass the optional parameter `include_data_sources=True` to
                `from_uri(...)` or similar."""
            )
        return self.include_data_sources().item["attributes"].get("data_sources")

    def include_data_sources(self):
        """
        Ensure that data source and asset information is fetched.

        If it has already been fetched, this is a no-op.
        """
        if self._include_data_sources:
            return self  # no op
        return self.new_variation(include_data_sources=True).refresh()

    def new_variation(
        self,
        structure_clients=UNCHANGED,
        include_data_sources=UNCHANGED,
        **kwargs,
    ):
        """
        This is intended primarily for internal use and use by subclasses.

        Return a new client of the same type over the same item, optionally
        overriding some settings. The UNCHANGED sentinel (rather than None)
        marks parameters to carry over from this instance.
        """
        if structure_clients is UNCHANGED:
            structure_clients = self.structure_clients
        if include_data_sources is UNCHANGED:
            include_data_sources = self._include_data_sources
        return type(self)(
            self.context,
            item=self._item,
            structure_clients=structure_clients,
            include_data_sources=include_data_sources,
            **kwargs,
        )

    def asset_manifest(self, data_sources):
        """
        Return a manifest of the relative paths of the contents in each asset.

        This returns a dictionary keyed on asset ID.
        Assets backed by a single file are mapped to None (no manifest).
        Assets backed by a directory of files are mapped to a list of relative paths.

        Parameters
        ----------
        data_sources : dict
            The value returned by ``.data_sources()``. This is passed in explicitly
            to avoid fetching it twice in common usages. It also enables passing in
            a subset of the data_sources of interest.
        """
        manifests = {}
        for data_source in data_sources:
            # Derive the manifest endpoint from this item's metadata URL.
            # NOTE(review): this is loop-invariant and could be hoisted.
            manifest_link = self.item["links"]["self"].replace(
                "/metadata", "/asset/manifest", 1
            )
            for asset in data_source["assets"]:
                if asset["is_directory"]:
                    manifest = handle_error(
                        self.context.http_client.get(
                            manifest_link, params={"id": asset["id"]}
                        )
                    ).json()["manifest"]
                else:
                    # Single-file asset: no manifest to enumerate.
                    manifest = None
                manifests[asset["id"]] = manifest
        return manifests

    def raw_export(self, destination_directory=None, max_workers=4):
        """
        Download the raw assets backing this node.

        This may produce a single file or a directory.

        Parameters
        ----------
        destination_directory : Path, optional
            Destination for downloaded assets. Default is current working directory
        max_workers : int, optional
            Number of parallel workers downloading data. Default is 4.

        Returns
        -------
        paths : List[Path]
            Filepaths of exported files
        """
        if destination_directory is None:
            destination_directory = Path.cwd()
        else:
            destination_directory = Path(destination_directory)
        # Import here to defer the import of rich (for progress bar).
        from .download import ATTACHMENT_FILENAME_PLACEHOLDER, download

        urls = []
        paths = []
        data_sources = self.include_data_sources().data_sources()
        asset_manifest = self.asset_manifest(data_sources)
        if len(data_sources) != 1:
            raise NotImplementedError(
                "Export of multiple data sources not yet supported"
            )
        for data_source in data_sources:
            # Derive the byte-download endpoint from this item's metadata URL.
            bytes_link = self.item["links"]["self"].replace(
                "/metadata", "/asset/bytes", 1
            )
            for asset in data_source["assets"]:
                if len(data_source["assets"]) == 1:
                    # Only one asset: keep the name simple.
                    base_path = destination_directory
                else:
                    # Multiple assets: Add a subdirectory named for the asset
                    # id to namespace each asset.
                    base_path = Path(destination_directory, str(asset["id"]))
                if asset["is_directory"]:
                    # One URL/path pair per file listed in the asset manifest.
                    relative_paths = asset_manifest[asset["id"]]
                    urls.extend(
                        [
                            URL(
                                bytes_link,
                                params={
                                    "id": asset["id"],
                                    "relative_path": relative_path,
                                },
                            )
                            for relative_path in relative_paths
                        ]
                    )
                    paths.extend(
                        [
                            Path(base_path, relative_path)
                            for relative_path in relative_paths
                        ]
                    )
                else:
                    # Single file: the server supplies the filename via the
                    # Content-Disposition attachment header (placeholder here).
                    urls.append(URL(bytes_link, params={"id": asset["id"]}))
                    paths.append(Path(base_path, ATTACHMENT_FILENAME_PLACEHOLDER))
        return download(self.context.http_client, urls, paths, max_workers=max_workers)

    @property
    def formats(self):
        "List formats that the server can export this data as."
        formats = set()
        # Formats may be registered per spec and per structure family.
        for spec in self.item["attributes"]["specs"]:
            formats.update(self.context.server_info["formats"].get(spec, []))
        formats.update(
            self.context.server_info["formats"][
                self.item["attributes"]["structure_family"]
            ]
        )
        return sorted(formats)

    def update_metadata(self, metadata=None, specs=None):
        """
        EXPERIMENTAL: Update metadata.

        This is subject to change or removal without notice

        Parameters
        ----------
        metadata : dict, optional
            User metadata. May be nested. Must contain only basic types
            (e.g. numbers, strings, lists, dicts) that are JSON-serializable.
        specs : List[str], optional
            List of names that are used to label that the data and/or metadata
            conform to some named standard specification.
        """
        self._cached_len = None
        if specs is None:
            normalized_specs = None
        else:
            # Coerce bare strings into Spec objects, then serialize for JSON.
            normalized_specs = []
            for spec in specs:
                if isinstance(spec, str):
                    spec = Spec(spec)
                normalized_specs.append(asdict(spec))
        data = {
            "metadata": metadata,
            "specs": normalized_specs,
        }
        content = handle_error(
            self.context.http_client.put(
                self.item["links"]["self"], content=safe_json_dump(data)
            )
        ).json()
        if metadata is not None:
            if "metadata" in content:
                # Metadata was accepted and modified by the specs validator on the server side.
                # It is updated locally using the new version.
                self._item["attributes"]["metadata"] = content["metadata"]
            else:
                # Metadata was accepted as it is by the server.
                # It is updated locally with the version submitted by the client.
                self._item["attributes"]["metadata"] = metadata
        if specs is not None:
            self._item["attributes"]["specs"] = normalized_specs

    @property
    def metadata_revisions(self):
        # Lazily construct an accessor for this item's metadata revision
        # history (same URL with /metadata swapped for /revisions).
        if self._metadata_revisions is None:
            link = self.item["links"]["self"].replace("/metadata", "/revisions", 1)
            self._metadata_revisions = MetadataRevisions(self.context, link)
        return self._metadata_revisions

    def delete_tree(self):
        """
        Delete this node via the server's /nodes/ endpoint.

        NOTE(review): the name suggests this removes the node and everything
        beneath it — confirm against the server API documentation.
        """
        endpoint = self.uri.replace("/metadata/", "/nodes/", 1)
        handle_error(self.context.http_client.delete(endpoint))
# Where to find the structure dataclass for each structure family:
# (relative module path, class name), resolved relative to this package.
_STRUCTURE_LOCATIONS = {
    StructureFamily.array: ("...structures.array", "ArrayStructure"),
    StructureFamily.awkward: ("...structures.awkward", "AwkwardStructure"),
    StructureFamily.table: ("...structures.table", "TableStructure"),
    StructureFamily.sparse: ("...structures.sparse", "SparseStructure"),
}
# Lazy mapping from structure family to its structure dataclass. Each entry
# imports its module only on first access (OneShotCachedMap), keeping module
# import cheap. Default arguments bind module/attr per entry, avoiding the
# late-binding closure pitfall.
STRUCTURE_TYPES = OneShotCachedMap(
    {
        family: (
            lambda module=module, attr=attr: getattr(
                importlib.import_module(module, BaseClient.__module__), attr
            )
        )
        for family, (module, attr) in _STRUCTURE_LOCATIONS.items()
    }
)