Skip to content

Commit

Permalink
implement over all sync-status (#351)
Browse files Browse the repository at this point in the history
Implement /sync/sync-status/sync-state.
Trigger overall sync-state event when either ptp-state-change
or os-clock-sync-state-change events are triggered.

Signed-off-by: Jack Ding <[email protected]>
Co-authored-by: OpenShift Cherrypick Robot <[email protected]>
  • Loading branch information
jzding and openshift-cherrypick-robot authored Sep 12, 2024
1 parent 42dfcf5 commit 526126a
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 30 deletions.
3 changes: 2 additions & 1 deletion examples/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func createSubscription(resourceAddress string) (sub pubsub.PubSub, err error) {
if subB, err = json.Marshal(&sub); err == nil {
rc := restclient.New()
if status, subB = rc.PostWithReturn(subURL, subB); status != http.StatusCreated {
err = fmt.Errorf("error subscription creation api at %s, returned status %d", subURL, status)
err = fmt.Errorf("api at %s returned status %d for %s", subURL, status, resourceAddress)
} else {
err = json.Unmarshal(subB, &sub)
}
Expand Down Expand Up @@ -248,6 +248,7 @@ func initSubscribers(cType ConsumerTypeEnum) map[string]string {
subscribeTo[string(ptpEvent.PtpClockClassChange)] = string(ptpEvent.PtpClockClass)
subscribeTo[string(ptpEvent.PtpStateChange)] = string(ptpEvent.PtpLockState)
subscribeTo[string(ptpEvent.GnssStateChange)] = string(ptpEvent.GnssSyncStatus)
subscribeTo[string(ptpEvent.SyncStateChange)] = string(ptpEvent.SyncStatusState)
case MOCK:
subscribeTo[mockResourceKey] = mockResource
case HW:
Expand Down
6 changes: 3 additions & 3 deletions pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func CreateSubscription(config *SCConfiguration, subscription pubsub.PubSub) (su
}

// CreateEvent create an event
func CreateEvent(pubSubID, eventType, resourceAddress string, data ceevent.Data) (ceevent.Event, error) {
func CreateEvent(pubSubID, eventType, source string, data ceevent.Data) (ceevent.Event, error) {
// create an event
if pubSubID == "" {
return ceevent.Event{}, fmt.Errorf("id is a required field")
Expand All @@ -285,7 +285,7 @@ func CreateEvent(pubSubID, eventType, resourceAddress string, data ceevent.Data)
event := v1event.CloudNativeEvent()
event.ID = pubSubID
event.Type = eventType
event.SetSource(resourceAddress)
event.SetSource(source)
event.SetTime(types.Timestamp{Time: time.Now().UTC()}.Time)
event.SetDataContentType(ceevent.ApplicationJSON)
event.SetData(data)
Expand Down Expand Up @@ -330,7 +330,7 @@ func GetPublishingCloudEvent(scConfig *SCConfiguration, cneEvent ceevent.Event)
pub, err := scConfig.PubSubAPI.GetPublisher(cneEvent.ID)
if err != nil {
localmetrics.UpdateEventPublishedCount(cneEvent.ID, localmetrics.FAIL, 1)
return nil, fmt.Errorf("no publisher data for id %s found to publish event for", cneEvent.ID)
return nil, err
}
ceEvent, err := cneEvent.NewCloudEvent(&pub)
if err != nil {
Expand Down
54 changes: 36 additions & 18 deletions plugins/ptp_operator/metrics/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type PTPEventManager struct {
PtpConfigMapUpdates *ptpConfig.LinuxPTPConfigMapUpdate
// Ptp4lConfigInterfaces holds interfaces and its roles, after reading from ptp4l config files
Ptp4lConfigInterfaces map[types.ConfigName]*ptp4lconf.PTP4lConfig
lastOverallSyncState ptp.SyncState
}

// NewPTPEventManager to manage events and metrics
Expand Down Expand Up @@ -220,31 +221,34 @@ func (p *PTPEventManager) GetPTPEventsData(state ptp.SyncState, ptpOffset int64,
Value: state,
})
}

data.Values = append(data.Values, ceevent.DataValue{
Resource: eventSource,
DataType: ceevent.METRIC,
ValueType: ceevent.DECIMAL,
Value: ptpOffset,
})
if eventType != ptp.SyncStateChange {
data.Values = append(data.Values, ceevent.DataValue{
Resource: eventSource,
DataType: ceevent.METRIC,
ValueType: ceevent.DECIMAL,
Value: ptpOffset,
})
}
return &data
}

// GetPTPCloudEvents ...GetEvent events
func (p *PTPEventManager) GetPTPCloudEvents(data ceevent.Data, eventType ptp.EventType) *cloudevents.Event {
resourceAddress := fmt.Sprintf(p.resourcePrefix, p.nodeName, string(p.publisherTypes[eventType].Resource))
func (p *PTPEventManager) GetPTPCloudEvents(data ceevent.Data, eventType ptp.EventType) (*cloudevents.Event, error) {
if pubs, ok := p.publisherTypes[eventType]; ok {
cneEvent, cneErr := common.CreateEvent(pubs.PubID, string(eventType), resourceAddress, data)
cneEvent, cneErr := common.CreateEvent(
pubs.PubID, string(eventType),
fmt.Sprintf(p.resourcePrefix, p.nodeName, string(p.publisherTypes[eventType].Resource)),
data)
if cneErr != nil {
log.Errorf("failed to create ptp event, %s", cneErr)
return nil
return nil, fmt.Errorf("failed to create ptp event, %s", cneErr)
}
if ceEvent, err := common.GetPublishingCloudEvent(p.scConfig, cneEvent); err == nil {
// the saw because api is not processing this, returned directly by currentState call
return ceEvent
ceEvent, err := common.GetPublishingCloudEvent(p.scConfig, cneEvent)
if err != nil {
return nil, err
}
return ceEvent, nil
}
return nil
return nil, fmt.Errorf("EventPublisherType not found for event type %s", string(eventType))
}

// PublishEvent ...publish events
Expand All @@ -258,15 +262,29 @@ func (p *PTPEventManager) PublishEvent(state ptp.SyncState, ptpOffset int64, sou
log.Infof("PublishEvent state=%s, ptpOffset=%d, source=%s, eventType=%s", state, ptpOffset, source, eventType)
return
}

// /cluster/xyz/ptp/CLOCK_REALTIME this is not address the event is published to
data := p.GetPTPEventsData(state, ptpOffset, source, eventType)
resourceAddress := fmt.Sprintf(p.resourcePrefix, p.nodeName, string(p.publisherTypes[eventType].Resource))
p.publish(*data, resourceAddress, eventType)
// publish the event again as overall sync state
// SyncStateChange is the overall sync state including PtpStateChange and OsClockSyncStateChange
if eventType == ptp.PtpStateChange || eventType == ptp.OsClockSyncStateChange {
if state != p.lastOverallSyncState {
eventType = ptp.SyncStateChange
data = p.GetPTPEventsData(state, ptpOffset, source, eventType)
resourceAddress = fmt.Sprintf(p.resourcePrefix, p.nodeName, string(p.publisherTypes[eventType].Resource))
p.publish(*data, resourceAddress, eventType)
p.lastOverallSyncState = state
}
}
}

func (p *PTPEventManager) publish(data ceevent.Data, eventSource string, eventType ptp.EventType) {
func (p *PTPEventManager) publish(data ceevent.Data, resourceAddress string, eventType ptp.EventType) {
var e ceevent.Event
var err error
if pubs, ok := p.publisherTypes[eventType]; ok {
e, err := common.CreateEvent(pubs.PubID, string(eventType), eventSource, data)
e, err = common.CreateEvent(pubs.PubID, string(eventType), resourceAddress, data)
if err != nil {
log.Errorf("failed to create ptp event, %s", err)
return
Expand Down
65 changes: 61 additions & 4 deletions plugins/ptp_operator/ptp_operator_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,27 +179,40 @@ func Start(wg *sync.WaitGroup, configuration *common.SCConfiguration, fn func(e
// getCurrentStatOverrideFn is called when current state is received by rest api
func getCurrentStatOverrideFn() func(e v2.Event, d *channel.DataChan) error {
return func(e v2.Event, d *channel.DataChan) error {
var err error
if e.Source() != "" {
log.Infof("setting return address to %s", e.Source())
d.ReturnAddress = pointer.String(e.Source())
}
log.Infof("got status check call,send events for subscriber %s => %s", d.ClientID.String(), e.Source())
var eventType ptp.EventType
var eventSource ptp.EventResource
if strings.Contains(e.Source(), string(ptp.PtpLockState)) {
eventType = ptp.PtpStateChange
eventSource = ptp.PtpLockState
} else if strings.Contains(e.Source(), string(ptp.OsClockSyncState)) {
eventType = ptp.OsClockSyncStateChange
eventSource = ptp.OsClockSyncState
} else if strings.Contains(e.Source(), string(ptp.PtpClockClass)) {
eventType = ptp.PtpClockClassChange
eventSource = ptp.PtpClockClass
} else if strings.Contains(e.Source(), string(ptp.GnssSyncStatus)) {
eventType = ptp.GnssStateChange
eventSource = ptp.GnssSyncStatus
} else if strings.Contains(e.Source(), string(ptp.SyncStatusState)) {
eventType = ptp.SyncStateChange
eventSource = ptp.SyncStatusState
} else {
log.Warnf("could not find any events for requested resource type %s", e.Source())
return fmt.Errorf("could not find any events for requested resource type %s", e.Source())
}
if len(eventManager.Stats) == 0 {
data := eventManager.GetPTPEventsData(ptp.FREERUN, 0, "ptp-not-set", eventType)
d.Data = eventManager.GetPTPCloudEvents(*data, eventType)
d.Data, err = eventManager.GetPTPCloudEvents(*data, eventType)
if err != nil {
return err
}
d.Data.SetSource(string(eventSource))
return nil
}
// process events
Expand All @@ -213,6 +226,8 @@ func getCurrentStatOverrideFn() func(e v2.Event, d *channel.DataChan) error {
return data
}

var overallSyncState ptp.SyncState

for _, ptpStats := range eventManager.Stats { // configname->PTPStats
for ptpInterface, s := range ptpStats { // iface->stats
switch ptpInterface {
Expand All @@ -227,10 +242,16 @@ func getCurrentStatOverrideFn() func(e v2.Event, d *channel.DataChan) error {
case ptp.PtpClockClassChange:
clockClass := fmt.Sprintf("%s/%s", string(ptpInterface), ptpMetrics.ClockClass)
data = processDataFn(data, eventManager.GetPTPEventsData(s.SyncState(), s.ClockClass(), clockClass, eventType))
case ptp.SyncStateChange:
overallSyncState = getOverallState(overallSyncState, s.SyncState())
}
case ptpMetrics.ClockRealTime:
if eventType == ptp.OsClockSyncStateChange {
switch eventType {
case ptp.OsClockSyncStateChange:
data = processDataFn(data, eventManager.GetPTPEventsData(s.SyncState(), s.LastOffset(), string(ptpInterface), eventType))
// SyncStateChange includes OsClockSyncStateChange
case ptp.SyncStateChange:
overallSyncState = getOverallState(overallSyncState, s.SyncState())
}
default:
switch eventType {
Expand All @@ -253,18 +274,50 @@ func getCurrentStatOverrideFn() func(e v2.Event, d *channel.DataChan) error {
}
}
}
if eventType == ptp.SyncStateChange && overallSyncState != "" {
data = processDataFn(data, eventManager.GetPTPEventsData(overallSyncState, 0, string(eventSource), eventType))
}
if data != nil {
d.Data = eventManager.GetPTPCloudEvents(*data, eventType)
d.Data, err = eventManager.GetPTPCloudEvents(*data, eventType)
if err != nil {
return err
}
d.Data.SetSource(string(eventSource))
} else {
data = eventManager.GetPTPEventsData(ptp.FREERUN, 0, "event-not-found", eventType)
d.Data = eventManager.GetPTPCloudEvents(*data, eventType)
d.Data, err = eventManager.GetPTPCloudEvents(*data, eventType)
if err != nil {
return err
}
d.Data.SetSource(string(eventSource))
log.Errorf("could not find any events for requested resource type %s", e.Source())
return nil
}
return nil
}
}

// return worst of FREERUN, HOLDOVER or LOCKED
func getOverallState(current, new ptp.SyncState) ptp.SyncState {
if current == "" {
return new
}
switch new {
case ptp.FREERUN:
return ptp.FREERUN
case ptp.HOLDOVER:
if current == ptp.FREERUN {
return current
}
return new
case ptp.LOCKED:
return current
default:
log.Warnf("last sync state is unknown: %s", new)
}
return ""
}

// update interface details and threshold details when ptpConfig change found.
// These are updated when config file is created or deleted under /var/run folder
// by linuxptp-daemon.
Expand Down Expand Up @@ -516,6 +569,10 @@ func HasEqualInterface(a []*string, b []*ptp4lconf.PTPInterface) bool {
// InitPubSubTypes ... initialize types of publishers for ptp operator
func InitPubSubTypes() map[ptp.EventType]*ptpTypes.EventPublisherType {
InitPubs := make(map[ptp.EventType]*ptpTypes.EventPublisherType)
InitPubs[ptp.SyncStateChange] = &ptpTypes.EventPublisherType{
EventType: ptp.SyncStateChange,
Resource: ptp.SyncStatusState,
}
InitPubs[ptp.OsClockSyncStateChange] = &ptpTypes.EventPublisherType{
EventType: ptp.OsClockSyncStateChange,
Resource: ptp.OsClockSyncState,
Expand Down
8 changes: 4 additions & 4 deletions plugins/ptp_operator/ptp_operator_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,9 @@ func Test_StartWithOutAMQP(t *testing.T) {
log.Printf("Closing the channels")
close(scConfig.CloseCh) // close the channel
pubs := scConfig.PubSubAPI.GetPublishers()
assert.Equal(t, 4, len(pubs))
assert.Equal(t, 5, len(pubs))
subs := scConfig.PubSubAPI.GetSubscriptions()
assert.Equal(t, 4, len(subs))
assert.Equal(t, 5, len(subs))

}

Expand Down Expand Up @@ -254,9 +254,9 @@ func Test_StartWithHTTP(t *testing.T) {

close(scConfig.CloseCh) // close the channel
pubs := scConfig.PubSubAPI.GetPublishers()
assert.Equal(t, 4, len(pubs))
assert.Equal(t, 5, len(pubs))
subs := scConfig.PubSubAPI.GetSubscriptions()
assert.Equal(t, 4, len(subs))
assert.Equal(t, 5, len(subs))
}

// ProcessInChannel will be called if Transport is disabled
Expand Down

0 comments on commit 526126a

Please sign in to comment.