
Commit

finalize tropo dask extraction
bbuzz31 committed Sep 7, 2023
1 parent 4840963 commit 6ebadf9
Showing 1 changed file with 128 additions and 71 deletions.
199 changes: 128 additions & 71 deletions tools/ARIAtools/tsSetup_dask.py
@@ -8,7 +8,8 @@

import dask
import logging
import shutil, glob
import os, os.path as op
from pathlib import Path
from itertools import compress
from osgeo import gdal
@@ -371,6 +372,71 @@ def vprint(x): return print(x) if verbose == True else None
client.close()


def exportIono(product_dict, bbox_file, prods_TOTbbox, arrres,
               work_dir, outputFormat='ISCE', verbose=True,
               mask=None, multilook=None, n_jobs=1):

# verbose printing
vprint = lambda x: print(x) if verbose == True else None

    # initialize multiprocessing
if n_jobs>1 or len(product_dict)>1:
vprint('Running GUNW ionosphere in parallel!')
client = Client(threads_per_worker=1,
n_workers=n_jobs,
memory_limit='10GB')
vprint(f'Link: {client.dashboard_link}')
else:
client = None

# Create output directories
iono_dir = Path(work_dir) / 'ionosphere'
iono_dir.mkdir(parents=True, exist_ok=True)

outNames = [ix['pair_name'][0] for ix in product_dict]
outFileIono = [iono_dir / name for name in outNames]

export_list = [not(outIono.exists()) for outIono in outFileIono]

# Get only the pairs we need to run
product_dict = compress(product_dict, export_list)
outNames = compress(outNames, export_list)
outFileIono = compress(outFileIono, export_list)

zipped_jobs = zip(product_dict, outNames, outFileIono)

# Get bounds
bounds = open_shapefile(bbox_file, 0, 0).bounds

# Run exporting and stitching
export_dict = dict(
input_iono_files = None,
arrres = arrres,
output_iono = None,
output_format = outputFormat,
bounds = bounds,
clip_json = prods_TOTbbox,
mask_file = mask,
verbose = verbose,
overwrite = True)

jobs = []
for product, name, outIono in zipped_jobs:
export_dict['input_iono_files'] = product['ionosphere']
export_dict['output_iono'] = outIono

job = dask.delayed(export_ionosphere)(**export_dict, dask_key_name=name)
jobs.append(job)

# Run export jobs
    vprint(f'Running {len(jobs)} jobs')
out = dask.compute(*jobs)
    progress(out)  # TODO: figure out how to wire a dask progress bar into this
# close dask
if client:
client.close()
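
For reference, a minimal sketch of how exportIono could be driven. The product_dict layout (a list of dicts carrying 'pair_name' and 'ionosphere' entries) follows its use above; the file paths, pair names, and resolution values below are placeholder assumptions, not part of this commit.

# Illustrative sketch only; paths and pair names are hypothetical.
product_dict = [
    {'pair_name': ['20230101_20230113'],
     'ionosphere': ['path/to/20230101_20230113_ionosphere_layer']},
]
exportIono(product_dict, 'user_bbox.json', 'prods_TOTbbox.json',
           [90.0, 90.0], 'work', outputFormat='ISCE',
           mask=None, n_jobs=2)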


def exportTropo(product_dict, bbox_file, prods_TOTbbox, dem, Latitude, Longitude, workdir,
wmodel='HRRR', layer='troposphereWet', mask=None,
n_threads=1, n_jobs=1, verbose=True, debug=False):
@@ -406,8 +472,8 @@ def vprint(x): return print(x) if verbose == True else None
export_list = [not (outFile.exists()) for outFile in outFiles]

# Get only the pairs we need to run
product_dict = compress(product_dict, export_list)
outNames = compress(outNames, export_list)

job_dict = dict(
mask=mask,
@@ -447,79 +513,70 @@ def vprint(x): return print(x) if verbose == True else None
    vprint(f'Running {len(sec_jobs)} secondary jobs with {n_jobs} workers')
out = dask.compute(*sec_jobs)


# close dask
if client:
client.close()
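
All of these export helpers share the same delayed-graph pattern: build one dask.delayed call per interferogram pair, then materialize the whole graph with dask.compute on a distributed Client. A stripped-down, hypothetical illustration (the worker function and pair names are placeholders, not part of this commit):

import dask
from dask.distributed import Client

def do_export(pair):
    # stand-in for export_ionosphere / the tropo export workers
    return f'exported {pair}'

client = Client(n_workers=2, threads_per_worker=1, memory_limit='10GB')
jobs = [dask.delayed(do_export)(pair, dask_key_name=pair)
        for pair in ['20230101_20230113', '20230113_20230125']]
results = dask.compute(*jobs)  # executes the graph on the client's workers
client.close()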


def move_tropo_layers(path_wd, ifgs, wm='HRRR'):
    """ Move tropo layers from their temporary 'ifg' directory to the one ARIA-tools expects

    path_wd should contain troposphereWet
    ifgs is a list of date pairs (YYYYMMDD_YYYYMMDD)
    """
    ## move the files to one outer directory
    path_wd = Path(path_wd)
    paths_dry = [path_wd / 'troposphereHydrostatic' / ifg / wm for ifg in ifgs]
    paths_wet = [path_wd / 'troposphereWet' / ifg / wm for ifg in ifgs]

for paths in [paths_dry, paths_wet]:
for j, src in enumerate(paths):
if j == 0:
dst = src.parents[1]
shutil.move(src, dst)
else:
for f in glob.glob(f'{src}/*'):
if op.isdir(f) and op.basename(f) == 'dates':
for f1 in glob.glob(f'{f}/*'):
                            # keep the first single date that's unpacked (theoretically always the same between ifgs)
dst = src.parents[1] / wm / 'dates' / op.basename(f1)
if not op.exists(dst):
shutil.move(f1, dst)

else:
shutil.move(f, src.parents[1] / wm )
shutil.rmtree(src.parents[0]) # clean up temporary ifg dir
print ('Moved paths to correct directories')
return
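
To make the move concrete, here is the directory reshuffle this helper performs, sketched with hypothetical work-directory and date-pair names (only the layer and weather-model names come from the code above):

# Before (per-ifg output of the export jobs)          After move_tropo_layers('work', ifgs)
# work/troposphereWet/20230101_20230113/HRRR/...  ->  work/troposphereWet/HRRR/...
# work/troposphereWet/20230113_20230125/HRRR/...  ->  merged into work/troposphereWet/HRRR/
#   (single-date rasters under .../HRRR/dates/ are kept only once)
# work/troposphereHydrostatic/<ifg>/HRRR/...      ->  work/troposphereHydrostatic/HRRR/...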


def compute_tropo_total(path_wd, wm='HRRR'):
""" Add wet and dry for the individual dates """
import rioxarray as xrr
i = 0
for root, dirs, files in os.walk(Path(path_wd) / 'troposphereHydrostatic' / wm / 'dates'):
for f in files:
if f.endswith('vrt'):
continue
path_hyd = op.join(root, f)
            path_wet = path_hyd.replace('Hydrostatic', 'Wet')
path_tot = path_hyd.replace('Hydrostatic', 'Total')
os.makedirs(op.dirname(path_tot), exist_ok=True)

with xrr.open_rasterio(path_hyd) as da_hyd:
arr_hyd = da_hyd.data
with xrr.open_rasterio(path_wet) as da_wet:
arr_wet = da_wet.data

arr_total = arr_hyd + arr_wet
da_total = da_wet.copy()
da_total.data = arr_total
da_total.rio.to_raster(path_tot, driver='GTiff')
gdal.BuildVRT(f'{path_tot}.vrt', path_tot)
i+=1

print (f'Wrote troposphereTotal for {i} dates.')
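
A hedged usage sketch chaining the two helpers after exportTropo has produced both the hydrostatic and wet layers; the work directory and date pairs below are assumptions for illustration only.

# Illustrative only
ifgs = ['20230101_20230113', '20230113_20230125']  # hypothetical date pairs
move_tropo_layers('work', ifgs, wm='HRRR')          # flatten the per-ifg directories
compute_tropo_total('work', wm='HRRR')              # write troposphereTotal per date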


def _export_tropo(prods, layer, wmodel, workdir, bounds, prods_TOTbbox,
@@ -675,9 +732,9 @@ def main(inps=None):
outputFormat=inps.outputFormat,
num_threads=inps.num_threads)

    # NOTE we should store the variables above in a PICKLE so we can skip
    # preparing inputs every time unless otherwise specified

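A minimal sketch of what that caching could look like, assuming the prepared variables (product_dict and friends) pickle cleanly; the cache filename and the exact set of variables are assumptions, not part of this commit.

import pickle, os.path as op

cache = op.join('work', 'prepared_inputs.pkl')  # hypothetical cache path
if op.exists(cache):
    with open(cache, 'rb') as f:
        product_dict, bbox_file, prods_TOTbbox = pickle.load(f)
else:
    # ... prepare inputs as above, then cache them for the next run
    with open(cache, 'wb') as f:
        pickle.dump((product_dict, bbox_file, prods_TOTbbox), f)
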
# export unwrappedPhase
layers = ['unwrappedPhase', 'coherence']
print('\nExtracting unwrapped phase, coherence, '
@@ -712,7 +769,7 @@ def main(inps=None):
mask=inps.mask,
n_threads=inps.num_threads, n_jobs=inps.jobs)


print('\nExtracting perpendicular baseline grids for each '
'interferogram pair')
max_jobs = len(product_dict)
