From c523380ba3290355ce32d0e4d87c18e69a75f9a8 Mon Sep 17 00:00:00 2001 From: Starnop Date: Mon, 5 Nov 2018 20:26:02 +0800 Subject: [PATCH] feature: use ringBuffer to make log not blocking and configurable Signed-off-by: Starnop --- daemon/containerio/io.go | 22 +++++ .../logger/logbuffer}/list.go | 10 +- daemon/logger/logbuffer/logbuff.go | 61 ++++++++++++ .../logger/logbuffer}/ringbuff.go | 50 ++++++---- .../logger/logbuffer}/ringbuff_test.go | 96 +++++++++---------- daemon/logger/types.go | 12 +++ daemon/mgr/container.go | 15 ++- daemon/mgr/container_validation.go | 40 +++++++- 8 files changed, 227 insertions(+), 79 deletions(-) rename {pkg/ringbuffer => daemon/logger/logbuffer}/list.go (80%) create mode 100644 daemon/logger/logbuffer/logbuff.go rename {pkg/ringbuffer => daemon/logger/logbuffer}/ringbuff.go (63%) rename {pkg/ringbuffer => daemon/logger/logbuffer}/ringbuff_test.go (56%) diff --git a/daemon/containerio/io.go b/daemon/containerio/io.go index 30e5a9b0c..e1915cfbb 100644 --- a/daemon/containerio/io.go +++ b/daemon/containerio/io.go @@ -6,6 +6,7 @@ import ( "github.com/alibaba/pouch/daemon/logger" "github.com/alibaba/pouch/daemon/logger/crilog" + "github.com/alibaba/pouch/daemon/logger/logbuffer" "github.com/alibaba/pouch/pkg/multierror" "github.com/alibaba/pouch/pkg/streams" @@ -47,6 +48,9 @@ type IO struct { logdriver logger.LogDriver logcopier *logger.LogCopier criLog *crilog.Log + + nonBlock bool + maxBufferSize int64 } // NewIO return IO instance. @@ -87,6 +91,16 @@ func (ctrio *IO) SetLogDriver(logdriver logger.LogDriver) { ctrio.logdriver = logdriver } +// SetMaxBufferSize set the max size of buffer. +func (ctrio *IO) SetMaxBufferSize(maxBufferSize int64) { + ctrio.maxBufferSize = maxBufferSize +} + +// SetNonBlock whether to cache the container's logs with buffer. +func (ctrio *IO) SetNonBlock(nonBlock bool) { + ctrio.nonBlock = nonBlock +} + // Stream is used to export the stream field. func (ctrio *IO) Stream() *streams.Stream { return ctrio.stream @@ -188,6 +202,14 @@ func (ctrio *IO) startLogging() error { return nil } + if ctrio.nonBlock { + logDriver, err := logbuffer.NewLogBuffer(ctrio.logdriver, ctrio.maxBufferSize) + if err != nil { + return err + } + ctrio.logdriver = logDriver + } + ctrio.logcopier = logger.NewLogCopier(ctrio.logdriver, map[string]io.Reader{ "stdout": ctrio.stream.NewStdoutPipe(), "stderr": ctrio.stream.NewStderrPipe(), diff --git a/pkg/ringbuffer/list.go b/daemon/logger/logbuffer/list.go similarity index 80% rename from pkg/ringbuffer/list.go rename to daemon/logger/logbuffer/list.go index 64157cdb7..e7873f26b 100644 --- a/pkg/ringbuffer/list.go +++ b/daemon/logger/logbuffer/list.go @@ -1,14 +1,16 @@ -package ringbuffer +package logbuffer import ( "sync" + + "github.com/alibaba/pouch/daemon/logger" ) var elemPool = &sync.Pool{New: func() interface{} { return new(element) }} type element struct { next, prev *element - val interface{} + val *logger.LogMessage } func (e *element) reset() { @@ -34,7 +36,7 @@ func (q *queue) size() int { return q.count } -func (q *queue) enqueue(val interface{}) { +func (q *queue) enqueue(val *logger.LogMessage) { elem := elemPool.Get().(*element) elem.val = val @@ -47,7 +49,7 @@ func (q *queue) enqueue(val interface{}) { q.count++ } -func (q *queue) dequeue() interface{} { +func (q *queue) dequeue() *logger.LogMessage { if q.size() == 0 { return nil } diff --git a/daemon/logger/logbuffer/logbuff.go b/daemon/logger/logbuffer/logbuff.go new file mode 100644 index 000000000..d4ee27203 --- /dev/null +++ b/daemon/logger/logbuffer/logbuff.go @@ -0,0 +1,61 @@ +package logbuffer + +import ( + "github.com/alibaba/pouch/daemon/logger" + + "github.com/sirupsen/logrus" +) + +// LogBuffer is uses to cache the container's logs with ringBuffer. +type LogBuffer struct { + ringBuffer *RingBuffer + logger logger.LogDriver +} + +// NewLogBuffer return a new BufferLog. +func NewLogBuffer(logDriver logger.LogDriver, maxBytes int64) (logger.LogDriver, error) { + bl := &LogBuffer{ + logger: logDriver, + ringBuffer: NewRingBuffer(maxBytes), + } + + // use a goroutine to write logs continuously with specified log driver + go bl.run() + return bl, nil +} + +// Name return the log driver's name. +func (bl *LogBuffer) Name() string { + return bl.logger.Name() +} + +// WriteLogMessage will write the LogMessage to the ringBuffer. +func (bl *LogBuffer) WriteLogMessage(msg *logger.LogMessage) error { + return bl.ringBuffer.Push(msg) +} + +// Close close the ringBuffer and drain the messages. +func (bl *LogBuffer) Close() error { + bl.ringBuffer.Close() + for _, msg := range bl.ringBuffer.Drain() { + if err := bl.logger.WriteLogMessage(msg); err != nil { + logrus.Debugf("failed to write log %v when closing with log driver %s", msg, bl.logger.Name()) + } + } + + return bl.logger.Close() +} + +// write logs continuously with specified log driver from ringBuffer. +func (bl *LogBuffer) run() { + for { + msg, err := bl.ringBuffer.Pop() + if err != nil { + return + } + + if err := bl.logger.WriteLogMessage(msg); err != nil { + logrus.Debugf("failed to write log %v with log driver %s", msg, bl.logger.Name()) + } + } +} diff --git a/pkg/ringbuffer/ringbuff.go b/daemon/logger/logbuffer/ringbuff.go similarity index 63% rename from pkg/ringbuffer/ringbuff.go rename to daemon/logger/logbuffer/ringbuff.go index f0c3a91e4..c976d9688 100644 --- a/pkg/ringbuffer/ringbuff.go +++ b/daemon/logger/logbuffer/ringbuff.go @@ -1,35 +1,41 @@ -package ringbuffer +package logbuffer import ( "fmt" "sync" + + "github.com/alibaba/pouch/daemon/logger" ) // ErrClosed is used to indicate the ringbuffer has been closed. var ErrClosed = fmt.Errorf("closed") -const defaultSize = 1024 +const ( + defaultMaxBytes = 1e6 //1MB +) // RingBuffer implements a fixed-size buffer which will drop oldest data if full. type RingBuffer struct { mu sync.Mutex wait *sync.Cond - cap int closed bool q *queue + + maxBytes int64 + currentBytes int64 } -// New creates new RingBuffer. -func New(cap int) *RingBuffer { - if cap <= 0 { - cap = defaultSize +// NewRingBuffer creates new RingBuffer. +func NewRingBuffer(maxBytes int64) *RingBuffer { + if maxBytes < 0 { + maxBytes = defaultMaxBytes } rb := &RingBuffer{ - cap: cap, - closed: false, - q: newQueue(), + closed: false, + q: newQueue(), + maxBytes: maxBytes, } rb.wait = sync.NewCond(&rb.mu) return rb @@ -37,33 +43,33 @@ func New(cap int) *RingBuffer { // Push pushes value into buffer and return whether it covers the oldest data // or not. -func (rb *RingBuffer) Push(val interface{}) (bool, error) { +func (rb *RingBuffer) Push(val *logger.LogMessage) error { rb.mu.Lock() defer rb.mu.Unlock() if rb.closed { - return false, ErrClosed + return ErrClosed } if val == nil { - return false, nil + return nil } - // drop the oldest element if covered - covered := (rb.q.size() == rb.cap) - if covered { - rb.q.dequeue() + msgLength := int64(len(val.Line)) + if (rb.currentBytes + msgLength) > rb.maxBytes { + rb.wait.Broadcast() + return nil } rb.q.enqueue(val) rb.wait.Broadcast() - return covered, nil + return nil } // Pop pops the value in the buffer. // // NOTE: it returns ErrClosed if the buffer has been closed. -func (rb *RingBuffer) Pop() (interface{}, error) { +func (rb *RingBuffer) Pop() (*logger.LogMessage, error) { rb.mu.Lock() for rb.q.size() == 0 && !rb.closed { rb.wait.Wait() @@ -75,6 +81,7 @@ func (rb *RingBuffer) Pop() (interface{}, error) { } val := rb.q.dequeue() + rb.currentBytes -= int64(len(val.Line)) rb.mu.Unlock() return val, nil } @@ -82,16 +89,17 @@ func (rb *RingBuffer) Pop() (interface{}, error) { // Drain returns all the data in the buffer. // // NOTE: it can be used after closed to make sure the data have been consumed. -func (rb *RingBuffer) Drain() []interface{} { +func (rb *RingBuffer) Drain() []*logger.LogMessage { rb.mu.Lock() defer rb.mu.Unlock() size := rb.q.size() - vals := make([]interface{}, 0, size) + vals := make([]*logger.LogMessage, 0, size) for i := 0; i < size; i++ { vals = append(vals, rb.q.dequeue()) } + rb.currentBytes = 0 return vals } diff --git a/pkg/ringbuffer/ringbuff_test.go b/daemon/logger/logbuffer/ringbuff_test.go similarity index 56% rename from pkg/ringbuffer/ringbuff_test.go rename to daemon/logger/logbuffer/ringbuff_test.go index ebf403596..21d1ef777 100644 --- a/pkg/ringbuffer/ringbuff_test.go +++ b/daemon/logger/logbuffer/ringbuff_test.go @@ -1,94 +1,77 @@ -package ringbuffer +package logbuffer import ( "reflect" + "strconv" "sync" "testing" "time" + + "github.com/alibaba/pouch/daemon/logger" ) func TestPushNormal(t *testing.T) { - count := 5 - rb := New(count) - - // make the buffer full - for i := 0; i < count; i++ { - covered, err := rb.Push(i) - assertHelper(t, false, covered, "unexpected to drop data") - assertHelper(t, nil, err, "unexpected error during push non-closed queue: %v", err) - } - - // continue to push new data - for i := 0; i < count; i++ { - covered, err := rb.Push(i + count) - assertHelper(t, true, covered, "expected to drop data, but not") - assertHelper(t, nil, err, "unexpected error during push non-closed queue: %v", err) - } + rb := NewRingBuffer(1024) - // check the buffer data - expectedDump := make([]interface{}, 0, count) - for i := 0; i < count; i++ { - expectedDump = append(expectedDump, count+i) - } + b := make([]byte, 1024) + extraB := []byte{1} - got := rb.Drain() - assertHelper(t, expectedDump, got, "expected return %v, but got %v", expectedDump, got) -} + // push bytes of max size + err := rb.Push(wrapLogWithByte(b)) + assertHelper(t, nil, err, "unexpected error during push non-closed queue: %v", err) -func TestPopNormal(t *testing.T) { - count := 5 - rb := New(count) + // continue to push new data + err = rb.Push(wrapLogWithByte(extraB)) + assertHelper(t, nil, err, "unexpected error during push non-closed queue: %v", err) - // make the buffer full - for i := 0; i < count; i++ { - covered, err := rb.Push(i) - assertHelper(t, false, covered, "unexpected to drop data") - assertHelper(t, nil, err, "unexpected error during push non-closed queue: %v", err) - } + // get data + logMsg, err := rb.Pop() + expectedDump := wrapLogWithByte(b) + assertHelper(t, nil, err, "unexpected error during pop: %v", err) + assertHelper(t, expectedDump, logMsg, "expected return %v, but got %v", expectedDump, logMsg) - for i := 0; i < count; i++ { - val, err := rb.Pop() - assertHelper(t, nil, err, "unexpected error during pop: %v", err) - assertHelper(t, i, val, "expected to have %v, but got %v", i, val) - } + // get drain data + got := rb.Drain() + expectedLogs := []*logger.LogMessage{wrapLogWithByte(extraB)} + assertHelper(t, expectedLogs, got, "expected return %v, but got %v", expectedLogs, got) assertHelper(t, 0, rb.q.size(), "expected to have empty queue, but got %d size of queue", rb.q.size()) assertHelper(t, &rb.q.root, rb.q.root.next, "when empty, expected queue.root.next equal to &queue.root") assertHelper(t, &rb.q.root, rb.q.root.prev, "when empty, expected queue.root.prev equal to &queue.root") + } func TestPushAndPop(t *testing.T) { - count := 5 - rb := New(count) + rb := NewRingBuffer(defaultMaxBytes) for _, v := range []int{1, 3, 5} { - rb.Push(v) + rb.Push(wrapLogWithInt(v)) } { // get 1 without error val, err := rb.Pop() - assertHelper(t, val, 1, "expected to get 1, but got %v", val) + assertHelper(t, val, wrapLogWithInt(1), "expected to get 1, but got %v", val) assertHelper(t, nil, err, "unexpected error during pop: %v", err) } // push 4, [3, 5, 4] - rb.Push(4) + rb.Push(wrapLogWithInt(4)) { // get 3 without error val, err := rb.Pop() - assertHelper(t, val, 3, "expected to get 3, but got %v", val) + assertHelper(t, val, wrapLogWithInt(3), "expected to get 3, but got %v", val) assertHelper(t, nil, err, "unexpected error during pop: %v", err) } // push 2, [5, 4, 2] - rb.Push(2) + rb.Push(wrapLogWithInt(2)) { // get 5 without error val, err := rb.Pop() - assertHelper(t, val, 5, "expected to get 5, but got %v", val) + assertHelper(t, val, wrapLogWithInt(5), "expected to get 5, but got %v", val) assertHelper(t, nil, err, "unexpected error during pop: %v", err) } @@ -96,19 +79,18 @@ func TestPushAndPop(t *testing.T) { { // get error if push data into closed buffer - _, err := rb.Push(0) + err := rb.Push(wrapLogWithInt(0)) assertHelper(t, ErrClosed, err, "expected to get error(%v) when push data into closed buffer, but got error(%v)", ErrClosed, err) } // check the buffer data - expectedDump, got := []interface{}{4, 2}, rb.Drain() + expectedDump, got := []*logger.LogMessage{wrapLogWithInt(4), wrapLogWithInt(2)}, rb.Drain() assertHelper(t, expectedDump, got, "expected return %v, but got %v", expectedDump, got) } func TestPopWaitWhenNotData(t *testing.T) { - count := 5 - rb := New(count) + rb := NewRingBuffer(defaultMaxBytes) var ( wg sync.WaitGroup @@ -144,3 +126,15 @@ func assertHelper(t *testing.T, expected, got interface{}, format string, args . t.FailNow() } } + +func wrapLogWithInt(num int) *logger.LogMessage { + return &logger.LogMessage{ + Line: []byte(strconv.Itoa(num)), + } +} + +func wrapLogWithByte(bytes []byte) *logger.LogMessage { + return &logger.LogMessage{ + Line: bytes, + } +} diff --git a/daemon/logger/types.go b/daemon/logger/types.go index 8e1be2dc3..d8b4c610d 100644 --- a/daemon/logger/types.go +++ b/daemon/logger/types.go @@ -1,5 +1,17 @@ package logger +// LogMode indicates available logging modes. +type LogMode string + +const ( + // LogModeDefault default to unuse buffer to make logs blocking. + LogModeDefault = "" + // LogModeBlocking means to unuse buffer to make logs blocking. + LogModeBlocking LogMode = "blocking" + // LogModeNonBlock means to use buffer to make logs non blocking. + LogModeNonBlock LogMode = "non-blocking" +) + // LogDriver represents any kind of log drivers, such as jsonfile, syslog. type LogDriver interface { Name() string diff --git a/daemon/mgr/container.go b/daemon/mgr/container.go index b4eda4a6c..a952be283 100644 --- a/daemon/mgr/container.go +++ b/daemon/mgr/container.go @@ -35,6 +35,7 @@ import ( "github.com/containerd/cgroups" containerdtypes "github.com/containerd/containerd/api/types" "github.com/containerd/containerd/mount" + "github.com/docker/go-units" "github.com/docker/libnetwork" "github.com/go-openapi/strfmt" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -1582,10 +1583,22 @@ func (mgr *ContainerManager) initLogDriverBeforeStart(c *Container) error { } } - logDriver, err := logOptionsForContainerio(c, mgr.convContainerToLoggerInfo(c)) + logInfo := mgr.convContainerToLoggerInfo(c) + logDriver, err := logOptionsForContainerio(c, logInfo) if err != nil { return err } + + if logger.LogMode(logInfo.LogConfig["mode"]) == logger.LogModeNonBlock { + if maxBufferSize, ok := logInfo.LogConfig["max-buffer-size"]; ok { + maxBytes, err := units.RAMInBytes(maxBufferSize) + if err != nil { + return errors.Wrapf(err, "failed to parse option max-buffer-size: %s", maxBufferSize) + } + cntrio.SetMaxBufferSize(maxBytes) + cntrio.SetNonBlock(true) + } + } cntrio.SetLogDriver(logDriver) return nil } diff --git a/daemon/mgr/container_validation.go b/daemon/mgr/container_validation.go index d1f88030f..247beac6b 100644 --- a/daemon/mgr/container_validation.go +++ b/daemon/mgr/container_validation.go @@ -1,17 +1,19 @@ package mgr import ( - "errors" "fmt" "os" "strconv" "strings" "github.com/alibaba/pouch/apis/types" + "github.com/alibaba/pouch/daemon/logger" "github.com/alibaba/pouch/daemon/logger/jsonfile" "github.com/alibaba/pouch/daemon/logger/syslog" "github.com/alibaba/pouch/pkg/system" + "github.com/docker/go-units" + "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -27,6 +29,12 @@ var ( errInvalidDevice = errors.New("invalid nvidia device") errInvalidDriver = errors.New("invalid nvidia driver capability") + + // commonLogOpts the option which should be validated in common such as mode, max-buffer-size. + commonLogOpts = map[string]bool{ + "mode": true, + "max-buffer-size": true, + } ) // validateConfig validates container config @@ -217,9 +225,37 @@ func (mgr *ContainerManager) validateLogConfig(c *Container) error { return nil } + // validate log mode + switch logger.LogMode(logCfg.LogOpts["mode"]) { + case logger.LogModeDefault, logger.LogModeBlocking, logger.LogModeNonBlock: + default: + return fmt.Errorf("unsupported logger mode: %s", logCfg.LogOpts["mode"]) + } + + // validate max buffer size of logger + if maxBufferSize, ok := logCfg.LogOpts["max-buffer-size"]; ok { + if logger.LogMode(logCfg.LogOpts["mode"]) != logger.LogModeNonBlock { + return fmt.Errorf("max-buffer-size option is only supported with 'mode=%s'", logger.LogModeNonBlock) + } + + // try to parse the max-buffer-size option + _, err := units.RAMInBytes(maxBufferSize) + if err != nil { + return errors.Wrapf(err, "failed to parse option max-buffer-size: %s", maxBufferSize) + } + } + + // filter the option which have been validated in common. + restOpts := make(map[string]string) + for k, v := range logCfg.LogOpts { + if !commonLogOpts[k] { + restOpts[k] = v + } + } + switch logCfg.LogDriver { case types.LogConfigLogDriverNone, types.LogConfigLogDriverJSONFile: - return jsonfile.ValidateLogOpt(logCfg.LogOpts) + return jsonfile.ValidateLogOpt(restOpts) case types.LogConfigLogDriverSyslog: info := mgr.convContainerToLoggerInfo(c) return syslog.ValidateSyslogOption(info)