Skip to content

Commit

Permalink
server: Initial Commit
Browse files Browse the repository at this point in the history
Signed-off-by: Dave Tucker <[email protected]>
  • Loading branch information
dave-tucker committed May 20, 2021
1 parent c9769e3 commit 7ab7452
Show file tree
Hide file tree
Showing 7 changed files with 2,579 additions and 6 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ require (
github.com/cenk/hub v1.0.1 // indirect
github.com/cenkalti/hub v1.0.1 // indirect
github.com/cenkalti/rpc2 v0.0.0-20210220005819-4a29bc83afe1
github.com/stretchr/testify v1.4.0
github.com/google/uuid v1.2.0
github.com/stretchr/testify v1.6.1
)
11 changes: 6 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ github.com/cenkalti/rpc2 v0.0.0-20210220005819-4a29bc83afe1 h1:aT9Ez2drLmrviqTnV
github.com/cenkalti/rpc2 v0.0.0-20210220005819-4a29bc83afe1/go.mod h1:v2npkhrXyk5BCnkNIiPdRI23Uq6uWPUQGL2hnRcRr/M=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
201 changes: 201 additions & 0 deletions server/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package server

import (
"errors"
"sync"

"github.com/google/uuid"
"github.com/ovn-org/libovsdb/ovsdb"
)

var (
ErrNotImplemented = errors.New("not implemented")
)

// Database abstracts database operations from ovsdb
type Database interface {
CreateDatabase(name string, schema *ovsdb.DatabaseSchema) error
Exists(name string) bool
Transact(database string, operations []ovsdb.Operation) ([]ovsdb.OperationResult, ovsdb.TableUpdates)
Select(database string, table string, where []ovsdb.Condition, columns []string) ovsdb.OperationResult
Insert(dastabase string, table string, uuidName string, row map[string]interface{}) (ovsdb.OperationResult, ovsdb.TableUpdates)
Update(database, table string, where []ovsdb.Condition, row map[string]interface{}) (ovsdb.OperationResult, ovsdb.TableUpdates)
Mutate(database, table string, where []ovsdb.Condition, mutations []ovsdb.Mutation) (ovsdb.OperationResult, ovsdb.TableUpdates)
Delete(database, table string, where []ovsdb.Condition) (ovsdb.OperationResult, ovsdb.TableUpdates)
Wait(database, table string, timeout int, conditions []ovsdb.Condition, columns []string, until string, rows []map[string]interface{}) ovsdb.OperationResult
Commit(database, table string, durable bool) ovsdb.OperationResult
Abort(database, table string) ovsdb.OperationResult
Comment(database, table string, comment string) ovsdb.OperationResult
Assert(database, table, lock string) ovsdb.OperationResult
}

type inMemoryDatabase struct {
databases map[string]Tables
mutex sync.RWMutex
}

type Tables map[string]*Table

type Table struct {
rows map[string]interface{}
mutex sync.RWMutex
}

func NewInMemoryDatabase() Database {
return &inMemoryDatabase{
databases: make(map[string]Tables),
mutex: sync.RWMutex{},
}
}

func (db *inMemoryDatabase) CreateDatabase(name string, schema *ovsdb.DatabaseSchema) error {
db.mutex.Lock()
defer db.mutex.Unlock()
database := make(Tables)
for k := range schema.Tables {
database[k] = &Table{
rows: make(map[string]interface{}),
mutex: sync.RWMutex{},
}
}
db.databases[name] = database
return nil
}

func (db *inMemoryDatabase) Exists(name string) bool {
db.mutex.RLock()
defer db.mutex.RUnlock()
_, ok := db.databases[name]
return ok
}

