import contextlib
from typing import Any, List, Optional, Tuple
import dask.array
import numpy
import pandas
from numpy.typing import NDArray
from tiled.adapters.core import Adapter
from ..ndslice import NDSlice
from ..structures.array import ArrayStructure
from ..structures.core import Spec, StructureFamily
from ..type_aliases import JSON
from .utils import force_reshape
[docs]
class ArrayAdapter(Adapter[ArrayStructure]):
"""
Wrap an array-like object in an interface that Tiled can serve.
Examples
--------
Wrap any array-like.
>>> ArrayAdapter.from_array(numpy.random.random((100, 100)))
>>> ArrayAdapter.from_array(dask.array.from_array(numpy.random.random((100, 100)), chunks=(100, 50)))
"""
structure_family = StructureFamily.array
[docs]
def __init__(
self,
array: NDArray[Any],
structure: ArrayStructure,
*,
metadata: Optional[JSON] = None,
specs: Optional[List[Spec]] = None,
) -> None:
self._array = array
super().__init__(structure, metadata=metadata, specs=specs)
@classmethod
def from_array(
cls,
array: NDArray[Any],
*,
shape: Optional[Tuple[int, ...]] = None,
chunks: Optional[Tuple[Tuple[int, ...], ...]] = None,
dims: Optional[Tuple[str, ...]] = None,
metadata: Optional[JSON] = None,
specs: Optional[List[Spec]] = None,
) -> "ArrayAdapter":
# May be a list of something; convert to array
if not hasattr(array, "__array__"):
array = numpy.asanyarray(array)
# Convert array of arrays to ND array to expose the underlying dtype
is_array_of_arrays = (
array.dtype == "object"
and array.shape[0]
and isinstance(array[0], numpy.ndarray)
)
if is_array_of_arrays:
with contextlib.suppress(ValueError):
# only uniform arrays (with same dimensions) are stackable
array = numpy.vstack(array)
# Convert (experimental) pandas.StringDtype to numpy's unicode string dtype
is_likely_string_dtype = isinstance(array.dtype, pandas.StringDtype) or (
array.dtype == "object" and array.dtype.fields is None
)
if is_likely_string_dtype:
array = numpy.array([str(x) for x in array]) # becomes "<Un" dtype
structure = ArrayStructure.from_array(
array, shape=shape, chunks=chunks, dims=dims
)
return cls(
array,
structure=structure,
metadata=metadata,
specs=specs,
)
def __repr__(self) -> str:
return f"{type(self).__name__}({self._array!r})"
@property
def dims(self) -> Optional[Tuple[str, ...]]:
return self._structure.dims
def read(
self,
slice: NDSlice = NDSlice(...),
) -> NDArray[Any]:
array = force_reshape(self._array, self._structure.shape)
# _array[...] requires an actual tuple, not just a subclass of tuple
array = array[tuple(slice)] if slice else array
if isinstance(self._array, dask.array.Array):
return array.compute()
return array
def read_block(
self,
block: Tuple[int, ...],
slice: NDSlice = NDSlice(...),
) -> NDArray[Any]:
# Slice the whole array to get this block.
array = force_reshape(self._array, self._structure.shape)
slice_, _ = slice_and_shape_from_block_and_chunks(block, self._structure.chunks)
# _array[...] requires an actual tuple, not just a subclass of tuple
array = array[tuple(slice_)]
# Slice within the block.
array = array[slice] if slice else array
if isinstance(self._array, dask.array.Array):
return array.compute()
return array
def slice_and_shape_from_block_and_chunks(
block: Tuple[int, ...], chunks: Tuple[Tuple[int, ...], ...]
) -> tuple[NDSlice, NDSlice]:
"""
Given dask-like chunks and block id, return slice and shape of the block.
"""
slice_ = []
shape = []
for b, c in zip(block, chunks):
start = sum(c[:b])
dim = c[b]
slice_.append(slice(start, start + dim))
shape.append(dim)
return NDSlice(*slice_), NDSlice(*shape)