Source code for tiled.adapters.parquet

from pathlib import Path
from typing import Any, List, Optional, Union

import dask.dataframe
import pandas

from ..server.schemas import Asset
from ..structures.core import Spec, StructureFamily
from ..structures.table import TableStructure
from ..utils import path_from_uri
from .dataframe import DataFrameAdapter
from .protocols import AccessPolicy
from .type_alliases import JSON


class ParquetDatasetAdapter:
    """Read and write access to a dataset comprising one or more Parquet files."""

    structure_family = StructureFamily.table
    def __init__(
        self,
        data_uris: List[str],
        structure: TableStructure,
        metadata: Optional[JSON] = None,
        specs: Optional[List[Spec]] = None,
        access_policy: Optional[AccessPolicy] = None,
    ) -> None:
        """
        Parameters
        ----------
        data_uris : List[str]
            File URIs, one per partition of the dataset.
        structure : TableStructure
            Description of the table (columns, number of partitions).
        metadata : JSON, optional
            Metadata to associate with this dataset.
        specs : List[Spec], optional
            Specs describing the layout or meaning of the data.
        access_policy : AccessPolicy, optional
            Policy controlling access to this dataset.
        """
        # TODO Store data_uris instead and generalize to non-file schemes.
        self._partition_paths = [path_from_uri(uri) for uri in data_uris]
        self._metadata = metadata or {}
        self._structure = structure
        self.specs = list(specs or [])
        self.access_policy = access_policy
    def metadata(self) -> JSON:
        """
        Returns
        -------
        JSON
            The metadata associated with this dataset.
        """
        return self._metadata

    @property
    def dataframe_adapter(self) -> DataFrameAdapter:
        """
        Returns
        -------
        DataFrameAdapter
            Adapter over the partitions. Partitions whose files do not
            (yet) exist on disk are represented as None.
        """
        partitions = []
        for path in self._partition_paths:
            if not Path(path).exists():
                partition = None
            else:
                partition = dask.dataframe.read_parquet(path)
            partitions.append(partition)
        return DataFrameAdapter(partitions, self._structure)

    @classmethod
    def init_storage(cls, data_uri: str, structure: TableStructure) -> List[Asset]:
        """
        Create the directory that will hold the Parquet files.

        Parameters
        ----------
        data_uri : str
            URI of the directory in which partitions will be written.
        structure : TableStructure
            Description of the table; its npartitions determines how many
            Assets are created.

        Returns
        -------
        List[Asset]
            One Asset per partition, named partition-{i}.parquet.
        """
        directory = path_from_uri(data_uri)
        directory.mkdir(parents=True, exist_ok=True)
        assets = [
            Asset(
                data_uri=f"{data_uri}/partition-{i}.parquet",
                is_directory=False,
                parameter="data_uris",
                num=i,
            )
            for i in range(structure.npartitions)
        ]
        return assets

    def write_partition(
        self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame], partition: int
    ) -> None:
        """
        Write one partition of the dataset to its Parquet file.

        Parameters
        ----------
        data : dask.dataframe.DataFrame or pandas.DataFrame
            The data for this partition.
        partition : int
            Index of the partition to write.
        """
        path = self._partition_paths[partition]
        data.to_parquet(path)

    def write(self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame]) -> None:
        """
        Write the full dataset. Only single-partition datasets are supported.

        Parameters
        ----------
        data : dask.dataframe.DataFrame or pandas.DataFrame
            The data for the (single) partition.
        """
        if self.structure().npartitions != 1:
            raise NotImplementedError
        path = self._partition_paths[0]
        data.to_parquet(path)

    def read(self, *args: Any, **kwargs: Any) -> pandas.DataFrame:
        """
        Read the dataset, delegating to the underlying DataFrameAdapter.

        Parameters
        ----------
        *args : Any
        **kwargs : Any
            Passed through to DataFrameAdapter.read.

        Returns
        -------
        pandas.DataFrame
        """
        return self.dataframe_adapter.read(*args, **kwargs)

    def read_partition(self, *args: Any, **kwargs: Any) -> pandas.DataFrame:
        """
        Read one partition, delegating to the underlying DataFrameAdapter.

        Parameters
        ----------
        *args : Any
        **kwargs : Any
            Passed through to DataFrameAdapter.read_partition.

        Returns
        -------
        pandas.DataFrame
        """
        return self.dataframe_adapter.read_partition(*args, **kwargs)

    def structure(self) -> TableStructure:
        """
        Returns
        -------
        TableStructure
            The structure of this dataset.
        """
        return self._structure
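
A minimal end-to-end sketch (not part of this module) of the intended write/read cycle follows. It assumes TableStructure.from_pandas produces a single-partition structure and that each Asset returned by init_storage exposes the URI it was created with as .data_uri; neither is defined here, so treat this as illustrative rather than canonical. The directory URI is a hypothetical location.

    import pandas

    from tiled.adapters.parquet import ParquetDatasetAdapter
    from tiled.structures.table import TableStructure

    df = pandas.DataFrame({"x": [1, 2, 3], "y": [4.0, 5.0, 6.0]})
    # Assumption: from_pandas defaults to a one-partition structure.
    structure = TableStructure.from_pandas(df)

    data_uri = "file:///tmp/parquet_demo"  # hypothetical directory URI
    # Creates the directory and returns one Asset per partition.
    assets = ParquetDatasetAdapter.init_storage(data_uri, structure)

    adapter = ParquetDatasetAdapter(
        data_uris=[asset.data_uri for asset in assets],
        structure=structure,
    )
    adapter.write(df)  # allowed because npartitions == 1
    round_tripped = adapter.read()

Note that write refuses multi-partition structures (it raises NotImplementedError), so whole-table writes apply only to the single-partition case; partitioned datasets are written piecewise with write_partition.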
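
For the multi-partition case, a sketch under further assumptions: that TableStructure is a dataclass, so dataclasses.replace can override npartitions, and reusing the hypothetical names from the sketch above.

    import dataclasses

    # Assumption: TableStructure is a dataclass; npartitions overridden by hand.
    structure2 = dataclasses.replace(TableStructure.from_pandas(df), npartitions=2)
    assets2 = ParquetDatasetAdapter.init_storage("file:///tmp/parquet_demo2", structure2)
    adapter2 = ParquetDatasetAdapter([a.data_uri for a in assets2], structure2)

    adapter2.write_partition(df.iloc[:2], 0)  # writes partition-0.parquet
    adapter2.write_partition(df.iloc[2:], 1)  # writes partition-1.parquet
    first = adapter2.read_partition(0)        # reads back only partition 0

This piecewise style works because the dataframe_adapter property tolerates partially written datasets: any partition whose file does not yet exist on disk is represented as None rather than raising at adapter construction time.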