Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Review and potentially simplify the implementation #132

Open
IvanUkhov opened this issue Feb 1, 2024 · 25 comments
Open

Review and potentially simplify the implementation #132

IvanUkhov opened this issue Feb 1, 2024 · 25 comments
Assignees
Labels
enhancement New feature or request

Comments

@IvanUkhov
Copy link

IvanUkhov commented Feb 1, 2024

Hello,

Thank you for the package! As discussed in keras-team/tf-keras#107, I am wondering if it would be possible to make use of the following example to make the package even better:

https://blog.ivanukhov.com/2024/01/31/gradient-accumulation.html

@andreped andreped added the enhancement New feature or request label Feb 1, 2024
@andreped
Copy link
Owner

andreped commented Feb 1, 2024

As discussed in the other thread, it would be of interest to test the proposed solution as it works for the latest implementation of the Optimizer class and may work in distributed settings.

Are you able to have a look at this, @tno123?


EDIT: For now I suggest first testing if using the implementation above in a distributed setting works out-of-the-box using the latest TF version (TF==2.15). You can make a Jupyter Notebook and test it for free on Kaggle where you will have access to 2 GPUs.

You can likely base the notebook on this test:
https://github.com/andreped/GradientAccumulator/blob/main/tests/test_optimizer_distribute.py

@andreped andreped assigned tno123 and unassigned andreped Feb 1, 2024
@tno123
Copy link
Collaborator

tno123 commented Feb 1, 2024

Sure, I'll take a look 👍

@andreped
Copy link
Owner

andreped commented Feb 1, 2024

Feel free to tag in @IvanUkhov, if you have any questions regarding the implementation or whatnot.

Might be that we should add support for his implementation to this framework, regardless whether this worked fine in a distributed setting, as it uses the newest Optimizer implementation. Likely we should also move to the new one. It seems to require less code than the old legacy Optimizer.

@tno123
Copy link
Collaborator

tno123 commented Feb 1, 2024

@IvanUkhov
I am getting a few errors (might be due to old test code):

            opt = CumulativeAdam()

            # compile model
            model.compile(
                optimizer=opt,
                loss=tf.keras.losses.CategoricalCrossentropy(),
                metrics=["acc"],
                run_eagerly=False,
            )

        # train model
        model.fit(
            ds_train,
            epochs=10,
            validation_data=ds_test,
            verbose=1
        )

errors:

  File "/opt/conda/lib/python3.10/site-packages/keras/src/utils/traceback_utils.py", line 65, in error_handler

  File "/opt/conda/lib/python3.10/site-packages/keras/src/engine/training.py", line 1807, in fit

3 root error(s) found.
  (0) INVALID_ARGUMENT:  You must feed a value for placeholder tensor 'cond/else/_1/cond/Placeholder_1' with dtype int32
	 [[{{node cond/Placeholder_1}}]]
	 [[cond/else/_1/cond/StatefulPartitionedCall/cond/output/_307/_218]]
	 [[cond/then/_0/cond/StatefulPartitionedCall/div_no_nan_1/ReadVariableOp_3/_272]]
  (1) INVALID_ARGUMENT:  You must feed a value for placeholder tensor 'cond/else/_1/cond/Placeholder_1' with dtype int32
	 [[{{node cond/Placeholder_1}}]]
	 [[cond/else/_1/cond/StatefulPartitionedCall/cond/output/_307/_218]]
  (2) INVALID_ARGUMENT:  You must feed a value for placeholder tensor 'cond/else/_1/cond/Placeholder_1' with dtype int32
	 [[{{node cond/Placeholder_1}}]]
0 successful operations.
0 derived errors ignored. [Op:__inference_fn_with_cond_42426]

Any ideas what might be causing them? Alternatively, do you have some complete training code that you can share?

Thanks in advance!

@IvanUkhov
Copy link
Author

IvanUkhov commented Feb 1, 2024

Sure! My use case is distributed training, and I have been following closely this tutorial under Training loop:

https://www.tensorflow.org/tutorials/distribute/custom_training#training_loop

I have not tried with Model.fit, but I will take a look at what happens there.

