Source code for tiled.adapters.dataframe

import dask.base
import dask.dataframe
import pandas

from ..server.object_cache import get_object_cache
from ..structures.core import StructureFamily
from ..structures.dataframe import DataFrameStructure
from .array import ArrayAdapter


class DataFrameAdapter:
    """
    Wrap a dataframe-like object in an interface that Tiled can serve.

    Examples
    --------

    >>> df = pandas.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
    >>> DataFrameAdapter.from_pandas(df, npartitions=1)
    """

    structure_family = StructureFamily.dataframe

    @classmethod
    def from_pandas(
        cls, *args, metadata=None, specs=None, access_policy=None, **kwargs
    ):
        ddf = dask.dataframe.from_pandas(*args, **kwargs)
        return cls.from_dask_dataframe(
            ddf, metadata=metadata, specs=specs, access_policy=access_policy
        )

    @classmethod
    def from_dask_dataframe(
        cls,
        ddf,
        metadata=None,
        specs=None,
        access_policy=None,
    ):
        structure = DataFrameStructure.from_dask_dataframe(ddf)
        return cls(
            ddf.partitions,
            structure,
            metadata=metadata,
            specs=specs,
            access_policy=access_policy,
        )
    def __init__(
        self,
        partitions,
        structure,
        *,
        metadata=None,
        specs=None,
        access_policy=None,
    ):
        self._metadata = metadata or {}
        # May contain None placeholders for partitions that have not been
        # stored yet; read() and read_partition() check for these.
        self._partitions = list(partitions)
        self._structure = structure
        self.specs = specs or []
        self.access_policy = access_policy
    def __repr__(self):
        return f"{type(self).__name__}({self._structure.columns!r})"

    def __getitem__(self, key):
        # Must compute to determine shape.
        return ArrayAdapter.from_array(self.read([key])[key].values)

    def items(self):
        yield from (
            (key, ArrayAdapter.from_array(self.read([key])[key].values))
            for key in self._structure.columns
        )

    def metadata(self):
        return self._metadata

    def structure(self):
        return self._structure

    def read(self, fields=None):
        if any(p is None for p in self._partitions):
            raise ValueError("Not all partitions have been stored.")
        if isinstance(self._partitions[0], dask.dataframe.DataFrame):
            if fields is not None:
                ddf = dask.dataframe.concat(
                    [p[fields] for p in self._partitions], axis=0
                )
            else:
                ddf = dask.dataframe.concat(self._partitions, axis=0)
            # Note: If the cache is set to NO_CACHE, this is a null context.
            with get_object_cache().dask_context:
                return ddf.compute()
        df = pandas.concat(self._partitions, axis=0)
        if fields is not None:
            df = df[fields]
        return df

    def read_partition(self, partition, fields=None):
        index = partition
        partition = self._partitions[index]
        if partition is None:
            # Report the requested index; `partition` itself is None here.
            raise RuntimeError(f"partition {index} has not been stored yet")
        if fields is not None:
            partition = partition[fields]
        # Special case for dask to cache computed result in object cache.
        if isinstance(partition, dask.dataframe.DataFrame):
            # Note: If the cache is set to NO_CACHE, this is a null context.
            with get_object_cache().dask_context:
                return partition.compute()
        return partition
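
A minimal usage sketch, not part of the module itself: it assumes Tiled is installed so this module is importable, and the column names and partition count are illustrative.

import pandas

from tiled.adapters.dataframe import DataFrameAdapter

# Build an adapter from an in-memory pandas DataFrame, split into two
# dask partitions (from_pandas forwards npartitions to
# dask.dataframe.from_pandas).
df = pandas.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
adapter = DataFrameAdapter.from_pandas(df, npartitions=2)

# Read the whole frame back, or a subset of columns, as pandas objects.
full = adapter.read()
subset = adapter.read(fields=["a"])

# Read one partition at a time; partition indices are 0-based.
first = adapter.read_partition(0)

# Column access returns an ArrayAdapter wrapping the column's values.
column = adapter["a"]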