Merge branch 'main' into hpa-v1-fallback
LeaveMyYard committed Jul 11, 2023
2 parents cd74b96 + d120a07 commit c137f76
Showing 20 changed files with 297 additions and 50 deletions.
12 changes: 12 additions & 0 deletions docker/README.md
@@ -0,0 +1,12 @@
# Dockerfiles for specific clouds

This directory will include Dockerfiles for various cloud providers.

## AWS

To use the `krr` container on AWS, the image needs `awscli` installed.
The `aws.Dockerfile` is a modified `krr` Dockerfile which adds:
- installation of curl and unzip
- installation of awscli


28 changes: 28 additions & 0 deletions docker/aws.Dockerfile
@@ -0,0 +1,28 @@
# Use the official Python 3.9 slim image as the base image
FROM python:3.9-slim as builder

# Set the working directory
WORKDIR /app

# Update package lists and register the arm64 package architecture
RUN apt-get update && \
dpkg --add-architecture arm64

COPY ./requirements.txt requirements.txt

# Install the project dependencies
RUN pip install -r requirements.txt

# Install curl and unzip for awscli
RUN apt-get -y update; apt-get -y install curl; apt-get -y install unzip

# Download awscli and unzip it
RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && \
unzip awscliv2.zip && \
./aws/install

# Copy the rest of the application code
COPY . .

# Run the application using 'python krr.py simple'
ENTRYPOINT ["python", "krr.py", "simple"]
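
As a usage note (not part of this commit), an image built from this Dockerfile would typically be produced from the repository root, for example with `docker build -f docker/aws.Dockerfile -t krr-aws .`, and run with AWS credentials provided via environment variables or a mounted config so the bundled `awscli` can authenticate.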
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -17,7 +17,7 @@ multi_line_output = 3
include_trailing_comma = true

[tool.mypy]
plugins = "numpy.typing.mypy_plugin"
plugins = "numpy.typing.mypy_plugin,pydantic.mypy"

[tool.poetry.scripts]
krr = "robusta_krr.main:run"
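The pyproject.toml change enables the pydantic mypy plugin alongside the numpy one. Roughly speaking (an illustrative sketch, not code from this repository), the plugin teaches mypy the `__init__` signature pydantic generates for models, so constructor calls are type-checked:

```python
from pydantic import BaseModel


class StrategySettings(BaseModel):
    # Field names are illustrative, not krr's actual settings model.
    history_duration: float = 336.0  # hours of history to query
    cpu_percentile: float = 99.0


# With plugins = "numpy.typing.mypy_plugin,pydantic.mypy", mypy knows the generated
# __init__ signature, so passing e.g. cpu_percentile="99" would be reported as an error.
settings = StrategySettings(history_duration=168.0)
print(settings.cpu_percentile)
```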
34 changes: 29 additions & 5 deletions robusta_krr/core/integrations/kubernetes.py
@@ -1,33 +1,35 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
import itertools
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, Union

from kubernetes import client, config # type: ignore
from kubernetes.client import ApiException # type: ignore
from kubernetes.client import ApiException
from kubernetes.client.models import (
V1Container,
V1DaemonSet,
V1DaemonSetList,
V1Deployment,
V1DeploymentList,
V1Job,
V1JobList,
V1LabelSelector,
V1PodList,
V1Pod,
V1Job,
V1PodList,
V1StatefulSet,
V1StatefulSetList,
V1HorizontalPodAutoscalerList,
V2HorizontalPodAutoscaler,
V2HorizontalPodAutoscalerList,
)

from robusta_krr.core.models.objects import K8sObjectData, PodData, HPAData
from robusta_krr.core.models.objects import HPAData, K8sObjectData, PodData
from robusta_krr.core.models.result import ResourceAllocations
from robusta_krr.utils.configurable import Configurable


from .rollout import RolloutAppsV1Api

AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod, V1Job]
HPAKey = tuple[str, str, str]

@@ -45,6 +47,7 @@ def __init__(self, cluster: Optional[str], *args, **kwargs):
else None
)
self.apps = client.AppsV1Api(api_client=self.api_client)
self.rollout = RolloutAppsV1Api(api_client=self.api_client)
self.batch = client.BatchV1Api(api_client=self.api_client)
self.core = client.CoreV1Api(api_client=self.api_client)
self.autoscaling_v1 = client.AutoscalingV1Api(api_client=self.api_client)
@@ -64,10 +67,12 @@ async def list_scannable_objects(self) -> list[K8sObjectData]:
self.__hpa_list = await self.__list_hpa()
objects_tuple = await asyncio.gather(
self._list_deployments(),
self._list_rollouts(),
self._list_all_statefulsets(),
self._list_all_daemon_set(),
self._list_all_jobs(),
)

except Exception as e:
self.error(f"Error trying to list pods in cluster {self.cluster}: {e}")
self.debug_exception()
@@ -150,6 +155,25 @@ async def _list_deployments(self) -> list[K8sObjectData]:
]
)

async def _list_rollouts(self) -> list[K8sObjectData]:
try:
ret: V1DeploymentList = await asyncio.to_thread(self.rollout.list_rollout_for_all_namespaces, watch=False)
except ApiException as e:
if e.status == 404:
self.debug(f"Rollout API not available in {self.cluster}")
return []
raise

self.debug(f"Found {len(ret.items)} rollouts in {self.cluster}")

return await asyncio.gather(
*[
self.__build_obj(item, container)
for item in ret.items
for container in item.spec.template.spec.containers
]
)

async def _list_all_statefulsets(self) -> list[K8sObjectData]:
self.debug(f"Listing statefulsets in {self.cluster}")
loop = asyncio.get_running_loop()
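This kubernetes.py diff imports both V1HorizontalPodAutoscalerList and V2HorizontalPodAutoscalerList and adds an autoscaling_v1 client, matching the hpa-v1-fallback branch name, but the HPA listing itself is not visible in the shown hunks. A minimal sketch of what an autoscaling/v2 to autoscaling/v1 fallback can look like with the standard kubernetes Python client (illustrative only, not the commit's actual implementation):

```python
import asyncio

from kubernetes import client
from kubernetes.client import ApiException


async def list_hpas(api_client=None) -> list:
    """Try the autoscaling/v2 API first and fall back to v1 on clusters that lack it."""
    v2 = client.AutoscalingV2Api(api_client=api_client)
    v1 = client.AutoscalingV1Api(api_client=api_client)
    try:
        # Preferred: v2 exposes per-metric targets (CPU and memory utilization).
        ret = await asyncio.to_thread(v2.list_horizontal_pod_autoscaler_for_all_namespaces, watch=False)
    except ApiException as e:
        if e.status != 404:
            raise
        # Older clusters only serve autoscaling/v1, which carries CPU targets only.
        ret = await asyncio.to_thread(v1.list_horizontal_pod_autoscaler_for_all_namespaces, watch=False)
    return ret.items
```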
14 changes: 8 additions & 6 deletions robusta_krr/core/integrations/prometheus/loader.py
@@ -1,8 +1,6 @@
import asyncio
import datetime
from typing import Optional, no_type_check

from concurrent.futures import ThreadPoolExecutor
from typing import Optional

from kubernetes import config as k8s_config
from kubernetes.client.api_client import ApiClient
@@ -49,10 +47,12 @@ def __init__(
if cluster is not None
else None
)
self.loader = self.get_metrics_service(config, api_client=self.api_client, cluster=cluster)
if not self.loader:
loader = self.get_metrics_service(config, api_client=self.api_client, cluster=cluster)
if loader is None:
raise PrometheusNotFound("No Prometheus or metrics service found")

self.loader = loader

self.info(f"{self.loader.name()} connected successfully for {cluster or 'default'} cluster")

def get_metrics_service(
@@ -68,9 +68,11 @@ def get_metrics_service(
self.echo(f"{service_name} found")
loader.validate_cluster_name()
return loader
except MetricsNotFound as e:
except MetricsNotFound:
self.debug(f"{service_name} not found")

return None

async def gather_data(
self,
object: K8sObjectData,
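The loader.py change above swaps `self.loader = ...; if not self.loader:` for a local variable that is narrowed with an explicit `is None` check before being stored, which lets the attribute be typed as non-Optional. A small self-contained sketch of the same pattern (names are illustrative, not the project's classes):

```python
from typing import Optional


class MetricsBackend:
    def name(self) -> str:
        return "prometheus"


def discover_backend() -> Optional[MetricsBackend]:
    # Stand-in for get_metrics_service(); returning None means nothing was found.
    return MetricsBackend()


class Loader:
    loader: MetricsBackend  # non-Optional: construction fails if discovery fails

    def __init__(self) -> None:
        loader = discover_backend()
        if loader is None:
            raise RuntimeError("No Prometheus or metrics service found")
        # After the check, type checkers narrow `loader` from Optional to MetricsBackend.
        self.loader = loader
```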
@@ -1,4 +1,5 @@
from typing import Any, Optional

from robusta_krr.core.abstract.strategies import Metric

from .base_metric import BaseMetricLoader, QueryType
16 changes: 11 additions & 5 deletions robusta_krr/core/integrations/prometheus/metrics/base_metric.py
@@ -2,11 +2,13 @@

import abc
import asyncio
from concurrent.futures import ThreadPoolExecutor
import datetime
from typing import TYPE_CHECKING, Callable, Optional, TypeVar
import enum
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING, Callable, Optional, TypeVar

import numpy as np

from robusta_krr.core.abstract.strategies import Metric, ResourceHistoryData
from robusta_krr.core.models.config import Config
from robusta_krr.core.models.objects import K8sObjectData
@@ -15,7 +17,8 @@
if TYPE_CHECKING:
from .. import CustomPrometheusConnect

MetricsDictionary = dict[str, type[BaseMetricLoader]]

MetricsDictionary = dict[str, type["BaseMetricLoader"]]


class QueryType(str, enum.Enum):
@@ -72,6 +75,9 @@ def get_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:

pass

def get_query_type(self) -> QueryType:
return QueryType.QueryRange

def get_graph_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
"""
This method should be implemented by all subclasses to provide a query string in the metadata to produce relevant graphs.
@@ -158,7 +164,7 @@ async def load_data(
step=self._step_to_string(step),
)
result = await self.query_prometheus(metric=metric, query_type=query_type)
# adding the query in the results for a graph
# adding the query in the results for a graph
metric.query = self.get_graph_query(object, resolution)

if result == []:
@@ -173,7 +179,7 @@
)

@staticmethod
def get_by_resource(resource: str, strategy: Optional[str]) -> type[BaseMetricLoader]:
def get_by_resource(resource: str, strategy: str) -> type[BaseMetricLoader]:
"""
Fetches the metric loader corresponding to the specified resource.
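In base_metric.py, the `MetricsDictionary` alias now uses a forward reference (`type["BaseMetricLoader"]`) so it can be declared before the class body, and `get_by_resource` now takes a required strategy name. For readers unfamiliar with the registry that `bind_metric`, `override_metric`, and `get_by_resource` maintain, here is a stripped-down sketch of such a decorator registry (illustrative only, not the project's exact code):

```python
from typing import Callable, TypeVar

T = TypeVar("T", bound=type)

REGISTRY: dict[str, type] = {}


def bind_metric(resource: str) -> Callable[[T], T]:
    """Register a loader class under a resource name, e.g. 'CPU' or 'Memory'."""
    def decorator(cls: T) -> T:
        REGISTRY[resource] = cls
        return cls
    return decorator


def override_metric(strategy: str, resource: str) -> Callable[[T], T]:
    """Register a strategy-specific loader, e.g. the 'simple' strategy's memory loader."""
    def decorator(cls: T) -> T:
        REGISTRY[f"{strategy}:{resource}"] = cls
        return cls
    return decorator


def get_by_resource(resource: str, strategy: str) -> type:
    # Prefer a strategy-specific override and fall back to the generic loader.
    return REGISTRY.get(f"{strategy}:{resource}", REGISTRY[resource])
```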
@@ -1,9 +1,10 @@
from typing import Optional

from robusta_krr.core.models.allocations import ResourceType
from robusta_krr.core.models.objects import K8sObjectData

from .base_filtered_metric import BaseFilteredMetricLoader
from .base_metric import bind_metric, QueryType
from .base_metric import QueryType, bind_metric


@bind_metric(ResourceType.CPU)
@@ -1,9 +1,10 @@
from typing import Optional

from robusta_krr.core.models.allocations import ResourceType
from robusta_krr.core.models.objects import K8sObjectData

from .base_filtered_metric import BaseFilteredMetricLoader
from .base_metric import bind_metric, QueryType, override_metric
from .base_metric import QueryType, bind_metric, override_metric


@bind_metric(ResourceType.Memory)
@@ -23,9 +24,10 @@ def get_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
def get_query_type(self) -> QueryType:
return QueryType.QueryRange


# This is a temporary solutions, metric loaders will be moved to strategy in the future
@override_metric("simple", ResourceType.Memory)
class MemoryMetricLoader(MemoryMetricLoader):
class SimpleMemoryMetricLoader(MemoryMetricLoader):
"""
A class that overrides the memory metric on the simple strategy.
"""
@@ -35,17 +37,17 @@ def get_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
cluster_label = self.get_prometheus_cluster_label()
resolution_formatted = f"[{resolution}]" if resolution else ""
return (
f"max(max_over_time(container_memory_working_set_bytes{{"
f"max_over_time(container_memory_working_set_bytes{{"
f'namespace="{object.namespace}", '
f'pod=~"{pods_selector}", '
f'container="{object.container}"'
f"{cluster_label}}}"
f"{resolution_formatted}"
f")) by (container, pod, job, id)"
f")"
)

def get_query_type(self) -> QueryType:
return QueryType.Query

def get_graph_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
def get_graph_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
return super().get_query(object, resolution)
@@ -1,6 +1,6 @@
import abc
from concurrent.futures import ThreadPoolExecutor
import datetime
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional

from kubernetes.client.api_client import ApiClient
@@ -42,7 +42,7 @@ def name(self) -> str:
return classname.replace("MetricsService", "") if classname != MetricsService.__name__ else classname

@abc.abstractmethod
async def get_cluster_names(self) -> Optional[List[str]]:
def get_cluster_names(self) -> Optional[List[str]]:
...

@abc.abstractmethod
@@ -51,7 +51,6 @@ async def gather_data(
object: K8sObjectData,
resource: ResourceType,
period: datetime.timedelta,
*,
step: datetime.timedelta = datetime.timedelta(minutes=30),
) -> ResourceHistoryData:
...
@@ -1,7 +1,7 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
import datetime
from typing import List, Optional, Type
from concurrent.futures import ThreadPoolExecutor

from kubernetes.client import ApiClient
from prometheus_api_client import PrometheusApiClientException
@@ -11,14 +11,14 @@
from robusta_krr.core.models.config import Config
from robusta_krr.core.models.objects import K8sObjectData, PodData
from robusta_krr.core.models.result import ResourceType
from robusta_krr.utils.service_discovery import ServiceDiscovery
from robusta_krr.utils.service_discovery import MetricsServiceDiscovery

from ..metrics import BaseMetricLoader
from ..prometheus_client import CustomPrometheusConnect
from ..prometheus_client import ClusterNotSpecifiedException, CustomPrometheusConnect
from .base_metric_service import MetricsNotFound, MetricsService


class PrometheusDiscovery(ServiceDiscovery):
class PrometheusDiscovery(MetricsServiceDiscovery):
def find_metrics_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]:
"""
Finds the Prometheus URL using selectors.
@@ -60,7 +60,7 @@ def __init__(
*,
cluster: Optional[str] = None,
api_client: Optional[ApiClient] = None,
service_discovery: Type[ServiceDiscovery] = PrometheusDiscovery,
service_discovery: Type[MetricsServiceDiscovery] = PrometheusDiscovery,
executor: Optional[ThreadPoolExecutor] = None,
) -> None:
super().__init__(config=config, api_client=api_client, cluster=cluster, executor=executor)
@@ -87,7 +87,7 @@ def __init__(

if self.auth_header:
headers = {"Authorization": self.auth_header}
elif not self.config.inside_cluster:
elif not self.config.inside_cluster and self.api_client is not None:
self.api_client.update_params_for_auth(headers, {}, ["BearerToken"])

self.prometheus = CustomPrometheusConnect(url=self.url, disable_ssl=not self.ssl_enabled, headers=headers)
@@ -1,15 +1,15 @@
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

from kubernetes.client import ApiClient
from requests.exceptions import ConnectionError, HTTPError
from concurrent.futures import ThreadPoolExecutor

from robusta_krr.core.models.config import Config
from robusta_krr.utils.service_discovery import ServiceDiscovery
from robusta_krr.utils.service_discovery import MetricsServiceDiscovery

from .prometheus_metrics_service import MetricsNotFound, PrometheusMetricsService


class ThanosMetricsDiscovery(ServiceDiscovery):
class ThanosMetricsDiscovery(MetricsServiceDiscovery):
def find_metrics_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]:
"""
Finds the Thanos URL using selectors.
