From c776d36c77d1b00eef3d908a4a8fe1ba478f3efb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Gabriel=20Mart=C3=ADn=20Bl=C3=A1zquez?=
Date: Thu, 1 Aug 2024 14:38:17 +0200
Subject: [PATCH] Gather ray nodes gpus info (#848)

---
 src/distilabel/pipeline/ray.py | 30 +++++++++++++++++++++++-------
 1 file changed, 23 insertions(+), 7 deletions(-)

diff --git a/src/distilabel/pipeline/ray.py b/src/distilabel/pipeline/ray.py
index 0b5094809c..f05110294c 100644
--- a/src/distilabel/pipeline/ray.py
+++ b/src/distilabel/pipeline/ray.py
@@ -175,7 +175,13 @@ def _init_ray(self) -> None:
         else:
             ray.init(**self._ray_init_kwargs)
 
-        self._ray_node_ids = {node["NodeID"]: False for node in ray.nodes()}
+        # Get the number of GPUs per Ray node
+        for node in ray.nodes():
+            node_id = node["NodeID"]
+            gpus = node["Resources"].get("GPU", 0)
+            self._ray_node_ids[node_id] = gpus
+
+        self._logger.info(f"Ray nodes GPUs: {self._ray_node_ids}")
 
     @property
     def QueueClass(self) -> Callable:
@@ -290,11 +296,19 @@ def _create_vllm_placement_group(
             "pipeline_parallel_size", 1
         )
 
-        node_id = next(
-            node_id for node_id, used in self._ray_node_ids.items() if not used
-        )
-
-        self._ray_node_ids[node_id] = True
+        # TODO: this is suboptimal as placement groups can get distributed in a suboptimal
+        # way (some GPUs are not used and some LLMs cannot be loaded because they need 2
+        # GPUs but there is 1 GPU in one node and 1 GPU in another node)
+        selected_node_id = None
+        for node_id, gpus in self._ray_node_ids.items():
+            if gpus >= tensor_parallel_size:
+                self._logger.info(
+                    f"Ray node with ID '{node_id}' has enough GPUs (needed: {tensor_parallel_size},"
+                    f" available: {gpus}) for allocating `vLLM` used by '{step.name}' step."
+                )
+                self._ray_node_ids[node_id] -= tensor_parallel_size
+                selected_node_id = node_id
+                break
 
         # Create a placement group
         pg = ray.util.placement_group(
@@ -303,7 +317,9 @@ def _create_vllm_placement_group(
             # https://docs.ray.io/en/latest/ray-core/scheduling/placement-group.html#schedule-tasks-and-actors-to-placement-groups-use-reserved-resources
             bundles=[{"CPU": 1}] + [{"GPU": 1}] * tensor_parallel_size,
            strategy="SPREAD" if pipeline_parallel_size > 1 else "STRICT_PACK",
-            _soft_target_node_id=node_id if pipeline_parallel_size is None else None,
+            _soft_target_node_id=selected_node_id
+            if pipeline_parallel_size is None
+            else None,
        )
 
         self._logger.info(
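
Note for reviewers: the hunks above replace the old boolean "node used" bookkeeping with a per-node GPU budget. Below is a minimal, self-contained sketch of that scheduling logic for experimenting outside distilabel. The helper names (gather_node_gpus, pick_node_for_vllm) are hypothetical, not part of distilabel's API; _soft_target_node_id is the same private Ray parameter the patch itself passes, so it may change across Ray versions.

    # Sketch of the patch's scheduling idea, assuming a running Ray cluster.
    from typing import Dict, Optional

    import ray


    def gather_node_gpus() -> Dict[str, float]:
        # Map every Ray node ID to its GPU count (0 for CPU-only nodes),
        # mirroring the loop the patch adds to `_init_ray`.
        return {
            node["NodeID"]: node["Resources"].get("GPU", 0) for node in ray.nodes()
        }


    def pick_node_for_vllm(
        node_gpus: Dict[str, float], tensor_parallel_size: int
    ) -> Optional[str]:
        # First-fit selection, as in `_create_vllm_placement_group`: take the
        # first node with enough free GPUs and subtract them from its budget.
        # Returns None when no single node can host the replica, in which case
        # the placement group below simply gets no soft target node.
        for node_id, gpus in node_gpus.items():
            if gpus >= tensor_parallel_size:
                node_gpus[node_id] -= tensor_parallel_size
                return node_id
        return None


    if __name__ == "__main__":
        ray.init()
        node_gpus = gather_node_gpus()
        tensor_parallel_size = 2  # e.g. an LLM sharded across 2 GPUs

        target = pick_node_for_vllm(node_gpus, tensor_parallel_size)

        # One CPU bundle for the actor plus one GPU bundle per tensor-parallel
        # rank, packed onto a single node, as the patch builds it.
        pg = ray.util.placement_group(
            bundles=[{"CPU": 1}] + [{"GPU": 1}] * tensor_parallel_size,
            strategy="STRICT_PACK",
            _soft_target_node_id=target,
        )
        ray.get(pg.ready())

As the in-code TODO acknowledges, first-fit can fragment the cluster: with one free GPU on each of two nodes, a replica needing tensor_parallel_size=2 cannot be placed even though two GPUs are free overall. A packing pass over all steps before allocating would avoid this, at the cost of more bookkeeping.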