Create file per hostname in CudaDevicePlacementMixin #814

Merged · 9 commits · Jul 27, 2024
20 changes: 19 additions & 1 deletion src/distilabel/llms/mixins/cuda_device_placement.py
@@ -15,6 +15,7 @@
import json
import logging
import os
+import socket
import tempfile
from contextlib import contextmanager
from pathlib import Path
@@ -26,7 +27,11 @@
from distilabel.mixins.runtime_parameters import RuntimeParameter

_CUDA_DEVICE_PLACEMENT_MIXIN_FILE = (
-    Path(tempfile.gettempdir()) / "distilabel_cuda_device_placement_mixin.json"
+    Path(tempfile.gettempdir())
+    / "distilabel"
+    / "cuda_device_placement"
+    / socket.gethostname()
+    / "distilabel_cuda_device_placement_mixin.json"
)
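The gist of the change: the placement file now lives under a per-host directory, so machines that share a temp directory (for example, nodes of a Ray cluster on a shared filesystem) stop clobbering each other's placement map. A minimal sketch of how the path resolves, with `node-1` as a hypothetical hostname:

```python
import socket
import tempfile
from pathlib import Path

# Mirrors the construction above; the hostname segment keeps hosts apart.
placement_file = (
    Path(tempfile.gettempdir())
    / "distilabel"
    / "cuda_device_placement"
    / socket.gethostname()  # e.g. "node-1" (hypothetical)
    / "distilabel_cuda_device_placement_mixin.json"
)
# On Linux this prints something like:
# /tmp/distilabel/cuda_device_placement/node-1/distilabel_cuda_device_placement_mixin.json
print(placement_file)
```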


@@ -43,6 +48,8 @@ class CudaDevicePlacementMixin(BaseModel):
placement information provided in `_device_llm_placement_map`. If set to a list
of devices, it will be checked if the devices are available to be used by the
`LLM`. If not, a warning will be logged.
+        disable_cuda_device_placement: Whether to disable the CUDA device placement logic
+            or not. Defaults to `False`.
_llm_identifier: the identifier of the `LLM` to be used as key in `_device_llm_placement_map`.
_device_llm_placement_map: a dictionary with the device placement information for each
`LLM`.
@@ -51,6 +58,10 @@
cuda_devices: RuntimeParameter[Union[List[int], Literal["auto"]]] = Field(
default="auto", description="A list with the ID of the CUDA devices to be used."
)
+    disable_cuda_device_placement: RuntimeParameter[bool] = Field(
+        default=False,
+        description="Whether to disable the CUDA device placement logic or not.",
+    )
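Because `disable_cuda_device_placement` is a `RuntimeParameter`, it should be settable per run rather than in code. A sketch under the usual runtime-parameters convention, assuming an already-built `pipeline` with a step named `text_generation` (both names are placeholders):

```python
# Hypothetical invocation; `pipeline` and the step name are placeholders.
distiset = pipeline.run(
    parameters={
        "text_generation": {
            "llm": {"disable_cuda_device_placement": True},
        },
    },
)
```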

_llm_identifier: Union[str, None] = PrivateAttr(default=None)
_desired_num_gpus: PositiveInt = PrivateAttr(default=1)
@@ -63,6 +74,9 @@ def load(self) -> None:
"""Assign CUDA devices to the LLM based on the device placement information provided
in `_device_llm_placement_map`."""

+        if self.disable_cuda_device_placement:
+            return
+
try:
import pynvml

@@ -88,6 +102,9 @@ def load(self) -> None:
def unload(self) -> None:
"""Unloads the LLM and removes the CUDA devices assigned to it from the device
placement information provided in `_device_llm_placement_map`."""
+        if self.disable_cuda_device_placement:
+            return
+
with self._device_llm_placement_map() as device_map:
if self._llm_identifier in device_map:
self._logger.debug( # type: ignore
@@ -105,6 +122,7 @@ def _device_llm_placement_map(self) -> Generator[Dict[str, List[int]], None, None]:
Yields:
The content of the device placement file.
"""
+        _CUDA_DEVICE_PLACEMENT_MIXIN_FILE.parent.mkdir(parents=True, exist_ok=True)
_CUDA_DEVICE_PLACEMENT_MIXIN_FILE.touch()
with portalocker.Lock(
_CUDA_DEVICE_PLACEMENT_MIXIN_FILE,
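For context, `_device_llm_placement_map` pairs the `mkdir`/`touch` above with an exclusive `portalocker.Lock`, so concurrent step processes serialize their read-modify-write of the shared JSON map. A rough sketch of that pattern (a standalone illustration, not the mixin's actual body; the path and helper name are made up):

```python
import json
from pathlib import Path

import portalocker

PLACEMENT_FILE = Path("/tmp/placement.json")  # stand-in for the real path

def assign_devices(llm_id: str, devices: list) -> None:
    """Read-modify-write of the placement map, guarded by a file lock."""
    PLACEMENT_FILE.parent.mkdir(parents=True, exist_ok=True)
    PLACEMENT_FILE.touch()
    with portalocker.Lock(str(PLACEMENT_FILE), "r+") as f:
        content = f.read()
        device_map = json.loads(content) if content else {}
        device_map[llm_id] = devices  # mutate while holding the lock
        f.seek(0)
        f.truncate()
        f.write(json.dumps(device_map))
```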
1 change: 1 addition & 0 deletions src/distilabel/pipeline/local.py
@@ -233,6 +233,7 @@ def _run_step(self, step: "_Step", input_queue: "Queue[Any]", replica: int) -> None:
output_queue=self._output_queue,
load_queue=self._load_queue,
dry_run=self._dry_run,
+            ray_pipeline=False,
)

self._pool.apply_async(step_wrapper.run, error_callback=self._error_callback)
1 change: 1 addition & 0 deletions src/distilabel/pipeline/ray.py
@@ -235,6 +235,7 @@ def run(self) -> str:
output_queue=self._output_queue,
load_queue=self._load_queue,
dry_run=self._dry_run,
+                ray_pipeline=True,
),
log_queue=self._log_queue,
)
24 changes: 18 additions & 6 deletions src/distilabel/pipeline/step_wrapper.py
@@ -21,7 +21,7 @@
from distilabel.pipeline.constants import LAST_BATCH_SENT_FLAG
from distilabel.pipeline.typing import StepLoadStatus
from distilabel.steps.base import GeneratorStep, Step, _Step
-from distilabel.steps.tasks.base import Task
+from distilabel.steps.tasks.base import _Task


class _StepWrapper:
@@ -44,6 +44,7 @@ def __init__(
output_queue: "Queue[_Batch]",
load_queue: "Queue[Union[StepLoadStatus, None]]",
dry_run: bool = False,
+        ray_pipeline: bool = False,
) -> None:
"""Initializes the `_ProcessWrapper`.

@@ -54,21 +55,32 @@ def __init__(
load_queue: The queue used to notify the main process that the step has been
loaded, has been unloaded or has failed to load.
dry_run: Flag to force marking each batch as the last one, so only a single batch is processed.
+            ray_pipeline: Whether the step is running a `RayPipeline` or not.
"""
self.step = step
self.replica = replica
self.input_queue = input_queue
self.output_queue = output_queue
self.load_queue = load_queue
-        self._dry_run = dry_run
+        self.dry_run = dry_run
+        self.ray_pipeline = ray_pipeline
+
+        self._init_cuda_device_placement()
+
+    def _init_cuda_device_placement(self) -> None:
+        """Sets the LLM identifier and the number of desired GPUs of the `CudaDevicePlacementMixin`
+        if the step is a `_Task` that uses an `LLM` with CUDA capabilities."""
        if (
-            isinstance(self.step, Task)
+            isinstance(self.step, _Task)
            and hasattr(self.step, "llm")
            and isinstance(self.step.llm, CudaDevicePlacementMixin)
        ):
-            self.step.llm._llm_identifier = self.step.name
-            self.step.llm._desired_num_gpus = self.step.resources.gpus
+            if self.ray_pipeline:
+                self.step.llm.disable_cuda_device_placement = True
+            else:
+                desired_num_gpus = self.step.resources.gpus or 1
+                self.step.llm._llm_identifier = self.step.name
+                self.step.llm._desired_num_gpus = desired_num_gpus
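The branch deserves a note: under Ray, each actor is presumably already pinned to GPUs by Ray's own scheduler (which sets `CUDA_VISIBLE_DEVICES` based on the actor's requested `num_gpus`), so the mixin's bookkeeping is switched off to keep the two mechanisms from fighting over device assignment. Locally, the mixin stays in charge and defaults to one GPU when the step requests none.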

def run(self) -> str:
"""The target function executed by the process. This function will also handle
@@ -156,7 +168,7 @@ def _generator_step_process_loop(self) -> None:

for data, last_batch in step.process_applying_mappings(offset=offset):
batch.set_data([data])
-            batch.last_batch = self._dry_run or last_batch
+            batch.last_batch = self.dry_run or last_batch
self._send_batch(batch)

if batch.last_batch:
4 changes: 4 additions & 0 deletions tests/unit/pipeline/test_local.py
@@ -55,6 +55,7 @@ def test_run_steps(self, step_wrapper_mock: mock.MagicMock) -> None:
output_queue=pipeline._output_queue,
load_queue=pipeline._load_queue,
dry_run=False,
+                ray_pipeline=False,
),
mock.call(
step=dummy_step_1,
@@ -63,6 +64,7 @@ def test_run_steps(self, step_wrapper_mock: mock.MagicMock) -> None:
output_queue=pipeline._output_queue,
load_queue=pipeline._load_queue,
dry_run=False,
+                ray_pipeline=False,
),
mock.call(
step=dummy_step_1,
@@ -71,6 +73,7 @@ def test_run_steps(self, step_wrapper_mock: mock.MagicMock) -> None:
output_queue=pipeline._output_queue,
load_queue=pipeline._load_queue,
dry_run=False,
+                ray_pipeline=False,
),
mock.call(
step=dummy_step_2,
@@ -79,6 +82,7 @@ def test_run_steps(self, step_wrapper_mock: mock.MagicMock) -> None:
output_queue=pipeline._output_queue,
load_queue=pipeline._load_queue,
dry_run=False,
+                ray_pipeline=False,
),
],
)
2 changes: 2 additions & 0 deletions tests/unit/steps/tasks/structured_outputs/test_outlines.py
@@ -59,6 +59,7 @@ class DummyUserTest(BaseModel):
"device_map": None,
"token": None,
"use_magpie_template": False,
"disable_cuda_device_placement": False,
"type_info": {
"module": "distilabel.llms.huggingface.transformers",
"name": "TransformersLLM",
@@ -85,6 +86,7 @@ class DummyUserTest(BaseModel):
"device_map": None,
"token": None,
"use_magpie_template": False,
"disable_cuda_device_placement": False,
"type_info": {
"module": "distilabel.llms.huggingface.transformers",
"name": "TransformersLLM",