import collections
import collections.abc
import functools
import importlib
import itertools
import time
import warnings
from dataclasses import asdict
from typing import TYPE_CHECKING, Any, Optional, Union
from urllib.parse import parse_qs, urlparse
import entrypoints
import httpx
from ..adapters.utils import IndexersMixin
from ..iterviews import ItemsView, KeysView, ValuesView
from ..queries import KeyLookup
from ..query_registration import default_query_registry
from ..structures.core import Spec, StructureFamily
from ..structures.data_source import DataSource
from ..utils import UNCHANGED, OneShotCachedMap, Sentinel, node_repr, safe_json_dump
from .base import STRUCTURE_TYPES, BaseClient
from .utils import (
MSGPACK_MIME_TYPE,
ClientError,
client_for_item,
export_util,
handle_error,
)
if TYPE_CHECKING:
import pandas
import pyarrow
class Container(BaseClient, collections.abc.Mapping, IndexersMixin):
# This maps the structure_family sent by the server to a client-side object that
# can interpret the structure_family's structure and content. OneShotCachedMap is used to
# defer imports.
# This is populated when the first instance is created.
STRUCTURE_CLIENTS_FROM_ENTRYPOINTS = None
@classmethod
def _discover_entrypoints(cls, entrypoint_name):
return OneShotCachedMap(
{
name: entrypoint.load
for name, entrypoint in entrypoints.get_group_named(
entrypoint_name
).items()
}
)
@classmethod
def discover_clients_from_entrypoints(cls):
"""
Search the software environment for libraries that register structure clients.
This is called once automatically the first time Node.from_uri
is called. It is idempotent.
"""
if cls.STRUCTURE_CLIENTS_FROM_ENTRYPOINTS is not None:
# short-circuit
return
# The modules associated with these entrypoints will be imported
# lazily, only when the item is first accessed.
cls.STRUCTURE_CLIENTS_FROM_ENTRYPOINTS = OneShotCachedMap()
# Check old name (special_client) and new name (structure_client).
for entrypoint_name in ["tiled.special_client", "tiled.structure_client"]:
for name, entrypoint in entrypoints.get_group_named(
entrypoint_name
).items():
cls.STRUCTURE_CLIENTS_FROM_ENTRYPOINTS.set(name, entrypoint.load)
DEFAULT_STRUCTURE_CLIENT_DISPATCH["numpy"].set(name, entrypoint.load)
DEFAULT_STRUCTURE_CLIENT_DISPATCH["dask"].set(name, entrypoint.load)
def __init__(
self,
context,
*,
item,
structure_clients,
queries=None,
sorting=None,
structure=None,
include_data_sources=False,
):
"This is not user-facing. Use Node.from_uri."
self.structure_clients = structure_clients
self._queries = list(queries or [])
self._queries_as_params = _queries_to_params(*self._queries)
# If the user has not specified a sorting, give the server the opportunity
# to tell us the default sorting.
if sorting:
self._sorting = sorting
else:
# In the Python API we encode sorting as (key, direction).
# This order-based "record" notion does not play well with OpenAPI.
# In the HTTP API, therefore, we use {"key": key, "direction": direction}.
self._sorting = [
(s["key"], int(s["direction"]))
for s in (item["attributes"].get("sorting") or [])
]
sorting = sorting or item["attributes"].get("sorting")
self._sorting_params = {
"sort": ",".join(
f"{'-' if item[1] < 0 else ''}{item[0]}" for item in self._sorting
)
}
self._reversed_sorting_params = {
"sort": ",".join(
f"{'-' if item[1] > 0 else ''}{item[0]}" for item in self._sorting
)
}
super().__init__(
context=context,
item=item,
structure_clients=structure_clients,
include_data_sources=include_data_sources,
)
def __repr__(self):
        # Display up to the first N keys to avoid making a giant service
        # request. Use _keys_slice because it is unauthenticated.
N = 10
return node_repr(self, self._keys_slice(0, N, direction=1))
@property
def sorting(self):
"""
The current sorting of this Node
Given as a list of tuples where the first entry is the sorting key
and the second entry indicates ASCENDING (or 1) or DESCENDING (or -1).
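        Examples
        --------
        Illustrative only; the sorting key "color" is hypothetical.
        >>> sorted_tree = tree.sort(("color", 1))
        >>> sorted_tree.sorting
        [('color', 1)]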
"""
return list(self._sorting)
def new_variation(
self,
*,
structure_clients=UNCHANGED,
queries=UNCHANGED,
sorting=UNCHANGED,
**kwargs,
):
"""
Create a copy of this Node, optionally varying some parameters.
This is intended primarily for internal use and use by subclasses.
"""
if isinstance(structure_clients, str):
structure_clients = DEFAULT_STRUCTURE_CLIENT_DISPATCH[structure_clients]
if structure_clients is UNCHANGED:
structure_clients = self.structure_clients
if queries is UNCHANGED:
queries = self._queries
if sorting is UNCHANGED:
sorting = self._sorting
return super().new_variation(
structure_clients=structure_clients,
queries=queries,
sorting=sorting,
**kwargs,
)
def __len__(self):
        # If the contents of this node were provided inline, there is an
        # implication that the contents are not expected to be dynamic. Use the
        # count provided in the structure.
structure = self.item["attributes"]["structure"]
if structure["contents"]:
return structure["count"]
now = time.monotonic()
if self._cached_len is not None:
length, deadline = self._cached_len
if now < deadline:
                # Use the cached value and do not make any request.
return length
link = self.item["links"]["search"]
content = handle_error(
self.context.http_client.get(
link,
headers={"Accept": MSGPACK_MIME_TYPE},
params={
**parse_qs(urlparse(link).query),
"fields": "",
**self._queries_as_params,
**self._sorting_params,
},
)
).json()
length = content["meta"]["count"]
self._cached_len = (length, now + LENGTH_CACHE_TTL)
return length
def __length_hint__(self):
# TODO The server should provide an estimated count.
# https://www.python.org/dev/peps/pep-0424/
return len(self)
def __iter__(self, _ignore_inlined_contents=False):
        # If the contents of this node were provided inline, and we don't need
        # to apply any filtering or sorting, we can slice the inlined data
        # without fetching anything from the server.
contents = self.item["attributes"]["structure"]["contents"]
if (
(contents is not None)
and (not self._queries)
and ((not self.sorting) or (self.sorting == [("_", 1)]))
and (not _ignore_inlined_contents)
):
return (yield from contents)
next_page_url = self.item["links"]["search"]
while next_page_url is not None:
content = handle_error(
self.context.http_client.get(
next_page_url,
headers={"Accept": MSGPACK_MIME_TYPE},
params={
**parse_qs(urlparse(next_page_url).query),
"fields": "",
**self._queries_as_params,
**self._sorting_params,
},
)
).json()
self._cached_len = (
content["meta"]["count"],
time.monotonic() + LENGTH_CACHE_TTL,
)
for item in content["data"]:
yield item["id"]
next_page_url = content["links"]["next"]
def __getitem__(self, keys, _ignore_inlined_contents=False):
# These are equivalent:
#
# >>> node['a']['b']['c']
# >>> node[('a', 'b', 'c')]
# >>> node['a', 'b', 'c']
#
# The last two are equivalent at a Python level;
# both call node.__getitem__(('a', 'b', 'c')).
#
# We elide this into a single request to the server rather than
# a chain of requests. This is not totally straightforward because
# of this use case:
#
# >>> node.search(...)['a', 'b']
#
# which must only return a result if 'a' is contained in the search results.
if not isinstance(keys, tuple):
keys = (keys,)
for key in keys:
if not isinstance(key, str):
raise TypeError("Containers can only be indexed by strings")
        keys = tuple("/".join(keys).strip("/").split("/"))  # Split keys that contain slashes into path segments
if self._queries:
# Lookup this key *within the search results* of this Node.
key, *tail = keys
tail = tuple(tail) # list -> tuple
params = {
**_queries_to_params(KeyLookup(key)),
**self._queries_as_params,
**self._sorting_params,
}
if self._include_data_sources:
params["include_data_sources"] = True
link = self.item["links"]["search"]
content = handle_error(
self.context.http_client.get(
link,
headers={"Accept": MSGPACK_MIME_TYPE},
params={**parse_qs(urlparse(link).query), **params},
)
).json()
self._cached_len = (
content["meta"]["count"],
time.monotonic() + LENGTH_CACHE_TTL,
)
data = content["data"]
if not data:
raise KeyError(key)
assert (
len(data) == 1
), "The key lookup query must never return more than one result."
(item,) = data
result = client_for_item(
self.context,
self.structure_clients,
item,
include_data_sources=self._include_data_sources,
)
if tail:
result = result[tail]
else:
            # Straightforwardly look up the keys under this node.
            # There is no search filter in place, so if the key exists
            # on the server then we want it.
# The server may greedily send nested information about children
# ("inlined contents") to reduce latency. This is how we handle
# xarray Datasets efficiently, for example.
# In a loop, walk the key(s). Use inlined contents if we have it.
# When we reach a key that we don't have inlined contents for, send
# out a single request with all the rest of the keys, and break
# the keys-walking loop. We are effectively "jumping" down the tree
# to the node of interest without downloading information about
# intermediate parents.
for i, key in enumerate(keys):
item = (self.item["attributes"]["structure"]["contents"] or {}).get(key)
if (item is None) or _ignore_inlined_contents:
# The item was not inlined, either because nothing was inlined
# or because it was added after we fetched the inlined contents.
# Make a request for it.
try:
self_link = self.item["links"]["self"].rstrip("/")
params = {}
if self._include_data_sources:
params["include_data_sources"] = True
link = self_link + "".join(f"/{key}" for key in keys[i:])
content = handle_error(
self.context.http_client.get(
link,
headers={"Accept": MSGPACK_MIME_TYPE},
params={**parse_qs(urlparse(link).query), **params},
)
).json()
except ClientError as err:
if err.response.status_code == httpx.codes.NOT_FOUND:
# If this is a scalar lookup, raise KeyError("X") not KeyError(("X",)).
err_arg = keys[i:]
if len(err_arg) == 1:
(err_arg,) = err_arg
raise KeyError(err_arg)
raise
item = content["data"]
# Tables that belong to composite nodes cannot be addressed directly
if (
item["attributes"]["structure_family"] == StructureFamily.table
) and (
"flattened" in (s["name"] for s in item["attributes"]["specs"])
):
raise KeyError(
f"Attempting to access a table in a composite container; use .parts['{keys[-1]}'] instead." # noqa
)
break
result = client_for_item(
self.context,
self.structure_clients,
item,
include_data_sources=self._include_data_sources,
)
return result
def delete(self, key):
self._cached_len = None
handle_error(self.context.http_client.delete(f"{self.uri}/{key}"))
# The following two methods are used by keys(), values(), items().
def _keys_slice(self, start, stop, direction, _ignore_inlined_contents=False):
        # If the contents of this node were provided inline, and we don't need
        # to apply any filtering or sorting, we can slice the inlined data
        # without fetching anything from the server.
contents = self.item["attributes"]["structure"]["contents"]
if (
(contents is not None)
and (not self._queries)
and ((not self.sorting) or (self.sorting == [("_", 1)]))
and (not _ignore_inlined_contents)
):
keys = list(contents)
if direction < 0:
keys = list(reversed(keys))
return (yield from keys[start:stop])
if direction > 0:
sorting_params = self._sorting_params
else:
sorting_params = self._reversed_sorting_params
assert start >= 0
assert (stop is None) or (stop >= 0)
next_page_url = f"{self.item['links']['search']}?page[offset]={start}"
item_counter = itertools.count(start)
while next_page_url is not None:
content = handle_error(
self.context.http_client.get(
next_page_url,
headers={"Accept": MSGPACK_MIME_TYPE},
params={
**parse_qs(urlparse(next_page_url).query),
"fields": "",
**self._queries_as_params,
**sorting_params,
},
)
).json()
self._cached_len = (
content["meta"]["count"],
time.monotonic() + LENGTH_CACHE_TTL,
)
for item in content["data"]:
if stop is not None and next(item_counter) == stop:
return
yield item["id"]
next_page_url = content["links"]["next"]
def _items_slice(self, start, stop, direction, _ignore_inlined_contents=False):
        # If the contents of this node were provided inline, and we don't need
        # to apply any filtering or sorting, we can slice the inlined data
        # without fetching anything from the server.
contents = self.item["attributes"]["structure"]["contents"]
if (
(contents is not None)
and (not self._queries)
and ((not self.sorting) or (self.sorting == [("_", 1)]))
and (not _ignore_inlined_contents)
):
items = list(contents.items())
if direction < 0:
items = list(reversed(items))
for key, item in items[start:stop]:
yield key, client_for_item(
self.context,
self.structure_clients,
item,
include_data_sources=self._include_data_sources,
)
return
if direction > 0:
sorting_params = self._sorting_params
else:
sorting_params = self._reversed_sorting_params
assert start >= 0
assert (stop is None) or (stop >= 0)
next_page_url = f"{self.item['links']['search']}?page[offset]={start}"
item_counter = itertools.count(start)
while next_page_url is not None:
params = {
**parse_qs(urlparse(next_page_url).query),
**self._queries_as_params,
**sorting_params,
}
if self._include_data_sources:
params["include_data_sources"] = True
content = handle_error(
self.context.http_client.get(
next_page_url,
headers={"Accept": MSGPACK_MIME_TYPE},
params=params,
)
).json()
self._cached_len = (
content["meta"]["count"],
time.monotonic() + LENGTH_CACHE_TTL,
)
for item in content["data"]:
if stop is not None and next(item_counter) == stop:
return
key = item["id"]
yield key, client_for_item(
self.context,
self.structure_clients,
item,
include_data_sources=self._include_data_sources,
)
next_page_url = content["links"]["next"]
def keys(self):
return KeysView(lambda: len(self), self._keys_slice)
def values(self):
return ValuesView(lambda: len(self), self._items_slice)
def items(self):
return ItemsView(lambda: len(self), self._items_slice)
def search(self, query):
"""
Make a Node with a subset of this Node's entries, filtered by query.
Examples
--------
>>> from tiled.queries import FullText
>>> tree.search(FullText("hello"))
"""
return self.new_variation(queries=self._queries + [query])
def distinct(
self, *metadata_keys, structure_families=False, specs=False, counts=False
):
"""
Get the unique values and optionally counts of metadata_keys,
structure_families, and specs in this Node's entries
Examples
--------
Query all the distinct values of a key.
>>> tree.distinct("foo", counts=True)
Query for multiple keys at once.
>>> tree.distinct("foo", "bar", counts=True)
"""
link = self.item["links"]["self"].replace("/metadata", "/distinct", 1)
distinct = handle_error(
self.context.http_client.get(
link,
headers={"Accept": MSGPACK_MIME_TYPE},
params={
**parse_qs(urlparse(link).query),
"metadata": metadata_keys,
"structure_families": structure_families,
"specs": specs,
"counts": counts,
**self._queries_as_params,
},
)
).json()
return distinct
def sort(self, *sorting):
"""
Make a Node with the same entries but sorted according to `sorting`.
Examples
--------
Sort by "color" in ascending order, and then by "height" in descending order.
>>> from tiled.client import ASCENDING, DESCENDING
>>> tree.sort(("color", ASCENDING), ("height", DESCENDING))
Note that ``1`` may be used as a synonym for ``ASCENDING``, and ``-1``
may be used as a synonym for ``DESCENDING``.
"""
return self.new_variation(sorting=sorting)
def export(self, filepath, fields=None, *, format=None):
"""
Download metadata and data below this node in some format and write to a file.
Parameters
----------
        filepath : str or buffer
Filepath or writeable buffer.
fields: List[str], optional
Filter which items in this node to export.
format : str, optional
            If format is None and `filepath` is a filepath, the format is inferred
from the name, like 'table.h5' implies format="application/x-hdf5". The format
may be given as a file extension ("h5") or a media type ("application/x-hdf5").
Examples
--------
Export all.
>>> a.export("everything.h5")
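        Export selected fields only. (The field names here are hypothetical; use
        keys that exist in your node.)
        >>> a.export("selected.h5", fields=["image", "temperature"])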
"""
params = {}
if fields is not None:
params["field"] = fields
return export_util(
filepath,
format,
self.context.http_client.get,
self.item["links"]["full"],
params=params,
)
def _ipython_key_completions_(self):
"""
Provide method for the key-autocompletions in IPython.
See http://ipython.readthedocs.io/en/stable/config/integrating.html#tab-completion
"""
MAX_ENTRIES_SUPPORTED = 40
try:
if len(self) > MAX_ENTRIES_SUPPORTED:
MSG = (
"Tab-completition is not supported on this particular Node "
"because it has a large number of entries."
)
warnings.warn(MSG)
return []
else:
return list(self)
except Exception:
# Do not print messy traceback from thread. Just fail silently.
return []
def new(
self,
structure_family,
data_sources,
*,
key=None,
metadata=None,
specs=None,
):
"""
Create a new item within this Node.
This is a low-level method. See high-level convenience methods listed below.
See Also
--------
write_array
write_dataframe
        write_sparse
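        Examples
        --------
        A minimal sketch: create an empty container (equivalent to
        ``create_container``); the key is hypothetical.
        >>> from tiled.structures.core import StructureFamily
        >>> node.new(StructureFamily.container, [], key="processed")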
"""
self._cached_len = None
metadata = metadata or {}
specs = specs or []
normalized_specs = []
for spec in specs:
if isinstance(spec, str):
spec = Spec(spec)
normalized_specs.append(asdict(spec))
item = {
"attributes": {
"metadata": metadata,
"structure_family": StructureFamily(structure_family),
"specs": normalized_specs,
"data_sources": [asdict(data_source) for data_source in data_sources],
}
}
body = dict(item["attributes"])
if key is not None:
body["id"] = key
if any(data_source.assets for data_source in data_sources):
endpoint = self.uri.replace("/metadata/", "/register/", 1)
else:
endpoint = self.uri
document = handle_error(
self.context.http_client.post(
endpoint,
headers={"Accept": MSGPACK_MIME_TYPE},
content=safe_json_dump(body),
)
).json()
if structure_family in {StructureFamily.container, StructureFamily.composite}:
structure = {"contents": None, "count": None}
else:
# Only containers can have multiple data_sources right now.
(data_source,) = data_sources
structure = data_source.structure
item["attributes"]["structure"] = structure
        # If the server returned modified metadata, update the local copy.
if "metadata" in document:
item["attributes"]["metadata"] = document.pop("metadata")
# Ditto for structure
if "structure" in document:
item["attributes"]["structure"] = STRUCTURE_TYPES[structure_family](
document.pop("structure")
)
# Merge in "id" and "links" returned by the server.
item.update(document)
# Ensure this is a dataclass, not a dict.
# When we apply type hints and mypy to the client it should be possible
# to dispense with this.
if (
structure_family
not in {StructureFamily.container, StructureFamily.composite}
) and isinstance(structure, dict):
structure_type = STRUCTURE_TYPES[structure_family]
structure = structure_type.from_json(structure)
return client_for_item(
self.context,
self.structure_clients,
item,
structure=structure,
include_data_sources=self._include_data_sources,
)
# When (re)chunking arrays for upload, we use this limit
# to attempt to avoid bumping into size limits.
_SUGGESTED_MAX_UPLOAD_SIZE = 100_000_000 # 100 MB
def create_composite(self, key=None, *, metadata=None, specs=None):
"""Create a new, empty composite container.
Parameters
----------
key : str, optional
Key (name) for this new node. If None, the server will provide a unique key.
metadata : dict, optional
User metadata. May be nested. Must contain only basic types
(e.g. numbers, strings, lists, dicts) that are JSON-serializable.
specs : List[Spec], optional
List of names that are used to label that the data and/or metadata
conform to some named standard specification.
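        Examples
        --------
        A minimal sketch; the key, metadata, and child data are hypothetical.
        >>> comp = c.create_composite(key="run_1", metadata={"sample": "Cu"})
        >>> comp.write_array([1, 2, 3], key="intensity")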
"""
return self.new(
StructureFamily.composite,
[],
key=key,
metadata=metadata,
specs=specs,
)
def create_container(self, key=None, *, metadata=None, specs=None):
"""Create a new, empty container.
Parameters
----------
key : str, optional
Key (name) for this new node. If None, the server will provide a unique key.
metadata : dict, optional
User metadata. May be nested. Must contain only basic types
(e.g. numbers, strings, lists, dicts) that are JSON-serializable.
specs : List[Spec], optional
List of names that are used to label that the data and/or metadata
conform to some named standard specification.
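        Examples
        --------
        A minimal sketch; the key and metadata are hypothetical.
        >>> processed = c.create_container(key="processed", metadata={"stage": "raw"})
        >>> processed.write_array([1, 2, 3], key="x")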
"""
return self.new(
StructureFamily.container,
[],
key=key,
metadata=metadata,
specs=specs,
)
def write_array(self, array, *, key=None, metadata=None, dims=None, specs=None):
"""
EXPERIMENTAL: Write an array.
Parameters
----------
array : array-like
key : str, optional
Key (name) for this new node. If None, the server will provide a unique key.
metadata : dict, optional
User metadata. May be nested. Must contain only basic types
(e.g. numbers, strings, lists, dicts) that are JSON-serializable.
dims : List[str], optional
A label for each dimension of the array.
specs : List[Spec], optional
List of names that are used to label that the data and/or metadata
conform to some named standard specification.
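        Examples
        --------
        Illustrative only; the keys and metadata are hypothetical.
        >>> import numpy
        >>> c.write_array(numpy.ones((3, 3)), key="ones", dims=["row", "col"])
        >>> c.write_array([1, 2, 3], key="x", metadata={"units": "counts"})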
"""
import dask.array
import numpy
from dask.array.core import normalize_chunks
from ..structures.array import ArrayStructure, BuiltinDtype
if not (hasattr(array, "shape") and hasattr(array, "dtype")):
# This does not implement enough of the array-like interface.
# Coerce to numpy.
array = numpy.asarray(array)
# Determine chunks such that each chunk is not too large to upload.
# Any existing chunking will be taken into account.
# If the array is small, there will be only one chunk.
if hasattr(array, "chunks"):
chunks = normalize_chunks(
array.chunks,
limit=self._SUGGESTED_MAX_UPLOAD_SIZE,
dtype=array.dtype,
shape=array.shape,
)
else:
chunks = normalize_chunks(
tuple("auto" for _ in array.shape),
limit=self._SUGGESTED_MAX_UPLOAD_SIZE,
dtype=array.dtype,
shape=array.shape,
)
structure = ArrayStructure(
shape=array.shape,
chunks=chunks,
dims=dims,
data_type=BuiltinDtype.from_numpy_dtype(array.dtype),
)
client = self.new(
StructureFamily.array,
[
DataSource(structure=structure, structure_family=StructureFamily.array),
],
key=key,
metadata=metadata,
specs=specs,
)
chunked = any(len(dim) > 1 for dim in chunks)
if not chunked:
client.write(array)
else:
# Fan out client.write_block over each chunk using dask.
if isinstance(array, dask.array.Array):
da = array.rechunk(chunks)
else:
da = dask.array.from_array(array, chunks=chunks)
            # Dask inspects the signature and passes block_id in if present.
            # It also apparently calls the function once with an empty array and
            # an empty block_id, so we catch that call and make it a no-op.
def write_block(x, block_id, client):
if len(block_id):
client.write_block(x, block=block_id)
return x
# TODO Is there a fire-and-forget analogue such that we don't need
# to bother with the return type?
da.map_blocks(write_block, dtype=da.dtype, client=client).compute()
return client
def write_awkward(
self,
array,
*,
key=None,
metadata=None,
dims=None,
specs=None,
):
"""
Write an AwkwardArray.
Parameters
----------
array: awkward.Array
key : str, optional
Key (name) for this new node. If None, the server will provide a unique key.
metadata : dict, optional
User metadata. May be nested. Must contain only basic types
(e.g. numbers, strings, lists, dicts) that are JSON-serializable.
dims : List[str], optional
A label for each dimension of the array.
specs : List[Spec], optional
List of names that are used to label that the data and/or metadata
conform to some named standard specification.
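        Examples
        --------
        A minimal sketch; the key is hypothetical.
        >>> import awkward
        >>> ragged = awkward.Array([[1, 2, 3], [], [4, 5]])
        >>> c.write_awkward(ragged, key="ragged")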
"""
import awkward
from ..structures.awkward import AwkwardStructure
packed = awkward.to_packed(array)
form, length, container = awkward.to_buffers(packed)
structure = AwkwardStructure(
length=length,
form=form.to_dict(),
)
client = self.new(
StructureFamily.awkward,
[
DataSource(
structure=structure, structure_family=StructureFamily.awkward
),
],
key=key,
metadata=metadata,
specs=specs,
)
client.write(container)
return client
def write_sparse(
self,
coords,
data,
shape,
*,
key=None,
metadata=None,
dims=None,
specs=None,
):
"""
EXPERIMENTAL: Write a sparse array.
Parameters
----------
coords : array-like
data : array-like
shape : tuple
key : str, optional
Key (name) for this new node. If None, the server will provide a unique key.
metadata : dict, optional
User metadata. May be nested. Must contain only basic types
(e.g. numbers, strings, lists, dicts) that are JSON-serializable.
dims : List[str], optional
A label for each dimension of the array.
specs : List[Spec], optional
List of names that are used to label that the data and/or metadata
conform to some named standard specification.
Examples
--------
Write a sparse.COO array.
>>> import sparse
>>> from numpy import array
>>> coo = sparse.COO(coords=array([[2, 5]]), data=array([1.3, 7.5]), shape=(10,))
>>> c.write_sparse(coords=coo.coords, data=coo.data, shape=coo.shape)
This only supports a single chunk. For chunked upload, use lower-level methods.
# Define the overall shape and the dimensions of each chunk.
>>> from tiled.structures.sparse import COOStructure
>>> structure = COOStructure(shape=(10,), chunks=((5, 5),))
>>> data_source = DataSource(structure=structure, structure_family=StructureFamily.sparse)
>>> x = c.new("sparse", [data_source])
# Upload the data in each chunk.
        # Coords are given within the reference frame of each chunk.
>>> x.write_block(coords=array([[2, 4]]), data=array([3.1, 2.8]), block=(0,))
>>> x.write_block(coords=array([[0, 1]]), data=array([6.7, 1.2]), block=(1,))
"""
from ..structures.sparse import COOStructure
structure = COOStructure(
shape=shape,
# This method only supports single-chunk COO arrays.
chunks=tuple((dim,) for dim in shape),
dims=dims,
)
client = self.new(
StructureFamily.sparse,
[
DataSource(
structure=structure, structure_family=StructureFamily.sparse
),
],
key=key,
metadata=metadata,
specs=specs,
)
client.write(coords, data)
return client
def create_appendable_table(
self,
schema: "pyarrow.Schema",
npartitions: int = 1,
*,
key=None,
metadata=None,
specs=None,
table_name: Optional[str] = None,
):
"""
        Create an empty table with the given schema, stored such that rows can be
        appended to its partitions.
Parameters
----------
        schema : pyarrow.Schema
            Column names and dtypes for the table.
        npartitions : int, optional
            Number of partitions in the table. Default is 1.
key : str, optional
Key (name) for this new node. If None, the server will provide a unique key.
metadata : dict, optional
User metadata. May be nested. Must contain only basic types
(e.g. numbers, strings, lists, dicts) that are JSON-serializable.
specs : List[Spec], optional
List of names that are used to label that the data and/or metadata
conform to some named standard specification.
table_name : str, optional
Optionally provide a name for the table this should be stored in.
By default a name unique to the schema will be chosen.
See Also
--------
write_dataframe
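        Examples
        --------
        A minimal sketch with a hypothetical schema and key.
        >>> import pyarrow
        >>> schema = pyarrow.schema([("x", pyarrow.float64()), ("label", pyarrow.string())])
        >>> table_client = c.create_appendable_table(schema, key="measurements")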
"""
parameters = {}
if table_name is not None:
parameters["table_name"] = table_name
from ..structures.table import TableStructure
structure = TableStructure.from_schema(schema, npartitions)
client = self.new(
StructureFamily.table,
[
DataSource(
structure=structure,
structure_family=StructureFamily.table,
mimetype="application/x-tiled-sql-table",
parameters=parameters,
)
],
key=key,
metadata=metadata or {},
specs=specs or [],
)
return client
def write_dataframe(
self,
dataframe: Union["pandas.DataFrame", dict[str, Any]],
*,
key=None,
metadata=None,
specs=None,
):
"""
Write a DataFrame.
Parameters
----------
dataframe : pandas.DataFrame or dict
key : str, optional
Key (name) for this new node. If None, the server will provide a unique key.
metadata : dict, optional
User metadata. May be nested. Must contain only basic types
(e.g. numbers, strings, lists, dicts) that are JSON-serializable.
specs : List[Spec], optional
List of names that are used to label that the data and/or metadata
conform to some named standard specification.
See Also
--------
        create_appendable_table
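        Examples
        --------
        Illustrative only; the keys are hypothetical.
        >>> import pandas
        >>> df = pandas.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})
        >>> c.write_dataframe(df, key="table1")
        A dict of columns works as well.
        >>> c.write_dataframe({"a": [1, 2, 3], "b": [4, 5, 6]}, key="table2")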
"""
import dask.dataframe
from ..structures.table import TableStructure
if isinstance(dataframe, dask.dataframe.DataFrame):
structure = TableStructure.from_dask_dataframe(dataframe)
elif isinstance(dataframe, dict):
# table as dict, e.g. {"a": [1,2,3], "b": [4,5,6]}
structure = TableStructure.from_dict(dataframe)
else:
structure = TableStructure.from_pandas(dataframe)
client = self.new(
StructureFamily.table,
[
DataSource(
structure=structure,
structure_family=StructureFamily.table,
)
],
key=key,
metadata=metadata or {},
specs=specs or [],
)
if hasattr(dataframe, "partitions"):
if isinstance(dataframe, dask.dataframe.DataFrame):
ddf = dataframe
else:
raise NotImplementedError(
f"Unsure how to handle type {type(dataframe)}"
)
ddf.map_partitions(
functools.partial(_write_partition, client=client), meta=dataframe._meta
).compute()
else:
client.write(dataframe)
return client
class Composite(Container):
def get_contents(self, maxlen=None, include_metadata=False):
result = {}
next_page_url = f"{self.item['links']['search']}"
        while (next_page_url is not None) and (
            (maxlen is None) or (len(result) < maxlen)
        ):
content = handle_error(
self.context.http_client.get(
next_page_url,
headers={"Accept": MSGPACK_MIME_TYPE},
params={
**parse_qs(urlparse(next_page_url).query),
**self._queries_as_params,
}
| ({} if include_metadata else {"select_metadata": False}),
)
).json()
result.update({item["id"]: item for item in content["data"]})
next_page_url = content["links"]["next"]
return result
@property
def _flat_keys_mapping(self):
result = {}
for key, item in self.get_contents().items():
if item["attributes"]["structure_family"] == StructureFamily.table:
for col in item["attributes"]["structure"]["columns"]:
result[col] = item["id"] + "/" + col
else:
result[item["id"]] = item["id"]
self._cached_len = (len(result), time.monotonic() + LENGTH_CACHE_TTL)
return result
@property
def parts(self):
return CompositeParts(self)
def _keys_slice(self, start, stop, direction, _ignore_inlined_contents=False):
yield from self._flat_keys_mapping.keys()
def _items_slice(self, start, stop, direction, _ignore_inlined_contents=False):
for key in self._flat_keys_mapping.keys():
yield key, self[key]
def __len__(self):
if self._cached_len is not None:
length, deadline = self._cached_len
if time.monotonic() < deadline:
                # Use the cached value and do not make any request.
return length
return len(self._flat_keys_mapping)
def __getitem__(self, key: str, _ignore_inlined_contents=False):
if isinstance(key, tuple):
key = "/".join(key)
if key in self._flat_keys_mapping:
key = self._flat_keys_mapping[key]
else:
raise KeyError(
f"Key '{key}' not found. If it refers to a table, use .parts['{key}'] instead."
)
return super().__getitem__(key, _ignore_inlined_contents)
def create_container(self, key=None, *, metadata=None, specs=None):
"""Composite nodes can not include nested containers by design."""
raise NotImplementedError("Cannot create a container within a composite node.")
def create_composite(self, key=None, *, metadata=None, specs=None):
"""Ccomposite nodes can not include nested composites by design."""
raise NotImplementedError("Cannot create a composite within a composite node.")
class CompositeParts:
def __init__(self, node):
self.contents = node.get_contents(include_metadata=True)
self.context = node.context
self.structure_clients = node.structure_clients
self._include_data_sources = node._include_data_sources
def __repr__(self):
return (
f"<{type(self).__name__} {{"
+ ", ".join(f"'{item}'" for item in self.contents)
+ "}>"
)
def __getitem__(self, key):
key, *tail = key.split("/")
if key not in self.contents:
raise KeyError(key)
client = client_for_item(
self.context,
self.structure_clients,
self.contents[key],
include_data_sources=self._include_data_sources,
)
if tail:
return client["/".join(tail)]
else:
return client
def __iter__(self):
for key in self.contents:
yield key
def __len__(self) -> int:
return len(self.contents)
def _queries_to_params(*queries):
"Compute GET params from the queries."
params = collections.defaultdict(list)
for query in queries:
name = default_query_registry.query_type_to_name[type(query)]
for field, value in query.encode().items():
if value is not None:
params[f"filter[{name}][condition][{field}]"].append(value)
return dict(params)
LENGTH_CACHE_TTL = 1 # second
class Ascending(Sentinel):
"Intended for more readable sorting operations. An alias for 1."
def __index__(self):
return 1
class Descending(Sentinel):
"Intended for more readable sorting operations. An alias for -1."
def __index__(self):
return -1
ASCENDING = Ascending("ASCENDING")
"Ascending sort order. An alias for 1."
DESCENDING = Descending("DESCENDING")
"Decending sort order. An alias for -1."
class _LazyLoad:
# This exists because lambdas and closures cannot be pickled.
def __init__(self, import_module_args, attr_name):
self.import_module_args = import_module_args
self.attr_name = attr_name
def __call__(self):
return getattr(
importlib.import_module(*self.import_module_args), self.attr_name
)
class _Wrap:
# This exists because lambdas and closures cannot be pickled.
def __init__(self, obj):
self.obj = obj
def __call__(self):
return self.obj
def _write_partition(x, partition_info, client):
client.write_partition(x, partition_info["number"])
return x
DEFAULT_STRUCTURE_CLIENT_DISPATCH = {
"numpy": OneShotCachedMap(
{
"container": _Wrap(Container),
"composite": _Wrap(Composite),
"array": _LazyLoad(("..array", Container.__module__), "ArrayClient"),
"awkward": _LazyLoad(("..awkward", Container.__module__), "AwkwardClient"),
"dataframe": _LazyLoad(
("..dataframe", Container.__module__), "DataFrameClient"
),
"sparse": _LazyLoad(("..sparse", Container.__module__), "SparseClient"),
"table": _LazyLoad(
("..dataframe", Container.__module__), "DataFrameClient"
),
"xarray_dataset": _LazyLoad(
("..xarray", Container.__module__), "DatasetClient"
),
}
),
"dask": OneShotCachedMap(
{
"container": _Wrap(Container),
"composite": _Wrap(Composite),
"array": _LazyLoad(("..array", Container.__module__), "DaskArrayClient"),
# TODO Create DaskAwkwardClient
# "awkward": _LazyLoad(("..awkward", Container.__module__), "DaskAwkwardClient"),
"dataframe": _LazyLoad(
("..dataframe", Container.__module__), "DaskDataFrameClient"
),
"sparse": _LazyLoad(("..sparse", Container.__module__), "SparseClient"),
"table": _LazyLoad(
("..dataframe", Container.__module__), "DaskDataFrameClient"
),
"xarray_dataset": _LazyLoad(
("..xarray", Container.__module__), "DaskDatasetClient"
),
}
),
}