Skip to content

Commit

Permalink
feat: improved time search for grafana loki
Browse files Browse the repository at this point in the history
  • Loading branch information
nherment committed Dec 10, 2024
1 parent 5712b81 commit 243a1a1
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 36 deletions.
35 changes: 20 additions & 15 deletions holmes/plugins/toolsets/grafana/loki_api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

import logging
import os
import requests
from typing import Dict, List, Tuple
Expand Down Expand Up @@ -49,28 +50,27 @@ def parse_loki_response(results: List[Dict]) -> List[Dict]:
def execute_loki_query(
loki_datasource_id:str,
query: str,
time_range_minutes: int,
start: int,
end: int,
limit: int) -> List[Dict]:
"""
Execute a Loki query through Grafana
Args:
query: Loki query string
time_range_minutes: Time range to query in minutes
start: Start of the time window to fetch the logs for. Epoch timestamp in seconds
end: End of the time window to fetch the logs for. Epoch timestamp in seconds
limit: Maximum number of log lines to return
Returns:
List of log entries
"""

end_time = datetime.now()
start_time = end_time - timedelta(minutes=time_range_minutes)

params = {
'query': query,
'limit': limit,
'start': int(start_time.timestamp()),
'end': int(end_time.timestamp())
'start': start,
'end': end
}

try:
Expand Down Expand Up @@ -107,10 +107,9 @@ def list_loki_datasources() -> List[Dict]:
response.raise_for_status()
datasources = response.json()

# Print datasources for debugging
loki_datasources = []
for ds in datasources:
print(f"Found datasource: {ds['name']} (type: {ds['type']}, id: {ds['id']})")
logging.info(f"Found datasource: {ds['name']} (type: {ds['type']}, id: {ds['id']})")
if ds['type'].lower() == 'loki':
loki_datasources.append(ds)

