Skip to content

Commit

Permalink
Merge pull request #120 from robusta-dev/metric-query-batching
Browse files Browse the repository at this point in the history
Implement prometheus request batching
  • Loading branch information
LeaveMyYard authored Aug 10, 2023
2 parents 842d97e + 123e8e1 commit 80b958e
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 26 deletions.
78 changes: 72 additions & 6 deletions robusta_krr/core/integrations/prometheus/metrics/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import asyncio
import datetime
import enum
import copy
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING, Any, Optional
from typing import Any, TYPE_CHECKING, Optional
from functools import reduce

import numpy as np
import pydantic as pd
Expand Down Expand Up @@ -156,7 +158,6 @@ async def load_data(
)

if result == []:
self.warning(f"{self.service_name} returned no {self.__class__.__name__} metrics for {object}")
return {}

return {pod_result["metric"]["pod"]: np.array(pod_result["values"], dtype=np.float64) for pod_result in result}
Expand All @@ -177,7 +178,7 @@ class QueryMetric(PrometheusMetric):
PrometheusSeries = Any


class FilterMetric(PrometheusMetric):
class FilterJobsMixin(PrometheusMetric):
"""
This is the version of the BasicMetricLoader, that filters out data,
if multiple metrics with the same name were found.
Expand Down Expand Up @@ -206,16 +207,16 @@ def filter_prom_jobs_results(
return series_list_result

target_names = {
FilterMetric.get_target_name(series)
FilterJobsMixin.get_target_name(series)
for series in series_list_result
if FilterMetric.get_target_name(series)
if FilterJobsMixin.get_target_name(series)
}
return_list: list[PrometheusSeries] = []

# takes kubelet job if exists, return first job alphabetically if it doesn't
for target_name in target_names:
relevant_series = [
series for series in series_list_result if FilterMetric.get_target_name(series) == target_name
series for series in series_list_result if FilterJobsMixin.get_target_name(series) == target_name
]
relevant_kubelet_metric = [series for series in relevant_series if series["metric"].get("job") == "kubelet"]
if len(relevant_kubelet_metric) == 1:
Expand All @@ -228,3 +229,68 @@ def filter_prom_jobs_results(
async def query_prometheus(self, data: PrometheusMetricData) -> list[PrometheusSeries]:
result = await super().query_prometheus(data)
return self.filter_prom_jobs_results(result)


class BatchedRequestMixin(PrometheusMetric):
    """
    Mixin that transparently batches Prometheus queries over pods.

    When an object has more pods than ``pods_batch_size``, it is split into
    several smaller objects (each holding a subset of the pods), one query is
    issued per subset, and the partial results are merged into a single
    mapping. This keeps individual queries small when the pod count is large.
    """

    # Upper bound on the number of pods included in one Prometheus query.
    pods_batch_size = 50

    def combine_batches(self, results: list[PodsTimeData]) -> PodsTimeData:
        """
        Merge the per-batch query results into one mapping.

        Args:
            results (list[PodsTimeData]): One result dict per batch.

        Returns:
            PodsTimeData: All batches merged into a single dict.
        """
        merged: PodsTimeData = {}
        for batch_result in results:
            merged.update(batch_result)
        return merged

    @staticmethod
    def _slice_object(object: K8sObjectData, s: slice) -> K8sObjectData:
        # Deep-copy first so the caller's object keeps its full pod list.
        sliced = copy.deepcopy(object)
        sliced.pods = object.pods[s]
        return sliced

    @staticmethod
    def _split_objects(object: K8sObjectData, max_pods: int) -> list[K8sObjectData]:
        """
        Split the object into chunks of at most ``max_pods`` pods each.

        Args:
            object (K8sObjectData): The object to split.
            max_pods (int): Upper bound on pods per chunk.

        Returns:
            list[K8sObjectData]: The per-chunk objects, in pod order.
        """
        chunks: list[K8sObjectData] = []
        for start in range(0, len(object.pods), max_pods):
            chunks.append(
                BatchedRequestMixin._slice_object(object, slice(start, start + max_pods))
            )
        return chunks

    async def load_data(
        self, object: K8sObjectData, period: datetime.timedelta, step: datetime.timedelta
    ) -> PodsTimeData:
        batches = self._split_objects(object, self.pods_batch_size)

        # Small enough for a single query: defer to the unbatched implementation.
        if len(batches) <= 1:
            return await super().load_data(object, period, step)

        # Two-argument super() on purpose: the zero-argument form is not
        # available inside comprehensions.
        pending = [
            super(BatchedRequestMixin, self).load_data(batch, period, step)
            for batch in batches
        ]
        partial_results = await asyncio.gather(*pending)
        return self.combine_batches(partial_results)
9 changes: 5 additions & 4 deletions robusta_krr/core/integrations/prometheus/metrics/cpu.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from robusta_krr.core.models.objects import K8sObjectData
from robusta_krr.core.abstract.strategies import PodsTimeData

from .base import FilterMetric, QueryMetric, QueryRangeMetric
from .base import QueryMetric, QueryRangeMetric, FilterJobsMixin, BatchedRequestMixin


class CPULoader(QueryRangeMetric, FilterMetric):
class CPULoader(QueryRangeMetric, FilterJobsMixin, BatchedRequestMixin):
def get_query(self, object: K8sObjectData, resolution: str) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
Expand All @@ -21,7 +22,7 @@ def get_query(self, object: K8sObjectData, resolution: str) -> str:
"""


