use xarray.open_zarr and make aiohttp and s3fs optional #1016

Merged
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -51,7 +51,7 @@ jobs:

- name: Test titiler.xarray
run: |
python -m pip install -e src/titiler/xarray["test"]
python -m pip install -e src/titiler/xarray["test,all"]
python -m pytest src/titiler/xarray --cov=titiler.xarray --cov-report=xml --cov-append --cov-report=term-missing

- name: Test titiler.mosaic
21 changes: 14 additions & 7 deletions src/titiler/xarray/pyproject.toml
@@ -30,24 +30,31 @@ classifiers = [
dynamic = ["version"]
dependencies = [
"titiler.core==0.19.0.dev",
"cftime",
"h5netcdf",
"xarray",
"rioxarray",
"zarr",
"fsspec",
"s3fs",
"aiohttp",
"pandas",
"httpx",
"zarr",
"h5netcdf",
Comment on lines +36 to +37 (Member):

IMO Zarr and h5netcdf should be optional dependencies so people could deploy titiler-xarray with only one option to keep image size down. This may also make it easier to support other HDF5 readers that work better for remote files in the future (e.g., https://github.com/gauteh/hidefix).

Suggested change:
-"zarr",
-"h5netcdf",

"cftime",
]

[project.optional-dependencies]
s3 = [
"s3fs",
]
http = [
"aiohttp",
]
all = [
"s3fs",
"aiohttp",
]
Comment on lines +48 to +51 (Member):

Suggested change:
-all = [
-"s3fs",
-"aiohttp",
-]
+zarr = [
+"zarr",
+]
+hdf5 = [
+"h5netcdf",
+]
+all = [
+"s3fs",
+"aiohttp",
+"zarr",
+"hdf5",
+]

Suggestion to align with the above comment about optional dependencies.

Comment (Member Author):

@maxrjones I'll take care of this either in the main PR or in another one later

test = [
"pytest",
"pytest-cov",
"pytest-asyncio",
"httpx",
"kerchunk",
]

[project.urls]
24 changes: 24 additions & 0 deletions src/titiler/xarray/tests/fixtures/generate_fixtures.ipynb
@@ -138,6 +138,30 @@
" ds.to_zarr(store=f\"pyramid.zarr\", mode=\"w\", group=ix)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"import fsspec\n",
"from kerchunk.hdf import SingleHdf5ToZarr\n",
"\n",
"with fsspec.open(\"dataset_3d.nc\", mode=\"rb\", anon=True) as infile:\n",
" h5chunks = SingleHdf5ToZarr(infile, \"dataset_3d.nc\", inline_threshold=100)\n",
"\n",
" with open(\"reference.json\", 'w') as f:\n",
" f.write(json.dumps(h5chunks.translate()));\n"
]
},
{
"cell_type": "code",
"execution_count": null,
1 change: 1 addition & 0 deletions src/titiler/xarray/tests/fixtures/reference.json
@@ -0,0 +1 @@
{"version": 1, "refs": {".zgroup": "{\"zarr_format\":2}", "dataset/.zarray": "{\"chunks\":[1,500,1000],\"compressor\":null,\"dtype\":\"<f8\",\"fill_value\":\"NaN\",\"filters\":[{\"elementsize\":8,\"id\":\"shuffle\"},{\"id\":\"zlib\",\"level\":9}],\"order\":\"C\",\"shape\":[2,1000,2000],\"zarr_format\":2}", "dataset/.zattrs": "{\"_ARRAY_DIMENSIONS\":[\"time\",\"y\",\"x\"],\"fill_value\":0,\"valid_max\":1000.0,\"valid_min\":1.0}", "dataset/0.0.0": ["tests/fixtures/dataset_3d.nc", 37134, 113251], "dataset/0.0.1": ["tests/fixtures/dataset_3d.nc", 150385, 112805], "dataset/0.1.0": ["tests/fixtures/dataset_3d.nc", 263190, 65106], "dataset/0.1.1": ["tests/fixtures/dataset_3d.nc", 328296, 65049], "dataset/1.0.0": ["tests/fixtures/dataset_3d.nc", 393345, 65468], "dataset/1.0.1": ["tests/fixtures/dataset_3d.nc", 458813, 65506], "dataset/1.1.0": ["tests/fixtures/dataset_3d.nc", 524319, 58101], "dataset/1.1.1": ["tests/fixtures/dataset_3d.nc", 582420, 58075], "time/.zarray": "{\"chunks\":[2],\"compressor\":null,\"dtype\":\"<i8\",\"fill_value\":null,\"filters\":null,\"order\":\"C\",\"shape\":[2],\"zarr_format\":2}", "time/.zattrs": "{\"_ARRAY_DIMENSIONS\":[\"time\"],\"calendar\":\"proleptic_gregorian\",\"units\":\"days since 2022-01-01 00:00:00\"}", "time/0": "\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000m\u0001\u0000\u0000\u0000\u0000\u0000\u0000", "x/.zarray": "{\"chunks\":[2000],\"compressor\":null,\"dtype\":\"<f8\",\"fill_value\":\"NaN\",\"filters\":null,\"order\":\"C\",\"shape\":[2000],\"zarr_format\":2}", "x/.zattrs": "{\"_ARRAY_DIMENSIONS\":[\"x\"]}", "x/0": ["tests/fixtures/dataset_3d.nc", 1415, 16000], "y/.zarray": "{\"chunks\":[1000],\"compressor\":null,\"dtype\":\"<f8\",\"fill_value\":\"NaN\",\"filters\":null,\"order\":\"C\",\"shape\":[1000],\"zarr_format\":2}", "y/.zattrs": "{\"_ARRAY_DIMENSIONS\":[\"y\"]}", "y/0": ["tests/fixtures/dataset_3d.nc", 17570, 8000]}}
42 changes: 38 additions & 4 deletions src/titiler/xarray/tests/test_io_tools.py
@@ -1,11 +1,14 @@
"""test titiler.xarray.io utility functions."""

import json
import os
from datetime import datetime

import fsspec
import numpy
import pytest
import xarray
from kerchunk.hdf import SingleHdf5ToZarr

from titiler.xarray.io import Reader, get_variable

@@ -109,12 +112,19 @@ def test_get_variable():


@pytest.mark.parametrize(
"filename",
["dataset_2d.nc", "dataset_3d.nc", "dataset_3d.zarr"],
"protocol,filename",
[
("file://", "dataset_2d.nc"),
("file://", "dataset_3d.nc"),
("file://", "dataset_3d.zarr"),
("", "dataset_2d.nc"),
("", "dataset_3d.nc"),
("", "dataset_3d.zarr"),
],
)
def test_reader(filename):
def test_reader(protocol, filename):
"""test reader."""
src_path = os.path.join(prefix, filename)
src_path = protocol + os.path.join(protocol, prefix, filename)
assert Reader.list_variables(src_path) == ["dataset"]

with Reader(src_path, variable="dataset") as src:
@@ -134,3 +144,27 @@ def test_zarr_group(group):
assert src.info()
assert src.tile(0, 0, 0)
assert src.point(0, 0).data[0] == group * 2


def test_kerchunk_reference(tmp_path):
"""test Kerchunk reference."""
d = tmp_path / "ref"
d.mkdir()

netcdf = os.path.join(prefix, "dataset_3d.nc")
reference = os.path.join(
str(d),
"reference.json",
)

with fsspec.open(netcdf, mode="rb", anon=True) as infile:
h5chunks = SingleHdf5ToZarr(infile, netcdf, inline_threshold=100)
with open(reference, "w") as f:
f.write(json.dumps(h5chunks.translate()))

for protocol in ["", "reference://"]:
src_path = protocol + reference
assert Reader.list_variables(src_path) == ["dataset"]
with Reader(src_path, variable="dataset") as src:
assert src.info()
assert src.tile(0, 0, 0)
16 changes: 0 additions & 16 deletions src/titiler/xarray/titiler/xarray/dependencies.py
@@ -22,14 +22,6 @@ class XarrayIOParams(DefaultDependency):
),
] = None

reference: Annotated[
Optional[bool],
Query(
title="reference",
description="Whether the dataset is a kerchunk reference",
),
] = None

decode_times: Annotated[
Optional[bool],
Query(
@@ -38,14 +30,6 @@
),
] = None

consolidated: Annotated[
Optional[bool],
Query(
title="consolidated",
description="Whether to expect and open zarr store with consolidated metadata",
),
] = None

# cache_client


82 changes: 43 additions & 39 deletions src/titiler/xarray/titiler/xarray/io.py
@@ -1,18 +1,27 @@
"""titiler.xarray.io"""

import pickle
import re
from typing import Any, Callable, Dict, List, Optional, Protocol
from urllib.parse import urlparse

import attr
import fsspec
import numpy
import s3fs
import xarray
from morecantile import TileMatrixSet
from rio_tiler.constants import WEB_MERCATOR_TMS
from rio_tiler.io.xarray import XarrayReader

try:
import s3fs
except ImportError: # pragma: nocover
s3fs = None # type: ignore

try:
import aiohttp
except ImportError: # pragma: nocover
aiohttp = None # type: ignore


class CacheClient(Protocol):
"""CacheClient Protocol."""
@@ -26,27 +35,18 @@ def set(self, key: str, body: bytes) -> None:
...


def parse_protocol(src_path: str, reference: Optional[bool] = False) -> str:
def parse_protocol(src_path: str) -> str:
"""Parse protocol from path."""
match = re.match(r"^(s3|https|http)", src_path)
protocol = "file"
if match:
protocol = match.group(0)

# override protocol if reference
if reference:
protocol = "reference"

return protocol
parsed = urlparse(src_path)
return parsed.scheme or "file"


def xarray_engine(src_path: str) -> str:
"""Parse xarray engine from path."""
# ".hdf", ".hdf5", ".h5" will be supported once we have tests + expand the type permitted for the group parameter
if any(src_path.lower().endswith(ext) for ext in [".nc", ".nc4"]):
return "h5netcdf"
else:
return "zarr"
return "zarr"


def get_filesystem(
@@ -59,18 +59,27 @@ def get_filesystem(
Get the filesystem for the given source path.
"""
if protocol == "s3":
assert s3fs is not None, "s3fs must be installed to support S3:// url"

s3_filesystem = s3fs.S3FileSystem()
return (
s3_filesystem.open(src_path)
if xr_engine == "h5netcdf"
else s3fs.S3Map(root=src_path, s3=s3_filesystem)
)

elif protocol == "reference":
reference_args = {"fo": src_path, "remote_options": {"anon": anon}}
elif protocol == "reference" or src_path.lower().endswith(".json"):
reference_args = {
"fo": src_path.replace("reference://", ""),
"remote_options": {"anon": anon},
}
return fsspec.filesystem("reference", **reference_args).get_mapper("")
Comment (Member Author):

check if we have a reference:// url or a .json file

Comment (Member):

there are at least three ways to store virtual references:

  • json following kerchunk spec
  • parquet following kerchunk spec
  • icechunk spec (currently mostly messagepack)

Do you think the "reference" protocol will be used for all of these in the future, with some automatic differentiation between the storage options? Another option would be to specifically have "kerchunk" and "icechunk" as protocol options, with automatic differentiations between json and parquet based on the file extension.
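
For context, a minimal sketch (not part of this PR) of how the two kerchunk flavors would typically be wired through fsspec's "reference" filesystem; the paths are hypothetical, and the parquet form needs the fastparquet dependency mentioned later in this thread:

import fsspec
import xarray

# kerchunk spec, JSON reference (hypothetical paths)
fs_json = fsspec.filesystem(
    "reference",
    fo="reference.json",            # the reference file itself
    remote_protocol="s3",           # protocol of the *referenced* chunks
    remote_options={"anon": True},
)
ds_json = xarray.open_zarr(fs_json.get_mapper(""), consolidated=False)

# kerchunk spec, Parquet reference: same fsspec protocol, but the references
# live in a directory of parquet files (requires fastparquet)
fs_parq = fsspec.filesystem(
    "reference",
    fo="reference.parq",            # directory of parquet reference files
    remote_protocol="s3",
    remote_options={"anon": True},
)
ds_parq = xarray.open_zarr(fs_parq.get_mapper(""), consolidated=False)

# icechunk, per the discussion below, is expected to ship its own store API
# rather than go through an fsspec protocol, so it would need separate handling.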

Comment (Member Author):

I guess we could just support reference:// prefix instead of checking the .json

I'm also not sure how to support S3:// referenced files. I'm reading that you can use "remote_protocol": "s3", but we don't have a way to know this!

cc @abarciauskas-bgse

Comment (Member Author):

8e10da4

only support references with the reference:// prefix, and added support for parquet files (but the fastparquet dependency needs to be installed)

Comment (Contributor):

I'm going to try to figure out how to test this locally, but in the meantime, here is what I am wondering:

  • json following kerchunk spec - the current implementation should work for local json references (reference://file.json) but would it work for remotely stored references, e.g. s3://file.json or http://file.json?
  • parquet following kerchunk spec - files will typically end in .parq or .parquet - again I think this would work for local parquet stores but not sure about remote parquet stores which will be using the s3 or https protocol
  • @maxrjones have you tested opening an icechunk store with fsspec and passing that as an argument to xarray? I haven't so I am not sure if it will work, but I think we can address icechunk support later.

If we have src_path.lower().endswith(".json"), I feel like we should also have a .parquet conditional here, but I think this is too coupled to the formats kerchunk is implemented in and we should rely on something else... like a path parameter 😬

Comment (Member Author):

in the latest commit I've removed the .json test to only accept reference:// prefix

I guess we could accept things like reference+s3:// 🤷

Comment (Member Author):

@abarciauskas-bgse 🤔 I'm not sure we can really support non-local reference files!

FYI, I've tried reference://s3://ds-satellite/netcdf/reference.json and it correctly read the reference file but failed to read the referenced netcdf (stored on S3 as well). I guess I need to set remote_protocol="s3" somewhere.

@abarciauskas-bgse are we using kerchunk in any production project? To me it seems this needs more input/definition.
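
If remote references were to be supported, the remote_protocol would most likely be threaded into the reference_args built in get_filesystem above; a rough sketch under that assumption (how the protocol of the referenced files would be discovered is exactly the open question here):

import fsspec


def get_reference_filesystem(src_path: str, anon: bool = True):
    """Hypothetical variant of the reference branch of get_filesystem()."""
    reference_args = {
        "fo": src_path.replace("reference://", ""),
        "remote_options": {"anon": anon},
        # protocol of the *referenced* chunks (e.g. a netcdf on S3);
        # not currently known from src_path alone
        "remote_protocol": "s3",
    }
    return fsspec.filesystem("reference", **reference_args).get_mapper("")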

Comment (Contributor):

We are using it here: https://www.earthdata.nasa.gov/dashboard/data-catalog/combined_CMIP6_daily_GISS-E2-1-G_tas_kerchunk_DEMO (which I believe uses an s3 protocol for the reference file (private bucket) and the netcdfs it points to are in a public s3 bucket) so I know it should work. I can take a closer look later today ...

Comment (Contributor, @abarciauskas-bgse, Nov 4, 2024):

by the way, I'm on the fence about whether we support kerchunk-style references in this new xarray integration to core titiler, since I think we will be able to use icechunk for reference-style (aka virtual) access in the future. Anyways, kerchunk-style references are still supported by titiler-xarray so we wouldn't have to deprecate https://www.earthdata.nasa.gov/dashboard/data-catalog/combined_CMIP6_daily_GISS-E2-1-G_tas_kerchunk_DEMO right away.

@maxrjones thoughts here?

Comment (Member):

I can see the motivation for dropping support, since it seems that icechunk is committed to avoiding the fsspec method of using keyword arguments for configuration, which is the source of a lot of the complexity here.

Comment (Contributor):

Ya, ok, @vincentsarago, sorry for the delay in the decision here, but let's just drop reference support for now and we can add it back if we decide we need it.

Ideally we could have some documentation about how to easily add it, if people want kerchunk-style reference support with titiler, but I'm not sure at this point how we would do that.
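
One possible shape for that documentation, sketched under the assumption that Reader keeps the pluggable opener attribute shown further down in this diff (the import path follows the tests in this PR; the bucket and reference path are hypothetical):

from typing import Any, Optional

import fsspec
import xarray

from titiler.xarray.io import Reader


def kerchunk_open_dataset(
    src_path: str,
    group: Optional[Any] = None,
    decode_times: Optional[bool] = True,
    cache_client=None,  # kept for signature parity with xarray_open_dataset
) -> xarray.Dataset:
    """Open a kerchunk JSON reference as a Zarr store."""
    fs = fsspec.filesystem(
        "reference",
        fo=src_path.replace("reference://", ""),
        remote_protocol="s3",           # assumes the referenced chunks live on S3
        remote_options={"anon": True},
    )
    return xarray.open_zarr(
        fs.get_mapper(""),
        group=group,
        decode_times=decode_times,
        consolidated=False,
    )


# usage: pass the custom opener instead of the default xarray_open_dataset
with Reader(
    "reference://s3://my-bucket/reference.json",  # hypothetical reference
    variable="dataset",
    opener=kerchunk_open_dataset,
) as src:
    info = src.info()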


elif protocol in ["https", "http", "file"]:
elif protocol in ["https", "http", "file", "reference"]:
if protocol.startswith("http"):
assert (
aiohttp is not None
), "aiohttp must be installed to support HTTP:// url"

filesystem = fsspec.filesystem(protocol) # type: ignore
return (
filesystem.open(src_path)
@@ -85,9 +94,7 @@ def get_filesystem(
def xarray_open_dataset(
src_path: str,
group: Optional[Any] = None,
reference: Optional[bool] = False,
decode_times: Optional[bool] = True,
consolidated: Optional[bool] = True,
cache_client: Optional[CacheClient] = None,
) -> xarray.Dataset:
"""Open dataset."""
@@ -98,7 +105,7 @@ def xarray_open_dataset(
if data_bytes:
return pickle.loads(data_bytes)

protocol = parse_protocol(src_path, reference=reference)
protocol = parse_protocol(src_path)
xr_engine = xarray_engine(src_path)
file_handler = get_filesystem(src_path, protocol, xr_engine)

@@ -117,17 +124,22 @@ if xr_engine == "h5netcdf":
if xr_engine == "h5netcdf":
xr_open_args["engine"] = "h5netcdf"
xr_open_args["lock"] = False
else:
# Zarr arguments
xr_open_args["engine"] = "zarr"
xr_open_args["consolidated"] = consolidated
ds = xarray.open_dataset(file_handler, **xr_open_args)

elif protocol == "reference" or src_path.lower().endswith(".json"):
xr_open_args.update(
{
"engine": "zarr",
"consolidated": False,
"backend_kwargs": {"consolidated": False},
}
)

# Additional arguments when dealing with a reference file.
if reference:
xr_open_args["consolidated"] = False
xr_open_args["backend_kwargs"] = {"consolidated": False}
ds = xarray.open_dataset(file_handler, **xr_open_args)

ds = xarray.open_dataset(file_handler, **xr_open_args)
# Fallback to Zarr
else:
ds = xarray.open_zarr(file_handler, **xr_open_args)

if cache_client:
# Serialize the dataset to bytes using pickle
@@ -245,9 +257,7 @@ class Reader(XarrayReader):
opener: Callable[..., xarray.Dataset] = attr.ib(default=xarray_open_dataset)

group: Optional[Any] = attr.ib(default=None)
reference: bool = attr.ib(default=False)
decode_times: bool = attr.ib(default=False)
consolidated: Optional[bool] = attr.ib(default=True)
cache_client: Optional[CacheClient] = attr.ib(default=None)

# xarray.DataArray options
@@ -266,9 +276,7 @@ def __attrs_post_init__(self):
self.ds = self.opener(
self.src_path,
group=self.group,
reference=self.reference,
decode_times=self.decode_times,
consolidated=self.consolidated,
cache_client=self.cache_client,
)

@@ -293,14 +301,10 @@ def list_variables(
cls,
src_path: str,
group: Optional[Any] = None,
reference: Optional[bool] = False,
consolidated: Optional[bool] = True,
) -> List[str]:
"""List available variable in a dataset."""
with xarray_open_dataset(
src_path,
group=group,
reference=reference,
consolidated=consolidated,
) as ds:
return list(ds.data_vars) # type: ignore