Source code for tiled.structures.xarray

import io
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple, Union

import xarray

from ..media_type_registration import deserialization_registry, serialization_registry
from ..utils import modules_available
from .array import ArrayStructure
from .dataframe import (
    APACHE_ARROW_FILE_MIME_TYPE,
    XLSX_MIME_TYPE,
    serialize_arrow,
    serialize_csv,
    serialize_excel,
    serialize_html,
    serialize_parquet,
)


@dataclass
class DataArrayMacroStructure:
    """Macro-structure record for a data array: its variable, coords and name.

    Instances are normally built from a decoded JSON mapping via
    :meth:`from_json`.
    """

    # Structure of the array data itself.
    variable: ArrayStructure
    # Placeholder annotation: DataArrayStructure is not defined yet at this
    # point, so the annotation is patched further down in this module.
    coords: Optional[
        Dict[str, str]
    ]  # overridden below to be Optional[Dict[str, DataArrayStructure]]
    coord_names: List[str]
    name: str
    # Either one flag for the whole array or one flag per dimension
    # (per the annotation; semantics are defined by the consumers).
    resizable: Union[bool, Tuple[bool, ...]] = False

    @classmethod
    def from_json(cls, structure):
        """Build an instance from a decoded JSON mapping.

        Parameters
        ----------
        structure : dict
            Must contain the keys ``"variable"``, ``"coords"`` (a mapping or
            ``None``), ``"coord_names"`` and ``"name"``. The key
            ``"resizable"`` is optional and defaults to ``False``.

        Returns
        -------
        DataArrayMacroStructure

        Raises
        ------
        KeyError
            If one of the required keys is missing.
        """
        raw_coords = structure["coords"]
        if raw_coords is not None:
            coords = {
                key: DataArrayStructure.from_json(value)
                for key, value in raw_coords.items()
            }
        else:
            coords = None
        # Previously this field was dropped on deserialization and always
        # reset to the default. Payloads without the key keep the old
        # behavior (False).
        resizable = structure.get("resizable", False)
        if isinstance(resizable, list):
            # JSON has no tuple type; restore the annotated Tuple[bool, ...].
            resizable = tuple(resizable)
        return cls(
            variable=ArrayStructure.from_json(structure["variable"]),
            coords=coords,
            coord_names=structure["coord_names"],
            name=structure["name"],
            resizable=resizable,
        )
@dataclass
class DataArrayStructure:
    """Pair of a :class:`DataArrayMacroStructure` and a micro-structure.

    The micro-structure is annotated as ``None`` and :meth:`from_json`
    always sets it to ``None``.
    """

    macro: DataArrayMacroStructure
    micro: None

    @classmethod
    def from_json(cls, structure):
        """Build an instance from a decoded JSON mapping.

        Only ``structure["macro"]`` is read; it is handed to
        ``DataArrayMacroStructure.from_json``.
        """
        macro_structure = DataArrayMacroStructure.from_json(structure["macro"])
        return cls(macro=macro_structure, micro=None)
# Define a nested structure now that the necessary object has been defined.
# The annotation is spelled out explicitly rather than derived with the
# undocumented ``copy_with`` helper: ``Optional[...]`` is a ``Union``, so
# ``copy_with((str, X))`` produced ``Union[str, X]`` instead of the intended
# optional mapping. The value type is DataArrayStructure, which is what
# ``DataArrayMacroStructure.from_json`` actually stores in ``coords``.
# NOTE(review): this updates ``__annotations__`` only; the ``Field.type``
# captured by ``@dataclass`` at class creation is left as it was.
DataArrayMacroStructure.__annotations__["coords"] = Optional[
    Dict[str, DataArrayStructure]
]
@dataclass
class DatasetMacroStructure:
    """Macro-structure record for a dataset: named data variables and coords.

    Instances are normally built from a decoded JSON mapping via
    :meth:`from_json`.
    """

    data_vars: Dict[str, DataArrayStructure]
    coords: Dict[str, DataArrayStructure]
    # Either one flag overall or a tuple of flags (per the annotation;
    # semantics are defined by the consumers).
    resizable: Union[bool, Tuple[bool, ...]] = False

    @classmethod
    def from_json(cls, structure):
        """Build an instance from a decoded JSON mapping.

        Parameters
        ----------
        structure : dict
            Must contain the keys ``"data_vars"`` and ``"coords"``, each a
            mapping from name to a serialized DataArrayStructure. The key
            ``"resizable"`` is optional and defaults to ``False``.

        Returns
        -------
        DatasetMacroStructure

        Raises
        ------
        KeyError
            If ``"data_vars"`` or ``"coords"`` is missing.
        """
        data_vars = {
            key: DataArrayStructure.from_json(value)
            for key, value in structure["data_vars"].items()
        }
        coords = {
            key: DataArrayStructure.from_json(value)
            for key, value in structure["coords"].items()
        }
        # Previously this field was dropped on deserialization and always
        # reset to the default. Payloads without the key keep the old
        # behavior (False).
        resizable = structure.get("resizable", False)
        if isinstance(resizable, list):
            # JSON has no tuple type; restore the annotated Tuple[bool, ...].
            resizable = tuple(resizable)
        return cls(data_vars=data_vars, coords=coords, resizable=resizable)
