Source code for tiled.adapters.sparse_blocks_parquet

import itertools
from typing import Any, List, Optional, Tuple, Union

import dask.base
import dask.dataframe
import numpy
import pandas
import sparse
from numpy.typing import NDArray

from ..adapters.array import slice_and_shape_from_block_and_chunks
from ..server.schemas import Asset
from ..structures.core import Spec, StructureFamily
from ..structures.sparse import COOStructure
from ..utils import path_from_uri
from .protocols import AccessPolicy
from .type_alliases import JSON, NDSlice


def load_block(uri: str) -> Tuple[NDArray[Any], NDArray[Any]]:
    """Load one block's COO coordinates and values from a Parquet file.

    Parameters
    ----------
    uri :
        URI of the Parquet file holding this block.

    Returns
    -------
    coords, data :
        The coordinate array (one row per dimension) and the values array.
    """
    # TODO This can be done without pandas.
    # Better to use a plain I/O library.
    df = pandas.read_parquet(path_from_uri(uri))
    coords = df[df.columns[:-1]].values.T
    data = df["data"].values
    return coords, data
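

# Illustrative sketch, not part of the adapter: ``load_block`` expects a
# Parquet file whose leading columns each hold one row of COO coordinates
# (one column per dimension) and whose final column, named "data", holds the
# values. The helper below shows one way such a file could be produced with
# pandas and sparse; the helper name and its file path argument are
# hypothetical, introduced only for this example.
def _example_write_block(filepath: str) -> None:
    # Build a small sparse array and flatten its coords/data into a DataFrame.
    arr = sparse.COO.from_numpy(numpy.eye(3))
    df = pandas.DataFrame({str(dim): row for dim, row in enumerate(arr.coords)})
    df["data"] = arr.data
    # Written this way, the file can be read back by load_block(uri), where
    # uri is a URI that path_from_uri resolves to ``filepath``.
    df.to_parquet(filepath)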


class SparseBlocksParquetAdapter:
    """Adapter for a sparse (COO) array stored as one Parquet file per block."""

    structure_family = StructureFamily.sparse
    def __init__(
        self,
        data_uris: List[str],
        structure: COOStructure,
        metadata: Optional[JSON] = None,
        specs: Optional[List[Spec]] = None,
        access_policy: Optional[AccessPolicy] = None,
    ) -> None:
        """

        Parameters
        ----------
        data_uris :
            URIs of the Parquet files, one per block, ordered to match
            itertools.product over the block indices.
        structure :
            COOStructure describing the shape and chunking of the full array.
        metadata :
            Optional user metadata.
        specs :
            Optional list of Specs.
        access_policy :
            Optional AccessPolicy.
        """
        num_blocks = (range(len(n)) for n in structure.chunks)
        self.blocks = {}
        for block, uri in zip(itertools.product(*num_blocks), data_uris):
            self.blocks[block] = uri
        self._structure = structure
        self._metadata = metadata or {}
        self.specs = list(specs or [])
        self.access_policy = access_policy
    @classmethod
    def init_storage(
        cls,
        data_uri: str,
        structure: COOStructure,
    ) -> List[Asset]:
        """Create the storage directory and declare one Parquet Asset per block.

        Parameters
        ----------
        data_uri :
            URI of the directory in which the block files will live.
        structure :
            COOStructure describing the shape and chunking of the array.

        Returns
        -------
        List of Assets, one per block.
        """
        directory = path_from_uri(data_uri)
        directory.mkdir(parents=True, exist_ok=True)
        num_blocks = (range(len(n)) for n in structure.chunks)
        assets = [
            Asset(
                data_uri=f"{data_uri}/block-{'.'.join(map(str, block))}.parquet",
                is_directory=False,
                parameter="data_uris",
                num=i,
            )
            for i, block in enumerate(itertools.product(*num_blocks))
        ]
        return assets

    def metadata(self) -> JSON:
        """Return the metadata.

        Returns
        -------
        The metadata passed to the constructor (or an empty dict).
        """
        return self._metadata

    def write_block(
        self,
        data: Union[dask.dataframe.DataFrame, pandas.DataFrame],
        block: Tuple[int, ...],
    ) -> None:
        """Write one block's coordinates and values to its Parquet file.

        Parameters
        ----------
        data :
            DataFrame of coordinate columns followed by a "data" column.
        block :
            Block index, e.g. (0, 0).
        """
        uri = self.blocks[block]
        data.to_parquet(path_from_uri(uri))

    def write(self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame]) -> None:
        """Write the data for an array that consists of a single block.

        Parameters
        ----------
        data :
            DataFrame of coordinate columns followed by a "data" column.
        """
        if len(self.blocks) > 1:
            raise NotImplementedError
        uri = self.blocks[(0,) * len(self._structure.shape)]
        data.to_parquet(path_from_uri(uri))

    def read(self, slice: NDSlice) -> sparse.COO:
        """Assemble the full array from all blocks and apply the given slice.

        Parameters
        ----------
        slice :
            Slice to apply to the assembled array.

        Returns
        -------
        The sliced sparse.COO array.
        """
        all_coords = []
        all_data = []
        for block, uri in self.blocks.items():
            coords, data = load_block(uri)
            # Shift this block's local coordinates by the cumulative chunk
            # sizes to obtain coordinates in the full (global) array.
            offsets = []
            for b, c in zip(block, self.structure().chunks):
                offset = sum(c[:b])
                offsets.append(offset)
            global_coords = coords + [[i] for i in offsets]
            all_coords.append(global_coords)
            all_data.append(data)
        arr = sparse.COO(
            data=numpy.concatenate(all_data),
            coords=numpy.concatenate(all_coords, axis=-1),
            shape=self._structure.shape,
        )
        return arr[slice]

    def read_block(
        self, block: Tuple[int, ...], slice: Optional[NDSlice]
    ) -> sparse.COO:
        """Read one block and apply the given slice.

        Parameters
        ----------
        block :
            Block index, e.g. (0, 0).
        slice :
            Slice to apply to the block.

        Returns
        -------
        The sliced sparse.COO block.
        """
        coords, data = load_block(self.blocks[block])
        _, shape = slice_and_shape_from_block_and_chunks(block, self._structure.chunks)
        arr = sparse.COO(data=data[:], coords=coords[:], shape=shape)
        return arr[slice]

    def structure(self) -> COOStructure:
        """Return the COOStructure describing shape and chunking.

        Returns
        -------
        The COOStructure.
        """
        return self._structure
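

# Illustrative usage sketch, not part of the module: write a single-block
# sparse array through the adapter and read it back. This assumes that
# COOStructure accepts ``shape`` and ``chunks`` keyword arguments and that
# path_from_uri understands "file://localhost/<path>" URIs; both are
# assumptions about this version of tiled, so treat this as a sketch rather
# than a supported entry point.
if __name__ == "__main__":
    import tempfile

    directory = tempfile.mkdtemp()
    data_uri = f"file://localhost{directory}"

    arr = sparse.COO.from_numpy(numpy.eye(4))
    # Assumed constructor signature; other fields may be required.
    structure = COOStructure(shape=arr.shape, chunks=((4,), (4,)))

    # Declare the block assets, then hand their URIs to the adapter.
    assets = SparseBlocksParquetAdapter.init_storage(data_uri, structure)
    adapter = SparseBlocksParquetAdapter(
        [asset.data_uri for asset in assets], structure
    )

    # The adapter expects coordinate columns followed by a "data" column.
    df = pandas.DataFrame({str(dim): row for dim, row in enumerate(arr.coords)})
    df["data"] = arr.data
    adapter.write(df)

    # Read the whole array back; numpy.s_[:, :] is a full two-dimensional slice.
    roundtripped = adapter.read(numpy.s_[:, :])
    print(roundtripped.todense())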