Skip to content

Commit

Permalink
Gather ray nodes gpus info (#848)
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielmbmb authored Aug 1, 2024
1 parent 3d2fca0 commit c776d36
Showing 1 changed file with 23 additions and 7 deletions.
30 changes: 23 additions & 7 deletions src/distilabel/pipeline/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,13 @@ def _init_ray(self) -> None:
else:
ray.init(**self._ray_init_kwargs)

self._ray_node_ids = {node["NodeID"]: False for node in ray.nodes()}
# Get the number of GPUs per Ray node
for node in ray.nodes():
node_id = node["NodeID"]
gpus = node["Resources"].get("GPU", 0)
self._ray_node_ids[node_id] = gpus

self._logger.info(f"Ray nodes GPUs: {self._ray_node_ids}")

@property
def QueueClass(self) -> Callable:
Expand Down Expand Up @@ -290,11 +296,19 @@ def _create_vllm_placement_group(
"pipeline_parallel_size", 1
)

node_id = next(
node_id for node_id, used in self._ray_node_ids.items() if not used
)

self._ray_node_ids[node_id] = True
# TODO: this is suboptimal as placement groups can get distributed in a suboptimal
# way (some GPUs are not used and some LLMs cannot be loaded because they need 2
# GPUs but there is 1 GPU in one node and 1 GPU in another node)
selected_node_id = None
for node_id, gpus in self._ray_node_ids.items():
if gpus >= tensor_parallel_size:
self._logger.info(
f"Ray node with ID '{node_id}' has enough GPUs (needed: {tensor_parallel_size},"
f" available: {gpus}) for allocating `vLLM` used by '{step.name}' step."
)
self._ray_node_ids[node_id] -= tensor_parallel_size
selected_node_id = node_id
break

# Create a placement group
pg = ray.util.placement_group(
Expand All @@ -303,7 +317,9 @@ def _create_vllm_placement_group(
# https://docs.ray.io/en/latest/ray-core/scheduling/placement-group.html#schedule-tasks-and-actors-to-placement-groups-use-reserved-resources
bundles=[{"CPU": 1}] + [{"GPU": 1}] * tensor_parallel_size,
strategy="SPREAD" if pipeline_parallel_size > 1 else "STRICT_PACK",
_soft_target_node_id=node_id if pipeline_parallel_size is None else None,
_soft_target_node_id=selected_node_id
if pipeline_parallel_size is None
else None,
)

self._logger.info(
Expand Down

0 comments on commit c776d36

Please sign in to comment.