
OpenSearch toolset #221

Open · wants to merge 7 commits into base: master
20 changes: 20 additions & 0 deletions README.md
@@ -713,6 +713,26 @@ Configure Slack to send notifications to specific channels. Provide your Slack t

</details>


<details>
<summary>OpenSearch Integration</summary>

The OpenSearch toolset (`opensearch`) allows Holmes to query an OpenSearch cluster for its health, settings, and shard information.
The toolset supports multiple OpenSearch or Elasticsearch clusters, which are configured by editing Holmes' configuration file:

```yaml
opensearch_clusters:
- hosts:
- https://my_elasticsearch.us-central1.gcp.cloud.es.io:443
headers:
Authorization: "ApiKey <your_API_key>"
```

> The configuration for each OpenSearch cluster is passed through to the [opensearch-py](https://github.com/opensearch-project/opensearch-py) module. Check out that module's documentation for how to configure connectivity.
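
For reference, each entry under `opensearch_clusters` is passed straight through as keyword arguments to the `opensearch-py` client constructor, so the YAML above is roughly equivalent to the following Python sketch (the host and API key are placeholders):

```python
from opensearchpy import OpenSearch

# One client is created per entry under `opensearch_clusters`;
# any keyword argument accepted by OpenSearch(...) can be added to the YAML entry.
client = OpenSearch(
    hosts=["https://my_elasticsearch.us-central1.gcp.cloud.es.io:443"],
    headers={"Authorization": "ApiKey <your_API_key>"},
)

print(client.cluster.health())
```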

</details>


<details>

<summary>Custom Runbooks</summary>
34 changes: 18 additions & 16 deletions holmes/config.py
@@ -3,7 +3,7 @@
import yaml
import os.path
from holmes.core.llm import LLM, DefaultLLM
from typing import List, Optional
from typing import Dict, List, Optional


from pydantic import FilePath, SecretStr, Field
@@ -13,7 +13,7 @@

from holmes.core.runbooks import RunbookManager
from holmes.core.supabase_dal import SupabaseDal
from holmes.core.tool_calling_llm import (IssueInvestigator,
ToolCallingLLM,
ToolExecutor)
from holmes.core.tools import ToolsetPattern, get_matching_toolsets, ToolsetStatusEnum, ToolsetTag
@@ -77,9 +77,11 @@ class Config(RobustaBaseConfig):

custom_runbooks: List[FilePath] = []
custom_toolsets: List[FilePath] = []

enabled_toolsets_names: List[str] = Field(default_factory=list)

opensearch_clusters: Optional[List[Dict]] = None # Passed through to opensearchpy.OpenSearch constructor

@classmethod
def load_from_env(cls):
kwargs = {}
@@ -109,7 +111,7 @@ def load_from_env(cls):
kwargs[field_name] = val
kwargs["cluster_name"] = Config.__get_cluster_name()
return cls(**kwargs)

@staticmethod
def __get_cluster_name() -> Optional[str]:
config_file_path = ROBUSTA_CONFIG_PATH
@@ -133,17 +135,17 @@ def create_console_tool_executor(
self, console: Console, allowed_toolsets: ToolsetPattern, dal:Optional[SupabaseDal]
) -> ToolExecutor:
"""
Creates ToolExecutor for the cli
"""
default_toolsets = [toolset for toolset in load_builtin_toolsets(dal) if any(tag in (ToolsetTag.CORE, ToolsetTag.CLI) for tag in toolset.tags)]
default_toolsets = [toolset for toolset in load_builtin_toolsets(dal, opensearch_clusters=self.opensearch_clusters) if any(tag in (ToolsetTag.CORE, ToolsetTag.CLI) for tag in toolset.tags)]

if allowed_toolsets == "*":
matching_toolsets = default_toolsets
else:
matching_toolsets = get_matching_toolsets(
default_toolsets, allowed_toolsets.split(",")
)

# Enable all matching toolsets that have CORE or CLI tag
for toolset in matching_toolsets:
toolset.enabled = True
@@ -155,7 +157,7 @@ def create_console_tool_executor(
toolsets_loaded_from_config,
matched_default_toolsets_by_name,
)

for toolset in filtered_toolsets_by_name.values():
if toolset.enabled:
toolset.check_prerequisites()
@@ -169,11 +171,11 @@ def create_console_tool_executor(
logging.info(f"Disabled toolset: {ts.name} from {ts.get_path()})")
elif ts.get_status() == ToolsetStatusEnum.FAILED:
logging.info(f"Failed loading toolset {ts.name} from {ts.get_path()}: ({ts.get_error()})")

for ts in default_toolsets:
if ts.name not in filtered_toolsets_by_name.keys():
logging.debug(f"Toolset {ts.name} from {ts.get_path()} was filtered out due to allowed_toolsets value")

enabled_tools = concat(*[ts.tools for ts in enabled_toolsets])
logging.debug(
f"Starting AI session with tools: {[t.name for t in enabled_tools]}"
@@ -184,10 +186,10 @@ def create_tool_executor(
self, console: Console, dal:Optional[SupabaseDal]
) -> ToolExecutor:
"""
Creates ToolExecutor for the server endpoints
"""

all_toolsets = load_builtin_toolsets(dal=dal)
all_toolsets = load_builtin_toolsets(dal=dal, opensearch_clusters=self.opensearch_clusters)

if os.path.isfile(CUSTOM_TOOLSET_LOCATION):
try:
@@ -201,7 +203,7 @@ def create_tool_executor(
f"Starting AI session with tools: {[t.name for t in enabled_tools]}"
)
return ToolExecutor(enabled_toolsets)

def create_console_toolcalling_llm(
self, console: Console, allowed_toolsets: ToolsetPattern, dal:Optional[SupabaseDal] = None
) -> ToolCallingLLM:
@@ -239,7 +241,7 @@ def create_issue_investigator(
self.max_steps,
self._get_llm()
)

def create_console_issue_investigator(
self,
console: Console,
18 changes: 10 additions & 8 deletions holmes/plugins/toolsets/__init__.py
@@ -1,18 +1,16 @@
import logging
import os
import os.path
from typing import List, Optional
from typing import List, Optional, Dict

from holmes.core.supabase_dal import SupabaseDal
from holmes.plugins.toolsets.findings import FindingsToolset
from holmes.plugins.toolsets.internet import InternetToolset
from pydantic import BaseModel

from holmes.core.tools import Toolset, YAMLToolset
from typing import Dict
from pydantic import BaseModel
from typing import Optional
import yaml
from holmes.plugins.toolsets.opensearch import OpenSearchToolset

THIS_DIR = os.path.abspath(os.path.dirname(__file__))

@@ -39,12 +37,16 @@ def load_toolsets_from_file(path: str, silent_fail: bool = False) -> List[YAMLTo
return file_toolsets


def load_python_toolsets(dal:Optional[SupabaseDal]) -> List[Toolset]:
def load_python_toolsets(dal:Optional[SupabaseDal], opensearch_clusters:Optional[List[Dict]]) -> List[Toolset]:
logging.debug("loading python toolsets")
return [InternetToolset(), FindingsToolset(dal)]
toolsets = [InternetToolset(), FindingsToolset(dal)]
if opensearch_clusters and len(opensearch_clusters) > 0:
opensearch = OpenSearchToolset(clusters_configs=opensearch_clusters)
toolsets.append(opensearch)
return toolsets


def load_builtin_toolsets(dal:Optional[SupabaseDal] = None) -> List[Toolset]:
def load_builtin_toolsets(dal:Optional[SupabaseDal] = None, opensearch_clusters:Optional[List[Dict]] = None) -> List[Toolset]:
all_toolsets = []
logging.debug(f"loading toolsets from {THIS_DIR}")
for filename in os.listdir(THIS_DIR):
@@ -53,5 +55,5 @@ def load_builtin_toolsets(dal:Optional[SupabaseDal] = None) -> List[Toolset]:
path = os.path.join(THIS_DIR, filename)
all_toolsets.extend(load_toolsets_from_file(path))

all_toolsets.extend(load_python_toolsets(dal))
all_toolsets.extend(load_python_toolsets(dal, opensearch_clusters=opensearch_clusters))
return all_toolsets
121 changes: 121 additions & 0 deletions holmes/plugins/toolsets/opensearch.py
@@ -0,0 +1,121 @@
import logging
from typing import Any, Dict, List, Optional
from holmes.core.tools import StaticPrerequisite, Tool, ToolParameter, Toolset
from opensearchpy import OpenSearch

class OpenSearchClient:
def __init__(self, **kwargs):
self.hosts = kwargs.get("hosts") or []
self.client = OpenSearch(**kwargs)

def get_client(clients:List[OpenSearchClient], host:Optional[str]):
if len(clients) == 1:
return clients[0]

if not host:
raise Exception("Missing host to resolve opensearch client")

for client in clients:
    # Return the client whose configured hosts include the requested host
    if host in client.hosts:
        return client

raise Exception(f"Failed to resolve opensearch client. Could not find a matching host: {host}")

class ListShards(Tool):
def __init__(self, opensearch_clients:List[OpenSearchClient]):
super().__init__(
name = "opensearch_list_shards",
description = "List the shards within an opensearch cluster",
parameters = {
"host": ToolParameter(
description="The cluster host",
type="string",
required=False,
)
},
)
self._opensearch_clients = opensearch_clients

def invoke(self, params:Any) -> str:
client = get_client(self._opensearch_clients, host=params.get("host", ""))
shards = client.client.cat.shards()
return str(shards)

def get_parameterized_one_liner(self, params:Dict) -> str:
return f"opensearch ListShards({params.get('host')})"

class GetClusterSettings(Tool):
def __init__(self, opensearch_clients:List[OpenSearchClient]):
super().__init__(
name = "opensearch_get_cluster_settings",
description = "Retrieve the cluster's settings",
parameters = {
"host": ToolParameter(
description="The cluster host",
type="string",
required=False,
)
},
)
self._opensearch_clients = opensearch_clients

def invoke(self, params:Any) -> str:
client = get_client(self._opensearch_clients, host=params.get("host"))
response = client.client.cluster.get_settings(
include_defaults=True,
flat_settings=True
)
return str(response)

def get_parameterized_one_liner(self, params) -> str:
return f"opensearch GetClusterSettings({params.get('host')})"


class GetClusterHealth(Tool):
def __init__(self, opensearch_clients:List[OpenSearchClient]):
super().__init__(
name = "opensearch_get_cluster_health",
description = "Retrieve the cluster's health",
parameters = {
"host": ToolParameter(
description="The cluster host",
type="string",
required=False,
)
},
)
self._opensearch_clients = opensearch_clients

def invoke(self, params:Any) -> str:
client = get_client(self._opensearch_clients, host=params.get("host", ""))
health = client.client.cluster.health()
return str(health)

def get_parameterized_one_liner(self, params) -> str:
return f"opensearch GetClusterSettings({params.get('host')})"

class OpenSearchToolset(Toolset):
def __init__(self, clusters_configs:List[Dict]):
clients: List[OpenSearchClient] = []
for config in clusters_configs:
logging.info(f"Setting up OpenSearch client: {str(config)}")
client = OpenSearchClient(**config)
print(client.client.cluster.health())
clients.append(client)

super().__init__(
name = "opensearch",
description="Provide cluster metadata information like health, shards, settings.",
docs_url="https://opensearch.org/docs/latest/clients/python-low-level/",
icon_url="https://upload.wikimedia.org/wikipedia/commons/9/91/Opensearch_Logo.svg",
prerequisites = [
StaticPrerequisite(enabled=len(clients) > 0, disabled_reason="No opensearch client was configured")
],
tools = [
ListShards(clients),
GetClusterSettings(clients),
GetClusterHealth(clients),
],
)
self._clients = clients
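
For context, a minimal sketch of how this toolset is expected to be constructed from the `opensearch_clusters` configuration; the host and API key below are placeholders:

```python
from holmes.plugins.toolsets.opensearch import OpenSearchToolset

# Placeholder cluster entry; each dict is unpacked into opensearchpy.OpenSearch(**config).
clusters = [
    {
        "hosts": ["https://my-cluster.example.com:443"],
        "headers": {"Authorization": "ApiKey <your_API_key>"},
    }
]

toolset = OpenSearchToolset(clusters_configs=clusters)
toolset.check_prerequisites()  # StaticPrerequisite is enabled only when at least one client was configured

for tool in toolset.tools:
    print(tool.name)  # opensearch_list_shards, opensearch_get_cluster_settings, opensearch_get_cluster_health
```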
41 changes: 39 additions & 2 deletions poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -39,6 +39,7 @@ playwright = "1.48.0"
bs4 = "^0.0.2"
markdownify = "^0.13.1"
starlette = "^0.40"
opensearch-py = "^2.8.0"

[tool.poetry.group.dev.dependencies]
pytest = "^8.3.3"