Skip to content

Commit

Permalink
APIGOV-28984 Updates
Browse files Browse the repository at this point in the history
  • Loading branch information
Deepak Kasu committed Nov 5, 2024
1 parent 77e88a2 commit 05c3d86
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 14 deletions.
7 changes: 4 additions & 3 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ type ConfigChangeHandler func()

// ShutdownHandler - function that the agent may implement to be called when a shutdown request is received
type ShutdownHandler func()

type agentData struct {
agentResourceManager resource.Manager
teamJob *centralTeamsCache
Expand Down Expand Up @@ -170,8 +169,6 @@ func InitializeWithAgentFeatures(centralCfg config.CentralConfig, agentFeaturesC
// call the metric services.
metricServicesConfigs := agentFeaturesCfg.GetMetricServicesConfigs()
agent.customUnitMetricServerManager = customunit.NewCustomUnitMetricServerManager(metricServicesConfigs, agent.cacheManager, centralCfg.GetAgentType())
ctx, ctxCancel := context.WithCancel(context.Background())
agent.customUnitMetricServerManager.HandleMetricReporting(ctx, ctxCancel)

if !agent.isInitialized {
err = handleInitialization()
Expand Down Expand Up @@ -436,6 +433,10 @@ func GetAuthProviderRegistry() oauth.ProviderRegistry {
return agent.authProviderRegistry
}

func GetCustomUnitMetricServerManager() *customunit.CustomUnitMetricServerManager {
return agent.customUnitMetricServerManager
}

// RegisterShutdownHandler - Registers shutdown handler
func RegisterShutdownHandler(handler ShutdownHandler) {
agent.agentShutdownHandler = handler
Expand Down
12 changes: 5 additions & 7 deletions pkg/customunit/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,23 +90,21 @@ type metricCollector interface {
AddCustomMetricDetail(models.CustomMetricDetail)
}

func (c *customUnitClient) MetricReporting() error {
func (c *customUnitClient) MetricReporting() {
if err := c.createConnection(); err != nil {
//TODO:: Retry until the connection is stable
return err
return
}
c.metricReportingClient = cu.NewMetricReportingServiceClient(c.conn)
metricServiceInit := &cu.MetricServiceInit{}

client, err := c.metricReportingClient.MetricReporting(c.ctx, metricServiceInit, c.cOpts...)
if err != nil {
return err
return
}
c.metricReportingServiceClient = client
// process metrics
go c.processMetrics()
c.stopChan <- true
return nil
c.processMetrics()
}

// processMetrics will stream custom metrics
Expand All @@ -119,7 +117,7 @@ func (c *customUnitClient) processMetrics() {
c.cancelCtx()
}
c.MetricReporting()
case <-c.stopChan:
default:
metricReport, err := c.recv()
if err == io.EOF {
c.logger.Debug("stream finished")
Expand Down
3 changes: 1 addition & 2 deletions pkg/customunit/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ func Test_MetricReporting(t *testing.T) {
ctx := context.Background()
fakeServer := &fakeCustomUnitMetricReportingServer{}
client, _ := createMRConnection(fakeServer, ctx)
err := client.MetricReporting()
fmt.Println(err)
client.MetricReporting()
}

func createMRConnection(fakeServer *fakeCustomUnitMetricReportingServer, ctx context.Context) (customUnitClient, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/customunit/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,6 @@ func (m *CustomUnitMetricServerManager) HandleMetricReporting(ctx context.Contex
factory := NewCustomUnitClientFactory(config.URL, m.cache, &customunits.QuotaInfo{})
client, _ := factory(ctx, cancelCtx)

client.MetricReporting()
go client.MetricReporting()
}
}
4 changes: 3 additions & 1 deletion pkg/transaction/metric/metricscollector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metric

import (
"context"
"encoding/json"
"fmt"
"os"
Expand Down Expand Up @@ -148,7 +149,8 @@ func GetMetricCollector() Collector {
if globalMetricCollector == nil && util.IsNotTest() {
globalMetricCollector = createMetricCollector()
}

ctx, ctxCancel := context.WithCancel(context.Background())
agent.GetCustomUnitMetricServerManager().HandleMetricReporting(ctx, ctxCancel)
return globalMetricCollector
}

Expand Down

0 comments on commit 05c3d86

Please sign in to comment.