diff --git a/gnmi_server/server_test.go b/gnmi_server/server_test.go index 76b0ddbe..21f99a16 100644 --- a/gnmi_server/server_test.go +++ b/gnmi_server/server_test.go @@ -2955,6 +2955,93 @@ func TestAuthCapabilities(t *testing.T) { } } +func TestOnChangeNoMissingKey(t *testing.T) { + s := createServer(t, 8081) + go runServer(t, s) + defer s.s.Stop() + q := createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68", "SAI_PORT_STAT_PFC_7_RX_PKTS") + q.Addrs = []string{"127.0.0.1:8081"} + + tests := []struct { + desc string + updates []tablePathValue + wantNoti []client.Notification + }{ + { + desc: "Check reponse for update notification for COUNTERS/Ethernet68/SAI_PORT_STAT_PFC_7_RX_PKTS", + updates: []tablePathValue{ + { + dbName: "COUNTERS_DB", + tableName: "COUNTERS", + tableKey: "oid:0x1000000000039", // "Ethernet68": "oid:0x1000000000039", + delimitor: ":", + field: "SAI_PORT_STAT_PFC_7_RX_PKTS", + value: "3", // be changed to 3 from 2 + }, + { //Same value set should not trigger multiple updates + dbName: "COUNTERS_DB", + tableName: "COUNTERS", + tableKey: "oid:0x1000000000039", // "Ethernet68": "oid:0x1000000000039", + delimitor: ":", + field: "SAI_PORT_STAT_PFC_7_RX_PKTS", + value: "3", // be changed to 3 from 2 + }, + }, + wantNoti: []client.Notification{ + client.Connected{}, + client.Update{Path: []string{"COUNTERS", "Ethernet68", "SAI_PORT_STAT_PFC_7_RX_PKTS"}, TS: time.Unix(0, 200), Val: "2"}, + client.Sync{}, + client.Update{Path: []string{"COUNTERS", "Ethernet68", "SAI_PORT_STAT_PFC_7_RX_PKTS"}, TS: time.Unix(0, 200), Val: "3"}, + }, + }, + } + namespace := sdcfg.GetDbDefaultNamespace() + rclient := getRedisClient(t, namespace) + defer rclient.Close() + prepareDb(t, namespace) + + for _, tt := range tests { + t.Run(tt.desc, func(t *testing.T) { + c := client.New() + defer c.Close() + + var gotNoti []string + var mutexGotNoti sync.Mutex + q.NotificationHandler = func(n client.Notification) error { + mutexGotNoti.Lock() + if nn, ok := n.(client.Update); ok { + nn.TS = time.Unix(0, 200) + str := fmt.Sprintf("%v", nn.Val) + currentNoti := gotNoti + gotNoti = append(currentNoti, str) + } + mutexGotNoti.Unlock() + return nil + } + + go func() { + if err := c.Subscribe(context.Background(), q); err != nil { + t.Errorf("c.Subscribe(): got error %v, expected nil", err) + } + }() + + time.Sleep(time.Millisecond * 500) + + for _, update := range tt.updates { + rclient.HSet(update.tableName+update.delimitor+update.tableKey, update.field, update.value) + } + + time.Sleep(time.Millisecond * 500) + + mutexGotNoti.Lock() + defer mutexGotNoti.Unlock() + for _, noti := range gotNoti { + t.Errorf(noti) + } + }) + } +} + func TestCPUUtilization(t *testing.T) { mock := gomonkey.ApplyFunc(sdc.PollStats, func() { var i uint64 diff --git a/sonic_data_client/db_client.go b/sonic_data_client/db_client.go index 09a52d45..4f361742 100644 --- a/sonic_data_client/db_client.go +++ b/sonic_data_client/db_client.go @@ -1089,7 +1089,7 @@ func dbSingleTableKeySubscribe(c *DbClient, rsd redisSubData, updateChannel chan continue } tblPath.tableKey = subscr.Channel[prefixLen:] - err = tableData2Msi(&tblPath, false, nil, &newMsi) + err = tableData2Msi(&tblPath, true, nil, &newMsi) if err != nil { enqueueFatalMsg(c, err.Error()) return