HPCIC 2024: Updates and Fixes DYAD Component of Tutorial (#43)
* dyad: fixes content and DYAD data loader

This commit corrects logic in the PyTorch data loader for DYAD.
It also makes various corrections to the text in the DYAD notebook.

* docker: adds workaround regarding Ubuntu Jammy

The flux-sched image for Ubuntu Jammy has a system install of
UCX 1.12.0. However, we want to use UCX 1.13.1 with DYAD.
This commit updates LD_LIBRARY_PATH to point to UCX 1.13.1 to prevent
runtime issues with DYAD.

* dyad: updates the env file for DYAD notebook

In light of the name change of DLIO Profiler to DFTracer, this commit
updates the env file created in the DYAD notebook to use the new names
for environment variables.

* dyad: fixes bug in DYAD data loader

This commit fixes a bug in the DYAD PyTorch data loader
that caused 'broker_per_node' to be referenced before it was set.

* dyad: updates multiprocessing approach for DLIO

This commit tweaks the DLIO config file to use forking for
multiprocessing instead of spawning.

* dyad: changes cpu-affinity for DLIO

This commit changes cpu-affinity to off when running DLIO
training, for consistency.

---------

Co-authored-by: Hariharan <[email protected]>
ilumsden and hariharan-devarajan authored Aug 28, 2024
1 parent 62a7a13 commit d90c83a
Showing 5 changed files with 41 additions and 30 deletions.
9 changes: 7 additions & 2 deletions 2024-HPCIC-AWS/JupyterNotebook/docker/Dockerfile.spawn
@@ -91,7 +91,7 @@ RUN apt-get update && apt-get install -y nodejs && apt-get clean && rm -rf /var/
RUN wget https://nodejs.org/dist/v20.15.0/node-v20.15.0-linux-x64.tar.xz && \
apt-get update && apt-get install -y xz-utils && rm -rf /var/lib/apt/lists/* && \
xz -d -v node-v20.15.0-linux-x64.tar.xz && \
tar -C /usr/local --strip-components=1 -xvf node-v20.15.0-linux-x64.tar

# This customizes the launcher UI
# https://jupyter-app-launcher.readthedocs.io/en/latest/usage.html
@@ -113,6 +113,11 @@ COPY ./docker/flux-icon.png $HOME/flux-icon.png
# note that previous examples are added via git volume in config.yaml
ENV SHELL=/usr/bin/bash
ENV FLUX_URI_RESOLVE_LOCAL=t
# Prepend /usr/lib to LD_LIBRARY_PATH because Ubuntu Jammy comes with
# UCX 1.12. Without this, DYAD will build (correctly) with UCX 1.13.1, but
# it will try to run with the system install of UCX 1.12, which can cause
# either a crash or a hang.
ENV LD_LIBRARY_PATH="/usr/lib:$LD_LIBRARY_PATH"

EXPOSE 8888
ENTRYPOINT ["tini", "--"]
@@ -132,7 +137,7 @@ RUN mkdir -p $HOME/.local/share && \
# flux start flux account add-user --username=jovyan --bank=default && \
# flux start flux jobtap load mf_priority.so && \
# flux start flux account-update-db

USER ${NB_USER}

CMD ["flux", "start", "--test-size=4", "jupyter", "lab"]
@@ -20,7 +20,7 @@ reader:
read_threads: 1
file_shuffle: seed
sample_shuffle: seed
-multiprocessing_context: spawn
+multiprocessing_context: fork
data_loader_classname: dyad_torch_data_loader.DyadTorchDataLoader
data_loader_sampler: index

@@ -32,4 +32,4 @@ checkpoint:
checkpoint_folder: checkpoints/unet3d
checkpoint_after_epoch: 5
epochs_between_checkpoints: 2
model_size: 499153191
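As a hedged aside on what this key controls: `multiprocessing_context` maps directly onto the PyTorch `DataLoader` argument of the same name. The sketch below is illustrative only; the tiny dataset is a stand-in, not part of the tutorial.

```python
import torch
from torch.utils.data import DataLoader, Dataset

class TinyDataset(Dataset):
    """Stand-in dataset; the tutorial's loader reads NPZ samples through DYAD."""
    def __len__(self):
        return 8

    def __getitem__(self, idx):
        return torch.zeros(4), idx % 2

# 'fork' starts the I/O worker processes by forking the parent, so they inherit
# its already-initialized state; 'spawn' starts fresh interpreters that must
# re-import and re-initialize everything before reading any samples.
loader = DataLoader(TinyDataset(), batch_size=4, num_workers=2,
                    multiprocessing_context="fork")

for samples, labels in loader:
    print(samples.shape, labels)
```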
@@ -48,11 +48,11 @@ def __init__(self, format_type, dataset_type, epoch, num_samples, num_workers, b
         self.reader = None
         self.num_images_read = 0
         self.batch_size = batch_size
+        self.broker_per_node = 1
         args = ConfigArguments.get_instance()
         self.serial_args = pickle.dumps(args)
         if num_workers == 0:
             self.worker_init(-1)
-            self.broker_per_node = 1

     def worker_init(self, worker_id):
         # Configure PyTorch components
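A minimal sketch of why the reordering above matters, with stand-in names rather than the tutorial's actual data loader class: when `num_workers == 0`, `worker_init` runs inside `__init__`, so any attribute it reads must already be assigned.

```python
class LoaderSketch:
    """Illustrative only; mirrors the ordering fix, not the real DYAD data loader."""

    def __init__(self, num_workers):
        # Assign before worker_init can run ...
        self.broker_per_node = 1
        if num_workers == 0:
            # ... because with zero workers there is no separate worker process,
            # so initialization happens here and reads the attribute immediately.
            self.worker_init(-1)

    def worker_init(self, worker_id):
        print(f"worker {worker_id}: {self.broker_per_node} broker(s) per node")

LoaderSketch(num_workers=0)  # with the assignment moved after worker_init, this raises AttributeError
```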
@@ -138,12 +138,10 @@ def read(self):
prefetch_factor = math.ceil(self._args.prefetch_size / self._args.read_threads)
else:
prefetch_factor = self._args.prefetch_size
if prefetch_factor > 0:
if self._args.my_rank == 0:
else:
if prefetch_factor <= 0:
prefetch_factor = 2
if self._args.my_rank == 0:
logging.debug(f"{utcnow()} Setup dataloader with {self._args.read_threads} workers {torch.__version__}")
logging.debug(f"{utcnow()} Setup dataloader with {self._args.read_threads} workers {torch.__version__}")
if self._args.read_threads==0:
kwargs={}
else:
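For readers skimming the hunk above, here is a hedged sketch of the corrected prefetch handling as plain code. The guarding `read_threads >= 1` condition and the exact contents of `kwargs` are assumptions; only the names shown in the displayed lines come from the tutorial.

```python
import logging
import math

def build_loader_kwargs(prefetch_size, read_threads, my_rank):
    """Sketch of the corrected logic; argument names are illustrative."""
    if read_threads >= 1:
        prefetch_factor = math.ceil(prefetch_size / read_threads)
    else:
        prefetch_factor = prefetch_size
    # Fall back to PyTorch's default prefetch factor when none was requested.
    if prefetch_factor <= 0:
        prefetch_factor = 2
    if my_rank == 0:
        logging.debug(f"Setup dataloader with {read_threads} workers")
    # DataLoader only accepts prefetch_factor when num_workers > 0,
    # hence the empty kwargs for the read_threads == 0 case.
    if read_threads == 0:
        kwargs = {}
    else:
        kwargs = {"num_workers": read_threads, "prefetch_factor": prefetch_factor}
    return kwargs

print(build_loader_kwargs(prefetch_size=0, read_threads=4, my_rank=0))
```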
@@ -9,28 +9,28 @@
"source": [
"# Using DYAD to accelerate distributed Deep Learning (DL) training\n",
"\n",
"Now that we have seen how Flux enables the management and deployment of services, let's look at an example of using DYAD, an advanced Flux service for runtime data movement, in a real world application. Specifically, we will show how DYAD speeds up distributed Deep Learning (DL) training. In this module, we cover these topics:\n",
"Now that we have seen how Flux enables the management and deployment of services, let's look at an example of using DYAD, an advanced Flux service for runtime data movement, in a real-world application. Specifically, we will show how DYAD speeds up distributed Deep Learning (DL) training. In this module, we cover these topics:\n",
"1. Design of DYAD\n",
"2. Distributed DL training\n",
"3. Deep Learning I/O (DLIO) benchmark\n",
"4. Accelerating distributed DL training\n",
"\n",
"## Design of DYAD\n",
"\n",
"DYAD provides transparent, locality-aware, write-once, read-many file caching that runs on top of local NVMe and other burst buffer-style technologies (e.g., El Capitan Rabbit nodes). Figure X shows the components of DYAD, including the DYAD service (implemented as a Flux broker module), the DYAD client, and DYAD's data transport layer. DYAD uses the Flux KVS to store metadata about tracked files, and it uses Flux's remote proceedure call capabilities to communicate between client and service. DYAD also uses UCX to perform RDMA-based data transfer to move files.\n",
"DYAD provides transparent, locality-aware, write-once, read-many file caching that runs on top of local NVMe and other burst buffer-style technologies (e.g., El Capitan Rabbit nodes). Figure 1 shows the components of DYAD, including the DYAD service (implemented as a Flux broker module), the DYAD client, and DYAD's data transport layer. DYAD uses the Flux KVS to store metadata about tracked files, and it uses Flux's remote procedure call capabilities to communicate between client and service. DYAD also uses UCX to perform RDMA-based data transfer to move files.\n",
"\n",
"<figure>\n",
"<img src=\"img/dyad-software-stack.png\" width=50% height=auto>\n",
"<figcaption>\n",
"<i>Image created by Ian Lumsden for a poster at SC'23</i></figcaption>\n",
"<i>Figure 1: Image created by Ian Lumsden for a poster at SC'23</i></figcaption>\n",
"</figure>\n",
"\n",
"DYAD is designed to accelerate large, distributed workloads, such as distributed Deep Learning (DL) training and scientific computing workflows, on HPC systems. It is also designed be transparent, which allows users to leverage DYAD with little to no code refactoring. Unlike similar tools (e.g., DataSpaces and UnifyFS), which tend to optimize for write performance, DYAD aims to provide good write **and read** performance. To optimize read performance, DYAD uses a locality-aware \"Hierarchical Data Locator,\" which prioritizes node-local metadata and data retrieval to minimize the amount of network communications. When moving data from another node, DYAD also uses a streaming RPC over RDMA protocol, which uses preallocated buffers and connection caching to maximize network bandwidth. This process is shown in the figure below:\n",
"\n",
"<figure>\n",
"<img src=\"img/dyad_design.png\">\n",
"<figcaption>\n",
"<i>Image created by Hari Devarajan for a paper submitted to SC'24</i></figcaption>\n",
"<i>Figure 2: Image created by Hari Devarajan for a paper submitted to SC'24</i></figcaption>\n",
"</figure>"
]
},
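Since the cell above leans on the Flux KVS and RPC machinery, a small, hedged sketch of the generic Flux Python KVS API may help ground the idea. This is ordinary Flux usage, not DYAD's actual bookkeeping code, and the key name is made up.

```python
import flux
import flux.kvs

# Connect to the enclosing Flux instance (run inside `flux start`).
handle = flux.Flux()

# Record a small piece of metadata: roughly the kind of "who produced which
# file" bookkeeping that a service like DYAD layers on top of the KVS.
flux.kvs.put(handle, "demo.files.sample_0000", {"owner_rank": 0, "size_bytes": 4096})
flux.kvs.commit(handle)

print(flux.kvs.get(handle, "demo.files.sample_0000"))
```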
@@ -41,20 +41,20 @@
"source": [
"## Distributed DL Training\n",
"\n",
"Distributed DL training is an approach to speed up the training of large Deep Learning models by performing multiple epochs of training in parallel across multiple GPUs and, oftentimes, multiple nodes. This approach is supported by most major DL libraries, such as PyTorch and Tensorflow. In this module, we focus on PyTorch. When running training across multiple nodes and GPUs, PyTorch starts by spawning one process per GPU, called the worker. Each worker performs three major tasks:\n",
"Distributed DL training is an approach to speed up the training of large Deep Learning models by performing multiple epochs of training in parallel across multiple GPUs and, oftentimes, multiple nodes. Most major DL libraries, such as PyTorch and TensorFlow support this approach. In this module, we focus on PyTorch. When running training across multiple nodes and GPUs, PyTorch starts by spawning one process per GPU, called the worker. Each worker performs three major tasks:\n",
"1. Determining which samples from the dataset will comprise the batch for the next epoch of training (i.e., epoch *N+1*)\n",
"2. Reading these samples from the filesystem\n",
"3. Building a batch from these samples and moving the batch to the GPU\n",
"\n",
"To assist with reading the samples from the filesystem, each worker also spawns additional I/O processes. Each of these processes reads data and, optionally, transforms the data based on the configuration of the training pipeline. Figure X shows this process for a single GPU, a single worker, and a single spawned I/O process. In this figure, \"I/O\" indicates data being read from the filesystem, and \"Map\" indicates the optional transformation of data. \"Batch\" indicates the building of a batch from the read samples.\n",
"To assist with reading the samples from the filesystem, each worker also spawns additional I/O processes. Each of these processes reads data and, optionally, transforms the data based on the configuration of the training pipeline. Figure 3 shows this process for a single GPU, a single worker, and a single spawned I/O process. In this figure, \"I/O\" indicates data being read from the filesystem, and \"Map\" indicates the optional transformation of data. \"Batch\" indicates the building of a batch from the read samples.\n",
"\n",
"<figure>\n",
"<img src=\"img/dl-training-io.png\">\n",
"<figcaption>\n",
"<i>Image created by Ian Lumsden based on an image from <a href=\"https://towardsdatascience.com/building-efficient-data-pipelines-using-tensorflow-8f647f03b4ce\">this article</a></i></figcaption>\n",
"<i>Figure 3: Image created by Ian Lumsden based on an image from <a href=\"https://towardsdatascience.com/building-efficient-data-pipelines-using-tensorflow-8f647f03b4ce\">this article</a></i></figcaption>\n",
"</figure>\n",
"\n",
"One key difference between distributed DL training and many conventional HPC applications (e.g., MPI-based simulations) is the asynchronous loading of data by workers during training. In many conventional HPC applications, data loading and computation are performed one after the one. On the other hand, as shown in Figure X, the loading of data in distributed DL training is asynchronous. In other words, while the GPU is training the DL model for epoch *N*, the worker reading and creating the batch for epoch *N+1*. This asynchronous loading of data can lead to imbalance between data loading and training. For example, Figure X shows a scenario where the data loading takes longer than training, resulting in idle time on the GPU, wasted resources, and, overall, an I/O bound application.\n",
"One key difference between distributed DL training and many conventional HPC applications (e.g., MPI-based simulations) is the asynchronous loading of data by workers during training. In many conventional HPC applications, data loading and computation are performed one after the other. On the other hand, as shown in Figure 3, the loading of data in distributed DL training is asynchronous. In other words, while the GPU is training the DL model for epoch *N*, the worker is reading and creating the batch for epoch *N+1*. This asynchronous loading of data can lead to an imbalance between data loading and training. For example, Figure 3 shows a scenario where the data loading takes longer than training, resulting in idle time on the GPU, wasted resources, and, overall, an I/O bound application.\n",
"\n",
"At the end of each epoch of training, all workers and GPUs are synchronized so that the DL models from each GPU can be merged together. This synchronization and merging usually consists of an allreduce-style operation. This synchronization makes the effects of any imbalance between data loading and training more pronounced because, if even one worker and GPU become imbalanced, the performance of the entire distributed training will suffer."
]
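To make the worker and I/O-process description above concrete, here is a hedged PyTorch sketch of the per-rank data path using `DistributedSampler` and `DataLoader`. The placeholder dataset and the specific numbers are illustrative, not the KiTS19 configuration used later.

```python
import torch
from torch.utils.data import DataLoader, Dataset, DistributedSampler

class NpzLikeDataset(Dataset):
    """Placeholder standing in for the NPZ image samples read during training."""
    def __len__(self):
        return 1024

    def __getitem__(self, idx):
        # "I/O": read one sample from storage; "Map": an optional transform would go here.
        return torch.randn(1, 64, 64), idx % 2

def make_loader(rank, world_size, batch_size=4, io_workers=6):
    dataset = NpzLikeDataset()
    # Each rank (one worker process per GPU) sees its own shard of the dataset.
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=True)
    # num_workers starts the extra I/O processes described above; they read and
    # transform upcoming batches asynchronously while the GPU trains on the current one.
    return DataLoader(dataset, batch_size=batch_size, sampler=sampler,
                      num_workers=io_workers, pin_memory=True)

loader = make_loader(rank=0, world_size=8)
print(len(loader), "batches per rank per epoch")
```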
@@ -112,7 +112,7 @@
"from pygments.formatters import HtmlFormatter\n",
"from IPython.display import display, HTML\n",
"\n",
"sys.path.insert(0, os.path.abspath(\"../dlio_extensions/\"))\n",
"sys.path.insert(0, os.path.abspath(\"dlio_extensions/\"))\n",
"\n",
"from dyad_torch_data_loader import DYADTorchDataset"
]
@@ -226,7 +226,7 @@
"workers_per_node = 1\n",
"dyad_install_prefix = \"/usr/local\"\n",
"num_nodes = 2\n",
"dlio_extensions_dir = \"/home/jovyan/flux-tutorial-2024/dlio_extensions\"\n",
"dlio_extensions_dir = \"/home/jovyan/supplementary/dyad/dlio_extensions\"\n",
"workload = \"dyad_unet3d_demo\""
]
},
@@ -249,9 +249,9 @@
"DYAD_KVS_NAMESPACE={kvs_namespace}\n",
"DYAD_DTL_MODE={dtl_mode}\n",
"DYAD_PATH={managed_directory}\n",
"LD_LIBRARY_PATH={dyad_install_prefix}/lib\n",
"LD_LIBRARY_PATH={dyad_install_prefix}/lib:$LD_LIBRARY_PATH\n",
"PYTHONPATH={dlio_extensions_dir}:$PYTHONPATH\n",
"DLIO_PROFILER_ENABLE=0\n",
"DFTRACER_ENABLE=0\n",
"\"\"\"\n",
"\n",
"with open(\"dlio_env.txt\", \"w\") as f:\n",
@@ -378,7 +378,7 @@
"metadata": {},
"outputs": [],
"source": [
"!flux run -N {num_nodes} -o cpu-affinity=on --tasks-per-node={workers_per_node} --env-file=dlio_env.txt \\\n",
"!flux run -N {num_nodes} -o cpu-affinity=off --tasks-per-node={workers_per_node} --env-file=dlio_env.txt \\\n",
" dlio_benchmark --config-dir={dlio_extensions_dir}/configs workload={workload} \\\n",
" ++workload.dataset.data_folder={initial_data_directory} ++workload.workflow.generate_data=False \\\n",
" ++workload.workflow.train=True\n",
@@ -465,33 +465,41 @@
"<figure>\n",
"<img src=\"img/dyad-unet3d-results.svg\">\n",
"<figcaption>\n",
"<i></i></figcaption>\n",
"<i>Figure 4: DYAD improves the epoch time of Unet3D by up to 10.82x due to locality-aware caching as compared to UnifyFS.</i></figcaption>\n",
"</figure>\n",
"\n",
"Figure X shows the performance of Lustre, [UnifyFS](https://ieeexplore.ieee.org/document/10177390), and DYAD in terms of runtime and I/O bandwidth for the full version of the 3D U-Net training. As explained on the [webpage for the KiTS19 Challenge](https://kits19.grand-challenge.org/), the dataset for the full version of this application consists of 10,240, NPZ-formatted image files, resulting in a total dataset size of 1.36 TB. Within each epoch of PyTorch-based training, the model processes batches of 4 images using 6 I/O processes per GPU. The model trains for 20 epochs without checkpointing. The model scales from 8 to 64 nodes of LLNL's [Corona](https://hpc.llnl.gov/hardware/compute-platforms/corona) supercomputer, with 8 GPUs per node.\n",
"Figure 4 shows the performance of Lustre, [UnifyFS](https://ieeexplore.ieee.org/document/10177390), and DYAD in terms of runtime and I/O bandwidth for the full version of the 3D U-Net training. As explained on the [webpage for the KiTS19 Challenge](https://kits19.grand-challenge.org/), the dataset for the full version of this application consists of 10,240, NPZ-formatted image files, resulting in a total dataset size of 1.36 TB. Within each epoch of PyTorch-based training, the model processes batches of 4 images using 6 I/O processes per GPU. The model trains for 20 epochs without checkpointing. The model scales from 8 to 64 nodes of LLNL's [Corona](https://hpc.llnl.gov/hardware/compute-platforms/corona) supercomputer, with 8 GPUs per node.\n",
"\n",
"In the leftmost plot of Figure X, we show the runtime of the training for Lustre, UnifyFS, and DYAD at 8, 16, 32, and 64 nodes. This plot shows that DYAD provides significant runtime improvement compared to Lustre and UnifyFS for the 3D U-Net, mainly due to locality optimizations. DYAD runs up to 7.5 times faster than Lustre and 1.88 times faster than UnifyFS, with less performance variability due to DYAD's use of node-local storage.\n",
"In the leftmost plot of Figure 4, we show the runtime of the training for Lustre, UnifyFS, and DYAD at 8, 16, 32, and 64 nodes. This plot shows that DYAD provides significant runtime improvement compared to Lustre and UnifyFS for the 3D U-Net, mainly due to locality optimizations. DYAD runs up to 7.5 times faster than Lustre and 1.88 times faster than UnifyFS, with less performance variability due to DYAD's use of node-local storage.\n",
"\n",
"In the middle plot of Figure X, we show the bandwidth per epoch of training across 512 GPUs (64 nodes). Because DYAD's capabilities allow for on-the-fly caching of data, its performance starts similar to that of Lustre. As more data is cached into DYAD, its bandwidth increases to 140 GB/s due to DYAD's streaming RPC over RDMA protocol. Finally, as even more data is cached, DYAD's bandwidth reaches 1409 GB/s because DYAD's locality-aware caching allows almost all sample reads to be performed directly on node-local NVMe. In comparison, both Lustre and Unify maintain consistent bandwidths well under those of DYAD. By the 20th epoch, DYAD speeds up training by 10.62 times compared to UnifyFS.\n",
"In the middle plot of Figure 4, we show the bandwidth per epoch of training across 512 GPUs (64 nodes). Because DYAD's capabilities allow for on-the-fly caching of data, its performance starts similar to that of Lustre. As more data is cached into DYAD, its bandwidth increases to 140 GB/s due to DYAD's streaming RPC over RDMA protocol. Finally, as even more data is cached, DYAD's bandwidth reaches 1409 GB/s because DYAD's locality-aware caching allows almost all sample reads to be performed directly on node-local NVMe. In comparison, both Lustre and Unify maintain consistent bandwidths well under those of DYAD. By the 20th epoch, DYAD speeds up training by 10.62 times compared to UnifyFS.\n",
"\n",
"Finally, in the rightmost plot of Figure X, we show how often DYAD retrieved data from node-local storage versus retrieving data from storage on a remote node in terms of percentage of data access requests. Initially, DYAD mostly performs remote requests. As training continues, more and more data is replicated with DYAD's locality-aware caching, resulting in a larger percentage of local requests. By epoch 13, almost all data is accessed through local requests. This transition from mostly remote requests to mostly local requests corresponds with the increase in bandwidth shown in the middle plot of Figure X."
"Finally, in the rightmost plot of Figure 4, we show how often DYAD retrieved data from node-local storage versus retrieving data from storage on a remote node in terms of the percentage of data access requests. Initially, DYAD mostly performs remote requests. As training continues, more and more data is replicated with DYAD's locality-aware caching, resulting in a larger percentage of local requests. By epoch 13, almost all data is accessed through local requests. This transition from mostly remote requests to mostly local requests corresponds with the increase in bandwidth shown in the middle plot of Figure 4."
]
},
{
"cell_type": "markdown",
"id": "81d7d87f-1e09-42c8-b165-8902551f6847",
"metadata": {},
"source": [
"# This concludes Module 3.\n",
"# This concludes Suppliment Module 1.\n",
"\n",
"In this module, we covered:\n",
"1. Design of DYAD\n",
"2. Distributed DL training\n",
"3. Deep Learning I/O (DLIO) benchmark\n",
"4. Accelerating distributed DL training\n",
"\n",
"To continue with the tutorial, open [Module 4](./04_flux_tutorial_conclusions.ipynb)"
"<!--To continue with the tutorial, open [Module 4](./04_flux_tutorial_conclusions.ipynb)-->"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "37fc415f-7972-4f44-ac1e-f3d1258345be",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
@@ -510,7 +518,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.10"
"version": "3.11.7"
}
},
"nbformat": 4,