Source code for tiled.structures.table

import base64
import io
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any, List, Tuple, Union

import pyarrow

from tiled.structures.root import Structure

B64_ENCODED_PREFIX = "data:application/vnd.apache.arrow.file;base64,"


def _uri_from_schema(pyarrow_schema: pyarrow.Schema) -> str:
    schema_bytes = pyarrow_schema.serialize()
    schema_b64 = base64.b64encode(schema_bytes).decode("utf-8")
    return B64_ENCODED_PREFIX + schema_b64


[docs] @dataclass class TableStructure(Structure): # This holds a Arrow schema, base64-encoded so that it can be transported # as JSON. For clarity, the encoded data (...) is prefixed like: # # data:application/vnd.apache.arrow.file;base64,... # # Arrow does not support an official JSON serialization, but it # could in the future: https://github.com/apache/arrow/pull/7110 # If it does, we could switch to using that here. arrow_schema: str npartitions: int columns: List[str] resizable: Union[bool, Tuple[bool, ...]] = False def __post_init__(self): self.columns = list(map(str, self.columns)) # Ensure all column names are str for column in self.columns: if column.startswith("_"): raise ValueError( "Tiled reserved column names starting with '_' for internal use." ) @classmethod def from_dask_dataframe(cls, ddf) -> "TableStructure": import dask.dataframe.utils # Make a pandas Table with 0 rows. # We can use this to define an Arrow schema without loading any row data. meta = dask.dataframe.utils.make_meta(ddf) schema = pyarrow.Table.from_pandas(meta).schema return cls( arrow_schema=_uri_from_schema(schema), npartitions=ddf.npartitions, columns=list(ddf.columns), ) @classmethod def from_pandas(cls, df) -> "TableStructure": schema = pyarrow.Table.from_pandas(df).schema return cls( arrow_schema=_uri_from_schema(schema), npartitions=1, columns=list(df.columns), ) @classmethod def from_dict(cls, d: Mapping[str, Any]) -> "TableStructure": schema = pyarrow.Table.from_pydict(d).schema return cls( arrow_schema=_uri_from_schema(schema), npartitions=1, columns=list(d.keys()) ) @classmethod def from_arrays(cls, arr, names: List[str]) -> "TableStructure": schema = pyarrow.Table.from_arrays(arr, names).schema return cls( arrow_schema=_uri_from_schema(schema), npartitions=1, columns=list(names) ) @classmethod def from_schema( cls, schema: pyarrow.Schema, npartitions: int = 1 ) -> "TableStructure": return cls( arrow_schema=_uri_from_schema(schema), npartitions=npartitions, columns=schema.names, ) @classmethod def from_arrow_table( cls, table: pyarrow.Table, npartitions: int = 1 ) -> "TableStructure": schema = table.schema return cls( arrow_schema=_uri_from_schema(schema), npartitions=npartitions, columns=list(table.column_names), ) @property def arrow_schema_decoded(self) -> pyarrow.Schema: if not self.arrow_schema.startswith(B64_ENCODED_PREFIX): raise ValueError( f"Expected base64-encoded data prefixed with {B64_ENCODED_PREFIX}." ) payload = self.arrow_schema[len(B64_ENCODED_PREFIX) :].encode( # noqa: 203 "utf-8" ) return pyarrow.ipc.read_schema(io.BytesIO(base64.b64decode(payload))) @property def meta(self): return self.arrow_schema_decoded.empty_table().to_pandas()