Source code for tiled.client.container

import collections
import functools
import importlib
import itertools
import time
import warnings
from dataclasses import asdict

import entrypoints
import httpx

from ..adapters.utils import IndexersMixin
from ..iterviews import ItemsView, KeysView, ValuesView
from ..queries import KeyLookup
from ..query_registration import query_registry
from ..structures.core import Spec, StructureFamily
from ..structures.data_source import DataSource
from ..utils import UNCHANGED, OneShotCachedMap, Sentinel, node_repr, safe_json_dump
from .base import STRUCTURE_TYPES, BaseClient
from .utils import (

class Container(BaseClient,, IndexersMixin):
    # This maps the structure_family sent by the server to a client-side object that
    # can interpret the structure_family's structure and content. OneShotCachedMap is used to
    # defer imports.

    # This is populated when the first instance is created.

    def _discover_entrypoints(cls, entrypoint_name):
        return OneShotCachedMap(
                name: entrypoint.load
                for name, entrypoint in entrypoints.get_group_named(

    def discover_clients_from_entrypoints(cls):
        Search the software environment for libraries that register structure clients.

        This is called once automatically the first time Node.from_uri
        is called. It is idempotent.
            # short-circuit
        # The modules associated with these entrypoints will be imported
        # lazily, only when the item is first accessed.
        # Check old name (special_client) and new name (structure_client).
        for entrypoint_name in ["tiled.special_client", "tiled.structure_client"]:
            for name, entrypoint in entrypoints.get_group_named(
                cls.STRUCTURE_CLIENTS_FROM_ENTRYPOINTS.set(name, entrypoint.load)
                DEFAULT_STRUCTURE_CLIENT_DISPATCH["numpy"].set(name, entrypoint.load)
                DEFAULT_STRUCTURE_CLIENT_DISPATCH["dask"].set(name, entrypoint.load)

    def __init__(
        "This is not user-facing. Use Node.from_uri."

        self.structure_clients = structure_clients
        self._queries = list(queries or [])
        self._queries_as_params = _queries_to_params(*self._queries)
        # If the user has not specified a sorting, give the server the opportunity
        # to tell us the default sorting.
        if sorting:
            self._sorting = sorting
            # In the Python API we encode sorting as (key, direction).
            # This order-based "record" notion does not play well with OpenAPI.
            # In the HTTP API, therefore, we use {"key": key, "direction": direction}.
            self._sorting = [
                (s["key"], int(s["direction"]))
                for s in (item["attributes"].get("sorting") or [])
        sorting = sorting or item["attributes"].get("sorting")
        self._sorting_params = {
            "sort": ",".join(
                f"{'-' if item[1] < 0 else ''}{item[0]}" for item in self._sorting
        self._reversed_sorting_params = {
            "sort": ",".join(
                f"{'-' if item[1] > 0 else ''}{item[0]}" for item in self._sorting

    def __repr__(self):
        # Display up to the first N keys to avoid making a giant service
        # request. Use _keys_slicer because it is unauthenticated.
        N = 10
        return node_repr(self, self._keys_slice(0, N, direction=1))

    def sorting(self):
        The current sorting of this Node

        Given as a list of tuples where the first entry is the sorting key
        and the second entry indicates ASCENDING (or 1) or DESCENDING (or -1).
        return list(self._sorting)

    def new_variation(
        Create a copy of this Node, optionally varying some parameters.

        This is intended primarily for internal use and use by subclasses.
        if isinstance(structure_clients, str):
            structure_clients = DEFAULT_STRUCTURE_CLIENT_DISPATCH[structure_clients]
        if structure_clients is UNCHANGED:
            structure_clients = self.structure_clients
        if queries is UNCHANGED:
            queries = self._queries
        if sorting is UNCHANGED:
            sorting = self._sorting
        return super().new_variation(

    def __len__(self):
        # If the contents of this node was provided in-line, there is an
        # implication that the contents are not expected to be dynamic. Used the
        # count provided in the structure.
        structure = self.item["attributes"]["structure"]
        if structure["contents"]:
            return structure["count"]
        now = time.monotonic()
        if self._cached_len is not None:
            length, deadline = self._cached_len
            if now < deadline:
                # Used the cached value and do not make any request.
                return length
        content = handle_error(
                headers={"Accept": MSGPACK_MIME_TYPE},
                    "fields": "",
        length = content["meta"]["count"]
        self._cached_len = (length, now + LENGTH_CACHE_TTL)
        return length

    def __length_hint__(self):
        # TODO The server should provide an estimated count.
        return len(self)

    def __iter__(self, _ignore_inlined_contents=False):
        # If the contents of this node was provided in-line, and we don't need
        # to apply any filtering or sorting, we can slice the in-lined data
        # without fetching anything from the server.
        contents = self.item["attributes"]["structure"]["contents"]
        if (
            (contents is not None)
            and (not self._queries)
            and ((not self.sorting) or (self.sorting == [("_", 1)]))
            and (not _ignore_inlined_contents)
            return (yield from contents)
        next_page_url = self.item["links"]["search"]
        while next_page_url is not None:
            content = handle_error(
                    headers={"Accept": MSGPACK_MIME_TYPE},
                        "fields": "",
            self._cached_len = (
                time.monotonic() + LENGTH_CACHE_TTL,
            for item in content["data"]:
                yield item["id"]
            next_page_url = content["links"]["next"]

    def __getitem__(self, keys, _ignore_inlined_contents=False):
        # These are equivalent:
        # >>> node['a']['b']['c']
        # >>> node[('a', 'b', 'c')]
        # >>> node['a', 'b', 'c']
        # The last two are equivalent at a Python level;
        # both call node.__getitem__(('a', 'b', 'c')).
        # We elide this into a single request to the server rather than
        # a chain of requests. This is not totally straightforward because
        # of this use case:
        # >>>['a', 'b']
        # which must only return a result if 'a' is contained in the search results.
        if not isinstance(keys, tuple):
            keys = (keys,)
        for key in keys:
            if not isinstance(key, str):
                raise TypeError("Containers can only be indexed strings")
        if self._queries:
            # Lookup this key *within the search results* of this Node.
            key, *tail = keys
            tail = tuple(tail)  # list -> tuple
            params = {
            if self._include_data_sources:
                params["include_data_sources"] = True
            content = handle_error(
                    headers={"Accept": MSGPACK_MIME_TYPE},
            self._cached_len = (
                time.monotonic() + LENGTH_CACHE_TTL,
            data = content["data"]
            if not data:
                raise KeyError(key)
            assert (
                len(data) == 1
            ), "The key lookup query must never result more than one result."
            (item,) = data
            result = client_for_item(
            if tail:
                result = result[tail]
            # Straightforwardly look up the keys under this node.
            # There is no search filter in place, so if it is there
            # then we want it.

            # The server may greedily send nested information about children
            # ("inlined contents") to reduce latency. This is how we handle
            # xarray Datasets efficiently, for example.

            # In a loop, walk the key(s). Use inlined contents if we have it.
            # When we reach a key that we don't have inlined contents for, send
            # out a single request with all the rest of the keys, and break
            # the keys-walking loop. We are effectively "jumping" down the tree
            # to the node of interest without downloading information about
            # intermediate parents.
            for i, key in enumerate(keys):
                item = (self.item["attributes"]["structure"]["contents"] or {}).get(key)
                if (item is None) or _ignore_inlined_contents:
                    # The item was not inlined, either because nothing was inlined
                    # or because it was added after we fetched the inlined contents.
                    # Make a request for it.
                        self_link = self.item["links"]["self"]
                        if self_link.endswith("/"):
                            self_link = self_link[:-1]
                        params = {}
                        if self._include_data_sources:
                            params["include_data_sources"] = True
                        content = handle_error(
                                self_link + "".join(f"/{key}" for key in keys[i:]),
                                headers={"Accept": MSGPACK_MIME_TYPE},
                    except ClientError as err:
                        if err.response.status_code ==
                            # If this is a scalar lookup, raise KeyError("X") not KeyError(("X",)).
                            err_arg = keys[i:]
                            if len(err_arg) == 1:
                                (err_arg,) = err_arg
                            raise KeyError(err_arg)
                    item = content["data"]
            result = client_for_item(
        return result

    def delete(self, key):
        self._cached_len = None

    # The following two methods are used by keys(), values(), items().

    def _keys_slice(self, start, stop, direction, _ignore_inlined_contents=False):
        # If the contents of this node was provided in-line, and we don't need
        # to apply any filtering or sorting, we can slice the in-lined data
        # without fetching anything from the server.
        contents = self.item["attributes"]["structure"]["contents"]
        if (
            (contents is not None)
            and (not self._queries)
            and ((not self.sorting) or (self.sorting == [("_", 1)]))
            and (not _ignore_inlined_contents)
            keys = list(contents)
            if direction < 0:
                keys = list(reversed(keys))
            return (yield from keys[start:stop])
        if direction > 0:
            sorting_params = self._sorting_params
            sorting_params = self._reversed_sorting_params
        assert start >= 0
        assert (stop is None) or (stop >= 0)
        next_page_url = f"{self.item['links']['search']}?page[offset]={start}"
        item_counter = itertools.count(start)
        while next_page_url is not None:
            content = handle_error(
                    headers={"Accept": MSGPACK_MIME_TYPE},
                        "fields": "",
            self._cached_len = (
                time.monotonic() + LENGTH_CACHE_TTL,
            for item in content["data"]:
                if stop is not None and next(item_counter) == stop:
                yield item["id"]
            next_page_url = content["links"]["next"]

    def _items_slice(self, start, stop, direction, _ignore_inlined_contents=False):
        # If the contents of this node was provided in-line, and we don't need
        # to apply any filtering or sorting, we can slice the in-lined data
        # without fetching anything from the server.
        contents = self.item["attributes"]["structure"]["contents"]
        if (
            (contents is not None)
            and (not self._queries)
            and ((not self.sorting) or (self.sorting == [("_", 1)]))
            and (not _ignore_inlined_contents)
            items = list(contents.items())
            if direction < 0:
                items = list(reversed(items))
            for key, item in items[start:stop]:
                yield key, client_for_item(
        if direction > 0:
            sorting_params = self._sorting_params
            sorting_params = self._reversed_sorting_params
        assert start >= 0
        assert (stop is None) or (stop >= 0)
        next_page_url = f"{self.item['links']['search']}?page[offset]={start}"
        item_counter = itertools.count(start)
        while next_page_url is not None:
            params = {
            if self._include_data_sources:
                params["include_data_sources"] = True
            content = handle_error(
                    headers={"Accept": MSGPACK_MIME_TYPE},
            self._cached_len = (
                time.monotonic() + LENGTH_CACHE_TTL,

            for item in content["data"]:
                if stop is not None and next(item_counter) == stop:
                key = item["id"]
                yield key, client_for_item(
            next_page_url = content["links"]["next"]

[docs] def keys(self): return KeysView(lambda: len(self), self._keys_slice)
[docs] def values(self): return ValuesView(lambda: len(self), self._items_slice)
[docs] def items(self): return ItemsView(lambda: len(self), self._items_slice)
[docs] def search(self, query): """ Make a Node with a subset of this Node's entries, filtered by query. Examples -------- >>> from tiled.queries import FullText >>>"hello")) """ return self.new_variation(queries=self._queries + [query])
[docs] def distinct( self, *metadata_keys, structure_families=False, specs=False, counts=False ): """ Get the unique values and optionally counts of metadata_keys, structure_families, and specs in this Node's entries Examples -------- Query all the distinct values of a key. >>> tree.distinct("foo", counts=True) Query for multiple keys at once. >>> tree.distinct("foo", "bar", counts=True) """ link = self.item["links"]["self"].replace("/metadata", "/distinct", 1) distinct = handle_error( self.context.http_client.get( link, headers={"Accept": MSGPACK_MIME_TYPE}, params={ "metadata": metadata_keys, "structure_families": structure_families, "specs": specs, "counts": counts, **self._queries_as_params, }, ) ).json() return distinct
[docs] def sort(self, *sorting): """ Make a Node with the same entries but sorted according to `sorting`. Examples -------- Sort by "color" in ascending order, and then by "height" in descending order. >>> from tiled.client import ASCENDING, DESCENDING >>> tree.sort(("color", ASCENDING), ("height", DESCENDING)) Note that ``1`` may be used as a synonym for ``ASCENDING``, and ``-1`` may be used as a synonym for ``DESCENDING``. """ return self.new_variation(sorting=sorting)
def export(self, filepath, fields=None, *, format=None): """ Download metadata and data below this node in some format and write to a file. Parameters ---------- file: str or buffer Filepath or writeable buffer. fields: List[str], optional Filter which items in this node to export. format : str, optional If format is None and `file` is a filepath, the format is inferred from the name, like 'table.h5' implies format="application/x-hdf5". The format may be given as a file extension ("h5") or a media type ("application/x-hdf5"). Examples -------- Export all. >>> a.export("everything.h5") """ params = {} if fields is not None: params["field"] = fields return export_util( filepath, format, self.context.http_client.get, self.item["links"]["full"], params=params, ) def _ipython_key_completions_(self): """ Provide method for the key-autocompletions in IPython. See """ MAX_ENTRIES_SUPPORTED = 40 try: if len(self) > MAX_ENTRIES_SUPPORTED: MSG = ( "Tab-completition is not supported on this particular Node " "because it has a large number of entries." ) warnings.warn(MSG) return [] else: return list(self) except Exception: # Do not print messy traceback from thread. Just fail silently. return []
[docs] def new( self, structure_family, data_sources, *, key=None, metadata=None, specs=None, ): """ Create a new item within this Node. This is a low-level method. See high-level convenience methods listed below. See Also -------- write_array write_dataframe write_coo_array """ self._cached_len = None metadata = metadata or {} specs = specs or [] normalized_specs = [] for spec in specs: if isinstance(spec, str): spec = Spec(spec) normalized_specs.append(asdict(spec)) item = { "attributes": { "metadata": metadata, "structure_family": StructureFamily(structure_family), "specs": normalized_specs, "data_sources": [asdict(data_source) for data_source in data_sources], } } body = dict(item["attributes"]) if key is not None: body["id"] = key # if check: if any(data_source.assets for data_source in data_sources): endpoint = self.uri.replace("/metadata/", "/register/", 1) else: endpoint = self.uri document = handle_error( endpoint, headers={"Accept": MSGPACK_MIME_TYPE}, content=safe_json_dump(body), ) ).json() if structure_family == StructureFamily.container: structure = {"contents": None, "count": None} else: # Only containers can have multiple data_sources right now. (data_source,) = data_sources structure = data_source.structure item["attributes"]["structure"] = structure # if server returned modified metadata update the local copy if "metadata" in document: item["attributes"]["metadata"] = document.pop("metadata") # Ditto for structure if "structure" in document: item["attributes"]["structure"] = STRUCTURE_TYPES[structure_family]( document.pop("structure") ) # Merge in "id" and "links" returned by the server. item.update(document) # Ensure this is a dataclass, not a dict. # When we apply type hints and mypy to the client it should be possible # to dispense with this. if (structure_family != StructureFamily.container) and isinstance( structure, dict ): structure_type = STRUCTURE_TYPES[structure_family] structure = structure_type.from_json(structure) return client_for_item( self.context, self.structure_clients, item, structure=structure, include_data_sources=self._include_data_sources, )
# When (re)chunking arrays for upload, we use this limit # to attempt to avoid bumping into size limits. _SUGGESTED_MAX_UPLOAD_SIZE = 100_000_000 # 100 MB
[docs] def create_container(self, key=None, *, metadata=None, dims=None, specs=None): """ EXPERIMENTAL: Create a new, empty container. Parameters ---------- key : str, optional Key (name) for this new node. If None, the server will provide a unique key. metadata : dict, optional User metadata. May be nested. Must contain only basic types (e.g. numbers, strings, lists, dicts) that are JSON-serializable. dims : List[str], optional A label for each dimension of the array. specs : List[Spec], optional List of names that are used to label that the data and/or metadata conform to some named standard specification. """ return StructureFamily.container, [], key=key, metadata=metadata, specs=specs, )
[docs] def write_array(self, array, *, key=None, metadata=None, dims=None, specs=None): """ EXPERIMENTAL: Write an array. Parameters ---------- array : array-like key : str, optional Key (name) for this new node. If None, the server will provide a unique key. metadata : dict, optional User metadata. May be nested. Must contain only basic types (e.g. numbers, strings, lists, dicts) that are JSON-serializable. dims : List[str], optional A label for each dimension of the array. specs : List[Spec], optional List of names that are used to label that the data and/or metadata conform to some named standard specification. """ import dask.array import numpy from dask.array.core import normalize_chunks from ..structures.array import ArrayStructure, BuiltinDtype if not (hasattr(array, "shape") and hasattr(array, "dtype")): # This does not implement enough of the array-like interface. # Coerce to numpy. array = numpy.asarray(array) # Determine chunks such that each chunk is not too large to upload. # Any existing chunking will be taken into account. # If the array is small, there will be only one chunk. if hasattr(array, "chunks"): chunks = normalize_chunks( array.chunks, limit=self._SUGGESTED_MAX_UPLOAD_SIZE, dtype=array.dtype, shape=array.shape, ) else: chunks = normalize_chunks( tuple("auto" for _ in array.shape), limit=self._SUGGESTED_MAX_UPLOAD_SIZE, dtype=array.dtype, shape=array.shape, ) structure = ArrayStructure( shape=array.shape, chunks=chunks, dims=dims, data_type=BuiltinDtype.from_numpy_dtype(array.dtype), ) client = StructureFamily.array, [ DataSource(structure=structure, structure_family=StructureFamily.array), ], key=key, metadata=metadata, specs=specs, ) chunked = any(len(dim) > 1 for dim in chunks) if not chunked: client.write(array) else: # Fan out client.write_block over each chunk using dask. if isinstance(array, dask.array.Array): da = array.rechunk(chunks) else: da = dask.array.from_array(array, chunks=chunks) # Dask inspects the signature and passes block_id in if present. # It also apparently calls it with an empty array and block_id # once, so we catch that call and become a no-op. def write_block(x, block_id, client): if len(block_id): client.write_block(x, block=block_id) return x # TODO Is there a fire-and-forget analogue such that we don't need # to bother with the return type? da.map_blocks(write_block, dtype=da.dtype, client=client).compute() return client
[docs] def write_awkward( self, array, *, key=None, metadata=None, dims=None, specs=None, ): """ Write an AwkwardArray. Parameters ---------- array: awkward.Array key : str, optional Key (name) for this new node. If None, the server will provide a unique key. metadata : dict, optional User metadata. May be nested. Must contain only basic types (e.g. numbers, strings, lists, dicts) that are JSON-serializable. dims : List[str], optional A label for each dimension of the array. specs : List[Spec], optional List of names that are used to label that the data and/or metadata conform to some named standard specification. """ import awkward from ..structures.awkward import AwkwardStructure packed = awkward.to_packed(array) form, length, container = awkward.to_buffers(packed) structure = AwkwardStructure( length=length, form=form.to_dict(), ) client = StructureFamily.awkward, [ DataSource( structure=structure, structure_family=StructureFamily.awkward ), ], key=key, metadata=metadata, specs=specs, ) client.write(container) return client
[docs] def write_sparse( self, coords, data, shape, *, key=None, metadata=None, dims=None, specs=None, ): """ EXPERIMENTAL: Write a sparse array. Parameters ---------- coords : array-like data : array-like shape : tuple key : str, optional Key (name) for this new node. If None, the server will provide a unique key. metadata : dict, optional User metadata. May be nested. Must contain only basic types (e.g. numbers, strings, lists, dicts) that are JSON-serializable. dims : List[str], optional A label for each dimension of the array. specs : List[Spec], optional List of names that are used to label that the data and/or metadata conform to some named standard specification. Examples -------- Write a sparse.COO array. >>> import sparse >>> coo = sparse.COO(coords=[[2, 5]], data=[1.3, 7.5], shape=(10,)) >>> c.write_sparse(coords=coo.coords,, shape=coo.shape) This only supports a single chunk. For chunked upload, use lower-level methods. # Define the overall shape and the dimensions of each chunk. >>> from tiled.structures.sparse import COOStructure >>> structure = COOStructure(shape=(10,), chunks=((5, 5),)) >>> data_source = DataSource(structure=structure, structure_family=StructureFamily.sparse) >>> x ="sparse", [data_source]) # Upload the data in each chunk. # Coords are given with in the reference frame of each chunk. >>> x.write_block(coords=[[2, 4]], data=[3.1, 2.8], block=(0,)) >>> x.write_block(coords=[[0, 1]], data=[6.7, 1.2], block=(1,)) """ from ..structures.sparse import COOStructure structure = COOStructure( shape=shape, # This method only supports single-chunk COO arrays. chunks=tuple((dim,) for dim in shape), dims=dims, ) client = StructureFamily.sparse, [ DataSource( structure=structure, structure_family=StructureFamily.sparse ), ], key=key, metadata=metadata, specs=specs, ) client.write(coords, data) return client
[docs] def write_dataframe( self, dataframe, *, key=None, metadata=None, specs=None, ): """ EXPERIMENTAL: Write a DataFrame. This is subject to change or removal without notice Parameters ---------- dataframe : pandas.DataFrame key : str, optional Key (name) for this new node. If None, the server will provide a unique key. metadata : dict, optional User metadata. May be nested. Must contain only basic types (e.g. numbers, strings, lists, dicts) that are JSON-serializable. specs : List[Spec], optional List of names that are used to label that the data and/or metadata conform to some named standard specification. """ import dask.dataframe from ..structures.table import TableStructure metadata = metadata or {} specs = specs or [] if isinstance(dataframe, dask.dataframe.DataFrame): structure = TableStructure.from_dask_dataframe(dataframe) elif isinstance(dataframe, dict): structure = TableStructure.from_dict(dataframe) else: structure = TableStructure.from_pandas(dataframe) client = StructureFamily.table, [ DataSource(structure=structure, structure_family=StructureFamily.table), ], key=key, metadata=metadata, specs=specs, ) if hasattr(dataframe, "partitions"): if isinstance(dataframe, dask.dataframe.DataFrame): ddf = dataframe else: raise NotImplementedError( f"Unsure how to handle type {type(dataframe)}" ) ddf.map_partitions( functools.partial(_write_partition, client=client), meta=dataframe._meta ).compute() else: client.write(dataframe) return client
def _queries_to_params(*queries): "Compute GET params from the queries." params = collections.defaultdict(list) for query in queries: name = query_registry.query_type_to_name[type(query)] for field, value in query.encode().items(): if value is not None: params[f"filter[{name}][condition][{field}]"].append(value) return dict(params) LENGTH_CACHE_TTL = 1 # second class Ascending(Sentinel): "Intended for more readable sorting operations. An alias for 1." def __index__(self): return 1 class Descending(Sentinel): "Intended for more readable sorting operations. An alias for -1." def __index__(self): return -1 ASCENDING = Ascending("ASCENDING") "Ascending sort order. An alias for 1." DESCENDING = Descending("DESCENDING") "Decending sort order. An alias for -1." class _LazyLoad: # This exists because lambdas and closures cannot be pickled. def __init__(self, import_module_args, attr_name): self.import_module_args = import_module_args self.attr_name = attr_name def __call__(self): return getattr( importlib.import_module(*self.import_module_args), self.attr_name ) class _Wrap: # This exists because lambdas and closures cannot be pickled. def __init__(self, obj): self.obj = obj def __call__(self): return self.obj def _write_partition(x, partition_info, client): client.write_partition(x, partition_info["number"]) return x DEFAULT_STRUCTURE_CLIENT_DISPATCH = { "numpy": OneShotCachedMap( { "container": _Wrap(Container), "array": _LazyLoad(("..array", Container.__module__), "ArrayClient"), "awkward": _LazyLoad(("..awkward", Container.__module__), "AwkwardClient"), "dataframe": _LazyLoad( ("..dataframe", Container.__module__), "DataFrameClient" ), "sparse": _LazyLoad(("..sparse", Container.__module__), "SparseClient"), "table": _LazyLoad( ("..dataframe", Container.__module__), "DataFrameClient" ), "xarray_dataset": _LazyLoad( ("..xarray", Container.__module__), "DatasetClient" ), } ), "dask": OneShotCachedMap( { "container": _Wrap(Container), "array": _LazyLoad(("..array", Container.__module__), "DaskArrayClient"), # TODO Create DaskAwkwardClient # "awkward": _LazyLoad(("..awkward", Container.__module__), "DaskAwkwardClient"), "dataframe": _LazyLoad( ("..dataframe", Container.__module__), "DaskDataFrameClient" ), "sparse": _LazyLoad(("..sparse", Container.__module__), "SparseClient"), "table": _LazyLoad( ("..dataframe", Container.__module__), "DaskDataFrameClient" ), "xarray_dataset": _LazyLoad( ("..xarray", Container.__module__), "DaskDatasetClient" ), } ), }