Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of Master Arbitration. #96

Merged
merged 6 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions gnmi_server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ type Server struct {
config *Config
cMu sync.Mutex
clients map[string]*Client
// ReqFromMaster point to a function that is called to verify if the request
// comes from a master controller.
ReqFromMaster func(req *gnmipb.SetRequest, masterEID *uint128) error
masterEID uint128
}
type AuthTypes map[string]bool

Expand All @@ -56,6 +60,7 @@ type Config struct {
}

var AuthLock sync.Mutex
var maMu sync.Mutex

func (i AuthTypes) String() string {
if i["none"] {
Expand Down Expand Up @@ -136,6 +141,10 @@ func NewServer(config *Config, opts []grpc.ServerOption) (*Server, error) {
s: s,
config: config,
clients: map[string]*Client{},
// ReqFromMaster point to a function that is called to verify if
// the request comes from a master controller.
ReqFromMaster: ReqFromMasterDisabledMA,
masterEID: uint128{High: 0, Low: 0},
}
var err error
if srv.config.Port < 0 {
Expand Down Expand Up @@ -377,6 +386,11 @@ func (s *Server) Get(ctx context.Context, req *gnmipb.GetRequest) (*gnmipb.GetRe
}

func (s *Server) Set(ctx context.Context, req *gnmipb.SetRequest) (*gnmipb.SetResponse, error) {
e := s.ReqFromMaster(req, &s.masterEID)
if e != nil {
return nil, e
}

common_utils.IncCounter(common_utils.GNMI_SET)
if s.config.EnableTranslibWrite == false && s.config.EnableNativeWrite == false {
common_utils.IncCounter(common_utils.GNMI_SET_FAIL)
Expand Down Expand Up @@ -515,3 +529,79 @@ func (s *Server) Capabilities(ctx context.Context, req *gnmipb.CapabilityRequest
GNMIVersion: "0.7.0",
Extension: exts}, nil
}

type uint128 struct {
High uint64
Low uint64
}

func (lh *uint128) Compare(rh *uint128) int {
sallylsy marked this conversation as resolved.
Show resolved Hide resolved
if rh == nil {
// For MA disabled case, EID supposed to be 0.
rh = &uint128{High: 0, Low: 0}
}
if lh.High > rh.High {
return 1
}
if lh.High < rh.High {
return -1
}
if lh.Low > rh.Low {
return 1
}
if lh.Low < rh.Low {
return -1
}
return 0
}

// ReqFromMasterEnabledMA returns true if the request is sent by the master
// controller.
func ReqFromMasterEnabledMA(req *gnmipb.SetRequest, masterEID *uint128) error {
// Read the election_id.
reqEID := uint128{High: 0, Low: 0}
hasMaExt := false
// It can be one of many extensions, so iterate through them to find it.
for _, e := range req.GetExtension() {
ma := e.GetMasterArbitration()
sallylsy marked this conversation as resolved.
Show resolved Hide resolved
if ma == nil {
continue
}

hasMaExt = true
// The Master Arbitration descriptor has been found.
if ma.ElectionId == nil {
return status.Errorf(codes.InvalidArgument, "MA: ElectionId missing")
}
sallylsy marked this conversation as resolved.
Show resolved Hide resolved

if ma.Role != nil {
// Role will be implemented later.
return status.Errorf(codes.Unimplemented, "MA: Role is not implemented")
}

reqEID = uint128{High: ma.ElectionId.High, Low: ma.ElectionId.Low}
// Use the election ID that is in the last extension, so, no 'break' here.
sallylsy marked this conversation as resolved.
Show resolved Hide resolved
}

if !hasMaExt {
log.V(0).Infof("MA: No Master Arbitration in setRequest extension, masterEID %v is not updated", masterEID)
return nil
}

maMu.Lock()
defer maMu.Unlock()
switch masterEID.Compare(&reqEID) {
sallylsy marked this conversation as resolved.
Show resolved Hide resolved
case 1: // This Election ID is smaller than the known Master Election ID.
return status.Errorf(codes.PermissionDenied, "Election ID is smaller than the current master. Rejected. Master EID: %v. Current EID: %v.", masterEID, reqEID)
case -1: // New Master Election ID received!
log.V(0).Infof("New master has been elected with %v\n", reqEID)
*masterEID = reqEID
sallylsy marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

// ReqFromMasterDisabledMA always returns true. It is used when Master Arbitration
// is disabled.
func ReqFromMasterDisabledMA(req *gnmipb.SetRequest, masterEID *uint128) error {
return nil
}
195 changes: 193 additions & 2 deletions gnmi_server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"path/filepath"
"flag"
"fmt"
"sync"
"sync"
"strings"
"unsafe"

Expand Down Expand Up @@ -3610,11 +3610,202 @@ func TestParseOrigin(t *testing.T) {
}
}

func TestMasterArbitration(t *testing.T) {
s := createServer(t, 8088)
// Turn on Master Arbitration
s.ReqFromMaster = ReqFromMasterEnabledMA
go runServer(t, s)
defer s.s.Stop()

tlsConfig := &tls.Config{InsecureSkipVerify: true}
opts := []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))}

//targetAddr := "30.57.185.38:8080"
targetAddr := "127.0.0.1:8088"
conn, err := grpc.Dial(targetAddr, opts...)
if err != nil {
t.Fatalf("Dialing to %q failed: %v", targetAddr, err)
}
defer conn.Close()

gClient := pb.NewGNMIClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

maExt0 := &ext_pb.Extension{
Ext: &ext_pb.Extension_MasterArbitration{
MasterArbitration: &ext_pb.MasterArbitration{
ElectionId: &ext_pb.Uint128{High: 0, Low: 0},
},
},
}
maExt1 := &ext_pb.Extension{
Ext: &ext_pb.Extension_MasterArbitration{
MasterArbitration: &ext_pb.MasterArbitration{
ElectionId: &ext_pb.Uint128{High: 0, Low: 1},
},
},
}
maExt1H0L := &ext_pb.Extension{
Ext: &ext_pb.Extension_MasterArbitration{
MasterArbitration: &ext_pb.MasterArbitration{
ElectionId: &ext_pb.Uint128{High: 1, Low: 0},
},
},
}
regExt := &ext_pb.Extension{
Ext: &ext_pb.Extension_RegisteredExt{
RegisteredExt: &ext_pb.RegisteredExtension{},
},
}

// By default ElectionID starts from 0 so this test does not change it.
t.Run("MasterArbitrationOnElectionIdZero", func(t *testing.T) {
req := &pb.SetRequest{
Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}},
Update: []*pb.Update{
newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`),
},
Extension: []*ext_pb.Extension{maExt0},
}
_, err = gClient.Set(ctx, req)
if err != nil {
t.Fatal("Did not expected an error: " + err.Error())
}
if _, ok := status.FromError(err); !ok {
sallylsy marked this conversation as resolved.
Show resolved Hide resolved
sallylsy marked this conversation as resolved.
Show resolved Hide resolved
t.Fatal("Got a non-grpc error from grpc call")
}
reqEid0 := maExt0.GetMasterArbitration().GetElectionId()
expectedEID0 := uint128{High: reqEid0.GetHigh(), Low: reqEid0.GetLow()}
if s.masterEID.Compare(&expectedEID0) != 0 {
t.Fatalf("Master EID update failed. Want %v, got %v", expectedEID0, s.masterEID)
}
})
// After this test ElectionID is one.
t.Run("MasterArbitrationOnElectionIdZeroThenOne", func(t *testing.T) {
req := &pb.SetRequest{
Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}},
Update: []*pb.Update{
newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`),
},
Extension: []*ext_pb.Extension{maExt0},
}
if _, err = gClient.Set(ctx, req); err != nil {
t.Fatal("Did not expected an error: " + err.Error())
}
reqEid0 := maExt0.GetMasterArbitration().GetElectionId()
expectedEID0 := uint128{High: reqEid0.GetHigh(), Low: reqEid0.GetLow()}
if s.masterEID.Compare(&expectedEID0) != 0 {
t.Fatalf("Master EID update failed. Want %v, got %v", expectedEID0, s.masterEID)
}
req = &pb.SetRequest{
Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}},
Update: []*pb.Update{
newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`),
},
Extension: []*ext_pb.Extension{maExt1},
}
if _, err = gClient.Set(ctx, req); err != nil {
t.Fatal("Set gRPC failed")
}
reqEid1 := maExt1.GetMasterArbitration().GetElectionId()
expectedEID1 := uint128{High: reqEid1.GetHigh(), Low: reqEid1.GetLow()}
if s.masterEID.Compare(&expectedEID1) != 0 {
t.Fatalf("Master EID update failed. Want %v, got %v", expectedEID1, s.masterEID)
}
sallylsy marked this conversation as resolved.
Show resolved Hide resolved
})
// Multiple ElectionIDs with the last being one.
t.Run("MasterArbitrationOnElectionIdMultipleIdsZeroThenOne", func(t *testing.T) {
req := &pb.SetRequest{
Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}},
Update: []*pb.Update{
newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`),
},
Extension: []*ext_pb.Extension{maExt0, maExt1, regExt},
}
_, err = gClient.Set(ctx, req)
if err != nil {
t.Fatal("Did not expected an error: " + err.Error())
}
if _, ok := status.FromError(err); !ok {
t.Fatal("Got a non-grpc error from grpc call")
}
reqEid1 := maExt1.GetMasterArbitration().GetElectionId()
expectedEID1 := uint128{High: reqEid1.GetHigh(), Low: reqEid1.GetLow()}
if s.masterEID.Compare(&expectedEID1) != 0 {
t.Fatalf("Master EID update failed. Want %v, got %v", expectedEID1, s.masterEID)
}
})
// ElectionIDs with the high word set to 1 and low word to 0.
t.Run("MasterArbitrationOnElectionIdHighOne", func(t *testing.T) {
req := &pb.SetRequest{
Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}},
Update: []*pb.Update{
newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`),
},
Extension: []*ext_pb.Extension{maExt1H0L},
}
_, err = gClient.Set(ctx, req)
if err != nil {
t.Fatal("Did not expected an error: " + err.Error())
}
if _, ok := status.FromError(err); !ok {
t.Fatal("Got a non-grpc error from grpc call")
}
reqEid10 := maExt1H0L.GetMasterArbitration().GetElectionId()
expectedEID10 := uint128{High: reqEid10.GetHigh(), Low: reqEid10.GetLow()}
if s.masterEID.Compare(&expectedEID10) != 0 {
t.Fatalf("Master EID update failed. Want %v, got %v", expectedEID10, s.masterEID)
}
})
// As the ElectionID is one, a request with ElectionID==0 will fail.
// Also a request without Election ID will fail.
sallylsy marked this conversation as resolved.
Show resolved Hide resolved
t.Run("MasterArbitrationOnElectionIdZeroThenNone", func(t *testing.T) {
req := &pb.SetRequest{
Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}},
Update: []*pb.Update{
newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`),
},
Extension: []*ext_pb.Extension{maExt0},
}
_, err = gClient.Set(ctx, req)
if err == nil {
t.Fatal("Expected a PermissionDenied error")
}
ret, ok := status.FromError(err)
if !ok {
t.Fatal("Got a non-grpc error from grpc call")
}
if ret.Code() != codes.PermissionDenied {
t.Fatalf("Expected PermissionDenied. Got %v", ret.Code())
}
reqEid10 := maExt1H0L.GetMasterArbitration().GetElectionId()
expectedEID10 := uint128{High: reqEid10.GetHigh(), Low: reqEid10.GetLow()}
if s.masterEID.Compare(&expectedEID10) != 0 {
t.Fatalf("Master EID update failed. Want %v, got %v", expectedEID10, s.masterEID)
}
req = &pb.SetRequest{
Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}},
Update: []*pb.Update{
newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`),
},
Extension: []*ext_pb.Extension{},
}
_, err = gClient.Set(ctx, req)
if err != nil {
t.Fatal("Expected a successful set call.")
}
if s.masterEID.Compare(&expectedEID10) != 0 {
t.Fatalf("Master EID update failed. Want %v, got %v", expectedEID10, s.masterEID)
}
})
}

func init() {
// Enable logs at UT setup
flag.Lookup("v").Value.Set("10")
flag.Lookup("log_dir").Value.Set("/tmp/telemetrytest")

// Inform gNMI server to use redis tcp localhost connection
sdc.UseRedisLocalTcpPort = true
}
}
5 changes: 5 additions & 0 deletions telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var (
gnmi_translib_write = flag.Bool("gnmi_translib_write", gnmi.ENABLE_TRANSLIB_WRITE, "Enable gNMI translib write for management framework")
gnmi_native_write = flag.Bool("gnmi_native_write", gnmi.ENABLE_NATIVE_WRITE, "Enable gNMI native write")
threshold = flag.Int("threshold", 100, "max number of client connections")
withMasterArbitration = flag.Bool("with-master-arbitration", false, "Enables master arbitration policy.")
sallylsy marked this conversation as resolved.
Show resolved Hide resolved
idle_conn_duration = flag.Int("idle_conn_duration", 5, "Seconds before server closes idle connections")
)

Expand Down Expand Up @@ -177,6 +178,10 @@ func main() {
return
}

if *withMasterArbitration {
s.ReqFromMaster = gnmi.ReqFromMasterEnabledMA
}

log.V(1).Infof("Auth Modes: ", userAuth)
log.V(1).Infof("Starting RPC server on address: %s", s.Address())
s.Serve() // blocks until close
Expand Down