Skip to content

Commit

Permalink
add parquet support
Browse files Browse the repository at this point in the history
  • Loading branch information
vincentsarago committed Nov 4, 2024
1 parent 8dffefc commit ea4709c
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 16 deletions.
1 change: 1 addition & 0 deletions src/titiler/xarray/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ test = [
"pytest-asyncio",
"httpx",
"kerchunk",
"fastparquet",
]

[project.urls]
Expand Down
29 changes: 22 additions & 7 deletions src/titiler/xarray/tests/test_io_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import numpy
import pytest
import xarray
from kerchunk.df import refs_to_dataframe
from kerchunk.hdf import SingleHdf5ToZarr

from titiler.xarray.io import Reader, get_variable
Expand Down Expand Up @@ -152,19 +153,33 @@ def test_kerchunk_reference(tmp_path):
d.mkdir()

netcdf = os.path.join(prefix, "dataset_3d.nc")

# JSON kerchunk
reference = os.path.join(
str(d),
"reference.json",
)

with fsspec.open(netcdf, mode="rb", anon=True) as infile:
h5chunks = SingleHdf5ToZarr(infile, netcdf, inline_threshold=100)
with open(reference, "w") as f:
out_dict = h5chunks.translate()
f.write(json.dumps(h5chunks.translate()))

for protocol in ["", "reference://"]:
src_path = protocol + reference
assert Reader.list_variables(src_path) == ["dataset"]
with Reader(src_path, variable="dataset") as src:
assert src.info()
assert src.tile(0, 0, 0)
src_path = "reference://" + reference
assert Reader.list_variables(src_path) == ["dataset"]
with Reader(src_path, variable="dataset") as src:
assert src.info()
assert src.tile(0, 0, 0)

# kerchunk Parquet impl
reference_parquet = os.path.join(
str(d),
"reference.parq",
)

refs_to_dataframe(out_dict, reference_parquet)
src_path = "reference://" + reference_parquet
assert Reader.list_variables(src_path) == ["dataset"]
with Reader(src_path, variable="dataset") as src:
assert src.info()
assert src.tile(0, 0, 0)
33 changes: 24 additions & 9 deletions src/titiler/xarray/titiler/xarray/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def xarray_engine(src_path: str) -> str:
# ".hdf", ".hdf5", ".h5" will be supported once we have tests + expand the type permitted for the group parameter
if any(src_path.lower().endswith(ext) for ext in [".nc", ".nc4"]):
return "h5netcdf"

return "zarr"


Expand All @@ -67,14 +68,15 @@ def get_filesystem(
if xr_engine == "h5netcdf"
else s3fs.S3Map(root=src_path, s3=s3_filesystem)
)
elif protocol == "reference" or src_path.lower().endswith(".json"):

elif protocol == "reference":
reference_args = {
"fo": src_path.replace("reference://", ""),
"remote_options": {"anon": anon},
}
return fsspec.filesystem("reference", **reference_args).get_mapper("")

elif protocol in ["https", "http", "file", "reference"]:
elif protocol in ["https", "http", "file"]:
if protocol.startswith("http"):
assert (
aiohttp is not None
Expand Down Expand Up @@ -120,13 +122,7 @@ def xarray_open_dataset(
if group is not None:
xr_open_args["group"] = group

# NetCDF arguments
if xr_engine == "h5netcdf":
xr_open_args["engine"] = "h5netcdf"
xr_open_args["lock"] = False
ds = xarray.open_dataset(file_handler, **xr_open_args)

elif protocol == "reference" or src_path.lower().endswith(".json"):
if protocol == "reference":
xr_open_args.update(
{
"engine": "zarr",
Expand All @@ -137,8 +133,27 @@ def xarray_open_dataset(

ds = xarray.open_dataset(file_handler, **xr_open_args)

# NetCDF arguments
elif xr_engine == "h5netcdf":
xr_open_args.update(
{
"engine": "h5netcdf",
"lock": False,
}
)

ds = xarray.open_dataset(file_handler, **xr_open_args)

# Fallback to Zarr
else:
if protocol == "reference":
xr_open_args.update(
{
"consolidated": False,
"backend_kwargs": {"consolidated": False},
}
)

ds = xarray.open_zarr(file_handler, **xr_open_args)

if cache_client:
Expand Down

0 comments on commit ea4709c

Please sign in to comment.