Source code for tiled.client.dataframe

import dask
import dask.dataframe

from ..serialization.dataframe import deserialize_arrow, serialize_arrow
from ..utils import APACHE_ARROW_FILE_MIME_TYPE, UNCHANGED
from .base import BaseStructureClient
from .utils import ClientError, client_for_item, export_util


class DaskDataFrameClient(BaseStructureClient):
    "Client-side wrapper around a dataframe-like that returns dask dataframes"

    def new_variation(self, structure=UNCHANGED, **kwargs):
        if structure is UNCHANGED:
            structure = self._structure
        return super().new_variation(structure=structure, **kwargs)

    def _repr_pretty_(self, p, cycle):
        """
        Provide "pretty" display in IPython/Jupyter.

        See https://ipython.readthedocs.io/en/stable/config/integrating.html#rich-display
        """
        structure = self.structure()
        if not structure.macro.resizable:
            p.text(f"<{type(self).__name__} {structure.macro.columns}>")
        else:
            # Try to get the column names, but give up quickly to avoid blocking
            # for long.
            TIMEOUT = 0.2  # seconds
            try:
                content = self.context.get_json(
                    self.uri,
                    params={"fields": "structure.macro"},
                    timeout=TIMEOUT,
                )
            except TimeoutError:
                p.text(
                    f"<{type(self).__name__} Loading column names took too long; use list(...) >"
                )
            except Exception as err:
                p.text(
                    f"<{type(self).__name__} Loading column names raised error {err!r}>"
                )
            else:
                try:
                    columns = content["data"]["attributes"]["structure"]["macro"][
                        "columns"
                    ]
                except Exception as err:
                    p.text(
                        f"<{type(self).__name__} Loading column names raised error {err!r}>"
                    )
                else:
                    p.text(f"<{type(self).__name__} {columns}>")

    def _ipython_key_completions_(self):
        """
        Provide method for the key-autocompletions in IPython.

        See http://ipython.readthedocs.io/en/stable/config/integrating.html#tab-completion
        """
        structure = self.structure()
        if not structure.macro.resizable:
            # Use cached structure.
            return structure.macro.columns
        try:
            content = self.context.get_json(
                self.uri, params={"fields": "structure.macro"}
            )
            columns = content["data"]["attributes"]["structure"]["macro"]["columns"]
        except Exception:
            # Do not print messy traceback from thread. Just fail silently.
            return []
        return columns

    @property
    def columns(self):
        return self.structure().macro.columns

    def download(self):
        super().download()
        self._ipython_key_completions_()
        self.read().compute()

    def _get_partition(self, partition, columns):
        """
        Fetch the actual data for one partition in a partitioned (dask) dataframe.

        See read_partition for a public version of this.
        """
        params = {"partition": partition}
        if columns:
            # Note: The singular/plural inconsistency here is due to the fact that
            # ["A", "B"] will be encoded in the URL as field=A&field=B.
            params["field"] = columns
        content = self.context.get_content(
            self.item["links"]["partition"],
            headers={"Accept": APACHE_ARROW_FILE_MIME_TYPE},
            params=params,
        )
        return deserialize_arrow(content)
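    # A hedged sketch (not part of the class): the singular "field" key above
    # relies on the HTTP client encoding a list value as repeated query
    # parameters. Assuming httpx-style encoding and a hypothetical URL:
    #
    #     import httpx
    #     url = httpx.URL("http://localhost:8000/partition", params={"field": ["A", "B"]})
    #     str(url)  # 'http://localhost:8000/partition?field=A&field=B'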
    def read_partition(self, partition, columns=None):
        """
        Access one partition in a partitioned (dask) dataframe.

        Optionally select a subset of the columns.
        """
        structure = self.structure()
        npartitions = structure.macro.npartitions
        if not (0 <= partition < npartitions):
            raise IndexError(f"partition {partition} out of range")
        meta = structure.micro.meta_decoded
        if columns is not None:
            meta = meta[columns]
        return dask.dataframe.from_delayed(
            [dask.delayed(self._get_partition)(partition, columns)],
            meta=meta,
            divisions=structure.micro.divisions_decoded,
        )
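    # Illustrative usage, a sketch assuming a running Tiled server at a
    # hypothetical address with a dataframe node named "table":
    #
    #     from tiled.client import from_uri
    #     client = from_uri("http://localhost:8000", "dask")
    #     lazy = client["table"].read_partition(0, columns=["A"])  # no data fetched yet
    #     df = lazy.compute()  # fetch just this one partition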
    def read(self, columns=None):
        """
        Access the entire DataFrame. Optionally select a subset of the columns.

        The result will be internally partitioned with dask.
        """
        structure = self.structure()
        # Build a client-side dask dataframe whose partitions pull from the
        # server-side partitions.
        name = f"remote-dask-dataframe-{self.item['links']['self']}"
        dask_tasks = {
            (name, partition): (self._get_partition, partition, columns)
            for partition in range(structure.macro.npartitions)
        }
        meta = structure.micro.meta_decoded
        if columns is not None:
            meta = meta[columns]
        ddf = dask.dataframe.DataFrame(
            dask_tasks,
            name=name,
            meta=meta,
            divisions=structure.micro.divisions_decoded,
        )
        if columns is not None:
            ddf = ddf[columns]
        return ddf
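    # Illustrative usage, a sketch with `client` as above. Nothing is fetched
    # until compute() runs; column selection is pushed into each per-partition
    # request:
    #
    #     ddf = client["table"].read(columns=["A", "B"])  # builds the task graph only
    #     ddf.npartitions  # mirrors structure.macro.npartitions on the server
    #     df = ddf.compute()  # each partition is fetched via _get_partition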
    # We implement *some* of the Mapping interface here but intentionally not
    # all of it. DataFrames are not quite Mapping-like. Their __len__, for
    # example, returns the number of rows (which it would be costly for us to
    # compute) as opposed to holding to the usual invariant
    # `len(list(obj)) == len(obj)` for Mappings. Additionally, their behavior
    # with `__getitem__` is a bit "extra", e.g. df[["A", "B"]].

    def __getitem__(self, column):
        try:
            self_link = self.item["links"]["self"]
            if self_link.endswith("/"):
                self_link = self_link[:-1]
            content = self.context.get_json(self_link + f"/{column}")
        except ClientError as err:
            if err.response.status_code == 404:
                raise KeyError(column)
            raise
        item = content["data"]
        return client_for_item(self.context, self.structure_clients, item)

    def __iter__(self):
        yield from self.structure().macro.columns

    # __len__ is intentionally not implemented. For DataFrames it means "number
    # of rows", which is expensive to compute.

    def write(self, dataframe):
        self.context.put_content(
            self.item["links"]["full"],
            content=bytes(serialize_arrow(dataframe, {})),
            headers={"Content-Type": APACHE_ARROW_FILE_MIME_TYPE},
        )

    def write_partition(self, dataframe, partition):
        self.context.put_content(
            self.item["links"]["partition"].format(index=partition),
            content=bytes(serialize_arrow(dataframe, {})),
            headers={"Content-Type": APACHE_ARROW_FILE_MIME_TYPE},
        )
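    # Illustrative usage, a sketch with `client` as above:
    #
    #     list(client["table"])   # column names, via __iter__
    #     client["table"]["A"]    # a client for the column "A", via __getitem__
    #     # len(client["table"]) is unsupported by design; see the comment above.
    #
    #     # Writing assumes a server that permits it (a deployment assumption):
    #     import pandas
    #     client["table"].write(pandas.DataFrame({"A": [1, 2, 3]}))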
    def export(self, filepath, columns=None, *, format=None):
        """
        Download data in some format and write to a file.

        Parameters
        ----------
        filepath : str or buffer
            Filepath or writeable buffer.
        columns : List[str], optional
            Select a subset of the columns.
        format : str, optional
            If format is None and `filepath` is a filepath, the format is
            inferred from the name, like 'table.csv' implies format="text/csv".
            The format may be given as a file extension ("csv") or a media
            type ("text/csv").
        """
        params = {}
        if columns is not None:
            params["field"] = columns
        return export_util(
            filepath,
            format,
            self.context.get_content,
            self.item["links"]["full"],
            params=params,
        )
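    # Illustrative usage, a sketch with `client` as above:
    #
    #     client["table"].export("table.csv")                  # format inferred from name
    #     client["table"].export("subset.csv", columns=["A"])  # subset of columns
    #
    #     import io
    #     buffer = io.BytesIO()
    #     client["table"].export(buffer, format="text/csv")    # explicit media type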
class DataFrameClient(DaskDataFrameClient):
    "Client-side wrapper around a dataframe-like that returns in-memory dataframes"
    def read_partition(self, partition, columns=None):
        """
        Access one partition of the DataFrame. Optionally select a subset of the columns.
        """
        return super().read_partition(partition, columns).compute()
    def read(self, columns=None):
        """
        Access the entire DataFrame. Optionally select a subset of the columns.
        """
        return super().read(columns).compute()
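    # Illustrative usage, a sketch assuming the default in-memory structure
    # clients and a hypothetical address:
    #
    #     from tiled.client import from_uri
    #     client = from_uri("http://localhost:8000")
    #     df = client["table"].read()  # a pandas.DataFrame; no .compute() needed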
    def download(self):
        # Do not run super().download() because DaskDataFrameClient calls compute(),
        # which does not apply here.
        BaseStructureClient.download(self)
        self._ipython_key_completions_()
        self.read()
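    # Illustrative usage, a sketch: download() pulls the structure and full data
    # through the client, which warms a local cache if the context was set up
    # with one (a deployment assumption, not shown here):
    #
    #     client["table"].download()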