diff --git a/README.md b/README.md index 43afe32..468aadc 100644 --- a/README.md +++ b/README.md @@ -159,7 +159,8 @@ This project is licensed under the Apache License - see the [LICENSE](LICENSE) f ## Changelog - +- v1.0.8 + - Add `AwaitDrain` function to support ensuring all logs are drained in synchronous runtimes - v1.0.7 - Added set http client option (@orelazar1) - Update dependencies: diff --git a/logsender_test.go b/logsender_test.go index c66d16e..bfb894f 100644 --- a/logsender_test.go +++ b/logsender_test.go @@ -330,7 +330,7 @@ func TestLogzioSender_InMemoryWrite(t *testing.T) { l.Stop() } -//dequeueUpToMaxBatchSize +// dequeueUpToMaxBatchSize func TestLogzioSender_DequeueUpToMaxBatchSize(t *testing.T) { var sent = make([]byte, 1024) ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -724,7 +724,7 @@ func BenchmarkLogzioSenderInmemory(b *testing.B) { } } -//E2E test +// E2E test func TestLogzioSender_E2E(t *testing.T) { l, err := New("", SetInMemoryQueue(true), @@ -747,3 +747,38 @@ func TestLogzioSender_E2E(t *testing.T) { time.Sleep(time.Second * 40) l.Stop() //logs are buffered on disk. Stop will drain the buffer } + +func TestLogzioSender_AwaitDrain(t *testing.T) { + var sent = make([]byte, 1024) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + r.Body.Read(sent) + })) + defer ts.Close() + + l, err := New("fake-token", + SetUrl(ts.URL), + SetCompress(false), + ) + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(l.dir) + + l.Send([]byte("blah")) + drainSuccess := l.AwaitDrain(time.Second) + if !drainSuccess { + t.Fatal("AwaitDrain timed out") + } + + sentMsg := string(sent[0:5]) + if sentMsg != "blah\n" { + t.Fatalf("%s != %s ", sent, sentMsg) + } + + l.Send([]byte("blah2")) + drainSuccess = l.AwaitDrain(time.Millisecond) + if drainSuccess { + t.Fatal("Expected AwaitDrain to timeout but it did not") + } +} diff --git a/logziosender.go b/logziosender.go index d97b2f6..64a302f 100644 --- a/logziosender.go +++ b/logziosender.go @@ -460,3 +460,25 @@ func (l *LogzioSender) Write(p []byte) (n int, err error) { func (l *LogzioSender) CloseIdleConnections() { l.httpTransport.CloseIdleConnections() } + +// AwaitDrain waits for the sender to finish flushing all data up to a provided timeout +func (l *LogzioSender) AwaitDrain(timeout time.Duration) bool { + l.Drain() + + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + timeoutChan := time.After(timeout) + + for { + select { + case <-ticker.C: + if !l.draining.Load() { + return true // nothing to drain + } + case <-timeoutChan: + l.errorLog("Timed out while waiting for draining to complete\n") + return false + } + } +}