
Testing plan/notes #1

Open
ian-whitestone opened this issue Oct 14, 2018 · 4 comments

ian-whitestone commented Oct 14, 2018

Testing Plan

Dummy Credit Card Application Dataset

Test 1

  • Read each dataset into a dataframe
    • time the dataframe creation for each
  • Join the dataframes
  • Filter out USA
  • Convert to pandas dataframe

Leave the default schedulers (a rough dask sketch of this flow is below)
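
A rough dask sketch of Test 1, assuming the dummy credit card data is a set of CSVs; the paths, join key (application_id) and country column are placeholders:

# Rough dask sketch of Test 1 (paths, join key, and column names are hypothetical)
import time

import dask.dataframe as dd

start = time.time()
applications = dd.read_csv("data/applications_*.csv")
accounts = dd.read_csv("data/accounts_*.csv")
# note: dask reads lazily, so this mostly times schema/metadata inference
print(f"dataframe creation: {time.time() - start:.2f}s")

joined = applications.merge(accounts, on="application_id", how="inner")
non_usa = joined[joined["country"] != "USA"]  # "filter out USA"

start = time.time()
pdf = non_usa.compute()  # default scheduler; result is a pandas dataframe
print(f"join + filter + compute: {time.time() - start:.2f}s")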

Modifications

  • Fiddle with the dask schedulers
  • Fiddle with spark settings - control # of workers somehow?

Test 2 - Running Some Calcs

  • Read each dataset into a dataframe
    • time the dataframe creation for each
  • Join the dataframes
  • Do some column level calculations
    • min, max, avg of credit limit and model score
  • How many errors (in fulfillment, in scoring)

Modifications

  • Group by binned date and re-run the calculations (sketch below)
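
A sketch of those calcs in dask; the column names (credit_limit, model_score, fulfillment_status, application_date) are placeholders, and joined is the dataframe from Test 1:

import dask

stats = {
    "credit_limit_min": joined["credit_limit"].min(),
    "credit_limit_max": joined["credit_limit"].max(),
    "credit_limit_avg": joined["credit_limit"].mean(),
    "model_score_avg": joined["model_score"].mean(),
    "fulfillment_errors": (joined["fulfillment_status"] == "error").sum(),
}
(results,) = dask.compute(stats)  # all reductions evaluated in one pass

# modification: bin by month and re-run (assumes application_date is already a datetime)
monthly_avg = joined.groupby(joined["application_date"].dt.month)["credit_limit"].mean().compute()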

Test 3 - Running some Python UDFs

  • Read each dataset into a dataframe
    • time the dataframe creation for each
  • Join the dataframes
  • Run some UDFs
    • strip the '-'s from phone numbers (sketch below)
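
A sketch of that UDF in both engines; phone_number is a placeholder column, and joined / spark_df are the joined dataframes:

# strip '-'s from phone numbers with a plain Python function
def strip_dashes(phone):
    return phone.replace("-", "") if phone else phone

# dask: element-wise apply (meta tells dask the output dtype)
joined["phone_clean"] = joined["phone_number"].apply(strip_dashes, meta=("phone_number", "object"))

# pyspark: register the same function as a UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

strip_dashes_udf = udf(strip_dashes, StringType())
spark_df = spark_df.withColumn("phone_clean", strip_dashes_udf("phone_number"))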

Test 4 - Scaling on a Single Machine

  • increment the number of workers for dask and spark and build a graph of run time (sketch below)
    • filter and count
    • join, filter and convert to pandas
    • for dask, do it for both the threaded scheduler and the processes scheduler
    • show the diminishing returns of adding more threads
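
A dask sketch of that loop (for spark, the worker count would come from re-creating the context with local[n]); non_usa is the join/filter result from Test 1:

import time

timings = {"threads": [], "processes": []}
for scheduler in ("threads", "processes"):
    for num_workers in (1, 2, 4, 8):
        start = time.time()
        non_usa.compute(scheduler=scheduler, num_workers=num_workers)
        timings[scheduler].append((num_workers, time.time() - start))
# plot workers vs runtime per scheduler to show the diminishing returns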

NYC Taxi Public Dataset

Coming soon..

Sample ETL Workflows


ian-whitestone commented Oct 15, 2018

Spark Things

Tweaking pyspark config:

from pyspark import SparkConf

conf = (SparkConf()
        .setAppName("implicit_benchmark")
        .setMaster('local[*]')  # run Spark locally with as many worker threads as logical cores on the machine
        .set('spark.driver.memory', '16G')
        )
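
For reference, a standard way to build the session from that conf:

from pyspark.sql import SparkSession

spark = SparkSession.builder.config(conf=conf).getOrCreate()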

Spark repartitioning:
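
For reference, the usual repartitioning calls (partition counts and column name are illustrative):

df = df.repartition(200)        # full shuffle into 200 partitions
df = df.repartition("country")  # shuffle so rows with the same column value land together
df = df.coalesce(8)             # shrink the partition count without a full shuffle
print(df.rdd.getNumPartitions())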


ian-whitestone commented Oct 24, 2018

Dask Things

  • revisit the processes scheduler with fewer partitions

    • This is probably why processes is so slow (quoting the dask docs): "However moving data to remote processes and back can introduce performance penalties, particularly when the data being transferred between processes is large. The multiprocessing scheduler is an excellent choice when workflows are relatively linear, and so do not involve significant inter-task data transfer, and when inputs and outputs are both small, like filenames and counts."
  • try using dask.distributed (sketch below)

  • play with local diagnostics

  • find a way to demonstrate the impact of having too many tasks
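
A sketch covering both of those bullets: local diagnostics on the single-machine schedulers, plus a local dask.distributed client (worker counts are illustrative, and non_usa is the dataframe from Test 1 above):

# local diagnostics for the single-machine schedulers
from dask.diagnostics import ProgressBar, Profiler, ResourceProfiler, visualize

with ProgressBar(), Profiler() as prof, ResourceProfiler() as rprof:
    non_usa.compute()
visualize([prof, rprof])  # task timeline plus CPU/memory usage

# or point the same code at a local distributed cluster, which ships a live dashboard
from dask.distributed import Client

client = Client(n_workers=4, threads_per_worker=2)  # local cluster on this machine
print(client.dashboard_link)
non_usa.compute()  # now runs on the distributed scheduler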

resources:

notes from talking to Martin Durant:

  • make sure each task has a sizeable chunk of work; the scheduler adds some overhead, so if you can reduce the number of tasks that is better (as long as they aren't too big). See the partition-sizing sketch below.
  • spark is great for sql-like workflows
  • dask.distributed has become the go-to single-machine scheduler as well (largely due to the diagnostics)
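
A sketch of acting on the first point with dask: keep tasks chunky by controlling block size at read time or repartitioning afterwards (path and sizes are illustrative):

import dask.dataframe as dd

applications = dd.read_csv("data/applications_*.csv", blocksize="256MB")  # bigger blocks -> fewer tasks
applications = applications.repartition(npartitions=8)                    # or collapse partitions later
print(applications.npartitions)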


ian-whitestone commented Nov 14, 2018

Test this out: dask/dask-yarn#28
Make a PR for this: https://github.com/dask/dask/issues/4110


ian-whitestone commented Dec 20, 2018

Raise issues for the following:

  1. Raise a new issue in s3fs for how it silently returns no files if your tokens aren't refreshed, instead of raising an error.

  2. Invalid paths return weird errors; they should return no files or an empty bag:

>>> bag = dask.bag.read_avro(urlpath, storage_options = {'profile_name': AWS_PROFILE})
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/anaconda3/envs/gandalf/lib/python3.6/site-packages/dask/bag/avro.py", line 103, in read_avro
    heads, sizes = zip(*out)
ValueError: not enough values to unpack (expected 2, got 0)
  3. An empty bag converted to a dataframe and then to pandas returns an empty tuple, and you can't figure this out until you do a compute. There may not be a way to fix this; check whether you can join empty dask dataframes, since if you convert to pandas first you can do a manual check there (sketch below).

dask/dask#4321
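
A sketch of the manual check from point 3, reusing the urlpath and AWS_PROFILE from the traceback above:

import dask.bag

bag = dask.bag.read_avro(urlpath, storage_options={"profile_name": AWS_PROFILE})
pdf = bag.to_dataframe().compute()  # only after compute can you tell whether anything came back
if len(pdf) == 0:  # len() works whether this is an empty frame or the empty tuple noted above
    raise ValueError(f"no records found at {urlpath}")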