func (db *inMemoryDatabase) Transact(name string, operations []ovsdb.Operation) ([]ovsdb.OperationResult, ovsdb.TableUpdates) {
results := []ovsdb.OperationResult{}
updates := make(ovsdb.TableUpdates)
for i := range operations {
switch operations[i].Op {
case ovsdb.OperationInsert:
r, tu := db.Insert(name, operations[i].Table, operations[i].UUIDName, operations[i].Row)
results = append(results, r)
if tu != nil {
updates.Merge(tu)
}
case ovsdb.OperationSelect:
r := db.Select(name, operations[i].Table, operations[i].Where, operations[i].Columns)
results = append(results, r)
case ovsdb.OperationUpdate:
r, tu := db.Update(name, operations[i].Table, operations[i].Where, operations[i].Row)
results = append(results, r)
if tu != nil {
updates.Merge(tu)
}
case ovsdb.OperationMutate:
r, tu := db.Mutate(name, operations[i].Table, operations[i].Where, operations[i].Mutations)
results = append(results, r)
if tu != nil {
updates.Merge(tu)
}
case ovsdb.OperationDelete:
r, tu := db.Delete(name, operations[i].Table, operations[i].Where)
results = append(results, r)
if tu != nil {
updates.Merge(tu)
}
case ovsdb.OperationWait:
r := db.Wait(name, operations[i].Table, operations[i].Timeout, operations[i].Where, operations[i].Columns, operations[i].Until, operations[i].Rows)
results = append(results, r)
case ovsdb.OperationCommit:
durable := operations[i].Durable
r := db.Commit(name, operations[i].Table, *durable)
results = append(results, r)
case ovsdb.OperationAbort:
r := db.Abort(name, operations[i].Table)
results = append(results, r)
case ovsdb.OperationComment:
r := db.Comment(name, operations[i].Table, *operations[i].Comment)
results = append(results, r)
case ovsdb.OperationAssert:
r := db.Assert(name, operations[i].Table, *operations[i].Lock)
results = append(results, r)
default:
return nil, updates
}
}
return results, updates
}

func (db *inMemoryDatabase) Insert(database string, table string, uuidName string, row map[string]interface{}) (ovsdb.OperationResult, ovsdb.TableUpdates) {
db.mutex.Lock()
defer db.mutex.Unlock()

var targetDb Tables
var targetTable *Table
var ok bool

if targetDb, ok = db.databases[database]; !ok {
return ovsdb.OperationResult{
Count: 1,
Error: "database does not exist",
}, nil
}
if targetTable, ok = targetDb[table]; !ok {
return ovsdb.OperationResult{
Count: 1,
Error: "table does not exist",
}, nil
}

targetTable.mutex.Lock()
defer targetTable.mutex.Unlock()

// TODO: Handle Named UUIDs and existence checks
rowUUID := uuid.NewString()
row["_uuid"] = rowUUID
targetDb[table].rows[rowUUID] = row
result := ovsdb.OperationResult{
Count: 1,
UUID: ovsdb.UUID{GoUUID: rowUUID},
Rows: []ovsdb.ResultRow{row},
}
return result, ovsdb.TableUpdates{
table: {
rowUUID: {New: &ovsdb.Row{Fields: row}, Old: nil},
},
}
}

func (db *inMemoryDatabase) Select(database string, table string, where []ovsdb.Condition, columns []string) ovsdb.OperationResult {
return ovsdb.OperationResult{Error: ErrNotImplemented.Error()}
}

func (db *inMemoryDatabase) Update(database, table string, where []ovsdb.Condition, row map[string]interface{}) (ovsdb.OperationResult, ovsdb.TableUpdates) {
return ovsdb.OperationResult{Error: ErrNotImplemented.Error()}, nil
}

func (db *inMemoryDatabase) Mutate(database, table string, where []ovsdb.Condition, mutations []ovsdb.Mutation) (ovsdb.OperationResult, ovsdb.TableUpdates) {
return ovsdb.OperationResult{Error: ErrNotImplemented.Error()}, nil
}

func (db *inMemoryDatabase) Delete(database, table string, where []ovsdb.Condition) (ovsdb.OperationResult, ovsdb.TableUpdates) {
return ovsdb.OperationResult{Error: ErrNotImplemented.Error()}, nil
}