@andreped
Copy link
Owner

andreped commented Feb 1, 2024

@tno123 Could you share the notebook/gist/script that reproduces the issue? That way, @IvanUkhov can use it for debugging as well :]

@IvanUkhov
Copy link
Author

IvanUkhov commented Feb 1, 2024

Just quickly opened this tutorial in Colab and replaced the optimizer:

https://www.tensorflow.org/tutorials/quickstart/beginner

There is no error. Not sure what the difference is with respect to that test setup you mentioned.

Skärmavbild 2024-02-01 kl  21 21 04

@andreped
Copy link
Owner

andreped commented Feb 1, 2024

Just quickly opened this tutorial in Colab

@IvanUkhov I believe @tno123 tested this in Kaggle, where he has access to two GPUs. Hence, he can test whether using this implementation with multiple GPUs would work out-of-the-box.

I made a very simple Jupyter Notebook, which reproduces the issue. I have made it publicly available at:
https://www.kaggle.com/code/andreped/cumulativeadam-distributed-test

The notebook is setup to use two GPUs already, so you should be able to reproduce the issue if you were to run it yourself.

Feel free to use the notebook as a base to do your own experiments, and feel free to share an updated notebook, when you have gotten something working, or found some interesting observations :] We can use this thread to share experiences.

If you only select one GPU, this script runs fine. The problem is when you want to use multiple GPUs, as highlighted in my notebook. Hence, in CoLab this script should run fine, as you only have access to a single GPU.

@IvanUkhov
Copy link
Author

Thank you for the pointers! I have managed to reproduce the problem with the following tutorial on my machine with TensorFlow 2.15 and two GPUs:

https://www.tensorflow.org/guide/distributed_training

Please give a try to the updated version:

https://blog.ivanukhov.com/2024/01/31/gradient-accumulation.html

It works now. Here is the main change:

Skärmavbild 2024-02-02 kl  09 59 19

I missed this section:

https://github.com/keras-team/keras/blob/v2.15.0/keras/optimizers/optimizer.py#L635-L638

@andreped
Copy link
Owner

andreped commented Feb 2, 2024

It works now. Here is the main change:

@IvanUkhov Sounds great! Could you run some tests using it, @tno123? :]

@tno123
Copy link
Collaborator

tno123 commented Feb 2, 2024

Nice! I'll test it now 👍

Edit: It seems to work! It is training, but I'll do some more tests just to make sure everything is good.

@andreped
Copy link
Owner

andreped commented Feb 4, 2024

Edit: It seems to work! It is training, but I'll do some more tests just to make sure everything is good.

Any update, @tno123? :]

Getting it to mechanically run with multiple GPUs might not necessarily mean that it has the right behaviour. One example where my implementation works is that I can make it train with 2 GPUs, but what I want is to distribute the batch across two GPUs, hence, reducing the VRAM required per GPU by half. What I tend to see is that this is not the case. It seems to allocate the same across each GPUs as it would use if it used a single GPU.

What are you seeing currently?

@roebel
Copy link

roebel commented Feb 4, 2024

Hello

I have myself implemented a subclass of the keras.Model to support gradient accumulation, and this has worked fine for a while. But I would much prefer the approach with an Optimizer Subclass instead, which appears to be simpler to combine with mixed_precision training. Now I looked into the implementation that you have proposed here, and I feel that I really would like to use this. Obviously it would be even better if this would be available in keras directly.

I wonder about a few points though:
Is it not a problem to call apply_gradients with zero value gradients all the time? I imagine this would change the internal gradient stats, which means when you replace Adam with batch size 32 by CumulAdam with accumulation 4 and batch size 8 you would probably not get similar results.
Also the optimizer iteration will no longer reflect the number of updates that have been performed and this may affect the learning rate if one uses a learning rate scheduler

It appears to me that all this can be circumvented easily, by means of avoiding the call to apply_gradients.
Further, by means of storing all helper variabels in a dedicated class, one can remove the helper variables from the checkpoint, such that the CumulAdam and Adam can be exchanged without problem given the same checkpoint.
This would mean the accumulation object (see below) could be part of the standard optimizers because if accumulation is = 1, then the CumulAdam behaves exactly like the original. And for accumulation > 1 besides aggregating bacthes it does not appear to change anything either.

