// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//

package virtcontainers

import (
	"context"
	"sync"
	"time"

	"github.com/pkg/errors"
)

const (
	defaultCheckInterval = 5 * time.Second
	watcherChannelSize   = 128
)
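
// monitor periodically checks the liveness of a sandbox's hypervisor and
// agent, and broadcasts any failure to every registered watcher channel.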
// nolint: govet
type monitor struct {
	watchers []chan error
	sandbox  *Sandbox
	wg       sync.WaitGroup
	sync.Mutex

	stopCh        chan bool
	checkInterval time.Duration
	running       bool
}
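
// newMonitor returns a monitor for the given sandbox. The periodic checker
// is started lazily, by the first call to newWatcher.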
func newMonitor(s *Sandbox) *monitor {
	return &monitor{
		sandbox:       s,
		checkInterval: defaultCheckInterval,
		stopCh:        make(chan bool, 1),
	}
}
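
// newWatcher registers and returns a buffered channel on which health-check
// failures are delivered; the first call also starts the checker goroutine.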
func (m *monitor) newWatcher(ctx context.Context) (chan error, error) {
	m.Lock()
	defer m.Unlock()

	watcher := make(chan error, watcherChannelSize)
	m.watchers = append(m.watchers, watcher)

	if !m.running {
		m.running = true
		m.wg.Add(1)

		// Create and start the checker goroutine, which polls both the
		// hypervisor and the agent on every tick.
		go func() {
			tick := time.NewTicker(m.checkInterval)
			for {
				select {
				case <-m.stopCh:
					tick.Stop()
					m.wg.Done()
					return
				case <-tick.C:
					m.watchHypervisor(ctx)
					m.watchAgent(ctx)
				}
			}
		}()
	}

	return watcher, nil
}
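
// notify marks the agent as dead and fans err out to every watcher,
// dropping the message for any watcher whose channel is already full.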
func (m *monitor) notify(ctx context.Context, err error) {
	m.sandbox.agent.markDead(ctx)

	m.Lock()
	defer m.Unlock()

	if !m.running {
		return
	}

	// a watcher is not supposed to close the channel
	// but just in case...
	defer func() {
		if x := recover(); x != nil {
			virtLog.Warnf("watcher closed channel: %v", x)
		}
	}()

	for _, c := range m.watchers {
		// If a watcher channel is full, drop the message rather than
		// block; the first queued error is the one that matters.
		select {
		case c <- err:
		default:
			virtLog.WithField("channel-size", watcherChannelSize).Warn("watcher channel is full, dropping notify message")
		}
	}
}
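
// stop shuts down the checker goroutine and closes all watcher channels.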
func (m *monitor) stop() {
	// Wait, outside of the monitor lock, for the checker goroutine to exit.
	defer m.wg.Wait()

	m.Lock()
	defer m.Unlock()

	if !m.running {
		return
	}

	m.stopCh <- true
	defer func() {
		m.watchers = nil
		m.running = false
	}()

	// a watcher is not supposed to close the channel
	// but just in case...
	defer func() {
		if x := recover(); x != nil {
			virtLog.Warnf("watcher closed channel: %v", x)
		}
	}()

	for _, c := range m.watchers {
		close(c)
	}
}
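
// watchAgent pings the agent and notifies all watchers if the check fails.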
func (m *monitor) watchAgent(ctx context.Context) {
	err := m.sandbox.agent.check(ctx)
	if err != nil {
		// TODO: define and export error types
		m.notify(ctx, errors.Wrap(err, "failed to ping agent"))
	}
}
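
// watchHypervisor checks that the hypervisor process is still alive and
// notifies all watchers if it is not.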
func (m *monitor) watchHypervisor(ctx context.Context) error {
	if err := m.sandbox.hypervisor.Check(); err != nil {
		m.notify(ctx, errors.Wrap(err, "failed to ping hypervisor process"))
		return err
	}
	return nil
}
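
// The sketch below (not part of the original file) shows how a caller might
// consume a watcher channel. It is a minimal illustration: monitorSandbox
// and onFailure are hypothetical names, and only newMonitor, newWatcher, and
// stop come from this file.
//
//	func monitorSandbox(ctx context.Context, sandbox *Sandbox, onFailure func(error)) {
//		m := newMonitor(sandbox)
//		watcher, err := m.newWatcher(ctx)
//		if err != nil {
//			onFailure(err)
//			return
//		}
//		// stop() shuts down the checker goroutine and closes every
//		// watcher channel, which also ends the receive loop below.
//		defer m.stop()
//
//		for {
//			select {
//			case <-ctx.Done():
//				return
//			case err, ok := <-watcher:
//				if !ok {
//					return
//				}
//				onFailure(err)
//			}
//		}
//	}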