Improve performance of SDAP-517 #11 (Open)

Wants to merge 8 commits into base: SDAP-517-daac-creds
1 change: 1 addition & 0 deletions CHANGELOG.md

@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - SDAP-497: Added tool to ease building of releases. Can build from ASF distributions, git repos, and local
 - SDAP-520: (Documentation) Added guide to docs for evaluating official release candidates.
 ### Changed
+- Performance improvements to non-nexusproto dataset backend handling
 ### Deprecated
 ### Removed
 ### Fixed
4 changes: 3 additions & 1 deletion analysis/webservice/algorithms_spark/ClimMapSpark.py

@@ -42,7 +42,7 @@ def _map(tile_service_factory, tile_in_spark):
 startTime = tile_in_spark[1]
 endTime = tile_in_spark[2]
 ds = tile_in_spark[3]
-tile_service = tile_service_factory()
+tile_service = tile_service_factory(spark=True, collections=[ds])
 # print 'Started tile', tile_bounds
 # sys.stdout.flush()
 tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
@@ -195,6 +195,8 @@ def calc(self, computeOptions, **args):
 # Launch Spark computations
 spark_nparts = self._spark_nparts(nparts_requested)
 self.log.info('Using {} partitions'.format(spark_nparts))
+
+NexusTileService.save_to_spark(self._sc, self._ds)
 rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
 sum_count_part = rdd.map(partial(self._map, self._tile_service_factory))
 sum_count = \
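The change above is the pattern this PR repeats across the Spark algorithms: the driver registers the dataset(s) a job will read with NexusTileService.save_to_spark(...) before parallelizing, and each executor-side _map then builds its tile service through the factory with spark=True and only the collections it needs. The internals of save_to_spark and of the spark/collections factory arguments are not shown in this excerpt, so the sketch below is only an illustrative reading of the call sites, not the actual implementation.

# Illustrative sketch of the driver/worker split used above. The semantics of
# NexusTileService.save_to_spark() and of the spark=/collections= factory
# arguments are assumed from the call sites in this diff, not from their source.
from functools import partial

from nexustiles.nexustiles import NexusTileService


def _map(tile_service_factory, tile_in_spark):
    # Executor side: open a tile service scoped to just this job's dataset.
    tile_bounds, start_time, end_time, ds = tile_in_spark
    tile_service = tile_service_factory(spark=True, collections=[ds])
    # ... fetch tiles for (tile_bounds, start_time, end_time) and reduce them ...
    return []


def run_job(sc, tile_service_factory, nexus_tiles_spark, ds, spark_nparts):
    # Driver side: register the dataset with the Spark context up front so
    # executors only have to initialize the backend they actually use.
    NexusTileService.save_to_spark(sc, ds)
    rdd = sc.parallelize(nexus_tiles_spark, spark_nparts)
    return rdd.map(partial(_map, tile_service_factory)).collect()

The likely intent is that executors initialize only the backend for the dataset in use instead of loading every configured collection on each factory call.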
5 changes: 4 additions & 1 deletion analysis/webservice/algorithms_spark/CorrMapSpark.py

@@ -19,6 +19,7 @@

 import numpy as np

+from nexustiles.nexustiles import NexusTileService
 from webservice.NexusHandler import nexus_handler, DEFAULT_PARAMETERS_SPEC
 from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
 from webservice.webmodel import NexusProcessingException, NexusResults, NoDataException
@@ -57,7 +58,7 @@ def _map(tile_service_factory, tile_in):
 # print 'days_at_a_time = ', days_at_a_time
 t_incr = 86400 * days_at_a_time

-tile_service = tile_service_factory()
+tile_service = tile_service_factory(spark=True, collections=list(ds))

 # Compute the intermediate summations needed for the Pearson
 # Correlation Coefficient. We use a one-pass online algorithm
@@ -231,6 +232,8 @@ def calc(self, computeOptions, **args):
 spark_nparts = self._spark_nparts(nparts_requested)
 self.log.info('Using {} partitions'.format(spark_nparts))

+NexusTileService.save_to_spark(self._sc, *self._ds)
+
 rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
 sum_tiles_part = rdd.map(partial(self._map, self._tile_service_factory))
 # print "sum_tiles_part = ",sum_tiles_part.collect()
(changes to an additional file; file name not captured in this excerpt)

@@ -281,6 +281,8 @@ def spark_anomalies_driver(tile_service_factory, tile_ids, bounding_wkt, dataset
 from functools import partial

 with DRIVER_LOCK:
+NexusTileService.save_to_spark(sc, dataset, climatology)
+
 bounding_wkt_b = sc.broadcast(bounding_wkt)
 dataset_b = sc.broadcast(dataset)
 climatology_b = sc.broadcast(climatology)
@@ -329,7 +331,7 @@ def calculate_diff(tile_service_factory, tile_ids, bounding_wkt, dataset, climat
 tile_ids = list(tile_ids)
 if len(tile_ids) == 0:
 return []
-tile_service = tile_service_factory()
+tile_service = tile_service_factory(spark=True, collections=[dataset.value, climatology.value])

 for tile_id in tile_ids:
 # Get the dataset tile
9 changes: 7 additions & 2 deletions analysis/webservice/algorithms_spark/HofMoellerSpark.py

@@ -26,6 +26,7 @@
 from matplotlib.ticker import FuncFormatter
 from pytz import timezone

+from nexustiles.nexustiles import NexusTileService
 from webservice.NexusHandler import nexus_handler
 from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
 from webservice.algorithms_spark import utils
@@ -46,7 +47,7 @@ def hofmoeller_stats(tile_service_factory, metrics_callback, tile_in_spark):
 (latlon, tile_id, index,
 min_lat, max_lat, min_lon, max_lon, dataset) = tile_in_spark

-tile_service = tile_service_factory()
+tile_service = tile_service_factory(spark=True, collections=[dataset])
 try:
 # Load the dataset tile
 tile = tile_service.find_tile_by_id(tile_id, metrics_callback=metrics_callback, ds=dataset)[0]
@@ -272,7 +273,9 @@ def hof_tuple_to_dict(t, avg_var_name):
 'min': t[7]}


-def spark_driver(sc, latlon, tile_service_factory, nexus_tiles_spark, metrics_callback, normalize_dates):
+def spark_driver(sc, ds, latlon, tile_service_factory, nexus_tiles_spark, metrics_callback, normalize_dates):
+NexusTileService.save_to_spark(sc, ds)
+
 # Parallelize list of tile ids
 rdd = sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark)))
 if latlon == 0:
@@ -372,6 +375,7 @@ def calc(self, compute_options, **args):
 raise NoDataException(reason="No data found for selected timeframe")

 results = spark_driver(self._sc,
+ds,
 self._latlon,
 self._tile_service_factory,
 nexus_tiles_spark,
@@ -429,6 +433,7 @@ def calc(self, compute_options, **args):
 raise NoDataException(reason="No data found for selected timeframe")

 results = spark_driver(self._sc,
+ds,
 self._latlon,
 self._tile_service_factory,
 nexus_tiles_spark,
8 changes: 7 additions & 1 deletion analysis/webservice/algorithms_spark/Matchup.py

@@ -31,6 +31,7 @@
 from shapely.geometry import box
 import functools

+from nexustiles.nexustiles import NexusTileService
 from webservice.NexusHandler import nexus_handler
 from webservice.algorithms_spark.NexusCalcSparkTornadoHandler import NexusCalcSparkTornadoHandler
 from webservice.algorithms.doms import config as edge_endpoints
@@ -657,6 +658,8 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n
 from functools import partial

 with DRIVER_LOCK:
+NexusTileService.save_to_spark(sc, primary_ds_name, *secondary_ds_names.split(','))
+
 # Broadcast parameters
 primary_b = sc.broadcast(primary_ds_name)
 secondary_b = sc.broadcast(secondary_ds_names)
@@ -838,7 +841,10 @@ def match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_
 if len(tile_ids) == 0:
 return []

-tile_service = tile_service_factory()
+tile_service = tile_service_factory(
+    spark=True,
+    collections=[primary_b.value] + secondary_b.value.split(',')
+)

 # Determine the spatial temporal extents of this partition of tiles
 tiles_bbox = tile_service.get_bounding_box(tile_ids, ds=primary_b.value)
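Matchup is the multi-dataset case of the same pattern: the job reads a primary dataset plus a comma-separated list of secondaries, so both the driver-side registration and the worker-side factory call have to expand that full list (on the workers, from the broadcast values). A compact sketch of just the list handling, assuming the comma-separated secondary_ds_names format used above:

# Sketch of the collection list used on both sides of a matchup job; the
# comma-separated secondary_ds_names format is taken from the diff above.
def matchup_collections(primary_ds_name, secondary_ds_names):
    return [primary_ds_name] + secondary_ds_names.split(',')

# Driver:   NexusTileService.save_to_spark(sc, *matchup_collections(primary, secondaries))
# Executor: tile_service_factory(spark=True, collections=matchup_collections(primary_b.value, secondary_b.value))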
8 changes: 7 additions & 1 deletion analysis/webservice/algorithms_spark/MatchupDoms.py

@@ -34,6 +34,7 @@
 from shapely.geometry import box
 from shapely.geos import WKTReadingError

+from nexustiles.nexustiles import NexusTileService
 from webservice.NexusHandler import nexus_handler
 from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
 from webservice.algorithms.doms import config as edge_endpoints
@@ -498,6 +499,8 @@ def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, secondary_ds_n
 from functools import partial

 with DRIVER_LOCK:
+NexusTileService.save_to_spark(sc, primary_ds_name, *secondary_ds_names.split(','))
+
 # Broadcast parameters
 primary_b = sc.broadcast(primary_ds_name)
 secondary_b = sc.broadcast(secondary_ds_names)
@@ -619,7 +622,10 @@ def match_satellite_to_insitu(tile_ids, primary_b, secondary_b, parameter_b, tt_
 if len(tile_ids) == 0:
 return []

-tile_service = tile_service_factory()
+tile_service = tile_service_factory(
+    spark=True,
+    collections=[primary_b.value] + secondary_b.value.split(',')
+)

 # Determine the spatial temporal extents of this partition of tiles
 tiles_bbox = tile_service.get_bounding_box(tile_ids)
6 changes: 5 additions & 1 deletion analysis/webservice/algorithms_spark/MaximaMinimaSpark.py

@@ -21,6 +21,7 @@
 import shapely.geometry
 from pytz import timezone

+from nexustiles.nexustiles import NexusTileService
 from webservice.NexusHandler import nexus_handler
 from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
 from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException
@@ -212,6 +213,8 @@ def calc(self, compute_options, **args):
 spark_nparts = self._spark_nparts(nparts_requested)
 self.log.info('Using {} partitions'.format(spark_nparts))

+NexusTileService.save_to_spark(self._sc, ds)
+
 rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
 max_min_part = rdd.map(partial(self._map, self._tile_service_factory, min_elevation, max_elevation))
 max_min_count = \
@@ -297,7 +300,8 @@ def _map(tile_service_factory, min_elevation, max_elevation, tile_in_spark):
 startTime = tile_in_spark[1]
 endTime = tile_in_spark[2]
 ds = tile_in_spark[3]
-tile_service = tile_service_factory()
+
+tile_service = tile_service_factory(spark=True, collections=[ds])

 tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)

5 changes: 4 additions & 1 deletion analysis/webservice/algorithms_spark/TimeAvgMapSpark.py

@@ -20,6 +20,7 @@
 import shapely.geometry
 from pytz import timezone

+from nexustiles.nexustiles import NexusTileService
 from webservice.NexusHandler import nexus_handler
 from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
 from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException
@@ -203,6 +204,8 @@ def calc(self, compute_options, **args):
 spark_nparts = self._spark_nparts(nparts_requested)
 self.log.info('Using {} partitions'.format(spark_nparts))

+NexusTileService.save_to_spark(self._sc, ds)
+
 rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
 metrics_record.record_metrics(partitions=rdd.getNumPartitions())
 sum_count_part = rdd.map(partial(self._map, self._tile_service_factory, metrics_record.record_metrics, min_elevation, max_elevation))
@@ -278,7 +281,7 @@ def _map(tile_service_factory, metrics_callback, min_elevation, max_elevation, t
 startTime = tile_in_spark[1]
 endTime = tile_in_spark[2]
 ds = tile_in_spark[3]
-tile_service = tile_service_factory()
+tile_service = tile_service_factory(spark=True, collections=[ds])

 tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)

10 changes: 7 additions & 3 deletions analysis/webservice/algorithms_spark/TimeSeriesSpark.py

@@ -16,9 +16,9 @@
 import itertools
 import logging
 import traceback
-from io import StringIO
 from datetime import datetime
 from functools import partial
+from io import StringIO

 import matplotlib.dates as mdates
 import matplotlib.pyplot as plt
@@ -29,10 +29,12 @@
 from backports.functools_lru_cache import lru_cache
 from pytz import timezone
 from scipy import stats
+
+from nexustiles.nexustiles import NexusTileService
 from webservice import Filtering as filtering
 from webservice.NexusHandler import nexus_handler
-from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
 from webservice.algorithms_spark import utils
+from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
 from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException

 EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
@@ -464,6 +466,8 @@ def spark_driver(daysinrange, bounding_polygon, ds, tile_service_factory, metric
 in np.array_split(daysinrange, spark_nparts)]

 # Launch Spark computations
+NexusTileService.save_to_spark(sc, ds)
+
 rdd = sc.parallelize(nexus_tiles_spark, spark_nparts)
 metrics_callback(partitions=rdd.getNumPartitions())
 results = rdd.flatMap(partial(calc_average_on_day, tile_service_factory, metrics_callback, normalize_dates, min_elevation, max_elevation)).collect()
@@ -482,7 +486,7 @@ def calc_average_on_day(tile_service_factory, metrics_callback, normalize_dates,
 (bounding_polygon, dataset, timestamps, fill) = tile_in_spark
 if len(timestamps) == 0:
 return []
-tile_service = tile_service_factory()
+tile_service = tile_service_factory(spark=True, collections=[dataset])

 logger.info(f'{max_elevation=} {min_elevation=}')

5 changes: 4 additions & 1 deletion analysis/webservice/algorithms_spark/VarianceSpark.py

@@ -21,6 +21,7 @@
 import shapely.geometry
 from pytz import timezone

+from nexustiles.nexustiles import NexusTileService
 from webservice.NexusHandler import nexus_handler
 from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
 from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException
@@ -212,6 +213,8 @@ def calc(self, compute_options, **args):
 spark_nparts = self._spark_nparts(nparts_requested)
 self.log.info('Using {} partitions'.format(spark_nparts))

+NexusTileService.save_to_spark(self._sc, ds)
+
 rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
 sum_count_part = rdd.map(partial(self._map, self._tile_service_factory, min_elevation, max_elevation))
 sum_count = \
@@ -317,7 +320,7 @@ def _map(tile_service_factory, min_elevation, max_elevation, tile_in_spark):
 startTime = tile_in_spark[1]
 endTime = tile_in_spark[2]
 ds = tile_in_spark[3]
-tile_service = tile_service_factory()
+tile_service = tile_service_factory(spark=True, collections=[ds])

 tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)

(changes to an additional file; file name not captured in this excerpt)

@@ -53,7 +53,7 @@ def set_modules(self, module_dir, algorithm_config, remote_collections=None, max
 NexusHandler.executeInitializers(algorithm_config)

 self.log.info("Initializing request ThreadPool to %s" % max_request_threads)
-tile_service_factory = partial(NexusTileService, algorithm_config)
+tile_service_factory = partial(NexusTileService.instance, algorithm_config)
 handler_args_builder = HandlerArgsBuilder(
 max_request_threads,
 tile_service_factory,
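The factory handed to handlers now resolves to NexusTileService.instance rather than constructing a new NexusTileService on every call. The implementation of instance is not part of this excerpt; a plausible reading, given the call sites elsewhere in the diff (instance(algorithm_config, spark=..., collections=...)), is a cached constructor so repeated factory calls reuse one service per collection set instead of rebuilding backends each time. A hypothetical sketch of that idea, with the class renamed to make clear it is not the real implementation:

# Hypothetical cached-instance factory; the real NexusTileService.instance body
# is not shown in this PR excerpt, only its call sites.
import threading


class NexusTileServiceSketch:
    _instances = {}
    _lock = threading.Lock()

    def __init__(self, config=None, spark=False, collections=None):
        self._config = config
        self._spark = spark
        self._collections = collections

    @classmethod
    def instance(cls, config=None, spark=False, collections=None):
        # Reuse one service per (spark, collections) combination so handlers and
        # Spark workers do not rebuild backend connections on every factory call.
        key = (spark, tuple(sorted(collections)) if collections else None)
        with cls._lock:
            if key not in cls._instances:
                cls._instances[key] = cls(config, spark=spark, collections=collections)
            return cls._instances[key]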
9 changes: 8 additions & 1 deletion data-access/nexustiles/AbstractTileService.py

@@ -19,25 +19,32 @@
 import numpy as np
 import numpy.ma as ma

+from datetime import datetime
+

 class AbstractTileService(ABC):
     def __init__(self, dataset_name):
         self._name = dataset_name
+        self._last_updated = datetime.now()

     def heartbeat(self) -> bool:
         return True

-    def update(self, force: bool=False) -> bool:
+    def update(self, force: bool = False, load: bool = False) -> bool:
         """
         If applicable, verify the underlying connection(s) are still open and valid, replacing them if necessary

         Default implementation is to do nothing and assume all is valid.

+        @param load:
         @param force: Forcibly replace underlying connections.
         @return: True if valid and successful, False if dataset is now unreachable and therefore should not be used
         """
         return True

+    def last_updated(self):
+        return self._last_updated
+
     @abstractmethod
     def get_dataseries_list(self, simple=False):
         raise NotImplementedError()
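With last_updated() and the new load flag on update(), callers can tell when a backend was last refreshed and can ask it to reopen its underlying handle eagerly rather than on the next data access. A hypothetical maintenance routine built only on this interface might look like the following; the refresh interval and the existence of such a routine are assumptions, not part of this PR:

# Hypothetical periodic refresh using only the AbstractTileService interface
# shown above; the staleness threshold is an illustrative assumption.
import logging
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)


def refresh_stale_backends(backends, max_age=timedelta(minutes=30)):
    """Force-refresh any backend whose connections look stale or unhealthy."""
    for backend in backends:
        stale = datetime.now() - backend.last_updated() > max_age
        if stale or not backend.heartbeat():
            # load=True asks the backend to reopen/verify its dataset handle now
            # instead of deferring the cost to the next data access.
            if not backend.update(force=True, load=True):
                logger.warning('Backend %s is unreachable; skipping it', backend)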
44 changes: 31 additions & 13 deletions data-access/nexustiles/backends/zarr/backend.py

@@ -90,7 +90,20 @@ def __init__(self, dataset_name, path, config=None):
         if config.get('verified', False):
             self.__corrections = []

-        self.__ds: xr.Dataset = self.__open_ds()
+        self.__ds_instance: xr.Dataset = self.__open_ds()
+        self.__open = True
+
+    @property
+    def __ds(self):
+        if not self.__open:
+            logger.info('Existing dataset instance has been closed, reopening...')
+            self.__ds = self.__open_ds()
+            self.__open = True
+        return self.__ds_instance
+
+    @__ds.setter
+    def __ds(self, ds):
+        self.__ds_instance = ds

     def __open_ds(self) -> xr.Dataset:
         if self.__store_type in ['', 'file']:
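The backend now keeps its xarray handle behind a private property: update() can mark the handle closed (for example after credentials are renewed), and the next access reopens it transparently instead of paying the reopen cost eagerly. A generic, self-contained sketch of that lazy-reopen pattern, with a stand-in resource rather than the real zarr dataset:

# Generic sketch of the lazy-reopen pattern used by the property above;
# "open_resource" stands in for the real zarr/xarray open call.
import logging

logger = logging.getLogger(__name__)


class LazyHandle:
    def __init__(self, open_resource):
        self._open_resource = open_resource  # callable that (re)opens the resource
        self._instance = open_resource()
        self._is_open = True

    @property
    def handle(self):
        # Reopen transparently if refresh logic invalidated the handle.
        if not self._is_open:
            logger.info('Handle was invalidated, reopening...')
            self._instance = self._open_resource()
            self._is_open = True
        return self._instance

    def invalidate(self):
        # Called after e.g. renewing credentials; the next access to .handle
        # pays the reopen cost instead of reopening eagerly here.
        self._is_open = False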
@@ -175,22 +188,27 @@ def __open_ds(self) -> xr.Dataset:
             logger.error(f'Failed to open zarr dataset at {self.__path}, ignoring it. Cause: {e}')
             raise NexusTileServiceException(f'Cannot open dataset ({e})')

-    def update(self, force: bool = False) -> bool:
-        if force or (self.__credentials is not None and not self.__credentials.is_valid()):
-            try:
+    def update(self, force: bool = False, load: bool = False) -> bool:
+        try:
+            if force or (self.__credentials is not None and not self.__credentials.is_valid()):
                 logger.info(f'Refreshing zarr dataset {self._name} at {self.__path}')
-                self.__ds = self.__open_ds()
-            except Exception as e:
-                logger.error('Backend update failed')
-                logger.exception(e)
-                return False
+                self.__open = False
+                self.__credentials.renew()
+                # self.__ds = self.__open_ds()
+            self._last_updated = datetime.now()

-        return True
+            if load:
+                _ = self.__ds
+
+            return True
+        except Exception as e:
+            logger.error('Backend update failed')
+            logger.exception(e)
+            return False

     def heartbeat(self) -> bool:
-        # TODO: This is temporary, eventually we should use the logic to be introduced for SDAP-517 (PR#312) to evaluate
-        # if data is accessible currently.
-        return True
+        # True if cred management is not needed or if managed creds are valid
+        return self.__credentials is None or self.__credentials.is_valid()

     def get_dataseries_list(self, simple=False):
         ds = {