Skip to content

Commit

Permalink
Address PR review, add quick responses as well
Browse files Browse the repository at this point in the history
  • Loading branch information
vaijab committed Feb 23, 2024
1 parent 911285e commit b6af995
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 90 deletions.
7 changes: 4 additions & 3 deletions cmd/executor/ai-face/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import (
"fmt"
"io"
"net/http"

"github.com/kubeshop/botkube-cloud-plugins/internal/auth"
"strings"

"github.com/hashicorp/go-plugin"

"github.com/kubeshop/botkube-cloud-plugins/internal/auth"
aibrain "github.com/kubeshop/botkube-cloud-plugins/internal/source/ai-brain"
"github.com/kubeshop/botkube/pkg/api"
"github.com/kubeshop/botkube/pkg/api/executor"
Expand Down Expand Up @@ -77,7 +78,7 @@ func (e *AIFace) Execute(_ context.Context, in executor.ExecuteInput) (executor.
aiBrainWebhookURL := fmt.Sprintf("%s/%s", in.Context.IncomingWebhook.BaseSourceURL, cfg.AIBrainSourceName)

body, err := json.Marshal(aibrain.Payload{
Prompt: in.Command,
Prompt: strings.TrimPrefix(in.Command, pluginName),
MessageID: in.Context.Message.ParentActivityID,
})
if err != nil {
Expand Down
8 changes: 5 additions & 3 deletions hack/openai/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ func main() {
}

func updateAssistant(ctx context.Context, c *openai.Client, id string) (openai.Assistant, error) {
instructions := `You are an experienced DevOps engineer.
You have deep understand how to operate a kubernetes cluster and troubleshoot running workloads in kubernetes.
You have access to tools which can help you to find needed information.`
instructions := `You are an experienced DevOps engineer. You have deep
understand how to operate a kubernetes cluster and troubleshoot running
workloads in kubernetes. You have access to tools which can help you. Keep
your answers short and on the subject. Do not get involved in unrelated
topics.`

return c.ModifyAssistant(ctx, id, openai.AssistantRequest{
Model: openai.GPT4TurboPreview,
Expand Down
157 changes: 73 additions & 84 deletions internal/source/ai-brain/assistant.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package aibrain
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"

"github.com/kubeshop/botkube/pkg/api"
Expand All @@ -13,44 +13,32 @@ import (
"github.com/sirupsen/logrus"
)

const openAIPollInterval = 2 * time.Second

// Payload represents incoming webhook payload.
type Payload struct {
	// Prompt is the free-text user prompt to forward to the assistant.
	Prompt string `json:"prompt"`
	// MessageID identifies the originating chat message; outgoing
	// responses use it as ParentActivityID so platforms can thread them.
	MessageID string `json:"messageId"`
}

// handle is simplified - don't do that this way!
func (i *sourceInstance) handle(in source.ExternalRequestInput) (api.Message, error) {
p := new(Payload)
err := json.Unmarshal(in.Payload, p)
if err != nil {
return api.Message{}, fmt.Errorf("while unmarshalling payload: %w", err)
}

// TODO: why is the Prompt prefixed with `ai-face`?
if p.Prompt == "ai-face" {
if p.Prompt == "" {
return api.NewPlaintextMessage("Please clarify your question.", false), nil
}

// Cleanup the prompt.
p.Prompt = strings.TrimPrefix(p.Prompt, "ai-face")

// TODO: needs better goroutine management with persistent thread mapping.
go func() {
_ = i.handleThread(context.Background(), p)
if err := i.handleThread(context.Background(), p); err != nil {
i.log.WithError(err).Error("failed to handle request")
}
}()

return api.Message{
ParentActivityID: p.MessageID,
Sections: []api.Section{
{
// TODO: remove?
Base: api.Base{
Body: api.Body{Plaintext: "Let me figure this out.."},
},
},
},
}, nil
return pickQuickResponse(p.MessageID), nil
}

// handleThread creates a new OpenAI assistant thread and handles the conversation.
Expand Down Expand Up @@ -78,13 +66,13 @@ func (i *sourceInstance) handleThread(ctx context.Context, p *Payload) error {

for {
// Wait a little bit before polling. OpenAI assistant api does not support streaming yet.
time.Sleep(2 * time.Second)
time.Sleep(openAIPollInterval)

// Get the run.
run, err = i.openaiClient.RetrieveRun(ctx, run.ThreadID, run.ID)
if err != nil {
i.log.WithError(err).Error("while retrieving assistant thread run")
continue
i.out <- source.Event{Message: msgUnableToHelp(p.MessageID)}
return fmt.Errorf("while retrieving assistant thread run: %w", err)
}

i.log.WithFields(logrus.Fields{
Expand All @@ -94,38 +82,42 @@ func (i *sourceInstance) handleThread(ctx context.Context, p *Payload) error {

switch run.Status {
case openai.RunStatusCancelling, openai.RunStatusFailed, openai.RunStatusExpired:
// TODO tell the user that the assistant has stopped processing the request.
continue
i.out <- source.Event{Message: msgUnableToHelp(p.MessageID)}
return nil

// We have to wait. Here we could tell the user that we are waiting.
case openai.RunStatusQueued, openai.RunStatusInProgress:
continue

// Fetch and return the response.
case openai.RunStatusCompleted:
if err = i.handleStatusCompleted(ctx, run, p); err != nil {
i.log.WithError(err).Error("while handling completed case")
continue
i.out <- source.Event{Message: msgUnableToHelp(p.MessageID)}
return fmt.Errorf("while handling completed case: %w", err)
}
return nil

// The assistant is attempting to call a function.
case openai.RunStatusRequiresAction:
if err = i.handleStatusRequiresAction(ctx, run); err != nil {
i.out <- source.Event{Message: msgUnableToHelp(p.MessageID)}
return fmt.Errorf("while handling requires action: %w", err)
}
}
}
}

func (i *sourceInstance) handleStatusCompleted(ctx context.Context, run openai.Run, p *Payload) error {
msgList, err := i.openaiClient.ListMessage(ctx, run.ThreadID, nil, nil, nil, nil)
limit := 1
msgList, err := i.openaiClient.ListMessage(ctx, run.ThreadID, &limit, nil, nil, nil)
if err != nil {
return fmt.Errorf("while getting assistant messages response")
return fmt.Errorf("while getting assistant messages response: %w", err)
}

// We're listing messages in a thread. They are ordered in desc order. If
// there are no messages in the entire thread, we imply that something went
// wrong on the OpenAI side, could be a bug.
if len(msgList.Messages) == 0 {
i.log.Debug("no response messages were found")
i.log.Debug("no response messages were found, that seems like an edge case.")
i.out <- source.Event{
Message: api.Message{
ParentActivityID: p.MessageID,
Expand All @@ -138,26 +130,44 @@ func (i *sourceInstance) handleStatusCompleted(ctx context.Context, run openai.R
},
},
}

return nil
}

i.out <- source.Event{
Message: api.Message{
ParentActivityID: p.MessageID,
Sections: []api.Section{
{
Base: api.Base{
Body: api.Body{Plaintext: msgList.Messages[0].Content[0].Text.Value},
// Iterate over text content to build messages. We're only interested in text
// context, since the assistant is instructed to return text only.
for _, c := range msgList.Messages[0].Content {
if c.Text == nil {
continue
}

i.out <- source.Event{
Message: api.Message{
ParentActivityID: p.MessageID,
Sections: []api.Section{
{
Base: api.Base{
Body: api.Body{Plaintext: c.Text.Value},
},
Context: []api.ContextItem{
{Text: "AI-generated content may be incorrect."},
},
},
},
},
},
}
}

return nil
}

func (i *sourceInstance) handleStatusRequiresAction(ctx context.Context, run openai.Run) error {
// That should never happen, unless there is a bug or something is wrong with OpenAI APIs.
if run.RequiredAction == nil || run.RequiredAction.SubmitToolOutputs == nil {
return errors.New("run.RequiredAction or run.RequiredAction.SubmitToolOutputs is nil, that should not happen")
}

toolOutputs := []openai.ToolOutput{}

for _, t := range run.RequiredAction.SubmitToolOutputs.ToolCalls {
if t.Type != openai.ToolTypeFunction {
continue
Expand All @@ -174,18 +184,11 @@ func (i *sourceInstance) handleStatusRequiresAction(ctx context.Context, run ope
if err != nil {
return err
}
// Submit tool output.
_, err = i.openaiClient.SubmitToolOutputs(ctx, run.ThreadID, run.ID, openai.SubmitToolOutputsRequest{
ToolOutputs: []openai.ToolOutput{
{
ToolCallID: t.ID,
Output: string(out),
},
},

toolOutputs = append(toolOutputs, openai.ToolOutput{
ToolCallID: t.ID,
Output: string(out),
})
if err != nil {
return err
}

case "kubectlGetSecrets":
args := &kubectlGetSecretsArgs{}
Expand All @@ -197,18 +200,11 @@ func (i *sourceInstance) handleStatusRequiresAction(ctx context.Context, run ope
if err != nil {
return err
}
// Submit tool output.
_, err = i.openaiClient.SubmitToolOutputs(ctx, run.ThreadID, run.ID, openai.SubmitToolOutputsRequest{
ToolOutputs: []openai.ToolOutput{
{
ToolCallID: t.ID,
Output: string(out),
},
},

toolOutputs = append(toolOutputs, openai.ToolOutput{
ToolCallID: t.ID,
Output: string(out),
})
if err != nil {
return err
}

case "kubectlDescribePod":
args := &kubectlDescribePodArgs{}
Expand All @@ -220,18 +216,11 @@ func (i *sourceInstance) handleStatusRequiresAction(ctx context.Context, run ope
if err != nil {
return err
}
// Submit tool output.
_, err = i.openaiClient.SubmitToolOutputs(ctx, run.ThreadID, run.ID, openai.SubmitToolOutputsRequest{
ToolOutputs: []openai.ToolOutput{
{
ToolCallID: t.ID,
Output: string(out),
},
},

toolOutputs = append(toolOutputs, openai.ToolOutput{
ToolCallID: t.ID,
Output: string(out),
})
if err != nil {
return err
}

case "kubectlLogs":
args := &kubectlLogsArgs{}
Expand All @@ -243,20 +232,20 @@ func (i *sourceInstance) handleStatusRequiresAction(ctx context.Context, run ope
if err != nil {
return err
}
// Submit tool output.
_, err = i.openaiClient.SubmitToolOutputs(ctx, run.ThreadID, run.ID, openai.SubmitToolOutputsRequest{
ToolOutputs: []openai.ToolOutput{
{
ToolCallID: t.ID,
Output: string(out),
},
},

toolOutputs = append(toolOutputs, openai.ToolOutput{
ToolCallID: t.ID,
Output: string(out),
})
if err != nil {
return err
}
}
}

_, err := i.openaiClient.SubmitToolOutputs(ctx, run.ThreadID, run.ID, openai.SubmitToolOutputsRequest{
ToolOutputs: toolOutputs,
})
if err != nil {
return fmt.Errorf("while submitting tool outputs: %w", err)
}

return nil
}
61 changes: 61 additions & 0 deletions internal/source/ai-brain/response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package aibrain

import (
"math/rand"
"time"

"github.com/kubeshop/botkube/pkg/api"
)

// quickResponses holds short, lighthearted acknowledgements. One is picked
// at random and sent back immediately while the assistant works on the real
// answer asynchronously.
var quickResponses = []string{
	"Just a moment, please...",
	"Thinking about this one...",
	"Let me check on that for you.",
	"Processing your request...",
	"Working on it!",
	"This one needs some extra thought.",
	"I'm carefully considering your request.",
	"Consulting my super-smart brain...",
	"Cogs are turning...",
	"Accessing the knowledge archives...",
	"Running calculations at lightning speed!",
	"Hold on tight, I'm diving into the details.",
	"I'm here to help!",
	"Happy to look into this for you.",
	"Always learning to do this better.",
	"I want to get this right for you.",
	"Let me see what I can find out.",
	"My circuits are buzzing!",
	"Let me consult with my owl advisor...",
	"Consider it done (or at least, I'll try my best!)",
	"I'll get back to you with the best possible answer.",
}

// pickQuickResponse returns a message containing a randomly chosen quick
// acknowledgement, parented to messageID so chat platforms thread it under
// the original request.
func pickQuickResponse(messageID string) api.Message {
	// The randomness here is purely cosmetic (picking a canned phrase), so a
	// weak, time-seeded source is acceptable. Note: the previous code built
	// this source and then discarded it, falling back to the global source;
	// now the constructed source is actually used.
	rnd := rand.New(rand.NewSource(time.Now().UnixNano())) // #nosec G404
	i := rnd.Intn(len(quickResponses))

	return api.Message{
		ParentActivityID: messageID,
		Sections: []api.Section{
			{
				Base: api.Base{
					Body: api.Body{Plaintext: quickResponses[i]},
				},
			},
		},
	}
}

// msgUnableToHelp builds a generic apology message, parented to messageID,
// used whenever handling a request fails and the user should retry.
func msgUnableToHelp(messageID string) api.Message {
	section := api.Section{
		Base: api.Base{
			Body: api.Body{Plaintext: "I am sorry, something went wrong, please try again. 😔"},
		},
	}

	msg := api.Message{ParentActivityID: messageID}
	msg.Sections = append(msg.Sections, section)
	return msg
}

0 comments on commit b6af995

Please sign in to comment.