import copy
from collections.abc import Set
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, Union
from urllib.parse import quote_plus

import dask.array
import dask.dataframe
import pandas
from tiled.adapters.core import Adapter

from ..catalog.orm import Node
from ..storage import FileStorage, Storage
from ..structures.array import ArrayStructure, StructDtype
from ..structures.core import Spec, StructureFamily
from ..structures.data_source import Asset, DataSource, Management
from ..structures.table import TableStructure
from ..type_aliases import JSON
from ..utils import ensure_uri, path_from_uri
from .array import ArrayAdapter
from .utils import init_adapter_from_catalog
class CSVAdapter(Adapter[TableStructure]):
    """Adapter for tabular data stored as partitioned text (csv) files"""

    structure_family = StructureFamily.table

    def __init__(
        self,
        data_uris: Iterable[str],
        structure: Optional[TableStructure] = None,
        *,
        metadata: Optional[JSON] = None,
        specs: Optional[List[Spec]] = None,
        **kwargs: Optional[Any],
    ) -> None:
        """Adapter for partitioned tabular data stored as a sequence of text (csv) files

        Parameters
        ----------
        data_uris : list of uris to csv files
        structure : TableStructure, optional
            description of the table; when None it is inferred by scanning the
            files with Dask
        metadata : JSON, optional
            metadata to attach to this adapter
        specs : list of Spec, optional
            specs to attach to this adapter
        kwargs : dict
            any keyword arguments that can be passed to the pandas.read_csv
            function, e.g. names, sep, dtype, etc.
        """
        self._file_paths = [path_from_uri(uri) for uri in data_uris]
        self._read_csv_kwargs = kwargs
        if structure is None:
            # Infer the structure lazily; Dask only reads enough to build metadata.
            ddf = dask.dataframe.read_csv(self._file_paths, **self._read_csv_kwargs)
            structure = TableStructure.from_dask_dataframe(ddf)
        super().__init__(structure, metadata=metadata, specs=specs)

    @classmethod
    def supported_storage(cls) -> Set[type[Storage]]:
        """Return the set of Storage classes this adapter can write into."""
        return {FileStorage}

    @classmethod
    def from_catalog(
        cls,
        data_source: DataSource[TableStructure],
        node: Node,
        /,
        **kwargs: Optional[Any],
    ) -> "CSVAdapter":
        """Construct the adapter from a catalog node and its data source."""
        return init_adapter_from_catalog(cls, data_source, node, **kwargs)

    @classmethod
    def from_uris(
        cls,
        *data_uris: str,
        **kwargs: Optional[Any],
    ) -> "CSVAdapter":
        """Construct the adapter directly from one or more csv file uris."""
        return cls(data_uris, **kwargs)

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self._structure.columns!r})"

    @classmethod
    def init_storage(
        cls,
        storage: Storage,
        data_source: DataSource[TableStructure],
        path_parts: List[str],
    ) -> DataSource[TableStructure]:
        """Initialize partitioned CSV storage

        Parameters
        ----------
        storage : Storage
            file-based storage under which a directory for partitioned csv
            files will be created
        data_source : DataSource
            description of the data source; the caller's object is not mutated
        path_parts : list of str
            path segments locating the dataset directory below the storage root

        Returns
        -------
        A copy of ``data_source`` whose assets list is extended with one asset
        per partition file.
        """
        data_source = copy.deepcopy(data_source)  # Do not mutate caller input.
        # Quote each path segment so it is safe to embed in a URI.
        data_uri = storage.uri + "".join(
            f"/{quote_plus(segment)}" for segment in path_parts
        )
        directory = path_from_uri(data_uri)
        directory.mkdir(parents=True, exist_ok=True)
        assets = [
            Asset(
                data_uri=f"{data_uri}/partition-{i}.csv",
                is_directory=False,
                parameter="data_uris",
                num=i,
            )
            for i in range(data_source.structure.npartitions)
        ]
        data_source.assets.extend(assets)
        return data_source

    def append_partition(
        self, partition: int, data: Union[dask.dataframe.DataFrame, pandas.DataFrame]
    ) -> None:
        """Append data to an existing partition

        Parameters
        ----------
        partition : int
            index of the partition to be appended to
        data : dask.dataframe.DataFrame or pandas.DataFrame
            data to be appended
        """
        uri = self._file_paths[partition]
        # Skip the header on append so the file remains a single valid csv table.
        data.to_csv(uri, index=False, mode="a", header=False)

    def write_partition(
        self, partition: int, data: Union[dask.dataframe.DataFrame, pandas.DataFrame]
    ) -> None:
        """Write data to a new partition or overwrite an existing one

        Parameters
        ----------
        partition : int
            index of the partition to be written
        data : dask.dataframe.DataFrame or pandas.DataFrame
            data to be written
        """
        uri = self._file_paths[partition]
        data.to_csv(uri, index=False)

    def write(self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame]) -> None:
        """Default writing function to a dataset with a single partition

        Parameters
        ----------
        data : dask.dataframe.DataFrame or pandas.DataFrame
            data to be written

        Raises
        ------
        NotImplementedError
            if the dataset has more than one partition
        """
        if self.structure().npartitions != 1:
            raise NotImplementedError
        uri = self._file_paths[0]
        data.to_csv(uri, index=False)

    def read(self, fields: Optional[List[str]] = None) -> dask.dataframe.DataFrame:
        """Read the entire table, concatenating all partitions.

        Parameters
        ----------
        fields : list of str, optional
            subset of column names to read; all columns when None

        Returns
        -------
        The concatenated table.
        """
        dfs = [
            self.read_partition(i, fields=fields) for i in range(len(self._file_paths))
        ]
        return dask.dataframe.concat(dfs, axis=0)

    def read_partition(
        self,
        indx: int,
        fields: Optional[List[str]] = None,
    ) -> pandas.DataFrame:
        """Read a single partition

        Parameters
        ----------
        indx : int
            index of the partition to read
        fields : list of str, optional
            subset of column names to read; all columns when None

        Returns
        -------
        The partition contents, materialized in memory via ``.compute()``.
        """
        df = dask.dataframe.read_csv(self._file_paths[indx], **self._read_csv_kwargs)
        if fields is not None:
            df = df[fields]
        return df.compute()

    def get(self, key: str) -> Union[ArrayAdapter, None]:
        """Return an adapter for a single column, or None if it does not exist.

        Parameters
        ----------
        key : str
            column name

        Returns
        -------
        An ArrayAdapter for the column, or None when ``key`` is not a column.
        """
        if key not in self.structure().columns:
            return None
        return ArrayAdapter.from_array(self.read([key])[key].values)

    def generate_data_sources(
        self,
        mimetype: str,
        dict_or_none: Callable[[TableStructure], Dict[str, str]],
        item: Union[str, Path],
        is_directory: bool,
    ) -> List[DataSource[TableStructure]]:
        """Build the list of data sources describing this externally-managed table.

        Parameters
        ----------
        mimetype : str
            mimetype recorded on the data source
        dict_or_none :
            unused here; kept for interface compatibility
        item : str or Path
            location of the csv data, converted to a uri
        is_directory : bool
            whether ``item`` refers to a directory

        Returns
        -------
        A single-element list with one externally-managed DataSource.
        """
        return [
            DataSource(
                structure_family=StructureFamily.table,
                mimetype=mimetype,
                structure=self.structure(),
                parameters={},
                management=Management.external,
                assets=[
                    Asset(
                        data_uri=ensure_uri(item),
                        is_directory=is_directory,
                        parameter="data_uris",  # <-- PLURAL!
                        num=0,  # <-- denoting that the Adapter expects a list, and this is the first element
                    )
                ],
            )
        ]

    def __getitem__(self, key: str) -> ArrayAdapter:
        """Get an ArrayAdapter for a single column

        Parameters
        ----------
        key : str
            column name to get

        Returns
        -------
        An array adapter corresponding to a single column in the table.
        """
        # Must compute to determine shape.
        return ArrayAdapter.from_array(self.read([key])[key].values)

    def items(self) -> Iterator[Tuple[str, ArrayAdapter]]:
        """Iterator over table columns

        Returns
        -------
        Tuples of column names and corresponding ArrayAdapters
        """
        yield from (
            (key, ArrayAdapter.from_array(self.read([key])[key].values))
            for key in self._structure.columns
        )