class MaxCPULoader(QueryMetric, FilterMetric):
class MaxCPULoader(QueryMetric, FilterJobsMixin, BatchedRequestMixin):
def get_query(self, object: K8sObjectData, resolution: str) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
Expand All @@ -40,7 +41,7 @@ def get_query(self, object: K8sObjectData, resolution: str) -> str:


def PercentileCPULoader(percentile: float) -> type[QueryMetric]:
class PercentileCPULoader(QueryMetric, FilterMetric):
class PercentileCPULoader(QueryMetric, FilterJobsMixin, BatchedRequestMixin):
def get_query(self, object: K8sObjectData, resolution: str) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
Expand Down
10 changes: 6 additions & 4 deletions robusta_krr/core/integrations/prometheus/metrics/memory.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from robusta_krr.core.abstract.strategies import PodsTimeData

from robusta_krr.core.models.objects import K8sObjectData

from .base import FilterMetric, QueryMetric, QueryRangeMetric
from .base import QueryMetric, QueryRangeMetric, FilterJobsMixin, BatchedRequestMixin


class MemoryLoader(QueryRangeMetric, FilterMetric):
class MemoryLoader(QueryRangeMetric, FilterJobsMixin, BatchedRequestMixin):
def get_query(self, object: K8sObjectData, resolution: str) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
Expand All @@ -19,7 +21,7 @@ def get_query(self, object: K8sObjectData, resolution: str) -> str:
"""


class MaxMemoryLoader(QueryMetric, FilterMetric):
class MaxMemoryLoader(QueryMetric, FilterJobsMixin, BatchedRequestMixin):
def get_query(self, object: K8sObjectData, resolution: str) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
Expand All @@ -36,7 +38,7 @@ def get_query(self, object: K8sObjectData, resolution: str) -> str:


def PercentileMemoryLoader(percentile: float) -> type[QueryMetric]:
class PercentileMemoryLoader(QueryMetric, FilterMetric):
class PercentileMemoryLoader(QueryMetric, FilterJobsMixin, BatchedRequestMixin):
def get_query(self, object: K8sObjectData, resolution: str) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,14 @@ async def gather_data(
self.debug(f"Gathering {LoaderClass.__name__} metric for {object}")

metric_loader = LoaderClass(self.config, self.prometheus, self.name, self.executor)
return await metric_loader.load_data(object, period, step)
data = await metric_loader.load_data(object, period, step)

if len(data) == 0:
self.warning(
f"{metric_loader.service_name} returned no {metric_loader.__class__.__name__} metrics for {object}"
)

return data

async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> None:
"""
Expand All @@ -159,13 +166,14 @@ async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) ->
cluster_label = self.get_prometheus_cluster_label()
if object.kind == "Deployment":
replicasets = await self.query(
"kube_replicaset_owner{"
f'owner_name="{object.name}", '
f'owner_kind="Deployment", '
f'namespace="{object.namespace}"'
f"{cluster_label}"
"}"
f"[{period_literal}]"
f"""
kube_replicaset_owner{{
owner_name="{object.name}",
owner_kind="Deployment",
namespace="{object.namespace}"
{cluster_label}
}}[{period_literal}]
"""
)
pod_owners = [replicaset["metric"]["replicaset"] for replicaset in replicasets]
pod_owner_kind = "ReplicaSet"
Expand Down
6 changes: 2 additions & 4 deletions robusta_krr/strategies/simple.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from typing import Sequence
import numpy as np
import pydantic as pd

Expand All @@ -12,7 +11,6 @@
RunResult,
StrategySettings,
)
from robusta_krr.core.abstract.metrics import BaseMetric
from robusta_krr.core.integrations.prometheus.metrics import PercentileCPULoader, MaxMemoryLoader, PrometheusMetric


Expand All @@ -27,7 +25,7 @@ def calculate_memory_proposal(self, data: PodsTimeData) -> float:
if len(data_) == 0:
return float("NaN")

return max(data_) * (1 + self.memory_buffer_percentage / 100)
return np.max(data_) * (1 + self.memory_buffer_percentage / 100)

def calculate_cpu_proposal(self, data: PodsTimeData) -> float:
if len(data) == 0:
Expand All @@ -38,7 +36,7 @@ def calculate_cpu_proposal(self, data: PodsTimeData) -> float:
else:
data_ = list(data.values())[0][:, 1]

return np.percentile(data_, self.cpu_percentile)
return np.max(data_)


class SimpleStrategy(BaseStrategy[SimpleStrategySettings]):
Expand Down

0 comments on commit 80b958e

Please sign in to comment.