Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve agent startup time #5976

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions cmd/api-server/commons/commons.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,19 @@ import (
"os/signal"
"strings"
"syscall"
"time"

"github.com/nats-io/nats.go"
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/mongo"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/emptypb"
corev1 "k8s.io/api/core/v1"

"github.com/kubeshop/testkube/internal/config"
dbmigrations "github.com/kubeshop/testkube/internal/db-migrations"
parser "github.com/kubeshop/testkube/internal/template"
"github.com/kubeshop/testkube/pkg/agent"
"github.com/kubeshop/testkube/pkg/api/v1/testkube"
"github.com/kubeshop/testkube/pkg/cache"
"github.com/kubeshop/testkube/pkg/cloud"
Expand All @@ -43,7 +44,7 @@ import (
func ExitOnError(title string, err error) {
if err != nil {
log.DefaultLogger.Errorw(title, "error", err)
os.Exit(1)
panic(err)
}
}

Expand Down Expand Up @@ -280,7 +281,7 @@ func ReadDefaultExecutors(cfg *config.Config) (executors []testkube.ExecutorDeta
return next, images, nil
}

func ReadProContext(ctx context.Context, cfg *config.Config, grpcClient cloud.TestKubeCloudAPIClient) config.ProContext {
func ReadProContext(ctx context.Context, cfg *config.Config, grpcClient cloud.TestKubeCloudAPIClient) (config.ProContext, error) {
proContext := config.ProContext{
APIKey: cfg.TestkubeProAPIKey,
URL: cfg.TestkubeProURL,
Expand All @@ -297,17 +298,16 @@ func ReadProContext(ctx context.Context, cfg *config.Config, grpcClient cloud.Te
}

if cfg.TestkubeProAPIKey == "" || grpcClient == nil {
return proContext
return proContext, nil
}

ctx, cancel := context.WithTimeout(ctx, time.Second*3)
ctx, cancel := context.WithTimeout(ctx, agent.InitialTimeout)
md := metadata.Pairs("api-key", cfg.TestkubeProAPIKey)
ctx = metadata.NewOutgoingContext(ctx, md)
defer cancel()
foundProContext, err := grpcClient.GetProContext(ctx, &emptypb.Empty{})
foundProContext, err := grpcClient.GetProContext(ctx, &emptypb.Empty{}, grpc.WaitForReady(true))
if err != nil {
log.DefaultLogger.Warnf("cannot fetch pro-context from cloud: %s", err)
return proContext
return proContext, err
}

if proContext.EnvID == "" {
Expand All @@ -318,7 +318,7 @@ func ReadProContext(ctx context.Context, cfg *config.Config, grpcClient cloud.Te
proContext.OrgID = foundProContext.OrgId
}

return proContext
return proContext, nil
}

func MustCreateSlackLoader(cfg *config.Config, envs map[string]string) *slack.SlackLoader {
Expand Down
36 changes: 20 additions & 16 deletions cmd/api-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@
"strings"

"github.com/gofiber/fiber/v2/middleware/cors"
"google.golang.org/grpc"

executorsclientv1 "github.com/kubeshop/testkube-operator/pkg/client/executors/v1"
testkubeclientset "github.com/kubeshop/testkube-operator/pkg/clientset/versioned"
"github.com/kubeshop/testkube/cmd/api-server/commons"
"github.com/kubeshop/testkube/cmd/api-server/services"
"github.com/kubeshop/testkube/internal/app/api/debug"
"github.com/kubeshop/testkube/internal/app/api/oauth"
"github.com/kubeshop/testkube/internal/common"
"github.com/kubeshop/testkube/pkg/api/v1/testkube"
cloudartifacts "github.com/kubeshop/testkube/pkg/cloud/data/artifact"
cloudtestworkflow "github.com/kubeshop/testkube/pkg/cloud/data/testworkflow"
"github.com/kubeshop/testkube/pkg/event/kind/cdevent"
Expand All @@ -30,9 +31,6 @@
"github.com/kubeshop/testkube/pkg/tcl/schedulertcl"
"github.com/kubeshop/testkube/pkg/testworkflows/executionworker/executionworkertypes"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/presets"

"github.com/kubeshop/testkube/internal/common"
"github.com/kubeshop/testkube/pkg/api/v1/testkube"
"github.com/kubeshop/testkube/pkg/version"

"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -126,16 +124,32 @@

inspector := commons.CreateImageInspector(cfg, configMapClient, secretClient)

// Connect to NATS
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving NATS first staggers the gRPC connection a bit when we start agent and control plane together. gRPC backoff is quite steep, but I want to avoid tweaking it until absolutely needed as these defaults are there for a reason.

nc := commons.MustCreateNATSConnection(cfg)
eventBus := bus.NewNATSBus(nc)
if cfg.Trace {
eventBus.TraceEvents()
}
eventsEmitter := event.NewEmitter(eventBus, cfg.TestkubeClusterName)

var testWorkflowsClient testworkflowsclientv1.Interface
var testWorkflowTemplatesClient testworkflowsclientv1.TestWorkflowTemplatesInterface

var grpcClient cloud.TestKubeCloudAPIClient
var grpcConn *grpc.ClientConn
// Use local network for local access
controlPlaneUrl := cfg.TestkubeProURL
if strings.HasPrefix(controlPlaneUrl, fmt.Sprintf("%s:%d", cfg.APIServerFullname, cfg.GRPCServerPort)) {
controlPlaneUrl = fmt.Sprintf("127.0.0.1:%d", cfg.GRPCServerPort)
}

log.DefaultLogger.Infow("Connecting to control plane", "server", controlPlaneUrl, "insecure", cfg.TestkubeProTLSInsecure, "skipVerify", cfg.TestkubeProSkipVerify, "certFile", cfg.TestkubeProCertFile, "keyFile", cfg.TestkubeProKeyFile, "caFile", cfg.TestkubeProCAFile)

Check failure on line 144 in cmd/api-server/main.go

View workflow job for this annotation

GitHub Actions / Lint Go

SA1019: cfg.TestkubeProCAFile is deprecated: Instead mount a CA file into a directory and specify the diretory path with the SSL_CERT_DIR environment variable. (staticcheck)
grpcConn, err := agent.NewClient(controlPlaneUrl, cfg.TestkubeProTLSInsecure, cfg.TestkubeProSkipVerify, cfg.TestkubeProCertFile, cfg.TestkubeProKeyFile, cfg.TestkubeProCAFile)

Check failure on line 145 in cmd/api-server/main.go

View workflow job for this annotation

GitHub Actions / Lint Go

SA1019: cfg.TestkubeProCAFile is deprecated: Instead mount a CA file into a directory and specify the diretory path with the SSL_CERT_DIR environment variable. (staticcheck)
grpcClient := cloud.NewTestKubeCloudAPIClient(grpcConn)
defer grpcConn.Close()

// First request will 'wait for ready' with a timeout of agent.InitialTimeout.
proContext, err := commons.ReadProContext(ctx, cfg, grpcClient)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this up a bit, as it groups nicely with the grpcConnection part, this is where we actually wait for the connection to move from idle -> transient error -> ready.

commons.ExitOnError("cannot retrieve pro context from control plane", err)

grpcConn, err = agent.NewGRPCConnection(
ctx,
cfg.TestkubeProTLSInsecure,
Expand All @@ -148,8 +162,6 @@
)
commons.ExitOnError("error creating gRPC connection", err)

grpcClient = cloud.NewTestKubeCloudAPIClient(grpcConn)

if mode == common.ModeAgent && cfg.WorkflowStorage == "control-plane" {
testWorkflowsClient = cloudtestworkflow.NewCloudTestWorkflowRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey)
testWorkflowTemplatesClient = cloudtestworkflow.NewCloudTestWorkflowTemplateRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey)
Expand All @@ -167,15 +179,7 @@
triggerLeaseBackend := triggers.NewAcquireAlwaysLeaseBackend()
artifactStorage := cloudartifacts.NewCloudArtifactsStorage(grpcClient, grpcConn, cfg.TestkubeProAPIKey)

nc := commons.MustCreateNATSConnection(cfg)
eventBus := bus.NewNATSBus(nc)
if cfg.Trace {
eventBus.TraceEvents()
}
eventsEmitter := event.NewEmitter(eventBus, cfg.TestkubeClusterName)

// Check Pro/Enterprise subscription
proContext := commons.ReadProContext(ctx, cfg, grpcClient)
subscriptionChecker, err := checktcl.NewSubscriptionChecker(ctx, proContext, grpcClient, grpcConn)
commons.ExitOnError("Failed creating subscription checker", err)

Expand Down
49 changes: 47 additions & 2 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import (
)

const (
timeout = 10 * time.Second
InitialTimeout = 30 * time.Second
Copy link
Contributor Author

@WitoDelnat WitoDelnat Oct 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With gRPC backoff this would amount to: 1 + 1.6 + 2.5 + 4 +6.5 + 10.5 -> CRASH after 30s, last attempt at 26.1s. Kubernetes will then restart the pod and it will try again.

Timeout = 10 * time.Second
apiKeyMeta = "api-key"
clusterIDMeta = "cluster-id"
cloudMigrateMeta = "migrate"
Expand All @@ -44,6 +45,50 @@ const (
// buffer up to five messages per worker
const bufferSizePerWorker = 5

func NewClient(server string, isInsecure, skipVerify bool,
certFile, keyFile, caFile string) (*grpc.ClientConn, error) {
creds, err := newTransportCredentials(isInsecure, skipVerify, certFile, keyFile, caFile)
if err != nil {
return nil, err
}

userAgentStr := version.Version + "/" + version.Commit
keepAlive := keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 5 * time.Second,
PermitWithoutStream: true,
}

return grpc.NewClient(server, grpc.WithUserAgent(userAgentStr), grpc.WithKeepaliveParams(keepAlive), grpc.WithTransportCredentials(creds))
}

func newTransportCredentials(
isInsecure, skipVerify bool,
certFile, keyFile, caFile string,
) (credentials.TransportCredentials, error) {
if isInsecure {
return insecure.NewCredentials(), nil
}

tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12}
if skipVerify {
tlsConfig = &tls.Config{InsecureSkipVerify: true}
return credentials.NewTLS(tlsConfig), nil
}

if certFile != "" && keyFile != "" {
if err := clientCert(tlsConfig, certFile, keyFile); err != nil {
return nil, err
}
}
if caFile != "" {
if err := rootCAs(tlsConfig, caFile); err != nil {
return nil, err
}
}
return credentials.NewTLS(tlsConfig), nil
}

func NewGRPCConnection(
ctx context.Context,
isInsecure bool,
Expand All @@ -52,7 +97,7 @@ func NewGRPCConnection(
certFile, keyFile, caFile string,
logger *zap.SugaredLogger,
) (*grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
ctx, cancel := context.WithTimeout(ctx, Timeout)
defer cancel()
tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12}
if skipVerify {
Expand Down
Loading