class CSVArrayAdapter(ArrayAdapter):
    """Adapter for array-type data stored as partitioned csv files"""

    @classmethod
    def from_catalog(
        cls,
        data_source: DataSource[ArrayStructure],
        node: Node,
        /,
        **kwargs: Optional[Any],
    ) -> "CSVArrayAdapter":
        """Adapter for partitioned array data stored as a sequence of csv files

        Parameters
        ----------
        data_source : DataSource
            description of the stored array; its assets locate the csv files
        node : Node
            catalog node supplying metadata and specs
        kwargs : dict
            any keyword arguments that can be passed to the pandas.read_csv
            function, e.g. names, sep, dtype, etc.
        """
        # Load the array lazily with Dask
        file_paths = [path_from_uri(ast.data_uri) for ast in data_source.assets]
        structure = data_source.structure
        nrows = kwargs.pop("nrows", None)  # dask doesn't accept nrows
        kwargs = {"header": None, **kwargs}  # no header for arrays by default
        ddf = dask.dataframe.read_csv(file_paths, **kwargs).rename(columns=str)
        # Ensure columns are in the same order as in the usecols parameter
        if usecols := kwargs.get("usecols"):
            ddf = ddf[usecols]
        chunks_0: tuple[int, ...] = structure.chunks[0]  # rows chunking, if not stacked
        # Read as a structural array if needed; ensure the correct dtype
        if isinstance(structure.data_type, StructDtype):
            array = ddf.to_records(lengths=chunks_0)[list(ddf.columns)].reshape(-1, 1)
        else:
            array = ddf.to_dask_array(lengths=chunks_0)
            array = array.astype(structure.data_type.to_numpy_dtype())
        # Possibly extend or cut the table according the nrows parameter
        if nrows is not None:
            # TODO: this pulls all the data and can take long to compute. Instead, we can open the files and
            # iterate over the rows directly, which is about 4-5 times faster for 50K rows.
            # Can also just .compute() and return a np array instead
            nrows_actual = len(ddf)
            if nrows > nrows_actual:
                # Pad with zero rows up to the requested length.
                padding = dask.array.zeros_like(
                    array, shape=(nrows - nrows_actual, *array.shape[1:])
                )
                array = dask.array.append(array[:nrows_actual, ...], padding, axis=0)
            else:
                # Truncate to the requested length.
                array = array[:nrows, ...]
        array = array.reshape(structure.shape).rechunk(structure.chunks)
        return cls(
            array,
            structure,
            metadata=node.metadata_,
            specs=node.specs,
        )

    @classmethod
    def from_uris(
        cls,
        *data_uris: str,
        **kwargs: Optional[Any],
    ) -> "CSVArrayAdapter":
        """Construct the adapter directly from csv file uris, inferring the structure.

        Parameters
        ----------
        data_uris : str
            uris of csv files holding the array data
        kwargs : dict
            any keyword arguments that can be passed to the pandas.read_csv
            function, e.g. names, sep, dtype, etc.
        """
        file_paths = [path_from_uri(uri) for uri in data_uris]
        # Arrays are stored without a header row unless the caller overrides it.
        ddf = dask.dataframe.read_csv(file_paths, **{"header": None, **kwargs})
        if usecols := kwargs.get("usecols"):
            ddf = ddf[usecols]  # Ensure the order of columns is preserved
        array = ddf.to_dask_array()
        structure = ArrayStructure.from_array(array)
        return cls(array, structure)