Skip to content

Commit

Permalink
feat: add health check to host route sync
Browse files Browse the repository at this point in the history
Host route syncing is an important part of kube-router that is run in
its own goroutine, so we should be checking it for health the way we do
the major controllers to make sure that it never stops functioning.

Additionally, as the health check controller has been continuously added
on to, we also make some modifications here to make it a bit more robust
and scalable.
  • Loading branch information
aauren committed Oct 27, 2024
1 parent 0905d78 commit c204b26
Show file tree
Hide file tree
Showing 12 changed files with 100 additions and 47 deletions.
3 changes: 2 additions & 1 deletion pkg/cmd/kube-router.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"syscall"
"time"

"github.com/cloudnativelabs/kube-router/v2/pkg"
"github.com/cloudnativelabs/kube-router/v2/pkg/controllers/lballoc"
"github.com/cloudnativelabs/kube-router/v2/pkg/controllers/netpol"
"github.com/cloudnativelabs/kube-router/v2/pkg/controllers/proxy"
Expand Down Expand Up @@ -85,7 +86,7 @@ func (kr *KubeRouter) Run() error {
os.Exit(0)
}

healthChan := make(chan *healthcheck.ControllerHeartbeat, healthControllerChannelLength)
healthChan := make(chan *pkg.ControllerHeartbeat, healthControllerChannelLength)
defer close(healthChan)
stopCh := make(chan struct{})

Expand Down
5 changes: 3 additions & 2 deletions pkg/controllers/lballoc/lballoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/cloudnativelabs/kube-router/v2/pkg"
"github.com/cloudnativelabs/kube-router/v2/pkg/healthcheck"
"github.com/cloudnativelabs/kube-router/v2/pkg/options"
v1core "k8s.io/api/core/v1"
Expand Down Expand Up @@ -432,7 +433,7 @@ func (lbc *LoadBalancerController) allocator() {
}
}

