Skip to content

Commit

Permalink
feat: add elasticsearch toolset
Browse files Browse the repository at this point in the history
  • Loading branch information
nherment committed Dec 6, 2024
1 parent c35b96d commit 4911177
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 7 deletions.
8 changes: 5 additions & 3 deletions holmes/config.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import logging
import os
import os.path

from pydantic.main import BaseModel
from holmes.core.llm import LLM, DefaultLLM
from strenum import StrEnum
from typing import List, Optional
from typing import Any, Dict, List, Optional

from openai import AzureOpenAI, OpenAI
from pydantic import FilePath, SecretStr
Expand Down Expand Up @@ -32,7 +34,6 @@
DEFAULT_CONFIG_LOCATION = os.path.expanduser("~/.holmes/config.yaml")
CUSTOM_TOOLSET_LOCATION = "/etc/holmes/config/custom_toolset.yaml"


class Config(RobustaBaseConfig):
api_key: Optional[SecretStr] = (
None # if None, read from OPENAI_API_KEY or AZURE_OPENAI_ENDPOINT env var
Expand Down Expand Up @@ -72,6 +73,7 @@ class Config(RobustaBaseConfig):
custom_runbooks: List[FilePath] = []
custom_toolsets: List[FilePath] = []

opensearch_clusters: Optional[List[Dict]] = None # Passed through to opensearchpy.OpenSearch constructor

@classmethod
def load_from_env(cls):
Expand Down Expand Up @@ -106,7 +108,7 @@ def load_from_env(cls):
def create_tool_executor(
self, console: Console, allowed_toolsets: ToolsetPattern, dal:Optional[SupabaseDal]
) -> ToolExecutor:
all_toolsets = load_builtin_toolsets(dal=dal)
all_toolsets = load_builtin_toolsets(dal=dal, opensearch_clusters=self.opensearch_clusters)
for ts_path in self.custom_toolsets:
all_toolsets.extend(load_toolsets_from_file(ts_path))

Expand Down
15 changes: 11 additions & 4 deletions holmes/plugins/toolsets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
import os.path
from typing import List, Optional

from opensearchpy.helpers.signer import Dict

from holmes.core.supabase_dal import SupabaseDal
from holmes.plugins.toolsets.findings import FindingsToolset
from holmes.plugins.toolsets.internet import InternetToolset
from pydantic import BaseModel

from holmes.core.tools import Toolset, YAMLToolset
from holmes.plugins.toolsets.opensearch import OpenSearchToolset
from holmes.utils.pydantic_utils import load_model_from_file

THIS_DIR = os.path.abspath(os.path.dirname(__file__))
Expand All @@ -23,11 +26,15 @@ def load_toolsets_from_file(path: str) -> List[YAMLToolset]:
toolset.set_path(path)
return data.toolsets

def load_python_toolsets(dal:Optional[SupabaseDal]) -> List[Toolset]:
def load_python_toolsets(dal:Optional[SupabaseDal], opensearch_clusters:Optional[List[Dict]]) -> List[Toolset]:
logging.debug("loading python toolsets")
return [InternetToolset(), FindingsToolset(dal)]
toolsets = [InternetToolset(), FindingsToolset(dal)]
if opensearch_clusters and len(opensearch_clusters) > 0:
opensearch = OpenSearchToolset(clusters_configs=opensearch_clusters)
toolsets.append(opensearch)
return toolsets

def load_builtin_toolsets(dal:Optional[SupabaseDal] = None) -> List[Toolset]:
def load_builtin_toolsets(dal:Optional[SupabaseDal] = None, opensearch_clusters:Optional[List[Dict]] = []) -> List[Toolset]:
all_toolsets = []
logging.debug(f"loading toolsets from {THIS_DIR}")
for filename in os.listdir(THIS_DIR):
Expand All @@ -36,5 +43,5 @@ def load_builtin_toolsets(dal:Optional[SupabaseDal] = None) -> List[Toolset]:
path = os.path.join(THIS_DIR, filename)
all_toolsets.extend(load_toolsets_from_file(path))

all_toolsets.extend(load_python_toolsets(dal))
all_toolsets.extend(load_python_toolsets(dal=dal, opensearch_clusters=opensearch_clusters))
return all_toolsets
119 changes: 119 additions & 0 deletions holmes/plugins/toolsets/opensearch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import logging
from typing import Any, Dict, List, Optional, Union
from holmes.core.tools import StaticPrerequisite, Tool, ToolParameter, Toolset
from opensearchpy import OpenSearch

class OpenSearchClient:
def __init__(self, **kwargs):
self.hosts = kwargs.get("hosts") or []
self.client = OpenSearch(**kwargs)

def get_client(clients:List[OpenSearchClient], host:Optional[str]):
if len(clients) == 1:
return clients[0]

if not host:
raise Exception("Missing host to resolve opensearch client")

for client in clients:
found = any(host in client.hosts for client in clients)
if found:
return client

raise Exception(f"Failed to resolve opensearch client. Could not find a matching host: {host}")

class ListShards(Tool):
def __init__(self, opensearch_clients:List[OpenSearchClient]):
super().__init__(
name = "opensearch_list_shards",
description = "List the shards within an opensearch cluster",
parameters = {
"host": ToolParameter(
description="The cluster host",
type="string",
required=False,
)
},
)
self._opensearch_clients = opensearch_clients

def invoke(self, params:Any) -> str:
client = get_client(self._opensearch_clients, host=params.get("host", ""))
shards = client.client.cat.shards()
return str(shards)

def get_parameterized_one_liner(self, params:Dict) -> str:
return f"opensearch ListShards({params.get('host')})"

class GetClusterSettings(Tool):
def __init__(self, opensearch_clients:List[OpenSearchClient]):
super().__init__(
name = "opensearch_get_cluster_settings",
description = "Retrieve the cluster's settings",
parameters = {
"host": ToolParameter(
description="The cluster host",
type="string",
required=False,
)
},
)
self._opensearch_clients = opensearch_clients

def invoke(self, params:Any) -> str:
client = get_client(self._opensearch_clients, host=params.get("host"))
response = client.client.cluster.get_settings(
include_defaults=True,
flat_settings=True
)
return str(response)

def get_parameterized_one_liner(self, params) -> str:
return f"opensearch GetClusterSettings({params.get('host')})"


class GetClusterHealth(Tool):
def __init__(self, opensearch_clients:List[OpenSearchClient]):
super().__init__(
name = "opensearch_get_cluster_health",
description = "Retrieve the cluster's health",
parameters = {
"host": ToolParameter(
description="The cluster host",
type="string",
required=False,
)
},
)
self._opensearch_clients = opensearch_clients

def invoke(self, params:Any) -> str:
client = get_client(self._opensearch_clients, host=params.get("host", ""))
health = client.client.cluster.health()
return str(health)

def get_parameterized_one_liner(self, params) -> str:
return f"opensearch GetClusterSettings({params.get('host')})"

class OpenSearchToolset(Toolset):
def __init__(self, clusters_configs:List[Dict]):
clients: List[OpenSearchClient] = []
for config in clusters_configs:
logging.info(f"Setting up OpenSearch client: {str(config)}")
client = OpenSearchClient(**config)
print(client.client.cluster.health())
clients.append(client)

super().__init__(
name = "opensearch",
prerequisites = [
StaticPrerequisite(enabled=len(clients) > 0, disabled_reason="No opensearch client was configured")
],
tools = [
ListShards(clients),
GetClusterSettings(clients),
GetClusterHealth(clients),
],
)
self.check_prerequisites()
self._clients = clients

0 comments on commit 4911177

Please sign in to comment.