What do you think about the following implementation? I tried this with different distribution strategies and it appers to work without problems (but I have a single GPU only). The point I am not so sure about is whether the gradients would not need to be divided by accumulation. Would that not depend on the loss reduction strategy that is used; If it is SUM this seems fine because gradients will always be summed over the batches but if it is SUM_OVER_BATCH_SIZE, one would need to divide by accumulation.

class Accumulator(object):
    def __init__(self, accumulation, optimizer):
        self.accumulation = accumulation
        self._gradients = None
        self._n_acum_step = None
 
    def accumulate(self, gradients: list[tf.Tensor]) -> None:

        self.build(gradients)
        self._n_acum_step.assign_add(1)
        # Add the new gradients to the old ones.
        for gradient, addition in zip(self._gradients, gradients):
            gradient.assign_add(addition)

    def reset(self) -> None:
        # reset
        self._n_acum_step.assign(0)
        for ga in self._gradients:
            ga.assign(tf.zeros_like(ga, dtype=tf.float32)) 

    def build(self, variables: list[tf.Tensor]) -> None:
        """Initialize the internal state for the given trainable variables."""
        if self._gradients is None:
            # Allocate memory for accumulation.
            self._gradients = [
                tf.Variable(tf.zeros_like(variable), trainable=False)
                for variable in variables
            ]
            self._n_acum_step = tf.Variable(0, trainable=False, dtype=tf.int32)


@tf.keras.utils.register_keras_serializable()
class CumulativeAdam(tf.keras.optimizers.Adam):
    """Optimizer that implements the Adam algorithm with gradient accumulation."""

    def __init__(self, accumulation: int = 1, **options) -> None:
        """Create an instance.

        Arguments:
          accumulation: The number of iterations to accumulate gradients over.
          If it is set to one, no accumulation is performed, and the gradients
          are applied as soon as they are computed. If it is set to a value
          greater than one, the gradients will be accumulated for the specified
          number of iterations and only then applied, starting a new cycle.

        All other arguments are passed to `tf.keras.optimizers.Adam`.
        """
        super().__init__(**options)
        self._gradient_accumulator = Accumulator(accumulation=accumulation, optimizer=self)
        

    def apply_gradients(self, pairs: list[tuple[tf.Tensor, tf.Tensor]]) -> tf.Tensor:
        """Apply the gradients according to the accumulation scheme."""
        # Split off the gradients from the trainable variables.
        gradients, variables = zip(*list(pairs))
        if self._gradient_accumulator.accumulation == 1:
            # Apply the gradients to the trainable variables.
            return super().apply_gradients(pairs)
        else:
            # Perform the accumulation and apply gradients if necessary
            self._gradient_accumulator.accumulate(gradients)
            return tf.cond(
                    tf.equal(self._gradient_accumulator._n_acum_step, self._gradient_accumulator.accumulation), 
                    true_fn = lambda: self.apply_accumulated_gradients(variables), 
                    false_fn = lambda: self.iterations
                )            

    def apply_accumulated_gradients(self, variables) -> tf.Tensor:
        # apply accumulated gradients
        ret = super().apply_gradients(zip(self._gradient_accumulator._gradients, variables))
        self._gradient_accumulator.reset()
        return ret
                 

@IvanUkhov
Copy link
Author

IvanUkhov commented Feb 5, 2024

This may affect the learning rate if one uses a learning rate scheduler

Yes, this is something I was willing to sacrifice and have a note about in the article.

Is it not a problem to call apply_gradients with zero value gradients all the time? I imagine this would change the internal gradient stats.

It is, and it is a serious problem. Thank you for pointing this out. Indeed, the momentum or anything else the base optimizer happens to have will be updated with zero gradients.

It appears to me that all this can be circumvented easily, by means of avoiding the call to apply_gradients.

That was my original implementation, but I could not get it to work. The problem was in conditional statements. They cause the following exception (the same with your code):

