Skip to content

Commit

Permalink
Merge pull request #43 from karimra/fix-state
Browse files Browse the repository at this point in the history
fix agent state update
  • Loading branch information
karimra authored Aug 22, 2023
2 parents dc03b75 + accd302 commit b31f81a
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 37 deletions.
70 changes: 34 additions & 36 deletions app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (s *server) ConfigHandler(ctx context.Context) {
log.Errorf("NwInst notification Marshal failed: %+v", err)
continue
}
log.Debugf("%s\n", string(b))
log.Debugf("%s", string(b))
}
for _, ev := range nwInstEvent.GetNotification() {
if nwInst := ev.GetNwInst(); nwInst != nil {
Expand All @@ -159,7 +159,7 @@ func (s *server) ConfigHandler(ctx context.Context) {
log.Errorf("Config notification Marshal failed: %+v", err)
continue
}
log.Debugf("%s\n", string(b))
log.Debugf("%s", string(b))
}
for _, ev := range event.GetNotification() {
if cfg := ev.GetConfig(); cfg != nil {
Expand All @@ -175,6 +175,9 @@ func (s *server) ConfigHandler(ctx context.Context) {
}

func (s *server) handleConfigEvent(ctx context.Context, cfg *ndk.ConfigNotification) {
s.config.m.Lock()
defer s.config.m.Unlock()

log.Debugf("handling cfg: %+v", cfg)
log.Debugf("PATH: %s\n", cfg.GetKey().GetJsPath())
log.Debugf("KEYS: %v\n", cfg.GetKey().GetKeys())
Expand Down Expand Up @@ -228,9 +231,7 @@ func (s *server) handleConfigEvent(ctx context.Context, cfg *ndk.ConfigNotificat
log.Errorf("unexpected config path %q", txCfg.GetKey().GetJsPath())
}
}
s.config.m.Lock()
s.config.trx = make([]*ndk.ConfigNotification, 0)
s.config.m.Unlock()
}

func (s *server) handleCfgPrometheusCreate(ctx context.Context, cfg *ndk.ConfigNotification) {
Expand All @@ -245,15 +246,15 @@ func (s *server) handleCfgPrometheusCreate(ctx context.Context, cfg *ndk.ConfigN
log.Errorf("failed to marshal config data from path %q: %v", cfg.GetKey().GetJsPath(), err)
return
}
b, err := json.MarshalIndent(newCfg, "", " ")
if err != nil {
log.Errorf("failed to Marshal baseconfig: %v", err)
return
}
log.Debugf("read baseconfig data: %s", string(b))

s.config.m.Lock()
defer s.config.m.Unlock()
if s.config.debug {
b, err := json.MarshalIndent(newCfg, "", " ")
if err != nil {
log.Errorf("failed to Marshal baseconfig: %v", err)
return
}
log.Debugf("read baseconfig data: %s", string(b))
}

// set default oper state
newCfg.OperState = operDown
Expand All @@ -278,23 +279,25 @@ func (s *server) handleCfgPrometheusChange(ctx context.Context, cfg *ndk.ConfigN
log.Errorf("failed to marshal config data from path %q: %v", cfg.GetKey().GetJsPath(), err)
return
}
b, err := json.MarshalIndent(newCfg, "", " ")
if err != nil {
log.Errorf("failed to Marshal baseconfig: %v", err)
return
}
log.Debugf("read baseconfig data: %s", string(b))

s.config.m.Lock()
defer s.config.m.Unlock()
if s.config.debug {
b, err := json.MarshalIndent(newCfg, "", " ")
if err != nil {
log.Errorf("failed to Marshal baseconfig: %v", err)
return
}
log.Debugf("read baseconfig data: %s", string(b))
}

if newCfg.AdminState == adminDisable && s.config.baseConfig.OperState != operDown {
// shutdown the http server with a 1s timeout
s.shutdown(ctx, time.Second)
// shutdown the http server with a 500ms timeout
s.shutdown(ctx, time.Second/2)
return
}
if newCfg.AdminState == adminEnable && s.config.baseConfig.OperState == operDown {
// start http server
log.Debug("starting server...")
s.config.baseConfig.AdminState = adminEnable
go s.start(ctx)
return
}
Expand Down Expand Up @@ -330,8 +333,6 @@ func (s *server) handleCfgMetricCreate(ctx context.Context, cfg *ndk.ConfigNotif
}
log.Debugf("read metric config data: %+v", newMetricConfig)

s.config.m.Lock()
defer s.config.m.Unlock()
if _, ok := s.config.metrics[key]; !ok {
s.config.metrics[key] = new(metricConfig)
}
Expand All @@ -357,8 +358,6 @@ func (s *server) handleCfgMetricChange(ctx context.Context, cfg *ndk.ConfigNotif
log.Errorf("failed to marshal config data from path %s: %v", cfg.Key.JsPath, err)
return
}
s.config.m.Lock()
defer s.config.m.Unlock()

// store new config
s.config.metrics[key].Metric.State = newMetricConfig.Metric.State
Expand All @@ -368,8 +367,7 @@ func (s *server) handleCfgMetricChange(ctx context.Context, cfg *ndk.ConfigNotif

func (s *server) handleCfgMetricDelete(ctx context.Context, cfg *ndk.ConfigNotification) {
key := cfg.Key.Keys[0]
s.config.m.Lock()
defer s.config.m.Unlock()

if _, ok := s.config.metrics[key]; !ok {
log.Errorf("Op delete metric, cannot find metric %q", key)
return
Expand All @@ -389,8 +387,6 @@ func (s *server) handleCfgCustomMetricCreateChange(ctx context.Context, cfg *ndk
}
log.Debugf("read metric config data: %+v", newMetricConfig)

s.config.m.Lock()
defer s.config.m.Unlock()
if _, ok := s.config.customMetric[key]; !ok {
s.config.customMetric[key] = new(customMetricConfig)
}
Expand All @@ -403,8 +399,7 @@ func (s *server) handleCfgCustomMetricCreateChange(ctx context.Context, cfg *ndk

func (s *server) handleCfgCustomMetricDelete(ctx context.Context, cfg *ndk.ConfigNotification) {
key := cfg.Key.Keys[0]
s.config.m.Lock()
defer s.config.m.Unlock()

if _, ok := s.config.customMetric[key]; !ok {
log.Errorf("Op delete custom metric, cannot find custom metric %q", key)
return
Expand All @@ -414,19 +409,21 @@ func (s *server) handleCfgCustomMetricDelete(ctx context.Context, cfg *ndk.Confi
}

func (s *server) handleNwInstCfg(ctx context.Context, nwInst *ndk.NetworkInstanceNotification) {
s.config.m.Lock()
defer s.config.m.Unlock()

key := nwInst.GetKey()
if key == nil {
return
}
s.config.m.Lock()
defer s.config.m.Unlock()
switch nwInst.Op {
case ndk.SdkMgrOperation_Create:
s.config.nwInst[key.InstName] = nwInst.Data
if s.config.baseConfig.NetworkInstance.Value == nwInst.Key.InstName {
if nwInst.Data.OperIsUp &&
s.config.baseConfig.AdminState == adminEnable &&
s.config.baseConfig.OperState == operDown {
log.Debug("starting server...")
go s.start(ctx)
}
}
Expand All @@ -435,20 +432,21 @@ func (s *server) handleNwInstCfg(ctx context.Context, nwInst *ndk.NetworkInstanc
if s.config.baseConfig.NetworkInstance.Value == nwInst.Key.InstName {
if !nwInst.Data.OperIsUp {
if s.config.baseConfig.OperState == operUp {
s.shutdown(ctx, time.Second)
s.shutdown(ctx, time.Second/2)
}
return
}
if s.config.baseConfig.AdminState == adminEnable &&
s.config.baseConfig.OperState == operDown {
log.Debug("starting server...")
go s.start(ctx)
}
}
case ndk.SdkMgrOperation_Delete:
delete(s.config.nwInst, key.InstName)
if s.config.baseConfig.NetworkInstance.Value == nwInst.Key.InstName {
if s.config.baseConfig.OperState == operUp {
s.shutdown(ctx, time.Second)
s.shutdown(ctx, time.Second/2)
}
}
}
Expand Down
11 changes: 10 additions & 1 deletion app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ func NewServer(opts ...serverOption) *server {
}

func (s *server) start(ctx context.Context) {
if s.srvCancelFn != nil {
s.srvCancelFn()
}
sctx, cancel := context.WithCancel(ctx)
s.srvCancelFn = cancel
START:
Expand All @@ -269,7 +272,11 @@ START:
goto START
}
// create http server
promHandler := promhttp.HandlerFor(registry, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError})
promHandler := promhttp.HandlerFor(
registry,
promhttp.HandlerOpts{
ErrorHandling: promhttp.ContinueOnError,
})

mux := http.NewServeMux()
if s.config.baseConfig.HttpPath.Value == "" {
Expand Down Expand Up @@ -613,12 +620,14 @@ INITCONSUL:
}
s.config.baseConfig.Registration.OperState = operUp
go s.updatePrometheusBaseTelemetry(ctx, s.config.baseConfig)

err = s.consulClient.Agent().UpdateTTL(ttlCheckID, "", capi.HealthPassing)
if err != nil {
log.Errorf("failed to pass the first TTL check: %v", err)
time.Sleep(retryInterval)
goto INITCONSUL
}

ttl, _ := time.ParseDuration(s.config.baseConfig.Registration.TTL.Value)
ticker := time.NewTicker(ttl / 2)

Expand Down
1 change: 1 addition & 0 deletions app/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
)

func (s *server) updateTelemetry(ctx context.Context, jsPath string, jsData string) {
log.Debugf("updating telemetry: %q: %s\n", exporterPath, string(jsData))
key := &ndk.TelemetryKey{JsPath: jsPath}
data := &ndk.TelemetryData{JsonContent: jsData}
info := &ndk.TelemetryInfo{Key: key, Data: data}
Expand Down

0 comments on commit b31f81a

Please sign in to comment.