Source code for tiled.client.container

import collections
import collections.abc
import functools
import importlib
import itertools
import time
import warnings
from dataclasses import asdict
from typing import TYPE_CHECKING, Any, Optional, Union
from urllib.parse import parse_qs, urlparse

import entrypoints
import httpx

from ..adapters.utils import IndexersMixin
from ..iterviews import ItemsView, KeysView, ValuesView
from ..queries import KeyLookup
from ..query_registration import default_query_registry
from ..structures.core import Spec, StructureFamily
from ..structures.data_source import DataSource
from ..utils import UNCHANGED, OneShotCachedMap, Sentinel, node_repr, safe_json_dump
from .base import STRUCTURE_TYPES, BaseClient
from .utils import (
    MSGPACK_MIME_TYPE,
    ClientError,
    client_for_item,
    export_util,
    handle_error,
)

# import pandas


if TYPE_CHECKING:
    import pandas
    import pyarrow


class Container(BaseClient, collections.abc.Mapping, IndexersMixin):
    # This maps the structure_family sent by the server to a client-side object that
    # can interpret the structure_family's structure and content. OneShotCachedMap is used to
    # defer imports.

    # This is populated when the first instance is created.
    STRUCTURE_CLIENTS_FROM_ENTRYPOINTS = None
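    # For example, the default dispatch tables at the bottom of this module map
    # "array" -> ArrayClient and "table" -> DataFrameClient; entry points
    # discovered at runtime register additional entries in those tables.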

    @classmethod
    def _discover_entrypoints(cls, entrypoint_name):
        return OneShotCachedMap(
            {
                name: entrypoint.load
                for name, entrypoint in entrypoints.get_group_named(
                    entrypoint_name
                ).items()
            }
        )

    @classmethod
    def discover_clients_from_entrypoints(cls):
        """
        Search the software environment for libraries that register structure clients.

        This is called once automatically the first time Node.from_uri
        is called. It is idempotent.
        """
        if cls.STRUCTURE_CLIENTS_FROM_ENTRYPOINTS is not None:
            # short-circuit
            return
        # The modules associated with these entrypoints will be imported
        # lazily, only when the item is first accessed.
        cls.STRUCTURE_CLIENTS_FROM_ENTRYPOINTS = OneShotCachedMap()
        # Check old name (special_client) and new name (structure_client).
        for entrypoint_name in ["tiled.special_client", "tiled.structure_client"]:
            for name, entrypoint in entrypoints.get_group_named(
                entrypoint_name
            ).items():
                cls.STRUCTURE_CLIENTS_FROM_ENTRYPOINTS.set(name, entrypoint.load)
                DEFAULT_STRUCTURE_CLIENT_DISPATCH["numpy"].set(name, entrypoint.load)
                DEFAULT_STRUCTURE_CLIENT_DISPATCH["dask"].set(name, entrypoint.load)

    def __init__(
        self,
        context,
        *,
        item,
        structure_clients,
        queries=None,
        sorting=None,
        structure=None,
        include_data_sources=False,
    ):
        "This is not user-facing. Use Node.from_uri."

        self.structure_clients = structure_clients
        self._queries = list(queries or [])
        self._queries_as_params = _queries_to_params(*self._queries)
        # If the user has not specified a sorting, give the server the opportunity
        # to tell us the default sorting.
        if sorting:
            self._sorting = sorting
        else:
            # In the Python API we encode sorting as (key, direction).
            # This order-based "record" notion does not play well with OpenAPI.
            # In the HTTP API, therefore, we use {"key": key, "direction": direction}.
            self._sorting = [
                (s["key"], int(s["direction"]))
                for s in (item["attributes"].get("sorting") or [])
            ]
        sorting = sorting or item["attributes"].get("sorting")
        self._sorting_params = {
            "sort": ",".join(
                f"{'-' if item[1] < 0 else ''}{item[0]}" for item in self._sorting
            )
        }
        self._reversed_sorting_params = {
            "sort": ",".join(
                f"{'-' if item[1] > 0 else ''}{item[0]}" for item in self._sorting
            )
        }
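        # For example, sorting of [("color", 1), ("height", -1)] encodes to
        # {"sort": "color,-height"}; the reversed params encode to
        # {"sort": "-color,height"} and are used when iterating in reverse.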
        super().__init__(
            context=context,
            item=item,
            structure_clients=structure_clients,
            include_data_sources=include_data_sources,
        )

    def __repr__(self):
        # Display up to the first N keys to avoid making a giant service
        # request. Use _keys_slice directly so that only the first N keys are fetched.
        N = 10
        return node_repr(self, self._keys_slice(0, N, direction=1))

    @property
    def sorting(self):
        """
        The current sorting of this Node

        Given as a list of tuples where the first entry is the sorting key
        and the second entry indicates ASCENDING (or 1) or DESCENDING (or -1).
        """
        return list(self._sorting)

    def new_variation(
        self,
        *,
        structure_clients=UNCHANGED,
        queries=UNCHANGED,
        sorting=UNCHANGED,
        **kwargs,
    ):
        """
        Create a copy of this Node, optionally varying some parameters.

        This is intended primarily for internal use and use by subclasses.
        """
        if isinstance(structure_clients, str):
            structure_clients = DEFAULT_STRUCTURE_CLIENT_DISPATCH[structure_clients]
        if structure_clients is UNCHANGED:
            structure_clients = self.structure_clients
        if queries is UNCHANGED:
            queries = self._queries
        if sorting is UNCHANGED:
            sorting = self._sorting
        return super().new_variation(
            structure_clients=structure_clients,
            queries=queries,
            sorting=sorting,
            **kwargs,
        )

    def __len__(self):
        # If the contents of this node were provided in-line, there is an
        # implication that the contents are not expected to be dynamic. Use the
        # count provided in the structure.
        structure = self.item["attributes"]["structure"]
        if structure["contents"]:
            return structure["count"]
        now = time.monotonic()
        if self._cached_len is not None:
            length, deadline = self._cached_len
            if now < deadline:
                # Use the cached value and do not make any request.
                return length
        link = self.item["links"]["search"]
        content = handle_error(
            self.context.http_client.get(
                link,
                headers={"Accept": MSGPACK_MIME_TYPE},
                params={
                    **parse_qs(urlparse(link).query),
                    "fields": "",
                    **self._queries_as_params,
                    **self._sorting_params,
                },
            )
        ).json()
        length = content["meta"]["count"]
        self._cached_len = (length, now + LENGTH_CACHE_TTL)
        return length

    def __length_hint__(self):
        # TODO The server should provide an estimated count.
        # https://www.python.org/dev/peps/pep-0424/
        return len(self)

    def __iter__(self, _ignore_inlined_contents=False):
        # If the contents of this node were provided in-line, and we don't need
        # to apply any filtering or sorting, we can slice the in-lined data
        # without fetching anything from the server.
        contents = self.item["attributes"]["structure"]["contents"]
        if (
            (contents is not None)
            and (not self._queries)
            and ((not self.sorting) or (self.sorting == [("_", 1)]))
            and (not _ignore_inlined_contents)
        ):
            return (yield from contents)
        next_page_url = self.item["links"]["search"]
        while next_page_url is not None:
            content = handle_error(
                self.context.http_client.get(
                    next_page_url,
                    headers={"Accept": MSGPACK_MIME_TYPE},
                    params={
                        **parse_qs(urlparse(next_page_url).query),
                        "fields": "",
                        **self._queries_as_params,
                        **self._sorting_params,
                    },
                )
            ).json()
            self._cached_len = (
                content["meta"]["count"],
                time.monotonic() + LENGTH_CACHE_TTL,
            )
            for item in content["data"]:
                yield item["id"]
            next_page_url = content["links"]["next"]

    def __getitem__(self, keys, _ignore_inlined_contents=False):
        # These are equivalent:
        #
        # >>> node['a']['b']['c']
        # >>> node[('a', 'b', 'c')]
        # >>> node['a', 'b', 'c']
        #
        # The last two are equivalent at a Python level;
        # both call node.__getitem__(('a', 'b', 'c')).
        #
        # We elide this into a single request to the server rather than
        # a chain of requests. This is not totally straightforward because
        # of this use case:
        #
        # >>> node.search(...)['a', 'b']
        #
        # which must only return a result if 'a' is contained in the search results.
        if not isinstance(keys, tuple):
            keys = (keys,)
        for key in keys:
            if not isinstance(key, str):
                raise TypeError("Containers can only be indexed by strings")
        # Normalize path-like keys, e.g. ("a/b", "c") -> ("a", "b", "c").
        keys = tuple("/".join(keys).strip("/").split("/"))
        if self._queries:
            # Lookup this key *within the search results* of this Node.
            key, *tail = keys
            tail = tuple(tail)  # list -> tuple
            params = {
                **_queries_to_params(KeyLookup(key)),
                **self._queries_as_params,
                **self._sorting_params,
            }
            if self._include_data_sources:
                params["include_data_sources"] = True
            link = self.item["links"]["search"]
            content = handle_error(
                self.context.http_client.get(
                    link,
                    headers={"Accept": MSGPACK_MIME_TYPE},
                    params={**parse_qs(urlparse(link).query), **params},
                )
            ).json()
            self._cached_len = (
                content["meta"]["count"],
                time.monotonic() + LENGTH_CACHE_TTL,
            )
            data = content["data"]
            if not data:
                raise KeyError(key)
            assert (
                len(data) == 1
            ), "The key lookup query must never return more than one result."
            (item,) = data
            result = client_for_item(
                self.context,
                self.structure_clients,
                item,
                include_data_sources=self._include_data_sources,
            )
            if tail:
                result = result[tail]
        else:
            # Straightforwardly look up the keys under this node.
            # There is no search filter in place, so if the key exists here,
            # we want it.

            # The server may greedily send nested information about children
            # ("inlined contents") to reduce latency. This is how we handle
            # xarray Datasets efficiently, for example.

            # In a loop, walk the key(s). Use inlined contents if we have it.
            # When we reach a key that we don't have inlined contents for, send
            # out a single request with all the rest of the keys, and break
            # the keys-walking loop. We are effectively "jumping" down the tree
            # to the node of interest without downloading information about
            # intermediate parents.
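            # For example, if nothing is inlined, node["a", "b", "c"] issues a
            # single GET to <this node's self link>/a/b/c rather than three
            # chained requests.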
            for i, key in enumerate(keys):
                item = (self.item["attributes"]["structure"]["contents"] or {}).get(key)
                if (item is None) or _ignore_inlined_contents:
                    # The item was not inlined, either because nothing was inlined
                    # or because it was added after we fetched the inlined contents.
                    # Make a request for it.
                    try:
                        self_link = self.item["links"]["self"].rstrip("/")
                        params = {}
                        if self._include_data_sources:
                            params["include_data_sources"] = True
                        link = self_link + "".join(f"/{key}" for key in keys[i:])
                        content = handle_error(
                            self.context.http_client.get(
                                link,
                                headers={"Accept": MSGPACK_MIME_TYPE},
                                params={**parse_qs(urlparse(link).query), **params},
                            )
                        ).json()
                    except ClientError as err:
                        if err.response.status_code == httpx.codes.NOT_FOUND:
                            # If this is a scalar lookup, raise KeyError("X") not KeyError(("X",)).
                            err_arg = keys[i:]
                            if len(err_arg) == 1:
                                (err_arg,) = err_arg
                            raise KeyError(err_arg)
                        raise
                    item = content["data"]

                    # Tables that belong to composite nodes cannot be addressed directly
                    if (
                        item["attributes"]["structure_family"] == StructureFamily.table
                    ) and (
                        "flattened" in (s["name"] for s in item["attributes"]["specs"])
                    ):
                        raise KeyError(
                            f"Attempting to access a table in a composite container; use .parts['{keys[-1]}'] instead."  # noqa
                        )

                    break
            result = client_for_item(
                self.context,
                self.structure_clients,
                item,
                include_data_sources=self._include_data_sources,
            )
        return result

    def delete(self, key):
        self._cached_len = None
        handle_error(self.context.http_client.delete(f"{self.uri}/{key}"))

    # The following two methods are used by keys(), values(), items().

    def _keys_slice(self, start, stop, direction, _ignore_inlined_contents=False):
        # If the contents of this node were provided in-line, and we don't need
        # to apply any filtering or sorting, we can slice the in-lined data
        # without fetching anything from the server.
        contents = self.item["attributes"]["structure"]["contents"]
        if (
            (contents is not None)
            and (not self._queries)
            and ((not self.sorting) or (self.sorting == [("_", 1)]))
            and (not _ignore_inlined_contents)
        ):
            keys = list(contents)
            if direction < 0:
                keys = list(reversed(keys))
            return (yield from keys[start:stop])
        if direction > 0:
            sorting_params = self._sorting_params
        else:
            sorting_params = self._reversed_sorting_params
        assert start >= 0
        assert (stop is None) or (stop >= 0)
        next_page_url = f"{self.item['links']['search']}?page[offset]={start}"
        item_counter = itertools.count(start)
        while next_page_url is not None:
            content = handle_error(
                self.context.http_client.get(
                    next_page_url,
                    headers={"Accept": MSGPACK_MIME_TYPE},
                    params={
                        **parse_qs(urlparse(next_page_url).query),
                        "fields": "",
                        **self._queries_as_params,
                        **sorting_params,
                    },
                )
            ).json()
            self._cached_len = (
                content["meta"]["count"],
                time.monotonic() + LENGTH_CACHE_TTL,
            )
            for item in content["data"]:
                if stop is not None and next(item_counter) == stop:
                    return
                yield item["id"]
            next_page_url = content["links"]["next"]

    def _items_slice(self, start, stop, direction, _ignore_inlined_contents=False):
        # If the contents of this node were provided in-line, and we don't need
        # to apply any filtering or sorting, we can slice the in-lined data
        # without fetching anything from the server.
        contents = self.item["attributes"]["structure"]["contents"]
        if (
            (contents is not None)
            and (not self._queries)
            and ((not self.sorting) or (self.sorting == [("_", 1)]))
            and (not _ignore_inlined_contents)
        ):
            items = list(contents.items())
            if direction < 0:
                items = list(reversed(items))
            for key, item in items[start:stop]:
                yield key, client_for_item(
                    self.context,
                    self.structure_clients,
                    item,
                    include_data_sources=self._include_data_sources,
                )
            return
        if direction > 0:
            sorting_params = self._sorting_params
        else:
            sorting_params = self._reversed_sorting_params
        assert start >= 0
        assert (stop is None) or (stop >= 0)
        next_page_url = f"{self.item['links']['search']}?page[offset]={start}"
        item_counter = itertools.count(start)
        while next_page_url is not None:
            params = {
                **parse_qs(urlparse(next_page_url).query),
                **self._queries_as_params,
                **sorting_params,
            }
            if self._include_data_sources:
                params["include_data_sources"] = True
            content = handle_error(
                self.context.http_client.get(
                    next_page_url,
                    headers={"Accept": MSGPACK_MIME_TYPE},
                    params=params,
                )
            ).json()
            self._cached_len = (
                content["meta"]["count"],
                time.monotonic() + LENGTH_CACHE_TTL,
            )
            for item in content["data"]:
                if stop is not None and next(item_counter) == stop:
                    return
                key = item["id"]
                yield key, client_for_item(
                    self.context,
                    self.structure_clients,
                    item,
                    include_data_sources=self._include_data_sources,
                )
            next_page_url = content["links"]["next"]


    def keys(self):
        return KeysView(lambda: len(self), self._keys_slice)

    def values(self):
        return ValuesView(lambda: len(self), self._items_slice)

    def items(self):
        return ItemsView(lambda: len(self), self._items_slice)

    def search(self, query):
        """
        Make a Node with a subset of this Node's entries, filtered by query.

        Examples
        --------

        >>> from tiled.queries import FullText
        >>> tree.search(FullText("hello"))
        """
        return self.new_variation(queries=self._queries + [query])

    def distinct(
        self, *metadata_keys, structure_families=False, specs=False, counts=False
    ):
        """
        Get the unique values and optionally counts of metadata_keys,
        structure_families, and specs in this Node's entries.

        Examples
        --------

        Query all the distinct values of a key.

        >>> tree.distinct("foo", counts=True)

        Query for multiple keys at once.

        >>> tree.distinct("foo", "bar", counts=True)
        """
        link = self.item["links"]["self"].replace("/metadata", "/distinct", 1)
        distinct = handle_error(
            self.context.http_client.get(
                link,
                headers={"Accept": MSGPACK_MIME_TYPE},
                params={
                    **parse_qs(urlparse(link).query),
                    "metadata": metadata_keys,
                    "structure_families": structure_families,
                    "specs": specs,
                    "counts": counts,
                    **self._queries_as_params,
                },
            )
        ).json()
        return distinct

    def sort(self, *sorting):
        """
        Make a Node with the same entries but sorted according to `sorting`.

        Examples
        --------

        Sort by "color" in ascending order, and then by "height" in descending order.

        >>> from tiled.client import ASCENDING, DESCENDING
        >>> tree.sort(("color", ASCENDING), ("height", DESCENDING))

        Note that ``1`` may be used as a synonym for ``ASCENDING``, and ``-1``
        may be used as a synonym for ``DESCENDING``.
        """
        return self.new_variation(sorting=sorting)

    def export(self, filepath, fields=None, *, format=None):
        """
        Download metadata and data below this node in some format and write to a file.

        Parameters
        ----------
        filepath : str or buffer
            Filepath or writeable buffer.
        fields : List[str], optional
            Filter which items in this node to export.
        format : str, optional
            If format is None and `filepath` is a filepath, the format is
            inferred from the name, like 'table.h5' implies
            format="application/x-hdf5". The format may be given as a file
            extension ("h5") or a media type ("application/x-hdf5").

        Examples
        --------

        Export all.

        >>> a.export("everything.h5")
        """
        params = {}
        if fields is not None:
            params["field"] = fields
        return export_util(
            filepath,
            format,
            self.context.http_client.get,
            self.item["links"]["full"],
            params=params,
        )

    def _ipython_key_completions_(self):
        """
        Provide method for the key-autocompletions in IPython.

        See http://ipython.readthedocs.io/en/stable/config/integrating.html#tab-completion
        """
        MAX_ENTRIES_SUPPORTED = 40
        try:
            if len(self) > MAX_ENTRIES_SUPPORTED:
                MSG = (
                    "Tab-completion is not supported on this particular Node "
                    "because it has a large number of entries."
                )
                warnings.warn(MSG)
                return []
            else:
                return list(self)
        except Exception:
            # Do not print messy traceback from thread. Just fail silently.
            return []

    def new(
        self,
        structure_family,
        data_sources,
        *,
        key=None,
        metadata=None,
        specs=None,
    ):
        """
        Create a new item within this Node.

        This is a low-level method. See high-level convenience methods listed below.

        See Also
        --------
        write_array
        write_dataframe
        write_sparse
        """
        self._cached_len = None
        metadata = metadata or {}
        specs = specs or []
        normalized_specs = []
        for spec in specs:
            if isinstance(spec, str):
                spec = Spec(spec)
            normalized_specs.append(asdict(spec))
        item = {
            "attributes": {
                "metadata": metadata,
                "structure_family": StructureFamily(structure_family),
                "specs": normalized_specs,
                "data_sources": [asdict(data_source) for data_source in data_sources],
            }
        }
        body = dict(item["attributes"])
        if key is not None:
            body["id"] = key
        if any(data_source.assets for data_source in data_sources):
            endpoint = self.uri.replace("/metadata/", "/register/", 1)
        else:
            endpoint = self.uri
        document = handle_error(
            self.context.http_client.post(
                endpoint,
                headers={"Accept": MSGPACK_MIME_TYPE},
                content=safe_json_dump(body),
            )
        ).json()
        if structure_family in {StructureFamily.container, StructureFamily.composite}:
            structure = {"contents": None, "count": None}
        else:
            # Only containers can have multiple data_sources right now.
            (data_source,) = data_sources
            structure = data_source.structure
        item["attributes"]["structure"] = structure
        # If the server returned modified metadata, update the local copy.
        if "metadata" in document:
            item["attributes"]["metadata"] = document.pop("metadata")
        # Ditto for structure.
        if "structure" in document:
            item["attributes"]["structure"] = STRUCTURE_TYPES[structure_family](
                document.pop("structure")
            )
        # Merge in "id" and "links" returned by the server.
        item.update(document)
        # Ensure this is a dataclass, not a dict.
        # When we apply type hints and mypy to the client it should be possible
        # to dispense with this.
        if (
            structure_family
            not in {StructureFamily.container, StructureFamily.composite}
        ) and isinstance(structure, dict):
            structure_type = STRUCTURE_TYPES[structure_family]
            structure = structure_type.from_json(structure)
        return client_for_item(
            self.context,
            self.structure_clients,
            item,
            structure=structure,
            include_data_sources=self._include_data_sources,
        )

    # When (re)chunking arrays for upload, we use this limit
    # to attempt to avoid bumping into size limits.
    _SUGGESTED_MAX_UPLOAD_SIZE = 100_000_000  # 100 MB

    def create_composite(self, key=None, *, metadata=None, specs=None):
        """Create a new, empty composite container.

        Parameters
        ----------
        key : str, optional
            Key (name) for this new node. If None, the server will provide a unique key.
        metadata : dict, optional
            User metadata. May be nested. Must contain only basic types
            (e.g. numbers, strings, lists, dicts) that are JSON-serializable.
        specs : List[Spec], optional
            List of names that are used to label that the data and/or metadata
            conform to some named standard specification.
        """
        return self.new(
            StructureFamily.composite,
            [],
            key=key,
            metadata=metadata,
            specs=specs,
        )

    def create_container(self, key=None, *, metadata=None, specs=None):
        """Create a new, empty container.

        Parameters
        ----------
        key : str, optional
            Key (name) for this new node. If None, the server will provide a unique key.
        metadata : dict, optional
            User metadata. May be nested. Must contain only basic types
            (e.g. numbers, strings, lists, dicts) that are JSON-serializable.
        specs : List[Spec], optional
            List of names that are used to label that the data and/or metadata
            conform to some named standard specification.
        """
        return self.new(
            StructureFamily.container,
            [],
            key=key,
            metadata=metadata,
            specs=specs,
        )

    def write_array(self, array, *, key=None, metadata=None, dims=None, specs=None):
        """
        EXPERIMENTAL: Write an array.

        Parameters
        ----------
        array : array-like
        key : str, optional
            Key (name) for this new node. If None, the server will provide a unique key.
        metadata : dict, optional
            User metadata. May be nested. Must contain only basic types
            (e.g. numbers, strings, lists, dicts) that are JSON-serializable.
        dims : List[str], optional
            A label for each dimension of the array.
        specs : List[Spec], optional
            List of names that are used to label that the data and/or metadata
            conform to some named standard specification.
        """
        import dask.array
        import numpy
        from dask.array.core import normalize_chunks

        from ..structures.array import ArrayStructure, BuiltinDtype

        if not (hasattr(array, "shape") and hasattr(array, "dtype")):
            # This does not implement enough of the array-like interface.
            # Coerce to numpy.
            array = numpy.asarray(array)

        # Determine chunks such that each chunk is not too large to upload.
        # Any existing chunking will be taken into account.
        # If the array is small, there will be only one chunk.
        if hasattr(array, "chunks"):
            chunks = normalize_chunks(
                array.chunks,
                limit=self._SUGGESTED_MAX_UPLOAD_SIZE,
                dtype=array.dtype,
                shape=array.shape,
            )
        else:
            chunks = normalize_chunks(
                tuple("auto" for _ in array.shape),
                limit=self._SUGGESTED_MAX_UPLOAD_SIZE,
                dtype=array.dtype,
                shape=array.shape,
            )

        structure = ArrayStructure(
            shape=array.shape,
            chunks=chunks,
            dims=dims,
            data_type=BuiltinDtype.from_numpy_dtype(array.dtype),
        )
        client = self.new(
            StructureFamily.array,
            [
                DataSource(structure=structure, structure_family=StructureFamily.array),
            ],
            key=key,
            metadata=metadata,
            specs=specs,
        )
        chunked = any(len(dim) > 1 for dim in chunks)
        if not chunked:
            client.write(array)
        else:
            # Fan out client.write_block over each chunk using dask.
            if isinstance(array, dask.array.Array):
                da = array.rechunk(chunks)
            else:
                da = dask.array.from_array(array, chunks=chunks)

            # Dask inspects the signature and passes block_id in if present.
            # It also apparently calls it with an empty array and block_id
            # once, so we catch that call and become a no-op.
            def write_block(x, block_id, client):
                if len(block_id):
                    client.write_block(x, block=block_id)
                return x

            # TODO Is there a fire-and-forget analogue such that we don't need
            # to bother with the return type?
            da.map_blocks(write_block, dtype=da.dtype, client=client).compute()
        return client

    def write_awkward(
        self,
        array,
        *,
        key=None,
        metadata=None,
        dims=None,
        specs=None,
    ):
        """
        Write an AwkwardArray.

        Parameters
        ----------
        array : awkward.Array
        key : str, optional
            Key (name) for this new node. If None, the server will provide a unique key.
        metadata : dict, optional
            User metadata. May be nested. Must contain only basic types
            (e.g. numbers, strings, lists, dicts) that are JSON-serializable.
        dims : List[str], optional
            A label for each dimension of the array.
        specs : List[Spec], optional
            List of names that are used to label that the data and/or metadata
            conform to some named standard specification.
        """
        import awkward

        from ..structures.awkward import AwkwardStructure

        packed = awkward.to_packed(array)
        form, length, container = awkward.to_buffers(packed)
        structure = AwkwardStructure(
            length=length,
            form=form.to_dict(),
        )
        client = self.new(
            StructureFamily.awkward,
            [
                DataSource(
                    structure=structure, structure_family=StructureFamily.awkward
                ),
            ],
            key=key,
            metadata=metadata,
            specs=specs,
        )
        client.write(container)
        return client

    def write_sparse(
        self,
        coords,
        data,
        shape,
        *,
        key=None,
        metadata=None,
        dims=None,
        specs=None,
    ):
        """
        EXPERIMENTAL: Write a sparse array.

        Parameters
        ----------
        coords : array-like
        data : array-like
        shape : tuple
        key : str, optional
            Key (name) for this new node. If None, the server will provide a unique key.
        metadata : dict, optional
            User metadata. May be nested. Must contain only basic types
            (e.g. numbers, strings, lists, dicts) that are JSON-serializable.
        dims : List[str], optional
            A label for each dimension of the array.
        specs : List[Spec], optional
            List of names that are used to label that the data and/or metadata
            conform to some named standard specification.

        Examples
        --------

        Write a sparse.COO array.

        >>> import sparse
        >>> from numpy import array
        >>> coo = sparse.COO(coords=array([[2, 5]]), data=array([1.3, 7.5]), shape=(10,))
        >>> c.write_sparse(coords=coo.coords, data=coo.data, shape=coo.shape)

        This only supports a single chunk. For chunked upload, use lower-level methods.

        # Define the overall shape and the dimensions of each chunk.
        >>> from tiled.structures.sparse import COOStructure
        >>> structure = COOStructure(shape=(10,), chunks=((5, 5),))
        >>> data_source = DataSource(structure=structure, structure_family=StructureFamily.sparse)
        >>> x = c.new("sparse", [data_source])

        # Upload the data in each chunk.
        # Coords are given within the reference frame of each chunk.
        >>> x.write_block(coords=array([[2, 4]]), data=array([3.1, 2.8]), block=(0,))
        >>> x.write_block(coords=array([[0, 1]]), data=array([6.7, 1.2]), block=(1,))
        """
        from ..structures.sparse import COOStructure

        structure = COOStructure(
            shape=shape,
            # This method only supports single-chunk COO arrays.
            chunks=tuple((dim,) for dim in shape),
            dims=dims,
        )
        client = self.new(
            StructureFamily.sparse,
            [
                DataSource(
                    structure=structure, structure_family=StructureFamily.sparse
                ),
            ],
            key=key,
            metadata=metadata,
            specs=specs,
        )
        client.write(coords, data)
        return client

    def create_appendable_table(
        self,
        schema: "pyarrow.Schema",
        npartitions: int = 1,
        *,
        key=None,
        metadata=None,
        specs=None,
        table_name: Optional[str] = None,
    ):
        """
        Create a new, empty table to which rows can later be appended.

        Parameters
        ----------
        schema : pyarrow.Schema
            Column names and dtypes for the table.
        npartitions : int, optional
            Number of partitions. Default is 1.
        key : str, optional
            Key (name) for this new node. If None, the server will provide a unique key.
        metadata : dict, optional
            User metadata. May be nested. Must contain only basic types
            (e.g. numbers, strings, lists, dicts) that are JSON-serializable.
        specs : List[Spec], optional
            List of names that are used to label that the data and/or metadata
            conform to some named standard specification.
        table_name : str, optional
            Optionally provide a name for the table this should be stored in.
            By default a name unique to the schema will be chosen.

        See Also
        --------
        write_dataframe
        """
        parameters = {}
        if table_name is not None:
            parameters["table_name"] = table_name

        from ..structures.table import TableStructure

        structure = TableStructure.from_schema(schema, npartitions)
        client = self.new(
            StructureFamily.table,
            [
                DataSource(
                    structure=structure,
                    structure_family=StructureFamily.table,
                    mimetype="application/x-tiled-sql-table",
                    parameters=parameters,
                )
            ],
            key=key,
            metadata=metadata or {},
            specs=specs or [],
        )
        return client

    def write_dataframe(
        self,
        dataframe: Union["pandas.DataFrame", dict[str, Any]],
        *,
        key=None,
        metadata=None,
        specs=None,
    ):
        """
        Write a DataFrame.

        Parameters
        ----------
        dataframe : pandas.DataFrame or dict
        key : str, optional
            Key (name) for this new node. If None, the server will provide a unique key.
        metadata : dict, optional
            User metadata. May be nested. Must contain only basic types
            (e.g. numbers, strings, lists, dicts) that are JSON-serializable.
        specs : List[Spec], optional
            List of names that are used to label that the data and/or metadata
            conform to some named standard specification.

        See Also
        --------
        create_appendable_table
        """
        import dask.dataframe

        from ..structures.table import TableStructure

        if isinstance(dataframe, dask.dataframe.DataFrame):
            structure = TableStructure.from_dask_dataframe(dataframe)
        elif isinstance(dataframe, dict):
            # table as dict, e.g. {"a": [1, 2, 3], "b": [4, 5, 6]}
            structure = TableStructure.from_dict(dataframe)
        else:
            structure = TableStructure.from_pandas(dataframe)
        client = self.new(
            StructureFamily.table,
            [
                DataSource(
                    structure=structure,
                    structure_family=StructureFamily.table,
                )
            ],
            key=key,
            metadata=metadata or {},
            specs=specs or [],
        )
        if hasattr(dataframe, "partitions"):
            if isinstance(dataframe, dask.dataframe.DataFrame):
                ddf = dataframe
            else:
                raise NotImplementedError(
                    f"Unsure how to handle type {type(dataframe)}"
                )
            ddf.map_partitions(
                functools.partial(_write_partition, client=client), meta=dataframe._meta
            ).compute()
        else:
            client.write(dataframe)
        return client


class Composite(Container):
    def get_contents(self, maxlen=None, include_metadata=False):
        result = {}
        next_page_url = f"{self.item['links']['search']}"
        while (next_page_url is not None) and (
            (maxlen is None) or (len(result) < maxlen)
        ):
            content = handle_error(
                self.context.http_client.get(
                    next_page_url,
                    headers={"Accept": MSGPACK_MIME_TYPE},
                    params={
                        **parse_qs(urlparse(next_page_url).query),
                        **self._queries_as_params,
                    }
                    | ({} if include_metadata else {"select_metadata": False}),
                )
            ).json()
            result.update({item["id"]: item for item in content["data"]})
            next_page_url = content["links"]["next"]

        return result

    @property
    def _flat_keys_mapping(self):
        result = {}
        for key, item in self.get_contents().items():
            if item["attributes"]["structure_family"] == StructureFamily.table:
                for col in item["attributes"]["structure"]["columns"]:
                    result[col] = item["id"] + "/" + col
            else:
                result[item["id"]] = item["id"]

        self._cached_len = (len(result), time.monotonic() + LENGTH_CACHE_TTL)

        return result

    @property
    def parts(self):
        return CompositeParts(self)

    def _keys_slice(self, start, stop, direction, _ignore_inlined_contents=False):
        yield from self._flat_keys_mapping.keys()

    def _items_slice(self, start, stop, direction, _ignore_inlined_contents=False):
        for key in self._flat_keys_mapping.keys():
            yield key, self[key]

    def __len__(self):
        if self._cached_len is not None:
            length, deadline = self._cached_len
            if time.monotonic() < deadline:
                # Use the cached value and do not make any request.
                return length
        return len(self._flat_keys_mapping)

    def __getitem__(self, key: str, _ignore_inlined_contents=False):
        if isinstance(key, tuple):
            key = "/".join(key)
        if key in self._flat_keys_mapping:
            key = self._flat_keys_mapping[key]
        else:
            raise KeyError(
                f"Key '{key}' not found. If it refers to a table, use .parts['{key}'] instead."
            )
        return super().__getitem__(key, _ignore_inlined_contents)

    def create_container(self, key=None, *, metadata=None, specs=None):
        """Composite nodes cannot include nested containers by design."""
        raise NotImplementedError("Cannot create a container within a composite node.")

    def create_composite(self, key=None, *, metadata=None, specs=None):
        """Composite nodes cannot include nested composites by design."""
        raise NotImplementedError("Cannot create a composite within a composite node.")


class CompositeParts:
    def __init__(self, node):
        self.contents = node.get_contents(include_metadata=True)
        self.context = node.context
        self.structure_clients = node.structure_clients
        self._include_data_sources = node._include_data_sources

    def __repr__(self):
        return (
            f"<{type(self).__name__} {{"
            + ", ".join(f"'{item}'" for item in self.contents)
            + "}>"
        )

    def __getitem__(self, key):
        key, *tail = key.split("/")
        if key not in self.contents:
            raise KeyError(key)
        client = client_for_item(
            self.context,
            self.structure_clients,
            self.contents[key],
            include_data_sources=self._include_data_sources,
        )
        if tail:
            return client["/".join(tail)]
        else:
            return client

    def __iter__(self):
        for key in self.contents:
            yield key

    def __len__(self) -> int:
        return len(self.contents)


def _queries_to_params(*queries):
    "Compute GET params from the queries."
    params = collections.defaultdict(list)
    for query in queries:
        name = default_query_registry.query_type_to_name[type(query)]
        for field, value in query.encode().items():
            if value is not None:
                params[f"filter[{name}][condition][{field}]"].append(value)
    return dict(params)


LENGTH_CACHE_TTL = 1  # second


class Ascending(Sentinel):
    "Intended for more readable sorting operations. An alias for 1."

    def __index__(self):
        return 1


class Descending(Sentinel):
    "Intended for more readable sorting operations. An alias for -1."

    def __index__(self):
        return -1


ASCENDING = Ascending("ASCENDING")
"Ascending sort order. An alias for 1."

DESCENDING = Descending("DESCENDING")
"Descending sort order. An alias for -1."


class _LazyLoad:
    # This exists because lambdas and closures cannot be pickled.
    def __init__(self, import_module_args, attr_name):
        self.import_module_args = import_module_args
        self.attr_name = attr_name

    def __call__(self):
        return getattr(
            importlib.import_module(*self.import_module_args), self.attr_name
        )


class _Wrap:
    # This exists because lambdas and closures cannot be pickled.
    def __init__(self, obj):
        self.obj = obj

    def __call__(self):
        return self.obj


def _write_partition(x, partition_info, client):
    client.write_partition(x, partition_info["number"])
    return x


DEFAULT_STRUCTURE_CLIENT_DISPATCH = {
    "numpy": OneShotCachedMap(
        {
            "container": _Wrap(Container),
            "composite": _Wrap(Composite),
            "array": _LazyLoad(("..array", Container.__module__), "ArrayClient"),
            "awkward": _LazyLoad(("..awkward", Container.__module__), "AwkwardClient"),
            "dataframe": _LazyLoad(
                ("..dataframe", Container.__module__), "DataFrameClient"
            ),
            "sparse": _LazyLoad(("..sparse", Container.__module__), "SparseClient"),
            "table": _LazyLoad(
                ("..dataframe", Container.__module__), "DataFrameClient"
            ),
            "xarray_dataset": _LazyLoad(
                ("..xarray", Container.__module__), "DatasetClient"
            ),
        }
    ),
    "dask": OneShotCachedMap(
        {
            "container": _Wrap(Container),
            "composite": _Wrap(Composite),
            "array": _LazyLoad(("..array", Container.__module__), "DaskArrayClient"),
            # TODO Create DaskAwkwardClient
            # "awkward": _LazyLoad(("..awkward", Container.__module__), "DaskAwkwardClient"),
            "dataframe": _LazyLoad(
                ("..dataframe", Container.__module__), "DaskDataFrameClient"
            ),
            "sparse": _LazyLoad(("..sparse", Container.__module__), "SparseClient"),
            "table": _LazyLoad(
                ("..dataframe", Container.__module__), "DaskDataFrameClient"
            ),
            "xarray_dataset": _LazyLoad(
                ("..xarray", Container.__module__), "DaskDatasetClient"
            ),
        }
    ),
}
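
# Note: when ``structure_clients`` is given as a string such as "numpy" or "dask"
# (for example, ``tiled.client.from_uri(..., structure_clients="dask")``), it is
# resolved against DEFAULT_STRUCTURE_CLIENT_DISPATCH in ``new_variation`` above.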