RuntimeError: merge_call called while defining a new graph or a tf.function. This can often happen if the function fn passed to strategy.run() contains a nested @tf.function, and the nested @tf.function contains a synchronization point, such as aggregating gradients (e.g, optimizer.apply_gradients), or if the function fn uses a control flow statement which contains a synchronization point in the body. Such behaviors are not yet supported. Instead, please avoid nested tf.functions or control flow statements that may potentially cross a synchronization boundary, for example, wrap the fn passed to strategy.run or the entire strategy.run inside a tf.function or move the control flow out of fn. If you are subclassing a tf.keras.Model, please avoid decorating overridden methods test_step and train_step in tf.function.

That is why you see those two scaling factors in my implementation; they bypass the conditioning.

The bottom line is that we need to figure out how to get a conditional into apply_gradients or alike.

@IvanUkhov
Copy link
Author

IvanUkhov commented Feb 5, 2024

Here is one alternative. Redefine update_step, which is still legitimate as part of the public interface:

def update_step(self, gradient: tf.Tensor, variable: tf.Tensor) -> None:
    """Update the trainable variable with the gradient."""
    update_step = super().update_step
    last = (self._accumulation + 1) % self.accumulation == 0
    # Allow the update to happen only at the end of each cycle.
    tf.cond(last, lambda: update_step(gradient, variable), lambda: None)

https://blog.ivanukhov.com/2024/01/31/gradient-accumulation.html

@roebel
Copy link

roebel commented Feb 5, 2024

That was my original implementation, but I could not get it to work. The problem was in conditional statements. They cause the following exception (the same with your code):

Ok, but as I said, I have run it locally with the code you had put on kaggle, and I don't get an exception there. Unfortunately, I could not run it on kaggle because they don't manage to send me an email for account validation before the email validation link becomes invalid.

@IvanUkhov
Copy link
Author

Yeah, it appears when you run it on more than one GPU. I think it takes some shortcuts internally otherwise.

@andreped
Copy link
Owner

andreped commented Feb 5, 2024

they don't manage to send me an email for account validation before the email validation link becomes invalid.

That's unfortunate. You should try again later. Kaggle is the only way I have found to have access to two GPUs for free for doing similar tests.

Yeah, it appears when you run it on more than one GPU. I think it takes some shortcuts internally otherwise.

This is exactly what happens in CoLab for single or no GPU as well. Hence, why you need at least two GPUs to properly debug this issue, sadly...

@roebel
Copy link

roebel commented Feb 5, 2024

Ok, I have access to another server with 4 GPUs, and I also now have got the kaggle notebook running.

I think I understand what the problem is and see that my proposed solution will not work when combined with multi gpu strategies. The update_step seems to be a good idea. In that case on should be able to drop the scaling step right before the apply, because the gradient will not be used anyway, would they ? So there is not need to scale them. I have the following problem though during the model.save operation here https://www.kaggle.com/code/roebel/notebook8a1463bcbb

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
Cell In[7], line 138
    135 if __name__ == "__main__":
    136     print(tf.config.list_physical_devices('GPU'))
--> 138     test_optimizer_distribute()

Cell In[7], line 125, in test_optimizer_distribute()
    117 # train model
    118 model.fit(
    119     ds_train,
    120     epochs=3,
    121     validation_data=ds_test,
    122     verbose=1
    123 )
--> 125 model.save("./trained_model/")
    127 # load trained model and test
    128 del model

File /opt/conda/lib/python3.10/site-packages/keras/src/utils/traceback_utils.py:70, in filter_traceback.<locals>.error_handler(*args, **kwargs)
     67     filtered_tb = _process_traceback_frames(e.__traceback__)
     68     # To get the full stack trace, call:
     69     # `tf.debugging.disable_traceback_filtering()`
---> 70     raise e.with_traceback(filtered_tb) from None
     71 finally:
     72     del filtered_tb

File /tmp/__autograph_generated_filejl4bfrq4.py:22, in outer_factory.<locals>.inner_factory.<locals>.tf__update_step(self, gradient, variable)
     20 def else_body():
     21     pass
