From ade46b5dfe27346eaf84acb59539913e2930b623 Mon Sep 17 00:00:00 2001
From: LeaveMyYard <33721692+LeaveMyYard@users.noreply.github.com>
Date: Wed, 2 Aug 2023 15:14:16 +0300
Subject: [PATCH] Implement prometheus request batching

---
 .../integrations/prometheus/metrics/base.py   | 76 +++++++++++++++++--
 .../integrations/prometheus/metrics/cpu.py    |  9 ++-
 .../integrations/prometheus/metrics/memory.py | 10 ++-
 .../prometheus_metrics_service.py             | 24 ++++--
 robusta_krr/strategies/simple.py              |  6 +-
 5 files changed, 100 insertions(+), 25 deletions(-)

diff --git a/robusta_krr/core/integrations/prometheus/metrics/base.py b/robusta_krr/core/integrations/prometheus/metrics/base.py
index d076ff47..b6be241b 100644
--- a/robusta_krr/core/integrations/prometheus/metrics/base.py
+++ b/robusta_krr/core/integrations/prometheus/metrics/base.py
@@ -4,8 +4,10 @@
 import asyncio
 import datetime
 import enum
+import copy
 from concurrent.futures import ThreadPoolExecutor
 from typing import Any, TYPE_CHECKING, Optional
+from functools import reduce
 
 import numpy as np
 import pydantic as pd
@@ -156,7 +158,6 @@ async def load_data(
         )
 
         if result == []:
-            self.warning(f"{self.service_name} returned no {self.__class__.__name__} metrics for {object}")
             return {}
 
         return {pod_result["metric"]["pod"]: np.array(pod_result["values"], dtype=np.float64) for pod_result in result}
@@ -177,7 +178,7 @@ class QueryMetric(PrometheusMetric):
 PrometheusSeries = Any
 
 
-class FilterMetric(PrometheusMetric):
+class FilterJobsMixin(PrometheusMetric):
     """
     This is the version of the BasicMetricLoader, that filters out data,
     if multiple metrics with the same name were found.
@@ -206,16 +207,16 @@ def filter_prom_jobs_results(
             return series_list_result
 
         target_names = {
-            FilterMetric.get_target_name(series)
+            FilterJobsMixin.get_target_name(series)
             for series in series_list_result
-            if FilterMetric.get_target_name(series)
+            if FilterJobsMixin.get_target_name(series)
         }
         return_list: list[PrometheusSeries] = []
 
         # takes kubelet job if exists, return first job alphabetically if it doesn't
         for target_name in target_names:
             relevant_series = [
-                series for series in series_list_result if FilterMetric.get_target_name(series) == target_name
+                series for series in series_list_result if FilterJobsMixin.get_target_name(series) == target_name
             ]
             relevant_kubelet_metric = [series for series in relevant_series if series["metric"].get("job") == "kubelet"]
             if len(relevant_kubelet_metric) == 1:
@@ -228,3 +229,68 @@
     async def query_prometheus(self, data: PrometheusMetricData) -> list[PrometheusSeries]:
         result = await super().query_prometheus(data)
         return self.filter_prom_jobs_results(result)
+
+
+class BatchedRequestMixin(PrometheusMetric):
+    """
+    This type of PrometheusMetric is used to split the query into multiple queries,
+    each querying a subset of the pods of the object.
+
+    The results of the queries are then combined into a single result.
+
+    This is useful when the number of pods is too large for a single query.
+    """
+
+    pods_batch_size = 50
+
+    def combine_batches(self, results: list[PodsTimeData]) -> PodsTimeData:
+        """
+        Combines the results of multiple queries into a single result.
+
+        Args:
+            results (list[PodsTimeData]): A list of query results.
+
+        Returns:
+            PodsTimeData: A combined result.
+        """
+
+        return reduce(lambda x, y: x | y, results, {})
+
+    @staticmethod
+    def _slice_object(object: K8sObjectData, s: slice) -> K8sObjectData:
+        obj_copy = copy.deepcopy(object)
+        obj_copy.pods = object.pods[s]
+        return obj_copy
+
+    @staticmethod
+    def _split_objects(object: K8sObjectData, max_pods: int) -> list[K8sObjectData]:
+        """
+        Splits the object into multiple objects, each containing at most max_pods pods.
+
+        Args:
+            object (K8sObjectData): The object to split.
+
+        Returns:
+            list[K8sObjectData]: A list of objects.
+        """
+        return [
+            BatchedRequestMixin._slice_object(object, slice(i, i + max_pods))
+            for i in range(0, len(object.pods), max_pods)
+        ]
+
+    async def load_data(
+        self, object: K8sObjectData, period: datetime.timedelta, step: datetime.timedelta
+    ) -> PodsTimeData:
+        splitted_objects = self._split_objects(object, self.pods_batch_size)
+
+        # If we do not exceed the batch size, we can use the regular load_data method.
+        if len(splitted_objects) <= 1:
+            return await super().load_data(object, period, step)
+
+        results = await asyncio.gather(
+            *[
+                super(BatchedRequestMixin, self).load_data(splitted_object, period, step)
+                for splitted_object in splitted_objects
+            ]
+        )
+        return self.combine_batches(results)
diff --git a/robusta_krr/core/integrations/prometheus/metrics/cpu.py b/robusta_krr/core/integrations/prometheus/metrics/cpu.py
index 3aab7b48..31cb38fa 100644
--- a/robusta_krr/core/integrations/prometheus/metrics/cpu.py
+++ b/robusta_krr/core/integrations/prometheus/metrics/cpu.py
@@ -1,9 +1,10 @@
 from robusta_krr.core.models.objects import K8sObjectData
+from robusta_krr.core.abstract.strategies import PodsTimeData
 
-from .base import QueryMetric, QueryRangeMetric, FilterMetric
+from .base import QueryMetric, QueryRangeMetric, FilterJobsMixin, BatchedRequestMixin
 
 
-class CPULoader(QueryRangeMetric, FilterMetric):
+class CPULoader(QueryRangeMetric, FilterJobsMixin, BatchedRequestMixin):
     def get_query(self, object: K8sObjectData, resolution: str) -> str:
         pods_selector = "|".join(pod.name for pod in object.pods)
         cluster_label = self.get_prometheus_cluster_label()
@@ -21,7 +22,7 @@ def get_query(self, object: K8sObjectData, resolution: str) -> str:
         """
 
 
-class MaxCPULoader(QueryMetric, FilterMetric):
+class MaxCPULoader(QueryMetric, FilterJobsMixin, BatchedRequestMixin):
     def get_query(self, object: K8sObjectData, resolution: str) -> str:
         pods_selector = "|".join(pod.name for pod in object.pods)
         cluster_label = self.get_prometheus_cluster_label()
@@ -40,7 +41,7 @@ def get_query(self, object: K8sObjectData, resolution: str) -> str:
 
 
 def PercentileCPULoader(percentile: float) -> type[QueryMetric]:
-    class PercentileCPULoader(QueryMetric, FilterMetric):
+    class PercentileCPULoader(QueryMetric, FilterJobsMixin, BatchedRequestMixin):
         def get_query(self, object: K8sObjectData, resolution: str) -> str:
             pods_selector = "|".join(pod.name for pod in object.pods)
             cluster_label = self.get_prometheus_cluster_label()
diff --git a/robusta_krr/core/integrations/prometheus/metrics/memory.py b/robusta_krr/core/integrations/prometheus/metrics/memory.py
index 5fc6f732..c5685a93 100644
--- a/robusta_krr/core/integrations/prometheus/metrics/memory.py
+++ b/robusta_krr/core/integrations/prometheus/metrics/memory.py
@@ -1,9 +1,11 @@
+from robusta_krr.core.abstract.strategies import PodsTimeData
+
 from robusta_krr.core.models.objects import K8sObjectData
 
-from .base import QueryMetric, QueryRangeMetric, FilterMetric
+from .base import QueryMetric, QueryRangeMetric, FilterJobsMixin, BatchedRequestMixin
 
 
-class MemoryLoader(QueryRangeMetric, FilterMetric):
+class MemoryLoader(QueryRangeMetric, FilterJobsMixin, BatchedRequestMixin):
     def get_query(self, object: K8sObjectData, resolution: str) -> str:
         pods_selector = "|".join(pod.name for pod in object.pods)
         cluster_label = self.get_prometheus_cluster_label()
@@ -19,7 +21,7 @@ def get_query(self, object: K8sObjectData, resolution: str) -> str:
         """
 
 
-class MaxMemoryLoader(QueryMetric, FilterMetric):
+class MaxMemoryLoader(QueryMetric, FilterJobsMixin, BatchedRequestMixin):
     def get_query(self, object: K8sObjectData, resolution: str) -> str:
         pods_selector = "|".join(pod.name for pod in object.pods)
         cluster_label = self.get_prometheus_cluster_label()
@@ -36,7 +38,7 @@ def get_query(self, object: K8sObjectData, resolution: str) -> str:
 
 
 def PercentileMemoryLoader(percentile: float) -> type[QueryMetric]:
-    class PercentileMemoryLoader(QueryMetric, FilterMetric):
+    class PercentileMemoryLoader(QueryMetric, FilterJobsMixin, BatchedRequestMixin):
         def get_query(self, object: K8sObjectData, resolution: str) -> str:
             pods_selector = "|".join(pod.name for pod in object.pods)
             cluster_label = self.get_prometheus_cluster_label()
diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
index b2cc1357..08f141b1 100644
--- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
+++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py
@@ -157,7 +157,14 @@ async def gather_data(
         self.debug(f"Gathering {LoaderClass.__name__} metric for {object}")
 
         metric_loader = LoaderClass(self.config, self.prometheus, self.name, self.executor)
-        return await metric_loader.load_data(object, period, step)
+        data = await metric_loader.load_data(object, period, step)
+
+        if len(data) == 0:
+            self.warning(
+                f"{metric_loader.service_name} returned no {metric_loader.__class__.__name__} metrics for {object}"
+            )
+
+        return data
 
     async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> None:
         """
@@ -176,13 +183,14 @@ async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> None:
         cluster_label = self.get_prometheus_cluster_label()
         if object.kind == "Deployment":
             replicasets = await self.query(
-                "kube_replicaset_owner{"
-                f'owner_name="{object.name}", '
-                f'owner_kind="Deployment", '
-                f'namespace="{object.namespace}"'
-                f"{cluster_label}"
-                "}"
-                f"[{period_literal}]"
+                f"""
+                    kube_replicaset_owner{{
+                        owner_name="{object.name}",
+                        owner_kind="Deployment",
+                        namespace="{object.namespace}"
+                        {cluster_label}
+                    }}[{period_literal}]
+                """
             )
             pod_owners = [replicaset["metric"]["replicaset"] for replicaset in replicasets]
             pod_owner_kind = "ReplicaSet"
diff --git a/robusta_krr/strategies/simple.py b/robusta_krr/strategies/simple.py
index 5db22921..fa59fac2 100644
--- a/robusta_krr/strategies/simple.py
+++ b/robusta_krr/strategies/simple.py
@@ -1,4 +1,3 @@
-from typing import Sequence
 import numpy as np
 import pydantic as pd
 
@@ -12,7 +11,6 @@
     RunResult,
     StrategySettings,
 )
-from robusta_krr.core.abstract.metrics import BaseMetric
 from robusta_krr.core.integrations.prometheus.metrics import PercentileCPULoader, MaxMemoryLoader, PrometheusMetric
 
 
@@ -27,7 +25,7 @@ def calculate_memory_proposal(self, data: PodsTimeData) -> float:
         if len(data_) == 0:
             return float("NaN")
 
-        return max(data_) * (1 + self.memory_buffer_percentage / 100)
+        return np.max(data_) * (1 + self.memory_buffer_percentage / 100)
 
     def calculate_cpu_proposal(self, data: PodsTimeData) -> float:
         if len(data) == 0:
@@ -38,7 +36,7 @@ def calculate_cpu_proposal(self, data: PodsTimeData) -> float:
         else:
             data_ = list(data.values())[0][:, 1]
 
-        return np.percentile(data_, self.cpu_percentile)
+        return np.max(data_)
 
 
 class SimpleStrategy(BaseStrategy[SimpleStrategySettings]):
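
Note for reviewers: the standalone sketch below restates the batching flow introduced by BatchedRequestMixin (split the pod list into chunks of pods_batch_size, run one query per chunk concurrently, merge the per-pod results with a dict union) outside the PrometheusMetric hierarchy, so it can be read and run in isolation. It is an illustration under assumptions, not robusta-krr code: fake_query, split_batches, fetch_batched, PodsSeries and POD_BATCH_SIZE are invented stand-ins for the real loader machinery; only the overall pattern matches the patch.

# Illustrative sketch (not robusta-krr code): the same batch-split-and-merge
# flow that BatchedRequestMixin implements, reduced to plain asyncio.
import asyncio
from functools import reduce

# Simplified stand-in for PodsTimeData: pod name -> list of sample values.
PodsSeries = dict[str, list[float]]

POD_BATCH_SIZE = 50  # mirrors BatchedRequestMixin.pods_batch_size


async def fake_query(pods: list[str]) -> PodsSeries:
    """Stand-in for one Prometheus range query over a batch of pods."""
    await asyncio.sleep(0)  # placeholder for the real network round trip
    return {pod: [1.0, 2.0, 3.0] for pod in pods}


def split_batches(pods: list[str], max_pods: int) -> list[list[str]]:
    """Chunk the pod list, like _split_objects does for K8sObjectData.pods."""
    return [pods[i : i + max_pods] for i in range(0, len(pods), max_pods)]


async def fetch_batched(pods: list[str]) -> PodsSeries:
    batches = split_batches(pods, POD_BATCH_SIZE)
    if len(batches) <= 1:
        # Small workloads keep the original single-query path.
        return await fake_query(pods)
    # One query per batch, run concurrently, then merged with a dict union,
    # the same way combine_batches uses reduce(lambda x, y: x | y, ...).
    results = await asyncio.gather(*(fake_query(batch) for batch in batches))
    return reduce(lambda x, y: x | y, results, {})


if __name__ == "__main__":
    pods = [f"pod-{i}" for i in range(120)]  # 120 pods -> 3 batches
    merged = asyncio.run(fetch_batched(pods))
    print(len(merged))  # 120

Capping each batch at 50 pods bounds the size of the pod=~"..." selector sent to Prometheus in any single request, which is the failure mode the class docstring calls out when "the number of pods is too large for a single query", while asyncio.gather keeps the batched path as concurrent as the original single request.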