Skip to content

Commit

Permalink
Implementation of Master Arbitration.
Browse files Browse the repository at this point in the history
  • Loading branch information
sallylsy committed Jul 19, 2023
1 parent 37ce38b commit 9f98862
Show file tree
Hide file tree
Showing 3 changed files with 309 additions and 4 deletions.
94 changes: 94 additions & 0 deletions gnmi_server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ type Server struct {
config *Config
cMu sync.Mutex
clients map[string]*Client
// ReqFromMaster point to a function that is called to verify if the request
// comes from a master controller.
ReqFromMaster func(req *gnmipb.SetRequest, masterEID *uint128) (bool, error)
masterEID uint128
}
type AuthTypes map[string]bool

Expand All @@ -56,6 +60,7 @@ type Config struct {
}

var AuthLock sync.Mutex
var maMu sync.Mutex

func (i AuthTypes) String() string {
if i["none"] {
Expand Down Expand Up @@ -136,6 +141,10 @@ func NewServer(config *Config, opts []grpc.ServerOption) (*Server, error) {
s: s,
config: config,
clients: map[string]*Client{},
// ReqFromMaster point to a function that is called to verify if
// the request comes from a master controller.
ReqFromMaster: ReqFromMasterDisabledMA,
masterEID: uint128{High: 0, Low: 0},
}
var err error
if srv.config.Port < 0 {
Expand Down Expand Up @@ -377,6 +386,15 @@ func (s *Server) Get(ctx context.Context, req *gnmipb.GetRequest) (*gnmipb.GetRe
}

func (s *Server) Set(ctx context.Context, req *gnmipb.SetRequest) (*gnmipb.SetResponse, error) {
isFromMaster, e := s.ReqFromMaster(req, &s.masterEID)
if e != nil {
return nil, e
}

if !isFromMaster {
return nil, grpc.Errorf(codes.PermissionDenied, "MA: Not a master")
}

common_utils.IncCounter(common_utils.GNMI_SET)
if s.config.EnableTranslibWrite == false && s.config.EnableNativeWrite == false {
common_utils.IncCounter(common_utils.GNMI_SET_FAIL)
Expand Down Expand Up @@ -515,3 +533,79 @@ func (s *Server) Capabilities(ctx context.Context, req *gnmipb.CapabilityRequest
GNMIVersion: "0.7.0",
Extension: exts}, nil
}

type uint128 struct {
High uint64
Low uint64
}

func (lh *uint128) Compare(rh *uint128) int {
if rh == nil {
// For MA disabled case, EID supposed to be 0.
rh = &uint128{High: 0, Low: 0}
}
if lh.High > rh.High {
return 1
}
if lh.High < rh.High {
return -1
}
if lh.Low > rh.Low {
return 1
}
if lh.Low < rh.Low {
return -1
}
return 0
}

// ReqFromMasterEnabledMA returns true if the request is sent by the master
// controller.
func ReqFromMasterEnabledMA(req *gnmipb.SetRequest, masterEID *uint128) (bool, error) {
// Read the election_id.
reqEID := uint128{High: 0, Low: 0}
hasMaExt := false
// It can be one of many extensions, so iterate through them to find it.
for _, e := range req.GetExtension() {
ma := e.GetMasterArbitration()
if ma == nil {
continue
}

hasMaExt = true
// The Master Arbitration descriptor has been found.
if ma.ElectionId == nil {
return false, status.Errorf(codes.InvalidArgument, "MA: ElectionId missing")
}

if ma.Role != nil {
// Role will be implemented later.
return false, status.Errorf(codes.Unimplemented, "MA: Role is not implemented")
}

reqEID = uint128{High: ma.ElectionId.High, Low: ma.ElectionId.Low}
// Use the election ID that is in the last extension, so, no 'break' here.
}

if !hasMaExt {
log.V(0).Infof("MA: No Master Arbitration in setRequest extension, masterEID %v is not updated", masterEID)
return false, nil
}

maMu.Lock()
defer maMu.Unlock()
switch masterEID.Compare(&reqEID) {
case 1: // This Election ID is smaller than the known Master Election ID.
return false, status.Errorf(codes.PermissionDenied, "Election ID is smaller than the current master. Rejected. Master EID: %v. Current EID: %v.", masterEID, reqEID)
case -1: // New Master Election ID received!
log.V(0).Infof("New master has been elected with %v\n", reqEID)
*masterEID = reqEID
}
return true, nil
}

// ReqFromMasterDisabledMA always returns true. It is used when Master Arbitration
// is disabled.
func ReqFromMasterDisabledMA(req *gnmipb.SetRequest, masterEID *uint128) (bool, error) {
return false, nil
}
214 changes: 210 additions & 4 deletions gnmi_server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"path/filepath"
"flag"
"fmt"
"sync"
"sync"
"strings"
"unsafe"

Expand Down Expand Up @@ -306,28 +306,42 @@ const (

func runTestSet(t *testing.T, ctx context.Context, gClient pb.GNMIClient, pathTarget string,
textPbPath string, wantRetCode codes.Code, wantRespVal interface{}, attributeData string, op op_t) {
t.Helper()
// Send request
var pbPath pb.Path
if err := proto.UnmarshalText(textPbPath, &pbPath); err != nil {
t.Fatalf("error in unmarshaling path: %v %v", textPbPath, err)
}
req := &pb.SetRequest{}
switch op {
case Replace:
case Replace, Update:
prefix := pb.Path{Target: pathTarget}
var v *pb.TypedValue
v = &pb.TypedValue{
Value: &pb.TypedValue_JsonIetfVal{JsonIetfVal: extractJSON(attributeData)}}
data := []*pb.Update{{Path: &pbPath, Val: v}}

req = &pb.SetRequest{
Prefix: &prefix,
Replace: []*pb.Update{&pb.Update{Path: &pbPath, Val: v}},
}
if op == Replace {
req.Replace = data
} else {
req.Update = data
}
case Delete:
req = &pb.SetRequest{
Delete: []*pb.Path{&pbPath},
}
}

runTestSetRaw(t, ctx, gClient, req, wantRetCode)
}

func runTestSetRaw(t *testing.T, ctx context.Context, gClient pb.GNMIClient, req *pb.SetRequest,
wantRetCode codes.Code) {
t.Helper()

_, err := gClient.Set(ctx, req)
gotRetStatus, ok := status.FromError(err)
if !ok {
Expand Down Expand Up @@ -2744,6 +2758,7 @@ func TestBulkSet(t *testing.T) {

req := &pb.SetRequest{
Update: []*pb.Update{update1, update2},
Extension: []*ext_pb.Extension{},
}

_, err = gClient.Set(ctx, req)
Expand Down Expand Up @@ -3431,11 +3446,202 @@ func TestParseOrigin(t *testing.T) {
}
}

func TestMasterArbitration(t *testing.T) {
s := createServer(t, 8088)
// Turn on Master Arbitration
s.ReqFromMaster = ReqFromMasterEnabledMA
go runServer(t, s)
defer s.s.Stop()

tlsConfig := &tls.Config{InsecureSkipVerify: true}
opts := []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))}

//targetAddr := "30.57.185.38:8080"
targetAddr := "127.0.0.1:8088"
conn, err := grpc.Dial(targetAddr, opts...)
if err != nil {
t.Fatalf("Dialing to %q failed: %v", targetAddr, err)
}
defer conn.Close()

gClient := pb.NewGNMIClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

maExt0 := &ext_pb.Extension{
Ext: &ext_pb.Extension_MasterArbitration{
MasterArbitration: &ext_pb.MasterArbitration{
ElectionId: &ext_pb.Uint128{High: 0, Low: 0},
},
},
}
maExt1 := &ext_pb.Extension{
Ext: &ext_pb.Extension_MasterArbitration{
MasterArbitration: &ext_pb.MasterArbitration{
ElectionId: &ext_pb.Uint128{High: 0, Low: 1},
},
},
}
maExt1H0L := &ext_pb.Extension{
Ext: &ext_pb.Extension_MasterArbitration{
MasterArbitration: &ext_pb.MasterArbitration{
ElectionId: &ext_pb.Uint128{High: 1, Low: 0},
},
},
}
regExt := &ext_pb.Extension{
Ext: &ext_pb.Extension_RegisteredExt{
RegisteredExt: &ext_pb.RegisteredExtension{},
},
}

// By default ElectionID starts from 0 so this test does not change it.
t.Run("MasterArbitrationOnElectionIdZero", func(t *testing.T) {
req := &pb.SetRequest{
Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}},
Update: []*pb.Update{
newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`),
},
Extension: []*ext_pb.Extension{maExt0},
}
_, err = gClient.Set(ctx, req)
if err != nil {
t.Fatal("Did not expected an error: " + err.Error())
}
if _, ok := status.FromError(err); !ok {
t.Fatal("Got a non-grpc error from grpc call")
}
reqEid0 := maExt0.GetMasterArbitration().GetElectionId()
expectedEID0 := uint128{High: reqEid0.GetHigh(), Low: reqEid0.GetLow()}
if s.masterEID.Compare(&expectedEID0) != 0 {
t.Fatalf("Master EID update failed. Want %v, got %v", expectedEID0, s.masterEID)
}
})
// After this test ElectionID is one.
t.Run("MasterArbitrationOnElectionIdZeroThenOne", func(t *testing.T) {
req := &pb.SetRequest{
Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}},
Update: []*pb.Update{
newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`),
},
Extension: []*ext_pb.Extension{maExt0},
}
if _, err = gClient.Set(ctx, req); err != nil {
t.Fatal("Did not expected an error: " + err.Error())
}
reqEid0 := maExt0.GetMasterArbitration().GetElectionId()
expectedEID0 := uint128{High: reqEid0.GetHigh(), Low: reqEid0.GetLow()}
if s.masterEID.Compare(&expectedEID0) != 0 {
t.Fatalf("Master EID update failed. Want %v, got %v", expectedEID0, s.masterEID)
}
req = &pb.SetRequest{
Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}},
Update: []*pb.Update{
newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`),
},
Extension: []*ext_pb.Extension{maExt1},
}
if _, err = gClient.Set(ctx, req); err != nil {
t.Fatal("Set gRPC failed")
}
reqEid1 := maExt1.GetMasterArbitration().GetElectionId()
expectedEID1 := uint128{High: reqEid1.GetHigh(), Low: reqEid1.GetLow()}
if s.masterEID.Compare(&expectedEID1) != 0 {
t.Fatalf("Master EID update failed. Want %v, got %v", expectedEID1, s.masterEID)
}
})
// Multiple ElectionIDs with the last being one.
t.Run("MasterArbitrationOnElectionIdMultipleIdsZeroThenOne", func(t *testing.T) {
req := &pb.SetRequest{
Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}},
Update: []*pb.Update{
newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`),
},
Extension: []*ext_pb.Extension{maExt0, maExt1, regExt},
}
_, err = gClient.Set(ctx, req)
if err != nil {
t.Fatal("Did not expected an error: " + err.Error())
}
if _, ok := status.FromError(err); !ok {
t.Fatal("Got a non-grpc error from grpc call")
}
reqEid1 := maExt1.GetMasterArbitration().GetElectionId()
expectedEID1 := uint128{High: reqEid1.GetHigh(), Low: reqEid1.GetLow()}
if s.masterEID.Compare(&expectedEID1) != 0 {
t.Fatalf("Master EID update failed. Want %v, got %v", expectedEID1, s.masterEID)
}
})
// ElectionIDs with the high word set to 1 and low word to 0.
t.Run("MasterArbitrationOnElectionIdHighOne", func(t *testing.T) {
req := &pb.SetRequest{
Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}},
Update: []*pb.Update{
newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`),
},
Extension: []*ext_pb.Extension{maExt1H0L},
}
_, err = gClient.Set(ctx, req)
if err != nil {
t.Fatal("Did not expected an error: " + err.Error())
}
if _, ok := status.FromError(err); !ok {
t.Fatal("Got a non-grpc error from grpc call")
}
reqEid10 := maExt1H0L.GetMasterArbitration().GetElectionId()
expectedEID10 := uint128{High: reqEid10.GetHigh(), Low: reqEid10.GetLow()}
if s.masterEID.Compare(&expectedEID10) != 0 {
t.Fatalf("Master EID update failed. Want %v, got %v", expectedEID10, s.masterEID)
}
})
// As the ElectionID is one, a request with ElectionID==0 will fail.
// Also a request without Election ID will fail.
t.Run("MasterArbitrationOnElectionIdZeroThenNone", func(t *testing.T) {
req := &pb.SetRequest{
Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}},
Update: []*pb.Update{
newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`),
},
Extension: []*ext_pb.Extension{maExt0},
}
_, err = gClient.Set(ctx, req)
if err == nil {
t.Fatal("Expected a PermissionDenied error")
}
ret, ok := status.FromError(err)
if !ok {
t.Fatal("Got a non-grpc error from grpc call")
}
if ret.Code() != codes.PermissionDenied {
t.Fatalf("Expected PermissionDenied. Got %v", ret.Code())
}
reqEid10 := maExt1H0L.GetMasterArbitration().GetElectionId()
expectedEID10 := uint128{High: reqEid10.GetHigh(), Low: reqEid10.GetLow()}
if s.masterEID.Compare(&expectedEID10) != 0 {
t.Fatalf("Master EID update failed. Want %v, got %v", expectedEID10, s.masterEID)
}
req = &pb.SetRequest{
Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}},
Update: []*pb.Update{
newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`),
},
Extension: []*ext_pb.Extension{},
}
_, err = gClient.Set(ctx, req)
if err != nil {
t.Fatal("Expected a successful set call.")
}
if s.masterEID.Compare(&expectedEID10) != 0 {
t.Fatalf("Master EID update failed. Want %v, got %v", expectedEID10, s.masterEID)
}
})
}

func init() {
// Enable logs at UT setup
flag.Lookup("v").Value.Set("10")
flag.Lookup("log_dir").Value.Set("/tmp/telemetrytest")

// Inform gNMI server to use redis tcp localhost connection
sdc.UseRedisLocalTcpPort = true
}
}
Loading

0 comments on commit 9f98862

Please sign in to comment.