Source code for tiled.adapters.table

from typing import Any, Iterator, List, Optional, Tuple, Union

import dask.base
import dask.dataframe
import pandas

from ..structures.core import Spec, StructureFamily
from ..structures.table import TableStructure
from ..type_aliases import JSON
from .array import ArrayAdapter


class TableAdapter:
    """
    Wrap a dataframe-like object in an interface that Tiled can serve.

    Examples
    --------

    >>> df = pandas.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
    >>> DataFrameAdapter.from_pandas(df, npartitions=1)
    """

    structure_family = StructureFamily.table
    def __init__(
        self,
        partitions: Union[dask.dataframe.DataFrame, pandas.DataFrame],
        structure: TableStructure,
        *,
        metadata: Optional[JSON] = None,
        specs: Optional[List[Spec]] = None,
    ) -> None:
        """
        Parameters
        ----------
        partitions :
            The dataframe partitions backing this table, one per partition.
        structure :
            A TableStructure describing the columns and partitioning.
        metadata :
            Metadata to serve alongside the data.
        specs :
            A list of Specs declaring how the data should be interpreted.
        """
        self._metadata = metadata or {}
        self._partitions = list(partitions)
        self._structure = structure
        self.specs = specs or []
    @classmethod
    def from_pandas(
        cls,
        *args: Any,
        metadata: Optional[JSON] = None,
        specs: Optional[List[Spec]] = None,
        npartitions: int = 1,
        **kwargs: Any,
    ) -> "TableAdapter":
        """
        Construct from a pandas DataFrame via dask.dataframe.from_pandas.

        Parameters
        ----------
        args :
            Positional arguments passed to dask.dataframe.from_pandas.
        metadata :
            Metadata to serve alongside the data.
        specs :
            A list of Specs; defaults to [Spec("dataframe")].
        npartitions :
            Number of partitions to split the data into.
        kwargs :
            Keyword arguments passed to dask.dataframe.from_pandas.

        Returns
        -------
        TableAdapter
        """
        ddf = dask.dataframe.from_pandas(*args, npartitions=npartitions, **kwargs)
        if specs is None:
            specs = [Spec("dataframe")]
        return cls.from_dask_dataframe(ddf, metadata=metadata, specs=specs)

    @classmethod
    def from_dict(
        cls,
        *args: Any,
        metadata: Optional[JSON] = None,
        specs: Optional[List[Spec]] = None,
        npartitions: int = 1,
        **kwargs: Any,
    ) -> "TableAdapter":
        """
        Construct from a dict of columns via dask.dataframe.from_dict.

        Parameters
        ----------
        args :
            Positional arguments passed to dask.dataframe.from_dict.
        metadata :
            Metadata to serve alongside the data.
        specs :
            A list of Specs; defaults to [Spec("dataframe")].
        npartitions :
            Number of partitions to split the data into.
        kwargs :
            Keyword arguments passed to dask.dataframe.from_dict.

        Returns
        -------
        TableAdapter
        """
        ddf = dask.dataframe.from_dict(*args, npartitions=npartitions, **kwargs)
        if specs is None:
            specs = [Spec("dataframe")]
        return cls.from_dask_dataframe(ddf, metadata=metadata, specs=specs)

    @classmethod
    def from_dask_dataframe(
        cls,
        ddf: dask.dataframe.DataFrame,
        metadata: Optional[JSON] = None,
        specs: Optional[List[Spec]] = None,
    ) -> "TableAdapter":
        """
        Construct from a dask DataFrame.

        Parameters
        ----------
        ddf :
            The dask DataFrame to wrap.
        metadata :
            Metadata to serve alongside the data.
        specs :
            A list of Specs; defaults to [Spec("dataframe")].

        Returns
        -------
        TableAdapter
        """
        structure = TableStructure.from_dask_dataframe(ddf)
        if specs is None:
            specs = [Spec("dataframe")]
        return cls(
            ddf.partitions,
            structure,
            metadata=metadata,
            specs=specs,
        )

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self._structure.columns!r})"

    def __getitem__(self, key: str) -> ArrayAdapter:
        # Must compute to determine shape.
        array = self.read([key])[key].values
        # Convert (experimental) pandas.StringDtype to numpy's unicode string dtype.
        if isinstance(array.dtype, pandas.StringDtype):
            import numpy

            max_size = max(len(i) for i in array.ravel())
            array = array.astype(dtype=numpy.dtype(f"<U{max_size}"))
        return ArrayAdapter.from_array(array)

    def get(self, key: str) -> Union[ArrayAdapter, None]:
        if key not in self.structure().columns:
            return None
        return self[key]

    def items(self) -> Iterator[Tuple[str, ArrayAdapter]]:
        yield from ((key, self[key]) for key in self._structure.columns)

    def metadata(self) -> JSON:
        return self._metadata

    def structure(self) -> TableStructure:
        return self._structure

    def read(
        self, fields: Optional[List[str]] = None
    ) -> Union[dask.dataframe.DataFrame, pandas.DataFrame]:
        """
        Read the full table, concatenating all partitions.

        Parameters
        ----------
        fields :
            Optional list of column names to read; by default, read all columns.

        Returns
        -------
        The table as a pandas DataFrame.
        """
        if any(p is None for p in self._partitions):
            raise ValueError("Not all partitions have been stored.")
        if isinstance(self._partitions[0], dask.dataframe.DataFrame):
            if fields is not None:
                ddf = dask.dataframe.concat(
                    [p[fields] for p in self._partitions], axis=0
                )
            else:
                ddf = dask.dataframe.concat(self._partitions, axis=0)
            return ddf.compute()
        df = pandas.concat(self._partitions, axis=0)
        if fields is not None:
            df = df[fields]
        return df

    def read_partition(
        self,
        partition: int,
        fields: Optional[List[str]] = None,
    ) -> Union[pandas.DataFrame, dask.dataframe.DataFrame]:
        """
        Read a single partition.

        Parameters
        ----------
        partition :
            Index of the partition to read.
        fields :
            Optional list of column names to read; by default, read all columns.

        Returns
        -------
        The partition as a pandas DataFrame.
        """
        df = self._partitions[partition]
        if df is None:
            raise RuntimeError(f"Partition {partition} has not been stored yet.")
        if fields is not None:
            df = df[fields]
        if isinstance(df, dask.dataframe.DataFrame):
            return df.compute()
        return df
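
A usage sketch, for illustration only (the example DataFrame and its column names are made up): an adapter is normally constructed with one of the from_* classmethods and then read back either whole or one partition at a time.

>>> import pandas
>>> df = pandas.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})
>>> adapter = TableAdapter.from_pandas(df, npartitions=2)
>>> full = adapter.read()                # all partitions, concatenated
>>> a_only = adapter.read(fields=["a"])  # just column "a"
>>> first = adapter.read_partition(0)    # just the first partition
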
DataFrameAdapter = TableAdapter
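
DataFrameAdapter is an alias for TableAdapter (presumably kept for backward compatibility), so the doctest in the class docstring above still applies. Individual columns are also served as arrays; a minimal sketch, assuming the same illustrative df as above:

>>> adapter = DataFrameAdapter.from_pandas(df, npartitions=1)
>>> column = adapter["a"]                        # an ArrayAdapter wrapping column "a"
>>> names = [key for key, _ in adapter.items()]  # ["a", "b"]
>>> adapter.get("not_a_column") is None          # get() returns None for unknown columns
True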