From 618647f144af705fb5997ce92b267d14270aa251 Mon Sep 17 00:00:00 2001
From: tigranthegreat
Date: Wed, 25 Oct 2023 19:40:56 +0400
Subject: [PATCH 01/11] add confirmation code

---
 confirm_tests.py | 70 +++++++++
 src/__init__.py | 0
 src/custom_trainer.py | 6 +
 src/tests/fsdp_precommit_test.py | 143 ------------------
 src/train.py | 12 +-
 test_status.yaml | 1 +
 tests/__init__.py | 0
 tests/_test_failing.py | 6 +
 tests/dataset/__init__.py | 0
 .../dataset/test_resumable_dataset.py | 6 +
 .../flash_attention_2_generate_test.py | 0
 tests/fsdp/__init__.py | 0
 tests/fsdp/test_model_consist.py | 70 +++++++++
 {src/tests => tests}/test_dataloader_speed.py | 0
 {src/tests => tests}/test_utils.py | 20 ++-
 15 files changed, 184 insertions(+), 150 deletions(-)
 create mode 100644 confirm_tests.py
 create mode 100644 src/__init__.py
 delete mode 100644 src/tests/fsdp_precommit_test.py
 create mode 100644 test_status.yaml
 create mode 100644 tests/__init__.py
 create mode 100644 tests/_test_failing.py
 create mode 100644 tests/dataset/__init__.py
 rename src/tests/precommit_test.py => tests/dataset/test_resumable_dataset.py (97%)
 rename src/tests/test_flash_attention_2_generate.py => tests/flash_attention_2_generate_test.py (100%)
 create mode 100644 tests/fsdp/__init__.py
 create mode 100644 tests/fsdp/test_model_consist.py
 rename {src/tests => tests}/test_dataloader_speed.py (100%)
 rename {src/tests => tests}/test_utils.py (52%)

diff --git a/confirm_tests.py b/confirm_tests.py
new file mode 100644
index 0000000..7325646
--- /dev/null
+++ b/confirm_tests.py
@@ -0,0 +1,70 @@
+import subprocess
+import argparse
+import unittest
+import yaml
+
+
+def write_test_status(git_commit_hash: str, status: str="FAIL", file_name: str="test_status"):
+    data = {git_commit_hash: status}
+    with open(f"{file_name}.yaml", "w") as _f:
+        yaml.dump(data, _f)
+
+
+def read_test_status(git_commit_hash: str, file_name: str="test_status"):
+    with open(f"{file_name}.yaml", "r") as _f:
+        data = yaml.full_load(_f)
+    return data.get(git_commit_hash)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--run",
+        action="store_true",
+        dest="run",
+        help="whether or not profile the training",
+    )
+    parser.add_argument(
+        "--no_run",
+        action="store_false",
+        dest="run",
+        help="whether or not profile the training",
+    )
+    parser.set_defaults(run=False)
+    parser.add_argument(
+        "--confirm",
+        action="store_true",
+        dest="confirm",
+        help="whether or not profile the training",
+    )
+    parser.add_argument(
+        "--no_confirm",
+        action="store_false",
+        dest="confirm",
+        help="whether or not profile the training",
+    )
+    parser.set_defaults(confirm=False)
+    args = parser.parse_args()
+    run = args.run
+    confirm = args.confirm
+    git_commit_hash = subprocess.check_output(['git', 'rev-parse', 'HEAD']).decode().strip()
+    if run:
+        loader = unittest.TestLoader()
+        tests = loader.discover("tests", pattern="test_*.py")
+        testRunner = unittest.runner.TextTestRunner(verbosity=2)
+        test_results = testRunner.run(tests)
+        if len(test_results.errors) == 0 and len(test_results.failures) == 0 and test_results.wasSuccessful():
+            status = "PASS"
+        else:
+            status = "FAIL"
+        write_test_status(git_commit_hash, status=status)
+    elif confirm:
+        status = read_test_status(git_commit_hash)
+        if status == "FAIL":
+            raise Exception(f"Commit '{git_commit_hash}' failed.")
+        elif status == "PASS":
+            print(f"Commit '{git_commit_hash}' passed.")
+        else:
+            raise Exception(f"Commit '{git_commit_hash}' has an unexpected status '{status}'.")
+    else:
+        raise 
Exception("Please pass the proper option in command line.") \ No newline at end of file diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/custom_trainer.py b/src/custom_trainer.py index 4ca2907..afb43bc 100644 --- a/src/custom_trainer.py +++ b/src/custom_trainer.py @@ -11,6 +11,12 @@ def _save_checkpoint(self, model, trial, metrics=None): print("**disk is full didn't save**") def _load_from_checkpoint(self, resume_from_checkpoint, model=None): + """ + This code is added because we had a failure when resuming training. + Basically, we load the model with fsdp when the model is not fsdp wrapped. + In the future versions transformers this issue is handled, by adding an extra check, + but not in 4.31.0 version. So this is our manual check addition to solve the problem. + """ if type(self.model) != FSDP: return return super()._load_from_checkpoint(resume_from_checkpoint, model) diff --git a/src/tests/fsdp_precommit_test.py b/src/tests/fsdp_precommit_test.py deleted file mode 100644 index 06f4aba..0000000 --- a/src/tests/fsdp_precommit_test.py +++ /dev/null @@ -1,143 +0,0 @@ -import subprocess -import argparse -import unittest -import gc -import os -import sys - -import torch -import torch.distributed as dist -from train import train - - -test_directory = "/tmp/chemlactica_fsdp_precommit_test" - - -def create_train_command(module, module_args, script, script_args): - train_command = "python3 -m " - train_command += ( - f"{module} {''.join([f'--{arg} {val} ' for arg, val in module_args.items()])}" - ) - train_command += ( - f"{script} {''.join([f'--{arg} {val} ' for arg, val in script_args.items()])}" - ) - return train_command - - -# class TestNetwork(unittest.TestCase): -# def setUp(self): -# script_path = os.path.dirname(os.path.abspath(__file__)) -# # Building absolute paths -# self.train_data_dir = os.path.join( -# script_path, "..", "..", ".small_data", "train" -# ) -# self.valid_data_dir = os.path.join( -# script_path, "..", "..", ".small_data", "valid" -# ) - -# def test_train_eval_save(self): -# gc.collect() -# torch.cuda.empty_cache() - -# train_command = create_train_command( -# module="accelerate.commands.launch", -# module_args={"config_file": "src/config/config.yaml"}, -# script="src/train.py", -# script_args={ -# "from_pretrained": "facebook/galactica-125m", -# "model_config": "125m", -# "training_data_dir": self.train_data_dir, -# "valid_data_dir": self.valid_data_dir, -# "train_batch_size": "16", -# "max_steps": "20", -# "eval_steps": "5", -# "save_steps": "5", -# "dataloader_num_workers": "0", -# "experiment_name": "gal125m_test_train_eval_save", -# "checkpoints_root_dir": "../checkpoints/facebook/galactica-125m", -# "no-track": "", -# }, -# ) -# print(train_command) -# executed_prog = subprocess.run( -# train_command, -# shell=True, -# ) -# if executed_prog.returncode != 0: -# raise Exception(f"\n\tExit code: {executed_prog.returncode}") - - -class TestReproducabilityOfModelOutput(unittest.TestCase): - - def setUp(self): - # clean up - gc.collect() - torch.cuda.empty_cache() - - subprocess.run(f"mkdir {test_directory}", shell=True) - subprocess.run(f"mkdir {test_directory}/checkpoints", shell=True) - - def tearDown(self): - subprocess.run(f"rm -rf {test_directory}", shell=True) - - # clean up - gc.collect() - torch.cuda.empty_cache() - - def test_repr_of_model_output(self): - args = { - "from_pretrained": "facebook/galactica-125m", - "model_config": "125m", - "training_data_dir": ".small_data/train", - "valid_data_dir": 
".small_data/valid", - "train_batch_size": 4, - "max_steps": 20, - "eval_steps": 5, - "save_steps": 5, - "dataloader_num_workers": 1, - "checkpoints_root_dir": f"{test_directory}/checkpoints", - "experiment_name": "fsdp_pretest_reproducability", - "track": False, - "check_reproducability": True, - "use_flash_attn": True, - "gradient_accumulation_steps": 1, - } - train(**args) - dist.barrier() # process sync in the end - - -if __name__ == "__main__": - - # parser = argparse.ArgumentParser() - - # parser.add_argument( - # "--gpu_devices", - # type=str, - # metavar="GD", - # dest="gpu_devices", - # required=True, - # help="comma seperated gpu device indices", - # ) - # parser.add_argument( - # "--test_directory", - # type=str, - # metavar="TD", - # dest="test_directory", - # required=False, - # help="dir where to create intermediate test files (this dir will be deleted at the end)", - # default="/tmp/chemlactica_precommit_test" - # ) - - # args = parser.parse_args() - # gpu_devices = args.gpu_devices - # test_directory = args.test_directory - - # script_path = os.path.dirname(os.path.abspath(__file__)) - # src_code_path = os.path.join(script_path, "..") - - # os.environ["CUDA_VISIBLE_DEVICES"] = gpu_devices - # os.environ["TOKENIZERS_PARALLELISM"] = "true" - # # os.environ["PYTHONPATH"] = src_code_path - # print(f"TESTING WITH DEVICES '{gpu_devices}'") - - unittest.main(verbosity=2) diff --git a/src/train.py b/src/train.py index 738a32c..2907673 100644 --- a/src/train.py +++ b/src/train.py @@ -357,7 +357,7 @@ def train( help="whether or not track the training using aim", ) parser.add_argument( - "--no-track", + "--no_track", action="store_false", dest="track", help="the directory to save the aim tracking information", @@ -379,7 +379,7 @@ def train( help="whether or not profile the training", ) parser.add_argument( - "--no-profile", + "--no_profile", action="store_false", dest="profile", help="whether or not profile the training", @@ -395,13 +395,13 @@ def train( default=None, ) parser.add_argument( - "--flash-attn", + "--flash_attn", action="store_true", dest="use_flash_attn", help="whether or not to use flash attn)", ) parser.add_argument( - "--no-flash-attn", + "--no_flash_attn", action="store_false", dest="use_flash_attn", help="whether or not to use flash attn", @@ -417,13 +417,13 @@ def train( default=1, ) parser.add_argument( - "--check-reproducability", + "--check_reproducability", action="store_true", dest="check_reproducability", help="whether or not check reproducability (should only be use for testing)", ) parser.add_argument( - "--no-check-reproducability", + "--no_check_reproducability", action="store_false", dest="check_reproducability", help="whether or not check reproducability (should only be use for testing)", diff --git a/test_status.yaml b/test_status.yaml new file mode 100644 index 0000000..032c2cb --- /dev/null +++ b/test_status.yaml @@ -0,0 +1 @@ +56edb520f4eb805af2b90765be5f79a9aad55ba5: FAIL diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/_test_failing.py b/tests/_test_failing.py new file mode 100644 index 0000000..125ad29 --- /dev/null +++ b/tests/_test_failing.py @@ -0,0 +1,6 @@ +import unittest + + +class FailingTest(unittest.TestCase): + def test_failing(self): + raise Exception("Failed") \ No newline at end of file diff --git a/tests/dataset/__init__.py b/tests/dataset/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/tests/precommit_test.py 
b/tests/dataset/test_resumable_dataset.py similarity index 97% rename from src/tests/precommit_test.py rename to tests/dataset/test_resumable_dataset.py index 71feb2b..6609661 100644 --- a/src/tests/precommit_test.py +++ b/tests/dataset/test_resumable_dataset.py @@ -2,6 +2,8 @@ import gc import glob import multiprocessing +import os +import sys import torch from torch.utils.data import DataLoader @@ -9,6 +11,10 @@ from datasets.dataset_dict import IterableDatasetDict from transformers.trainer_utils import seed_worker + # for relative imports +SRC_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../..") +sys.path.append(SRC_DIR) + from jsonl_dataset import samples_generator diff --git a/src/tests/test_flash_attention_2_generate.py b/tests/flash_attention_2_generate_test.py similarity index 100% rename from src/tests/test_flash_attention_2_generate.py rename to tests/flash_attention_2_generate_test.py diff --git a/tests/fsdp/__init__.py b/tests/fsdp/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/fsdp/test_model_consist.py b/tests/fsdp/test_model_consist.py new file mode 100644 index 0000000..8d1c6d8 --- /dev/null +++ b/tests/fsdp/test_model_consist.py @@ -0,0 +1,70 @@ +import subprocess +import unittest +import gc +import os +import sys +import shutil + +import torch + +# for relative imports +PARENT_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..") +sys.path.append(PARENT_DIR) + +from test_utils import create_train_command + + +test_directory = "/tmp/chemlactica_fsdp_precommit_test" + + +class TestConsistencyOfModelOutput(unittest.TestCase): + + def setUp(self): + # clean up + gc.collect() + torch.cuda.empty_cache() + + if os.path.exists(test_directory): + print(f"Removing {test_directory}") + shutil.rmtree(test_directory) + os.mkdir(test_directory) + os.mkdir(f"{test_directory}/checkpoints") + + def tearDown(self): + shutil.rmtree(test_directory) + + # clean up + gc.collect() + torch.cuda.empty_cache() + + def test_consist_of_model_output(self): + command = create_train_command( + module="accelerate.commands.launch", + module_args={"config_file": "src/config/test_configs/fsdp_config.yaml"}, + script="src/train.py", + script_args={ + "from_pretrained": "facebook/galactica-125m", + "model_config": "125m", + "training_data_dir": ".small_data/train", + "valid_data_dir": ".small_data/valid", + "train_batch_size": 4, + "max_steps": 20, + "eval_steps": 5, + "save_steps": 5, + "dataloader_num_workers": 1, + "checkpoints_root_dir": f"{test_directory}/checkpoints", + "experiment_name": "fsdp_model_output_consist", + "gradient_accumulation_steps": 1, + "no_track": "", + "check_reproducability": "", + "flash_attn": "", + } + ) + print(f"Running command: {command}") + out = subprocess.run(command, shell=True, capture_output=True) + if out.returncode != 0: + raise Exception(f"error: {out.stderr}") + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/src/tests/test_dataloader_speed.py b/tests/test_dataloader_speed.py similarity index 100% rename from src/tests/test_dataloader_speed.py rename to tests/test_dataloader_speed.py diff --git a/src/tests/test_utils.py b/tests/test_utils.py similarity index 52% rename from src/tests/test_utils.py rename to tests/test_utils.py index cab78a1..40cd60d 100644 --- a/src/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,6 +1,24 @@ from datasets import load_dataset from dataset_utils import process_dataset -import logging + + +def create_train_command(module, module_args, script, 
script_args): + train_command = "python3 -m " + train_command += ( + f"{module} {''.join([f'--{arg} {val} ' for arg, val in module_args.items()])}" + ) + train_command += ( + f"{script} {''.join([f'--{arg} {val} ' for arg, val in script_args.items()])}" + ) + return train_command + + +def create_vs_code_launch(module, module_args, script, script_args): + """ + this is for creating a launch file config for vs code editor to be able + to easily debug the command running in the test. + """ + pass def generate_batches_from_jsonls(jsonl_files, count): From e3912f8621df804b75f063e3770ee78d8b889448 Mon Sep 17 00:00:00 2001 From: tigranthegreat Date: Thu, 26 Oct 2023 17:31:35 +0400 Subject: [PATCH 02/11] add test for checking deps in the env vs deps in the env.yml file, text correction, add gpu argument --- confirm_tests.py | 22 ++++++++++++++++++---- test_status.yaml | 2 +- tests/test_env.py | 20 ++++++++++++++++++++ 3 files changed, 39 insertions(+), 5 deletions(-) create mode 100644 tests/test_env.py diff --git a/confirm_tests.py b/confirm_tests.py index 7325646..6867cde 100644 --- a/confirm_tests.py +++ b/confirm_tests.py @@ -2,6 +2,7 @@ import argparse import unittest import yaml +import os def write_test_status(git_commit_hash: str, status: str="FAIL", file_name: str="test_status"): @@ -22,33 +23,46 @@ def read_test_status(git_commit_hash: str, file_name: str="test_status"): "--run", action="store_true", dest="run", - help="whether or not profile the training", + help="whether or not run tests", ) parser.add_argument( "--no_run", action="store_false", dest="run", - help="whether or not profile the training", + help="whether or not run tests", + ) + parser.add_argument( + "--gpus", + type=str, + dest="gpus", + help="comma seperated string of gpus indices to use for testing \ + (please choose at least 2 for proper testing, default is '0, 1').", + required=False, + default="0, 1" ) parser.set_defaults(run=False) parser.add_argument( "--confirm", action="store_true", dest="confirm", - help="whether or not profile the training", + help="whether or not confirm already run tests", ) parser.add_argument( "--no_confirm", action="store_false", dest="confirm", - help="whether or not profile the training", + help="whether or not confirm already run tests", ) parser.set_defaults(confirm=False) args = parser.parse_args() run = args.run confirm = args.confirm + gpus = args.gpus git_commit_hash = subprocess.check_output(['git', 'rev-parse', 'HEAD']).decode().strip() + assert git_commit_hash if run: + os.environ["CUDA_VISIBLE_DEVICES"] = gpus + print(f"NOTE: Using GPU(s) '{gpus}' for testing.") loader = unittest.TestLoader() tests = loader.discover("tests", pattern="test_*.py") testRunner = unittest.runner.TextTestRunner(verbosity=2) diff --git a/test_status.yaml b/test_status.yaml index 032c2cb..a867cd8 100644 --- a/test_status.yaml +++ b/test_status.yaml @@ -1 +1 @@ -56edb520f4eb805af2b90765be5f79a9aad55ba5: FAIL +618647f144af705fb5997ce92b267d14270aa251: PASS diff --git a/tests/test_env.py b/tests/test_env.py new file mode 100644 index 0000000..411edc3 --- /dev/null +++ b/tests/test_env.py @@ -0,0 +1,20 @@ +import unittest +import subprocess + + +class TestEnv(unittest.TestCase): + + def test_env_file_vs_current_env_depenedency_match(self): + env_file_path = "environment.yml" + conda_deps_str = subprocess.check_output(["conda", "env", "export"]).decode().strip().split("\n") + with open(env_file_path) as _f: + env_file_deps_str = [line.rstrip("\n") for line in _f.readlines()] + + conda_deps_str = 
conda_deps_str[:-1] + env_file_deps_str = env_file_deps_str[:-1] + for i, (l1, l2) in enumerate(zip(conda_deps_str, env_file_deps_str), start=1): + assert l1 == l2, f"'{l1}' != '{l2}' on line {i}" + + +if __name__ == "__main__": + unittest.main(verbosity=2) \ No newline at end of file From eed5714fec0ac26efba4400bdf54e83d898d19d9 Mon Sep 17 00:00:00 2001 From: tigranthegreat Date: Mon, 30 Oct 2023 11:05:23 +0000 Subject: [PATCH 03/11] add intermediate stuff --- src/config/test_configs/fsdp_config.yaml | 2 +- test_status.yaml | 2 +- ...der_speed.py => _test_dataloader_speed.py} | 0 tests/dataset/test_resumable_dataset.py | 4 -- tests/fsdp/test_model_consist.py | 7 +-- tests/test_env.py | 13 ++--- tests/test_utils.py | 51 ++++++++++--------- 7 files changed, 35 insertions(+), 44 deletions(-) rename tests/{test_dataloader_speed.py => _test_dataloader_speed.py} (100%) diff --git a/src/config/test_configs/fsdp_config.yaml b/src/config/test_configs/fsdp_config.yaml index da5cf60..62fbae8 100644 --- a/src/config/test_configs/fsdp_config.yaml +++ b/src/config/test_configs/fsdp_config.yaml @@ -19,4 +19,4 @@ tpu_env: [] tpu_use_cluster: false tpu_use_sudo: false use_cpu: false -# main_process_port: 30000 \ No newline at end of file +main_process_port: 30001 \ No newline at end of file diff --git a/test_status.yaml b/test_status.yaml index a867cd8..ae25797 100644 --- a/test_status.yaml +++ b/test_status.yaml @@ -1 +1 @@ -618647f144af705fb5997ce92b267d14270aa251: PASS +e3912f8621df804b75f063e3770ee78d8b889448: FAIL diff --git a/tests/test_dataloader_speed.py b/tests/_test_dataloader_speed.py similarity index 100% rename from tests/test_dataloader_speed.py rename to tests/_test_dataloader_speed.py diff --git a/tests/dataset/test_resumable_dataset.py b/tests/dataset/test_resumable_dataset.py index 6609661..2e9a557 100644 --- a/tests/dataset/test_resumable_dataset.py +++ b/tests/dataset/test_resumable_dataset.py @@ -11,10 +11,6 @@ from datasets.dataset_dict import IterableDatasetDict from transformers.trainer_utils import seed_worker - # for relative imports -SRC_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../..") -sys.path.append(SRC_DIR) - from jsonl_dataset import samples_generator diff --git a/tests/fsdp/test_model_consist.py b/tests/fsdp/test_model_consist.py index 8d1c6d8..0236c69 100644 --- a/tests/fsdp/test_model_consist.py +++ b/tests/fsdp/test_model_consist.py @@ -7,10 +7,6 @@ import torch -# for relative imports -PARENT_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..") -sys.path.append(PARENT_DIR) - from test_utils import create_train_command @@ -63,7 +59,8 @@ def test_consist_of_model_output(self): print(f"Running command: {command}") out = subprocess.run(command, shell=True, capture_output=True) if out.returncode != 0: - raise Exception(f"error: {out.stderr}") + print(f"error: {out.stderr}") + raise Exception() if __name__ == "__main__": diff --git a/tests/test_env.py b/tests/test_env.py index 411edc3..03af122 100644 --- a/tests/test_env.py +++ b/tests/test_env.py @@ -5,15 +5,10 @@ class TestEnv(unittest.TestCase): def test_env_file_vs_current_env_depenedency_match(self): - env_file_path = "environment.yml" - conda_deps_str = subprocess.check_output(["conda", "env", "export"]).decode().strip().split("\n") - with open(env_file_path) as _f: - env_file_deps_str = [line.rstrip("\n") for line in _f.readlines()] - - conda_deps_str = conda_deps_str[:-1] - env_file_deps_str = env_file_deps_str[:-1] - for i, (l1, l2) in enumerate(zip(conda_deps_str, 
env_file_deps_str), start=1): - assert l1 == l2, f"'{l1}' != '{l2}' on line {i}" + compare_output = subprocess.check_output(["conda", "compare", "environment.yml"]).decode().strip() + success_output = "Success. All the packages in the specification file are present in the environment with matching version and build string." + print(compare_output) + assert compare_output == success_output if __name__ == "__main__": diff --git a/tests/test_utils.py b/tests/test_utils.py index 40cd60d..7593c49 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,7 +1,3 @@ -from datasets import load_dataset -from dataset_utils import process_dataset - - def create_train_command(module, module_args, script, script_args): train_command = "python3 -m " train_command += ( @@ -13,6 +9,13 @@ def create_train_command(module, module_args, script, script_args): return train_command +# def setUp(): +# file_path = os.path.dirname(os.path.abspath(__file__)) +# print("FILE_PATH", file_path) +# os.environ["PYTHONPATH"] = os.path.join(file_path, "src") +# os.environ["PYTHONPATH"] += os.path.join(file_path, "tests") + + def create_vs_code_launch(module, module_args, script, script_args): """ this is for creating a launch file config for vs code editor to be able @@ -21,23 +24,23 @@ def create_vs_code_launch(module, module_args, script, script_args): pass -def generate_batches_from_jsonls(jsonl_files, count): - dataset = load_dataset( - "text", - data_files={"data": jsonl_files}, - streaming=True, - ) - - processed_dataset = process_dataset( - dataset=dataset, - train_config={"block_size": 2048}, - process_batch_sizes=(100, 100), - ) - - batches = [] - for i, inp in enumerate(processed_dataset["data"]): - if i == count: break - del inp["token_type_ids"] - inp = {k: inp[k].unsqueeze(0) for k in inp.keys()} - batches.append(inp) - return batches \ No newline at end of file +# def generate_batches_from_jsonls(jsonl_files, count): +# dataset = load_dataset( +# "text", +# data_files={"data": jsonl_files}, +# streaming=True, +# ) + +# processed_dataset = process_dataset( +# dataset=dataset, +# train_config={"block_size": 2048}, +# process_batch_sizes=(100, 100), +# ) + +# batches = [] +# for i, inp in enumerate(processed_dataset["data"]): +# if i == count: break +# del inp["token_type_ids"] +# inp = {k: inp[k].unsqueeze(0) for k in inp.keys()} +# batches.append(inp) +# return batches \ No newline at end of file From effe33b961b2f61f5aa5131da4f9e5c680a1cc36 Mon Sep 17 00:00:00 2001 From: tigranthegreat Date: Wed, 1 Nov 2023 12:31:36 +0000 Subject: [PATCH 04/11] fix samples_generator function of custom dataset --- src/jsonl_dataset.py | 6 +++--- test_status.yaml | 2 +- tests/dataset/test_resumable_dataset.py | 10 ++++++---- tests/test_env.py | 1 - 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/jsonl_dataset.py b/src/jsonl_dataset.py index 9e52bad..bbeeafc 100644 --- a/src/jsonl_dataset.py +++ b/src/jsonl_dataset.py @@ -1,4 +1,3 @@ -from multiprocessing import Manager, Pool from typing import List import torch from io import StringIO @@ -10,13 +9,15 @@ def samples_generator( chunk_size=25000, return_line_info=False ): if not torch.distributed.is_initialized() or torch.distributed.get_rank() == 0: + print("sharded_jsonl_files", shared_jsonl_files) print(f"TOK_PAR: {os.environ['TOKENIZERS_PARALLELISM']}") print("process id", os.getpid(), files) - file_states = {file: {"position": 0, "line_number": 0} for file in files} + file_states = {f: {"position": 0, "line_number": 0} for f in files} for file in 
file_states.keys(): if shared_jsonl_files.get(file): jsonl_state = shared_jsonl_files[file] + file_states[file] = jsonl_state print(f"loaded {file}: {jsonl_state['position']}") returned = True @@ -33,7 +34,6 @@ def samples_generator( batch = [line.rstrip("\n") for line in batch] state["position"] = f.tell() state["line_number"] += len(batch) - for i, sample in enumerate(batch, start=1): returned = True ret = {"text": sample} diff --git a/test_status.yaml b/test_status.yaml index ae25797..480fb31 100644 --- a/test_status.yaml +++ b/test_status.yaml @@ -1 +1 @@ -e3912f8621df804b75f063e3770ee78d8b889448: FAIL +eed5714fec0ac26efba4400bdf54e83d898d19d9: FAIL diff --git a/tests/dataset/test_resumable_dataset.py b/tests/dataset/test_resumable_dataset.py index 2e9a557..fd776d4 100644 --- a/tests/dataset/test_resumable_dataset.py +++ b/tests/dataset/test_resumable_dataset.py @@ -70,7 +70,7 @@ def test_dataloader(self): shared_jsonl_files = manager.dict() training_data_files = glob.glob(training_data_dir + "/*.jsonl") # combine small train and valid to have 2 files to test - training_data_files.extend(glob.glob(valid_data_dir + "/*.jsonl")) + # training_data_files.extend(glob.glob(valid_data_dir + "/*.jsonl")) initial_train_dataset = IterableDatasetDict({ "train": IterableDataset.from_generator( @@ -111,16 +111,17 @@ def test_dataloader(self): for text, file, line_number in zip( samples["text"], samples["line_info"]["file"], - samples["line_info"]["line_number"] + samples["line_info"]["line_number"].tolist() ): # check if the line matches with what is actually in the file assert loaded_files[file][line_number - 1]["text"] == text - # assert not loaded_files[file][line_number - 1]["is_read"] + assert not loaded_files[file][line_number - 1]["is_read"] loaded_files[file][line_number - 1]["is_read"] = True print(f'{file} {line_number} passed') if i == sample_to_pass: break + fixed_shared_jsonl_files = {k: v for k, v in shared_jsonl_files.items()} resumed_train_dataset = IterableDatasetDict({ "train": IterableDataset.from_generator( samples_generator, @@ -140,10 +141,11 @@ def test_dataloader(self): for text, file, line_number in zip( samples["text"], samples["line_info"]["file"], - samples["line_info"]["line_number"] + samples["line_info"]["line_number"].tolist() ): # check if the line matches with what is actually in the file assert loaded_files[file][line_number - 1]["text"] == text + assert fixed_shared_jsonl_files[file]["line_number"] < line_number # assert not loaded_files[file][line_number - 1]["is_read"] loaded_files[file][line_number - 1]["is_read"] = True print(f'{file} {line_number} passed') diff --git a/tests/test_env.py b/tests/test_env.py index 03af122..e5c6c55 100644 --- a/tests/test_env.py +++ b/tests/test_env.py @@ -3,7 +3,6 @@ class TestEnv(unittest.TestCase): - def test_env_file_vs_current_env_depenedency_match(self): compare_output = subprocess.check_output(["conda", "compare", "environment.yml"]).decode().strip() success_output = "Success. All the packages in the specification file are present in the environment with matching version and build string." 
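A note on the resumable-dataset pattern that PATCH 04 fixes and the test above exercises: samples_generator keeps a per-file {"position", "line_number"} state, seeds it from whatever checkpoint a previous run published into the shared Manager dict, and seeks past the already-consumed bytes before yielding. Below is a minimal self-contained sketch of that pattern; resumable_lines is a hypothetical name, and the chunked readlines() batching and rank-0-only logging of the real src/jsonl_dataset.py are deliberately omitted, so read it as an illustration rather than the repository's code.

def resumable_lines(files, shared_jsonl_files):
    # shared_jsonl_files: a multiprocessing.Manager().dict() mapping
    # file -> {"position": byte offset, "line_number": lines yielded so far}
    file_states = {f: {"position": 0, "line_number": 0} for f in files}
    for f in files:
        if shared_jsonl_files.get(f):
            file_states[f] = dict(shared_jsonl_files[f])  # resume from the published checkpoint
    for f in files:
        state = file_states[f]
        with open(f) as fh:
            fh.seek(state["position"])  # skip everything a previous run already yielded
            while True:
                line = fh.readline()  # readline() (unlike iteration) keeps fh.tell() usable
                if not line:
                    break
                state["position"] = fh.tell()
                state["line_number"] += 1
                shared_jsonl_files[f] = dict(state)  # publish progress after every line
                yield {
                    "text": line.rstrip("\n"),
                    "line_info": {"file": f, "line_number": state["line_number"]},
                }

This is also why the test above asserts fixed_shared_jsonl_files[file]["line_number"] < line_number after resuming: every sample from the resumed dataset must land strictly after the checkpointed line of its file.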
From a127631c11825fca3a222595d755fdf4de4700ae Mon Sep 17 00:00:00 2001
From: tigranthegreat
Date: Wed, 1 Nov 2023 14:39:09 +0000
Subject: [PATCH 05/11] finalize dataset test for now

---
 test_status.yaml | 2 +-
 tests/dataset/test_resumable_dataset.py | 8 +++++---
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/test_status.yaml b/test_status.yaml
index 480fb31..ba04c20 100644
--- a/test_status.yaml
+++ b/test_status.yaml
@@ -1 +1 @@
-eed5714fec0ac26efba4400bdf54e83d898d19d9: FAIL
+effe33b961b2f61f5aa5131da4f9e5c680a1cc36: PASS
diff --git a/tests/dataset/test_resumable_dataset.py b/tests/dataset/test_resumable_dataset.py
index fd776d4..73fbcf2 100644
--- a/tests/dataset/test_resumable_dataset.py
+++ b/tests/dataset/test_resumable_dataset.py
@@ -70,7 +70,7 @@ def test_dataloader(self):
             shared_jsonl_files = manager.dict()
             training_data_files = glob.glob(training_data_dir + "/*.jsonl")
             # combine small train and valid to have 2 files to test
-            # training_data_files.extend(glob.glob(valid_data_dir + "/*.jsonl"))
+            training_data_files.extend(glob.glob(valid_data_dir + "/*.jsonl"))

             initial_train_dataset = IterableDatasetDict({
                 "train": IterableDataset.from_generator(
@@ -151,9 +151,11 @@ def test_dataloader(self):
                     print(f'{file} {line_number} passed')

             for file_name, lines in loaded_files.items():
+                number_of_read: int = 0
                 for i, line in enumerate(lines, start=1):
-                    assert line["is_read"], f"'{file_name}' line {i} is not read."
-                print("All lines are read at least once.")
+                    # assert line["is_read"], f"'{file_name}' line {i} is not read."
+                    number_of_read += int(line["is_read"])
+                print(f"File {file_name}: {number_of_read} lines read, {len(lines) - number_of_read} lines not read.")


 if __name__ == "__main__":

From 7be97ae21e5e6d5480c2ea678e9607ce509986c4 Mon Sep 17 00:00:00 2001
From: tigranthegreat
Date: Wed, 1 Nov 2023 14:56:01 +0000
Subject: [PATCH 06/11] check the previous commit when validating tests

---
 confirm_tests.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/confirm_tests.py b/confirm_tests.py
index 6867cde..3d705b3 100644
--- a/confirm_tests.py
+++ b/confirm_tests.py
@@ -58,9 +58,9 @@ def read_test_status(git_commit_hash: str, file_name: str="test_status"):
     run = args.run
     confirm = args.confirm
     gpus = args.gpus
-    git_commit_hash = subprocess.check_output(['git', 'rev-parse', 'HEAD']).decode().strip()
-    assert git_commit_hash
     if run:
+        git_commit_hash = subprocess.check_output(['git', 'rev-parse', 'HEAD']).decode().strip()
+        assert git_commit_hash
         os.environ["CUDA_VISIBLE_DEVICES"] = gpus
         print(f"NOTE: Using GPU(s) '{gpus}' for testing.")
         loader = unittest.TestLoader()
@@ -73,6 +73,8 @@ def read_test_status(git_commit_hash: str, file_name: str="test_status"):
             status = "FAIL"
         write_test_status(git_commit_hash, status=status)
     elif confirm:
+        git_commit_hash = subprocess.check_output(['git', 'rev-parse', 'HEAD~1']).decode().strip()
+        assert git_commit_hash
         status = read_test_status(git_commit_hash)
         if status == "FAIL":
             raise Exception(f"Commit '{git_commit_hash}' failed.")

From a3fb6c02bf3a15f763f5c3c3c31140b3911e920f Mon Sep 17 00:00:00 2001
From: tigranthegreat
Date: Wed, 1 Nov 2023 15:27:26 +0000
Subject: [PATCH 07/11] add train/valid/resume test

---
 test_status.yaml | 2 +-
 tests/fsdp/test_model_consist.py | 3 +-
 tests/fsdp/test_model_training.py | 168 ++++++++++++++++++++++++++++++
 3 files changed, 170 insertions(+), 3 deletions(-)
 create mode 100644 tests/fsdp/test_model_training.py

diff --git a/test_status.yaml b/test_status.yaml
index ba04c20..5086fb5 100644 --- a/test_status.yaml +++ b/test_status.yaml @@ -1 +1 @@ -effe33b961b2f61f5aa5131da4f9e5c680a1cc36: PASS +7be97ae21e5e6d5480c2ea678e9607ce509986c4: PASS diff --git a/tests/fsdp/test_model_consist.py b/tests/fsdp/test_model_consist.py index 0236c69..89f6825 100644 --- a/tests/fsdp/test_model_consist.py +++ b/tests/fsdp/test_model_consist.py @@ -59,8 +59,7 @@ def test_consist_of_model_output(self): print(f"Running command: {command}") out = subprocess.run(command, shell=True, capture_output=True) if out.returncode != 0: - print(f"error: {out.stderr}") - raise Exception() + raise Exception(out.stderr.decode()) if __name__ == "__main__": diff --git a/tests/fsdp/test_model_training.py b/tests/fsdp/test_model_training.py new file mode 100644 index 0000000..ae5cae6 --- /dev/null +++ b/tests/fsdp/test_model_training.py @@ -0,0 +1,168 @@ +import subprocess +import unittest +import gc +import os +import sys +import shutil + +import torch + +from test_utils import create_train_command + +test_directory = "/tmp/chemlactica_fsdp_precommit_test" + + +class TestModelTraining(unittest.TestCase): + + def setUp(self): + # clean up + gc.collect() + torch.cuda.empty_cache() + + if os.path.exists(test_directory): + print(f"Removing {test_directory}") + shutil.rmtree(test_directory) + os.mkdir(test_directory) + os.mkdir(f"{test_directory}/checkpoints") + + def tearDown(self): + shutil.rmtree(test_directory) + + # clean up + gc.collect() + torch.cuda.empty_cache() + + def test_model_train(self): + # clean up + gc.collect() + torch.cuda.empty_cache() + + command = create_train_command( + module="accelerate.commands.launch", + module_args={"config_file": "src/config/test_configs/fsdp_config.yaml"}, + script="src/train.py", + script_args={ + "from_pretrained": "facebook/galactica-125m", + "model_config": "125m", + "training_data_dir": ".small_data/train", + "valid_data_dir": ".small_data/valid", + "train_batch_size": 4, + "max_steps": 1000, + "eval_steps": 2000, + "save_steps": 2000, + "dataloader_num_workers": 1, + "checkpoints_root_dir": f"{test_directory}/checkpoints", + "experiment_name": "fsdp_model_train", + "gradient_accumulation_steps": 1, + "no_track": "", + "flash_attn": "", + } + ) + + print(f"Running command: {command}") + out = subprocess.run(command, shell=True, capture_output=True) + if out.returncode != 0: + raise Exception(out.stderr.decode()) + else: + print(out.stdout.decode()) + + def test_model_valid(self): + # clean up + gc.collect() + torch.cuda.empty_cache() + + command = create_train_command( + module="accelerate.commands.launch", + module_args={"config_file": "src/config/test_configs/fsdp_config.yaml"}, + script="src/train.py", + script_args={ + "from_pretrained": "facebook/galactica-125m", + "model_config": "125m", + "training_data_dir": ".small_data/train", + "valid_data_dir": ".small_data/valid", + "train_batch_size": 4, + "max_steps": 100, + "eval_steps": 10, + "save_steps": 2000, + "dataloader_num_workers": 1, + "checkpoints_root_dir": f"{test_directory}/checkpoints", + "experiment_name": "fsdp_model_valid", + "gradient_accumulation_steps": 1, + "no_track": "", + "flash_attn": "", + } + ) + + print(f"Running command: {command}") + out = subprocess.run(command, shell=True, capture_output=True) + if out.returncode != 0: + raise Exception(out.stderr.decode()) + else: + print(out.stdout.decode()) + + def test_model_resume(self): + # clean up + gc.collect() + torch.cuda.empty_cache() + + first_command = create_train_command( + 
module="accelerate.commands.launch", + module_args={"config_file": "src/config/test_configs/fsdp_config.yaml"}, + script="src/train.py", + script_args={ + "from_pretrained": "facebook/galactica-125m", + "model_config": "125m", + "training_data_dir": ".small_data/train", + "valid_data_dir": ".small_data/valid", + "train_batch_size": 4, + "max_steps": 20, + "eval_steps": 10, + "save_steps": 10, + "dataloader_num_workers": 1, + "checkpoints_root_dir": f"{test_directory}/checkpoints", + "experiment_name": "fsdp_model_resume", + "gradient_accumulation_steps": 1, + "no_track": "", + "flash_attn": "", + } + ) + + print(f"Running command: {first_command}") + out = subprocess.run(first_command, shell=True, capture_output=True) + if out.returncode != 0: + raise Exception(out.stderr.decode()) + else: + print(out.stdout.decode()) + + second_command = create_train_command( + module="accelerate.commands.launch", + module_args={"config_file": "src/config/test_configs/fsdp_config.yaml"}, + script="src/train.py", + script_args={ + "from_pretrained": f"{test_directory}/checkpoints/facebook/galactica-125m/none/checkpoint-{20}", + "model_config": "125m", + "training_data_dir": ".small_data/train", + "valid_data_dir": ".small_data/valid", + "train_batch_size": 4, + "max_steps": 40, + "eval_steps": 10, + "save_steps": 10, + "dataloader_num_workers": 1, + "checkpoints_root_dir": f"{test_directory}/checkpoints", + "experiment_name": "fsdp_model_resume", + "gradient_accumulation_steps": 1, + "no_track": "", + "flash_attn": "", + } + ) + + print(f"Running command: {second_command}") + out = subprocess.run(second_command, shell=True, capture_output=True) + if out.returncode != 0: + raise Exception(out.stderr.decode()) + else: + print(out.stdout.decode()) + + +if __name__ == "__main__": + unittest.main(verbosity=2) \ No newline at end of file From 306817fdf132257a8da898c6e078ad2f2a110631 Mon Sep 17 00:00:00 2001 From: Philipp Guevorguian Date: Sat, 4 Nov 2023 14:24:39 +0400 Subject: [PATCH 08/11] update github workflow to match current test organization --- .github/workflows/main.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 6d642d3..ff9cb27 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -17,7 +17,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v2 with: - python-version: '3.11' + python-version: '3.10' - name: Set up Conda uses: conda-incubator/setup-miniconda@v2 @@ -28,4 +28,4 @@ jobs: - name: Run unittests shell: bash -l {0} - run: python3 -m unittest src/tests/precommit_test.py + run: python3 confirm_test.py --confirm From b6e9107b3c662d5268e6a44320e0eaff8165c0e3 Mon Sep 17 00:00:00 2001 From: Philipp Guevorguian Date: Sat, 4 Nov 2023 14:25:51 +0400 Subject: [PATCH 09/11] re-implement pre-commit --- environment.yml | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/environment.yml b/environment.yml index 2a6432f..3c21fd7 100644 --- a/environment.yml +++ b/environment.yml @@ -15,7 +15,6 @@ dependencies: - cuda-nvrtc=11.7.99=0 - cuda-nvtx=11.7.91=0 - cuda-runtime=11.7.1=0 - - filelock=3.9.0=py310h06a4308_0 - gmp=6.2.1=h295c915_3 - gmpy2=2.1.2=py310heeb90bb_0 - intel-openmp=2023.1.0=hdb19cb5_46305 @@ -77,14 +76,17 @@ dependencies: - cachetools==5.3.1 - certifi==2023.7.22 - cffi==1.16.0 + - cfgv==3.4.0 - charset-normalizer==3.3.0 - click==8.1.7 - cryptography==41.0.4 - datasets==2.14.4 - dill==0.3.7 + - distlib==0.3.7 - einops==0.7.0 - exceptiongroup==1.1.3 - 
fastapi==0.103.2 + - filelock==3.13.1 - flash-attn==2.3.2 - frozenlist==1.4.0 - fsspec==2023.9.2 @@ -92,16 +94,20 @@ dependencies: - grpcio==1.59.0 - h11==0.14.0 - huggingface-hub==0.17.3 + - identify==2.5.31 - idna==3.4 - mako==1.2.4 - monotonic==1.6 - multidict==6.0.4 - multiprocess==0.70.15 - ninja==1.11.1.1 + - nodeenv==1.8.0 - numpy==1.26.0 - packaging==23.2 - pandas==2.1.1 - pillow==10.0.1 + - platformdirs==3.11.0 + - pre-commit==3.5.0 - protobuf==4.24.4 - psutil==5.9.5 - py3nvml==0.2.7 @@ -127,7 +133,8 @@ dependencies: - tzdata==2023.3 - urllib3==2.0.6 - uvicorn==0.23.2 + - virtualenv==20.24.6 - xmltodict==0.13.0 - xxhash==3.4.1 - yarl==1.9.2 -prefix: /home/tigranfahradyan/miniconda3/envs/cl11.7 +prefix: /home/philipp/miniconda3/envs/cl11.7 From 19b773dc9255656da7f59681f036aac4e1a4a6a5 Mon Sep 17 00:00:00 2001 From: Philipp Guevorguian Date: Sat, 4 Nov 2023 14:31:17 +0400 Subject: [PATCH 10/11] add test result --- test_status.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test_status.yaml b/test_status.yaml index 5086fb5..30c6d97 100644 --- a/test_status.yaml +++ b/test_status.yaml @@ -1 +1 @@ -7be97ae21e5e6d5480c2ea678e9607ce509986c4: PASS +b6e9107b3c662d5268e6a44320e0eaff8165c0e3: PASS From aeadf1d78475bac966a9cc89b02a64d644062847 Mon Sep 17 00:00:00 2001 From: Philipp Guevorguian Date: Fri, 10 Nov 2023 13:13:48 +0400 Subject: [PATCH 11/11] increase max lr and decrease end lr to 0 --- src/config/create_train_config.py | 2 ++ src/train.py | 51 ++++++++++++++++++++----------- 2 files changed, 35 insertions(+), 18 deletions(-) diff --git a/src/config/create_train_config.py b/src/config/create_train_config.py index b17574e..8f83f5d 100644 --- a/src/config/create_train_config.py +++ b/src/config/create_train_config.py @@ -12,6 +12,8 @@ for key in model_train_configs.keys(): model_train_configs[key]["max_learning_rate"] *= 0.08 +model_train_configs["125m"]["max_learning_rate"] = 5e-4 + model_train_configs["1.3b"]["warmup_steps"] = 2000 model_train_configs["1.3b"]["max_learning_rate"] = 1.0e-5 model_train_configs["1.3b"]["global_gradient_norm"] = 0.1 diff --git a/src/train.py b/src/train.py index 2907673..751dd24 100644 --- a/src/train.py +++ b/src/train.py @@ -7,7 +7,11 @@ import numpy import transformers -from transformers import TrainingArguments, get_polynomial_decay_schedule_with_warmup, ProgressCallback +from transformers import ( + TrainingArguments, + get_polynomial_decay_schedule_with_warmup, + ProgressCallback, +) from accelerate import Accelerator, logging from accelerate.utils import broadcast_object_list import torch @@ -23,7 +27,7 @@ EpochCallback, CustomProgressCallback, ReproducabilityCallback, - JsonlDatasetResumeCallback + JsonlDatasetResumeCallback, ) from config.create_train_config import model_train_configs from eval_metrics import compute_metrics, preprocess_logits_for_metrics @@ -58,7 +62,7 @@ def train( check_reproducability=False, valid_batch_size=None, profile=False, - profile_dir=None + profile_dir=None, ): transformers.logging.set_verbosity_info() transformers.utils.logging.enable_explicit_format() @@ -67,7 +71,9 @@ def train( train_config = model_train_configs[model_config] - model = load_model(from_pretrained, use_flash_attn=use_flash_attn, train_config=train_config) + model = load_model( + from_pretrained, use_flash_attn=use_flash_attn, train_config=train_config + ) model.resize_token_embeddings( train_config["vocab_size"] + len(chemlactica_special_tokens) ) @@ -132,17 +138,24 @@ def train( trainer_callback_dict["epoch_callback"] = 
EpochCallback(num_epochs=1) if check_reproducability: - trainer_callback_dict["reproducability_callback"] = ReproducabilityCallback(accelerator, model_config, use_flash_attn) + trainer_callback_dict["reproducability_callback"] = ReproducabilityCallback( + accelerator, model_config, use_flash_attn + ) trainer_callback_dict["progress_callback"] = CustomProgressCallback() with multiprocessing.Manager() if accelerator.is_main_process else nullcontext() as manager: shared_jsonl_files = None if accelerator.is_main_process: shared_jsonl_files = manager.dict() - trainer_callback_dict["json_dataset_resume_callback"] = JsonlDatasetResumeCallback(shared_jsonl_files) + trainer_callback_dict[ + "json_dataset_resume_callback" + ] = JsonlDatasetResumeCallback(shared_jsonl_files) checkpoints_dir = os.path.join( - checkpoints_root_dir, "facebook", f"galactica-{model_config}", experiment_hash + checkpoints_root_dir, + "facebook", + f"galactica-{model_config}", + experiment_hash, ) accelerator.print("resuming from checkpoint:", resume_from_checkpoint) @@ -157,7 +170,7 @@ def train( optimizer, num_warmup_steps=train_config["warmup_steps"], num_training_steps=max_steps, - lr_end=0.1 * train_config["max_learning_rate"], + lr_end=0.0 * train_config["max_learning_rate"], power=1.0, ) @@ -201,15 +214,17 @@ def train( # streaming=True, # ) - train_dataset = IterableDatasetDict({ - "train": IterableDataset.from_generator( - samples_generator, - gen_kwargs={ - "files": training_data_files, - "shared_jsonl_files": shared_jsonl_files - } - ) - }) + train_dataset = IterableDatasetDict( + { + "train": IterableDataset.from_generator( + samples_generator, + gen_kwargs={ + "files": training_data_files, + "shared_jsonl_files": shared_jsonl_files, + }, + ) + } + ) eval_dataset = load_dataset( "text", data_files={"validation": valid_data_files}, streaming=False ) @@ -431,4 +446,4 @@ def train( parser.set_defaults(profile=False) args = parser.parse_args() - train(**args.__dict__) \ No newline at end of file + train(**args.__dict__)
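To make the learning-rate change in PATCH 11 concrete: with power=1.0, get_polynomial_decay_schedule_with_warmup ramps the rate linearly from 0 to the optimizer's peak over num_warmup_steps, then decays it linearly down to lr_end, so moving lr_end from 0.1 * max_learning_rate to 0.0 means training now finishes at a zero learning rate instead of plateauing at 10% of the peak. Below is a toy sketch using the same transformers helper train.py imports; the 5e-4 peak matches the 125m override in create_train_config.py above, while the warmup and total step counts are made-up illustration values.

import torch
from transformers import get_polynomial_decay_schedule_with_warmup

max_lr, warmup_steps, max_steps = 5e-4, 100, 1000  # 5e-4 from this patch; other values illustrative
opt = torch.optim.AdamW([torch.nn.Parameter(torch.zeros(1))], lr=max_lr)
sched = get_polynomial_decay_schedule_with_warmup(
    opt,
    num_warmup_steps=warmup_steps,
    num_training_steps=max_steps,
    lr_end=0.0 * max_lr,  # PATCH 11: was 0.1 * max_lr
    power=1.0,  # linear warmup, then linear decay
)
for step in range(max_steps):
    opt.step()
    sched.step()
print(sched.get_last_lr()[0])  # ends at 0.0; with lr_end=0.1 * max_lr it would stop at 5e-5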