diff --git a/agent/agentserver/server.go b/agent/agentserver/server.go index 3d3268e7d..30eed7f9d 100644 --- a/agent/agentserver/server.go +++ b/agent/agentserver/server.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -22,6 +22,7 @@ import ( _ "net/http/pprof" // Registers /debug/pprof endpoints in http.DefaultServeMux. "os" "strings" + "sync" "github.com/uber/kraken/build-index/tagclient" "github.com/uber/kraken/core" @@ -29,6 +30,7 @@ import ( "github.com/uber/kraken/lib/middleware" "github.com/uber/kraken/lib/store" "github.com/uber/kraken/lib/torrent/scheduler" + "github.com/uber/kraken/tracker/announceclient" "github.com/uber/kraken/utils/handler" "github.com/uber/kraken/utils/httputil" @@ -46,6 +48,7 @@ type Server struct { cads *store.CADownloadStore sched scheduler.ReloadableScheduler tags tagclient.Client + ac announceclient.Client containerRuntime containerruntime.Factory } @@ -56,13 +59,22 @@ func New( cads *store.CADownloadStore, sched scheduler.ReloadableScheduler, tags tagclient.Client, + ac announceclient.Client, containerRuntime containerruntime.Factory) *Server { stats = stats.Tagged(map[string]string{ "module": "agentserver", }) - return &Server{config, stats, cads, sched, tags, containerRuntime} + return &Server{ + config: config, + stats: stats, + cads: cads, + sched: sched, + tags: tags, + ac: ac, + containerRuntime: containerRuntime, + } } // Handler returns the HTTP handler. @@ -73,6 +85,7 @@ func (s *Server) Handler() http.Handler { r.Use(middleware.LatencyTimer(s.stats)) r.Get("/health", handler.Wrap(s.healthHandler)) + r.Get("/readiness", handler.Wrap(s.readinessCheckHandler)) r.Get("/tags/{tag}", handler.Wrap(s.getTagHandler)) @@ -194,6 +207,39 @@ func (s *Server) healthHandler(w http.ResponseWriter, r *http.Request) error { return nil } +func (s *Server) readinessCheckHandler(w http.ResponseWriter, r *http.Request) error { + var schedErr, buildIndexErr, trackerErr error + var wg sync.WaitGroup + + wg.Add(3) + go func() { + schedErr = s.sched.Probe() + wg.Done() + }() + go func() { + buildIndexErr = s.tags.CheckReadiness() + wg.Done() + }() + go func() { + trackerErr = s.ac.CheckReadiness() + wg.Done() + }() + wg.Wait() + + // TODO(akalpakchiev): Replace with errors.Join once upgraded to Go 1.20+. + errMsgs := []string{} + for _, err := range []error{schedErr, buildIndexErr, trackerErr} { + if err != nil { + errMsgs = append(errMsgs, err.Error()) + } + } + if len(errMsgs) != 0 { + return handler.Errorf("agent not ready: %v", strings.Join(errMsgs, "\n")).Status(http.StatusServiceUnavailable) + } + io.WriteString(w, "OK") + return nil +} + // patchSchedulerConfigHandler restarts the agent torrent scheduler with // the config in request body. func (s *Server) patchSchedulerConfigHandler(w http.ResponseWriter, r *http.Request) error { diff --git a/agent/agentserver/server_test.go b/agent/agentserver/server_test.go index 40f08602c..000207873 100644 --- a/agent/agentserver/server_test.go +++ b/agent/agentserver/server_test.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -21,6 +21,7 @@ import ( "fmt" "io/ioutil" "net/url" + "strings" "testing" "time" @@ -35,6 +36,7 @@ import ( mockcontainerd "github.com/uber/kraken/mocks/lib/containerruntime/containerd" mockdockerdaemon "github.com/uber/kraken/mocks/lib/containerruntime/dockerdaemon" mockscheduler "github.com/uber/kraken/mocks/lib/torrent/scheduler" + mockannounceclient "github.com/uber/kraken/mocks/tracker/announceclient" "github.com/uber/kraken/utils/httputil" "github.com/uber/kraken/utils/testutil" @@ -49,6 +51,7 @@ type serverMocks struct { tags *mocktagclient.MockClient dockerCli *mockdockerdaemon.MockDockerClient containerdCli *mockcontainerd.MockClient + ac *mockannounceclient.MockClient containerRuntime *mockcontainerruntime.MockFactory cleanup *testutil.Cleanup } @@ -68,14 +71,15 @@ func newServerMocks(t *testing.T) (*serverMocks, func()) { dockerCli := mockdockerdaemon.NewMockDockerClient(ctrl) containerdCli := mockcontainerd.NewMockClient(ctrl) + ac := mockannounceclient.NewMockClient(ctrl) containerruntime := mockcontainerruntime.NewMockFactory(ctrl) return &serverMocks{ - cads, sched, tags, dockerCli, containerdCli, + cads, sched, tags, dockerCli, containerdCli, ac, containerruntime, &cleanup}, cleanup.Run } func (m *serverMocks) startServer() string { - s := New(Config{}, tally.NoopScope, m.cads, m.sched, m.tags, m.containerRuntime) + s := New(Config{}, tally.NoopScope, m.cads, m.sched, m.tags, m.ac, m.containerRuntime) addr, stop := testutil.StartServer(s.Handler()) m.cleanup.Add(stop) return addr @@ -207,6 +211,71 @@ func TestHealthHandler(t *testing.T) { } } +func TestReadinessCheckHandler(t *testing.T) { + for _, tc := range []struct { + desc string + probeErr error + buildIndexErr error + trackerErr error + wantErr string + }{ + { + desc: "success", + probeErr: nil, + buildIndexErr: nil, + trackerErr: nil, + wantErr: "", + }, + { + desc: "failure (probe fails)", + probeErr: errors.New("test scheduler error"), + buildIndexErr: nil, + trackerErr: nil, + wantErr: "GET http://{address}/readiness 503: agent not ready: test scheduler error", + }, + { + desc: "failure (build index not ready)", + probeErr: nil, + buildIndexErr: errors.New("build index not ready"), + trackerErr: nil, + wantErr: "GET http://{address}/readiness 503: agent not ready: build index not ready", + }, + { + desc: "failure (tracker not ready)", + probeErr: nil, + buildIndexErr: nil, + trackerErr: errors.New("tracker not ready"), + wantErr: "GET http://{address}/readiness 503: agent not ready: tracker not ready", + }, + { + desc: "failure (all conditions fail)", + probeErr: errors.New("test scheduler error"), + buildIndexErr: errors.New("build index not ready"), + trackerErr: errors.New("tracker not ready"), + wantErr: "GET http://{address}/readiness 503: agent not ready: test scheduler error\nbuild index not ready\ntracker not ready", + }, + } { + t.Run(tc.desc, func(t *testing.T) { + require := require.New(t) + + mocks, cleanup := newServerMocks(t) + defer cleanup() + + mocks.sched.EXPECT().Probe().Return(tc.probeErr) + mocks.tags.EXPECT().CheckReadiness().Return(tc.buildIndexErr) + mocks.ac.EXPECT().CheckReadiness().Return(tc.trackerErr) + + addr := mocks.startServer() + _, err := httputil.Get(fmt.Sprintf("http://%s/readiness", addr)) + if tc.wantErr == "" { + require.Nil(err) + } else { + require.EqualError(err, strings.ReplaceAll(tc.wantErr, "{address}", addr)) + } + }) + } +} + func TestPatchSchedulerConfigHandler(t *testing.T) { require := require.New(t) diff --git a/agent/cmd/cmd.go b/agent/cmd/cmd.go index 96ee32c3c..496a20232 100644 --- a/agent/cmd/cmd.go +++ b/agent/cmd/cmd.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -30,6 +30,7 @@ import ( "github.com/uber/kraken/lib/torrent/scheduler" "github.com/uber/kraken/metrics" "github.com/uber/kraken/nginx" + "github.com/uber/kraken/tracker/announceclient" "github.com/uber/kraken/utils/configutil" "github.com/uber/kraken/utils/log" "github.com/uber/kraken/utils/netutil" @@ -183,8 +184,9 @@ func Run(flags *Flags, opts ...Option) { log.Fatalf("Error building client tls config: %s", err) } + announceClient := announceclient.New(pctx, trackers, tls) sched, err := scheduler.NewAgentScheduler( - config.Scheduler, stats, pctx, cads, netevents, trackers, tls) + config.Scheduler, stats, pctx, cads, netevents, trackers, announceClient, tls) if err != nil { log.Fatalf("Error creating scheduler: %s", err) } @@ -216,7 +218,7 @@ func Run(flags *Flags, opts ...Option) { } agentServer := agentserver.New( - config.AgentServer, stats, cads, sched, tagClient, containerRuntimeFactory) + config.AgentServer, stats, cads, sched, tagClient, announceClient, containerRuntimeFactory) addr := fmt.Sprintf(":%d", flags.AgentServerPort) log.Infof("Starting agent server on %s", addr) go func() { diff --git a/lib/torrent/scheduler/constructors.go b/lib/torrent/scheduler/constructors.go index 15b7cab27..5ead5e2c0 100644 --- a/lib/torrent/scheduler/constructors.go +++ b/lib/torrent/scheduler/constructors.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -39,6 +39,7 @@ func NewAgentScheduler( cads *store.CADownloadStore, netevents networkevent.Producer, trackers hashring.PassiveRing, + announceClient announceclient.Client, tls *tls.Config) (ReloadableScheduler, error) { s, err := newScheduler( @@ -46,7 +47,7 @@ func NewAgentScheduler( agentstorage.NewTorrentArchive(stats, cads, metainfoclient.New(trackers, tls)), stats, pctx, - announceclient.New(pctx, trackers, tls), + announceClient, netevents) if err != nil { return nil, fmt.Errorf("new scheduler: %s", err)