diff --git a/cmd/executor/ai-face/main.go b/cmd/executor/ai-face/main.go
index 28f3056c..f651d6f1 100644
--- a/cmd/executor/ai-face/main.go
+++ b/cmd/executor/ai-face/main.go
@@ -8,10 +8,11 @@ import (
 	"fmt"
 	"io"
 	"net/http"
-
-	"github.com/kubeshop/botkube-cloud-plugins/internal/auth"
+	"strings"
 
 	"github.com/hashicorp/go-plugin"
+
+	"github.com/kubeshop/botkube-cloud-plugins/internal/auth"
 	aibrain "github.com/kubeshop/botkube-cloud-plugins/internal/source/ai-brain"
 	"github.com/kubeshop/botkube/pkg/api"
 	"github.com/kubeshop/botkube/pkg/api/executor"
@@ -77,7 +78,7 @@ func (e *AIFace) Execute(_ context.Context, in executor.ExecuteInput) (executor.
 	aiBrainWebhookURL := fmt.Sprintf("%s/%s", in.Context.IncomingWebhook.BaseSourceURL, cfg.AIBrainSourceName)
 	body, err := json.Marshal(aibrain.Payload{
-		Prompt:    in.Command,
+		Prompt:    strings.TrimSpace(strings.TrimPrefix(in.Command, pluginName)),
 		MessageID: in.Context.Message.ParentActivityID,
 	})
 	if err != nil {
diff --git a/hack/openai/main.go b/hack/openai/main.go
index 09c0daad..40ab2082 100644
--- a/hack/openai/main.go
+++ b/hack/openai/main.go
@@ -27,9 +27,11 @@ func main() {
 }
 
 func updateAssistant(ctx context.Context, c *openai.Client, id string) (openai.Assistant, error) {
-	instructions := `You are an experienced DevOps engineer.
-	You have deep understand how to operate a kubernetes cluster and troubleshoot running workloads in kubernetes.
-	You have access to tools which can help you to find needed information.`
+	instructions := `You are an experienced DevOps engineer. You have a deep
+	understanding of how to operate a Kubernetes cluster and troubleshoot
+	running workloads in Kubernetes. You have access to tools which can help
+	you. Keep your answers short and on the subject. Do not get involved in
+	unrelated topics.`
 
 	return c.ModifyAssistant(ctx, id, openai.AssistantRequest{
 		Model: openai.GPT4TurboPreview,
diff --git a/internal/source/ai-brain/assistant.go b/internal/source/ai-brain/assistant.go
index e01d1704..11cbf8c9 100644
--- a/internal/source/ai-brain/assistant.go
+++ b/internal/source/ai-brain/assistant.go
@@ -3,8 +3,8 @@ package aibrain
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
-	"strings"
 	"time"
 
 	"github.com/kubeshop/botkube/pkg/api"
@@ -13,13 +13,14 @@ import (
 	"github.com/sirupsen/logrus"
 )
 
+const openAIPollInterval = 2 * time.Second
+
 // Payload represents incoming webhook payload.
 type Payload struct {
 	Prompt    string `json:"prompt"`
 	MessageID string `json:"messageId"`
 }
 
-// handle is simplified - don't do that this way!
 func (i *sourceInstance) handle(in source.ExternalRequestInput) (api.Message, error) {
 	p := new(Payload)
 	err := json.Unmarshal(in.Payload, p)
@@ -27,30 +28,17 @@ func (i *sourceInstance) handle(in source.ExternalRequestInput) (api.Message, er
 		return api.Message{}, fmt.Errorf("while unmarshalling payload: %w", err)
 	}
 
-	// TODO: why is the Prompt prefixed with `ai-face`?
-	if p.Prompt == "ai-face" {
+	if p.Prompt == "" {
 		return api.NewPlaintextMessage("Please clarify your question.", false), nil
 	}
 
-	// Cleanup the prompt.
-	p.Prompt = strings.TrimPrefix(p.Prompt, "ai-face")
-
-	// TODO: needs better goroutine management with persistent thread mapping.
 	go func() {
-		_ = i.handleThread(context.Background(), p)
+		if err := i.handleThread(context.Background(), p); err != nil {
+			i.log.WithError(err).Error("failed to handle request")
+		}
 	}()
 
-	return api.Message{
-		ParentActivityID: p.MessageID,
-		Sections: []api.Section{
-			{
-				// TODO: remove?
-				Base: api.Base{
-					Body: api.Body{Plaintext: "Let me figure this out.."},
-				},
-			},
-		},
-	}, nil
+	return pickQuickResponse(p.MessageID), nil
 }
 
 // handleThread creates a new OpenAI assistant thread and handles the conversation.
@@ -78,13 +66,13 @@ func (i *sourceInstance) handleThread(ctx context.Context, p *Payload) error {
 
 	for {
 		// Wait a little bit before polling. OpenAI assistant api does not support streaming yet.
-		time.Sleep(2 * time.Second)
+		time.Sleep(openAIPollInterval)
 
 		// Get the run.
 		run, err = i.openaiClient.RetrieveRun(ctx, run.ThreadID, run.ID)
 		if err != nil {
-			i.log.WithError(err).Error("while retrieving assistant thread run")
-			continue
+			i.out <- source.Event{Message: msgUnableToHelp(p.MessageID)}
+			return fmt.Errorf("while retrieving assistant thread run: %w", err)
 		}
 
 		i.log.WithFields(logrus.Fields{
@@ -94,24 +82,24 @@ func (i *sourceInstance) handleThread(ctx context.Context, p *Payload) error {
 		switch run.Status {
 		case openai.RunStatusCancelling, openai.RunStatusFailed, openai.RunStatusExpired:
-			// TODO tell the user that the assistant has stopped processing the request.
-			continue
+			i.out <- source.Event{Message: msgUnableToHelp(p.MessageID)}
+			return nil
 
-		// We have to wait. Here we could tell the user that we are waiting.
 		case openai.RunStatusQueued, openai.RunStatusInProgress:
 			continue
 
 		// Fetch and return the response.
 		case openai.RunStatusCompleted:
 			if err = i.handleStatusCompleted(ctx, run, p); err != nil {
-				i.log.WithError(err).Error("while handling completed case")
-				continue
+				i.out <- source.Event{Message: msgUnableToHelp(p.MessageID)}
+				return fmt.Errorf("while handling completed case: %w", err)
 			}
 			return nil
 
 		// The assistant is attempting to call a function.
 		case openai.RunStatusRequiresAction:
 			if err = i.handleStatusRequiresAction(ctx, run); err != nil {
+				i.out <- source.Event{Message: msgUnableToHelp(p.MessageID)}
 				return fmt.Errorf("while handling requires action: %w", err)
 			}
 		}
 	}
@@ -119,13 +107,17 @@ func (i *sourceInstance) handleThread(ctx context.Context, p *Payload) error {
 }
 
 func (i *sourceInstance) handleStatusCompleted(ctx context.Context, run openai.Run, p *Payload) error {
-	msgList, err := i.openaiClient.ListMessage(ctx, run.ThreadID, nil, nil, nil, nil)
+	limit := 1
+	msgList, err := i.openaiClient.ListMessage(ctx, run.ThreadID, &limit, nil, nil, nil)
 	if err != nil {
-		return fmt.Errorf("while getting assistant messages response")
+		return fmt.Errorf("while getting assistant messages response: %w", err)
 	}
 
+	// Messages in a thread are listed in descending order. If there are no
+	// messages in the entire thread, we assume something went wrong on the
+	// OpenAI side; it could be a bug.
 	if len(msgList.Messages) == 0 {
-		i.log.Debug("no response messages were found")
+		i.log.Debug("no response messages were found; this is an unexpected edge case")
 		i.out <- source.Event{
 			Message: api.Message{
 				ParentActivityID: p.MessageID,
@@ -138,26 +130,44 @@ func (i *sourceInstance) handleStatusCompleted(ctx context.Context, run openai.R
 				},
 			},
 		}
 		return nil
 	}
 
-	i.out <- source.Event{
-		Message: api.Message{
-			ParentActivityID: p.MessageID,
-			Sections: []api.Section{
-				{
-					Base: api.Base{
-						Body: api.Body{Plaintext: msgList.Messages[0].Content[0].Text.Value},
-					},
-				},
-			},
-		},
-	}
+	// Iterate over the text content to build messages. We're only interested
+	// in text content, since the assistant is instructed to return text only.
+	for _, c := range msgList.Messages[0].Content {
+		if c.Text == nil {
+			continue
+		}
+
+		i.out <- source.Event{
+			Message: api.Message{
+				ParentActivityID: p.MessageID,
+				Sections: []api.Section{
+					{
+						Base: api.Base{
+							Body: api.Body{Plaintext: c.Text.Value},
+						},
+						Context: []api.ContextItem{
+							{Text: "AI-generated content may be incorrect."},
+						},
+					},
+				},
+			},
+		}
+	}
+
 	return nil
 }
 
 func (i *sourceInstance) handleStatusRequiresAction(ctx context.Context, run openai.Run) error {
+	// This should never happen unless there is a bug or something is wrong
+	// with the OpenAI APIs.
+	if run.RequiredAction == nil || run.RequiredAction.SubmitToolOutputs == nil {
+		return errors.New("run.RequiredAction or run.RequiredAction.SubmitToolOutputs is nil; this should not happen")
+	}
+
+	var toolOutputs []openai.ToolOutput
+
 	for _, t := range run.RequiredAction.SubmitToolOutputs.ToolCalls {
 		if t.Type != openai.ToolTypeFunction {
 			continue
 		}
@@ -174,18 +184,11 @@ func (i *sourceInstance) handleStatusRequiresAction(ctx context.Context, run ope
 			if err != nil {
 				return err
 			}
-			// Submit tool output.
-			_, err = i.openaiClient.SubmitToolOutputs(ctx, run.ThreadID, run.ID, openai.SubmitToolOutputsRequest{
-				ToolOutputs: []openai.ToolOutput{
-					{
-						ToolCallID: t.ID,
-						Output:     string(out),
-					},
-				},
+
+			toolOutputs = append(toolOutputs, openai.ToolOutput{
+				ToolCallID: t.ID,
+				Output:     string(out),
 			})
-			if err != nil {
-				return err
-			}
 
 		case "kubectlGetSecrets":
 			args := &kubectlGetSecretsArgs{}
@@ -197,18 +200,11 @@ func (i *sourceInstance) handleStatusRequiresAction(ctx context.Context, run ope
 			if err != nil {
 				return err
 			}
-			// Submit tool output.
-			_, err = i.openaiClient.SubmitToolOutputs(ctx, run.ThreadID, run.ID, openai.SubmitToolOutputsRequest{
-				ToolOutputs: []openai.ToolOutput{
-					{
-						ToolCallID: t.ID,
-						Output:     string(out),
-					},
-				},
+
+			toolOutputs = append(toolOutputs, openai.ToolOutput{
+				ToolCallID: t.ID,
+				Output:     string(out),
 			})
-			if err != nil {
-				return err
-			}
 
 		case "kubectlDescribePod":
 			args := &kubectlDescribePodArgs{}
@@ -220,18 +216,11 @@ func (i *sourceInstance) handleStatusRequiresAction(ctx context.Context, run ope
 			if err != nil {
 				return err
 			}
-			// Submit tool output.
-			_, err = i.openaiClient.SubmitToolOutputs(ctx, run.ThreadID, run.ID, openai.SubmitToolOutputsRequest{
-				ToolOutputs: []openai.ToolOutput{
-					{
-						ToolCallID: t.ID,
-						Output:     string(out),
-					},
-				},
+
+			toolOutputs = append(toolOutputs, openai.ToolOutput{
+				ToolCallID: t.ID,
+				Output:     string(out),
 			})
-			if err != nil {
-				return err
-			}
 
 		case "kubectlLogs":
 			args := &kubectlLogsArgs{}
@@ -243,20 +232,20 @@ func (i *sourceInstance) handleStatusRequiresAction(ctx context.Context, run ope
 			if err != nil {
 				return err
 			}
-			// Submit tool output.
-			_, err = i.openaiClient.SubmitToolOutputs(ctx, run.ThreadID, run.ID, openai.SubmitToolOutputsRequest{
-				ToolOutputs: []openai.ToolOutput{
-					{
-						ToolCallID: t.ID,
-						Output:     string(out),
-					},
-				},
+
+			toolOutputs = append(toolOutputs, openai.ToolOutput{
+				ToolCallID: t.ID,
+				Output:     string(out),
 			})
-			if err != nil {
-				return err
-			}
 		}
 	}
 
+	_, err := i.openaiClient.SubmitToolOutputs(ctx, run.ThreadID, run.ID, openai.SubmitToolOutputsRequest{
+		ToolOutputs: toolOutputs,
+	})
+	if err != nil {
+		return fmt.Errorf("while submitting tool outputs: %w", err)
+	}
+
 	return nil
 }
diff --git a/internal/source/ai-brain/response.go b/internal/source/ai-brain/response.go
new file mode 100644
index 00000000..dde9f06c
--- /dev/null
+++ b/internal/source/ai-brain/response.go
@@ -0,0 +1,61 @@
+package aibrain
+
+import (
+	"math/rand"
+	"time"
+
+	"github.com/kubeshop/botkube/pkg/api"
+)
+
+var quickResponses = []string{
+	"Just a moment, please...",
+	"Thinking about this one...",
+	"Let me check on that for you.",
+	"Processing your request...",
+	"Working on it!",
+	"This one needs some extra thought.",
+	"I'm carefully considering your request.",
+	"Consulting my super-smart brain...",
+	"Cogs are turning...",
+	"Accessing the knowledge archives...",
+	"Running calculations at lightning speed!",
+	"Hold on tight, I'm diving into the details.",
+	"I'm here to help!",
+	"Happy to look into this for you.",
+	"Always learning to do this better.",
+	"I want to get this right for you.",
+	"Let me see what I can find out.",
+	"My circuits are buzzing!",
+	"Let me consult with my owl advisor...",
+	"Consider it done (or at least, I'll try my best!)",
+	"I'll get back to you with the best possible answer.",
+}
+
+func pickQuickResponse(messageID string) api.Message {
+	r := rand.New(rand.NewSource(time.Now().UnixNano())) // #nosec G404
+	i := r.Intn(len(quickResponses))
+
+	return api.Message{
+		ParentActivityID: messageID,
+		Sections: []api.Section{
+			{
+				Base: api.Base{
+					Body: api.Body{Plaintext: quickResponses[i]},
+				},
+			},
+		},
+	}
+}
+
+func msgUnableToHelp(messageID string) api.Message {
+	return api.Message{
+		ParentActivityID: messageID,
+		Sections: []api.Section{
+			{
+				Base: api.Base{
+					Body: api.Body{Plaintext: "I am sorry, something went wrong, please try again. 😔"},
+				},
+			},
+		},
+	}
+}
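
Note: the executor and source halves of this change meet over the incoming webhook. The ai-face executor now strips its own plugin-name prefix before forwarding the command as the prompt, and ai-brain's handle rejects an empty prompt. The standalone sketch below illustrates that request shape; the webhook URL, command, and message ID are made-up placeholders, not values from this patch.

// payloadsketch.go - illustrative only; mirrors the aibrain.Payload contract
// from internal/source/ai-brain/assistant.go. URL and IDs are placeholders.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
)

// payload mirrors aibrain.Payload.
type payload struct {
	Prompt    string `json:"prompt"`
	MessageID string `json:"messageId"`
}

func main() {
	const pluginName = "ai-face"
	command := "ai-face why is my pod crash-looping?"

	// Trim the plugin-name prefix, as the executor now does, so the
	// assistant receives only the user's question.
	body, err := json.Marshal(payload{
		Prompt:    strings.TrimSpace(strings.TrimPrefix(command, pluginName)),
		MessageID: "msg-123", // placeholder for Message.ParentActivityID
	})
	if err != nil {
		panic(err)
	}

	// Placeholder for BaseSourceURL + "/" + cfg.AIBrainSourceName.
	const webhookURL = "http://localhost:8080/ai-brain"
	resp, err := http.Post(webhookURL, "application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("webhook status:", resp.Status)
}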
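
The largest refactor above replaces four per-tool-call SubmitToolOutputs requests with a single batched one, which also avoids leaving the run stuck when only some outputs were submitted. A condensed sketch of that pattern, assuming the sashabaranov/go-openai client used elsewhere in this repo; runTool is a hypothetical stand-in for the kubectl* helpers and is not part of the patch.

// batchsketch.go - a condensed sketch of batched tool-output submission.
package main

import (
	"context"
	"errors"
	"fmt"

	openai "github.com/sashabaranov/go-openai"
)

func submitBatchedToolOutputs(ctx context.Context, c *openai.Client, run openai.Run, runTool func(name, args string) (string, error)) error {
	if run.RequiredAction == nil || run.RequiredAction.SubmitToolOutputs == nil {
		return errors.New("run has no tool calls to resolve")
	}

	// Collect one output per function tool call instead of issuing a
	// separate API request for each.
	var outputs []openai.ToolOutput
	for _, t := range run.RequiredAction.SubmitToolOutputs.ToolCalls {
		if t.Type != openai.ToolTypeFunction {
			continue
		}
		out, err := runTool(t.Function.Name, t.Function.Arguments)
		if err != nil {
			return err
		}
		outputs = append(outputs, openai.ToolOutput{ToolCallID: t.ID, Output: out})
	}

	// One round-trip resolves every tool call in the run.
	_, err := c.SubmitToolOutputs(ctx, run.ThreadID, run.ID, openai.SubmitToolOutputsRequest{
		ToolOutputs: outputs,
	})
	if err != nil {
		return fmt.Errorf("while submitting tool outputs: %w", err)
	}
	return nil
}

func main() {
	// Wiring a real run requires an API key and a live thread; see
	// handleThread in internal/source/ai-brain/assistant.go for actual use.
	_ = submitBatchedToolOutputs
}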