From 526126ac99131dc4c286909e3008d3c63909fbbc Mon Sep 17 00:00:00 2001 From: Jack Ding <7378668+jzding@users.noreply.github.com> Date: Thu, 12 Sep 2024 10:45:38 -0400 Subject: [PATCH] implement overall sync-status (#351) Implement /sync/sync-status/sync-state. Trigger an overall sync-state event whenever a ptp-state-change or os-clock-sync-state-change event is triggered. Signed-off-by: Jack Ding Co-authored-by: OpenShift Cherrypick Robot --- examples/consumer/main.go | 3 +- pkg/common/common.go | 6 +- plugins/ptp_operator/metrics/manager.go | 54 ++++++++++----- plugins/ptp_operator/ptp_operator_plugin.go | 65 +++++++++++++++++-- .../ptp_operator/ptp_operator_plugin_test.go | 8 +-- 5 files changed, 106 insertions(+), 30 deletions(-) diff --git a/examples/consumer/main.go b/examples/consumer/main.go index aed54984..fda3dfe8 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -168,7 +168,7 @@ func createSubscription(resourceAddress string) (sub pubsub.PubSub, err error) { if subB, err = json.Marshal(&sub); err == nil { rc := restclient.New() if status, subB = rc.PostWithReturn(subURL, subB); status != http.StatusCreated { - err = fmt.Errorf("error subscription creation api at %s, returned status %d", subURL, status) + err = fmt.Errorf("api at %s returned status %d for %s", subURL, status, resourceAddress) } else { err = json.Unmarshal(subB, &sub) } @@ -248,6 +248,7 @@ func initSubscribers(cType ConsumerTypeEnum) map[string]string { subscribeTo[string(ptpEvent.PtpClockClassChange)] = string(ptpEvent.PtpClockClass) subscribeTo[string(ptpEvent.PtpStateChange)] = string(ptpEvent.PtpLockState) subscribeTo[string(ptpEvent.GnssStateChange)] = string(ptpEvent.GnssSyncStatus) + subscribeTo[string(ptpEvent.SyncStateChange)] = string(ptpEvent.SyncStatusState) case MOCK: subscribeTo[mockResourceKey] = mockResource case HW: diff --git a/pkg/common/common.go b/pkg/common/common.go index 015ae419..9ae7ad96 100644 --- a/pkg/common/common.go +++ 
b/pkg/common/common.go @@ -274,7 +274,7 @@ func CreateSubscription(config *SCConfiguration, subscription pubsub.PubSub) (su } // CreateEvent create an event -func CreateEvent(pubSubID, eventType, resourceAddress string, data ceevent.Data) (ceevent.Event, error) { +func CreateEvent(pubSubID, eventType, source string, data ceevent.Data) (ceevent.Event, error) { // create an event if pubSubID == "" { return ceevent.Event{}, fmt.Errorf("id is a required field") @@ -285,7 +285,7 @@ func CreateEvent(pubSubID, eventType, resourceAddress string, data ceevent.Data) event := v1event.CloudNativeEvent() event.ID = pubSubID event.Type = eventType - event.SetSource(resourceAddress) + event.SetSource(source) event.SetTime(types.Timestamp{Time: time.Now().UTC()}.Time) event.SetDataContentType(ceevent.ApplicationJSON) event.SetData(data) @@ -330,7 +330,7 @@ func GetPublishingCloudEvent(scConfig *SCConfiguration, cneEvent ceevent.Event) pub, err := scConfig.PubSubAPI.GetPublisher(cneEvent.ID) if err != nil { localmetrics.UpdateEventPublishedCount(cneEvent.ID, localmetrics.FAIL, 1) - return nil, fmt.Errorf("no publisher data for id %s found to publish event for", cneEvent.ID) + return nil, err } ceEvent, err := cneEvent.NewCloudEvent(&pub) if err != nil { diff --git a/plugins/ptp_operator/metrics/manager.go b/plugins/ptp_operator/metrics/manager.go index 8926ef1a..26ff4e8b 100644 --- a/plugins/ptp_operator/metrics/manager.go +++ b/plugins/ptp_operator/metrics/manager.go @@ -31,6 +31,7 @@ type PTPEventManager struct { PtpConfigMapUpdates *ptpConfig.LinuxPTPConfigMapUpdate // Ptp4lConfigInterfaces holds interfaces and its roles, after reading from ptp4l config files Ptp4lConfigInterfaces map[types.ConfigName]*ptp4lconf.PTP4lConfig + lastOverallSyncState ptp.SyncState } // NewPTPEventManager to manage events and metrics @@ -220,31 +221,34 @@ func (p *PTPEventManager) GetPTPEventsData(state ptp.SyncState, ptpOffset int64, Value: state, }) } - - data.Values = append(data.Values, 
ceevent.DataValue{ - Resource: eventSource, - DataType: ceevent.METRIC, - ValueType: ceevent.DECIMAL, - Value: ptpOffset, - }) + if eventType != ptp.SyncStateChange { + data.Values = append(data.Values, ceevent.DataValue{ + Resource: eventSource, + DataType: ceevent.METRIC, + ValueType: ceevent.DECIMAL, + Value: ptpOffset, + }) + } return &data } // GetPTPCloudEvents ...GetEvent events -func (p *PTPEventManager) GetPTPCloudEvents(data ceevent.Data, eventType ptp.EventType) *cloudevents.Event { - resourceAddress := fmt.Sprintf(p.resourcePrefix, p.nodeName, string(p.publisherTypes[eventType].Resource)) +func (p *PTPEventManager) GetPTPCloudEvents(data ceevent.Data, eventType ptp.EventType) (*cloudevents.Event, error) { if pubs, ok := p.publisherTypes[eventType]; ok { - cneEvent, cneErr := common.CreateEvent(pubs.PubID, string(eventType), resourceAddress, data) + cneEvent, cneErr := common.CreateEvent( + pubs.PubID, string(eventType), + fmt.Sprintf(p.resourcePrefix, p.nodeName, string(p.publisherTypes[eventType].Resource)), + data) if cneErr != nil { - log.Errorf("failed to create ptp event, %s", cneErr) - return nil + return nil, fmt.Errorf("failed to create ptp event, %s", cneErr) } - if ceEvent, err := common.GetPublishingCloudEvent(p.scConfig, cneEvent); err == nil { - // the saw because api is not processing this, returned directly by currentState call - return ceEvent + ceEvent, err := common.GetPublishingCloudEvent(p.scConfig, cneEvent) + if err != nil { + return nil, err } + return ceEvent, nil } - return nil + return nil, fmt.Errorf("EventPublisherType not found for event type %s", string(eventType)) } // PublishEvent ...publish events @@ -258,15 +262,29 @@ func (p *PTPEventManager) PublishEvent(state ptp.SyncState, ptpOffset int64, sou log.Infof("PublishEvent state=%s, ptpOffset=%d, source=%s, eventType=%s", state, ptpOffset, source, eventType) return } + // /cluster/xyz/ptp/CLOCK_REALTIME this is not address the event is published to data := 
p.GetPTPEventsData(state, ptpOffset, source, eventType) resourceAddress := fmt.Sprintf(p.resourcePrefix, p.nodeName, string(p.publisherTypes[eventType].Resource)) p.publish(*data, resourceAddress, eventType) + // publish the event again as overall sync state + // SyncStateChange is the overall sync state including PtpStateChange and OsClockSyncStateChange + if eventType == ptp.PtpStateChange || eventType == ptp.OsClockSyncStateChange { + if state != p.lastOverallSyncState { + eventType = ptp.SyncStateChange + data = p.GetPTPEventsData(state, ptpOffset, source, eventType) + resourceAddress = fmt.Sprintf(p.resourcePrefix, p.nodeName, string(p.publisherTypes[eventType].Resource)) + p.publish(*data, resourceAddress, eventType) + p.lastOverallSyncState = state + } + } } -func (p *PTPEventManager) publish(data ceevent.Data, eventSource string, eventType ptp.EventType) { +func (p *PTPEventManager) publish(data ceevent.Data, resourceAddress string, eventType ptp.EventType) { + var e ceevent.Event + var err error if pubs, ok := p.publisherTypes[eventType]; ok { - e, err := common.CreateEvent(pubs.PubID, string(eventType), eventSource, data) + e, err = common.CreateEvent(pubs.PubID, string(eventType), resourceAddress, data) if err != nil { log.Errorf("failed to create ptp event, %s", err) return diff --git a/plugins/ptp_operator/ptp_operator_plugin.go b/plugins/ptp_operator/ptp_operator_plugin.go index f7cb8afb..386825d0 100644 --- a/plugins/ptp_operator/ptp_operator_plugin.go +++ b/plugins/ptp_operator/ptp_operator_plugin.go @@ -179,27 +179,40 @@ func Start(wg *sync.WaitGroup, configuration *common.SCConfiguration, fn func(e // getCurrentStatOverrideFn is called when current state is received by rest api func getCurrentStatOverrideFn() func(e v2.Event, d *channel.DataChan) error { return func(e v2.Event, d *channel.DataChan) error { + var err error if e.Source() != "" { log.Infof("setting return address to %s", e.Source()) d.ReturnAddress = pointer.String(e.Source()) } 
log.Infof("got status check call,send events for subscriber %s => %s", d.ClientID.String(), e.Source()) var eventType ptp.EventType + var eventSource ptp.EventResource if strings.Contains(e.Source(), string(ptp.PtpLockState)) { eventType = ptp.PtpStateChange + eventSource = ptp.PtpLockState } else if strings.Contains(e.Source(), string(ptp.OsClockSyncState)) { eventType = ptp.OsClockSyncStateChange + eventSource = ptp.OsClockSyncState } else if strings.Contains(e.Source(), string(ptp.PtpClockClass)) { eventType = ptp.PtpClockClassChange + eventSource = ptp.PtpClockClass } else if strings.Contains(e.Source(), string(ptp.GnssSyncStatus)) { eventType = ptp.GnssStateChange + eventSource = ptp.GnssSyncStatus + } else if strings.Contains(e.Source(), string(ptp.SyncStatusState)) { + eventType = ptp.SyncStateChange + eventSource = ptp.SyncStatusState } else { log.Warnf("could not find any events for requested resource type %s", e.Source()) return fmt.Errorf("could not find any events for requested resource type %s", e.Source()) } if len(eventManager.Stats) == 0 { data := eventManager.GetPTPEventsData(ptp.FREERUN, 0, "ptp-not-set", eventType) - d.Data = eventManager.GetPTPCloudEvents(*data, eventType) + d.Data, err = eventManager.GetPTPCloudEvents(*data, eventType) + if err != nil { + return err + } + d.Data.SetSource(string(eventSource)) return nil } // process events @@ -213,6 +226,8 @@ func getCurrentStatOverrideFn() func(e v2.Event, d *channel.DataChan) error { return data } + var overallSyncState ptp.SyncState + for _, ptpStats := range eventManager.Stats { // configname->PTPStats for ptpInterface, s := range ptpStats { // iface->stats switch ptpInterface { @@ -227,10 +242,16 @@ func getCurrentStatOverrideFn() func(e v2.Event, d *channel.DataChan) error { case ptp.PtpClockClassChange: clockClass := fmt.Sprintf("%s/%s", string(ptpInterface), ptpMetrics.ClockClass) data = processDataFn(data, eventManager.GetPTPEventsData(s.SyncState(), s.ClockClass(), clockClass, 
eventType)) + case ptp.SyncStateChange: + overallSyncState = getOverallState(overallSyncState, s.SyncState()) } case ptpMetrics.ClockRealTime: - if eventType == ptp.OsClockSyncStateChange { + switch eventType { + case ptp.OsClockSyncStateChange: data = processDataFn(data, eventManager.GetPTPEventsData(s.SyncState(), s.LastOffset(), string(ptpInterface), eventType)) + // SyncStateChange includes OsClockSyncStateChange + case ptp.SyncStateChange: + overallSyncState = getOverallState(overallSyncState, s.SyncState()) } default: switch eventType { @@ -253,11 +274,22 @@ func getCurrentStatOverrideFn() func(e v2.Event, d *channel.DataChan) error { } } } + if eventType == ptp.SyncStateChange && overallSyncState != "" { + data = processDataFn(data, eventManager.GetPTPEventsData(overallSyncState, 0, string(eventSource), eventType)) + } if data != nil { - d.Data = eventManager.GetPTPCloudEvents(*data, eventType) + d.Data, err = eventManager.GetPTPCloudEvents(*data, eventType) + if err != nil { + return err + } + d.Data.SetSource(string(eventSource)) } else { data = eventManager.GetPTPEventsData(ptp.FREERUN, 0, "event-not-found", eventType) - d.Data = eventManager.GetPTPCloudEvents(*data, eventType) + d.Data, err = eventManager.GetPTPCloudEvents(*data, eventType) + if err != nil { + return err + } + d.Data.SetSource(string(eventSource)) log.Errorf("could not find any events for requested resource type %s", e.Source()) return nil } @@ -265,6 +297,27 @@ func getCurrentStatOverrideFn() func(e v2.Event, d *channel.DataChan) error { } } +// getOverallState folds new into current and returns the worse of the two (FREERUN is worst, then HOLDOVER, then LOCKED); an empty current yields new as-is, and an unrecognized new state is logged and current is kept so one bad sample cannot erase the state accumulated so far +func getOverallState(current, new ptp.SyncState) ptp.SyncState { + if current == "" { + return new + } + switch new { + case ptp.FREERUN: + return ptp.FREERUN + case ptp.HOLDOVER: + if current == ptp.FREERUN { + return current + } + return new + case ptp.LOCKED: + return current + default: + log.Warnf("last sync state is unknown: %s", new) + } + return current +} + // update interface details and threshold 
details when ptpConfig change found. // These are updated when config file is created or deleted under /var/run folder // by linuxptp-daemon. @@ -516,6 +569,10 @@ func HasEqualInterface(a []*string, b []*ptp4lconf.PTPInterface) bool { // InitPubSubTypes ... initialize types of publishers for ptp operator func InitPubSubTypes() map[ptp.EventType]*ptpTypes.EventPublisherType { InitPubs := make(map[ptp.EventType]*ptpTypes.EventPublisherType) + InitPubs[ptp.SyncStateChange] = &ptpTypes.EventPublisherType{ + EventType: ptp.SyncStateChange, + Resource: ptp.SyncStatusState, + } InitPubs[ptp.OsClockSyncStateChange] = &ptpTypes.EventPublisherType{ EventType: ptp.OsClockSyncStateChange, Resource: ptp.OsClockSyncState, diff --git a/plugins/ptp_operator/ptp_operator_plugin_test.go b/plugins/ptp_operator/ptp_operator_plugin_test.go index e16857b7..50adc728 100644 --- a/plugins/ptp_operator/ptp_operator_plugin_test.go +++ b/plugins/ptp_operator/ptp_operator_plugin_test.go @@ -194,9 +194,9 @@ func Test_StartWithOutAMQP(t *testing.T) { log.Printf("Closing the channels") close(scConfig.CloseCh) // close the channel pubs := scConfig.PubSubAPI.GetPublishers() - assert.Equal(t, 4, len(pubs)) + assert.Equal(t, 5, len(pubs)) subs := scConfig.PubSubAPI.GetSubscriptions() - assert.Equal(t, 4, len(subs)) + assert.Equal(t, 5, len(subs)) } @@ -254,9 +254,9 @@ func Test_StartWithHTTP(t *testing.T) { close(scConfig.CloseCh) // close the channel pubs := scConfig.PubSubAPI.GetPublishers() - assert.Equal(t, 4, len(pubs)) + assert.Equal(t, 5, len(pubs)) subs := scConfig.PubSubAPI.GetSubscriptions() - assert.Equal(t, 4, len(subs)) + assert.Equal(t, 5, len(subs)) } // ProcessInChannel will be called if Transport is disabled