import collections
import collections.abc
import concurrent.futures
import functools
import importlib
import itertools
import time
import warnings
from dataclasses import asdict
from typing import TYPE_CHECKING, Any, Iterable, Optional, Union
from urllib.parse import parse_qs, urlparse
import entrypoints
import httpx
import orjson
from ..iterviews import ItemsView, KeysView, ValuesView
from ..queries import KeyLookup
from ..query_registration import default_query_registry
from ..structures.core import StructureFamily
from ..structures.data_source import DataSource
from ..utils import (
UNCHANGED,
IndexersMixin,
OneShotCachedMap,
Sentinel,
node_repr,
safe_json_dump,
)
from .base import STRUCTURE_TYPES, BaseClient
from .utils import (
MSGPACK_MIME_TYPE,
ClientError,
client_for_item,
export_util,
handle_error,
normalize_specs,
retry_context,
)
if TYPE_CHECKING:
import pandas
import pyarrow
from .stream import ContainerSubscription
class Container(BaseClient, collections.abc.Mapping, IndexersMixin):
# This maps the structure_family sent by the server to a client-side object that
# can interpret the structure_family's structure and content. OneShotCachedMap is used to
# defer imports.
# This is populated when the first instance is created.
STRUCTURE_CLIENTS_FROM_ENTRYPOINTS = None
@classmethod
def _discover_entrypoints(cls, entrypoint_name) -> OneShotCachedMap[str, Any]:
return OneShotCachedMap(
{
name: entrypoint.load
for name, entrypoint in entrypoints.get_group_named(
entrypoint_name
).items()
}
)
@classmethod
def discover_clients_from_entrypoints(cls):
"""
Search the software environment for libraries that register structure clients.
This is called once automatically the first time Node.from_uri
is called. It is idempotent.
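        For example, a hypothetical plugin package could register a client for a
        custom structure family in its pyproject.toml (the names below are
        illustrative only):
        [project.entry-points."tiled.structure_client"]
        my_family = "my_package.client:MyFamilyClient"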
"""
if cls.STRUCTURE_CLIENTS_FROM_ENTRYPOINTS is not None:
# short-circuit
return
# The modules associated with these entrypoints will be imported
# lazily, only when the item is first accessed.
cls.STRUCTURE_CLIENTS_FROM_ENTRYPOINTS = OneShotCachedMap()
# Check old name (special_client) and new name (structure_client).
for entrypoint_name in ["tiled.special_client", "tiled.structure_client"]:
for name, entrypoint in entrypoints.get_group_named(
entrypoint_name
).items():
cls.STRUCTURE_CLIENTS_FROM_ENTRYPOINTS.set(name, entrypoint.load)
DEFAULT_STRUCTURE_CLIENT_DISPATCH["numpy"].set(name, entrypoint.load)
DEFAULT_STRUCTURE_CLIENT_DISPATCH["dask"].set(name, entrypoint.load)
def __init__(
self,
context,
*,
item,
structure_clients,
queries=None,
sorting=None,
structure=None,
include_data_sources=False,
):
"This is not user-facing. Use Node.from_uri."
self.structure_clients = structure_clients
self._queries = list(queries or [])
self._queries_as_params = _queries_to_params(*self._queries)
# If the user has not specified a sorting, give the server the opportunity
# to tell us the default sorting.
if sorting:
self._sorting = sorting
else:
# In the Python API we encode sorting as (key, direction).
# This order-based "record" notion does not play well with OpenAPI.
# In the HTTP API, therefore, we use {"key": key, "direction": direction}.
self._sorting = [
(s["key"], int(s["direction"]))
for s in (item["attributes"].get("sorting") or [])
]
sorting = sorting or item["attributes"].get("sorting")
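        # For example, self._sorting == [("color", 1), ("height", -1)] corresponds to
        # [{"key": "color", "direction": 1}, {"key": "height", "direction": -1}] over HTTP
        # and is encoded below as the query parameter {"sort": "color,-height"}
        # (or {"sort": "-color,height"} for the reversed variant).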
self._sorting_params = {
"sort": ",".join(
f"{'-' if item[1] < 0 else ''}{item[0]}" for item in self._sorting
)
}
self._reversed_sorting_params = {
"sort": ",".join(
f"{'-' if item[1] > 0 else ''}{item[0]}" for item in self._sorting
)
}
super().__init__(
context=context,
item=item,
structure_clients=structure_clients,
include_data_sources=include_data_sources,
)
def __repr__(self):
# Display up to the first N keys to avoid making a giant service
        # request. Use _keys_slice because it is unauthenticated.
N = 10
return node_repr(self, self._keys_slice(0, N, direction=1))
@property
def sorting(self):
"""
The current sorting of this Node
Given as a list of tuples where the first entry is the sorting key
and the second entry indicates ASCENDING (or 1) or DESCENDING (or -1).
"""
return list(self._sorting)
def new_variation(
self,
*,
structure_clients=UNCHANGED,
queries=UNCHANGED,
sorting=UNCHANGED,
**kwargs,
):
"""
Create a copy of this Node, optionally varying some parameters.
This is intended primarily for internal use and use by subclasses.
"""
if isinstance(structure_clients, str):
structure_clients = DEFAULT_STRUCTURE_CLIENT_DISPATCH[structure_clients]
if structure_clients is UNCHANGED:
structure_clients = self.structure_clients
if queries is UNCHANGED:
queries = self._queries
if sorting is UNCHANGED:
sorting = self._sorting
return super().new_variation(
structure_clients=structure_clients,
queries=queries,
sorting=sorting,
**kwargs,
)
def __len__(self):
        # If the contents of this node were provided in-line, there is an
        # implication that the contents are not expected to be dynamic. Use the
        # count provided in the structure.
if contents := self.item["attributes"]["structure"]["contents"]:
return len(contents)
now = time.monotonic()
if self._cached_len is not None:
length, deadline = self._cached_len
if now < deadline:
                # Use the cached value and do not make any request.
return length
link = self.item["links"]["search"]
for attempt in retry_context():
with attempt:
content = handle_error(
self.context.http_client.get(
link,
headers={"Accept": MSGPACK_MIME_TYPE},
params={
**parse_qs(urlparse(link).query),
"fields": "count",
**self._queries_as_params,
**self._sorting_params,
},
)
).json()
length = content["meta"]["count"]
self._cached_len = (length, now + LENGTH_CACHE_TTL)
return length
def __length_hint__(self):
# TODO The server should provide an estimated count.
# https://www.python.org/dev/peps/pep-0424/
return len(self)
def __iter__(self, _ignore_inlined_contents=False):
        # If the contents of this node were provided in-line, and we don't need
# to apply any filtering or sorting, we can slice the in-lined data
# without fetching anything from the server.
contents = self.item["attributes"]["structure"]["contents"]
if (
(contents is not None)
and (not self._queries)
and ((not self.sorting) or (self.sorting == [("_", 1)]))
and (not _ignore_inlined_contents)
):
return (yield from contents)
next_page_url = self.item["links"]["search"]
while next_page_url is not None:
for attempt in retry_context():
with attempt:
content = handle_error(
self.context.http_client.get(
next_page_url,
headers={"Accept": MSGPACK_MIME_TYPE},
params={
**parse_qs(urlparse(next_page_url).query),
"fields": "",
**self._queries_as_params,
**self._sorting_params,
},
)
).json()
self._cached_len = (
content["meta"]["count"],
time.monotonic() + LENGTH_CACHE_TTL,
)
for item in content["data"]:
yield item["id"]
next_page_url = content["links"]["next"]
def __getitem__(self, keys, _ignore_inlined_contents=False):
# These are equivalent:
#
# >>> node['a']['b']['c']
# >>> node[('a', 'b', 'c')]
# >>> node['a', 'b', 'c']
#
# The last two are equivalent at a Python level;
# both call node.__getitem__(('a', 'b', 'c')).
#
# We elide this into a single request to the server rather than
# a chain of requests. This is not totally straightforward because
# of this use case:
#
# >>> node.search(...)['a', 'b']
#
# which must only return a result if 'a' is contained in the search results.
if not isinstance(keys, tuple):
keys = (keys,)
for key in keys:
if not isinstance(key, str):
raise TypeError("Containers can only be indexed by strings")
        # Normalize: split any slash-delimited keys (e.g. 'a/b') into separate segments.
        keys = tuple("/".join(keys).strip("/").split("/"))
if self._queries:
# Lookup this key *within the search results* of this Node.
key, *tail = keys
tail = tuple(tail) # list -> tuple
params = {
**_queries_to_params(KeyLookup(key)),
**self._queries_as_params,
**self._sorting_params,
}
if self._include_data_sources:
params["include_data_sources"] = True
link = self.item["links"]["search"]
for attempt in retry_context():
with attempt:
content = handle_error(
self.context.http_client.get(
link,
headers={"Accept": MSGPACK_MIME_TYPE},
params={**parse_qs(urlparse(link).query), **params},
)
).json()
self._cached_len = (
content["meta"]["count"],
time.monotonic() + LENGTH_CACHE_TTL,
)
data = content["data"]
if not data:
raise KeyError(key)
assert (
len(data) == 1
), "The key lookup query must never return more than one result."
(item,) = data
result = client_for_item(
self.context,
self.structure_clients,
item,
include_data_sources=self._include_data_sources,
)
if tail:
result = result[tail]
else:
# Straightforwardly look up the keys under this node.
            # There is no search filter in place, so if the key exists,
            # we want it.
# The server may greedily send nested information about children
# ("inlined contents") to reduce latency. This is how we handle
# xarray Datasets efficiently, for example.
# In a loop, walk the key(s). Use inlined contents if we have it.
# When we reach a key that we don't have inlined contents for, send
# out a single request with all the rest of the keys, and break
# the keys-walking loop. We are effectively "jumping" down the tree
# to the node of interest without downloading information about
# intermediate parents.
for i, key in enumerate(keys):
item = (self.item["attributes"]["structure"]["contents"] or {}).get(key)
if item is not None and not _ignore_inlined_contents:
# The item was inlined, and we should return it as is
result = client_for_item(
self.context,
self.structure_clients,
item,
include_data_sources=self._include_data_sources,
)
return (
result[keys[i + 1 :]] if keys[i + 1 :] else result # noqa: E203
)
else:
# The item was not inlined, either because nothing was inlined
# or because it was added after we fetched the inlined contents.
# Make a request for it.
try:
self_link = self.item["links"]["self"].rstrip("/")
params = {}
if self._include_data_sources:
params["include_data_sources"] = True
link = self_link + "".join(f"/{key}" for key in keys[i:])
for attempt in retry_context():
with attempt:
content = handle_error(
self.context.http_client.get(
link,
headers={"Accept": MSGPACK_MIME_TYPE},
params={
**parse_qs(urlparse(link).query),
**params,
},
)
).json()
except ClientError as err:
if err.response.status_code == httpx.codes.NOT_FOUND:
# If this is a scalar lookup, raise KeyError("X") not KeyError(("X",)).
err_arg = keys[i:]
if len(err_arg) == 1:
(err_arg,) = err_arg
raise KeyError(err_arg)
raise
item = content["data"]
break
result = client_for_item(
self.context,
self.structure_clients,
item,
include_data_sources=self._include_data_sources,
)
return result
def delete_contents(
self,
keys: Optional[Union[str, Iterable[str]]] = None,
recursive: bool = False,
external_only: bool = True,
) -> "Container":
"""Delete the contents of this Container.
Parameters
----------
keys : str or list of str
The key(s) to delete. If a list, all keys in the list will be deleted.
If None (default), delete all contents.
recursive : bool, optional
If True, descend into sub-containers and delete their contents too.
Defaults to False.
external_only : bool, optional
If True, only delete externally-managed data. Defaults to True.
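        Examples
        --------
        A minimal sketch, assuming ``c`` is a writable container and "x" and "y"
        are hypothetical keys within it.
        >>> c.delete_contents("x")  # delete a single entry
        >>> c.delete_contents(["x", "y"])  # delete several entries
        >>> c.delete_contents(recursive=True)  # delete everything, descending into sub-containers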
"""
self._cached_len = None
if keys is None:
keys = self.keys()
keys = [keys] if isinstance(keys, str) else keys
for key in set(keys):
for attempt in retry_context():
with attempt:
handle_error(
self.context.http_client.delete(
f"{self.uri}/{key}",
params={
"recursive": recursive,
"external_only": external_only,
},
)
)
return self
# The following two methods are used by keys(), values(), items().
def _keys_slice(
self, start, stop, direction, page_size=None, *, _ignore_inlined_contents=False
):
        # If the contents of this node were provided in-line, and we don't need
# to apply any filtering or sorting, we can slice the in-lined data
# without fetching anything from the server.
contents = self.item["attributes"]["structure"]["contents"]
if (
(contents is not None)
and (not self._queries)
and ((not self.sorting) or (self.sorting == [("_", 1)]))
and (not _ignore_inlined_contents)
):
keys = list(contents)
if direction < 0:
keys = list(reversed(keys))
return (yield from keys[start:stop])
if direction > 0:
sorting_params = self._sorting_params
else:
sorting_params = self._reversed_sorting_params
assert start >= 0
assert (stop is None) or (stop >= 0)
next_page_url = f"{self.item['links']['search']}?page[offset]={start}"
if page_size is not None:
next_page_url += f"&page[limit]={page_size}"
item_counter = itertools.count(start)
while next_page_url is not None:
for attempt in retry_context():
with attempt:
content = handle_error(
self.context.http_client.get(
next_page_url,
headers={"Accept": MSGPACK_MIME_TYPE},
params={
**parse_qs(urlparse(next_page_url).query),
"fields": "",
**self._queries_as_params,
**sorting_params,
},
)
).json()
self._cached_len = (
content["meta"]["count"],
time.monotonic() + LENGTH_CACHE_TTL,
)
for item in content["data"]:
yield item["id"]
if stop is not None and next(item_counter) == stop - 1:
return
next_page_url = content["links"]["next"]
def _items_slice(
self, start, stop, direction, page_size=None, *, _ignore_inlined_contents=False
):
        # If the contents of this node were provided in-line, and we don't need
# to apply any filtering or sorting, we can slice the in-lined data
# without fetching anything from the server.
contents = self.item["attributes"]["structure"]["contents"]
if (
(contents is not None)
and (not self._queries)
and ((not self.sorting) or (self.sorting == [("_", 1)]))
and (not _ignore_inlined_contents)
):
items = list(contents.items())
if direction < 0:
items = list(reversed(items))
for key, item in items[start:stop]:
yield key, client_for_item(
self.context,
self.structure_clients,
item,
include_data_sources=self._include_data_sources,
)
return
if direction > 0:
sorting_params = self._sorting_params
else:
sorting_params = self._reversed_sorting_params
assert start >= 0
assert (stop is None) or (stop >= 0)
next_page_url = f"{self.item['links']['search']}?page[offset]={start}"
if page_size is not None:
next_page_url += f"&page[limit]={page_size}"
item_counter = itertools.count(start)
while next_page_url is not None:
params = {
**parse_qs(urlparse(next_page_url).query),
**self._queries_as_params,
**sorting_params,
}
if self._include_data_sources:
params["include_data_sources"] = True
for attempt in retry_context():
with attempt:
content = handle_error(
self.context.http_client.get(
next_page_url,
headers={"Accept": MSGPACK_MIME_TYPE},
params=params,
)
).json()
self._cached_len = (
content["meta"]["count"],
time.monotonic() + LENGTH_CACHE_TTL,
)
for item in content["data"]:
key = item["id"]
yield key, client_for_item(
self.context,
self.structure_clients,
item,
include_data_sources=self._include_data_sources,
)
if stop is not None and next(item_counter) == stop - 1:
return
next_page_url = content["links"]["next"]
def keys(self):
return KeysView(lambda: len(self), self._keys_slice)
def values(self):
return ValuesView(lambda: len(self), self._items_slice)
def items(self):
return ItemsView(lambda: len(self), self._items_slice)
def search(self, query):
"""
Make a Node with a subset of this Node's entries, filtered by query.
Examples
--------
>>> from tiled.queries import FullText
>>> tree.search(FullText("hello"))
"""
return self.new_variation(queries=self._queries + [query])
def distinct(
self, *metadata_keys, structure_families=False, specs=False, counts=False
):
"""
Get the unique values and optionally counts of metadata_keys,
structure_families, and specs in this Node's entries
Examples
--------
Query all the distinct values of a key.
>>> tree.distinct("foo", counts=True)
Query for multiple keys at once.
>>> tree.distinct("foo", "bar", counts=True)
"""
link = self.item["links"]["self"].replace("/metadata", "/distinct", 1)
for attempt in retry_context():
with attempt:
distinct = handle_error(
self.context.http_client.get(
link,
headers={"Accept": MSGPACK_MIME_TYPE},
params={
**parse_qs(urlparse(link).query),
"metadata": metadata_keys,
"structure_families": structure_families,
"specs": specs,
"counts": counts,
**self._queries_as_params,
},
)
).json()
return distinct
def sort(self, *sorting):
"""
Make a Node with the same entries but sorted according to `sorting`.
Examples
--------
Sort by "color" in ascending order, and then by "height" in descending order.
>>> from tiled.client import ASCENDING, DESCENDING
>>> tree.sort(("color", ASCENDING), ("height", DESCENDING))
Note that ``1`` may be used as a synonym for ``ASCENDING``, and ``-1``
may be used as a synonym for ``DESCENDING``.
"""
return self.new_variation(sorting=sorting)
def export(self, filepath, fields=None, *, format=None):
"""
Download metadata and data below this node in some format and write to a file.
Parameters
----------
        filepath : str or buffer
            Filepath or writeable buffer.
        fields : List[str], optional
            Filter which items in this node to export.
        format : str, optional
            If format is None and `filepath` is a path (not a buffer), the format is
            inferred from the name, like 'table.h5' implies format="application/x-hdf5".
            The format may be given as a file extension ("h5") or a media type
            ("application/x-hdf5").
Examples
--------
Export all.
>>> a.export("everything.h5")
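        Export selected fields only (the field names here are hypothetical).
        >>> a.export("selected.h5", fields=["image", "temperature"])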
"""
params = {}
if fields is not None:
params["field"] = fields
return export_util(
filepath,
format,
self.context.http_client.get,
self.item["links"]["full"],
params=params,
)
def _ipython_key_completions_(self):
"""
Provide method for the key-autocompletions in IPython.
See http://ipython.readthedocs.io/en/stable/config/integrating.html#tab-completion
"""
MAX_ENTRIES_SUPPORTED = 40
try:
if len(self) > MAX_ENTRIES_SUPPORTED:
MSG = (
"Tab-completion is not supported on this particular Node "
"because it has a large number of entries."
)
warnings.warn(MSG)
return []
else:
return list(self)
except Exception:
# Do not print messy traceback from thread. Just fail silently.
return []
def new(
self,
structure_family,
data_sources,
*,
key=None,
metadata=None,
specs=None,
access_tags=None,
):
"""
Create a new item within this Node.
This is a low-level method. See high-level convenience methods listed below.
See Also
--------
write_array
write_table
write_coo_array
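        Examples
        --------
        A minimal sketch, assuming ``c`` is a writable container. It mirrors the
        chunked-upload example in ``write_sparse``.
        >>> from tiled.structures.core import StructureFamily
        >>> from tiled.structures.data_source import DataSource
        >>> from tiled.structures.sparse import COOStructure
        >>> structure = COOStructure(shape=(10,), chunks=((5, 5),))
        >>> data_source = DataSource(structure=structure, structure_family=StructureFamily.sparse)
        >>> x = c.new("sparse", [data_source])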
"""
self._cached_len = None
metadata = metadata or {}
access_blob = {"tags": access_tags} if access_tags is not None else {}
item = {
"attributes": {
"ancestors": self.path_parts,
"metadata": metadata,
"structure_family": StructureFamily(structure_family),
"specs": normalize_specs(specs or []),
"data_sources": [asdict(data_source) for data_source in data_sources],
"access_blob": access_blob,
}
}
body = dict(item["attributes"])
if key is not None:
body["id"] = key
# if check:
if any(data_source.assets for data_source in data_sources):
endpoint = self.uri.replace("/metadata/", "/register/", 1)
else:
endpoint = self.uri
for attempt in retry_context():
with attempt:
document = handle_error(
self.context.http_client.post(
endpoint,
headers={"Accept": MSGPACK_MIME_TYPE},
content=safe_json_dump(body),
)
).json()
if structure_family == StructureFamily.container:
structure = {"contents": None, "count": None}
else:
# Only containers can have multiple data_sources right now.
(data_source,) = data_sources
structure = data_source.structure
item["attributes"]["structure"] = structure
        # If the server returned modified metadata, update the local copy.
# Otherwise, round-trip it through a JSON serializer locally
# to ensure that any special types (e.g. datetimes, numpy arrays)
# are normalized to natively JSON serializable types.
if "metadata" in document:
item["attributes"]["metadata"] = document.pop("metadata")
else:
item["attributes"]["metadata"] = orjson.loads(
safe_json_dump(item["attributes"]["metadata"])
)
# Ditto for structure
if "structure" in document:
item["attributes"]["structure"] = STRUCTURE_TYPES[structure_family](
document.pop("structure")
)
# And for data sources
if "data_sources" in document:
item["attributes"]["data_sources"] = [
ds for ds in document.pop("data_sources")
]
# And for access_blob
if "access_blob" in document:
item["attributes"]["access_blob"] = document.pop("access_blob")
# Merge in "id" and "links" returned by the server.
item.update(document)
# Ensure this is a dataclass, not a dict.
# When we apply type hints and mypy to the client it should be possible
# to dispense with this.
if (structure_family != StructureFamily.container) and isinstance(
structure, dict
):
structure_type = STRUCTURE_TYPES[structure_family]
structure = structure_type.from_json(structure)
return client_for_item(
self.context,
self.structure_clients,
item,
structure=structure,
include_data_sources=self._include_data_sources,
)
# When (re)chunking arrays for upload, we use this limit
# to attempt to avoid bumping into size limits.
_SUGGESTED_MAX_UPLOAD_SIZE = 100_000_000 # 100 MB
def create_container(
self,
key=None,
*,
metadata=None,
specs=None,
access_tags=None,
):
"""Create a new, empty container.
Parameters
----------
key : str, optional
Key (name) for this new node. If None, the server will provide a unique key.
metadata : dict, optional
User metadata. May be nested. Must contain only basic types
(e.g. numbers, strings, lists, dicts) that are JSON-serializable.
specs : List[Spec], optional
List of names that are used to label that the data and/or metadata
conform to some named standard specification.
access_tags: List[str], optional
Server-specific authZ tags in list form, used to confer access to the node.
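        Examples
        --------
        A minimal sketch, assuming ``c`` is a writable container; the key and
        metadata are illustrative.
        >>> sub = c.create_container("processed", metadata={"sample": "Cu"})
        >>> sub = c.create_container()  # let the server choose a unique key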
"""
return self.new(
StructureFamily.container,
[],
key=key,
metadata=metadata,
specs=specs,
access_tags=access_tags,
)
def write_array(
self,
array,
*,
key=None,
metadata=None,
dims=None,
specs=None,
access_tags=None,
):
"""
EXPERIMENTAL: Write an array.
Parameters
----------
array : array-like
key : str, optional
Key (name) for this new node. If None, the server will provide a unique key.
metadata : dict, optional
User metadata. May be nested. Must contain only basic types
(e.g. numbers, strings, lists, dicts) that are JSON-serializable.
dims : List[str], optional
A label for each dimension of the array.
specs : List[Spec], optional
List of names that are used to label that the data and/or metadata
conform to some named standard specification.
access_tags: List[str], optional
Server-specific authZ tags in list form, used to confer access to the node.
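        Examples
        --------
        A minimal sketch, assuming ``c`` is a writable container.
        >>> import numpy
        >>> c.write_array(numpy.ones((5, 5)), key="ones", dims=["x", "y"])
        Plain lists are coerced to numpy arrays.
        >>> c.write_array([1, 2, 3], key="tiny")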
"""
import dask.array
import numpy
from dask.array.core import normalize_chunks
from ..structures.array import ArrayStructure, BuiltinDtype
if not (hasattr(array, "shape") and hasattr(array, "dtype")):
# This does not implement enough of the array-like interface.
# Coerce to numpy.
array = numpy.asarray(array)
# Determine chunks such that each chunk is not too large to upload.
# Any existing chunking will be taken into account.
# If the array is small, there will be only one chunk.
if hasattr(array, "chunks"):
chunks = normalize_chunks(
array.chunks,
limit=self._SUGGESTED_MAX_UPLOAD_SIZE,
dtype=array.dtype,
shape=array.shape,
)
else:
chunks = normalize_chunks(
tuple("auto" for _ in array.shape),
limit=self._SUGGESTED_MAX_UPLOAD_SIZE,
dtype=array.dtype,
shape=array.shape,
)
structure = ArrayStructure(
shape=array.shape,
chunks=chunks,
dims=dims,
data_type=BuiltinDtype.from_numpy_dtype(array.dtype),
)
client = self.new(
StructureFamily.array,
[
DataSource(structure=structure, structure_family=StructureFamily.array),
],
key=key,
metadata=metadata,
specs=specs,
access_tags=access_tags,
)
chunked = any(len(dim) > 1 for dim in chunks)
if not chunked:
client.write(array)
else:
# Fan out client.write_block over each chunk using dask.
if isinstance(array, dask.array.Array):
da = array.rechunk(chunks)
else:
da = dask.array.from_array(array, chunks=chunks)
# Dask inspects the signature and passes block_id in if present.
            # It also apparently calls it once with an empty array and an empty
            # block_id, so we catch that call and make it a no-op.
def write_block(x, block_id, client):
if len(block_id):
client.write_block(x, block=block_id)
return x
# TODO Is there a fire-and-forget analogue such that we don't need
# to bother with the return type?
da.map_blocks(write_block, dtype=da.dtype, client=client).compute()
return client
def write_awkward(
self,
array,
*,
key=None,
metadata=None,
dims=None,
specs=None,
access_tags=None,
):
"""
Write an AwkwardArray.
Parameters
----------
array: awkward.Array
key : str, optional
Key (name) for this new node. If None, the server will provide a unique key.
metadata : dict, optional
User metadata. May be nested. Must contain only basic types
(e.g. numbers, strings, lists, dicts) that are JSON-serializable.
dims : List[str], optional
A label for each dimension of the array.
specs : List[Spec], optional
List of names that are used to label that the data and/or metadata
conform to some named standard specification.
access_tags: List[str], optional
Server-specific authZ tags in list form, used to confer access to the node.
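        Examples
        --------
        A minimal sketch, assuming ``c`` is a writable container.
        >>> import awkward
        >>> ragged = awkward.Array([[1, 2, 3], [], [4, 5]])
        >>> c.write_awkward(ragged, key="ragged")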
"""
import awkward
from ..structures.awkward import AwkwardStructure
packed = awkward.to_packed(array)
form, length, container = awkward.to_buffers(packed)
structure = AwkwardStructure(
length=length,
form=form.to_dict(),
)
client = self.new(
StructureFamily.awkward,
[
DataSource(
structure=structure, structure_family=StructureFamily.awkward
),
],
key=key,
metadata=metadata,
specs=specs,
access_tags=access_tags,
)
client.write(container)
return client
def write_sparse(
self,
coords,
data,
shape,
*,
key=None,
metadata=None,
dims=None,
specs=None,
access_tags=None,
):
"""
EXPERIMENTAL: Write a sparse array.
Parameters
----------
coords : array-like
data : array-like
shape : tuple
key : str, optional
Key (name) for this new node. If None, the server will provide a unique key.
metadata : dict, optional
User metadata. May be nested. Must contain only basic types
(e.g. numbers, strings, lists, dicts) that are JSON-serializable.
dims : List[str], optional
A label for each dimension of the array.
specs : List[Spec], optional
List of names that are used to label that the data and/or metadata
conform to some named standard specification.
access_tags: List[str], optional
Server-specific authZ tags in list form, used to confer access to the node.
Examples
--------
Write a sparse.COO array.
>>> import sparse
>>> from numpy import array
>>> coo = sparse.COO(coords=array([[2, 5]]), data=array([1.3, 7.5]), shape=(10,))
>>> c.write_sparse(coords=coo.coords, data=coo.data, shape=coo.shape)
This only supports a single chunk. For chunked upload, use lower-level methods.
# Define the overall shape and the dimensions of each chunk.
>>> from tiled.structures.sparse import COOStructure
>>> structure = COOStructure(shape=(10,), chunks=((5, 5),))
>>> data_source = DataSource(structure=structure, structure_family=StructureFamily.sparse)
>>> x = c.new("sparse", [data_source])
# Upload the data in each chunk.
        # Coords are given within the reference frame of each chunk.
>>> x.write_block(coords=array([[2, 4]]), data=array([3.1, 2.8]), block=(0,))
>>> x.write_block(coords=array([[0, 1]]), data=array([6.7, 1.2]), block=(1,))
"""
from ..structures.array import BuiltinDtype
from ..structures.sparse import COOStructure
structure = COOStructure(
shape=shape,
# This method only supports single-chunk COO arrays.
chunks=tuple((dim,) for dim in shape),
dims=dims,
data_type=BuiltinDtype.from_numpy_dtype(data.dtype),
coord_data_type=BuiltinDtype.from_numpy_dtype(coords.dtype),
)
client = self.new(
StructureFamily.sparse,
[
DataSource(
structure=structure, structure_family=StructureFamily.sparse
),
],
key=key,
metadata=metadata,
specs=specs,
access_tags=access_tags,
)
client.write(coords, data)
return client
def create_appendable_table(
self,
schema: "pyarrow.Schema",
npartitions: int = 1,
*,
key=None,
metadata=None,
specs=None,
access_tags=None,
table_name: Optional[str] = None,
):
"""Initialize a table whose rows can be appended to a partition.
Parameters
----------
        schema : pyarrow.Schema
            Column names and dtypes, given in the form of a pyarrow.Schema.
npartitions : int, optional
Number of partitions to create. Default is 1.
key : str, optional
Key (name) for this new node. If None, the server will provide a unique key.
metadata : dict, optional
User metadata. May be nested. Must contain only basic types
(e.g. numbers, strings, lists, dicts) that are JSON-serializable.
specs : List[Spec], optional
List of names that are used to label that the data and/or metadata
conform to some named standard specification.
access_tags: List[str], optional
Server-specific authZ tags in list form, used to confer access to the node.
table_name : str, optional
Optionally provide a name for the SQL table this should be stored in.
By default a name unique to the schema will be chosen.
See Also
--------
write_table
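        Examples
        --------
        A minimal sketch, assuming ``c`` is a writable container; the key is
        illustrative.
        >>> import pyarrow
        >>> schema = pyarrow.schema([("x", pyarrow.int64()), ("y", pyarrow.float64())])
        >>> t = c.create_appendable_table(schema, key="measurements")
        Rows can then be appended through the returned table client.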
"""
parameters = {}
if table_name is not None:
parameters["table_name"] = table_name
from ..structures.table import TableStructure
structure = TableStructure.from_schema(schema, npartitions)
client = self.new(
StructureFamily.table,
[
DataSource(
structure=structure,
structure_family=StructureFamily.table,
mimetype="application/x-tiled-sql-table",
parameters=parameters,
)
],
key=key,
metadata=metadata or {},
specs=specs or [],
access_tags=access_tags,
)
return client
def write_table(
self,
data: Union["pandas.DataFrame", dict[str, Any]],
*,
key=None,
metadata=None,
specs=None,
access_tags=None,
):
"""Write tabular data.
The created asset will be stored as an external file (e.g. Parquet), which means
        the table cannot be appended to in the future. To create a table that can be grown,
use the `create_appendable_table` method instead.
Parameters
----------
data : pandas.DataFrame or dict
key : str, optional
Key (name) for this new node. If None, the server will provide a unique key.
metadata : dict, optional
User metadata. May be nested. Must contain only basic types
(e.g. numbers, strings, lists, dicts) that are JSON-serializable.
specs : List[Spec], optional
List of names that are used to label that the data and/or metadata
conform to some named standard specification.
access_tags: List[str], optional
Server-specific authZ tags in list form, used to confer access to the node.
See Also
--------
create_appendable_table
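        Examples
        --------
        A minimal sketch, assuming ``c`` is a writable container; the keys are
        illustrative.
        >>> import pandas
        >>> df = pandas.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})
        >>> c.write_table(df, key="table1")
        A plain dict of columns is also accepted.
        >>> c.write_table({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]}, key="table2")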
"""
import dask.dataframe
from ..structures.table import TableStructure
if isinstance(data, dask.dataframe.DataFrame):
structure = TableStructure.from_dask_dataframe(data)
elif isinstance(data, dict):
# table as dict, e.g. {"a": [1,2,3], "b": [4,5,6]}
structure = TableStructure.from_dict(data)
else:
structure = TableStructure.from_pandas(data)
client = self.new(
StructureFamily.table,
[
DataSource(
structure=structure,
structure_family=StructureFamily.table,
)
],
key=key,
metadata=metadata or {},
specs=specs or [],
access_tags=access_tags,
)
if hasattr(data, "partitions"):
if isinstance(data, dask.dataframe.DataFrame):
ddf = data
else:
raise NotImplementedError(f"Unsure how to handle type {type(data)}")
ddf.map_partitions(
functools.partial(_write_partition, client=client), meta=data._meta
).compute()
else:
client.write(data)
return client
def write_dataframe(
self, data, *, key=None, metadata=None, specs=None, access_tags=None
):
warnings.warn(
"The 'write_dataframe' method is deprecated and will be removed in a future release. "
"Please use 'write_table' instead.",
DeprecationWarning,
stacklevel=2,
)
return self.write_table(
data, key=key, metadata=metadata, specs=specs, access_tags=access_tags
)
def subscribe(
self,
executor: Optional[concurrent.futures.Executor] = None,
) -> "ContainerSubscription":
"""
Subscribe to streaming updates about this container.
        Parameters
        ----------
        executor : concurrent.futures.Executor, optional
            Launches tasks asynchronously, in response to updates. By default,
            a concurrent.futures.ThreadPoolExecutor is used.
        Returns
        -------
        subscription : ContainerSubscription
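        Examples
        --------
        A minimal sketch, assuming ``c`` is a container client; consuming the
        updates is handled by the returned ContainerSubscription
        (see ``tiled.client.stream``).
        >>> subscription = c.subscribe()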
"""
# Keep this import here to defer the websockets import until/unless needed.
from .stream import ContainerSubscription
return ContainerSubscription(
self.context, self.path_parts, executor, self.structure_clients
)
def _queries_to_params(*queries):
"Compute GET params from the queries."
params = collections.defaultdict(list)
for query in queries:
name = default_query_registry.query_type_to_name[type(query)]
for field, value in query.encode().items():
if value is not None:
params[f"filter[{name}][condition][{field}]"].append(value)
return dict(params)
LENGTH_CACHE_TTL = 1 # second
class Ascending(Sentinel):
"Intended for more readable sorting operations. An alias for 1."
def __index__(self):
return 1
class Descending(Sentinel):
"Intended for more readable sorting operations. An alias for -1."
def __index__(self):
return -1
ASCENDING = Ascending("ASCENDING")
"Ascending sort order. An alias for 1."
DESCENDING = Descending("DESCENDING")
"Descending sort order. An alias for -1."
class _LazyLoad:
# This exists because lambdas and closures cannot be pickled.
def __init__(self, import_module_args, attr_name):
self.import_module_args = import_module_args
self.attr_name = attr_name
def __call__(self):
return getattr(
importlib.import_module(*self.import_module_args), self.attr_name
)
class _Wrap:
# This exists because lambdas and closures cannot be pickled.
def __init__(self, obj):
self.obj = obj
def __call__(self):
return self.obj
def _write_partition(x, partition_info, client):
client.write_partition(partition_info["number"], x)
return x
DEFAULT_STRUCTURE_CLIENT_DISPATCH = {
"numpy": OneShotCachedMap(
{
"container": _Wrap(Container),
"composite": _LazyLoad(
("..composite", Container.__module__), "CompositeClient"
),
"array": _LazyLoad(("..array", Container.__module__), "ArrayClient"),
"awkward": _LazyLoad(("..awkward", Container.__module__), "AwkwardClient"),
"dataframe": _LazyLoad(
("..dataframe", Container.__module__), "DataFrameClient"
),
"sparse": _LazyLoad(("..sparse", Container.__module__), "SparseClient"),
"table": _LazyLoad(
("..dataframe", Container.__module__), "DataFrameClient"
),
"xarray_dataset": _LazyLoad(
("..xarray", Container.__module__), "DatasetClient"
),
}
),
"dask": OneShotCachedMap(
{
"container": _Wrap(Container),
"composite": _LazyLoad(
("..composite", Container.__module__), "CompositeClient"
),
"array": _LazyLoad(("..array", Container.__module__), "DaskArrayClient"),
# TODO Create DaskAwkwardClient
# "awkward": _LazyLoad(("..awkward", Container.__module__), "DaskAwkwardClient"),
"dataframe": _LazyLoad(
("..dataframe", Container.__module__), "DaskDataFrameClient"
),
"sparse": _LazyLoad(("..sparse", Container.__module__), "SparseClient"),
"table": _LazyLoad(
("..dataframe", Container.__module__), "DaskDataFrameClient"
),
"xarray_dataset": _LazyLoad(
("..xarray", Container.__module__), "DaskDatasetClient"
),
}
),
}