diff --git a/poetry.lock b/poetry.lock index d245f611..91a3c836 100644 --- a/poetry.lock +++ b/poetry.lock @@ -109,6 +109,46 @@ d = ["aiohttp (>=3.7.4)"] jupyter = ["ipython (>=7.8.0)", "tokenize-rt (>=3.2.0)"] uvloop = ["uvloop (>=0.15.2)"] +[[package]] +name = "boto3" +version = "1.28.10" +description = "The AWS SDK for Python" +category = "main" +optional = false +python-versions = ">= 3.7" +files = [ + {file = "boto3-1.28.10-py3-none-any.whl", hash = "sha256:67001b3f512cbe2e00e352c65fb443b504e5e388fee39d73bcc42da1ae87d9e3"}, + {file = "boto3-1.28.10.tar.gz", hash = "sha256:cb8af03f553f1c7db7137bc897785baeeaa97b8fde483eb1cdb1f1ef3cec9cb7"}, +] + +[package.dependencies] +botocore = ">=1.31.10,<1.32.0" +jmespath = ">=0.7.1,<2.0.0" +s3transfer = ">=0.6.0,<0.7.0" + +[package.extras] +crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] + +[[package]] +name = "botocore" +version = "1.31.10" +description = "Low-level, data-driven core of boto 3." +category = "main" +optional = false +python-versions = ">= 3.7" +files = [ + {file = "botocore-1.31.10-py3-none-any.whl", hash = "sha256:a3bfd3627a490faedf37d79373d6957936d7720888ca85466e0471cb921e4557"}, + {file = "botocore-1.31.10.tar.gz", hash = "sha256:736a9412f405d6985570c4a87b533c2396dd8d4042d8c7a0ca14e73d4f1bcf9d"}, +] + +[package.dependencies] +jmespath = ">=0.7.1,<2.0.0" +python-dateutil = ">=2.1,<3.0.0" +urllib3 = ">=1.25.4,<1.27" + +[package.extras] +crt = ["awscrt (==0.16.26)"] + [[package]] name = "cachetools" version = "5.3.0" @@ -556,6 +596,18 @@ pipfile-deprecated-finder = ["pip-shims (>=0.5.2)", "pipreqs", "requirementslib" plugins = ["setuptools"] requirements-deprecated-finder = ["pip-api", "pipreqs"] +[[package]] +name = "jmespath" +version = "1.0.1" +description = "JSON Matching Expressions" +category = "main" +optional = false +python-versions = ">=3.7" +files = [ + {file = "jmespath-1.0.1-py3-none-any.whl", hash = "sha256:02e2e4cc71b5bcab88332eebf907519190dd9e6e82107fa7f83b1003a6252980"}, + {file = 
"jmespath-1.0.1.tar.gz", hash = "sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe"}, +] + [[package]] name = "kiwisolver" version = "1.4.4" @@ -1574,6 +1626,24 @@ files = [ [package.dependencies] pyasn1 = ">=0.1.3" +[[package]] +name = "s3transfer" +version = "0.6.1" +description = "An Amazon S3 Transfer Manager" +category = "main" +optional = false +python-versions = ">= 3.7" +files = [ + {file = "s3transfer-0.6.1-py3-none-any.whl", hash = "sha256:3c0da2d074bf35d6870ef157158641178a4204a6e689e82546083e31e0311346"}, + {file = "s3transfer-0.6.1.tar.gz", hash = "sha256:640bb492711f4c0c0905e1f62b6aaeb771881935ad27884852411f8e9cacbca9"}, +] + +[package.dependencies] +botocore = ">=1.12.36,<2.0a.0" + +[package.extras] +crt = ["botocore[crt] (>=1.20.29,<2.0a.0)"] + [[package]] name = "setuptools" version = "67.4.0" @@ -1799,4 +1869,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more [metadata] lock-version = "2.0" python-versions = ">=3.9,<3.12" -content-hash = "c9745b8f30ad68c862f3d34fb7856096f2fa178700aea8fdf0540915f19bc8a9" +content-hash = "00dda7500749b57eed7d2fc326a21c2d4c029e801d874fc0c2f137b90c4ae7a8" diff --git a/pyproject.toml b/pyproject.toml index 7c28803f..3fc5b645 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,8 @@ kubernetes = "^26.1.0" prometheus-api-client = "^0.5.3" numpy = "^1.24.2" alive-progress = "^3.1.2" +botocore = "^1.31.10" +boto3 = "^1.28.10" [tool.poetry.group.dev.dependencies] diff --git a/robusta_krr/common/prometheus/__init__.py b/robusta_krr/common/prometheus/__init__.py new file mode 100644 index 00000000..dcb6534f --- /dev/null +++ b/robusta_krr/common/prometheus/__init__.py @@ -0,0 +1,2 @@ +from .models import AWSPrometheusConfig, AzurePrometheusConfig, PrometheusConfig +from .utils import CustomPrometheusConnect, AWSPrometheusConnect diff --git a/robusta_krr/common/prometheus/auth.py b/robusta_krr/common/prometheus/auth.py new file mode 100644 index 
00000000..d24600d7 --- /dev/null +++ b/robusta_krr/common/prometheus/auth.py @@ -0,0 +1,70 @@ +import logging +import os +from typing import Dict + +import requests + +from ..prometheus.models import ( + AzurePrometheusConfig, + CoralogixPrometheusConfig, + PrometheusConfig, +) + + +class PrometheusAuthorization: + bearer_token: str = "" + azure_authorization: bool = ( + os.environ.get("AZURE_CLIENT_ID", "") != "" and os.environ.get("AZURE_TENANT_ID", "") != "" + ) and (os.environ.get("AZURE_CLIENT_SECRET", "") != "" or os.environ.get("AZURE_USE_MANAGED_ID", "") != "") + + @classmethod + def get_authorization_headers(cls, config: PrometheusConfig) -> Dict: + if isinstance(config, CoralogixPrometheusConfig): + return {"token": config.prometheus_token} + elif config.prometheus_auth: + return {"Authorization": config.prometheus_auth.get_secret_value()} + elif cls.azure_authorization: + return {"Authorization": (f"Bearer {cls.bearer_token}")} + else: + return {} + + @classmethod + def request_new_token(cls, config: PrometheusConfig) -> bool: + if cls.azure_authorization and isinstance(config, AzurePrometheusConfig): + try: + if config.azure_use_managed_id: + res = requests.get( + url=config.azure_metadata_endpoint, + headers={ + "Metadata": "true", + }, + data={ + "api-version": "2018-02-01", + "client_id": config.azure_client_id, + "resource": config.azure_resource, + }, + ) + else: + res = requests.post( + url=config.azure_token_endpoint, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + data={ + "grant_type": "client_credentials", + "client_id": config.azure_client_id, + "client_secret": config.azure_client_secret, + "resource": config.azure_resource, + }, + ) + except Exception: + logging.exception("Unexpected error when trying to generate azure access token.") + return False + + if not res.ok: + logging.error(f"Could not generate an azure access token. 
{res.reason}") + return False + + cls.bearer_token = res.json().get("access_token") + logging.info("Generated new azure access token.") + return True + + return False diff --git a/robusta_krr/common/prometheus/custom_connect.py b/robusta_krr/common/prometheus/custom_connect.py new file mode 100644 index 00000000..2fbcadca --- /dev/null +++ b/robusta_krr/common/prometheus/custom_connect.py @@ -0,0 +1,272 @@ +from typing import Any, Dict, Optional + +import requests +from botocore.auth import * +from botocore.awsrequest import AWSRequest +from botocore.credentials import Credentials +from prometheus_api_client import PrometheusApiClientException, PrometheusConnect +from requests.adapters import HTTPAdapter +from requests.exceptions import ConnectionError, HTTPError + +from ..prometheus.exceptions import PrometheusFlagsConnectionError, PrometheusNotFound, VictoriaMetricsNotFound +from ..prometheus.auth import PrometheusAuthorization +from ..prometheus.models import PrometheusApis, PrometheusConfig + + +class CustomPrometheusConnect(PrometheusConnect): + def __init__(self, config: PrometheusConfig): + super().__init__(url=config.url, disable_ssl=config.disable_ssl, headers=config.headers) + self.config = config + self._session = requests.Session() + self._session.mount(self.url, HTTPAdapter(pool_maxsize=10, pool_block=True)) + + def custom_query_range( + self, + query: str, + start_time: datetime, + end_time: datetime, + step: str, + params: dict = None, + ): + """ + The main difference here is that the method here is POST and the prometheus_cli is GET + """ + start = round(start_time.timestamp()) + end = round(end_time.timestamp()) + params = params or {} + data = None + query = str(query) + # using the query_range API to get raw data + response = self._session.post( + "{0}/api/v1/query_range".format(self.url), + data={ + "query": query, + "start": start, + "end": end, + "step": step, + **params, + }, + verify=self.ssl_verification, + headers=self.headers, + 
auth=self.auth, + ) + if response.status_code == 200: + data = response.json()["data"]["result"] + else: + raise PrometheusApiClientException( + "HTTP Status Code {} ({!r})".format(response.status_code, response.content) + ) + return data + + def custom_query(self, query: str, params: dict = None): + """ + The main difference here is that the method here is POST and the prometheus_cli is GET + """ + params = params or {} + data = None + query = str(query) + # using the query API to get raw data + response = self._session.post( + "{0}/api/v1/query".format(self.url), + data={"query": query, **params}, + verify=self.ssl_verification, + headers=self.headers, + auth=self.auth, + ) + if response.status_code == 200: + data = response.json()["data"]["result"] + else: + raise PrometheusApiClientException( + "HTTP Status Code {} ({!r})".format(response.status_code, response.content) + ) + + return data + + def check_prometheus_connection(self, params: dict = None): + params = params or {} + try: + if isinstance(self, AWSPrometheusConnect): + # will throw exception if not 200 + return self.custom_query(query="example") + else: + response = self._session.get( + f"{self.url}/api/v1/query", + verify=self.ssl_verification, + headers=self.headers, + # This query should return empty results, but is correct + params={"query": "example", **params}, + context={}, + ) + if response.status_code == 401: + if PrometheusAuthorization.request_new_token(self.config): + self.headers = PrometheusAuthorization.get_authorization_headers(self.config) + response = self._session.get( + f"{self.url}/api/v1/query", + verify=self.ssl_verification, + headers=self.headers, + params={"query": "example", **params}, + ) + + response.raise_for_status() + except (ConnectionError, HTTPError, PrometheusApiClientException) as e: + raise PrometheusNotFound( + f"Couldn't connect to Prometheus found under {self.url}\nCaused by {e.__class__.__name__}: {e})" + ) from e + + def __text_config_to_dict(self, text: str) 
-> Dict: + conf = {} + lines = text.strip().split("\n") + for line in lines: + key, val = line.strip().split("=") + conf[key] = val.strip('"') + + return conf + + def get_prometheus_flags(self) -> Optional[Dict]: + try: + if PrometheusApis.FLAGS in self.config.supported_apis: + return self.fetch_prometheus_flags() + if PrometheusApis.VM_FLAGS in self.config.supported_apis: + return self.fetch_victoria_metrics_flags() + except Exception as e: + service_name = "Prometheus" if PrometheusApis.FLAGS in self.config.supported_apis else "Victoria Metrics" + raise PrometheusFlagsConnectionError(f"Couldn't connect to the url: {self.url}\n\t\t{service_name}: {e}") + + def fetch_prometheus_flags(self) -> Dict: + try: + response = self._session.get( + f"{self.url}/api/v1/status/flags", + verify=self.ssl_verification, + headers=self.headers, + # This query should return empty results, but is correct + params={}, + ) + response.raise_for_status() + return response.json().get("data", {}) + except Exception as e: + raise PrometheusNotFound( + f"Couldn't connect to Prometheus found under {self.url}\nCaused by {e.__class__.__name__}: {e})" + ) from e + + def fetch_victoria_metrics_flags(self) -> Dict: + try: + # connecting to VictoriaMetrics + response = self._session.get( + f"{self.url}/flags", + verify=self.ssl_verification, + headers=self.headers, + # This query should return empty results, but is correct + params={}, + ) + response.raise_for_status() + + configuration = self.__text_config_to_dict(response.text) + return configuration + except Exception as e: + raise VictoriaMetricsNotFound( + f"Couldn't connect to VictoriaMetrics found under {self.url}\nCaused by {e.__class__.__name__}: {e})" + ) from e + + +class AWSPrometheusConnect(CustomPrometheusConnect): + def __init__(self, access_key: str, secret_key: str, region: str, service_name: str, **kwargs): + super().__init__(**kwargs) + self._credentials = Credentials(access_key, secret_key) + self._sigv4auth = 
S3SigV4Auth(self._credentials, service_name, region) + + def signed_request(self, method, url, data=None, params=None, verify=False, headers=None): + request = AWSRequest(method=method, url=url, data=data, params=params, headers=headers) + self._sigv4auth.add_auth(request) + return requests.request(method=method, url=url, headers=dict(request.headers), verify=verify, data=data) + + def custom_query(self, query: str, params: dict = None): + """ + Send a custom query to a Prometheus Host. + + This method takes as input a string which will be sent as a query to + the specified Prometheus Host. This query is a PromQL query. + + :param query: (str) This is a PromQL query, a few examples can be found + at https://prometheus.io/docs/prometheus/latest/querying/examples/ + :param params: (dict) Optional dictionary containing GET parameters to be + sent along with the API request, such as "time" + :returns: (list) A list of metric data received in response of the query sent + :raises: + (RequestException) Raises an exception in case of a connection error + (PrometheusApiClientException) Raises in case of non 200 response status code + """ + params = params or {} + data = None + query = str(query) + # using the query API to get raw data + response = self.signed_request( + method="POST", + url="{0}/api/v1/query".format(self.url), + data={**{"query": query}, **params}, + params={}, + verify=self.ssl_verification, + headers=self.headers, + ) + if response.status_code == 200: + data = response.json()["data"]["result"] + else: + raise PrometheusApiClientException( + "HTTP Status Code {} ({!r})".format(response.status_code, response.content) + ) + + return data + + def custom_query_range( + self, + query: str, + start_time: datetime, + end_time: datetime, + step: str, + params: Optional[Dict[str, Any]] = None, + ): + """ + Send a query_range to a Prometheus Host. + This method takes as input a string which will be sent as a query to + the specified Prometheus Host. 
This query is a PromQL query. + :param query: (str) This is a PromQL query, a few examples can be found + at https://prometheus.io/docs/prometheus/latest/querying/examples/ + :param start_time: (datetime) A datetime object that specifies the query range start time. + :param end_time: (datetime) A datetime object that specifies the query range end time. + :param step: (str) Query resolution step width in duration format or float number of seconds - i.e 100s, 3d, 2w, 170.3 + :param params: (dict) Optional dictionary containing GET parameters to be + sent along with the API request, such as "timeout" + :returns: (dict) A dict of metric data received in response of the query sent + :raises: + (RequestException) Raises an exception in case of a connection error + (PrometheusApiClientException) Raises in case of non 200 response status code + """ + start = round(start_time.timestamp()) + end = round(end_time.timestamp()) + params = params or {} + + prometheus_result = None + query = str(query) + response = self.signed_request( + method="POST", + url="{0}/api/v1/query_range".format(self.url), + data={**{"query": query, "start": start, "end": end, "step": step}, **params}, + params={}, + headers=self.headers, + ) + if response.status_code == 200: + prometheus_result = data=response.json()["data"]["result"] + else: + raise PrometheusApiClientException( + "HTTP Status Code {} ({!r})".format(response.status_code, response.content) + ) + return prometheus_result + + def check_prometheus_connection(self, params: dict = None) -> bool: + """ + Check Promethus connection. + + :param params: (dict) Optional dictionary containing parameters to be + sent along with the API request. + :returns: (bool) True if the endpoint can be reached, False if cannot be reached. 
+ """ + return self.custom_query(query="example", params=params) diff --git a/robusta_krr/common/prometheus/exceptions.py b/robusta_krr/common/prometheus/exceptions.py new file mode 100644 index 00000000..d401114c --- /dev/null +++ b/robusta_krr/common/prometheus/exceptions.py @@ -0,0 +1,36 @@ + + +class MetricsNotFound(Exception): + """ + An exception raised when Metrics service is not found. + """ + + pass + +class PrometheusNotFound(MetricsNotFound): + """ + An exception raised when Prometheus is not found. + """ + + pass + +class VictoriaMetricsNotFound(MetricsNotFound): + """ + An exception raised when Victoria Metrics is not found. + """ + + pass + +class ThanosMetricsNotFound(MetricsNotFound): + """ + An exception raised when Thanos is not found. + """ + + pass + +class PrometheusFlagsConnectionError(Exception): + """ + Exception, when Prometheus flag or AlertManager flag api cannot be reached + """ + + pass \ No newline at end of file diff --git a/robusta_krr/common/prometheus/models.py b/robusta_krr/common/prometheus/models.py new file mode 100644 index 00000000..0ea854eb --- /dev/null +++ b/robusta_krr/common/prometheus/models.py @@ -0,0 +1,168 @@ +from enum import Enum +from typing import Dict, List, Optional + +from pydantic import BaseModel, SecretStr + + +class PrometheusApis(Enum): + QUERY = 0 + QUERY_RANGE = 1 + LABELS = 2 + FLAGS = 3 + VM_FLAGS = 4 + + +class PrometheusConfig(BaseModel): + url: str + disable_ssl: bool = False + headers: Dict[str, str] = {} + prometheus_auth: Optional[SecretStr] = None + prometheus_url_query_string: Optional[str] + additional_labels: Optional[Dict[str, str]] + supported_apis: List[PrometheusApis] = [ + PrometheusApis.QUERY, + PrometheusApis.QUERY_RANGE, + PrometheusApis.LABELS, + PrometheusApis.FLAGS, + ] + + +class AWSPrometheusConfig(PrometheusConfig): + access_key: str + secret_access_key: str + service_name: str = "aps" + aws_region: str + supported_apis: List[PrometheusApis] = [ + PrometheusApis.QUERY, + 
PrometheusApis.QUERY_RANGE, + PrometheusApis.LABELS, + ] + + +class CoralogixPrometheusConfig(PrometheusConfig): + prometheus_token: str + supported_apis: List[PrometheusApis] = [ + PrometheusApis.QUERY, + PrometheusApis.QUERY_RANGE, + PrometheusApis.LABELS, + ] + + +class VictoriaMetricsPrometheusConfig(PrometheusConfig): + supported_apis: List[PrometheusApis] = [ + PrometheusApis.QUERY, + PrometheusApis.QUERY_RANGE, + PrometheusApis.LABELS, + PrometheusApis.VM_FLAGS, + ] + + +class AzurePrometheusConfig(PrometheusConfig): + azure_resource: str + azure_metadata_endpoint: str + azure_token_endpoint: str + azure_use_managed_id: Optional[str] + azure_client_id: Optional[str] + azure_client_secret: Optional[str] + + +""" + The Metric object is defined in prometheus as a dictionary so no specific labels are guaranteed + Known commonly returned labels in dictionary 'metric': + __name__: the name of the outer function in prometheus + Other labels 'metric' sometimes contains: + [container, created_by_kind, created_by_name, endpoint, host_ip, host_network, instance, job, namespace, node, pod, + pod_ip, service, uid, ...] 
+""" +PrometheusMetric = Dict[str, str] + + +class PrometheusScalarValue(BaseModel): + timestamp: float + value: str + + def __init__(self, raw_scalar_list: list): + """ + :var raw_scalar: is the list prometheus returns as a scalar value from its queries + + While usually this is a list of size 2 in the form of [float_timestamp, str_returned_value] + the list size and return value types are not guaranteed + """ + if len(raw_scalar_list) != 2: + raise Exception(f"Invalid prometheus scalar value {raw_scalar_list}") + timestamp = float(raw_scalar_list[0]) + value = str(raw_scalar_list[1]) + super().__init__(timestamp=timestamp, value=value) + + +class PrometheusVector(BaseModel): + metric: PrometheusMetric + value: PrometheusScalarValue + + +class PrometheusSeries(BaseModel): + metric: PrometheusMetric + timestamps: List[float] + values: List[str] + + def __init__(self, **kwargs): + raw_values = kwargs.pop("values", None) + if not raw_values: + raise Exception("values missing") + timestamps = [] + values = [] + for raw_value in raw_values: + prometheus_scalar_value = PrometheusScalarValue(raw_value) + timestamps.append(prometheus_scalar_value.timestamp) + values.append(prometheus_scalar_value.value) + super().__init__(timestamps=timestamps, values=values, **kwargs) + + +class PrometheusQueryResult(BaseModel): + """ + This class is the returned object for prometheus queries + :var result_type: can be of type "vector", "matrix", "scalar", "string" depending on the query + :var vector_result: a formatted vector list from the query result, if the var result_type is "vector" + :var series_list_result: a formatted series list from the query result, if the var result_type is "matrix" + :var scalar_result: scalar object of the query result, if the var result_type is "scalar" + :var string_result: a string of the query result, if the var result_type is "string" + + :raises: + (Exception) Raises an Exception in the case that there is an issue with the resultType and result not 
matching + """ + + result_type: str + vector_result: Optional[List[PrometheusVector]] + series_list_result: Optional[List[PrometheusSeries]] + scalar_result: Optional[PrometheusScalarValue] + string_result: Optional[str] + + def __init__(self, data): + result = data.get("result", None) + result_type = data.get("resultType", None) + vector_result = None + series_list_result = None + scalar_result = None + string_result = None + if not result_type: + raise Exception("resultType missing") + if result is None: + raise Exception("result object missing") + elif result_type == "string" or result_type == "error": + string_result = str(result) + elif result_type == "scalar" and isinstance(result, list): + scalar_result = PrometheusScalarValue(result) + elif result_type == "vector" and isinstance(result, list): + vector_result = [PrometheusVector(**vector_result) for vector_result in result] + elif result_type == "matrix" and isinstance(result, list): + series_list_result = [PrometheusSeries(**series_list_result) for series_list_result in result] + else: + raise Exception("result or returnType is invalid") + + super().__init__( + result_type=result_type, + vector_result=vector_result, + series_list_result=series_list_result, + scalar_result=scalar_result, + string_result=string_result, + ) diff --git a/robusta_krr/common/prometheus/utils.py b/robusta_krr/common/prometheus/utils.py new file mode 100644 index 00000000..7df3dbdb --- /dev/null +++ b/robusta_krr/common/prometheus/utils.py @@ -0,0 +1,39 @@ +from requests.sessions import merge_setting + +from ..prometheus.auth import PrometheusAuthorization +from ..prometheus.custom_connect import AWSPrometheusConnect, CustomPrometheusConnect +from ..prometheus.models import AWSPrometheusConfig, PrometheusConfig +from collections import defaultdict +from typing import List, Dict +from urllib.parse import parse_qs + +def parse_query_string(query_string: str) -> Dict[str, List[str]]: + if not query_string: + return {} + query_params 
= parse_qs(query_string, keep_blank_values=True) + parsed_params = defaultdict(list) + + for key, values in query_params.items(): + for value in values: + parsed_params[key].append(value) + + return parsed_params + +def get_custom_prometheus_connect(prom_config: PrometheusConfig) -> "CustomPrometheusConnect": + prom_config.headers.update(PrometheusAuthorization.get_authorization_headers(prom_config)) + if isinstance(prom_config, AWSPrometheusConfig): + prom = AWSPrometheusConnect( + access_key=prom_config.access_key, + secret_key=prom_config.secret_access_key, + service_name=prom_config.service_name, + region=prom_config.aws_region, + config=prom_config, + ) + else: + prom = CustomPrometheusConnect(config=prom_config) + + if prom_config.prometheus_url_query_string: + query_string_params = parse_query_string(prom_config.prometheus_url_query_string) + prom._session.params = merge_setting(prom._session.params, query_string_params) + prom.config = prom_config + return prom diff --git a/robusta_krr/core/integrations/prometheus/__init__.py b/robusta_krr/core/integrations/prometheus/__init__.py index 8f9dbb75..69d668cd 100644 --- a/robusta_krr/core/integrations/prometheus/__init__.py +++ b/robusta_krr/core/integrations/prometheus/__init__.py @@ -1,3 +1,3 @@ from .loader import MetricsLoader from .metrics_service.prometheus_metrics_service import PrometheusDiscovery, PrometheusNotFound -from .prometheus_client import CustomPrometheusConnect, ClusterNotSpecifiedException +from .prometheus_utils import ClusterNotSpecifiedException diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py index 590bc830..97244027 100644 --- a/robusta_krr/core/integrations/prometheus/loader.py +++ b/robusta_krr/core/integrations/prometheus/loader.py @@ -10,9 +10,9 @@ from robusta_krr.core.models.objects import K8sObjectData from robusta_krr.core.models.result import ResourceType from robusta_krr.utils.configurable import Configurable - 
-from .metrics_service.base_metric_service import MetricsNotFound, MetricsService -from .metrics_service.prometheus_metrics_service import PrometheusMetricsService, PrometheusNotFound +from robusta_krr.common.prometheus.exceptions import PrometheusNotFound, MetricsNotFound +from .metrics_service.base_metric_service import MetricsService +from .metrics_service.prometheus_metrics_service import PrometheusMetricsService from .metrics_service.thanos_metrics_service import ThanosMetricsService from .metrics_service.victoria_metrics_service import VictoriaMetricsService diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py index e2654e8f..5597a11c 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py @@ -12,13 +12,6 @@ from robusta_krr.utils.configurable import Configurable -class MetricsNotFound(Exception): - """ - An exception raised when Metrics service is not found. 
- """ - - pass - class MetricsService(Configurable, abc.ABC): def __init__( diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py index 5b224721..fff23a7c 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py @@ -14,8 +14,10 @@ from robusta_krr.utils.service_discovery import MetricsServiceDiscovery from ..metrics import BaseMetricLoader -from ..prometheus_client import ClusterNotSpecifiedException, CustomPrometheusConnect -from .base_metric_service import MetricsNotFound, MetricsService +from ..prometheus_utils import ClusterNotSpecifiedException, generate_prometheus_config +from robusta_krr.common.prometheus.utils import get_custom_prometheus_connect +from .base_metric_service import MetricsService +from robusta_krr.common.prometheus.exceptions import PrometheusNotFound class PrometheusDiscovery(MetricsServiceDiscovery): @@ -41,12 +43,7 @@ def find_metrics_url(self, *, api_client: Optional[ApiClient] = None) -> Optiona ) -class PrometheusNotFound(MetricsNotFound): - """ - An exception raised when Prometheus is not found. 
- """ - pass class PrometheusMetricsService(MetricsService): @@ -62,6 +59,7 @@ def __init__( api_client: Optional[ApiClient] = None, service_discovery: Type[MetricsServiceDiscovery] = PrometheusDiscovery, executor: Optional[ThreadPoolExecutor] = None, + is_victoria_metrics: bool = False, ) -> None: super().__init__(config=config, api_client=api_client, cluster=cluster, executor=executor) @@ -89,8 +87,8 @@ def __init__( headers |= {"Authorization": self.auth_header} elif not self.config.inside_cluster and self.api_client is not None: self.api_client.update_params_for_auth(headers, {}, ["BearerToken"]) - - self.prometheus = CustomPrometheusConnect(url=self.url, disable_ssl=not self.ssl_enabled, headers=headers) + self.prom_config = generate_prometheus_config(config, url=self.url, headers=headers, is_victoria_metrics=is_victoria_metrics) + self.prometheus = get_custom_prometheus_connect(self.prom_config) def check_connection(self): """ @@ -99,14 +97,7 @@ def check_connection(self): PrometheusNotFound: If the connection to Prometheus cannot be established. 
""" try: - response = self.prometheus._session.get( - f"{self.prometheus.url}/api/v1/query", - verify=self.prometheus.ssl_verification, - headers=self.prometheus.headers, - # This query should return empty results, but is correct - params={"query": "example"}, - ) - response.raise_for_status() + self.prometheus.custom_query(query="example") except (ConnectionError, HTTPError) as e: raise PrometheusNotFound( f"Couldn't connect to Prometheus found under {self.prometheus.url}\nCaused by {e.__class__.__name__}: {e})" diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py index f1de8b25..a429a0ed 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py @@ -6,8 +6,9 @@ from robusta_krr.core.models.config import Config from robusta_krr.utils.service_discovery import MetricsServiceDiscovery -from .prometheus_metrics_service import MetricsNotFound, PrometheusMetricsService +from .prometheus_metrics_service import PrometheusMetricsService +from robusta_krr.common.prometheus.exceptions import MetricsNotFound, ThanosMetricsNotFound class ThanosMetricsDiscovery(MetricsServiceDiscovery): def find_metrics_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]: @@ -29,13 +30,6 @@ def find_metrics_url(self, *, api_client: Optional[ApiClient] = None) -> Optiona ) -class ThanosMetricsNotFound(MetricsNotFound): - """ - An exception raised when Thanos is not found. 
- """ - - pass - class ThanosMetricsService(PrometheusMetricsService): """ diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py index fd9f8a97..d3e0fe88 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py @@ -6,8 +6,9 @@ from robusta_krr.core.models.config import Config from robusta_krr.utils.service_discovery import MetricsServiceDiscovery -from .prometheus_metrics_service import MetricsNotFound, PrometheusMetricsService +from .prometheus_metrics_service import PrometheusMetricsService +from robusta_krr.common.prometheus.exceptions import MetricsNotFound, VictoriaMetricsNotFound class VictoriaMetricsDiscovery(MetricsServiceDiscovery): def find_metrics_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]: @@ -28,14 +29,6 @@ def find_metrics_url(self, *, api_client: Optional[ApiClient] = None) -> Optiona ) -class VictoriaMetricsNotFound(MetricsNotFound): - """ - An exception raised when Victoria Metrics is not found. - """ - - pass - - class VictoriaMetricsService(PrometheusMetricsService): """ A class for fetching metrics from Victoria Metrics. 
@@ -55,6 +48,7 @@ def __init__( api_client=api_client, service_discovery=VictoriaMetricsDiscovery, executor=executor, + is_victoria_metrics=True ) def check_connection(self): diff --git a/robusta_krr/core/integrations/prometheus/prometheus_client.py b/robusta_krr/core/integrations/prometheus/prometheus_client.py deleted file mode 100644 index 824cd925..00000000 --- a/robusta_krr/core/integrations/prometheus/prometheus_client.py +++ /dev/null @@ -1,88 +0,0 @@ -from typing import no_type_check - -import requests -from datetime import datetime -from prometheus_api_client import PrometheusConnect, Retry, PrometheusApiClientException -from requests.adapters import HTTPAdapter - - -class ClusterNotSpecifiedException(Exception): - """ - An exception raised when a prometheus requires a cluster label but an invalid one is provided. - """ - - pass - - -class CustomPrometheusConnect(PrometheusConnect): - """ - Custom PrometheusConnect class to handle retries. - """ - - def __init__( - self, - url: str = "http://127.0.0.1:9090", - headers: dict = None, - disable_ssl: bool = False, - retry: Retry = None, - auth: tuple = None, - ): - super().__init__(url, headers, disable_ssl, retry, auth) - self._session = requests.Session() - self._session.mount(self.url, HTTPAdapter(max_retries=retry, pool_maxsize=10, pool_block=True)) - - def custom_query(self, query: str, params: dict = None): - params = params or {} - data = None - query = str(query) - # using the query API to get raw data - response = self._session.post( - "{0}/api/v1/query".format(self.url), - data={"query": query, **params}, - verify=self.ssl_verification, - headers=self.headers, - auth=self.auth, - ) - if response.status_code == 200: - data = response.json()["data"]["result"] - else: - raise PrometheusApiClientException( - "HTTP Status Code {} ({!r})".format(response.status_code, response.content) - ) - - return data - - def custom_query_range( - self, - query: str, - start_time: datetime, - end_time: datetime, - step: 
str, - params: dict = None, - ): - start = round(start_time.timestamp()) - end = round(end_time.timestamp()) - params = params or {} - data = None - query = str(query) - # using the query_range API to get raw data - response = self._session.post( - "{0}/api/v1/query_range".format(self.url), - data={ - "query": query, - "start": start, - "end": end, - "step": step, - **params, - }, - verify=self.ssl_verification, - headers=self.headers, - auth=self.auth, - ) - if response.status_code == 200: - data = response.json()["data"]["result"] - else: - raise PrometheusApiClientException( - "HTTP Status Code {} ({!r})".format(response.status_code, response.content) - ) - return data diff --git a/robusta_krr/core/integrations/prometheus/prometheus_utils.py b/robusta_krr/core/integrations/prometheus/prometheus_utils.py new file mode 100644 index 00000000..5eb159b2 --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/prometheus_utils.py @@ -0,0 +1,43 @@ +from robusta_krr.common.prometheus.models import PrometheusConfig, CoralogixPrometheusConfig, AWSPrometheusConfig, VictoriaMetricsPrometheusConfig +from robusta_krr.core.models.config import Config +import boto3 + +class ClusterNotSpecifiedException(Exception): + """ + An exception raised when a prometheus requires a cluster label but an invalid one is provided. 
def generate_prometheus_config(
    config: Config, url: str, headers: dict[str, str], is_victoria_metrics: bool = False
) -> PrometheusConfig:
    """
    Build the Prometheus connection config for the backend selected in ``config``.

    The backend is chosen in this order, first match wins:

    1. AWS managed Prometheus, when ``config.eks_managed_prom`` is set.
    2. Coralogix, when ``config.coralogix_token`` is set.
    3. Victoria Metrics, when ``is_victoria_metrics`` is true.
    4. Plain Prometheus otherwise.

    Args:
        config: Settings object; the ``prometheus_ssl_enabled``, ``eks_*`` and
            ``coralogix_token`` attributes are read.
        url: Base URL of the Prometheus-compatible endpoint.
        headers: HTTP headers passed through to the resulting config.
        is_victoria_metrics: Request a Victoria Metrics config when neither the
            AWS nor the Coralogix branch applies.

    Returns:
        A ``PrometheusConfig`` or the backend-specific config class selected above.

    Raises:
        ValueError: In the AWS branch, when no region is given explicitly or by
            the boto3 session, or when an access/secret key is missing and the
            boto3 session cannot supply credentials.
    """
    # Settings shared by every backend.
    baseconfig = {
        "url": url,
        "disable_ssl": not config.prometheus_ssl_enabled,
        "headers": headers,
    }

    # AWS managed Prometheus.
    if config.eks_managed_prom:
        session = boto3.Session(profile_name=config.eks_managed_prom_profile_name)

        # An explicit region wins; otherwise use whatever the session resolved.
        region = config.eks_managed_prom_region or session.region_name
        if not region:
            raise ValueError("No eks region specified")

        access_key = config.eks_access_key
        secret_key = config.eks_secret_key
        if not (access_key and secret_key):
            # Only consult the session when an explicit key is missing.
            # get_credentials() returns None when nothing can be resolved, so
            # guard it instead of failing with an AttributeError.
            credentials = session.get_credentials()
            if credentials is None:
                raise ValueError(
                    "No eks credentials specified: pass an access key and a secret key, "
                    "or configure credentials for the selected AWS profile"
                )
            frozen = credentials.get_frozen_credentials()
            access_key = access_key or frozen.access_key
            secret_key = secret_key or frozen.secret_key

        # Fall back to "aps" only when no service name was configured. The
        # previous check looked at eks_secret_key, so an explicit service name
        # could be ignored and a secret key alone could yield None.
        service_name = config.eks_service_name or "aps"

        return AWSPrometheusConfig(
            access_key=access_key,
            secret_access_key=secret_key,
            aws_region=region,
            service_name=service_name,
            **baseconfig,
        )

    # Coralogix managed Prometheus.
    if config.coralogix_token:
        return CoralogixPrometheusConfig(**baseconfig, prometheus_token=config.coralogix_token)

    if is_victoria_metrics:
        return VictoriaMetricsPrometheusConfig(**baseconfig)

    return PrometheusConfig(**baseconfig)
Config(pd.BaseSettings): prometheus_ssl_enabled: bool = pd.Field(False) prometheus_cluster_label: Optional[str] = pd.Field(None) prometheus_label: Optional[str] = pd.Field(None) + eks_managed_prom: bool = pd.Field(False) + eks_managed_prom_profile_name: Optional[str] = pd.Field(None) + eks_access_key: Optional[str] = pd.Field(None) + eks_secret_key: Optional[str] = pd.Field(None) + eks_service_name: Optional[str] = pd.Field(None) + eks_managed_prom_region: Optional[str] = pd.Field(None) + coralogix_token: Optional[str] = pd.Field(None) # Threading settings max_workers: int = pd.Field(6, ge=1) diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index 68ceb18d..b20b1590 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -5,7 +5,8 @@ from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult from robusta_krr.core.integrations.kubernetes import KubernetesLoader -from robusta_krr.core.integrations.prometheus import ClusterNotSpecifiedException, MetricsLoader, PrometheusNotFound +from robusta_krr.common.prometheus.exceptions import PrometheusNotFound +from robusta_krr.core.integrations.prometheus import ClusterNotSpecifiedException, MetricsLoader from robusta_krr.core.models.config import Config from robusta_krr.core.models.objects import K8sObjectData from robusta_krr.core.models.result import ( @@ -109,7 +110,6 @@ async def _calculate_object_recommendations(self, object: K8sObjectData) -> tupl if prometheus_loader is None: return {resource: ResourceRecommendation.undefined() for resource in ResourceType}, {} - data_tuple = await asyncio.gather( *[ prometheus_loader.gather_data( diff --git a/robusta_krr/main.py b/robusta_krr/main.py index db974334..49d50505 100644 --- a/robusta_krr/main.py +++ b/robusta_krr/main.py @@ -118,6 +118,48 @@ def {func_name}( help="The label in prometheus used to differentiate clusters. 
(Only relevant for centralized prometheus)", rich_help_panel="Prometheus Settings", ), + eks_managed_prom: bool = typer.Option( + False, + "--eks-managed-prom", + help="Adds additional signitures for eks prometheus connection.", + rich_help_panel="Prometheus EKS Settings", + ), + eks_managed_prom_profile_name: Optional[str] = typer.Option( + None, + "--eks-profile-name", + help="Sets the profile name for eks prometheus connection.", + rich_help_panel="Prometheus EKS Settings", + ), + eks_access_key: Optional[str] = typer.Option( + None, + "--eks-access-key", + help="Sets the access key for eks prometheus connection.", + rich_help_panel="Prometheus EKS Settings", + ), + eks_secret_key: Optional[str] = typer.Option( + None, + "--eks-secret-key", + help="Sets the secret key for eks prometheus connection.", + rich_help_panel="Prometheus EKS Settings", + ), + eks_service_name: Optional[str] = typer.Option( + "aps", + "--eks-service-name", + help="Sets the service name for eks prometheus connection.", + rich_help_panel="Prometheus EKS Settings", + ), + eks_managed_prom_region: Optional[str] = typer.Option( + None, + "--eks-managed-prom-region", + help="Sets the region for eks prometheus connection.", + rich_help_panel="Prometheus EKS Settings", + ), + coralogix_token: Optional[str] = typer.Option( + None, + "--coralogix-token", + help="Adds the token needed to query Coralogix managed prometheus.", + rich_help_panel="Prometheus Coralogix Settings", + ), max_workers: int = typer.Option( 10, "--max-workers", @@ -144,6 +186,13 @@ def {func_name}( prometheus_ssl_enabled=prometheus_ssl_enabled, prometheus_cluster_label=prometheus_cluster_label, prometheus_label=prometheus_label, + eks_managed_prom=eks_managed_prom, + eks_managed_prom_region=eks_managed_prom_region, + eks_managed_prom_profile_name=eks_managed_prom_profile_name, + eks_access_key=eks_access_key, + eks_secret_key=eks_secret_key, + eks_service_name=eks_service_name, + coralogix_token=coralogix_token, 
max_workers=max_workers, format=format, verbose=verbose,