Skip to content

Commit

Permalink
v1.0.8
Browse files Browse the repository at this point in the history
support synchronous runtimes
  • Loading branch information
8naama authored Sep 25, 2024
2 parents 4fbf7ac + 229108e commit 3ae80ac
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 3 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ This project is licensed under the Apache License - see the [LICENSE](LICENSE) f


## Changelog

- v1.0.8
- Add `AwaitDrain` function to support ensuring all logs are drained in synchronous runtimes
- v1.0.7
- Added set http client option (@orelazar1)
- Update dependencies:
Expand Down
39 changes: 37 additions & 2 deletions logsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func TestLogzioSender_InMemoryWrite(t *testing.T) {
l.Stop()
}

//dequeueUpToMaxBatchSize
// dequeueUpToMaxBatchSize
func TestLogzioSender_DequeueUpToMaxBatchSize(t *testing.T) {
var sent = make([]byte, 1024)
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -724,7 +724,7 @@ func BenchmarkLogzioSenderInmemory(b *testing.B) {
}
}

//E2E test
// E2E test
func TestLogzioSender_E2E(t *testing.T) {
l, err := New("",
SetInMemoryQueue(true),
Expand All @@ -747,3 +747,38 @@ func TestLogzioSender_E2E(t *testing.T) {
time.Sleep(time.Second * 40)
l.Stop() //logs are buffered on disk. Stop will drain the buffer
}

func TestLogzioSender_AwaitDrain(t *testing.T) {
var sent = make([]byte, 1024)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
r.Body.Read(sent)
}))
defer ts.Close()

l, err := New("fake-token",
SetUrl(ts.URL),
SetCompress(false),
)
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(l.dir)

l.Send([]byte("blah"))
drainSuccess := l.AwaitDrain(time.Second)
if !drainSuccess {
t.Fatal("AwaitDrain timed out")
}

sentMsg := string(sent[0:5])
if sentMsg != "blah\n" {
t.Fatalf("%s != %s ", sent, sentMsg)
}

l.Send([]byte("blah2"))
drainSuccess = l.AwaitDrain(time.Millisecond)
if drainSuccess {
t.Fatal("Expected AwaitDrain to timeout but it did not")
}
}
22 changes: 22 additions & 0 deletions logziosender.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,3 +460,25 @@ func (l *LogzioSender) Write(p []byte) (n int, err error) {
func (l *LogzioSender) CloseIdleConnections() {
l.httpTransport.CloseIdleConnections()
}

// AwaitDrain waits for the sender to finish flushing all data up to a provided timeout
func (l *LogzioSender) AwaitDrain(timeout time.Duration) bool {
l.Drain()

ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

timeoutChan := time.After(timeout)

for {
select {
case <-ticker.C:
if !l.draining.Load() {
return true // nothing to drain
}
case <-timeoutChan:
l.errorLog("Timed out while waiting for draining to complete\n")
return false
}
}
}

0 comments on commit 3ae80ac

Please sign in to comment.