
Improve performance of SDAP-517 #11

Open · wants to merge 8 commits into base: SDAP-517-daac-creds

Conversation


@RKuttruff RKuttruff commented Jul 17, 2024

Try to minimize time spent in xarray.open_zarr after SDAP startup. We should still run open_zarr for each new dataset in the webapp driver at least once upon discovery to ensure validity. This is an especially prevalent issue with Spark: the workers would open ALL datasets every time they were given a task, which could introduce severe performance penalties.

This PR will:

  • Share the opened dataset(s) required for Spark algorithms with the workers. (done: note 1)
  • Minimize the time needed for Zarr backend updates
    • Avoid the regular blocking call to open_zarr: either make it lazy (open on use) or run it asynchronously (use a future)
      • Implemented the lazy open, with checks to ensure it is always performed in the driver (see the sketch after this list)
    • Credential rotation can stay as is with respect to blocking behavior
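
The following is a minimal sketch of the lazy-open idea described above: defer xarray.open_zarr until a dataset is first accessed, then cache the handle. The class and attribute names here (LazyZarrDataset, etc.) are hypothetical and not taken from the SDAP codebase.

import threading

import xarray as xr


class LazyZarrDataset:
    """Defers the expensive xarray.open_zarr call until the dataset is first used."""

    def __init__(self, store_path, open_kwargs=None):
        self._store_path = store_path
        self._open_kwargs = open_kwargs or {}
        self._ds = None
        self._lock = threading.Lock()

    @property
    def dataset(self):
        # Open on first access only; later accesses reuse the cached handle.
        if self._ds is None:
            with self._lock:
                if self._ds is None:
                    self._ds = xr.open_zarr(self._store_path, **self._open_kwargs)
        return self._ds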

Note 1 - Spark Algs

Despite my efforts, I could not find a way to make this behavior automatic; there are some manual steps that need to be taken in the Spark algorithm definition. They are fortunately fairly simple and, if done incorrectly or if something goes wrong, the old behavior should be used as a fallback.

  1. The Spark driver code should invoke NexusTileService.save_to_spark, passing the SparkContext object (obtained from the NexusCalcSparkHandler or from the SDAP webservice.nexus_tornado.app_builders.SparkContextBuilder.SparkContextBuilder.get_spark_context() method) and all the datasets that will be worked with.
  2. The executor code should get its NexusTileService instance from the provided tile_service_factory with the kwargs spark=True, collections=[...], where the collections kwarg is a list of all the dataset names saved in step 1.
import itertools
from functools import partial

def spark_driver(tiles, ds1, ds2, tile_service_factory, sc, spark_nparts=1):
  # Step 1: share the opened datasets with the Spark workers.
  NexusTileService.save_to_spark(sc, ds1, ds2)

  # Pair each tile with the dataset names so executors know which collections to request.
  tiles_spark = [(tile, ds1, ds2) for tile in tiles]

  rdd = sc.parallelize(tiles_spark, spark_nparts)
  results = rdd.flatMap(partial(calc_executor, tile_service_factory)).collect()
  results = list(itertools.chain.from_iterable(results))
  return results

def calc_executor(tile_service_factory, spark_tile):
  tile, ds1, ds2 = spark_tile

  # Step 2: get a tile service that reuses the datasets saved in the driver.
  tile_service = tile_service_factory(spark=True, collections=[ds1, ds2])

  # Do work

  return result
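
For context on how driver-opened datasets can be shared with Spark workers (the first bullet of this PR), the sketch below shows the standard PySpark broadcast-variable pattern. It is only an illustration: the function names and the {name: dataset} mapping are assumptions, not the actual NexusTileService.save_to_spark implementation.

from functools import partial

def run_with_shared_datasets(sc, opened_datasets, tiles, work_fn, nparts=1):
    """Broadcast a {name: dataset} mapping once, then run work_fn over the tiles."""
    shared = sc.broadcast(opened_datasets)   # shipped to each executor once
    rdd = sc.parallelize(tiles, nparts)
    return rdd.map(partial(work_fn, shared)).collect()

def example_work_fn(shared, tile):
    """Runs on an executor; reuses the broadcast datasets instead of reopening them."""
    datasets = shared.value                  # deserialized lazily on the executor
    # ... compute something for `tile` using `datasets` ...
    return tile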

rileykk added 4 commits July 17, 2024 07:47
We may end up being able to walk back some of these. The dask update did what I wanted, but I updated a bunch of other deps while trying to figure that out. Xarray's dependencies are somewhat complicated, so it may be best to leave the deps as-is unless something is breaking.
For Spark, ensure the dataset is opened before saving it to HDFS
@RKuttruff RKuttruff marked this pull request as ready for review July 22, 2024 18:25