
WIP: ENH: Basic implementation of SGD #69

Open · wants to merge 5 commits into base: main
Conversation

@stsievert (Member) commented Mar 11, 2018

This implements the popular mini-batch stochastic gradient descent (SGD) algorithm in dask-glm. At each iteration, it grabs batch_size examples, computes the gradient for those examples, and then updates the parameters accordingly.

The main benefit of SGD is that the convergence time does not depend on the number of examples. Here, "convergence time" means "number of arithmetic operations", not "wall-clock time until completion".
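
For concreteness, here is a minimal NumPy-only sketch of that update rule (the names grad, step_size, and so on are illustrative, not this PR's actual API):

import numpy as np

def minibatch_sgd(grad, beta, X, y, batch_size=32, step_size=0.1, epochs=10):
    # Illustrative sketch only: shuffle once per epoch, then walk through the
    # data in mini-batches and take one gradient step per batch.
    n = X.shape[0]
    for epoch in range(epochs):
        order = np.random.permutation(n)
        for start in range(0, n, batch_size):
            i = order[start:start + batch_size]
            beta = beta - step_size * grad(X[i], y[i], beta)
    return beta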

TODOs:

  • finish implementation
  • resolve bugs
  • test

@mrocklin (Member)

This is cool to see. Have you tried this on a dataset, artificial or otherwise? Do you have a sense for the performance bottlenecks here? The first thing I would do here would be to run this with the distributed scheduler on a single machine and watch the dashboard during execution. I suspect that it would be illuminating.

@stsievert (Member Author)

@mrocklin and I have had some discussion over at dask/dask#3251 about one of the bugs I encountered while implementing this. The takeaway from that discussion was that I should randomly shuffle the dataset every epoch and walk sequentially through it, not sample the dataset at random.

I'm now curious what kinds of parallel SGD algorithms exist. dask/dask#3251 (comment)

Most parallel SGD algorithms I've seen rely on some sparsity constraints (e.g., Hogwild!, Hogwild++, Cyclades). Practically, there's a much simpler way to parallelize SGD: parallelize the gradient computation within each mini-batch. For deep learning at least, that's the main bottleneck.

futures samples asynchronously to overlap communication and computation

Got it. I see what you're saying. The futures API should make that pretty convenient.

We are computing the gradient for different examples, then summing them. Could we use map_blocks to compute the gradient locally for each chunk, then send the gradients computed from each chunk back to the master? Or maybe a better question: would this change anything?

(I meant to send this as soon as the PR was filed; sorry for the delay.)

@stsievert (Member Author)

Have you tried this on a dataset, artificial or otherwise? Do you have a sense for the performance bottlenecks here?

This is very early work (the code isn't even error-free), and I only filed this PR to move the discussion off the other, unrelated issue. I'll look more into performance bottlenecks after I finish the implementation.

@mrocklin (Member)

My understanding of Hogwild! and similar algorithms is that they expect roundtrip latencies in the microseconds, which is, for them, a critical number for performance. Is this understanding correct? If so, how does a dynamic distributed system like Dask (which has latencies of several milliseconds) manage?

The takeaway from this discussion was that I should randomly shuffle the dataset every epoch and walk sequentially through it, not sample the dataset at random

How many epochs are there? Are you expected to use all of the data every epoch? You should be aware that while shuffling is much faster than n random accesses, it's still quite slow. There might be other approaches, like gathering a sizable random sample (like 10% of the data) 10x more frequently.
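
A rough sketch of that sampling idea, assuming X and y are dask arrays chunked along the first axis (the helper name and parameters are illustrative, not existing dask-glm code):

import dask
import numpy as np

def pull_local_sample(X, y, frac=0.1):
    # Pick a random subset of row-blocks, pull them to the local machine in a
    # single compute call, and shuffle the result in memory.
    nblocks = X.numblocks[0]
    picked = np.random.choice(nblocks, max(1, int(frac * nblocks)), replace=False)
    X_parts, y_parts = dask.compute(
        [X.blocks[i] for i in picked], [y.blocks[i] for i in picked]
    )
    X_local = np.concatenate(X_parts)
    y_local = np.concatenate(y_parts)
    order = np.random.permutation(len(y_local))
    return X_local[order], y_local[order]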

This is very early work (the code isn't even error-free), and I only filed this PR to move the discussion off the other, unrelated issue. I'll look more into performance bottlenecks after I finish the implementation.

Understood, and I really appreciate the early submission to stimulate discussion. I think that most of my questions so far have been aiming at the following point: our cost model has changed dramatically from what it was on a multi-core machine. We should build intuition sooner rather than later to help inform algorithm decisions.

We are computing the gradient for different examples, then summing them. Could we use map_blocks to compute the gradient locally for each chunk, then send the gradients computed from each chunk back to the master? Or maybe a better question: would this change anything?

Sure, that's easy to do. It seems quite different from the SGD approach you were mentioning earlier, though, which avoids looking at all of the data on each iteration.

@mrocklin (Member)

In case you haven't seen it already you might want to look at #57

@mrocklin (Member)

Ah, as before I see that you've already covered this ground: #57 (comment)

@stsievert (Member Author)

expect roundtrip latencies in the microseconds, which is, for them, a critical number for performance. Is this understanding correct?

In the literature, latency is measured in the number of writes to the parameter vector (e.g., Section 2.1 of "Taming the Wild"). Even in the limiting case where bandwidth is the constraint, I don't see how halving the communication bandwidth would be a problem (though it could be if stragglers are considered).

How many epochs are there? Are you expected to use all of the data every epoch? You should be aware that while shuffling is much faster than n random accesses, it's still quite slow

Normally fewer than 100 epochs (~100 for CIFAR-10 or ImageNet, ~10–20 for MNIST). For the taxicab dataset I'd imagine fewer: it's a simple model with many data points.

Is that acceptable? I'm okay with shuffling the complete dataset infrequently, and with shuffling on each machine infrequently. Either way, I think this will provide gains over computing the gradient for all examples.

Our cost model has changed dramatically from what it was on a multi-core machine. We should build intuition sooner rather than later to help inform algorithm decisions.

Agreed. We should avoid premature optimization and fix the problems we see.

@mrocklin (Member)

Full shuffles of the NYC taxicab dataset on a modest-sized cluster take a minute or so. Pulling a random GB or so of data from the cluster and then shuffling it locally would likely take a few seconds. I could also imagine doing the shuffling asynchronously while also pulling data locally.

Commits added to the PR:

  • BUG: dataframe size info not exact and indexing needed
  • squash
  • Getting rid of ASGD

@mrocklin (Member)

Have you had a chance to run this while looking at the dashboard? If not let me know and we can walk through this together. I think that it will help.

@stsievert (Member Author) commented Mar 23, 2018

Yeah, I've had a chance to look at the dashboard (after making sure the code optimizes successfully). It looks like indexing is the bottleneck; I believe about 67% of the time is spent there. At least, that portion of the dashboard remains constant when "getitem" is selected instead of "All" in the dropdown menu.

I think something is using a lot of bandwidth too: some task is transferring up to 20 Mb/s. That seems high, especially since the iterations are not fast: my beta has only 20 elements and 32 examples are used to approximate the gradient. Could indexing be responsible for this? I don't think the dot products or scalar functions are.

I think we can use map_blocks to get around this issue (well, if this is an issue), which would rely on the fact that gradient(X, beta) = sum(gradient(x, beta) for x in X) for most losses (including GLMs). This would only require the communication of beta, which has the same communication cost as 1 example (at least for a linear model).
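
A rough sketch of that idea, using squared loss as a stand-in gradient and assuming X and y share row chunking with X chunked only along the rows (names here are illustrative):

import dask.array as da
import numpy as np

def chunk_grad(X_chunk, y_chunk, beta):
    # Gradient of squared loss on one chunk, returned with a leading axis of
    # length 1 so the per-chunk gradients stack along axis 0.
    g = X_chunk.T @ (X_chunk @ beta - y_chunk.ravel())
    return g[None, :]

# beta is a small NumPy array, so only beta (plus the tiny per-chunk
# gradients) moves over the network; X and y stay where they are.
per_chunk = da.map_blocks(
    chunk_grad, X, y[:, None], beta,
    chunks=(1, X.shape[1]), dtype=X.dtype,
)
full_grad = per_chunk.sum(axis=0).compute()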

@stsievert (Member Author) commented Mar 26, 2018

I've played with this a little more, encountered some performance issues, and thought of some possible solutions. Shuffling the arrays (algorithms.py#L189) takes a long time on my local machine, even if it's only once per epoch. This issue can be resolved by moving the indexing to each array chunk, which means we wouldn't ever have to move the entire dataset.

This would require one assumption: that no row of X is split across blocks, i.e., the array is chunked only along the first dimension. SGD is designed for when the number of examples is very large, and it's typically much larger than the dimension of each example (e.g., the NYC taxicab dataset: 19 features, millions of examples).

The implementation would look something like

def _sgd_grad(family, Xbeta, X, y, idx, chunks, block_id=0):
    # Translate the global mini-batch indices in ``idx`` into indices local to
    # this block, then compute the gradient only for the rows in this block.
    i = _proper_indices(idx, block_id, chunks)
    return family.grad(Xbeta[i], X[i], y[i])

for k in range(100):
    # Sample a mini-batch of global row indices.
    i = np.random.choice(n, size=(batch_size,))
    # Each block computes the gradient for its share of the mini-batch ...
    grad = da.map_blocks(_sgd_grad, family, Xbeta, X, y, i, X.chunks)
    # ... then the per-block gradients are combined into a single gradient.
    grad = grad.reshape(...).sum(axis=...)
    ...

I think I'll pursue this idea tomorrow, @mrocklin.

@mrocklin (Member) left a comment

Some small comments. Is there a test dataset that you're using for this work?

raise ValueError('SGD needs shape information to allow indexing. '
                 'This is possible by passing in a computed array '
                 '(`X.compute()` or `X.values.compute()`) and then using '
                 '`dask.array.from_array`.')
Member

This probably won't work well on a larger dataset on a cluster. We should probably find a better approach here if possible.

Member Author

I've allowed passing in a keyword arg to give the number of examples. If it's not present I print an error message with helpful examples for a distributed dataframe (the implementation is at algorithms.py#_get_n).

Is this more like what you'd like to see? I debated converting to a dataframe and computing the length from that, but this option was the cleanest and lets the user supply outside information.
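
For reference, a hypothetical sketch of what such a helper might look like (the signature and message below are illustrative, not the exact code in algorithms.py):

import numpy as np

def _get_n(X, **kwargs):
    # Prefer an explicitly supplied number of examples; otherwise fall back to
    # the array's shape, which may be NaN for arrays built from dataframes.
    if "n" in kwargs:
        return kwargs["n"]
    n = X.shape[0]
    if np.isnan(n):
        raise ValueError(
            "SGD needs the number of examples. Pass it explicitly, e.g. "
            "n=len(df) when X comes from a dask.dataframe."
        )
    return int(n)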

Member

In general this is something that we should probably push upstream into dask.array proper. It looks like there is an upstream issue here: dask/dask#3293

Short term, if you're experimenting, I say we just go with this and move on. Long term, I don't think it would be reasonable to ask users to provide this information, but if we're OK not merging this implementation (which, given the performance issues, seems wise) then it might be best to just blow past it for now.

Member Author

Leaving this to the upstream issue sounds best to me.

for epoch in range(epochs):
    j = np.random.permutation(n)
    X = X[j]
    y = y[j]
Member

As mentioned by @stsievert this can be inefficient both because it causes a shuffle, and because we're pushing a possibly biggish array through the scheduler. Ideally we would find ways to avoid explicit indexing.

Member

This might also force a dask array to a single chunk?

Member

Another approach would be to add a random column, switch to dask.dataframe, sort by that column using set_index, and then come back to an array. This avoids the movement of a possibly large numpy array, incurs extra costs from moving to a dask.dataframe and back (probably not that large) and keeps the costs of doing a full shuffle.
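
A hedged sketch of that approach, assuming X and y share row chunking (the helper name and details are illustrative):

import numpy as np
import dask.array as da
import dask.dataframe as dd

def shuffle_rows(X, y):
    # Stack X and y so both get the same permutation, add a random key column,
    # and let dask.dataframe's set_index machinery perform the shuffle.
    Xy = da.hstack([X, y.reshape(-1, 1)])
    df = dd.from_dask_array(Xy)
    df = df.map_partitions(
        lambda part: part.assign(_key=np.random.random(len(part)))
    )
    df = df.set_index("_key")
    # Back to arrays; lengths=True recomputes chunk sizes so later slicing works.
    shuffled = df.to_dask_array(lengths=True)
    return shuffled[:, :-1], shuffled[:, -1]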

@stsievert (Member Author)

There is a dask/sklearn/skimage sprint soon, which will help achieve some of the ML goals (scisprints/2018_05_sklearn_skimage_dask#1 (comment)), specifically scaling to larger datasets. I'll try to put in time during the sprint to implement this, even if I'm unable to attend (which may happen for a variety of reasons).

cc @mrocklin

@mrocklin (Member) commented May 5, 2018 via email

@stsievert (Member Author) commented Jul 27, 2018

I've updated this with help from dask/dask#3407. The implementation relies on slicing a Dask array with a NumPy array, and I'm not convinced this is the best approach: I generate something like 200k tasks when indexing with Dask slices for a feature matrix of shape (1015701, 20) and a batch size of 1000.

I'll try to rework this using the re-indexing approach mentioned in #69 (comment).

Base automatically changed from master to main February 10, 2021 01:06