From e08eeb850f76e70cb8fd372869d54cd26738d2df Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gabriel=20Mart=C3=ADn=20Bl=C3=A1zquez?=
Date: Tue, 23 Jul 2024 16:52:31 +0200
Subject: [PATCH 1/7] Create file per hostname

---
 src/distilabel/llms/mixins/cuda_device_placement.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/src/distilabel/llms/mixins/cuda_device_placement.py b/src/distilabel/llms/mixins/cuda_device_placement.py
index caf9c17945..268a6a3066 100644
--- a/src/distilabel/llms/mixins/cuda_device_placement.py
+++ b/src/distilabel/llms/mixins/cuda_device_placement.py
@@ -15,6 +15,7 @@
 import json
 import logging
 import os
+import socket
 import tempfile
 from contextlib import contextmanager
 from pathlib import Path
@@ -26,7 +27,11 @@
 from distilabel.mixins.runtime_parameters import RuntimeParameter

 _CUDA_DEVICE_PLACEMENT_MIXIN_FILE = (
-    Path(tempfile.gettempdir()) / "distilabel_cuda_device_placement_mixin.json"
+    Path(tempfile.gettempdir())
+    / "distilabel"
+    / "cuda_device_placement"
+    / socket.gethostname()
+    / "distilabel_cuda_device_placement_mixin.json"
 )


@@ -105,6 +110,7 @@ def _device_llm_placement_map(self) -> Generator[Dict[str, List[int]], None, Non
         Yields:
             The content of the device placement file.
         """
+        _CUDA_DEVICE_PLACEMENT_MIXIN_FILE.parent.mkdir(parents=True, exist_ok=True)
         _CUDA_DEVICE_PLACEMENT_MIXIN_FILE.touch()
         with portalocker.Lock(
             _CUDA_DEVICE_PLACEMENT_MIXIN_FILE,

From c2ace431b5b94337a0c3f9ef109f0554073bba7d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gabriel=20Mart=C3=ADn=20Bl=C3=A1zquez?=
Date: Tue, 23 Jul 2024 20:53:41 +0200
Subject: [PATCH 2/7] Set default `_desired_num_gpus` to `1`

---
 src/distilabel/pipeline/step_wrapper.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/distilabel/pipeline/step_wrapper.py b/src/distilabel/pipeline/step_wrapper.py
index bc92d46705..19b22af7ba 100644
--- a/src/distilabel/pipeline/step_wrapper.py
+++ b/src/distilabel/pipeline/step_wrapper.py
@@ -68,7 +68,7 @@ def __init__(
             and isinstance(self.step.llm, CudaDevicePlacementMixin)
         ):
             self.step.llm._llm_identifier = self.step.name
-            self.step.llm._desired_num_gpus = self.step.resources.gpus
+            self.step.llm._desired_num_gpus = self.step.resources.gpus or 1

     def run(self) -> str:
         """The target function executed by the process. This function will also handle
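A note on the two patches above: the device placement file is now namespaced per machine, so two nodes that share the same temporary directory (for example over a network filesystem) no longer overwrite each other's CUDA assignments, and a step that does not request GPUs explicitly still reserves one. A minimal sketch of how the new placement path resolves, using only the standard library:

    import socket
    import tempfile
    from pathlib import Path

    # Mirrors the constant in cuda_device_placement.py: one JSON file per hostname.
    placement_file = (
        Path(tempfile.gettempdir())
        / "distilabel"
        / "cuda_device_placement"
        / socket.gethostname()
        / "distilabel_cuda_device_placement_mixin.json"
    )

    # The patch also creates the parent directories lazily before touching/locking
    # the file, hence the added `mkdir(parents=True, exist_ok=True)` call.
    placement_file.parent.mkdir(parents=True, exist_ok=True)
    placement_file.touch()
    print(placement_file)
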
From 0c837b965ea0f76c8fc98361a9bbcca2b2c5f491 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gabriel=20Mart=C3=ADn=20Bl=C3=A1zquez?=
Date: Wed, 24 Jul 2024 12:11:08 +0200
Subject: [PATCH 3/7] Fix `GeneratorTask`s not getting assigned gpus and name

---
 src/distilabel/pipeline/step_wrapper.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/distilabel/pipeline/step_wrapper.py b/src/distilabel/pipeline/step_wrapper.py
index 19b22af7ba..4bdbae3223 100644
--- a/src/distilabel/pipeline/step_wrapper.py
+++ b/src/distilabel/pipeline/step_wrapper.py
@@ -21,7 +21,7 @@
 from distilabel.pipeline.constants import LAST_BATCH_SENT_FLAG
 from distilabel.pipeline.typing import StepLoadStatus
 from distilabel.steps.base import GeneratorStep, Step, _Step
-from distilabel.steps.tasks.base import Task
+from distilabel.steps.tasks.base import _Task


 class _StepWrapper:
@@ -63,7 +63,7 @@ def __init__(
         self._dry_run = dry_run

         if (
-            isinstance(self.step, Task)
+            isinstance(self.step, _Task)
             and hasattr(self.step, "llm")
             and isinstance(self.step.llm, CudaDevicePlacementMixin)
         ):

From a7abc0ac217fb49717a221d0204a1136fb8ef242 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gabriel=20Mart=C3=ADn=20Bl=C3=A1zquez?=
Date: Wed, 24 Jul 2024 12:14:54 +0200
Subject: [PATCH 4/7] Add `_init_cuda_device_placement` method

---
 src/distilabel/pipeline/step_wrapper.py | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/src/distilabel/pipeline/step_wrapper.py b/src/distilabel/pipeline/step_wrapper.py
index 4bdbae3223..32cd5ade26 100644
--- a/src/distilabel/pipeline/step_wrapper.py
+++ b/src/distilabel/pipeline/step_wrapper.py
@@ -62,13 +62,24 @@ def __init__(
         self.load_queue = load_queue
         self._dry_run = dry_run

+        self._init_cuda_device_placement()
+
+    def _init_cuda_device_placement(self) -> None:
+        """Sets the LLM identifier and the number of desired GPUs of the `CudaDevicePlacementMixin`
+        if the step is a `_Task` that uses an `LLM` with CUDA capabilities."""
         if (
             isinstance(self.step, _Task)
             and hasattr(self.step, "llm")
             and isinstance(self.step.llm, CudaDevicePlacementMixin)
         ):
+            desired_num_gpus = self.step.resources.gpus or 1
+            self.step._logger.info(
+                f"Step '{self.step.name}' is a `Task` using an `LLM` with CUDA capabilities."
+                f" Setting identifier to '{self.step.name}' and number of desired GPUs to"
+                f" {desired_num_gpus}."
+            )
             self.step.llm._llm_identifier = self.step.name
-            self.step.llm._desired_num_gpus = self.step.resources.gpus or 1
+            self.step.llm._desired_num_gpus = desired_num_gpus

     def run(self) -> str:
         """The target function executed by the process. This function will also handle
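The `or 1` fallback introduced above means a `_Task` whose resources do not request any GPUs still asks for one GPU when its `LLM` uses CUDA device placement. A rough sketch of the values the wrapper would derive, assuming the public `StepResources` class (its import path and the `gpus=None` default are assumptions, not taken from the patch):

    from distilabel.steps import StepResources

    explicit = StepResources(gpus=2)  # step explicitly requests two GPUs
    implicit = StepResources()        # gpus left unset

    # Same expression the wrapper uses to fill `_desired_num_gpus`:
    print(explicit.gpus or 1)  # -> 2
    print(implicit.gpus or 1)  # -> 1
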
From af5ff868f0879fdad2b15d56d1c615d8b118454e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gabriel=20Mart=C3=ADn=20Bl=C3=A1zquez?=
Date: Wed, 24 Jul 2024 12:52:47 +0200
Subject: [PATCH 5/7] Remove info message

---
 src/distilabel/pipeline/step_wrapper.py | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/src/distilabel/pipeline/step_wrapper.py b/src/distilabel/pipeline/step_wrapper.py
index 32cd5ade26..01e12d4a64 100644
--- a/src/distilabel/pipeline/step_wrapper.py
+++ b/src/distilabel/pipeline/step_wrapper.py
@@ -73,11 +73,6 @@ def _init_cuda_device_placement(self) -> None:
             and isinstance(self.step.llm, CudaDevicePlacementMixin)
         ):
             desired_num_gpus = self.step.resources.gpus or 1
-            self.step._logger.info(
-                f"Step '{self.step.name}' is a `Task` using an `LLM` with CUDA capabilities."
-                f" Setting identifier to '{self.step.name}' and number of desired GPUs to"
-                f" {desired_num_gpus}."
-            )
             self.step.llm._llm_identifier = self.step.name
             self.step.llm._desired_num_gpus = desired_num_gpus


From d96c21ed2f5c35d26ce13d5252933bd4cc034cc0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gabriel=20Mart=C3=ADn=20Bl=C3=A1zquez?=
Date: Wed, 24 Jul 2024 13:43:53 +0200
Subject: [PATCH 6/7] Add disabling `CudaDevicePlacementMixin` if `RayPipeline`

---
 .../llms/mixins/cuda_device_placement.py | 12 ++++++++++++
 src/distilabel/pipeline/local.py         |  1 +
 src/distilabel/pipeline/ray.py           |  1 +
 src/distilabel/pipeline/step_wrapper.py  | 16 +++++++++++-----
 4 files changed, 25 insertions(+), 5 deletions(-)

diff --git a/src/distilabel/llms/mixins/cuda_device_placement.py b/src/distilabel/llms/mixins/cuda_device_placement.py
index 268a6a3066..e5018951af 100644
--- a/src/distilabel/llms/mixins/cuda_device_placement.py
+++ b/src/distilabel/llms/mixins/cuda_device_placement.py
@@ -48,6 +48,8 @@ class CudaDevicePlacementMixin(BaseModel):
             placement information provided in `_device_llm_placement_map`. If set to a list
             of devices, it will be checked if the devices are available to be used by the
             `LLM`. If not, a warning will be logged.
+        disable_cuda_device_placement: Whether to disable the CUDA device placement logic
+            or not. Defaults to `False`.
         _llm_identifier: the identifier of the `LLM` to be used as key in `_device_llm_placement_map`.
         _device_llm_placement_map: a dictionary with the device placement information for each
             `LLM`.
@@ -56,6 +58,10 @@ class CudaDevicePlacementMixin(BaseModel):
     cuda_devices: RuntimeParameter[Union[List[int], Literal["auto"]]] = Field(
         default="auto", description="A list with the ID of the CUDA devices to be used."
     )
+    disable_cuda_device_placement: RuntimeParameter[bool] = Field(
+        default=False,
+        description="Whether to disable the CUDA device placement logic or not.",
+    )

     _llm_identifier: Union[str, None] = PrivateAttr(default=None)
     _desired_num_gpus: PositiveInt = PrivateAttr(default=1)
@@ -68,6 +74,9 @@ def load(self) -> None:
         """Assign CUDA devices to the LLM based on the device placement information provided
         in `_device_llm_placement_map`."""

+        if self.disable_cuda_device_placement:
+            return
+
         try:
             import pynvml

@@ -93,6 +102,9 @@ def unload(self) -> None:
         """Unloads the LLM and removes the CUDA devices assigned to it from the device
         placement information provided in `_device_llm_placement_map`."""

+        if self.disable_cuda_device_placement:
+            return
+
         with self._device_llm_placement_map() as device_map:
             if self._llm_identifier in device_map:
                 self._logger.debug(  # type: ignore
diff --git a/src/distilabel/pipeline/local.py b/src/distilabel/pipeline/local.py
index 2142eedfa2..2f39ac3182 100644
--- a/src/distilabel/pipeline/local.py
+++ b/src/distilabel/pipeline/local.py
@@ -233,6 +233,7 @@ def _run_step(self, step: "_Step", input_queue: "Queue[Any]", replica: int) -> N
             output_queue=self._output_queue,
             load_queue=self._load_queue,
             dry_run=self._dry_run,
+            ray_pipeline=False,
         )

         self._pool.apply_async(step_wrapper.run, error_callback=self._error_callback)
diff --git a/src/distilabel/pipeline/ray.py b/src/distilabel/pipeline/ray.py
index 2ff95ac886..7ec0dab9d3 100644
--- a/src/distilabel/pipeline/ray.py
+++ b/src/distilabel/pipeline/ray.py
@@ -235,6 +235,7 @@ def run(self) -> str:
                     output_queue=self._output_queue,
                     load_queue=self._load_queue,
                     dry_run=self._dry_run,
+                    ray_pipeline=True,
                 ),
                 log_queue=self._log_queue,
             )
diff --git a/src/distilabel/pipeline/step_wrapper.py b/src/distilabel/pipeline/step_wrapper.py
index 01e12d4a64..d51161ec44 100644
--- a/src/distilabel/pipeline/step_wrapper.py
+++ b/src/distilabel/pipeline/step_wrapper.py
@@ -44,6 +44,7 @@ def __init__(
         output_queue: "Queue[_Batch]",
         load_queue: "Queue[Union[StepLoadStatus, None]]",
         dry_run: bool = False,
+        ray_pipeline: bool = False,
     ) -> None:
         """Initializes the `_ProcessWrapper`.

@@ -54,13 +55,15 @@ def __init__(
             load_queue: The queue used to notify the main process that the step has been
                 loaded, has been unloaded or has failed to load.
             dry_run: Flag to ensure we are forcing to run the last batch.
+            ray_pipeline: Whether the step is running a `RayPipeline` or not.
         """
         self.step = step
         self.replica = replica
         self.input_queue = input_queue
         self.output_queue = output_queue
         self.load_queue = load_queue
-        self._dry_run = dry_run
+        self.dry_run = dry_run
+        self.ray_pipeline = ray_pipeline

         self._init_cuda_device_placement()

@@ -72,9 +75,12 @@ def _init_cuda_device_placement(self) -> None:
             and hasattr(self.step, "llm")
             and isinstance(self.step.llm, CudaDevicePlacementMixin)
         ):
-            desired_num_gpus = self.step.resources.gpus or 1
-            self.step.llm._llm_identifier = self.step.name
-            self.step.llm._desired_num_gpus = desired_num_gpus
+            if self.ray_pipeline:
+                self.step.llm.disable_cuda_device_placement = True
+            else:
+                desired_num_gpus = self.step.resources.gpus or 1
+                self.step.llm._llm_identifier = self.step.name
+                self.step.llm._desired_num_gpus = desired_num_gpus

     def run(self) -> str:
         """The target function executed by the process. This function will also handle
@@ -162,7 +168,7 @@ def _generator_step_process_loop(self) -> None:

                 for data, last_batch in step.process_applying_mappings(offset=offset):
                     batch.set_data([data])
-                    batch.last_batch = self._dry_run or last_batch
+                    batch.last_batch = self.dry_run or last_batch
                     self._send_batch(batch)

                     if batch.last_batch:
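Because `disable_cuda_device_placement` is declared as a `RuntimeParameter`, it can also be set explicitly when instantiating an LLM, which is what the step wrapper above now does when running under a `RayPipeline` (presumably because Ray manages GPU assignment itself). A hedged example, assuming `vLLM` as one of the LLMs that mixes in `CudaDevicePlacementMixin` (the model id is illustrative):

    from distilabel.llms import vLLM

    llm = vLLM(
        model="my-org/my-model",  # illustrative model id
        disable_cuda_device_placement=True,  # skip distilabel's CUDA assignment logic
    )
    # With the flag set, `load()` and `unload()` return early and no devices are
    # assigned to or removed from the shared placement file.
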
From 7bee86c6e494e22f02b5231c82918e5d85dd007b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gabriel=20Mart=C3=ADn=20Bl=C3=A1zquez?=
Date: Sat, 27 Jul 2024 20:26:22 +0200
Subject: [PATCH 7/7] Fix unit test

---
 tests/unit/pipeline/test_local.py                          | 4 ++++
 tests/unit/steps/tasks/structured_outputs/test_outlines.py | 2 ++
 2 files changed, 6 insertions(+)

diff --git a/tests/unit/pipeline/test_local.py b/tests/unit/pipeline/test_local.py
index 7e9d993ac9..d661f21a95 100644
--- a/tests/unit/pipeline/test_local.py
+++ b/tests/unit/pipeline/test_local.py
@@ -55,6 +55,7 @@ def test_run_steps(self, step_wrapper_mock: mock.MagicMock) -> None:
                     output_queue=pipeline._output_queue,
                     load_queue=pipeline._load_queue,
                     dry_run=False,
+                    ray_pipeline=False,
                 ),
                 mock.call(
                     step=dummy_step_1,
@@ -63,6 +64,7 @@ def test_run_steps(self, step_wrapper_mock: mock.MagicMock) -> None:
                     output_queue=pipeline._output_queue,
                     load_queue=pipeline._load_queue,
                     dry_run=False,
+                    ray_pipeline=False,
                 ),
                 mock.call(
                     step=dummy_step_1,
@@ -71,6 +73,7 @@ def test_run_steps(self, step_wrapper_mock: mock.MagicMock) -> None:
                     output_queue=pipeline._output_queue,
                     load_queue=pipeline._load_queue,
                     dry_run=False,
+                    ray_pipeline=False,
                 ),
                 mock.call(
                     step=dummy_step_2,
@@ -79,6 +82,7 @@ def test_run_steps(self, step_wrapper_mock: mock.MagicMock) -> None:
                     output_queue=pipeline._output_queue,
                     load_queue=pipeline._load_queue,
                     dry_run=False,
+                    ray_pipeline=False,
                 ),
             ],
         )
diff --git a/tests/unit/steps/tasks/structured_outputs/test_outlines.py b/tests/unit/steps/tasks/structured_outputs/test_outlines.py
index 0e488eea13..b940cee321 100644
--- a/tests/unit/steps/tasks/structured_outputs/test_outlines.py
+++ b/tests/unit/steps/tasks/structured_outputs/test_outlines.py
@@ -59,6 +59,7 @@ class DummyUserTest(BaseModel):
                 "device_map": None,
                 "token": None,
                 "use_magpie_template": False,
+                "disable_cuda_device_placement": False,
                 "type_info": {
                     "module": "distilabel.llms.huggingface.transformers",
                     "name": "TransformersLLM",
@@ -85,6 +86,7 @@ class DummyUserTest(BaseModel):
                 "device_map": None,
                 "token": None,
                 "use_magpie_template": False,
+                "disable_cuda_device_placement": False,
                 "type_info": {
                     "module": "distilabel.llms.huggingface.transformers",
                     "name": "TransformersLLM",
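The test updates in this final patch reflect that the new `disable_cuda_device_placement` runtime parameter now appears in the serialized form of any LLM that mixes in `CudaDevicePlacementMixin`. A small sketch of what that looks like, assuming `TransformersLLM` as in the tests (the model id is illustrative):

    from distilabel.llms import TransformersLLM

    llm = TransformersLLM(model="my-org/my-tiny-model")  # illustrative model id
    dump = llm.dump()

    # The dump now carries the flag with its default value alongside the other fields:
    print(dump["disable_cuda_device_placement"])  # -> False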