Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finalize recipe (+ test new rclone copy stage) #1

Open
wants to merge 33 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
a5370cf
Test recipe
jbusecke Oct 17, 2024
8ef1232
Ready to test dataflow
jbusecke Oct 17, 2024
5251d62
Update recipe.py
jbusecke Oct 17, 2024
a19597c
Full date range
jbusecke Oct 17, 2024
8c3b0a9
Update recipe.py
jbusecke Oct 24, 2024
07c0bfd
Update requirements.txt
jbusecke Oct 24, 2024
72f86c8
Update requirements.txt
jbusecke Oct 24, 2024
3e17987
Update requirements.txt
jbusecke Oct 24, 2024
4f0d239
Update requirements.txt
jbusecke Oct 24, 2024
e203a41
Update recipe.py
jbusecke Oct 24, 2024
7410be1
Update recipe.py
jbusecke Oct 24, 2024
eab1ce6
Update recipe.py
jbusecke Oct 24, 2024
ec1f52d
Update config_dataflow.py
jbusecke Oct 24, 2024
7934492
Update catalog.yaml
jbusecke Oct 25, 2024
622fd1e
Update recipe.py
jbusecke Oct 25, 2024
c78daf4
Update recipe.py
jbusecke Oct 25, 2024
07fdfcb
Update recipe.py
jbusecke Oct 25, 2024
66754bd
Update recipe.py
jbusecke Oct 25, 2024
687649b
Update config_dataflow.py
jbusecke Oct 28, 2024
a903a3b
Update config_dataflow.py
jbusecke Oct 28, 2024
7cf6564
Update config_dataflow.py
jbusecke Oct 28, 2024
39d5458
Update recipe.py
jbusecke Oct 28, 2024
23d4a82
bump beam & change config
norlandrhagen Nov 14, 2024
d9006af
version conflight
norlandrhagen Nov 14, 2024
dab46d7
wip
norlandrhagen Nov 14, 2024
95dbe6f
wip
norlandrhagen Nov 14, 2024
b6e151c
wip
norlandrhagen Nov 14, 2024
5135711
chunking update
norlandrhagen Nov 14, 2024
8894564
downgrade beam
norlandrhagen Nov 14, 2024
8289fbb
update local hub config
norlandrhagen Nov 19, 2024
0462ddb
chunk
norlandrhagen Nov 19, 2024
eb9bd01
updated chunking
norlandrhagen Nov 19, 2024
bcf0561
bump dataflow config, pin gcsfs, only storetozarr
norlandrhagen Nov 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions configs/config_dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
repo_path = os.environ['GITHUB_REPOSITORY']
FEEDSTOCK_NAME = repo_path.split('/')[-1]

c.Bake.prune = 1
c.Bake.prune = False
c.Bake.bakery_class = "pangeo_forge_runner.bakery.dataflow.DataflowBakery"
c.DataflowBakery.use_dataflow_prime = True
c.DataflowBakery.max_workers = 50
c.Bake.container_image = "quay.io/leap-stc/rclone-beam:2024.09.24"
c.DataflowBakery.use_dataflow_prime = False
c.DataflowBakery.machine_type = "n2d-highmem-2"
c.DataflowBakery.max_num_workers = 30
c.DataflowBakery.use_public_ips = True
c.DataflowBakery.service_account_email = (
"[email protected]"
)
c.DataflowBakery.autoscaling_algorithm = "NONE"
c.DataflowBakery.project_id = "leap-pangeo"
c.DataflowBakery.temp_gcs_location = f"gs://leap-scratch/data-library/feedstocks/temp/{FEEDSTOCK_NAME}"
c.TargetStorage.fsspec_class = "gcsfs.GCSFileSystem"
Expand Down
4 changes: 2 additions & 2 deletions configs/config_local_hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
BUCKET_PREFIX = f"gs://leap-scratch/{user}/{repo_name}"
print(f"{BUCKET_PREFIX=}")