func (lbc *LoadBalancerController) Run(healthChan chan<- *healthcheck.ControllerHeartbeat,
func (lbc *LoadBalancerController) Run(healthChan chan<- *pkg.ControllerHeartbeat,
stopCh <-chan struct{}, wg *sync.WaitGroup) {
isLeader := false
isLeaderChan := make(chan bool)
Expand Down Expand Up @@ -461,7 +462,7 @@ func (lbc *LoadBalancerController) Run(healthChan chan<- *healthcheck.Controller
}
case <-timer.C:
timer.Reset(time.Minute)
healthcheck.SendHeartBeat(healthChan, "LBC")
healthcheck.SendHeartBeat(healthChan, pkg.HeartBeatCompLoadBalancerController)
if isLeader {
go lbc.walkServices()
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/controllers/netpol/network_policy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type NetworkPolicyController struct {
mu sync.Mutex
syncPeriod time.Duration
MetricsEnabled bool
healthChan chan<- *healthcheck.ControllerHeartbeat
healthChan chan<- *pkg.ControllerHeartbeat
fullSyncRequestChan chan struct{}
ipsetMutex *sync.Mutex

Expand Down Expand Up @@ -154,7 +154,7 @@ type protocol2eps map[string]numericPort2eps
type namedPort2eps map[string]protocol2eps

// Run runs forever till we receive notification on stopCh
func (npc *NetworkPolicyController) Run(healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{},
func (npc *NetworkPolicyController) Run(healthChan chan<- *pkg.ControllerHeartbeat, stopCh <-chan struct{},
wg *sync.WaitGroup) {
t := time.NewTicker(npc.syncPeriod)
defer t.Stop()
Expand Down Expand Up @@ -242,7 +242,7 @@ func (npc *NetworkPolicyController) fullPolicySync() {
}
}

healthcheck.SendHeartBeat(npc.healthChan, "NPC")
healthcheck.SendHeartBeat(npc.healthChan, pkg.HeartBeatCompNetworkPolicyController)
start := time.Now()
syncVersion := strconv.FormatInt(start.UnixNano(), syncVersionBase)
defer func() {
Expand Down
5 changes: 3 additions & 2 deletions pkg/controllers/proxy/hairpin_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"time"

"github.com/cloudnativelabs/kube-router/v2/pkg"
"github.com/cloudnativelabs/kube-router/v2/pkg/healthcheck"
"github.com/cloudnativelabs/kube-router/v2/pkg/utils"
"github.com/vishvananda/netns"
Expand All @@ -27,7 +28,7 @@ type hairpinController struct {
}

func (hpc *hairpinController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup,
healthChan chan<- *healthcheck.ControllerHeartbeat) {
healthChan chan<- *pkg.ControllerHeartbeat) {
defer wg.Done()
klog.Infof("Starting hairping controller (handles setting hairpin_mode for veth interfaces)")

Expand All @@ -54,7 +55,7 @@ func (hpc *hairpinController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup,
endpointIP, err)
}
case <-t.C:
healthcheck.SendHeartBeat(healthChan, "HPC")
healthcheck.SendHeartBeat(healthChan, pkg.HeartBeatCompHairpinController)
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/controllers/proxy/network_services_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ type endpointSliceInfo struct {
type endpointSliceInfoMap map[string][]endpointSliceInfo

// Run periodically sync ipvs configuration to reflect desired state of services and endpoints
func (nsc *NetworkServicesController) Run(healthChan chan<- *healthcheck.ControllerHeartbeat,
func (nsc *NetworkServicesController) Run(healthChan chan<- *pkg.ControllerHeartbeat,
stopCh <-chan struct{}, wg *sync.WaitGroup) {
t := time.NewTicker(nsc.syncPeriod)
defer t.Stop()
Expand Down Expand Up @@ -341,7 +341,7 @@ func (nsc *NetworkServicesController) Run(healthChan chan<- *healthcheck.Control
}

case perform := <-nsc.syncChan:
healthcheck.SendHeartBeat(healthChan, "NSC")
healthcheck.SendHeartBeat(healthChan, pkg.HeartBeatCompNetworkServicesController)
switch perform {
case synctypeAll:
klog.V(1).Info("Performing requested full sync of services")
Expand All @@ -366,18 +366,18 @@ func (nsc *NetworkServicesController) Run(healthChan chan<- *healthcheck.Control
nsc.mu.Unlock()
}
if err == nil {
healthcheck.SendHeartBeat(healthChan, "NSC")
healthcheck.SendHeartBeat(healthChan, pkg.HeartBeatCompNetworkServicesController)
}

case <-t.C:
klog.V(1).Info("Performing periodic sync of ipvs services")
healthcheck.SendHeartBeat(healthChan, "NSC")
healthcheck.SendHeartBeat(healthChan, pkg.HeartBeatCompNetworkServicesController)
err := nsc.doSync()
if err != nil {
klog.Errorf("Error during periodic ipvs sync in network service controller. Error: " + err.Error())
klog.Errorf("Skipping sending heartbeat from network service controller as periodic sync failed.")
} else {
healthcheck.SendHeartBeat(healthChan, "NSC")
healthcheck.SendHeartBeat(healthChan, pkg.HeartBeatCompNetworkServicesController)
}
}
}
Expand Down
17 changes: 13 additions & 4 deletions pkg/controllers/routing/host_route_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/cloudnativelabs/kube-router/v2/pkg"
"github.com/cloudnativelabs/kube-router/v2/pkg/bgp"
"github.com/cloudnativelabs/kube-router/v2/pkg/healthcheck"
"github.com/cloudnativelabs/kube-router/v2/pkg/metrics"
"github.com/cloudnativelabs/kube-router/v2/pkg/routes"

Expand Down Expand Up @@ -199,21 +200,22 @@ func (rs *RouteSync) checkCacheAgainstBGP() error {
}

// syncLocalRouteTable iterates over the local route state map and syncs all routes to the kernel's routing table
func (rs *RouteSync) SyncLocalRouteTable() {
func (rs *RouteSync) SyncLocalRouteTable() error {
rs.mutex.Lock()
defer rs.mutex.Unlock()
klog.V(2).Infof("Running local route table synchronization")
for _, route := range rs.routeTableStateMap {
klog.V(3).Infof("Syncing route: %s -> %s via %s", route.Src, route.Dst, route.Gw)
err := rs.routeReplacer(route)
if err != nil {
klog.Errorf("Route could not be replaced due to : " + err.Error())
return err
}
}
return nil
}

// run starts a goroutine that calls syncLocalRouteTable on interval injectedRoutesSyncPeriod
func (rs *RouteSync) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
func (rs *RouteSync) Run(healthChan chan<- *pkg.ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) {
// Start route synchronization routine
wg.Add(1)
go func(stopCh <-chan struct{}, wg *sync.WaitGroup) {
Expand All @@ -225,7 +227,14 @@ func (rs *RouteSync) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
for {
select {
case <-t1.C:
rs.SyncLocalRouteTable()
err := rs.SyncLocalRouteTable()
if err != nil {
klog.Errorf("Route could not be replaced due to : " + err.Error())
}
// Some of our unit tests send a nil health channel
if nil != healthChan {
healthcheck.SendHeartBeat(healthChan, pkg.HeartBeatCompHostRouteSync)
}
case <-t2.C:
err := rs.checkCacheAgainstBGP()
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions pkg/controllers/routing/host_route_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ func (mnl *mockNetlink) waitForSyncLocalRouteToAcquireLock(syncer pkg.RouteSynce
// we try to use it in addInjectedRoute() below
mnl.wg = &sync.WaitGroup{}
mnl.wg.Add(1)
go syncer.SyncLocalRouteTable()
go func() {
_ = syncer.SyncLocalRouteTable()
}()

// Now we know that the syncLocalRouteTable() is paused on our artificial wait we added above
mnl.wg.Wait()
Expand Down Expand Up @@ -230,7 +232,7 @@ func Test_routeSyncer_run(t *testing.T) {
assert.Nil(t, myNetLink.currentRoute, "currentRoute should be nil when the syncer hasn't run")

myNetLink.wg.Add(1)
syncer.Run(stopCh, &wg)
syncer.Run(nil, stopCh, &wg)

timedOut := waitTimeout(myNetLink.wg, 110*time.Millisecond)
assert.False(t, timedOut, "Run should have not timed out and instead should have added a route")
Expand Down
6 changes: 3 additions & 3 deletions pkg/controllers/routing/network_routes_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ type NetworkRoutingController struct {
}

// Run runs forever until we are notified on stop channel
func (nrc *NetworkRoutingController) Run(healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{},
func (nrc *NetworkRoutingController) Run(healthChan chan<- *pkg.ControllerHeartbeat, stopCh <-chan struct{},
wg *sync.WaitGroup) {
var err error
if nrc.enableCNI {
Expand Down Expand Up @@ -322,7 +322,7 @@ func (nrc *NetworkRoutingController) Run(healthChan chan<- *healthcheck.Controll

// Start route syncer
nrc.routeSyncer.AddBGPPathLister(nrc.bgpServer)
nrc.routeSyncer.Run(stopCh, wg)
nrc.routeSyncer.Run(healthChan, stopCh, wg)

// Watch for BGP Updates
go nrc.watchBgpUpdates(nrc.routerInjector)
Expand Down Expand Up @@ -388,7 +388,7 @@ func (nrc *NetworkRoutingController) Run(healthChan chan<- *healthcheck.Controll
}

if err == nil {
healthcheck.SendHeartBeat(healthChan, "NRC")
healthcheck.SendHeartBeat(healthChan, pkg.HeartBeatCompNetworkRoutesController)
} else {
klog.Errorf("Error during periodic sync in network routing controller. Error: " + err.Error())
klog.Errorf("Skipping sending heartbeat from network routing controller as periodic sync failed.")
Expand Down
48 changes: 29 additions & 19 deletions pkg/healthcheck/health_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/cloudnativelabs/kube-router/v2/pkg"
"github.com/cloudnativelabs/kube-router/v2/pkg/options"
"golang.org/x/net/context"
"k8s.io/klog/v2"
Expand All @@ -18,12 +19,6 @@ const (
healthControllerTickTime = 5000 * time.Millisecond
)

// ControllerHeartbeat is the structure to hold the heartbeats sent by controllers
type ControllerHeartbeat struct {
Component string
LastHeartBeat time.Time
}

// HealthController reports the health of the controller loops as a http endpoint
type HealthController struct {
HealthPort uint16
Expand All @@ -47,12 +42,14 @@ type HealthStats struct {
NetworkServicesControllerAliveTTL time.Duration
HairpinControllerAlive time.Time
HairpinControllerAliveTTL time.Duration
HostRouteSyncAlive time.Time
HostRouteSyncAliveTTL time.Duration
}

// SendHeartBeat sends a heartbeat on the passed channel
func SendHeartBeat(channel chan<- *ControllerHeartbeat, controller string) {
heartbeat := ControllerHeartbeat{
Component: controller,
func SendHeartBeat(channel chan<- *pkg.ControllerHeartbeat, component int) {
heartbeat := pkg.ControllerHeartbeat{
Component: component,
LastHeartBeat: time.Now(),
}
channel <- &heartbeat
Expand Down Expand Up @@ -87,46 +84,52 @@ func (hc *HealthController) Handler(w http.ResponseWriter, _ *http.Request) {
}

// HandleHeartbeat handles received heartbeats on the health channel
func (hc *HealthController) HandleHeartbeat(beat *ControllerHeartbeat) {
klog.V(3).Infof("Received heartbeat from %s", beat.Component)
func (hc *HealthController) HandleHeartbeat(beat *pkg.ControllerHeartbeat) {
klog.V(3).Infof("Received heartbeat from %s", pkg.HeartBeatCompNames[beat.Component])

hc.Status.Lock()
defer hc.Status.Unlock()

switch {
switch beat.Component {
// The first heartbeat will set the initial gracetime the controller has to report in, A static time is added as
// well when checking to allow for load variation in sync time
case beat.Component == "LBC":
case pkg.HeartBeatCompLoadBalancerController:
if hc.Status.LoadBalancerControllerAliveTTL == 0 {
hc.Status.LoadBalancerControllerAliveTTL = time.Since(hc.Status.LoadBalancerControllerAlive)
}
hc.Status.LoadBalancerControllerAlive = beat.LastHeartBeat

case beat.Component == "NSC":
case pkg.HeartBeatCompNetworkServicesController:
if hc.Status.NetworkServicesControllerAliveTTL == 0 {
hc.Status.NetworkServicesControllerAliveTTL = time.Since(hc.Status.NetworkServicesControllerAlive)
}
hc.Status.NetworkServicesControllerAlive = beat.LastHeartBeat

case beat.Component == "HPC":
case pkg.HeartBeatCompHairpinController:
if hc.Status.HairpinControllerAliveTTL == 0 {
hc.Status.HairpinControllerAliveTTL = time.Since(hc.Status.HairpinControllerAlive)
}
hc.Status.HairpinControllerAlive = beat.LastHeartBeat

case beat.Component == "NRC":
case pkg.HeartBeatCompNetworkRoutesController:
if hc.Status.NetworkRoutingControllerAliveTTL == 0 {
hc.Status.NetworkRoutingControllerAliveTTL = time.Since(hc.Status.NetworkRoutingControllerAlive)
}
hc.Status.NetworkRoutingControllerAlive = beat.LastHeartBeat

case beat.Component == "NPC":
case pkg.HeartBeatCompHostRouteSync:
if hc.Status.HostRouteSyncAliveTTL == 0 {
hc.Status.HostRouteSyncAliveTTL = time.Since(hc.Status.HostRouteSyncAlive)
}
hc.Status.HostRouteSyncAlive = beat.LastHeartBeat

case pkg.HeartBeatCompNetworkPolicyController:
if hc.Status.NetworkPolicyControllerAliveTTL == 0 {
hc.Status.NetworkPolicyControllerAliveTTL = time.Since(hc.Status.NetworkPolicyControllerAlive)
}
hc.Status.NetworkPolicyControllerAlive = beat.LastHeartBeat

case beat.Component == "MC":
case pkg.HeartBeatCompMetricsController:
hc.Status.MetricsControllerAlive = beat.LastHeartBeat
}
}
Expand Down Expand Up @@ -158,6 +161,11 @@ func (hc *HealthController) CheckHealth() bool {
klog.Error("Network Routing Controller heartbeat missed")
health = false
}
if time.Since(hc.Status.HostRouteSyncAlive) >
hc.Config.InjectedRoutesSyncPeriod+hc.Status.HostRouteSyncAliveTTL+graceTime {
klog.Error("Host Route Sync Controller heartbeat missed")
health = false
}
}

if hc.Config.RunServiceProxy {
Expand All @@ -166,6 +174,7 @@ func (hc *HealthController) CheckHealth() bool {
klog.Error("NetworkService Controller heartbeat missed")
health = false
}
// !!!! The HAIRPIN controller is not currently used as it is handled in the CNI plugin !!!!
// if time.Since(hc.Status.HairpinControllerAlive) >
// HPCSyncPeriod+hc.Status.HairpinControllerAliveTTL+graceTime {
// klog.Error("Hairpin Controller heartbeat missed")
Expand Down Expand Up @@ -214,7 +223,7 @@ func (hc *HealthController) RunServer(stopCh <-chan struct{}, wg *sync.WaitGroup
}

// RunCheck starts the HealthController's check
func (hc *HealthController) RunCheck(healthChan <-chan *ControllerHeartbeat, stopCh <-chan struct{},
func (hc *HealthController) RunCheck(healthChan <-chan *pkg.ControllerHeartbeat, stopCh <-chan struct{},
wg *sync.WaitGroup) {
t := time.NewTicker(healthControllerTickTime)
defer wg.Done()
Expand Down Expand Up @@ -242,6 +251,7 @@ func (hc *HealthController) SetAlive() {
hc.Status.NetworkRoutingControllerAlive = now
hc.Status.NetworkServicesControllerAlive = now
hc.Status.HairpinControllerAlive = now
hc.Status.HostRouteSyncAlive = now
}

// NewHealthController creates a new health controller and returns a reference to it
Expand Down
5 changes: 3 additions & 2 deletions pkg/metrics/metrics_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/cloudnativelabs/kube-router/v2/pkg"
"github.com/cloudnativelabs/kube-router/v2/pkg/healthcheck"
"github.com/cloudnativelabs/kube-router/v2/pkg/options"
"github.com/cloudnativelabs/kube-router/v2/pkg/version"
Expand Down Expand Up @@ -240,7 +241,7 @@ func Handler() http.Handler {
}

// Run prometheus metrics controller
func (mc *Controller) Run(healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{},
func (mc *Controller) Run(healthChan chan<- *pkg.ControllerHeartbeat, stopCh <-chan struct{},
wg *sync.WaitGroup) {
t := time.NewTicker(metricsControllerTickTime)
defer wg.Done()
Expand All @@ -266,7 +267,7 @@ func (mc *Controller) Run(healthChan chan<- *healthcheck.ControllerHeartbeat, st
}
}()
for {
healthcheck.SendHeartBeat(healthChan, "MC")
healthcheck.SendHeartBeat(healthChan, pkg.HeartBeatCompMetricsController)
select {
case <-stopCh:
klog.Infof("Shutting down metrics controller")
Expand Down
Loading

0 comments on commit c204b26

Please sign in to comment.