From 05c3d8603751e78dfc46f66be5dd4b62f8012921 Mon Sep 17 00:00:00 2001 From: Deepak Kasu Date: Tue, 5 Nov 2024 13:02:29 -0700 Subject: [PATCH] APIGOV-28984 Updates --- pkg/agent/agent.go | 7 ++++--- pkg/customunit/client.go | 12 +++++------- pkg/customunit/client_test.go | 3 +-- pkg/customunit/manager.go | 2 +- pkg/transaction/metric/metricscollector.go | 4 +++- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 98e506360..98f9cdf7b 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -53,7 +53,6 @@ type ConfigChangeHandler func() // ShutdownHandler - function that the agent may implement to be called when a shutdown request is received type ShutdownHandler func() - type agentData struct { agentResourceManager resource.Manager teamJob *centralTeamsCache @@ -170,8 +169,6 @@ func InitializeWithAgentFeatures(centralCfg config.CentralConfig, agentFeaturesC // call the metric services. metricServicesConfigs := agentFeaturesCfg.GetMetricServicesConfigs() agent.customUnitMetricServerManager = customunit.NewCustomUnitMetricServerManager(metricServicesConfigs, agent.cacheManager, centralCfg.GetAgentType()) - ctx, ctxCancel := context.WithCancel(context.Background()) - agent.customUnitMetricServerManager.HandleMetricReporting(ctx, ctxCancel) if !agent.isInitialized { err = handleInitialization() @@ -436,6 +433,10 @@ func GetAuthProviderRegistry() oauth.ProviderRegistry { return agent.authProviderRegistry } +func GetCustomUnitMetricServerManager() *customunit.CustomUnitMetricServerManager { + return agent.customUnitMetricServerManager +} + // RegisterShutdownHandler - Registers shutdown handler func RegisterShutdownHandler(handler ShutdownHandler) { agent.agentShutdownHandler = handler diff --git a/pkg/customunit/client.go b/pkg/customunit/client.go index f2dd1328a..398d87640 100644 --- a/pkg/customunit/client.go +++ b/pkg/customunit/client.go @@ -90,23 +90,21 @@ type metricCollector interface { AddCustomMetricDetail(models.CustomMetricDetail) } -func (c *customUnitClient) MetricReporting() error { +func (c *customUnitClient) MetricReporting() { if err := c.createConnection(); err != nil { //TODO:: Retry until the connection is stable - return err + return } c.metricReportingClient = cu.NewMetricReportingServiceClient(c.conn) metricServiceInit := &cu.MetricServiceInit{} client, err := c.metricReportingClient.MetricReporting(c.ctx, metricServiceInit, c.cOpts...) if err != nil { - return err + return } c.metricReportingServiceClient = client // process metrics - go c.processMetrics() - c.stopChan <- true - return nil + c.processMetrics() } // processMetrics will stream custom metrics @@ -119,7 +117,7 @@ func (c *customUnitClient) processMetrics() { c.cancelCtx() } c.MetricReporting() - case <-c.stopChan: + default: metricReport, err := c.recv() if err == io.EOF { c.logger.Debug("stream finished") diff --git a/pkg/customunit/client_test.go b/pkg/customunit/client_test.go index f929270c5..2b73f5334 100644 --- a/pkg/customunit/client_test.go +++ b/pkg/customunit/client_test.go @@ -73,8 +73,7 @@ func Test_MetricReporting(t *testing.T) { ctx := context.Background() fakeServer := &fakeCustomUnitMetricReportingServer{} client, _ := createMRConnection(fakeServer, ctx) - err := client.MetricReporting() - fmt.Println(err) + client.MetricReporting() } func createMRConnection(fakeServer *fakeCustomUnitMetricReportingServer, ctx context.Context) (customUnitClient, error) { diff --git a/pkg/customunit/manager.go b/pkg/customunit/manager.go index f688421d8..a57310e24 100644 --- a/pkg/customunit/manager.go +++ b/pkg/customunit/manager.go @@ -143,6 +143,6 @@ func (m *CustomUnitMetricServerManager) HandleMetricReporting(ctx context.Contex factory := NewCustomUnitClientFactory(config.URL, m.cache, &customunits.QuotaInfo{}) client, _ := factory(ctx, cancelCtx) - client.MetricReporting() + go client.MetricReporting() } } diff --git a/pkg/transaction/metric/metricscollector.go b/pkg/transaction/metric/metricscollector.go index d8c3f8cc4..bb583c870 100644 --- a/pkg/transaction/metric/metricscollector.go +++ b/pkg/transaction/metric/metricscollector.go @@ -1,6 +1,7 @@ package metric import ( + "context" "encoding/json" "fmt" "os" @@ -148,7 +149,8 @@ func GetMetricCollector() Collector { if globalMetricCollector == nil && util.IsNotTest() { globalMetricCollector = createMetricCollector() } - + ctx, ctxCancel := context.WithCancel(context.Background()) + agent.GetCustomUnitMetricServerManager().HandleMetricReporting(ctx, ctxCancel) return globalMetricCollector }