
Merge pull request #45 from ASFHyP3/forrest_edits
Add product loading and packaging to time_series tool
AndrewPlayer3 authored Sep 6, 2024
2 parents e45865e + a298753 commit 98ab3f8
Showing 1 changed file with 105 additions and 94 deletions.
199 changes: 105 additions & 94 deletions src/hyp3_srg/time_series.py
@@ -4,22 +4,76 @@

import argparse
import logging
import shutil
import zipfile
from os import mkdir
from pathlib import Path
from shutil import copyfile
from typing import Iterable, Optional

from boto3 import client
from hyp3lib.aws import upload_file_to_s3
from hyp3lib.fetch import download_file as download_from_http
from shapely import unary_union

from hyp3_srg import dem, utils


S3 = client('s3')
log = logging.getLogger(__name__)
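# Note: the module-level client above is shared by all downloads in this module;
# boto3 clients are generally safe to share across calls, unlike boto3 resources.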


def download_from_s3(uri: str, dest_dir: Optional[Path] = None) -> Path:
    """Download a file from an S3 bucket

    Args:
        uri: URI of the file to download
        dest_dir: directory to download the file into; defaults to the current working directory

    Returns:
        Path to the downloaded file
    """
if dest_dir is None:
dest_dir = Path.cwd()

simple_s3_uri = Path(uri.replace('s3://', ''))
bucket = simple_s3_uri.parts[0]
key = '/'.join(simple_s3_uri.parts[1:])
out_path = dest_dir / simple_s3_uri.parts[-1]
S3.download_file(bucket, key, out_path)
return out_path
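# A minimal illustration of the parsing above (bucket, key, and file name are
# hypothetical): for uri='s3://my-bucket/jobs/S1A_GSLC.zip', simple_s3_uri is
# Path('my-bucket/jobs/S1A_GSLC.zip'), so bucket='my-bucket',
# key='jobs/S1A_GSLC.zip', and out_path=dest_dir / 'S1A_GSLC.zip'.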


def load_products(uris: Iterable[str], overwrite: bool = False) -> list[str]:
    """Load the products from the provided URIs

    Args:
        uris: list of URIs to the SRG GSLC products
        overwrite: overwrite existing products

    Returns:
        List of the names of the loaded granules
    """
work_dir = Path.cwd()
granule_names = []
for uri in uris:
name = Path(Path(uri).name)
geo_name = name.with_suffix('.geo')
zip_name = name.with_suffix('.zip')

product_exists = geo_name.exists() or zip_name.exists()
if product_exists and not overwrite:
pass
elif uri.startswith('s3'):
download_from_s3(uri, dest_dir=work_dir)
elif uri.startswith('http'):
download_from_http(uri, directory=work_dir)
elif len(Path(uri).parts) > 1:
shutil.copy(uri, work_dir)

if not geo_name.exists():
shutil.unpack_archive(name.with_suffix('.zip'), work_dir)

granule_names.append(str(name))

return granule_names
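# A usage sketch (all URIs are hypothetical). Each URI is dispatched on its
# scheme by the branches above, so sources can be mixed freely:
#   load_products([
#       's3://my-bucket/S1A_GSLC.zip',       # fetched with download_from_s3
#       'https://example.com/S1B_GSLC.zip',  # fetched with download_from_http
#       '/data/S1C_GSLC.zip',                # copied from the local filesystem
#   ])
# Any product still missing its .geo afterwards is unpacked from its .zip.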


def get_size_from_dem(dem_file: str) -> tuple[int, int]:
    """Get the length and width from a .rsc DEM file

Args:
dem_file: path to the .rsc dem file.
@@ -37,12 +91,9 @@ def get_size_from_dem(dem_file: str) -> tuple[int]:


def generate_wrapped_interferograms(
    looks: tuple[int, int], baselines: tuple[int, int], dem_shape: tuple[int, int], work_dir: Path
) -> None:
    """Generates wrapped interferograms from GSLCs

    Args:
        looks: tuple containing the number of range looks and azimuth looks
@@ -54,31 +105,14 @@
looks_down, looks_across = looks
time_baseline, spatial_baseline = baselines

    utils.call_stanford_module('sentinel/sbas_list.py', args=[time_baseline, spatial_baseline], work_dir=work_dir)

    sbas_args = ['sbas_list', '../elevation.dem.rsc', 1, 1, dem_width, dem_length, looks_down, looks_across]
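    # The sbas_args above are passed positionally to ps_sbas_igrams.py; with the
    # illustrative values dem_shape=(10800, 10800) and looks=(10, 10), the call
    # below is roughly equivalent to running:
    #   ps_sbas_igrams.py sbas_list ../elevation.dem.rsc 1 1 10800 10800 10 10
    # (example values only; real shapes and looks come from the caller)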
utils.call_stanford_module('sentinel/ps_sbas_igrams.py', args=sbas_args, work_dir=work_dir)


def unwrap_interferograms(dem_shape: tuple[int, int], unw_shape: tuple[int, int], work_dir: Path) -> None:
    """Unwraps wrapped interferograms in parallel

Args:
dem_shape: tuple containing the dem width and dem length
@@ -88,24 +122,15 @@ def unwrap_interferograms(dem_shape: tuple[int], unw_shape: tuple[int], work_dir: Path) -> None:
dem_width, dem_length = dem_shape
unw_width, unw_length = unw_shape

    reduce_dem_args = ['../elevation.dem', 'dem', dem_width, dem_width // unw_width, dem_length // unw_length]
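    # The integer divisions above give the row/column downsample factors between
    # the full DEM and the unwrapped grid; e.g. a 10800 x 10800 DEM with a
    # 5400 x 5400 unwrapped grid yields factors of 2 and 2 (illustrative sizes only).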
utils.call_stanford_module('util/nbymi2', args=reduce_dem_args, work_dir=work_dir)
utils.call_stanford_module('util/unwrap_parallel.py', args=[unw_width], work_dir=work_dir)


def compute_sbas_velocity_solution(
    threshold: float, do_tropo_correction: bool, unw_shape: tuple[int, int], work_dir: Path
) -> None:
    """Computes the sbas velocity solution from the unwrapped interferograms

Args:
threshold: ...
@@ -115,7 +140,7 @@ def compute_sbas_velocity_solution(
"""
unw_width, unw_length = unw_shape

    utils.call_stanford_module('sbas/sbas_setup.py', args=['sbas_list', 'geolist'], work_dir=work_dir)
copyfile(work_dir / 'intlist', work_dir / 'unwlist')
utils.call_stanford_module('util/sed.py', args=['s/int/unw/g', 'unwlist'], work_dir=work_dir)

Expand All @@ -126,29 +151,14 @@ def compute_sbas_velocity_solution(
with open(work_dir / 'geolist', 'r') as slc_list:
num_slcs = len(slc_list.readlines())

    ref_point_args = ['unwlist', unw_width, unw_length, threshold]
utils.call_stanford_module('int/findrefpoints', args=ref_point_args, work_dir=work_dir)

    if do_tropo_correction:
        tropo_correct_args = ['unwlist', unw_width, unw_length]
utils.call_stanford_module('int/tropocorrect.py', args=tropo_correct_args, work_dir=work_dir)

    sbas_velocity_args = ['unwlist', num_unw_files, num_slcs, unw_width, 'ref_locs']
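    # Positional arguments for sbas: the unwrapped-interferogram list, the number
    # of unwrapped files, the number of SLCs, the grid width, and the reference
    # locations file (presumably written by findrefpoints above).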
utils.call_stanford_module('sbas/sbas', args=sbas_velocity_args, work_dir=work_dir)


@@ -157,9 +167,9 @@ def create_time_series(
    baselines: tuple[int, int] = (1000, 1000),
    threshold: float = 0.5,
    do_tropo_correction: bool = True,
    work_dir: Path | None = None,
) -> None:
    """Creates a time series from a stack of GSLCs consisting of interferograms and a velocity solution

    Args:
        looks: tuple containing the number of range looks and azimuth looks
@@ -169,30 +179,17 @@ def create_time_series(
work_dir: the directory containing the GSLCs to do work in
"""
dem_shape = get_size_from_dem('elevation.dem.rsc')
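    # The three steps below run in order: wrapped interferogram generation,
    # parallel unwrapping, then the SBAS velocity solution.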
    generate_wrapped_interferograms(looks=looks, baselines=baselines, dem_shape=dem_shape, work_dir=work_dir)

unw_shape = get_size_from_dem(work_dir / 'dem.rsc')
    unwrap_interferograms(dem_shape=dem_shape, unw_shape=unw_shape, work_dir=work_dir)

    compute_sbas_velocity_solution(
        threshold=threshold, do_tropo_correction=do_tropo_correction, unw_shape=unw_shape, work_dir=work_dir
    )


def package_time_series(work_dir: Optional[Path] = None) -> Path:
"""Package the time series into a product zip file.
Args:
@@ -201,21 +198,32 @@ def package_time_series(work_dir) -> Path:
Returns:
Path to the created zip file
"""
    if work_dir is None:
        work_dir = Path.cwd()
    sbas_dir = work_dir / 'sbas'
    # TODO: create name based on input granules
    product_name = 'time_series'
    product_path = work_dir / product_name
    product_path.mkdir(exist_ok=True, parents=True)
    zip_path = work_dir / f'{product_name}.zip'

    to_keep = [
        # Metadata
        'sbas_list',
        'parameters',
        'reflocs',
        'dem.rsc',
        # Datasets
        'dem',
        'locs',
        'npts',
        'displacement',
        'stackmht',
        'stacktime',
        'velocity',
    ]
    for file_name in to_keep:
        shutil.copy(sbas_dir / file_name, product_path / file_name)
    shutil.make_archive(str(product_path), 'zip', product_path)
    return zip_path
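# Sketch of the resulting layout (illustrative, assuming work_dir is '.'):
#   ./sbas/           <- outputs of the SBAS processing steps (source of to_keep)
#   ./time_series/    <- staged copies of the to_keep files
#   ./time_series.zip <- archive returned by package_time_series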


@@ -239,9 +247,12 @@ def time_series(
if not sbas_dir.exists():
mkdir(sbas_dir)

granule_names = load_products(granules)

bboxs = []
    for name in granule_names:
        # TODO: This may not work for a GSLC product created using multiple L0 granules
        bboxs.append(utils.get_bbox(name))
full_bbox = unary_union(bboxs).buffer(0.1)
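    # unary_union merges the per-granule footprints into one geometry and
    # buffer(0.1) pads it by ~0.1 degrees (footprints are lon/lat) so the DEM
    # covers the full stack extent.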

dem_path = dem.download_dem_for_srg(full_bbox, work_dir)
@@ -256,7 +267,7 @@ def time_series(
if bucket:
upload_file_to_s3(zip_path, bucket, bucket_prefix)

    print(f'Finished time-series processing for {", ".join(granule_names)}!')


def main():