@dataclass
class DatasetStructure:
    """Pair of a :class:`DatasetMacroStructure` and a micro-structure.

    The micro-structure is annotated as ``None`` and :meth:`from_json`
    always sets it to ``None``.
    """

    macro: DatasetMacroStructure
    micro: None

    @classmethod
    def from_json(cls, structure):
        """Build an instance from a decoded JSON mapping.

        Only ``structure["macro"]`` is read; it is handed to
        ``DatasetMacroStructure.from_json``.
        """
        macro_structure = DatasetMacroStructure.from_json(structure["macro"])
        return cls(macro=macro_structure, micro=None)
class _BytesIOThatIgnoresClose(io.BytesIO):
    """In-memory byte buffer whose ``close()`` does nothing."""

    def close(self):
        # The netcdf writer asks us to close() when it is done; ignore the
        # request so the buffer can still be read by serialize_netcdf.
        return None


if modules_available("h5netcdf"):

    def serialize_netcdf(dataset, metadata):
        """Write ``dataset`` as netCDF into memory and return the buffer view.

        ``metadata`` is accepted but not used.
        """
        buffer = _BytesIOThatIgnoresClose()
        # This engine is reportedly faster. Using it instead of the default
        # engine also avoids a dependency on 'scipy'.
        dataset.to_netcdf(buffer, engine="h5netcdf")
        return buffer.getbuffer()

    # Both application/netcdf and application/x-netcdf are used.
    # https://en.wikipedia.org/wiki/NetCDF
    for _netcdf_media_type in ("application/netcdf", "application/x-netcdf"):
        serialization_registry.register(
            "xarray_dataset", _netcdf_media_type, serialize_netcdf
        )


# Support DataFrame formats by first converting to DataFrame.
# This doesn't make much sense for N-dimensional variables, but for
# 1-dimensional variables it is useful.
def _serialize_arrow_from_dataset(ds, metadata):
    """Convert ``ds`` to a DataFrame and serialize it with serialize_arrow."""
    return serialize_arrow(ds.to_dataframe(), metadata)


def _serialize_parquet_from_dataset(ds, metadata):
    """Convert ``ds`` to a DataFrame and serialize it with serialize_parquet."""
    return serialize_parquet(ds.to_dataframe(), metadata)


def _serialize_csv_from_dataset(ds, metadata):
    """Convert ``ds`` to a DataFrame and serialize it with serialize_csv."""
    return serialize_csv(ds.to_dataframe(), metadata)


def _serialize_html_from_dataset(ds, metadata):
    """Convert ``ds`` to a DataFrame and serialize it with serialize_html."""
    return serialize_html(ds.to_dataframe(), metadata)


def _serialize_excel_from_dataset(ds, metadata):
    """Convert ``ds`` to a DataFrame and serialize it with serialize_excel."""
    return serialize_excel(ds.to_dataframe(), metadata)


# Registered in this order; text/csv and text/plain share the CSV serializer.
for _media_type, _dataset_serializer in (
    (APACHE_ARROW_FILE_MIME_TYPE, _serialize_arrow_from_dataset),
    ("application/x-parquet", _serialize_parquet_from_dataset),
    ("text/csv", _serialize_csv_from_dataset),
    ("text/plain", _serialize_csv_from_dataset),
    ("text/html", _serialize_html_from_dataset),
    (XLSX_MIME_TYPE, _serialize_excel_from_dataset),
):
    serialization_registry.register(
        "xarray_dataset", _media_type, _dataset_serializer
    )

if modules_available("orjson"):
    import orjson

    def serialize_json(ds, metadata):
        """Encode ``ds`` as JSON: one list of values per DataFrame column.

        ``metadata`` is accepted but not used.
        """
        frame = ds.to_dataframe()
        columns_as_lists = {column: frame[column].tolist() for column in frame}
        return orjson.dumps(columns_as_lists)

    serialization_registry.register(
        "xarray_dataset", "application/json", serialize_json
    )


def _deserialize_zarr(ds, metadata):
    """Open the given payload with ``xarray.open_zarr``; ``metadata`` is unused."""
    return xarray.open_zarr(ds)


deserialization_registry.register(
    "xarray_dataset", "application/x-zarr", _deserialize_zarr
)
# TODO How should we add support for access via Zarr?