diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 17108e4ab..5e47c60a6 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -25,7 +25,6 @@ import ( "github.com/Axway/agent-sdk/pkg/cache" "github.com/Axway/agent-sdk/pkg/config" "github.com/Axway/agent-sdk/pkg/customunit" - customunithandler "github.com/Axway/agent-sdk/pkg/customunit/handler" "github.com/Axway/agent-sdk/pkg/util" hc "github.com/Axway/agent-sdk/pkg/util/healthcheck" "github.com/Axway/agent-sdk/pkg/util/log" @@ -63,17 +62,17 @@ type agentData struct { agentFeaturesCfg config.AgentFeaturesConfig tokenRequester auth.PlatformTokenGetter - teamMap cache.Cache - cacheManager agentcache.Manager - apiValidator APIValidator - apiValidatorLock sync.Mutex - apiValidatorJobID string - configChangeHandler ConfigChangeHandler - agentResourceChangeHandler ConfigChangeHandler - customUnitQuotaHandler customunithandler.CustomUnitQuotaHandler - agentShutdownHandler ShutdownHandler - proxyResourceHandler *handler.StreamWatchProxyHandler - isInitialized bool + teamMap cache.Cache + cacheManager agentcache.Manager + apiValidator APIValidator + apiValidatorLock sync.Mutex + apiValidatorJobID string + configChangeHandler ConfigChangeHandler + agentResourceChangeHandler ConfigChangeHandler + customUnitMetricServerManager customunit.CustomUnitMetricServerManager + agentShutdownHandler ShutdownHandler + proxyResourceHandler *handler.StreamWatchProxyHandler + isInitialized bool provisioner provisioning.Provisioning streamer *stream.StreamerClient @@ -170,16 +169,10 @@ func InitializeWithAgentFeatures(centralCfg config.CentralConfig, agentFeaturesC // call the metric services. metricServicesConfigs := agentFeaturesCfg.GetMetricServicesConfigs() - agent.customUnitQuotaHandler = customunithandler.NewCustomUnitQuotaHandler(metricServicesConfigs) - // iterate over each metric service config - for _, config := range metricServicesConfigs { - ctx, ctxCancel := context.WithCancel(context.Background()) - // Initialize custom units client - factory := customunit.NewCustomMetricReportingClientFactory(config.URL, agent.cacheManager) - client, _ := factory(ctx, ctxCancel) + agent.customUnitMetricServerManager = customunit.NewCustomUnitMetricServerManager(metricServicesConfigs, agent.cacheManager) + ctx, ctxCancel := context.WithCancel(context.Background()) + agent.customUnitMetricServerManager.HandleMetricReporting(ctx, ctxCancel) - client.MetricReporting() - } if !agent.isInitialized { err = handleInitialization() if err != nil { diff --git a/pkg/agent/handler/accessrequest.go b/pkg/agent/handler/accessrequest.go index e56b6288d..6eb95577c 100644 --- a/pkg/agent/handler/accessrequest.go +++ b/pkg/agent/handler/accessrequest.go @@ -9,7 +9,6 @@ import ( management "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1" defs "github.com/Axway/agent-sdk/pkg/apic/definitions" prov "github.com/Axway/agent-sdk/pkg/apic/provisioning" - handler "github.com/Axway/agent-sdk/pkg/customunit/handler" "github.com/Axway/agent-sdk/pkg/util" "github.com/Axway/agent-sdk/pkg/watchmanager/proto" ) @@ -24,24 +23,27 @@ type arProvisioner interface { AccessRequestProvision(accessRequest prov.AccessRequest) (status prov.RequestStatus, data prov.AccessData) AccessRequestDeprovision(accessRequest prov.AccessRequest) (status prov.RequestStatus) } +type customUnitMetricServerManager interface { + HandleQuotaEnforcement(context.Context, context.CancelFunc, *management.AccessRequest, *management.ManagedApplication) error +} type accessRequestHandler struct { marketplaceHandler - prov arProvisioner - cache agentcache.Manager - client client - encryptSchema encryptSchemaFunc - customUnitQuotaHandler handler.CustomUnitQuotaHandler + prov arProvisioner + cache agentcache.Manager + client client + encryptSchema encryptSchemaFunc + customUnitMetricServerManager customUnitMetricServerManager } // NewAccessRequestHandler creates a Handler for Access Requests -func NewAccessRequestHandler(prov arProvisioner, cache agentcache.Manager, client client, customUnitQuotaHandler handler.CustomUnitQuotaHandler) Handler { +func NewAccessRequestHandler(prov arProvisioner, cache agentcache.Manager, client client, customUnitMetricServerManager customUnitMetricServerManager) Handler { return &accessRequestHandler{ - prov: prov, - cache: cache, - client: client, - encryptSchema: encryptSchema, - customUnitQuotaHandler: customUnitQuotaHandler, + prov: prov, + cache: cache, + client: client, + encryptSchema: encryptSchema, + customUnitMetricServerManager: customUnitMetricServerManager, } } @@ -137,15 +139,13 @@ func (h *accessRequestHandler) onPending(ctx context.Context, ar *management.Acc status, accessData := h.prov.AccessRequestProvision(req) if status.GetStatus() == prov.Success && len(ar.Spec.AdditionalQuotas) > 0 { - errMessage, err := h.customUnitQuotaHandler.Handle(ctx, ar, app) + ctx, cancelCtx := context.WithCancel(ctx) + err := h.customUnitMetricServerManager.HandleQuotaEnforcement(ctx, cancelCtx, ar, app) if err != nil { - h.onError(ctx, ar, err) - } - - if errMessage != "" { + // h.onError(ctx, ar, err) status = prov.NewRequestStatusBuilder(). - SetMessage(errMessage). + SetMessage(err.Error()). SetCurrentStatusReasons(ar.Status.Reasons). Failed() } diff --git a/pkg/agent/handler/accessrequest_test.go b/pkg/agent/handler/accessrequest_test.go index 10067799a..9e1dbb9cc 100644 --- a/pkg/agent/handler/accessrequest_test.go +++ b/pkg/agent/handler/accessrequest_test.go @@ -12,7 +12,7 @@ import ( prov "github.com/Axway/agent-sdk/pkg/apic/provisioning" "github.com/Axway/agent-sdk/pkg/apic/provisioning/mock" "github.com/Axway/agent-sdk/pkg/config" - handler "github.com/Axway/agent-sdk/pkg/customunit/handler" + "github.com/Axway/agent-sdk/pkg/customunit" "github.com/Axway/agent-sdk/pkg/util" "github.com/Axway/agent-sdk/pkg/watchmanager/proto" "github.com/stretchr/testify/assert" @@ -176,8 +176,8 @@ func TestAccessRequestHandler(t *testing.T) { c.isDeleting = true } af := config.NewAgentFeaturesConfiguration().GetMetricServicesConfigs() - customUnitQuotaHandler := handler.NewCustomUnitQuotaHandler(af) - handler := NewAccessRequestHandler(arp, cm, c, customUnitQuotaHandler) + customUnitMetricServerManager := customunit.NewCustomUnitMetricServerManager(af, cm) + handler := NewAccessRequestHandler(arp, cm, c, customUnitMetricServerManager) v := handler.(*accessRequestHandler) v.encryptSchema = func(_, _ map[string]interface{}, _, _, _ string) (map[string]interface{}, error) { return map[string]interface{}{}, nil @@ -251,8 +251,8 @@ func TestAccessRequestHandler_deleting(t *testing.T) { t: t, } af := config.NewAgentFeaturesConfiguration().GetMetricServicesConfigs() - customUnitQuotaHandler := handler.NewCustomUnitQuotaHandler(af) - handler := NewAccessRequestHandler(arp, cm, c, customUnitQuotaHandler) + customUnitMetricServerManager := customunit.NewCustomUnitMetricServerManager(af, cm) + handler := NewAccessRequestHandler(arp, cm, c, customUnitMetricServerManager) ri, _ := ar.AsInstance() @@ -276,8 +276,8 @@ func TestAccessRequestHandler_wrong_kind(t *testing.T) { } ar := &mockARProvision{} af := config.NewAgentFeaturesConfiguration().GetMetricServicesConfigs() - customUnitQuotaHandler := handler.NewCustomUnitQuotaHandler(af) - handler := NewAccessRequestHandler(ar, cm, c, customUnitQuotaHandler) + customUnitMetricServerManager := customunit.NewCustomUnitMetricServerManager(af, cm) + handler := NewAccessRequestHandler(ar, cm, c, customUnitMetricServerManager) ri := &v1.ResourceInstance{ ResourceMeta: v1.ResourceMeta{ GroupVersionKind: management.EnvironmentGVK(), diff --git a/pkg/agent/provisioning.go b/pkg/agent/provisioning.go index c40de8a08..1bb01755e 100644 --- a/pkg/agent/provisioning.go +++ b/pkg/agent/provisioning.go @@ -537,7 +537,7 @@ func RegisterProvisioner(provisioner provisioning.Provisioning) { if agent.cfg.GetAgentType() == config.DiscoveryAgent || agent.cfg.GetAgentType() == config.GovernanceAgent { agent.proxyResourceHandler.RegisterTargetHandler( "accessrequesthandler", - handler.NewAccessRequestHandler(agent.provisioner, agent.cacheManager, agent.apicClient, agent.customUnitQuotaHandler), + handler.NewAccessRequestHandler(agent.provisioner, agent.cacheManager, agent.apicClient, agent.customUnitMetricServerManager), ) agent.proxyResourceHandler.RegisterTargetHandler( "managedappHandler", diff --git a/pkg/customunit/client.go b/pkg/customunit/client.go index 20b30a559..33a0855a6 100644 --- a/pkg/customunit/client.go +++ b/pkg/customunit/client.go @@ -15,29 +15,40 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -type customUnitsQEClient struct { - ctx context.Context - quotaInfo *cu.QuotaInfo - dialOpts []grpc.DialOption - cOpts []grpc.CallOption - url string - conn *grpc.ClientConn - client cu.QuotaEnforcementClient +type customUnitClient struct { + ctx context.Context + quotaInfo *cu.QuotaInfo + dialOpts []grpc.DialOption + cOpts []grpc.CallOption + url string + conn *grpc.ClientConn + quotaEnforcementClient cu.QuotaEnforcementClient + cancelCtx context.CancelFunc + logger *logrus.Entry + metricReportingClient cu.MetricReportingServiceClient + metricReportingServiceClient cu.MetricReportingService_MetricReportingClient + isRunning bool + cache agentcache.Manager + metricCollector metricCollector + stopChan chan bool } -type QEOption func(*customUnitsQEClient) +type CustomUnitOption func(*customUnitClient) -type QuotaEnforcementClientFactory func(context.Context, ...QEOption) (customUnitsQEClient, error) +type CustomUnitClientFactory func(context.Context, context.CancelFunc, ...CustomUnitOption) (customUnitClient, error) -func NewQuotaEnforcementClientFactory(url string, quotaInfo *cu.QuotaInfo) QuotaEnforcementClientFactory { - return func(ctx context.Context, opts ...QEOption) (customUnitsQEClient, error) { - c := &customUnitsQEClient{ +func NewCustomUnitClientFactory(url string, agentCache cache.Manager, quotaInfo *cu.QuotaInfo) CustomUnitClientFactory { + return func(ctx context.Context, ctxCancel context.CancelFunc, opts ...CustomUnitOption) (customUnitClient, error) { + c := &customUnitClient{ ctx: ctx, quotaInfo: quotaInfo, url: url, dialOpts: []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), }, + cancelCtx: ctxCancel, + cache: agentCache, + stopChan: make(chan bool, 1), } for _, o := range opts { @@ -48,29 +59,29 @@ func NewQuotaEnforcementClientFactory(url string, quotaInfo *cu.QuotaInfo) Quota } } -func WithGRPCDialOption(opt grpc.DialOption) QEOption { - return func(c *customUnitsQEClient) { +func WithGRPCDialOption(opt grpc.DialOption) CustomUnitOption { + return func(c *customUnitClient) { c.dialOpts = append(c.dialOpts, opt) } } -func (c *customUnitsQEClient) createConnection() error { +func (c *customUnitClient) createConnection() error { conn, err := grpc.DialContext(c.ctx, c.url, c.dialOpts...) if err != nil { return err } c.conn = conn - c.client = cu.NewQuotaEnforcementClient(conn) return nil } -func (c *customUnitsQEClient) QuotaEnforcementInfo() (*cu.QuotaEnforcementResponse, error) { +func (c *customUnitClient) QuotaEnforcementInfo() (*cu.QuotaEnforcementResponse, error) { err := c.createConnection() if err != nil { // Handle the connection error return nil, err } - response, err := c.client.QuotaEnforcementInfo(c.ctx, c.quotaInfo, c.cOpts...) + c.quotaEnforcementClient = cu.NewQuotaEnforcementClient(c.conn) + response, err := c.quotaEnforcementClient.QuotaEnforcementInfo(c.ctx, c.quotaInfo, c.cOpts...) return response, err } @@ -79,73 +90,15 @@ type metricCollector interface { AddCustomMetricDetail(models.CustomMetricDetail) } -type customUnitMetricReportingClient struct { - ctx context.Context - cancelCtx context.CancelFunc - logger *logrus.Entry - mtricReportingClient cu.MetricReportingServiceClient - metricReportingServiceClient cu.MetricReportingService_MetricReportingClient - dialOpts []grpc.DialOption - cOpts []grpc.CallOption - url string - isRunning bool - conn *grpc.ClientConn - cache agentcache.Manager - metricCollector metricCollector - stopChan chan bool -} - -type MROption func(*customUnitMetricReportingClient) - -type MetricReportingClientFactory func(context.Context, context.CancelFunc, ...MROption) (customUnitMetricReportingClient, error) - -func NewCustomMetricReportingClientFactory(url string, agentCache cache.Manager) MetricReportingClientFactory { - return func(ctx context.Context, ctxCancel context.CancelFunc, opts ...MROption) (customUnitMetricReportingClient, error) { - c := &customUnitMetricReportingClient{ - ctx: ctx, - cancelCtx: ctxCancel, - url: url, - cache: agentCache, - dialOpts: []grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), - }, - stopChan: make(chan bool, 1), - } - - for _, o := range opts { - o(c) - } - - return *c, nil - } -} - -func WithGRPCDialOptionForMR(opt grpc.DialOption) MROption { - return func(c *customUnitMetricReportingClient) { - c.dialOpts = append(c.dialOpts, opt) - } -} - -func (c *customUnitMetricReportingClient) createConnection() error { - // create the metric reporting connection - conn, err := grpc.DialContext(c.ctx, c.url, c.dialOpts...) - if err != nil { - c.logger.WithError(err).Errorf("failed to connect to metric server") - return err - } - c.conn = conn - c.mtricReportingClient = cu.NewMetricReportingServiceClient(c.conn) - return nil -} - -func (c *customUnitMetricReportingClient) MetricReporting() error { +func (c *customUnitClient) MetricReporting() error { if err := c.createConnection(); err != nil { //TODO:: Retry until the connection is stable return err } + c.metricReportingClient = cu.NewMetricReportingServiceClient(c.conn) metricServiceInit := &cu.MetricServiceInit{} - client, err := c.mtricReportingClient.MetricReporting(c.ctx, metricServiceInit, c.cOpts...) + client, err := c.metricReportingClient.MetricReporting(c.ctx, metricServiceInit, c.cOpts...) if err != nil { return err } @@ -157,7 +110,7 @@ func (c *customUnitMetricReportingClient) MetricReporting() error { } // processMetrics will stream custom metrics -func (c *customUnitMetricReportingClient) processMetrics() { +func (c *customUnitClient) processMetrics() { for { select { case <-c.ctx.Done(): @@ -179,7 +132,7 @@ func (c *customUnitMetricReportingClient) processMetrics() { } } } -func (c *customUnitMetricReportingClient) recv() (*cu.MetricReport, error) { +func (c *customUnitClient) recv() (*cu.MetricReport, error) { for { metricReport, err := c.metricReportingServiceClient.Recv() if err != nil { @@ -190,7 +143,7 @@ func (c *customUnitMetricReportingClient) recv() (*cu.MetricReport, error) { } } -func (c *customUnitMetricReportingClient) reportMetrics(metricReport *cu.MetricReport) { +func (c *customUnitClient) reportMetrics(metricReport *cu.MetricReport) { // deprovision the metric report and send it to the metric collector customMetricDetail, err := c.buildCustomMetricDetail(metricReport) if err == nil { @@ -198,7 +151,7 @@ func (c *customUnitMetricReportingClient) reportMetrics(metricReport *cu.MetricR } } -func (c *customUnitMetricReportingClient) buildCustomMetricDetail(metricReport *cu.MetricReport) (*models.CustomMetricDetail, error) { +func (c *customUnitClient) buildCustomMetricDetail(metricReport *cu.MetricReport) (*models.CustomMetricDetail, error) { apiServiceLookup := metricReport.GetApiService() managedAppLookup := metricReport.GetManagedApp() planUnitLookup := metricReport.GetPlanUnit() @@ -223,7 +176,7 @@ func (c *customUnitMetricReportingClient) buildCustomMetricDetail(metricReport * }, nil } -func (c *customUnitMetricReportingClient) Close() error { +func (c *customUnitClient) Close() error { var err error defer c.conn.Close() if err != nil { @@ -233,7 +186,7 @@ func (c *customUnitMetricReportingClient) Close() error { return nil } -func (c *customUnitMetricReportingClient) APIServiceLookup(apiServiceLookup *cu.APIServiceLookup) (*models.APIDetails, error) { +func (c *customUnitClient) APIServiceLookup(apiServiceLookup *cu.APIServiceLookup) (*models.APIDetails, error) { apiSvcValue := apiServiceLookup.GetValue() apiLookupType := apiServiceLookup.GetType() apiCustomAttr := apiServiceLookup.GetCustomAttribute() @@ -275,7 +228,7 @@ func (c *customUnitMetricReportingClient) APIServiceLookup(apiServiceLookup *cu. }, nil } -func (c *customUnitMetricReportingClient) ManagedApplicationLookup(appLookup *cu.AppLookup) (*models.AppDetails, error) { +func (c *customUnitClient) ManagedApplicationLookup(appLookup *cu.AppLookup) (*models.AppDetails, error) { appValue := appLookup.GetValue() appLookupType := appLookup.GetType() appCustomAttr := appLookup.GetCustomAttribute() @@ -317,7 +270,7 @@ func (c *customUnitMetricReportingClient) ManagedApplicationLookup(appLookup *cu }, nil } -func (c *customUnitMetricReportingClient) PlanUnitLookup(planUnitLookup *cu.UnitLookup) *models.Unit { +func (c *customUnitClient) PlanUnitLookup(planUnitLookup *cu.UnitLookup) *models.Unit { return &models.Unit{ Name: planUnitLookup.GetUnitName(), diff --git a/pkg/customunit/client_test.go b/pkg/customunit/client_test.go index 4db370426..f929270c5 100644 --- a/pkg/customunit/client_test.go +++ b/pkg/customunit/client_test.go @@ -35,7 +35,7 @@ func Test_QuotaEnforcementInfo(t *testing.T) { assert.Nil(t, response) } -func createQEConnection(fakeServer *fakeQuotaEnforcementServer, ctx context.Context) (customUnitsQEClient, error) { +func createQEConnection(fakeServer *fakeQuotaEnforcementServer, ctx context.Context) (customUnitClient, error) { lis := bufconn.Listen(bufSize) opt := grpc.WithContextDialer( func(context.Context, string) (net.Conn, error) { @@ -61,9 +61,10 @@ func createQEConnection(fakeServer *fakeQuotaEnforcementServer, ctx context.Cont Unit: "mockUnit", }, } - factory := NewQuotaEnforcementClientFactory("bufnet", quotaInfo) - - return factory(context.Background(), WithGRPCDialOption(opt)) + cache := cache.NewAgentCacheManager(&config.CentralConfiguration{}, false) + factory := NewCustomUnitClientFactory("bufnet", cache, quotaInfo) + streamCtx, streamCancel := context.WithCancel(context.Background()) + return factory(streamCtx, streamCancel, WithGRPCDialOption(opt)) } @@ -76,7 +77,7 @@ func Test_MetricReporting(t *testing.T) { fmt.Println(err) } -func createMRConnection(fakeServer *fakeCustomUnitMetricReportingServer, ctx context.Context) (customUnitMetricReportingClient, error) { +func createMRConnection(fakeServer *fakeCustomUnitMetricReportingServer, ctx context.Context) (customUnitClient, error) { lis := bufconn.Listen(bufSize) opt := grpc.WithContextDialer( func(context.Context, string) (net.Conn, error) { @@ -93,8 +94,8 @@ func createMRConnection(fakeServer *fakeCustomUnitMetricReportingServer, ctx con }() cache := cache.NewAgentCacheManager(&config.CentralConfiguration{}, false) - factory := NewCustomMetricReportingClientFactory("bufnet", cache) + factory := NewCustomUnitClientFactory("bufnet", cache, &customunits.QuotaInfo{}) streamCtx, streamCancel := context.WithCancel(context.Background()) - return factory(streamCtx, streamCancel, WithGRPCDialOptionForMR(opt)) + return factory(streamCtx, streamCancel, WithGRPCDialOption(opt)) } diff --git a/pkg/customunit/handler/handler.go b/pkg/customunit/manager.go similarity index 60% rename from pkg/customunit/handler/handler.go rename to pkg/customunit/manager.go index a33a07fe9..3afdd05b7 100644 --- a/pkg/customunit/handler/handler.go +++ b/pkg/customunit/manager.go @@ -1,4 +1,4 @@ -package handler +package customunit import ( "context" @@ -12,51 +12,54 @@ import ( management "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1" defs "github.com/Axway/agent-sdk/pkg/apic/definitions" "github.com/Axway/agent-sdk/pkg/config" - "github.com/Axway/agent-sdk/pkg/customunit" "github.com/Axway/agent-sdk/pkg/util" ) -type customUnitQuotaHandler struct { +type customUnitMetricServerManager struct { configs []config.MetricServiceConfiguration cache cache.Manager } -type CustomUnitQuotaHandler interface { - Handle(context.Context, *management.AccessRequest, *management.ManagedApplication) (string, error) +type CustomUnitMetricServerManager interface { + HandleQuotaEnforcement(context.Context, context.CancelFunc, *management.AccessRequest, *management.ManagedApplication) error + HandleMetricReporting(context.Context, context.CancelFunc) } -// NewCustomUnitQuotaHandler creates a new Custom Unit Quota Handler for AccessRequest processing -func NewCustomUnitQuotaHandler(configs []config.MetricServiceConfiguration) CustomUnitQuotaHandler { - return &customUnitQuotaHandler{ +func NewCustomUnitMetricServerManager(configs []config.MetricServiceConfiguration, cache cache.Manager) CustomUnitMetricServerManager { + return &customUnitMetricServerManager{ configs: configs, + cache: cache, } } -func (h *customUnitQuotaHandler) Handle(ctx context.Context, ar *management.AccessRequest, app *management.ManagedApplication) (string, error) { +func (h *customUnitMetricServerManager) HandleQuotaEnforcement(ctx context.Context, cancelCtx context.CancelFunc, ar *management.AccessRequest, app *management.ManagedApplication) error { // Build quota info quotaInfo, err := h.buildQuotaInfo(ctx, ar, app) if err != nil { - return "", fmt.Errorf("could not build quota info from access request") + return fmt.Errorf("could not build quota info from access request") } errMessage := "" for _, config := range h.configs { if config.MetricServiceEnabled() { - factory := customunit.NewQuotaEnforcementClientFactory(config.URL, quotaInfo) - client, _ := factory(ctx) + factory := NewCustomUnitClientFactory(config.URL, h.cache, quotaInfo) + client, _ := factory(ctx, cancelCtx) response, err := client.QuotaEnforcementInfo() if err != nil { // if error from QE and reject on fail, we return the error back to the central - if response.Error != "" && config.RejectOnFailEnabled() { + if response != nil && response.Error != "" && config.RejectOnFailEnabled() { errMessage = errMessage + fmt.Sprintf("TODO: message: %s", err.Error()) } } } } - return errMessage, nil + if errMessage != "" { + return fmt.Errorf(errMessage) + } + return nil } -func (h *customUnitQuotaHandler) buildQuotaInfo(ctx context.Context, ar *management.AccessRequest, app *management.ManagedApplication) (*customunits.QuotaInfo, error) { +func (h *customUnitMetricServerManager) buildQuotaInfo(ctx context.Context, ar *management.AccessRequest, app *management.ManagedApplication) (*customunits.QuotaInfo, error) { unitRef, count := h.getQuotaInfo(ar) if unitRef == "" { return nil, nil @@ -105,7 +108,7 @@ type reference struct { Unit string `json:"unit"` } -func (h *customUnitQuotaHandler) getQuotaInfo(ar *management.AccessRequest) (string, int) { +func (h *customUnitMetricServerManager) getQuotaInfo(ar *management.AccessRequest) (string, int) { index := 0 if len(ar.Spec.AdditionalQuotas) < index+1 { return "", 0 @@ -123,7 +126,7 @@ func (h *customUnitQuotaHandler) getQuotaInfo(ar *management.AccessRequest) (str return "", 0 } -func (h *customUnitQuotaHandler) getServiceInstance(_ context.Context, ar *management.AccessRequest) (*apiv1.ResourceInstance, error) { +func (h *customUnitMetricServerManager) getServiceInstance(_ context.Context, ar *management.AccessRequest) (*apiv1.ResourceInstance, error) { instRef := ar.GetReferenceByGVK(management.APIServiceInstanceGVK()) instID := instRef.ID instance, err := h.cache.GetAPIServiceInstanceByID(instID) @@ -132,3 +135,14 @@ func (h *customUnitQuotaHandler) getServiceInstance(_ context.Context, ar *manag } return instance, nil } + +func (m *customUnitMetricServerManager) HandleMetricReporting(ctx context.Context, cancelCtx context.CancelFunc) { + // iterate over each metric service config + for _, config := range m.configs { + // Initialize custom units client + factory := NewCustomUnitClientFactory(config.URL, m.cache, &customunits.QuotaInfo{}) + client, _ := factory(ctx, cancelCtx) + + client.MetricReporting() + } +} diff --git a/pkg/customunit/manager_test.go b/pkg/customunit/manager_test.go new file mode 100644 index 000000000..1937f9544 --- /dev/null +++ b/pkg/customunit/manager_test.go @@ -0,0 +1,100 @@ +package customunit + +import ( + "context" + "testing" + + agentcache "github.com/Axway/agent-sdk/pkg/agent/cache" + v1 "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/api/v1" + management "github.com/Axway/agent-sdk/pkg/apic/apiserver/models/management/v1alpha1" + defs "github.com/Axway/agent-sdk/pkg/apic/definitions" + prov "github.com/Axway/agent-sdk/pkg/apic/provisioning" + "github.com/Axway/agent-sdk/pkg/config" + "github.com/stretchr/testify/assert" +) + +const instRefID = "inst-id-1" +const instRefName = "inst-name-1" +const managedAppRefName = "managed-app-name" + +func Test_NewCustomUnitMetricServerManager(t *testing.T) { + metricServicesConfigs := config.NewAgentFeaturesConfiguration().GetMetricServicesConfigs() + cm := agentcache.NewAgentCacheManager(&config.CentralConfiguration{}, false) + handler := NewCustomUnitMetricServerManager(metricServicesConfigs, cm) + + assert.NotNil(t, handler) +} + +func Test_HandleQuotaEnforcementInfo(t *testing.T) { + metricServicesConfigs := []config.MetricServiceConfiguration{ + { + Enable: true, + URL: "https://mockserver:8080", + RejectOnFail: true, + }, + } + cm := agentcache.NewAgentCacheManager(&config.CentralConfiguration{}, false) + accessReq := &management.AccessRequest{ + ResourceMeta: v1.ResourceMeta{ + Metadata: v1.Metadata{ + ID: "11", + References: []v1.Reference{ + { + Group: management.APIServiceInstanceGVK().Group, + Kind: management.APIServiceInstanceGVK().Kind, + ID: instRefID, + Name: instRefName, + }, + }, + Scope: v1.MetadataScope{ + Kind: management.EnvironmentGVK().Kind, + Name: "env-1", + }, + }, + SubResources: map[string]interface{}{ + defs.XAgentDetails: map[string]interface{}{ + "sub_access_request_key": "sub_access_request_val", + }, + }, + }, + Spec: management.AccessRequestSpec{ + ApiServiceInstance: instRefName, + ManagedApplication: managedAppRefName, + Data: map[string]interface{}{}, + }, + Status: &v1.ResourceStatus{ + Level: prov.Pending.String(), + }, + } + managedAppForTest := &management.ManagedApplication{ + ResourceMeta: v1.ResourceMeta{ + Name: "app-test", + Metadata: v1.Metadata{ + ID: "11", + Scope: v1.MetadataScope{ + Kind: management.EnvironmentGVK().Kind, + Name: "env-1", + }, + }, + SubResources: map[string]interface{}{ + defs.XAgentDetails: map[string]interface{}{ + "sub_manage_app_key": "sub_manage_app_val", + }, + }, + }, + Owner: &v1.Owner{ + Type: 0, + }, + Spec: management.ManagedApplicationSpec{}, + Status: &v1.ResourceStatus{ + Level: prov.Pending.String(), + }, + } + + manager := NewCustomUnitMetricServerManager(metricServicesConfigs, cm) + ctx, cancelCtx := context.WithCancel(context.Background()) + err := manager.HandleQuotaEnforcement(ctx, cancelCtx, accessReq, managedAppForTest) + + assert.Nil(t, err) + +}