---> 22 ag__.if_stmt((ag__.ld(self).iterations + 1) % ag__.ld(self).accumulation == 0, if_body, else_body, get_state, set_state, (), 0)

File /tmp/__autograph_generated_filejl4bfrq4.py:18, in outer_factory.<locals>.inner_factory.<locals>.tf__update_step.<locals>.if_body()
     17 def if_body():
---> 18     ag__.converted_call(ag__.converted_call(ag__.ld(super), (), None, fscope).update_step, (ag__.ld(gradient), ag__.ld(variable)), None, fscope)

KeyError: in user code:

    File "/tmp/ipykernel_34/392134555.py", line 64, in update_step  *
        super().update_step(gradient, variable)
    File "/opt/conda/lib/python3.10/site-packages/keras/src/optimizers/adam.py", line 171, in update_step
        m = self._momentums[self._index_dict[var_key]]

    KeyError: None

@IvanUkhov
Copy link
Author

Thank you for checking.

I have the following problem though during the model.save operation

Yes, if one wants to put this code in a library, it requires a little bit of polishing. Here is what comes to mind:

  • apply_gradients is taking several optional parameters, which are not currently forwarded.
  • The class has to have a proper interface for saving, which includes overriding get_config to add accumulation.
  • The class requires decoration, such as @tf.keras.utils.register_keras_serializable you did.

@IvanUkhov
Copy link
Author

In that case on should be able to drop the scaling step right before the apply, because the gradient will not be used anyway, would they ? So there is not need to scale them.

Yes, that is a good observation! Removed:

https://blog.ivanukhov.com/2024/01/31/gradient-accumulation.html

@andreped
Copy link
Owner

andreped commented Feb 6, 2024

In that case on should be able to drop the scaling step right before the apply, because the gradient will not be used anyway, would they ? So there is not need to scale them.

I remember doing benchmarks on exactly this, and I found that the approximation was poorer for SUM than for scaled SUMs (which essentially yields an average). I think this is more about handling numerical instabilities than which design is actually most correct theoretically. Summing of gradients makes most sense for me in theory, but fails in practice, at least from what I could see. But then again doing MEAN, where you scale before might result in underflow issues (instead of overflow).

But it is quite a long time ago since I ran this benchmark. I have unit tests that also test this, and I am sure that by just replacing MEAN with SUM aggregation the expected loss would not be the same, nor yield what you want it to approximate (regular batch training). Here is a very simple benchmark I ran to verify that the model wrapper solution produced sufficient approximation (I guess you could use this as base for performing a similar benchmark, if you'd like to test SUM vs MEAN):
https://github.com/andreped/GradientAccumulator/blob/main/tests/test_model_expected_result.py

@roebel
Copy link

roebel commented Feb 6, 2024

@andreped the scaling I mentioned was the scaling by 0 that was used to desactivate the gradient update. This has nothing to do with numerical stability. Concerning sum/mean I don't feel there is anything to say in general, besides as you mentioned, there may arise overflow and/or underflow issues especially with mixed precison.

@IvanUkhov
Copy link
Author

IvanUkhov commented Feb 6, 2024

That actually made me realize that a scaling factor is indeed missing in the implementation. The sum should be divided by the number of summands before applying, and here one have a choice of where to scale, as André pointed out. Added the averaging:

# Apply the average accumulated gradients to the trainable variables.
gradients = [gradient / self.accumulation for gradient in self._gradients]
return super().apply_gradients(zip(gradients, variables))

https://blog.ivanukhov.com/2024/01/31/gradient-accumulation.html

@IvanUkhov
Copy link
Author

IvanUkhov commented Feb 9, 2024

The story continues.

I have been running some tests with toy problems against the behavior of optimizers without wrapping. With SGD, it was matching to the last digit, and it was with and without momentum. However, with Adam, it was drifting because its updates rely on the iteration number, unlike those of SGD. To counteract that, there is now an idea to decrement the internal counter when the gradients were accumulated but without application:

https://blog.ivanukhov.com/2024/01/31/gradient-accumulation.html

Now Adam's numbers with and without wrapping agree under distributed training with several GPUs too.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

4 participants