c.Bake.prune = 1
c.Bake.prune = True
c.Bake.bakery_class = "pangeo_forge_runner.bakery.local.LocalDirectBakery"
c.TargetStorage.fsspec_class = "gcsfs.GCSFileSystem"
c.InputCacheStorage.fsspec_class = "gcsfs.GCSFileSystem"
c.TargetStorage.root_path = f"{BUCKET_PREFIX}/output/{{job_name}}"
c.InputCacheStorage.root_path = f"{BUCKET_PREFIX}/cache/"
c.InputCacheStorage.root_path = f"gs://leap-scratch/data-library/feedstocks/cache"
2 changes: 1 addition & 1 deletion feedstock/catalog.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ tags:
stores:
- id: "chirps-global-daily"
name: "CHIRPS-2.0"
url: "gs://leap-scratch/data-library/feedstocks/chirps_feedstock/chirps-global-dailyzarr"
url: "https://nyu1.osn.mghpcc.org/leap-pangeo-pipeline/chirps_feedstock/chirps-global-daily.zarr"
49 changes: 14 additions & 35 deletions feedstock/recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,13 @@
import os
import apache_beam as beam
from leap_data_management_utils.data_management_transforms import (
Copy,
InjectAttrs,
get_catalog_store_urls,
)
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
OpenURLWithFSSpec,
OpenWithXarray,
StoreToZarr,
ConsolidateMetadata,
ConsolidateDimensionCoordinates,
)

# Parse the catalog store locations (this is where the data is copied to after a successful write, and maybe testing)
Expand All @@ -32,48 +28,31 @@
print(f"{catalog_store_urls=}")

## Monthly version
input_urls_a = [
"gs://cmip6/pgf-debugging/hanging_bug/file_a.nc",
"gs://cmip6/pgf-debugging/hanging_bug/file_b.nc",
]
input_urls_b = [
"gs://cmip6/pgf-debugging/hanging_bug/file_a_huge.nc",
"gs://cmip6/pgf-debugging/hanging_bug/file_b_huge.nc",
years = range(1981, 2025)
input_urls = [
f"http://data.chc.ucsb.edu/products/CHIRPS-2.0/global_daily/netcdf/p05/chirps-v2.0.{year}.days_p05.nc"
for year in years
]

pattern_a = pattern_from_file_sequence(input_urls_a, concat_dim="time")
pattern_b = pattern_from_file_sequence(input_urls_b, concat_dim="time")
pattern_a = pattern_from_file_sequence(input_urls, concat_dim="time")


# small recipe
small = (
recipe = (
beam.Create(pattern_a.items())
| OpenURLWithFSSpec()
| OpenWithXarray()
| StoreToZarr(
store_name="small.zarr",
store_name="chirps-global-daily.zarr",
# FIXME: This is brittle. It needs to be named exactly like in meta.yaml...
# Can we inject this in the same way as the root?
# Maybe it's better to find another way and avoid injections entirely...
combine_dims=pattern_a.combine_dim_keys,
target_chunks={"time": 200, "latitude": 200, "longitude": 720},
)
| InjectAttrs()
| ConsolidateDimensionCoordinates()
| ConsolidateMetadata()
| Copy(target=catalog_store_urls["small"])
)

# larger recipe
large = (
beam.Create(pattern_b.items())
| OpenURLWithFSSpec()
| OpenWithXarray()
| StoreToZarr(
store_name="large.zarr",
combine_dims=pattern_b.combine_dim_keys,
)
| InjectAttrs()
| ConsolidateDimensionCoordinates()
| ConsolidateMetadata()
| Copy(target=catalog_store_urls["large"])
# | InjectAttrs()
# | ConsolidateDimensionCoordinates()
# | ConsolidateMetadata()
# | CopyRclone(
# target=catalog_store_urls["chirps-global-daily"].replace(
# "https://nyu1.osn.mghpcc.org/", ""))
)
8 changes: 5 additions & 3 deletions feedstock/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
pangeo-forge-recipes==0.10.7
apache-beam[gcp]
gcsfs
leap-data-management-utils==0.0.12
apache-beam[gcp]==2.58.0
gcsfs==2024.9.0
leap-data-management-utils[pangeo-forge] @ git+https://github.com/leap-stc/leap-data-management-utils.git@rclone-copy-stage
# leap-data-management-utils==0.0.12
pyopenssl >= 23.2.0
Loading