diff --git a/client/client-1.0/Javascript/appclient_commands.txt b/client/client-1.0/Javascript/appclient_commands.txt index 363a1510..780ced57 100644 --- a/client/client-1.0/Javascript/appclient_commands.txt +++ b/client/client-1.0/Javascript/appclient_commands.txt @@ -29,13 +29,13 @@ Metadata request: {"action":"get","path":"Vehicle/ADAS","filter":[{"type":"paths","parameter":["ABS/*","CruiseControl/Error"]},{"type":"dynamic-metadata","parameter":"availability"}],"requestId":"237"} Set request: -{"action":"set", "path":"Vehicle/Cabin/Door/Row1/Right/IsOpen", "value":"999", "requestId":"245"} +{"action":"set", "path":"Vehicle/Cabin/Door/Row1/PassengerSide/IsOpen", "value":"999", "requestId":"245"} Subscribe request: {"action":"subscribe","path":"Vehicle/Cabin/Door/Row1/PassengerSide/IsOpen","filter":{"type":"timebased","parameter":{"period":"3000"}},"requestId":"246"} -{"action":"subscribe","path":"Vehicle.Cabin.Door.Row1.Right.IsOpen","filter":{"type":"change","parameter":{"logic-op":"ne", "diff":"0"}},"requestId":"247"} -{"action":"subscribe","path":"Vehicle/Cabin/Door/Row1/Right/IsOpen","filter":{"type":"range","parameter":{"logic-op":"gt","boundary":"500"}},"requestId":"255"} -{"action":"subscribe","path":"Vehicle/Cabin/Door/Row1/Right/IsOpen","filter":{"type":"range","parameter":[{"logic-op":"gt","boundary":"500"},{"logic-op":"lt","boundary":"510"}]},"requestId":"265"} +{"action":"subscribe","path":"Vehicle.Cabin.Door.Row1.PassengerSide.IsOpen","filter":{"type":"change","parameter":{"logic-op":"ne", "diff":"0"}},"requestId":"247"} +{"action":"subscribe","path":"Vehicle/Cabin/Door/Row1/PassengerSide/IsOpen","filter":{"type":"range","parameter":{"logic-op":"gt","boundary":"500"}},"requestId":"255"} +{"action":"subscribe","path":"Vehicle/Cabin/Door/Row1/PassengerSide/IsOpen","filter":{"type":"range","parameter":[{"logic-op":"gt","boundary":"500"},{"logic-op":"lt","boundary":"510"}]},"requestId":"265"} {"action":"subscribe","path":"Vehicle.Powertrain.Transmission.Speed","filter":{"type":"curvelog","parameter":{"maxerr":"2","bufsize":"100"}},"requestId":"275"} {"action":"subscribe","path":"Vehicle","filter":[{"type":"paths","parameter":["CurrentLocation.Latitude", "CurrentLocation.Longitude"]}, {"type":"curvelog","parameter":{"maxerr":"0.00001","bufsize":"100"}}],"requestId":"285"} @@ -48,7 +48,7 @@ Unsubscribe request: Get request: HTTP GET -URL: Vehicle/Cabin/Door/Row1/Right/IsOpen +URL: Vehicle/Cabin/Door/Row1/PassengerSide/IsOpen URL: Vehicle.Acceleration.Longitudinal Get request with search: @@ -90,6 +90,6 @@ wsclient_uncompressed.html: // where "at" is the Access Token from ATS request http_client.html: -URL: Vehicle/Cabin/Door/Row1/Right/IsOpen +URL: Vehicle/Cabin/Door/Row1/PassengerSide/IsOpen Access token: from at-request response diff --git a/feeder/feeder-template/feederv2/feederv2.go b/feeder/feeder-template/feederv2/feederv2.go index 9f70c322..e281c216 100644 --- a/feeder/feeder-template/feederv2/feederv2.go +++ b/feeder/feeder-template/feederv2/feederv2.go @@ -161,7 +161,10 @@ func initVSSInterfaceMgr(inputChan chan DomainData, outputChan chan DomainData) for { select { case outData := <-outputChan: - utils.Info.Printf("Data written to statestorage: Name=%s, Value=%s", outData.Name, outData.Value) +// utils.Info.Printf("Data written to statestorage: Name=%s, Value=%s", outData.Name, outData.Value) + if len(outData.Name) == 0 { + continue + } status := statestorageSet(outData.Name, outData.Value, utils.GetRfcTime()) if status != 0 { utils.Error.Printf("initVSSInterfaceMgr():Redis write failed") @@ -209,8 +212,8 @@ func statestorageSet(path string, val string, ts string) int { } func initUdsEndpoint(udsChan chan DomainData) { - os.Remove("/var/tmp/vissv2/server-feeder-channel.sock") - listener, err := net.Listen("unix", "/var/tmp/vissv2/server-feeder-channel.sock") //the file must be the same as declared in the feeder-registration.json that the service mgr reads + os.Remove("/var/tmp/vissv2/serverFeeder.sock") + listener, err := net.Listen("unix", "/var/tmp/vissv2/serverFeeder.sock") //the file must be the same as declared in the feeder-registration.json that the service mgr reads if err != nil { utils.Error.Printf("initUdsEndpoint:UDS listen failed, err = %s", err) os.Exit(-1) @@ -229,19 +232,21 @@ func initUdsEndpoint(udsChan chan DomainData) { continue } utils.Info.Printf("Feeder:Server message: %s", string(buf[:n])) - domainData, _ := splitToDomainDataAndTs(string(buf[:n])) - udsChan <- domainData + var serverMessageMap map[string]interface{} + err = json.Unmarshal(buf[:n], &serverMessageMap) + if err != nil { + utils.Error.Printf("splitToDomainDataAndTs:Unmarshal error=%s", err) + continue + } + if serverMessageMap["action"] != nil && serverMessageMap["action"].(string) == "set" { + domainData, _ := splitToDomainDataAndTs(serverMessageMap["data"].(map[string]interface{})) + udsChan <- domainData + } } } -func splitToDomainDataAndTs(serverMessage string) (DomainData, string) { // server={"dp": {"ts": "Z","value": "Y"},"path": "X"}, redis={"value":"xxx", "ts":"zzz"} +func splitToDomainDataAndTs(serverMessageMap map[string]interface{}) (DomainData, string) { // server={"dp": {"ts": "Z","value": "Y"},"path": "X"}, redis={"value":"xxx", "ts":"zzz"} var domainData DomainData - var serverMessageMap map[string]interface{} - err := json.Unmarshal([]byte(serverMessage), &serverMessageMap) - if err != nil { - utils.Error.Printf("splitToDomainDataAndTs:Unmarshal error=%s", err) - return domainData, "" - } domainData.Name = serverMessageMap["path"].(string) dpMap := serverMessageMap["dp"].(map[string]interface{}) domainData.Value = dpMap["value"].(string) @@ -319,7 +324,7 @@ func selectRandomInput(fMap []FeederMap) DomainData { } else { domainData.Value = strconv.Itoa(rand.Intn(1000)) } - utils.Info.Printf("Simulated data from Vehicle interface: Name=%s, Value=%s", domainData.Name, domainData.Value) +// utils.Info.Printf("Simulated data from Vehicle interface: Name=%s, Value=%s", domainData.Name, domainData.Value) return domainData } @@ -370,7 +375,8 @@ func convertDomainData(north2SouthConv bool, inData DomainData, feederMap []Feed var outData DomainData matchIndex := sort.Search(len(feederMap), func(i int) bool { return feederMap[i].Name >= inData.Name }) if matchIndex == len(feederMap) || feederMap[matchIndex].Name != inData.Name { - matchIndex = -1 + utils.Error.Printf("convertDomainData:Failed to map= %s", inData.Name) + return outData } outData.Name = feederMap[feederMap[matchIndex].MapIndex].Name outData.Value = convertValue(inData.Value, feederMap[matchIndex].ConvertIndex, diff --git a/feeder/feeder-template/feederv3/VssVehicle.cvt b/feeder/feeder-template/feederv3/VssVehicle.cvt new file mode 100755 index 00000000..04d7627e Binary files /dev/null and b/feeder/feeder-template/feederv3/VssVehicle.cvt differ diff --git a/feeder/feeder-template/feederv3/VssVehicleScaling.json b/feeder/feeder-template/feederv3/VssVehicleScaling.json new file mode 100755 index 00000000..d7578439 --- /dev/null +++ b/feeder/feeder-template/feederv3/VssVehicleScaling.json @@ -0,0 +1,2 @@ +[ +"{\"false\":\"0\", \"true\":\"1\"}","{\"UNDEFINED\":\"0\", \"LOCK\":\"1\", \"OFF\":\"2\", \"ACC\":\"3\", \"ON\":\"4\", \"START\":\"5\"}","{\"NONE\":\"0\", \"TWO_D\":\"1\", \"TWO_D_SATELLITE_BASED_AUGMENTATION\":\"2\", \"TWO_D_GROUND_BASED_AUGMENTATION\":\"3\", \"TWO_D_SATELLITE_AND_GROUND_BASED_AUGMENTATION\":\"4\", \"THREE_D\":\"5\", \"THREE_D_SATELLITE_BASED_AUGMENTATION\":\"6\", \"THREE_D_GROUND_BASED_AUGMENTATION\":\"7\", \"THREE_D_SATELLITE_AND_GROUND_BASED_AUGMENTATION\":\"8\"}","[0.6213712, 0]"] diff --git a/feeder/feeder-template/feederv3/feederv3.go b/feeder/feeder-template/feederv3/feederv3.go new file mode 100644 index 00000000..058942bb --- /dev/null +++ b/feeder/feeder-template/feederv3/feederv3.go @@ -0,0 +1,610 @@ +/** +* (C) 2024 Ford Motor Company +* +* All files and artifacts are licensed under the provisions of the license provided by the LICENSE file in this repository. +* +**/ + +package main + +import ( + "database/sql" + "encoding/json" + "github.com/akamensky/argparse" + "github.com/bradfitz/gomemcache/memcache" + "github.com/covesa/vissr/utils" + "github.com/go-redis/redis" + _ "github.com/mattn/go-sqlite3" + "math/rand" + "net" + "os" + "os/exec" + "sort" + "strconv" + "strings" + "slices" + "time" +) + +type DomainData struct { + Name string + Value string +} + +type DataItem struct { + Path string `json:"path"` + Dp []DpItem `json:"dp"` +} + +type DpItem struct { + Ts string `json:"ts"` + Value string `json:"value"` +} + +var tripData []DataItem +var simulatedSource string + +type FeederMap struct { + MapIndex uint16 + Name string + Type int8 + Datatype int8 + ConvertIndex uint16 +} + +var scalingDataList []string + +var redisClient *redis.Client +var memcacheClient *memcache.Client +var dbHandle *sql.DB +var stateDbType string + +var notificationList []string + +func fileExists(filename string) bool { + info, err := os.Stat(filename) + if os.IsNotExist(err) { + return false + } + return !info.IsDir() +} + +func readscalingDataList(listFilename string) []string { + if !fileExists(listFilename) { + utils.Error.Printf("readscalingDataList: The file %s does not exist.", listFilename) + return nil + } + data, err := os.ReadFile(listFilename) + if err != nil { + utils.Error.Printf("readscalingDataList:Error reading %s: %s", listFilename, err) + return nil + } + var convertData []string + err = json.Unmarshal([]byte(data), &convertData) + if err != nil { + utils.Error.Printf("readscalingDataList:Error unmarshal json=%s", err) + return nil + } + return convertData +} + +func readFeederMap(mapFilename string) []FeederMap { + var feederMap []FeederMap + treeFp, err := os.OpenFile(mapFilename, os.O_RDONLY, 0644) + if err != nil { + utils.Error.Printf("Could not open %s for reading map data", mapFilename) + return nil + } + for { + mapElement := readElement(treeFp) + if mapElement.Name == "" { + break + } + feederMap = append(feederMap, mapElement) + } + treeFp.Close() + return feederMap +} + +// The reading order must be aligned with the reading order by the Domain Conversion Tool +func readElement(treeFp *os.File) FeederMap { + var feederMap FeederMap + feederMap.MapIndex = deSerializeUInt(readBytes(2, treeFp)).(uint16) + //utils.Info.Printf("feederMap.MapIndex=%d\n", feederMap.MapIndex) + + NameLen := deSerializeUInt(readBytes(1, treeFp)).(uint8) + feederMap.Name = string(readBytes((uint32)(NameLen), treeFp)) + //utils.Info.Printf("NameLen=%d\n", NameLen) + //utils.Info.Printf("feederMap.Name=%s\n", feederMap.Name) + + feederMap.Type = (int8)(deSerializeUInt(readBytes(1, treeFp)).(uint8)) + //utils.Info.Printf("feederMap.Type=%d\n", feederMap.Type) + + feederMap.Datatype = (int8)(deSerializeUInt(readBytes(1, treeFp)).(uint8)) + //utils.Info.Printf("feederMap.Datatype=%d\n", feederMap.Datatype) + + feederMap.ConvertIndex = deSerializeUInt(readBytes(2, treeFp)).(uint16) + //utils.Info.Printf("feederMap.ConvertIndex=%d\n", feederMap.ConvertIndex) + + return feederMap +} + +func readBytes(numOfBytes uint32, treeFp *os.File) []byte { + if numOfBytes > 0 { + buf := make([]byte, numOfBytes) + treeFp.Read(buf) + return buf + } + return nil +} + +func deSerializeUInt(buf []byte) interface{} { + switch len(buf) { + case 1: + var intVal uint8 + intVal = (uint8)(buf[0]) + return intVal + case 2: + var intVal uint16 + intVal = (uint16)((uint16)((uint16)(buf[1])*256) + (uint16)(buf[0])) + return intVal + case 4: + var intVal uint32 + intVal = (uint32)((uint32)((uint32)(buf[3])*16777216) + (uint32)((uint32)(buf[2])*65536) + (uint32)((uint32)(buf[1])*256) + (uint32)(buf[0])) + return intVal + default: + utils.Error.Printf("Buffer length=%d is of an unknown size", len(buf)) + return nil + } +} + +func initVSSInterfaceMgr(inputChan chan DomainData, outputChan chan DomainData) { + os.Remove("/var/tmp/vissv2/serverFeeder.sock") + listener, err := net.Listen("unix", "/var/tmp/vissv2/serverFeeder.sock") //the file must be the same as declared in the feeder-registration.json that the service mgr reads + if err != nil { + utils.Error.Printf("udsReader:UDS listen failed, err = %s", err) + os.Exit(-1) + } + conn, err := listener.Accept() + if err != nil { + utils.Error.Printf("udsReader:UDS accept failed, err = %s", err) + os.Exit(-1) + } + udsChan := make(chan string) + go udsReader(conn, inputChan, udsChan) + go udsWriter(conn, udsChan) + for { + select { + case outData := <-outputChan: +if outData.Name == "Vehicle.TripMeterReading" { + utils.Info.Printf("Data written to statestorage: Name=%s, Value=%s", outData.Name, outData.Value) +} + if len(outData.Name) == 0 { + continue + } + status := statestorageSet(outData.Name, outData.Value, utils.GetRfcTime()) + if status != 0 { + utils.Error.Printf("initVSSInterfaceMgr():State storage write failed") + } else { + if onNotificationList(outData.Name) != -1 { + message := `{"action": "subscription", "path":"` + outData.Name + `"}` + udsChan <- message + } + } + } + } +} + +func statestorageSet(path string, val string, ts string) int { + switch stateDbType { + case "sqlite": + stmt, err := dbHandle.Prepare("UPDATE VSS_MAP SET c_value=?, c_ts=? WHERE `path`=?") + if err != nil { + utils.Error.Printf("Could not prepare for statestorage updating, err = %s", err) + return -1 + } + defer stmt.Close() + + _, err = stmt.Exec(val, ts, path) + if err != nil { + utils.Error.Printf("Could not update statestorage, err = %s", err) + return -1 + } + return 0 + case "redis": + dp := `{"value":"` + val + `", "ts":"` + ts + `"}` + err := redisClient.Set(path, dp, time.Duration(0)).Err() + if err != nil { + utils.Error.Printf("Job failed. Err=%s", err) + return -1 + } + return 0 + case "memcache": + dp := `{"value":"` + val + `", "ts":"` + ts + `"}` + err := memcacheClient.Set(&memcache.Item{Key: path, Value: []byte(dp)}) + if err != nil { + utils.Error.Printf("Job failed. Err=%s", err) + return -1 + } + return 0 + } + return -1 +} + +func udsReader(conn net.Conn, inputChan chan DomainData, udsChan chan string) { + defer conn.Close() + buf := make([]byte, 512) + for { + n, err := conn.Read(buf) + if err != nil { + utils.Error.Printf("udsReader:Read failed, err = %s", err) + continue + } + utils.Info.Printf("udsReader:Server message: %s", string(buf[:n])) + var serverMessageMap map[string]interface{} + err = json.Unmarshal(buf[:n], &serverMessageMap) + if err != nil { + utils.Error.Printf("udsReader:Unmarshal error=%s", err) + continue + } + if serverMessageMap["action"] != nil { + switch serverMessageMap["action"].(string) { + case "set": + domainData, _ := splitToDomainDataAndTs(serverMessageMap["data"].(map[string]interface{})) + inputChan <- domainData + case "subscribe": + pathList := serverMessageMap["path"].([]interface{}) + for i := 0; i < len(pathList); i++ { + if onNotificationList(pathList[i].(string)) == -1 { + notificationList = append(notificationList, pathList[i].(string)) + } + } + response := `{"action": "subscribe", "status": "ok"}` + udsChan <- response + case "unsubscribe": + pathList := serverMessageMap["path"].([]interface{}) + for i := 0; i < len(pathList); i++ { + if onNotificationList(pathList[i].(string)) != -1 { + notificationList = slices.Delete(notificationList, i, i+1) + } + } + default: + utils.Error.Printf("udsReader:Message action unknown = %s", serverMessageMap["action"].(string)) + } + } + } +} + +func udsWriter(conn net.Conn, udsChan chan string) { + defer conn.Close() + for { + select { + case message := <-udsChan: + utils.Info.Printf("udsWriter:Server message: %s", message) + _, err := conn.Write([]byte(message)) + if err != nil { + utils.Error.Printf("udsWriter:Write failed, err = %s", err) + } + } + } +} + +func onNotificationList(path string) int { + for i := 0; i < len(notificationList); i++ { + if notificationList[i] == path { + return i + } + } + return -1 +} + +func splitToDomainDataAndTs(serverMessageMap map[string]interface{}) (DomainData, string) { // server={"dp": {"ts": "Z","value": "Y"},"path": "X"}, redis={"value":"xxx", "ts":"zzz"} + var domainData DomainData + domainData.Name = serverMessageMap["path"].(string) + dpMap := serverMessageMap["dp"].(map[string]interface{}) + domainData.Value = dpMap["value"].(string) + return domainData, dpMap["ts"].(string) +} + +type simulateDataCtx struct { + RandomSim bool // true=random, false=stepwise change of signal written to + Fmap []FeederMap // used for random simulation + Path string // signal written to + SetVal string // value written + Iteration int +} + +func initVehicleInterfaceMgr(fMap []FeederMap, inputChan chan DomainData, outputChan chan DomainData) { + var simCtx simulateDataCtx + simCtx.RandomSim = true + simCtx.Fmap = fMap + dpIndex := 0 + for { + select { + case outData := <-outputChan: + utils.Info.Printf("Data for calling the vehicle interface: Name=%s, Value=%s", outData.Name, outData.Value) + simCtx.RandomSim = false + simCtx.Path = outData.Name + simCtx.SetVal = outData.Value + simCtx.Iteration = 0 + + default: + if simulatedSource == "internal" { + time.Sleep(3 * time.Second) // not to overload input channel + inputChan <- simulateInput(&simCtx) // simulating signals read from the vehicle interface + } else { + time.Sleep(1 * time.Second) // set to the tripdata "time base" + dataPoint := getSimulatedDataPoints(dpIndex) + for i := 0; i < len(dataPoint); i++ { + inputChan <- dataPoint[i] + } + dpIndex = incDpIndex(dpIndex) + } + } + } +} + +func simulateInput(simCtx *simulateDataCtx) DomainData { + var input DomainData + if simCtx.RandomSim == true { + return selectRandomInput(simCtx.Fmap) + } + if simCtx.Iteration == 10 { + simCtx.RandomSim = true + } + input.Name = simCtx.Path + input.Value = calcInputValue(simCtx.Iteration, simCtx.SetVal) + simCtx.Iteration++ + return input +} + +func calcInputValue(iteration int, setValue string) string { + setVal, _ := strconv.Atoi(setValue) + newVal := setVal - 10 + iteration + return strconv.Itoa(newVal) +} + +func selectRandomInput(fMap []FeederMap) DomainData { + var domainData DomainData + signalIndex := getRandomVssfMapIndex(fMap) + domainData.Name = fMap[signalIndex].Name + if fMap[signalIndex].Datatype == 0 { // uint8, maybe allowed... + domainData.Value = strconv.Itoa(rand.Intn(10)) + } else if fMap[signalIndex].Datatype == 9 { // double, maybe lat/long + domainData.Value = strconv.Itoa(rand.Intn(90)) + } else if fMap[signalIndex].Datatype == 10 { // bool + domainData.Value = strconv.Itoa(rand.Intn(2)) + } else { + domainData.Value = strconv.Itoa(rand.Intn(1000)) + } +// utils.Info.Printf("Simulated data from Vehicle interface: Name=%s, Value=%s", domainData.Name, domainData.Value) + return domainData +} + +func getRandomVssfMapIndex(fMap []FeederMap) int { + signalIndex := rand.Intn(len(fMap)) + for strings.Contains(fMap[signalIndex].Name, ".") { // assuming vehicle if names do not contain dot... + signalIndex = (signalIndex + 1) % (len(fMap) - 1) + } + return signalIndex +} + +func readSimulatedData(fname string) []DataItem { + if !fileExists(fname) { + utils.Error.Printf("readSimulatedData: The file %s does not exist.", fname) + return nil + } + data, err := os.ReadFile(fname) + if err != nil { + utils.Error.Printf("readSimulatedData:Error reading %s: %s", fname, err) + return nil + } + err = json.Unmarshal([]byte(data), &tripData) + if err != nil { + utils.Error.Printf("readSimulatedData:Error unmarshal json=%s", err) + return nil + } + return tripData +} + +func getSimulatedDataPoints(dpIndex int) []DomainData { + dataPoint := make([]DomainData, len(tripData)) + for i := 0; i < len(tripData); i++ { + dataPoint[i].Name = tripData[i].Path + dataPoint[i].Value = tripData[i].Dp[dpIndex].Value + } + return dataPoint +} + +func incDpIndex(index int) int { + index++ + if index == len(tripData[0].Dp) { + return 0 + } + return index +} + +func convertDomainData(north2SouthConv bool, inData DomainData, feederMap []FeederMap) DomainData { + var outData DomainData + matchIndex := sort.Search(len(feederMap), func(i int) bool { return feederMap[i].Name >= inData.Name }) + if matchIndex == len(feederMap) || feederMap[matchIndex].Name != inData.Name { + utils.Error.Printf("convertDomainData:Failed to map= %s", inData.Name) + return outData + } + outData.Name = feederMap[feederMap[matchIndex].MapIndex].Name + outData.Value = convertValue(inData.Value, feederMap[matchIndex].ConvertIndex, + feederMap[matchIndex].Datatype, feederMap[feederMap[matchIndex].MapIndex].Datatype, north2SouthConv) + return outData +} + +func convertValue(value string, convertIndex uint16, inDatatype int8, outDatatype int8, north2SouthConv bool) string { + switch convertIndex { + case 0: // no conversion + return value + default: // call to conversion method + var convertDataMap interface{} + err := json.Unmarshal([]byte(scalingDataList[convertIndex-1]), &convertDataMap) + if err != nil { + utils.Error.Printf("convertValue:Error unmarshal scalingDataList item=%s", scalingDataList[convertIndex-1]) + return "" + } + switch vv := convertDataMap.(type) { + case map[string]interface{}: + return enumConversion(vv, north2SouthConv, value) + case interface{}: + return linearConversion(vv.([]interface{}), north2SouthConv, value) + default: + utils.Error.Printf("convertValue: convert data=%s has unknown format.", scalingDataList[convertIndex-1]) + } + } + return "" +} + +func enumConversion(enumObj map[string]interface{}, north2SouthConv bool, inValue string) string { // enumObj = {"Key1":"value1", .., "KeyN":"valueN"}, k is VSS value + for k, v := range enumObj { + if north2SouthConv { + if k == inValue { + return v.(string) + } + } else { + if v.(string) == inValue { + return k + } + } + } + utils.Error.Printf("enumConversion: value=%s is out of range.", inValue) + return "" +} + +func linearConversion(coeffArray []interface{}, north2SouthConv bool, inValue string) string { // coeffArray = [A, B], y = Ax +B, y is VSS value + var A float64 + var B float64 + var x float64 + var err error + if x, err = strconv.ParseFloat(inValue, 64); err != nil { + utils.Error.Printf("linearConversion: input value=%s cannot be converted to float.", inValue) + return "" + } + A = coeffArray[0].(float64) + B = coeffArray[1].(float64) + var y float64 + if north2SouthConv { + y = A*x + B + } else { + y = (x - B) / A + } + return strconv.FormatFloat(y, 'f', -1, 32) +} + +func main() { + // Create new parser object + parser := argparse.NewParser("print", "Data feeder template version 2") + mapFile := parser.String("m", "mapfile", &argparse.Options{ + Required: false, + Help: "VSS-Vehicle mapping data filename", + Default: "VssVehicle.cvt"}) + sclDataFile := parser.String("s", "scldatafile", &argparse.Options{ + Required: false, + Help: "VSS-Vehicle scaling data filename", + Default: "VssVehicleScaling.json"}) + logFile := parser.Flag("", "logfile", &argparse.Options{Required: false, Help: "outputs to logfile in ./logs folder"}) + logLevel := parser.Selector("", "loglevel", []string{"trace", "debug", "info", "warn", "error", "fatal", "panic"}, &argparse.Options{ + Required: false, + Help: "changes log output level", + Default: "info"}) + simSource := parser.Selector("i", "simsource", []string{"vssjson", "internal"}, &argparse.Options{Required: false, + Help: "Simulator source must be either vssjson, or internal", Default: "internal"}) // "vehiclejson" could be added for non-converted simulator data + stateDB := parser.Selector("d", "statestorage", []string{"sqlite", "redis", "memcache", "none"}, &argparse.Options{Required: false, + Help: "Statestorage must be either sqlite, redis, memcache, or none", Default: "redis"}) + dbFile := parser.String("f", "dbfile", &argparse.Options{ + Required: false, + Help: "statestorage database filename", + Default: "../../server/vissv2server/serviceMgr/statestorage.db"}) + // Parse input + err := parser.Parse(os.Args) + if err != nil { + utils.Error.Print(parser.Usage(err)) + } + stateDbType = *stateDB + simulatedSource = *simSource + + utils.InitLog("feeder-log.txt", "./logs", *logFile, *logLevel) + + switch stateDbType { + case "sqlite": + var dbErr error + if utils.FileExists(*dbFile) { + dbHandle, dbErr = sql.Open("sqlite3", *dbFile) + if dbErr != nil { + utils.Error.Printf("Could not open state storage file = %s, err = %s", *dbFile, dbErr) + os.Exit(1) + } else { + utils.Info.Printf("SQLite state storage initialised.") + } + } else { + utils.Error.Printf("Could not find state storage file = %s", *dbFile) + } + case "redis": + redisClient = redis.NewClient(&redis.Options{ + Network: "unix", + Addr: "/var/tmp/vissv2/redisDB.sock", + Password: "", + DB: 1, + }) + err := redisClient.Ping().Err() + if err != nil { + utils.Error.Printf("Could not initialise redis DB, err = %s", err) + os.Exit(1) + } else { + utils.Info.Printf("Redis state storage initialised.") + } + case "memcache": + memcacheClient = memcache.New("/var/tmp/vissv2/memcacheDB.sock") + err := memcacheClient.Ping() + if err != nil { + utils.Info.Printf("Memcache daemon not alive. Trying to start it") + cmd := exec.Command("/usr/bin/bash", "memcacheNativeInit.sh") + err := cmd.Run() + if err != nil { + utils.Error.Printf("Memcache daemon startup failed, err=%s", err) + os.Exit(1) + } + } + utils.Info.Printf("Memcache daemon alive.") + default: + utils.Error.Printf("Unknown state storage type = %s", stateDbType) + os.Exit(1) + } + + vssInputChan := make(chan DomainData, 1) + vssOutputChan := make(chan DomainData, 1) + vehicleInputChan := make(chan DomainData, 1) + vehicleOutputChan := make(chan DomainData, 1) + + utils.Info.Printf("Initializing the feeder for mapping file %s.", *mapFile) + feederMap := readFeederMap(*mapFile) + if simulatedSource != "internal" { + tripData = readSimulatedData("tripdata.json") + if len(tripData) == 0 { + utils.Error.Printf("Tripdata file not found.") + os.Exit(1) + } + } + scalingDataList = readscalingDataList(*sclDataFile) + go initVSSInterfaceMgr(vssInputChan, vssOutputChan) + go initVehicleInterfaceMgr(feederMap, vehicleInputChan, vehicleOutputChan) + + for { + select { + case vssInData := <-vssInputChan: + vehicleOutputChan <- convertDomainData(true, vssInData, feederMap) + case vehicleInData := <-vehicleInputChan: + if simulatedSource != "vssjson" { + vssOutputChan <- convertDomainData(false, vehicleInData, feederMap) + } else { + //utils.Info.Printf("simulatedDataPoints:Path=%s, Value=%s", vehicleInData.Name, vehicleInData.Value) + vssOutputChan <- vehicleInData // conversion not needed + } + } + } +} diff --git a/feeder/feeder-template/feederv3/tripdata-test.json b/feeder/feeder-template/feederv3/tripdata-test.json new file mode 100644 index 00000000..39267ee8 --- /dev/null +++ b/feeder/feeder-template/feederv3/tripdata-test.json @@ -0,0 +1,5 @@ +[ + +{ "path": "Vehicle.Speed", "dp": [ { "ts": "0", "value": "0.0" }, { "ts": "0", "value": "1.0" }, { "ts": "0", "value": "2.0" }, { "ts": "0", "value": "3.0" }, { "ts": "0", "value": "0.0" }, { "ts": "0", "value": "1.0" }, { "ts": "0", "value": "2.0" }, { "ts": "0", "value": "3.0" }, { "ts": "0", "value": "0.0" }, { "ts": "0", "value": "1.0" }, { "ts": "0", "value": "2.0" }, { "ts": "0", "value": "3.0" }, { "ts": "0", "value": "0.0" }, { "ts": "0", "value": "1.0" }, { "ts": "0", "value": "2.0" }, { "ts": "0", "value": "3.0" } ]} + +] diff --git a/feeder/feeder-template/feederv3/tripdata.json b/feeder/feeder-template/feederv3/tripdata.json new file mode 100644 index 00000000..7ea6d42f --- /dev/null +++ b/feeder/feeder-template/feederv3/tripdata.json @@ -0,0 +1,9 @@ +[ + +{ "path": "Vehicle.TraveledDistance", "dp": [ { "ts": "2020-04-15T13:37:00Z", "value": "1000.0" }, { "ts": "2020-04-15T13:37:05Z", "value": "1000.0" }, { "ts": "2020-04-15T13:37:10Z", "value": "1001.0" }, { "ts": "2020-04-15T13:37:15Z", "value": "1001.0" } ]}, + +{ "path": "Vehicle.CurrentLocation.Longitude", "dp": [ { "ts": "2020-04-15T13:37:00Z", "value": "56.02024719364729" }, { "ts": "2020-04-15T13:37:00Z", "value": "56.02233748493814" }, { "ts": "2020-04-15T13:37:00Z", "value": "56.02565421151008" }, { "ts": "2020-04-15T13:37:05Z", "value": "56.02823595803967" } ]}, + +{ "path": "Vehicle.CurrentLocation.Latitude", "dp": [ { "ts": "2020-04-15T13:37:00Z", "value": "12.599927977070497" }, { "ts": "2020-04-15T13:37:00Z", "value": "12.601058355542794" }, { "ts": "2020-04-15T13:37:00Z", "value": "12.602554268256588" }, { "ts": "2020-04-15T13:37:05Z", "value": "12.603616368784676" } ]} + +] diff --git a/server/vissv2server/serviceMgr/curvelog.go b/server/vissv2server/serviceMgr/curvelog.go index 11679021..0de0f821 100644 --- a/server/vissv2server/serviceMgr/curvelog.go +++ b/server/vissv2server/serviceMgr/curvelog.go @@ -13,6 +13,7 @@ import ( "os" "sort" "strconv" + "slices" "sync" "time" @@ -30,7 +31,6 @@ type SubThreads struct { } var CLChannel chan CLPack -var threadsChan chan SubThreads var closeClSubId int = -1 var mcloseClSubId = &sync.Mutex{} @@ -78,9 +78,39 @@ type PostProcessBufElement3dim struct { } const MAXCLBUFSIZE = 240 // something large... -const MAXCLSESSIONS = 100 // This value depends on the HW memory and performance +const MAXCLSESSIONS = 25 // This value depends on the HW memory and performance +var clServerChan [MAXCLSESSIONS]chan string var numOfClSessions int = 0 +type TriggChannelElem struct { + Busy bool +} +var triggChannelList []TriggChannelElem + +type TriggRoutingElem struct { + Index int + Path []string +} + +type TriggRoutingData struct { + SubscriptionId string + TriggRoutingList []TriggRoutingElem +} + +var clRouterChan chan TriggRoutingData + +func initClResources() { + CLChannel = make(chan CLPack, 5) // allow some buffering... + for i := range clServerChan { + clServerChan[i] = make(chan string) + } + triggChannelList = make([]TriggChannelElem, MAXCLSESSIONS) + for i := 0; i < MAXCLSESSIONS; i++ { + triggChannelList[i].Busy = false + } + clRouterChan = make(chan TriggRoutingData) +} + func createRingBuffer(bufSize int) RingBuffer { var aRingBuffer RingBuffer aRingBuffer.bufSize = bufSize @@ -362,22 +392,32 @@ func is3dim(path string, index int, dim3List []Dim3Elem) bool { func getSleepDuration(newTime time.Time, oldTime time.Time, wantedDuration int) time.Duration { workDuration := newTime.Sub(oldTime) sleepDuration := time.Duration(wantedDuration) * time.Millisecond - return sleepDuration - workDuration + if sleepDuration - workDuration > 0 { + return sleepDuration - workDuration + } + return 1 // used by ticker that panics on <= 0 } -func curveLoggingServer(clChan chan CLPack, threadsChan chan SubThreads, subscriptionId int, opValue string, paths []string) { +func curveLoggingDispatcher(clChan chan CLPack, subscriptionId int, opValue string, paths []string) (TriggRoutingData, SubThreads) { maxError, bufSize := getCurveLoggingParams(opValue) if bufSize > MAXCLBUFSIZE { bufSize = MAXCLBUFSIZE } dim1List, dim2List, dim3List := populateDimLists(paths) + var triggRoutingData TriggRoutingData + triggRoutingData.SubscriptionId = strconv.Itoa(subscriptionId) + var triggRoutingElem TriggRoutingElem for i := 0; i < len(dim1List); i++ { if numOfClSessions > MAXCLSESSIONS { utils.Error.Printf("Curve logging: All resources are utilized.") break } returnSingleDp(clChan, subscriptionId, dim1List[i]) - go clCapture1dim(clChan, subscriptionId, dim1List[i], bufSize, maxError) + triggChannelIndex := allocateTriggChannelIndex() + go clCapture1dim(clChan, triggChannelIndex, subscriptionId, dim1List[i], bufSize, maxError) + triggRoutingElem.Index = triggChannelIndex + triggRoutingElem.Path = dim1List + triggRoutingData.TriggRoutingList = append(triggRoutingData.TriggRoutingList, triggRoutingElem) numOfClSessions++ } for i := 0; i < len(dim2List); i++ { @@ -386,7 +426,11 @@ func curveLoggingServer(clChan chan CLPack, threadsChan chan SubThreads, subscri break } returnSingleDp2(clChan, subscriptionId, dim2List[i]) - go clCapture2dim(clChan, subscriptionId, dim2List[i], bufSize, maxError) + triggChannelIndex := allocateTriggChannelIndex() + go clCapture2dim(clChan, triggChannelIndex, subscriptionId, dim2List[i], bufSize, maxError) + triggRoutingElem.Index = triggChannelIndex + triggRoutingElem.Path = dim1TransformDim2(dim2List[i]) + triggRoutingData.TriggRoutingList = append(triggRoutingData.TriggRoutingList, triggRoutingElem) numOfClSessions++ } for i := 0; i < len(dim3List); i++ { @@ -395,32 +439,163 @@ func curveLoggingServer(clChan chan CLPack, threadsChan chan SubThreads, subscri break } returnSingleDp3(clChan, subscriptionId, dim3List[i]) - go clCapture3dim(clChan, subscriptionId, dim3List[i], bufSize, maxError) + triggChannelIndex := allocateTriggChannelIndex() + go clCapture3dim(clChan, triggChannelIndex, subscriptionId, dim3List[i], bufSize, maxError) + triggRoutingElem.Index = triggChannelIndex + triggRoutingElem.Path = dim1TransformDim3(dim3List[i]) + triggRoutingData.TriggRoutingList = append(triggRoutingData.TriggRoutingList, triggRoutingElem) numOfClSessions++ } var subThreads SubThreads subThreads.NumofThreads = len(dim1List) + len(dim2List) + len(dim3List) subThreads.SubscriptionId = subscriptionId - threadsChan <- subThreads + return triggRoutingData, subThreads + +} + +func curveLogServer() { + var routingDataList []TriggRoutingData + for { + select { + case message := <- fromFeederCl: + if len(message) == 0 { + continue + } + var messageMap map[string]interface{} + err := json.Unmarshal([]byte(message), &messageMap) + if err != nil || messageMap["action"] == nil { + utils.Error.Printf("Error in trigg message=%s", message) + continue + } + switch messageMap["action"].(string) { + case "subscribe": + if messageMap["status"] != nil && messageMap["status"].(string) == "ok" { + // notify all existing compute threads + for i := 0; i < len(routingDataList); i++ { + for j := 0; j < len(routingDataList[i].TriggRoutingList); j++ { + clServerChan[routingDataList[i].TriggRoutingList[j].Index] <- message + } + } + } + case "unsubscribe": + if messageMap["subscriptionId"] != nil { + //remove from routingDataList + subscriptionId := messageMap["subscriptionId"].(string) + for i := 0; i < len(routingDataList); i++ { + if routingDataList[i].SubscriptionId == subscriptionId { + deallocateTriggChannels(i, routingDataList) + slices.Delete(routingDataList, i, i+1) + } + } + } + case "subscription": + if messageMap["path"] != nil { + // notify all threads that use the path + path := messageMap["path"].(string) + for j := 0; j < len(routingDataList); j++ { + for k := 0; k < len(routingDataList[j].TriggRoutingList); k++ { + for l := 0; l < len(routingDataList[j].TriggRoutingList[k].Path); l++ { + if routingDataList[j].TriggRoutingList[k].Path[l] == path { + clServerChan[routingDataList[j].TriggRoutingList[k].Index] <- message + break + } + } + } + } + } + default: + utils.Error.Printf("Unknown action=%s", messageMap["action"].(string)) + } + case routingData := <- clRouterChan: + routingDataList = append(routingDataList, routingData) + } + } +} + +func dim1TransformDim2(dim2List Dim2Elem) []string { + dimList := make([]string, 2) + dimList[0] = dim2List.Path1 + dimList[1] = dim2List.Path2 + return dimList +} +func dim1TransformDim3(dim3List Dim3Elem) []string { + dimList := make([]string, 3) + dimList[0] = dim3List.Path1 + dimList[1] = dim3List.Path2 + dimList[2] = dim3List.Path3 + return dimList } -func clCapture1dim(clChan chan CLPack, subscriptionId int, path string, bufSize int, maxError float64) { +func allocateTriggChannelIndex() int { + for i := 0; i < MAXCLSESSIONS; i++ { + if !triggChannelList[i].Busy { + triggChannelList[i].Busy = true + return i + } + } + return -1 +} + +func deallocateTriggChannels(i int, routingDataList []TriggRoutingData) { + for j := 0; j < len(routingDataList[i].TriggRoutingList); j++ { + triggChannelList[routingDataList[i].TriggRoutingList[j].Index].Busy = false + } +} + +func decodeFeederMessageCl(feederMessage string, feederNotification bool) (bool, bool) { + if len(feederMessage) == 0 { + return false, feederNotification + } + var messageMap map[string]interface{} + err := json.Unmarshal([]byte(feederMessage), &messageMap) + if err != nil || messageMap["action"] == nil { + utils.Error.Printf("Error in feeder message=%s", feederMessage) + return false, feederNotification + } + doCapture := false + switch messageMap["action"].(string) { + case "subscribe": + if messageMap["status"] != nil && messageMap["status"].(string) == "ok" { + feederNotification = true + } + case "subscription": + if messageMap["path"] != nil { + doCapture = true + } + default: + utils.Error.Printf("Unknown action=%s", messageMap["action"].(string)) + } + return doCapture, feederNotification +} + +func clCapture1dim(clChan chan CLPack, triggChannelIndex int, subscriptionId int, path string, bufSize int, maxError float64) { aRingBuffer := createRingBuffer(bufSize + 1) // logic requires buffer to have a size of one larger than needed var dpMap = make(map[string]interface{}) closeClSession := false - oldTime := getCurrentUtcTime() + oldTime := getCurrentUtcTime() // captureTicker start/reset time + captureTicker := time.NewTicker(10 * time.Millisecond) + feederNotification := false + var doCapture bool lastSelected := 0 // index into ringBuffer; zero points to last dp stored in buffer, increasing values goes backwards in time postProc := make([]PostProcessBufElement1dim, 3) // { CLBufElement{0, 0}, "", -1, CLBufElement{0, 0}, "", -1, CLBufElement{0, 0}, "", -1 } for { - newTime := getCurrentUtcTime() - sleepPeriod := getSleepDuration(newTime, oldTime, 90) // TODO: Iteration period should be configurable, set to less than sample freq of signal. - if sleepPeriod < 0 { - utils.Warning.Printf("Curve logging may have missed to capture.") + select { + case <- captureTicker.C: + if !feederNotification { + newTime := getCurrentUtcTime() + captureTicker.Reset(getSleepDuration(newTime, oldTime, 900)) // 90 msec too slow? + oldTime = newTime + } else { + captureTicker.Stop() + } + case feederMessage := <- clServerChan[triggChannelIndex]: + doCapture, feederNotification = decodeFeederMessageCl(feederMessage, feederNotification) + if !doCapture { + continue + } } - time.Sleep(sleepPeriod) - oldTime = getCurrentUtcTime() mcloseClSubId.Lock() if closeClSubId == subscriptionId { closeClSession = true @@ -665,22 +840,33 @@ func returnSingleDp3(clChan chan CLPack, subscriptionId int, paths Dim3Elem) { clChan <- clPack } -func clCapture2dim(clChan chan CLPack, subscriptionId int, paths Dim2Elem, bufSize int, maxError float64) { +func clCapture2dim(clChan chan CLPack,triggChannelIndex int, subscriptionId int, paths Dim2Elem, bufSize int, maxError float64) { aRingBuffer1 := createRingBuffer(bufSize + 1) aRingBuffer2 := createRingBuffer(bufSize + 1) var dpMap1 = make(map[string]interface{}) var dpMap2 = make(map[string]interface{}) closeClSession := false - oldTime := getCurrentUtcTime() + oldTime := getCurrentUtcTime() // captureTicker start/reset time + captureTicker := time.NewTicker(10 * time.Millisecond) + feederNotification := false + var doCapture bool updatedTail := 0 for { - newTime := getCurrentUtcTime() - sleepPeriod := getSleepDuration(newTime, oldTime, 800) // TODO: Iteration period should be configurable, set to less than sample freq of signal. - if sleepPeriod < 0 { - utils.Warning.Printf("Curve logging may have missed to capture.") + select { + case <- captureTicker.C: + if !feederNotification { + newTime := getCurrentUtcTime() + captureTicker.Reset(getSleepDuration(newTime, oldTime, 90)) // 90 msec sufficient? + oldTime = newTime + } else { + captureTicker.Stop() + } + case feederMessage := <- clServerChan[triggChannelIndex]: + doCapture, feederNotification = decodeFeederMessageCl(feederMessage, feederNotification) + if !doCapture { + continue + } } - time.Sleep(sleepPeriod) - oldTime = getCurrentUtcTime() mcloseClSubId.Lock() if closeClSubId == subscriptionId { closeClSession = true @@ -786,7 +972,7 @@ func clReduction2Dim(clBuffer1 []CLBufElement, clBuffer2 []CLBufElement, firstIn return nil } -func clCapture3dim(clChan chan CLPack, subscriptionId int, paths Dim3Elem, bufSize int, maxError float64) { +func clCapture3dim(clChan chan CLPack, triggChannelIndex int, subscriptionId int, paths Dim3Elem, bufSize int, maxError float64) { aRingBuffer1 := createRingBuffer(bufSize + 1) aRingBuffer2 := createRingBuffer(bufSize + 1) aRingBuffer3 := createRingBuffer(bufSize + 1) @@ -794,16 +980,27 @@ func clCapture3dim(clChan chan CLPack, subscriptionId int, paths Dim3Elem, bufSi var dpMap2 = make(map[string]interface{}) var dpMap3 = make(map[string]interface{}) closeClSession := false - oldTime := getCurrentUtcTime() + oldTime := getCurrentUtcTime() // captureTicker start/reset time + captureTicker := time.NewTicker(10 * time.Millisecond) + feederNotification := false + var doCapture bool updatedTail := 0 for { - newTime := getCurrentUtcTime() - sleepPeriod := getSleepDuration(newTime, oldTime, 800) // TODO: Iteration period should be configurable, set to less than sample freq of signal. - if sleepPeriod < 0 { - utils.Warning.Printf("Curve logging may have missed to capture.") + select { + case <- captureTicker.C: + if !feederNotification { + newTime := getCurrentUtcTime() + captureTicker.Reset(getSleepDuration(newTime, oldTime, 90)) // 90 msec sufficient? + oldTime = newTime + } else { + captureTicker.Stop() + } + case feederMessage := <- clServerChan[triggChannelIndex]: + doCapture, feederNotification = decodeFeederMessageCl(feederMessage, feederNotification) + if !doCapture { + continue + } } - time.Sleep(sleepPeriod) - oldTime = getCurrentUtcTime() mcloseClSubId.Lock() if closeClSubId == subscriptionId { closeClSession = true diff --git a/server/vissv2server/serviceMgr/serviceMgr.go b/server/vissv2server/serviceMgr/serviceMgr.go index 0de0fa01..2c9ce85f 100644 --- a/server/vissv2server/serviceMgr/serviceMgr.go +++ b/server/vissv2server/serviceMgr/serviceMgr.go @@ -28,6 +28,7 @@ import ( "os/exec" "strconv" "strings" + "slices" "time" ) @@ -59,6 +60,24 @@ type HistoryList struct { var historyList []HistoryList var historyAccessChannel chan string +// for feeder notifications +var toFeeder chan string // requests +var fromFeederRorC chan string //range and change messages +var fromFeederCl chan string // curvelog messages + +type FeederSubElem struct { + SubscriptionId string + Path []string + Variant string +} +var feederSubList []FeederSubElem + +type FeederPathElem struct { + Path string + Reference int +} +var feederPathList []FeederPathElem + //var feederConn net.Conn //var hostIp string @@ -204,20 +223,20 @@ func setSubscriptionListThreads(subscriptionList []SubscriptionState, subThreads return subscriptionList } -func checkRangeChangeFilter(filterList []utils.FilterObject, latestDataPoint string, path string) (bool, bool, string) { +func checkRangeChangeFilter(filterList []utils.FilterObject, latestDataPoint string, path string) (bool, string) { for i := 0; i < len(filterList); i++ { if filterList[i].Type == "paths" || filterList[i].Type == "timebased" || filterList[i].Type == "curvelog" { continue } currentDataPoint := getVehicleData(path) if filterList[i].Type == "range" { - return evaluateRangeFilter(filterList[i].Parameter, getDPValue(currentDataPoint)), false, currentDataPoint // do not update latestValue + return evaluateRangeFilter(filterList[i].Parameter, getDPValue(currentDataPoint)), currentDataPoint // do not update latestValue } if filterList[i].Type == "change" { return evaluateChangeFilter(filterList[i].Parameter, getDPValue(latestDataPoint), getDPValue(currentDataPoint), currentDataPoint) } } - return false, false, "" + return false, "" } func getDPValue(dp string) string { @@ -264,13 +283,13 @@ func evaluateRangeFilter(opValue string, currentValue string) bool { } evaluation := true for i := 0; i < len(rangeFilter); i++ { - eval, _ := compareValues(rangeFilter[i].LogicOp, rangeFilter[i].Boundary, currentValue, "0") // currVal - 0 logic-op boundary + eval := compareValues(rangeFilter[i].LogicOp, rangeFilter[i].Boundary, currentValue, "0") // currVal - 0 logic-op boundary evaluation = evaluation && eval } return evaluation } -func evaluateChangeFilter(opValue string, latestValue string, currentValue string, currentDataPoint string) (bool, bool, string) { +func evaluateChangeFilter(opValue string, latestValue string, currentValue string, currentDataPoint string) (bool, string) { //utils.Info.Printf("evaluateChangeFilter: opValue=%s", opValue) type ChangeFilter struct { LogicOp string `json:"logic-op"` @@ -280,17 +299,17 @@ func evaluateChangeFilter(opValue string, latestValue string, currentValue strin err := json.Unmarshal([]byte(opValue), &changeFilter) if err != nil { utils.Error.Printf("evaluateChangeFilter: Unmarshal error=%s", err) - return false, false, "" + return false, "" } - val1, val2 := compareValues(changeFilter.LogicOp, latestValue, currentValue, changeFilter.Diff) - return val1, val2, currentDataPoint + val1 := compareValues(changeFilter.LogicOp, latestValue, currentValue, changeFilter.Diff) + return val1, currentDataPoint } -func compareValues(logicOp string, latestValue string, currentValue string, diff string) (bool, bool) { +func compareValues(logicOp string, latestValue string, currentValue string, diff string) bool { latestValueType := utils.AnalyzeValueType(latestValue) if latestValueType != utils.AnalyzeValueType(currentValue) { utils.Error.Printf("compareValues: Incompatible types, latVal=%s, curVal=%s", latestValue, currentValue) - return false, false + return false } switch latestValueType { case 0: @@ -298,82 +317,82 @@ func compareValues(logicOp string, latestValue string, currentValue string, diff case 2: // bool if diff != "0" { utils.Error.Printf("compareValues: invalid parameter for boolean type") - return false, false + return false } switch logicOp { case "eq": - return currentValue == latestValue, true + return currentValue == latestValue case "ne": - return currentValue != latestValue, true // true->false OR false->true + return currentValue != latestValue // true->false OR false->true case "gt": - return latestValue == "false" && currentValue != latestValue, true // false->true + return latestValue == "false" && currentValue != latestValue // false->true case "lt": - return latestValue == "true" && currentValue != latestValue, true // true->false + return latestValue == "true" && currentValue != latestValue // true->false } - return false, false + return false case 1: // int curVal, err := strconv.Atoi(currentValue) if err != nil { - return false, false + return false } latVal, err := strconv.Atoi(latestValue) if err != nil { - return false, false + return false } diffVal, err := strconv.Atoi(diff) if err != nil { - return false, false + return false } - //utils.Info.Printf("compareValues: value type=integer, cv=%d, lv=%d, diff=%d, logicOp=%s", curVal, latVal, diffVal, logicOp) +// utils.Info.Printf("compareValues: value type=integer, cv=%d, lv=%d, diff=%d, logicOp=%s", curVal, latVal, diffVal, logicOp) switch logicOp { case "eq": - return curVal-diffVal == latVal, false + return curVal-diffVal == latVal case "ne": - return curVal-diffVal != latVal, false + return curVal-diffVal != latVal case "gt": - return curVal-diffVal > latVal, false + return curVal-diffVal > latVal case "gte": - return curVal-diffVal >= latVal, false + return curVal-diffVal >= latVal case "lt": - return curVal-diffVal < latVal, false + return curVal-diffVal < latVal case "lte": - return curVal-diffVal <= latVal, false + return curVal-diffVal <= latVal } - return false, false + return false case 3: // float f64Val, err := strconv.ParseFloat(currentValue, 32) if err != nil { - return false, false + return false } curVal := float32(f64Val) f64Val, err = strconv.ParseFloat(latestValue, 32) if err != nil { - return false, false + return false } latVal := float32(f64Val) f64Val, err = strconv.ParseFloat(diff, 32) if err != nil { - return false, false + return false } diffVal := float32(f64Val) //utils.Info.Printf("compareValues: value type=float, cv=%d, lv=%d, diff=%d, logicOp=%s", curVal, latVal, diffVal, logicOp) switch logicOp { case "eq": - return curVal-diffVal == latVal, false + return curVal-diffVal == latVal case "ne": - return curVal-diffVal != latVal, false + return curVal-diffVal != latVal case "gt": - return curVal-diffVal > latVal, false + return curVal-diffVal > latVal case "gte": - return curVal-diffVal >= latVal, false + return curVal-diffVal >= latVal case "lt": - return curVal-diffVal < latVal, false + return curVal-diffVal < latVal case "lte": - return curVal-diffVal <= latVal, false + return curVal-diffVal <= latVal } - return false, false + return false } - return false, false + return false } func addPackage(incompleteMessage string, packName string, packValue string) string { @@ -456,7 +475,25 @@ func getCurveLoggingParams(opValue string) (float64, int) { // {"maxerr": "X", " return maxErr, bufSize } -func activateIfIntervalOrCL(filterList []utils.FilterObject, subscriptionChan chan int, CLChan chan CLPack, subscriptionId int, paths []string) { +func createFeederNotifyMessage(variant string, pathList []string, subscriptionId int) string { + paths := `["` + for i := 0; i < len(pathList); i++ { + paths += pathList[i] + `", "` + } + paths = paths[:len(paths)-3] + "]" + return `{"action": "subscribe", "variant": "` + variant + `", "path": ` + paths + `, "subscriptionId": "` + strconv.Itoa(subscriptionId) + `"}` +} + +func getFeederNotifyType(filterList []utils.FilterObject) string { + for i := 0; i < len(filterList); i++ { + if filterList[i].Type == "curvelog" || filterList[i].Type == "change" || filterList[i].Type == "range" { + return filterList[i].Type + } + } + return "" +} + +func activateIfIntervalOrCL(filterList []utils.FilterObject, subscriptionChan chan int, CLChan chan CLPack, subscriptionId int, paths []string, subscriptionList []SubscriptionState) []SubscriptionState { for i := 0; i < len(filterList); i++ { if filterList[i].Type == "timebased" { interval := getIntervalPeriod(filterList[i].Parameter) @@ -467,10 +504,13 @@ func activateIfIntervalOrCL(filterList []utils.FilterObject, subscriptionChan ch break } if filterList[i].Type == "curvelog" { - go curveLoggingServer(CLChan, threadsChan, subscriptionId, filterList[i].Parameter, paths) + clRoutingList, subThreads := curveLoggingDispatcher(CLChan, subscriptionId, filterList[i].Parameter, paths) + subscriptionList = setSubscriptionListThreads(subscriptionList, subThreads) + clRouterChan <- clRoutingList break } } + return subscriptionList } func getVehicleData(path string) string { // returns {"value":"Y", "ts":"Z"} @@ -570,7 +610,7 @@ func setVehicleData(path string, value string) string { case "memcache": fallthrough case "redis": - udsConn := utils.GetUdsConn(path, "serverFeeder") +/* udsConn := utils.GetUdsConn(path, "serverFeeder") if udsConn == nil { utils.Error.Printf("setVehicleData:Failed to UDS connect to feeder for path = %s", path) return "" @@ -580,7 +620,9 @@ func setVehicleData(path string, value string) string { if err != nil { utils.Error.Printf("setVehicleData:Write failed, err = %s", err) return "" - } + }*/ + message := `{"action": "set", "data": {"path":"` + path + `", "dp":{"value":"` + value + `", "ts":"` + ts + `"}}}` + toFeeder <- message return ts case "apache-iotdb": vssKey := []string{"`" + path + "`"} // Back-quote the VSS node for the DB insert, e.g. `Vehicle.CurrentLocation.Longitude` @@ -947,6 +989,218 @@ func getValidation(path string) string { return "read-write" //dummy return } +func feederFrontend(toFeeder chan string, fromFeederRorC chan string, fromFeederCl chan string) { + udsConn := utils.GetUdsConn("*", "serverFeeder") + if udsConn == nil { + utils.Error.Printf("feederFrontend:Failed to UDS connect to feeder.") + return // ??? + } + fromFeeder := make(chan string) + go feederReader(udsConn, fromFeeder) + feederNotification := "not-verified" // possible alues ["not-verified", "not-supported", "supported"] + subMessageCount := 0 + for { + select { + case message := <- toFeeder: + var messageMap map[string]interface{} + var feederUpdatePath string + var unsubscribePath []string + err := json.Unmarshal([]byte(message), &messageMap) + if err != nil || messageMap["action"] == nil { + utils.Error.Printf("feederFrontend:Feeder message corrupt, err = %s\nmessage=%s", err, message) + } else { + switch messageMap["action"].(string) { + case "subscribe": + if messageMap["subscriptionId"] != nil && messageMap["variant"] != nil && messageMap["path"] != nil { + feederSubList = addOnFeederSubList(messageMap["subscriptionId"].(string), messageMap["variant"].(string), messageMap["path"].([]interface{})) + feederPathList, feederUpdatePath = addOnFeederPathList(messageMap["path"].([]interface{})) + if feederNotification != "not-supported" { + if subMessageCount >= 5 { + feederNotification = "not-supported" + continue + } + message = `{"action": "subscribe", "path":` + feederUpdatePath + `}` + subMessageCount++ + } + } else { + utils.Error.Printf("feederFrontend:Feeder message corrupt, message=%s", message) + continue + } + case "unsubscribe": + if messageMap["subscriptionId"] != nil { + fromFeederCl <- message // needed for list purging + feederSubList, unsubscribePath = deleteOnFeederSubList(messageMap["subscriptionId"].(string)) + feederPathList, feederUpdatePath = deleteOnFeederPathList(unsubscribePath) + if len(feederUpdatePath) > 4 { + message = `{"action": "unsubscribe", "path":` + feederUpdatePath + `}` + } else { + continue + } + } else { + utils.Error.Printf("feederFrontend:Feeder message corrupt, message=%s", message) + continue + } + case "set": // send message as is to feeder + default: + utils.Error.Printf("feederFrontend:Feeder message action unknown, message=%s", message) + continue + } + _, err := udsConn.Write([]byte(message)) + if err != nil { + utils.Error.Printf("feederFrontend:Feeder write failed, err = %s", err) + } + } + case message := <- fromFeeder: + var messageMap map[string]interface{} + err := json.Unmarshal([]byte(message), &messageMap) + if err != nil || messageMap["action"] == nil { + utils.Error.Printf("feederFrontend:Feeder message corrupt, err = %s\nmessage=%s", err, message) + } else { + switch messageMap["action"].(string) { + case "subscribe": //response + if messageMap["status"] != nil { + if messageMap["status"].(string) == "ok" { + fromFeederRorC <- message + fromFeederCl <- message + feederNotification = "supported" + } else { + feederNotification = "not-supported" + } + } else { + utils.Error.Printf("feederFrontend:Feeder message corrupt, message=%s", message) + continue + } + case "subscription": //notification + if messageMap["path"] != nil { + variant := getSubscribeVariant(messageMap["path"].(string)) + if strings.Contains(variant, "change") || strings.Contains(variant, "range") { + fromFeederRorC <- message + } + if strings.Contains(variant, "curvelog") { + fromFeederCl <- message + } + } else { + utils.Error.Printf("feederFrontend:Feeder message corrupt, message=%s", message) + } + default: + utils.Error.Printf("feederFrontend:Feeder message action unknown, message=%s", message) + } + } + } + } +} + +func getSubscribeVariant(path string) string { + variants := "" + for i := 0; i < len(feederSubList); i++ { + for j := 0; j < len(feederSubList[i].Path); j++ { + if feederSubList[i].Path[j] == path { + if !strings.Contains(variants, feederSubList[i].Variant) { + variants += feederSubList[i].Variant + "+" + } + } + } + } + if len(variants) > 0 { + variants = variants[:len(variants)-1] + } + return variants +} + +func addOnFeederSubList(subscriptionId string, variant string, path []interface{}) []FeederSubElem { + var feederSubElem FeederSubElem + feederSubElem.SubscriptionId = subscriptionId + feederSubElem.Variant = variant + feederSubElem.Path = make([]string, len(path)) + for i := 0; i < len(path); i++ { + feederSubElem.Path[i] = path[i].(string) + } + feederSubList = append(feederSubList, feederSubElem) + return feederSubList +} + +func deleteOnFeederSubList(subscriptionId string) ([]FeederSubElem, []string) { + for i := 0; i < len(feederSubList); i++ { + if feederSubList[i].SubscriptionId == subscriptionId { + unsubscribePath := make([]string, len(feederSubList[i].Path)) + for j := 0; j < len(feederSubList[i].Path); j++ { + unsubscribePath[j] = feederSubList[i].Path[j] + } + return slices.Delete(feederSubList, i, i+1), unsubscribePath + } + } + return nil, nil +} + +func addOnFeederPathList(path []interface{}) ([]FeederPathElem, string) { + var feederPathElem FeederPathElem + var pathFound bool + feederUpdatePath := `["` + for i := 0; i < len(path); i++ { + pathFound = false + for j := 0; j < len(feederPathList); j++ { + if path [i] == feederPathList[j].Path { + feederPathList[j].Reference++ + pathFound = true + } + } + if !pathFound { + feederPathElem.Path = path[i].(string) + feederPathElem.Reference = 1 + feederPathList = append(feederPathList, feederPathElem) + feederUpdatePath += path[i].(string) + `", "` + } + } + if len(feederUpdatePath) > 2 { + feederUpdatePath = feederUpdatePath[:len(feederUpdatePath)-4] + } + return feederPathList, feederUpdatePath + `"]` +} + +func deleteOnFeederPathList(path []string) ([]FeederPathElem, string) { + feederUpdatePath := `["` + removeIndex := make([]int, len(path)) + k := 0 + for i := 0; i < len(path); i++ { + removeIndex[k] = -1 + for j := 0; j < len(feederPathList); j++ { + if path [i] == feederPathList[j].Path { + if feederPathList[j].Reference > 1 { + feederPathList[j].Reference-- + } else { + removeIndex[k] = j + k++ + } + } + } + k = 0 + for i := 0; i < len(path); i++ { + if removeIndex[k] == i { + feederUpdatePath += path[i] + `", "` + k++ + feederPathList = slices.Delete(feederPathList, i, i+1) + } + } + } + if len(feederUpdatePath) > 2 { + feederUpdatePath = feederUpdatePath[:len(feederUpdatePath)-4] + } + return feederPathList, feederUpdatePath + `"]` +} + +func feederReader(udsConn net.Conn, fromFeeder chan string) { + buf := make([]byte, 512) + for { + nr, err := udsConn.Read(buf) + if err != nil { + utils.Error.Printf("feederReader:Read failed, err = %s", err) + break + } else { + fromFeeder <- string(buf[:nr]) + } + } +} + func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType string, histSupport bool, dbFile string) { stateDbType = stateStorageType historySupport = histSupport @@ -967,7 +1221,7 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri utils.Error.Printf("Could not find state storage file = %s", dbFile) } case "redis": - addr := utils.GetUdsPath("Vehicle", "redis") + addr := utils.GetUdsPath("*", "redis") if len(addr) == 0 { utils.Error.Printf("redis-server socket address not found.") // os.Exit(1) should terminate the process @@ -1057,7 +1311,7 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri backendChan := make(chan string) subscriptionChan := make(chan int) historyAccessChannel = make(chan string) - CLChannel = make(chan CLPack, 5) // allow some buffering... + initClResources() subscriptionList := []SubscriptionState{} subscriptionId = 1 // do not start with zero! @@ -1068,11 +1322,18 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri if historySupport { go historyServer(historyAccessChannel, vss_data) } + toFeeder = make(chan string) + fromFeederRorC = make(chan string) + fromFeederCl = make(chan string) + go feederFrontend(toFeeder, fromFeederRorC, fromFeederCl) + go curveLogServer() var dummyTicker *time.Ticker if stateDbType != "none" { dummyTicker = time.NewTicker(47 * time.Millisecond) } - subscriptTicker := time.NewTicker(23 * time.Millisecond) //range/change subscriptions + subscriptTicker := time.NewTicker(23 * time.Millisecond) //range/change subscription ticker when no feeder notifications + feederNotification := false + triggeredPath := "" for { select { @@ -1156,7 +1417,11 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri subscriptionState.LatestDataPoint = getVehicleData(subscriptionState.Path[0]) subscriptionList = append(subscriptionList, subscriptionState) responseMap["subscriptionId"] = strconv.Itoa(subscriptionId) - activateIfIntervalOrCL(subscriptionState.FilterList, subscriptionChan, CLChannel, subscriptionId, subscriptionState.Path) + subscriptionList = activateIfIntervalOrCL(subscriptionState.FilterList, subscriptionChan, CLChannel, subscriptionId, subscriptionState.Path, subscriptionList) + variant := getFeederNotifyType(subscriptionState.FilterList) + if variant == "curvelog" || variant == "range" || variant == "change" { + toFeeder <- createFeederNotifyMessage(variant, subscriptionState.Path, subscriptionId) + } subscriptionId++ // not to be incremented elsewhere dataChan <- utils.FinalizeMessage(responseMap) case "unsubscribe": @@ -1168,6 +1433,7 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri if status != -1 { responseMap["subscriptionId"] = subscriptId dataChan <- utils.FinalizeMessage(responseMap) + toFeeder <- request break } requestMap["subscriptionId"] = subscriptId @@ -1201,8 +1467,6 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri if dummyValue > 999 { dummyValue = 0 } - case subThreads := <-threadsChan: - subscriptionList = setSubscriptionListThreads(subscriptionList, subThreads) case subscriptionId := <-subscriptionChan: // interval notification triggered subscriptionState := subscriptionList[getSubcriptionStateIndex(subscriptionId, subscriptionList)] var subscriptionMap = make(map[string]interface{}) @@ -1230,28 +1494,66 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri subscriptionMap["RouterId"] = subscriptionList[index].RouterId backendChan <- addPackage(utils.FinalizeMessage(subscriptionMap), "data", clPack.DataPack) case <-subscriptTicker.C: - // check if range or change notification triggered - for i := range subscriptionList { - // triggerDataPoint := getVehicleData(subscriptionList[i].Path[0]) - doTrigger, updateLatest, triggerDataPoint := checkRangeChangeFilter(subscriptionList[i].FilterList, subscriptionList[i].LatestDataPoint, subscriptionList[i].Path[0]) - if updateLatest == true { - subscriptionList[i].LatestDataPoint = triggerDataPoint - } - if doTrigger == true { - subscriptionState := subscriptionList[i] - var subscriptionMap = make(map[string]interface{}) - subscriptionMap["action"] = "subscription" - subscriptionMap["ts"] = utils.GetRfcTime() - subscriptionMap["subscriptionId"] = strconv.Itoa(subscriptionState.SubscriptionId) - subscriptionMap["RouterId"] = subscriptionState.RouterId - subscriptionList[i].LatestDataPoint = triggerDataPoint - backendChan <- addPackage(utils.FinalizeMessage(subscriptionMap), "data", getDataPack(subscriptionList[i].Path, nil)) - } + if feederNotification == false { // feeder does not issue notifications + subscriptionList = checkRCFilterAndIssueMessages("", subscriptionList, backendChan) + } else { + subscriptTicker.Stop() } + case feederMessage := <-fromFeederRorC: +utils.Info.Printf("Feeder message=%s", feederMessage) + triggeredPath, feederNotification = decodeFeederMessage(feederMessage, feederNotification) + subscriptionList = checkRCFilterAndIssueMessages(triggeredPath, subscriptionList, backendChan) } // select } // for } +func checkRCFilterAndIssueMessages(triggeredPath string, subscriptionList []SubscriptionState, backendChan chan string) []SubscriptionState { + // check if range or change notification triggered + for i := range subscriptionList { + if len(triggeredPath) == 0 || triggeredPath == subscriptionList[i].Path[0] { + doTrigger, triggerDataPoint := checkRangeChangeFilter(subscriptionList[i].FilterList, subscriptionList[i].LatestDataPoint, subscriptionList[i].Path[0]) + subscriptionList[i].LatestDataPoint = triggerDataPoint + if doTrigger == true { + subscriptionState := subscriptionList[i] + var subscriptionMap = make(map[string]interface{}) + subscriptionMap["action"] = "subscription" + subscriptionMap["ts"] = utils.GetRfcTime() + subscriptionMap["subscriptionId"] = strconv.Itoa(subscriptionState.SubscriptionId) + subscriptionMap["RouterId"] = subscriptionState.RouterId + backendChan <- addPackage(utils.FinalizeMessage(subscriptionMap), "data", getDataPack(subscriptionList[i].Path, nil)) + } + } + } + return subscriptionList +} + +func decodeFeederMessage(feederMessage string, feederNotification bool) (string, bool) { + if len(feederMessage) == 0 { + return "", feederNotification + } + var messageMap map[string]interface{} + err := json.Unmarshal([]byte(feederMessage), &messageMap) + if err != nil || messageMap["action"] == nil { + utils.Error.Printf("Error in feeder message=%s", feederMessage) + return "", feederNotification + } + var triggeredPath string + switch messageMap["action"].(string) { + case "subscribe": + if messageMap["status"] != nil && messageMap["status"].(string) == "ok" { + feederNotification = true + } + case "subscription": + if messageMap["path"] != nil { + triggeredPath = messageMap["path"].(string) + } + default: + utils.Error.Printf("Unknown action=%s", messageMap["action"].(string)) + return "", feederNotification + } + return triggeredPath, feederNotification +} + func getSubscriptionData(subscriptionList []SubscriptionState, gatingId string) (string, string) { for i := 0; i < len(subscriptionList); i++ { if subscriptionList[i].GatingId == gatingId { diff --git a/server/vissv2server/uds-registration.json b/server/vissv2server/uds-registration.json index b09a0c45..a8fa96e9 100644 --- a/server/vissv2server/uds-registration.json +++ b/server/vissv2server/uds-registration.json @@ -1,6 +1,6 @@ [ { - "root":"Vehicle", + "root":"*", "serverFeeder":"/var/tmp/vissv2/serverFeeder.sock", "redis": "/var/tmp/vissv2/redisDB.sock", "memcache": "/var/tmp/vissv2/memcacheDB.sock", diff --git a/tutorial/content/feeder/_index.md b/tutorial/content/feeder/_index.md index f7ea493b..0285068a 100644 --- a/tutorial/content/feeder/_index.md +++ b/tutorial/content/feeder/_index.md @@ -12,6 +12,7 @@ which spawns two threads that implement the respective interface task. The architecture shown handle all its communication with the server via the state storage. This leads to a polling paradigm and thus a potential latency and performance weakness. This architecture is therefore not found on the master branch, but available on the datastore-poll branch. +There is still a feeder design for this in the feeder-template/feederv1 directory. ![Feeder Sw architecture, unoptimized polling](/vissr/images/feeder-sw-design-v1.jpg?width=50pc) * Figure 1. Feeder software architecture unoptimized polling @@ -21,11 +22,15 @@ the state storage to find new write requests. ![Feeder Sw architecture, optimized polling](/vissr/images/feeder-sw-design-v2.jpg?width=50pc) * Figure 2. Feeder software architecture optimized polling -A feeder implementing the optimized polling version of the SwA is found at the master branch. +A feeder implementing this solution can be found in the feeder-template/feederv2 directory. This feeder can be configured to either use an SQLite, or a Redis state storage interface, please see the Datastore chapter for details. -A design for how the polling on the server side can be mitigated is in the planning stage. -It is likely to require an update of the feeder interface. +However, the solution implementd in the feederv2 template does not support that the server also can replace the polling with a more effective event based solution. +For this the feeder implementation in the feeder-template/feederv3 directory needs to be used. + +The server is able via the interface to detect whether a feeder implements version 2 or 3 of the interface. +In case of version 2 it keeps poling of the data store, while for version 3 it relies on event signalling from the feeder instead. +For more details of this interface, see the [VISSR Server:Feeder interface](/vissr/server/#feeder-interface) chapter. The feeder translation task is divided into a mapping of the signal name, and a possible scaling of the value. The instructions for how to do this is encoded into one or more configuration files that the feeder reads at startup. diff --git a/tutorial/content/server/_index.md b/tutorial/content/server/_index.md index 44aa5636..f7b2956a 100644 --- a/tutorial/content/server/_index.md +++ b/tutorial/content/server/_index.md @@ -107,6 +107,36 @@ SwCs that use this file: * The protobuf encoding scheme. * The live simulator. +### Feeder interface +As described in the [VISSR Feeders](/vissr/feeder/) chapter there are two template versions of feeders. +Version 2 only supports reception of Set requests from the server, while version 3 can also act on subscribe/unsubscribe requests from the server, +and then issue an event to the server when it has updated a subscribed signal in the data store. +The figure below shows the communication network that implements this. +![Network for feeder event communication](/vissr/images/feeder-event-comm.jpg?width=50pc) +* Figure 1. Network for feeder event communication + +The feeder, running in a separate process, is communicating with the server over a Unix domain socket interface. +This interface is on the server side managed by the "feeder front end" thread. +The "service manager" thread of the server receives set/subscribe/unsubscribe requests from clients (get requests do not affect this network) +that it passes on to the feeder front end, which then analyzes the requests and decides to which other entities this should be forwarded. +The subscribe request types that benefit from switching from polling to an event based paradigm are change, range, curvelog, and historic data capture. +This solution supports events for the change, range, and curvelog type. +The historic data capture may later also be updated to support this. +The message formats for the messages passed over the UDS interface are shown below. +For the message formats over the other Golang channel based interfaces, please read the code. + +Feeder front end to Feeder: +* {”action”: ”set”, "data": {"path":"x", "dp":{"value":"y", "ts":"z"}}} +* {”action”: ”subscribe”, ”path”: [”p1”, ..., ”pN”]} +* {”action”: ”unsubscribe”, ”path”: [”p1”, ..., ”pN”]} + +Feeder to Feeder front end: +* {”action”: ”subscribe”, ”status”: “ok/nok”} +* {”action”: ”subscription”, ”path”: ”p”} + +A feeder implementing version 2 may discard messages from the Feeder front end that have the "action" set to either "subcribe" or "unsubscribe", +while a feeder implementing version 3 must respond to a subscribe request with "status" set to "ok". + ### History control The VISSv2 specification provides a capability for clients to issue a request for [historic data](https://raw.githack.com/covesa/vehicle-information-service-specification/main/spec/VISSv2_Core.html#history-filter-operation). This server supports temporary recording of data that can then be requested by a client using a history filter. diff --git a/tutorial/static/images/feeder-event-comm.jpg b/tutorial/static/images/feeder-event-comm.jpg new file mode 100644 index 00000000..82ebf08b Binary files /dev/null and b/tutorial/static/images/feeder-event-comm.jpg differ diff --git a/utils/common.go b/utils/common.go index 7f9081b6..cba424fb 100644 --- a/utils/common.go +++ b/utils/common.go @@ -75,7 +75,7 @@ func ReadUdsRegistrations(sockFile string) []UdsReg { func GetUdsConn(path string, connectionName string) net.Conn { root := ExtractRootName(path) for i := 0; i < len(udsRegList); i++ { - if root == udsRegList[i].RootName { + if root == udsRegList[i].RootName || udsRegList[i].RootName == "*" { return connectViaUds(getSocketPath(i, connectionName)) } } @@ -334,7 +334,6 @@ func FileExists(filename string) bool { func ExtractRootName(path string) string { dotDelimiter := strings.Index(path, ".") if dotDelimiter == -1 { - Info.Print("ExtractRootName():Dot limited segments not found in path=%s", path) return path } return path[:dotDelimiter]