Skip to content

Commit

Permalink
Merge pull request #24 from UlfBj/master
Browse files Browse the repository at this point in the history
gRPC subscription bug fix.
  • Loading branch information
UlfBj authored Apr 10, 2024
2 parents 27d6de0 + 4041647 commit b46ac45
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 12 deletions.
3 changes: 2 additions & 1 deletion client/client-1.0/grpc_client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func initCommandList() {

commandList[0] = `{"action":"get","path":"Vehicle/Speed","requestId":"232"}`
commandList[1] = `{"action":"set", "path":"Vehicle/Body/Lights/IsLeftIndicatorOn", "value":"true", "requestId":"245"}`
commandList[2] = `{"action":"subscribe","path":"Vehicle","filter":[{"type":"paths","parameter":["Speed", "Chassis.Accelerator.PedalPosition"]},{"type":"timebased","parameter": {"period":"5000"}}],"requestId":"246"}`
commandList[2] = `{"action":"subscribe","path":"Vehicle.Speed","filter":{"type":"curvelog","parameter":{"maxerr":"2","bufsize":"15"}},"requestId":"285"}`
/* commandList[2] = `{"action":"subscribe","path":"Vehicle","filter":[{"type":"paths","parameter":["Speed", "Chassis.Accelerator.PedalPosition"]},{"type":"timebased","parameter": {"period":"5000"}}],"requestId":"246"}`*/
commandList[3] = `{"action":"unsubscribe","subscriptionId":"X","requestId":"240"}` // X is replaced according to input

/* different variants
Expand Down
2 changes: 1 addition & 1 deletion feeder/feeder-template/feederv1/feederv1.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func statestorageSet(path string, val string, ts string) int {
}
return 0
case "redis":
dp := `{"val":"` + val + `", "ts":"` + ts + `"}`
dp := `{"value":"` + val + `", "ts":"` + ts + `"}`
err := redisClient.Set(path, dp, time.Duration(0)).Err()
if err != nil {
utils.Error.Printf("Job failed. Err=%s", err)
Expand Down
4 changes: 2 additions & 2 deletions feeder/feeder-template/feederv2/feederv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,15 +189,15 @@ func statestorageSet(path string, val string, ts string) int {
}
return 0
case "redis":
dp := `{"val":"` + val + `", "ts":"` + ts + `"}`
dp := `{"value":"` + val + `", "ts":"` + ts + `"}`
err := redisClient.Set(path, dp, time.Duration(0)).Err()
if err != nil {
utils.Error.Printf("Job failed. Err=%s", err)
return -1
}
return 0
case "memcache":
dp := `{"val":"` + val + `", "ts":"` + ts + `"}`
dp := `{"value":"` + val + `", "ts":"` + ts + `"}`
err := memcacheClient.Set(&memcache.Item{Key: path, Value: []byte(dp)})
if err != nil {
utils.Error.Printf("Job failed. Err=%s", err)
Expand Down
15 changes: 8 additions & 7 deletions server/vissv2server/serviceMgr/curvelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ func clCapture1dim(clChan chan CLPack, subscriptionId int, path string, bufSize
}
mcloseClSubId.Unlock()
dp := getVehicleData(path)
utils.Info.Printf("dp=%s", dp)
utils.MapRequest(dp, &dpMap)
_, ts := readRing(&aRingBuffer, 0) // read latest written
if ts != dpMap["ts"].(string) {
Expand All @@ -442,11 +443,11 @@ func clCapture1dim(clChan chan CLPack, subscriptionId int, path string, bufSize
var clPack CLPack
clPack.SubscriptionId = subscriptionId
if len(extraData) > 0 {
clPack.DataPack = `{"path":"` + path + `","data":` + extraData + "}"
clPack.DataPack = `{"path":"` + path + `","dp":` + extraData + "}"
clChan <- clPack
}
if lastSelected > 0 {
clPack.DataPack = `{"path":"` + path + `","data":` + data + "}"
clPack.DataPack = `{"path":"` + path + `","dp":` + data + "}"
clChan <- clPack
}
setRingTail(&aRingBuffer, lastSelected) // update tail pointer
Expand Down Expand Up @@ -628,7 +629,7 @@ func transformDataPoint(aRingBuffer *RingBuffer, index int, tsBase time.Time) (C
func returnSingleDp(clChan chan CLPack, subscriptionId int, path string) {
dp := getVehicleData(path)
var clPack CLPack
clPack.DataPack = `{"path":"` + path + `","data":` + dp + "}"
clPack.DataPack = `{"path":"` + path + `","dp":` + dp + "}"
clPack.SubscriptionId = subscriptionId
clChan <- clPack
}
Expand All @@ -637,7 +638,7 @@ func returnSingleDp2(clChan chan CLPack, subscriptionId int, paths Dim2Elem) {
dp1 := getVehicleData(paths.Path1)
dp2 := getVehicleData(paths.Path2)
var clPack CLPack
clPack.DataPack = `[{"path":"` + paths.Path1 + `","data":` + dp1 + "}," + `{"path":"` + paths.Path2 + `","data":` + dp2 + "}]"
clPack.DataPack = `[{"path":"` + paths.Path1 + `","dp":` + dp1 + "}," + `{"path":"` + paths.Path2 + `","dp":` + dp2 + "}]"
clPack.SubscriptionId = subscriptionId
clChan <- clPack
}
Expand All @@ -647,7 +648,7 @@ func returnSingleDp3(clChan chan CLPack, subscriptionId int, paths Dim3Elem) {
dp2 := getVehicleData(paths.Path2)
dp3 := getVehicleData(paths.Path3)
var clPack CLPack
clPack.DataPack = `[{"path":"` + paths.Path1 + `","data":` + dp1 + `},{"path":"` + paths.Path2 + `","data":` + dp2 + `},{"path":"` + paths.Path3 + `","data":` + dp3 + "}]"
clPack.DataPack = `[{"path":"` + paths.Path1 + `","dp":` + dp1 + `},{"path":"` + paths.Path2 + `","dp":` + dp2 + `},{"path":"` + paths.Path3 + `","dp":` + dp3 + "}]"
clPack.SubscriptionId = subscriptionId
clChan <- clPack
}
Expand Down Expand Up @@ -687,7 +688,7 @@ func clCapture2dim(clChan chan CLPack, subscriptionId int, paths Dim2Elem, bufSi
if (currentBufSize == bufSize) || (closeClSession == true) {
data1, data2, updatedTail := clAnalyze2dim(&aRingBuffer1, &aRingBuffer2, currentBufSize, maxError)
var clPack CLPack
clPack.DataPack = `[{"path":"` + paths.Path1 + `","data":` + data1 + "}," + `{"path":"` + paths.Path2 + `","data":` + data2 + "}]"
clPack.DataPack = `[{"path":"` + paths.Path1 + `","dp":` + data1 + "}," + `{"path":"` + paths.Path2 + `","dp":` + data2 + "}]"
clPack.SubscriptionId = subscriptionId
clChan <- clPack
setRingTail(&aRingBuffer1, updatedTail)
Expand Down Expand Up @@ -812,7 +813,7 @@ func clCapture3dim(clChan chan CLPack, subscriptionId int, paths Dim3Elem, bufSi
if (currentBufSize == bufSize) || (closeClSession == true) {
data1, data2, data3, updatedTail := clAnalyze3dim(&aRingBuffer1, &aRingBuffer2, &aRingBuffer3, currentBufSize, maxError)
var clPack CLPack
clPack.DataPack = `[{"path":"` + paths.Path1 + `","data":` + data1 + `},{"path":"` + paths.Path2 + `","data":` + data2 + `},{"path":"` + paths.Path3 + `","data":` + data3 + "}]"
clPack.DataPack = `[{"path":"` + paths.Path1 + `","dp":` + data1 + `},{"path":"` + paths.Path2 + `","dp":` + data2 + `},{"path":"` + paths.Path3 + `","dp":` + data3 + "}]"
clPack.SubscriptionId = subscriptionId
clChan <- clPack
setRingTail(&aRingBuffer1, updatedTail)
Expand Down
3 changes: 2 additions & 1 deletion utils/grcputils.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ func createJSON(value string, key string) string {

func createJsonData(dataPack []*pb.DataPackages_DataPackage) string {
data := ""

Info.Printf("createJsonData:len(dataPack)=%d", len(dataPack))
if len(dataPack) > 1 {
data += "["
}
Expand All @@ -835,6 +835,7 @@ func createJsonData(dataPack []*pb.DataPackages_DataPackage) string {
} else {
path = DecompressPath(dataPack[i].GetPathC())
}
Info.Printf("createJsonData:path=%s", path)
dp := getJsonDp(dataPack[i])
data += `{"path":"` + path + `","dp":` + dp + `},`
}
Expand Down

0 comments on commit b46ac45

Please sign in to comment.