Expand All @@ -121,15 +120,17 @@ def list_loki_datasources() -> List[Dict]:
def query_loki_logs_by_node(
loki_datasource_id:str,
node_name: str,
start: int,
end: int,
node_name_search_key: str = "node",
time_range_minutes: int = 60,
limit: int = 1000) -> List[Dict]:
"""
Query Loki logs filtered by node name
Args:
node_name: Kubernetes node name
time_range_minutes: Time range to query in minutes
start: Start of the time window to fetch the logs for. Epoch timestamp in seconds
end: End of the time window to fetch the logs for. Epoch timestamp in seconds
limit: Maximum number of log lines to return
Returns:
Expand All @@ -141,24 +142,27 @@ def query_loki_logs_by_node(
return execute_loki_query(
loki_datasource_id=loki_datasource_id,
query=query,
time_range_minutes=time_range_minutes,
start=start,
end=end,
limit=limit)

def query_loki_logs_by_pod(
loki_datasource_id:str,
namespace: str,
pod_regex: str,
start: int,
end: int,
pod_name_search_key: str = "pod",
namespace_search_key: str = "namespace",
time_range_minutes: int = 60,
limit: int = 1000) -> List[Dict]:
"""
Query Loki logs filtered by namespace and pod name regex
Args:
namespace: Kubernetes namespace
pod_regex: Regular expression to match pod names
time_range_minutes: Time range to query in minutes
start: Start of the time window to fetch the logs for. Epoch timestamp in seconds
end: End of the time window to fetch the logs for. Epoch timestamp in seconds
limit: Maximum number of log lines to return
Returns:
Expand All @@ -169,5 +173,6 @@ def query_loki_logs_by_pod(
return execute_loki_query(
loki_datasource_id=loki_datasource_id,
query=query,
time_range_minutes=time_range_minutes,
start=start,
end=end,
limit=limit)
45 changes: 34 additions & 11 deletions holmes/plugins/toolsets/grafana_loki.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@

from typing import Any, Dict
from typing import Any, Dict, Optional, Union
from pydantic import BaseModel
import yaml

import time
from holmes.core.tools import EnvironmentVariablePrerequisite, Tool, ToolParameter, Toolset
from holmes.plugins.toolsets.grafana.loki_api import GRAFANA_API_KEY_ENV_NAME, GRAFANA_URL_ENV_NAME, list_loki_datasources, query_loki_logs_by_node, query_loki_logs_by_pod

Expand Down Expand Up @@ -36,9 +36,25 @@ def invoke(self, params: Dict) -> str:
def get_parameterized_one_liner(self, params:Dict) -> str:
return "Fetched Grafana Loki datasources"

ONE_HOUR = 3600  # default look-back window, in seconds


def _coerce_epoch_seconds(value: Optional[Union[int, str]]) -> Optional[int]:
    """Convert a raw timestamp parameter to whole seconds, or None if absent.

    Falsy values (None, "", 0) and whitespace-only strings count as absent.
    Raises ValueError when a string is not numeric.
    """
    if isinstance(value, str):
        value = value.strip()
        if not value:
            return None
        try:
            return int(value)
        except ValueError:
            # Callers sometimes send fractional epochs such as "1700000000.0";
            # truncate to whole seconds instead of failing.
            return int(float(value))
    if not value:
        return None
    return int(value)


def process_timestamps(start_timestamp: Optional[Union[int, str]], end_timestamp: Optional[Union[int, str]]) -> tuple:
    """Normalize optional start/end parameters into a (start, end) pair.

    Args:
        start_timestamp: Start of the window, epoch in seconds. A negative
            value is an offset in seconds relative to the end of the window.
            When missing, defaults to ONE_HOUR before the end.
        end_timestamp: End of the window, epoch in seconds. When missing,
            defaults to the current time.

    Returns:
        A ``(start, end)`` tuple of ints with ``start <= end``.

    Raises:
        ValueError: if a string argument is not numeric.
    """
    start = _coerce_epoch_seconds(start_timestamp)
    end = _coerce_epoch_seconds(end_timestamp)

    if not end:
        end = int(time.time())

    if not start:
        start = end - ONE_HOUR
    elif start < 0:
        # Negative start means "this many seconds before end".
        start = end + start

    # An inverted window can never match anything; put the bounds in order.
    if start > end:
        start, end = end, start

    return (start, end)

class GetLokiLogsByNode(Tool):

def __init__(self, config: GrafanaLokiConfig):
def __init__(self, config: GrafanaLokiConfig = GrafanaLokiConfig()):
super().__init__(
name = "fetch_loki_logs_by_node",
description = """Fetches the Loki logs for a given node""",
Expand All @@ -53,10 +69,15 @@ def __init__(self, config: GrafanaLokiConfig):
type="string",
required=True,
),
"time_range_minutes": ToolParameter(
description="Time range to query in minutes",
"start_timestamp": ToolParameter(
description="The beginning time boundary for the log search period. Epoch in seconds. Logs with timestamps before this value will be excluded from the results. If negative, the number of seconds relative to the end_timestamp.",
type="string",
required=True,
required=False,
),
"end_timestamp": ToolParameter(
description="The ending time boundary for the log search period. Epoch in seconds. Logs with timestamps after this value will be excluded from the results. Defaults to NOW()",
type="string",
required=False,
),
"limit": ToolParameter(
description="Maximum number of logs to return.",
Expand All @@ -68,12 +89,13 @@ def __init__(self, config: GrafanaLokiConfig):
self._config = config

def invoke(self, params: Dict) -> str:

(start, end) = process_timestamps(params.get("start_timestamp"), params.get("end_timestamp"))
logs = query_loki_logs_by_node(
loki_datasource_id=get_param_or_raise(params, "loki_datasource_id"),
node_name=get_param_or_raise(params, "node_name"),
node_name_search_key=self._config.node_name_search_key,
time_range_minutes=int(get_param_or_raise(params, "time_range_minutes")),
start=start,
end=end,
limit=int(get_param_or_raise(params, "limit"))
)
return yaml.dump(logs)
Expand All @@ -84,7 +106,7 @@ def get_parameterized_one_liner(self, params:Dict) -> str:

class GetLokiLogsByPod(Tool):

def __init__(self, config: GrafanaLokiConfig):
def __init__(self, config: GrafanaLokiConfig = GrafanaLokiConfig()):
super().__init__(
name = "fetch_loki_logs_by_pod",
description = "Fetches the Loki logs for a given pod",
Expand Down Expand Up @@ -120,14 +142,15 @@ def __init__(self, config: GrafanaLokiConfig):
self._config = config

def invoke(self, params: Dict) -> str:

(start, end) = process_timestamps(params.get("start_timestamp"), params.get("end_timestamp"))
logs = query_loki_logs_by_pod(
loki_datasource_id=get_param_or_raise(params, "loki_datasource_id"),
pod_regex=get_param_or_raise(params, "pod_regex"),
namespace=get_param_or_raise(params, "namespace"),
namespace_search_key=self._config.namespace_search_key,
pod_name_search_key=self._config.pod_name_search_key,
time_range_minutes=int(get_param_or_raise(params, "time_range_minutes")),
start=start,
end=end,
limit=int(get_param_or_raise(params, "limit"))
)
return yaml.dump(logs)
Expand Down
25 changes: 15 additions & 10 deletions tests/test_grafana_loki.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pytest
from holmes.plugins.toolsets.grafana.loki_api import GRAFANA_URL_ENV_NAME, list_loki_datasources, query_loki_logs_by_node, query_loki_logs_by_pod
from holmes.plugins.toolsets.grafana_loki import GetLokiLogsByNode, GetLokiLogsByPod, ListLokiDatasources

@pytest.mark.skipif(not os.environ.get(GRAFANA_URL_ENV_NAME), reason=f"{GRAFANA_URL_ENV_NAME} must be set to run Grafana tests")
def test_grafana_list_loki_datasources():
Expand All @@ -15,20 +16,24 @@ def test_grafana_query_loki_logs_by_node():
datasources = list_loki_datasources()
assert len(datasources) > 0

tool = GetLokiLogsByNode()
# just tests that this does not throw
query_loki_logs_by_node(
loki_datasource_id=datasources[0]["id"],
node_name="foo"
)
tool.invoke(params={
"loki_datasource_id": datasources[0]["id"],
"node_name": "foo",
"limit": 10
})

@pytest.mark.skipif(not os.environ.get(GRAFANA_URL_ENV_NAME), reason=f"{GRAFANA_URL_ENV_NAME} must be set to run Grafana tests")
def test_grafana_query_loki_logs_by_pod():
datasources = list_loki_datasources()
assert len(datasources) > 0

# just tests that this does not throw
query_loki_logs_by_pod(
loki_datasource_id=datasources[0]["id"],
namespace="kube-system",
pod_regex="coredns.*"
)
tool = GetLokiLogsByPod()
# just tests that this does not throw
tool.invoke(params={
"loki_datasource_id": datasources[0]["id"],
"namespace": "kube-system",
"pod_regex": "coredns.*",
"limit": 10
})

0 comments on commit 243a1a1

Please sign in to comment.