Skip to content

Commit

Permalink
Merge pull request #1 from UlfBj/memcached
Browse files Browse the repository at this point in the history
Memcache data store support added in server and feederv2 template.
  • Loading branch information
UlfBj authored Mar 7, 2024
2 parents 30c17cf + f83267a commit 523f94c
Show file tree
Hide file tree
Showing 11 changed files with 105 additions and 24 deletions.
28 changes: 26 additions & 2 deletions feeder/feeder-template/feederv2/feederv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import (
"github.com/akamensky/argparse"
"github.com/go-redis/redis"
_ "github.com/mattn/go-sqlite3"
"github.com/bradfitz/gomemcache/memcache"
"github.com/w3c/automotive-viss2/utils"
"math/rand"
"net"
"os"
"os/exec"
"sort"
"strconv"
"strings"
Expand All @@ -39,6 +41,7 @@ type FeederMap struct {
var scalingDataList []string

var redisClient *redis.Client
var memcacheClient *memcache.Client
var dbHandle *sql.DB
var stateDbType string

Expand Down Expand Up @@ -180,6 +183,14 @@ func statestorageSet(path string, val string, ts string) int {
return -1
}
return 0
case "memcache":
dp := `{"val":"` + val + `", "ts":"` + ts + `"}`
err := memcacheClient.Set(&memcache.Item{Key: path, Value: []byte(dp)})
if err != nil {
utils.Error.Printf("Job failed. Err=%s", err)
return -1
}
return 0
}
return -1
}
Expand Down Expand Up @@ -384,8 +395,8 @@ func main() {
Required: false,
Help: "changes log output level",
Default: "info"})
stateDB := parser.Selector("d", "statestorage", []string{"sqlite", "redis", "none"}, &argparse.Options{Required: false,
Help: "Statestorage must be either sqlite, redis, or none", Default: "redis"})
stateDB := parser.Selector("d", "statestorage", []string{"sqlite", "redis", "memcache", "none"}, &argparse.Options{Required: false,
Help: "Statestorage must be either sqlite, redis, memcache, or none", Default: "redis"})
dbFile := parser.String("f", "dbfile", &argparse.Options{
Required: false,
Help: "statestorage database filename",
Expand Down Expand Up @@ -427,6 +438,19 @@ func main() {
} else {
utils.Info.Printf("Redis state storage initialised.")
}
case "memcache":
memcacheClient = memcache.New("/var/tmp/vissv2/memcacheDB.sock")
err := memcacheClient.Ping()
if err != nil {
utils.Info.Printf("Memcache daemon not alive. Trying to start it")
cmd := exec.Command("/usr/bin/bash", "memcacheNativeInit.sh")
err := cmd.Run()
if err != nil {
utils.Error.Printf("Memcache daemon startup failed, err=%s", err)
os.Exit(1)
}
}
utils.Info.Printf("Memcache daemon alive.")
default:
utils.Error.Printf("Unknown state storage type = %s", stateDbType)
os.Exit(1)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (

require (
github.com/apache/thrift v0.15.0 // indirect
github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
golang.org/x/net v0.19.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ github.com/apache/iotdb-client-go v1.1.7 h1:MH87LJNquMxHmFKWuS06CFwxgEjxK5oug6H5
github.com/apache/iotdb-client-go v1.1.7/go.mod h1:3D6QYkqRmASS/4HsjU+U/3fscyc5M9xKRfywZsKuoZY=
github.com/apache/thrift v0.15.0 h1:aGvdaR0v1t9XLgjtBYwxcBvBOTMqClzwE26CHOgjW1Y=
github.com/apache/thrift v0.15.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874 h1:N7oVaKyGp8bttX0bfZGmcGkjz7DLQXhAn3DNd3T0ous=
github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874/go.mod h1:r5xuitiExdLAJ09PR7vBVENGvp4ZuTBeWTGtxuX3K+c=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
3 changes: 3 additions & 0 deletions server/vissv2server/memcacheNativeInit.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#! /usr/bin/bash
memcached -d --unix-mask=755 --unix-socket=/var/tmp/vissv2/memcacheDB.sock

5 changes: 3 additions & 2 deletions server/vissv2server/serverCoreHandlerLib.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ func (pathList *PathList) VssPathListHandler(w http.ResponseWriter, r *http.Requ

w.Header().Set("Content-Type", "application/json")
w.Write(bytes)
truncatedIndex := min(len(bytes), 101)
utils.Info.Printf("initVssPathListServer():Response=%s...(truncated to %d bytes)", truncatedIndex-1, bytes[0:truncatedIndex])
// truncatedIndex := min(len(bytes), 101)
// utils.Info.Printf("initVssPathListServer():Response=%s...(truncated to %d bytes)", truncatedIndex-1, bytes[0:truncatedIndex])
utils.Info.Printf("initVssPathListServer():Response length=%d", len(bytes))
}

func min(a, b int) int {
Expand Down
61 changes: 47 additions & 14 deletions server/vissv2server/serviceMgr/serviceMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/apache/iotdb-client-go/client"
"github.com/go-redis/redis"
_ "github.com/mattn/go-sqlite3"
"github.com/bradfitz/gomemcache/memcache"
"github.com/w3c/automotive-viss2/utils"
"io"
"net"
Expand Down Expand Up @@ -72,6 +73,7 @@ var errorResponseMap = map[string]interface{}{
var dbHandle *sql.DB
var dbErr error
var redisClient *redis.Client
var memcacheClient *memcache.Client
var stateDbType string
var historySupport bool

Expand Down Expand Up @@ -472,19 +474,19 @@ func getVehicleData(path string) string { // returns {"value":"Y", "ts":"Z"}
rows.Next()
err = rows.Scan(&value, &timestamp)
if err != nil {
utils.Warning.Printf("Data not found: %s for path=%s\n", err, path)
utils.Warning.Printf("Data not found: %s for path=%s", err, path)
return `{"value":"Data-not-available", "ts":"` + utils.GetRfcTime() + `"}`
}
return `{"value":"` + value + `", "ts":"` + timestamp + `"}`
case "redis":
utils.Info.Printf(path)
// utils.Info.Printf(path)
dp, err := redisClient.Get(path).Result()
if err != nil {
if err.Error() != "redis: nil" {
utils.Error.Printf("Job failed. Error()=%s\n", err.Error())
utils.Error.Printf("Job failed. Error()=%s", err.Error())
return `{"value":"Database-error", "ts":"` + utils.GetRfcTime() + `"}`
} else {
utils.Warning.Printf("Data not found.\n")
utils.Warning.Printf("Data not found.")
return `{"value":"Data-not-found", "ts":"` + utils.GetRfcTime() + `"}`
}
} else {
Expand Down Expand Up @@ -515,6 +517,19 @@ func getVehicleData(path string) string { // returns {"value":"Y", "ts":"Z"}
return `{"value":"Data-not-found", "ts":"` + utils.GetRfcTime() + `"}`
}
return `{"value":"` + value + `", "ts":"` + ts + `"}`
case "memcache":
mcItem, err := memcacheClient.Get(path)
if err != nil {
if err.Error() != "memcache: cache miss" {
utils.Error.Printf("Job failed. Error()=%s", err.Error())
return `{"value":"Database-error", "ts":"` + utils.GetRfcTime() + `"}`
} else {
utils.Warning.Printf("Data not found.")
return `{"value":"Data-not-found", "ts":"` + utils.GetRfcTime() + `"}`
}
} else {
return string(mcItem.Value)
}
case "none":
return `{"value":"` + strconv.Itoa(dummyValue) + `", "ts":"` + utils.GetRfcTime() + `"}`
}
Expand All @@ -538,15 +553,8 @@ func setVehicleData(path string, value string) string {
return ""
}
return ts
case "memcache": fallthrough
case "redis":
/* dp := `{"val":"` + value + `", "ts":"` + ts + `"}`
dPath := path + ".D" // path to "desired" dp. Must be created identically by feeder reading it.
err := redisClient.Set(dPath, dp, time.Duration(0)).Err()
if err != nil {
utils.Error.Printf("Could not update statestorage. Err=%s\n", err)
return ""
}
return ts*/
udsConn := utils.GetUdsConn(path, "serverFeeder")
if udsConn == nil {
utils.Error.Printf("setVehicleData:Failed to UDS connect to feeder for path = %s", path)
Expand Down Expand Up @@ -945,16 +953,21 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri
}
case "redis":
addr := utils.GetUdsPath("Vehicle", "redis")
if len(addr) == 0 {
utils.Error.Printf("redis-server socket address not found.")
// os.Exit(1) should terminate the process
return
}
utils.Info.Printf(addr)
redisClient = redis.NewClient(&redis.Options{
Network: "unix",
Addr: addr, //TODO replace with check and exit if not defined.
Addr: addr,
Password: "",
DB: 1,
})
err := redisClient.Ping().Err()
if err != nil {
utils.Error.Printf("redis-server ,ping err = %s", err)
utils.Info.Printf("Redis-server not started. Trying to start it.")
if utils.FileExists("redis.log") {
os.Remove("redis.log")
}
Expand All @@ -977,6 +990,26 @@ func ServiceMgrInit(mgrId int, serviceMgrChan chan string, stateStorageType stri
os.Exit(1)
}
defer IoTDBsession.Close()
case "memcache":
addr := utils.GetUdsPath("Vehicle", "memcache")
if len(addr) == 0 {
utils.Error.Printf("memcache socket address not found.")
// os.Exit(1) should terminate the process
return
}
memcacheClient = memcache.New(addr)
err := memcacheClient.Ping()
if err != nil {
utils.Info.Printf("Memcache daemon not alive. Trying to start it")
cmd := exec.Command("/usr/bin/bash", "memcacheNativeInit.sh")
err := cmd.Run()
if err != nil {
utils.Error.Printf("Memcache daemon startup failed, err=%s", err)
// os.Exit(1) should terminate the process
return
}
}
utils.Info.Printf("Memcache daemon alive.")
default:
utils.Error.Printf("Unknown state storage type = %s", stateDbType)
}
Expand Down
3 changes: 2 additions & 1 deletion server/vissv2server/uds-registration.docker.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
"root":"Vehicle",
"serverFeeder":"/tmp/docker/server-feeder-channel.sock",
"redis": "/tmp/docker/redisDB.sock",
"history": "/var/tmp/vissv2/histctrlserver.sock"
"memcache": "/tmp/docker/memcacheDB.sock",
"history": "/tmp/docker/histctrlserver.sock"
}

]
1 change: 1 addition & 0 deletions server/vissv2server/uds-registration.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"root":"Vehicle",
"serverFeeder":"/var/tmp/vissv2/serverFeeder.sock",
"redis": "/var/tmp/vissv2/redisDB.sock",
"memcache": "/var/tmp/vissv2/memcacheDB.sock",
"history": "/var/tmp/vissv2/histctrlserver.sock"
}

Expand Down
4 changes: 2 additions & 2 deletions server/vissv2server/vissv2server.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,8 +793,8 @@ func main() {
Default: "info"})
dryRun := parser.Flag("", "dryrun", &argparse.Options{Required: false, Help: "dry run to generate vsspathlist file", Default: false})
vssJson := parser.String("", "vssJson", &argparse.Options{Required: false, Help: "path and name vssPathlist json file", Default: "../vsspathlist.json"})
stateDB := parser.Selector("s", "statestorage", []string{"sqlite", "redis", "apache-iotdb", "none"}, &argparse.Options{Required: false,
Help: "Statestorage must be either sqlite, redis, apache-iotdb, or none", Default: "redis"})
stateDB := parser.Selector("s", "statestorage", []string{"sqlite", "redis", "memcache", "apache-iotdb", "none"}, &argparse.Options{Required: false,
Help: "Statestorage must be either sqlite, redis, memcache, apache-iotdb, or none", Default: "redis"})
historySupport := parser.Flag("j", "history", &argparse.Options{Required: false, Help: "Support for historic data requests", Default: false})
dbFile := parser.String("", "dbfile", &argparse.Options{
Required: false,
Expand Down
14 changes: 13 additions & 1 deletion tutorial/content/build-system/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,24 @@ There are multiple Software components on this repo, such as feeders, simulators
If it is forgotten to be mentiond in the README, one way of determining whether a separate build is needed or not is to check the package statement in the source code.
If it says "package main" it is a separate executable and shall then be built and run as described above.

### Loggging
### Logging
Logging can be command line configured at startup.
* logging level can be set to either of [trace, debug, info, warn, error, fatal, panic].
* logging output destination. It can either be written to file, or directed to standard output.
The levels currently used are mainly info, warn, error. Info is appropriate during testing and debugging, while error is appropriate when performance is important.

### Transport protocols
Besides the transport protocols
* HTTP
* Websocke
* MQTT
that the specification lists, this server also supports
* gRPC
They are all except MQTT activated by the server per default.
To enable MQTT, or disable any other, the string array serverComponents in the vissv2server.go file contains a list of the components that are spawned on
separate threads by the main server process, and these can be commented in or out before building the server.
For the server to be operationable, the service manager, and at least one transport protocol must not be commented out.

### Go modules
Go modules are used in multiple places in this project, below follows some commands that may be helpful in managing this.

Expand Down
7 changes: 5 additions & 2 deletions utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type UdsReg struct {
RootName string `json:"root"`
ServerFeeder string `json:"serverFeeder"`
Redis string `json:"redis"`
Memcache string `json:"memcache"`
History string `json:"history"`
}

Expand Down Expand Up @@ -83,7 +84,7 @@ func GetUdsConn(path string, connectionName string) net.Conn {

func GetUdsPath(path string, connectionName string) string {
root := ExtractRootName(path)
Info.Printf("GetUdsPath:root=%s, connectionName=%s", root, connectionName)
// Info.Printf("GetUdsPath:root=%s, connectionName=%s", root, connectionName)
for i := 0; i < len(udsRegList); i++ {
if root == udsRegList[i].RootName {
return getSocketPath(i, connectionName)
Expand All @@ -99,6 +100,8 @@ func getSocketPath(listIndex int, connectionName string) string {
return udsRegList[listIndex].ServerFeeder
case "redis":
return udsRegList[listIndex].Redis
case "memcache":
return udsRegList[listIndex].Memcache
case "history":
return udsRegList[listIndex].History
default:
Expand Down Expand Up @@ -325,7 +328,7 @@ func FileExists(filename string) bool {
func ExtractRootName(path string) string {
dotDelimiter := strings.Index(path, ".")
if dotDelimiter == -1 {
Info.Print("ExtractRootName():Could not find root node name in path=%s", path)
Info.Print("ExtractRootName():Dot limited segments not found in path=%s", path)
return path
}
return path[:dotDelimiter]
Expand Down

0 comments on commit 523f94c

Please sign in to comment.