func (db *inMemoryDatabase) Wait(database, table string, timeout int, conditions []ovsdb.Condition, columns []string, until string, rows []map[string]interface{}) ovsdb.OperationResult {
return ovsdb.OperationResult{Error: ErrNotImplemented.Error()}
}

func (db *inMemoryDatabase) Commit(database, table string, durable bool) ovsdb.OperationResult {
return ovsdb.OperationResult{Error: ErrNotImplemented.Error()}
}

func (db *inMemoryDatabase) Abort(database, table string) ovsdb.OperationResult {
return ovsdb.OperationResult{Error: ErrNotImplemented.Error()}
}

func (db *inMemoryDatabase) Comment(database, table string, comment string) ovsdb.OperationResult {
return ovsdb.OperationResult{Error: ErrNotImplemented.Error()}
}

func (db *inMemoryDatabase) Assert(database, table, lock string) ovsdb.OperationResult {
return ovsdb.OperationResult{Error: ErrNotImplemented.Error()}
}
116 changes: 116 additions & 0 deletions server/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package server

import (
"log"
"sync"
"time"

"github.com/cenkalti/rpc2"
"github.com/ovn-org/libovsdb/ovsdb"
)

// monitor represents a connection to a client where db changes
// will be reflected
type monitor struct {
id string
request map[string]*ovsdb.MonitorRequest
client *rpc2.Client
updates chan ovsdb.TableUpdates
updatesMutex sync.Mutex
stopCh chan struct{}
}

func newMonitor(id string, request map[string]*ovsdb.MonitorRequest, client *rpc2.Client) *monitor {
m := &monitor{
id: id,
request: request,
client: client,
updates: make(chan ovsdb.TableUpdates, 10240),
updatesMutex: sync.Mutex{},
stopCh: make(chan struct{}, 1),
}
go m.process()
return m
}

func (m *monitor) process() {
ticker := time.NewTicker(100 * time.Millisecond)
for {
select {
case <-ticker.C:
go m.sendUpdate()
case <-m.stopCh:
return
}
}
}

func (m *monitor) sendUpdate() {
// lock the update mutex so readers don't overlap
m.updatesMutex.Lock()
numEvents := len(m.updates)
if numEvents == 0 {
return
}
var updates []ovsdb.TableUpdates
for i := 0; i < numEvents; i++ {
updates = append(updates, <-m.updates)
}
m.updatesMutex.Unlock()
tableUpdates := make(ovsdb.TableUpdates)
for _, u := range updates {
tableUpdates.Merge(u)
}
args := []interface{}{m.id, tableUpdates}
var reply interface{}
err := m.client.Call("update", args, &reply)
if err != nil {
log.Printf("client error handling update rpc: %v", err)
}
}

// Enqueue will enqueue an update if it matches the tables and monitor select arguments
// we take the update by value (not reference) so we can mutate it in place before
// queuing it for dispatch
func (m *monitor) Enqueue(update ovsdb.TableUpdates) {
// remove updates for tables that we aren't watching
if len(m.request) != 0 {
m.filter(update)
}
if len(update) == 0 {
return
}
m.updates <- update
}

func (m *monitor) filter(update ovsdb.TableUpdates) {
// remove updates for tables that we aren't watching
if len(m.request) != 0 {
for table, u := range update {
if _, ok := m.request[table]; !ok {
delete(update, table)
}
for uuid, row := range u {
switch {
case row.Insert() && m.request[table].Select.Insert():
case row.Modify() && m.request[table].Select.Modify():
case row.Delete() && m.request[table].Select.Delete():
if len(m.request[table].Columns) > 0 {
discard := true
for _, c := range m.request[table].Columns {
if _, ok := row.New.Fields[c]; ok {
discard = false
break
}
}
if discard {
delete(u, uuid)
}
}
default:
delete(u, uuid)
}
}
}
}
}
Loading

0 comments on commit 7ab7452

Please sign in to comment.