Skip to content

Commit

Permalink
Add In memory queue + tests (#6)
Browse files Browse the repository at this point in the history
* Add in memory queue

* Add more tests

* Add more tests + add re drain

* refactor some tests

* update readme

* add headers + add gzip compression

* add count dropped logs + check disk space on enqueue instead of interval

* check disk space on enqueue instead of interval

* fix

* gopsutil -> gopsutil/v3

* add auto test workflow

* fix

* comment out e2e

* auto teat

* set default values, update README.md

* Fix typo

* Update errors.go
  • Loading branch information
yotamloe authored Jul 18, 2021
1 parent ac8fc0e commit 9a230ca
Show file tree
Hide file tree
Showing 12 changed files with 1,143 additions and 101 deletions.
21 changes: 21 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
name: Test Go

on:
pull_request:
branches: [ master ]

jobs:

test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.15
- name: Test
run: |
go test -v -covermode=atomic -coverprofile=coverage.out
cd inMemoryQueue
go test -v -covermode=atomic -coverprofile=coverage.out
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
*.orig
gtoggl/gtoggl

test
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm

## Directory-based project format
Expand Down
74 changes: 67 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Sends logs to [logz.io](https://logz.io) over HTTP. It is a low level lib that can to be integrated with other logging libs.

[![GoDoc][doc-img]][doc] [![Build Status][ci-img]][ci] [![Coverage Status][cov-img]][cov] [![Go Report][report-img]][report]
[comment]: <> ([![GoDoc][doc-img]][doc] [![Build Status][ci-img]][ci] [![Coverage Status][cov-img]][cov] [![Go Report][report-img]][report])

## Prerequisites
go 1.x
Expand All @@ -11,9 +11,19 @@ go 1.x
```shell
$ go get -u github.com/logzio/logzio-go
```
Logzio golang api client offers two queue implementations that you can use:
## Disk queue
Logzio go client uses [goleveldb](https://github.com/syndtr/goleveldb) and [goqueue](github.com/beeker1121/goque) as a persistent storage.
Every 5 seconds logs are sent to logz.io (if any are available)

## In memory queue
You can see the logzio go client queue implementation in `inMemoryQueue.go` file. The in memory queue is initialized with 500k log count limit and 20mb capacity by default.
You can use the `SetinMemoryCapacity()` and `SetlogCountLimit()` functions to override default settings.


## Quick Start

### Disk queue
```go
package main

Expand Down Expand Up @@ -47,6 +57,40 @@ func main() {
}
```

### In memory queue
```go
package main

import (
"fmt"
"github.com/logzio/logzio-go"
"os"
"time"
)

func main() {
l, err := logzio.New(
"fake-token",
SetDebug(os.Stderr),
SetUrl("http://localhost:12345"),
SetInMemoryQueue(true),
SetinMemoryCapacity(24000000),
SetlogCountLimit(6000000),
) // token is required
if err != nil {
panic(err)
}
msg := fmt.Sprintf("{ \"%s\": \"%s\"}", "message", time.Now().UnixNano())

err = l.Send([]byte(msg))
if err != nil {
panic(err)
}

l.Stop()
}
```

## Usage

- Set url mode:
Expand All @@ -67,10 +111,22 @@ func main() {
- Set disk queue threshold, once the threshold is crossed the sender will not enqueue the received logs:
`logzio.New(token, SetDrainDiskThreshold(99))`

## Disk queue

Logzio go client uses [goleveldb](https://github.com/syndtr/goleveldb) and [goqueue](github.com/beeker1121/goque) as a persistent storage.
Every 5 seconds logs are sent to logz.io (if any are available)
- Set the sender to Use in memory queue:
`logzio.New(token, SetInMemoryQueue(true))`

- Set the sender to Use in memory queue with log count limit and capacity:
`logzio.New(token,
SetInMemoryQueue(true),
SetinMemoryCapacity(500),
SetlogCountLimit(6000000),
)`

## Data compression
All bulks are compressed with gzip by default to disable compressing initialize the client with `SetCompress(false)`:
```go
logzio.New(token, SetCompress(false),
)
```

## Tests

Expand All @@ -80,8 +136,6 @@ $ go test -v
```


See [travis.yaml](.travis.yml) for running benchmark tests


## Contributing
All PRs are welcome
Expand All @@ -99,3 +153,9 @@ This project is licensed under the Apache License - see the [LICENSE](LICENSE) f
## Acknowledgments

* [logzio-java-sender](https://github.com/logzio/logzio-java-sender)


## Changelog
- v1.0.1
- Add gzip compression
- Add option for im Memory queue
12 changes: 12 additions & 0 deletions genericQueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package logzio

import queue "github.com/beeker1121/goque"

type Item = queue.Item

type genericQueue interface {
Enqueue([]byte) (*Item, error)
Dequeue() (*Item, error)
Close()
Length() uint64
}
16 changes: 16 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
module github.com/logzio/logzio-go

go 1.15

require (
github.com/beeker1121/goque v2.0.1+incompatible
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/shirou/gopsutil v0.0.0-20190323131628-2cbc9195c892 // indirect
github.com/shirou/gopsutil/v3 v3.21.6
github.com/syndtr/goleveldb v1.0.0 // indirect
github.com/tidwall/gjson v1.8.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tidwall/sjson v1.1.7
go.uber.org/atomic v1.3.2
)
79 changes: 79 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 h1:fLjPD/aNc3UIOA6tDi6QXUemppXK3P9BI7mr2hd6gx8=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/beeker1121/goque v2.0.1+incompatible h1:5nJHPMqQLxUvGFc8m/NW2QzxKyc0zICmqs/JUsmEjwE=
github.com/beeker1121/goque v2.0.1+incompatible/go.mod h1:L6dOWBhDOnxUVQsb0wkLve0VCnt2xJW/MI8pdRX4ANw=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/go-ole/go-ole v1.2.1/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dTyBNF8=
github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/logzio/logzio-go v0.0.0-20190418115910-64e9870f9db3/go.mod h1:OBprCVuGvtyYcaCmYjE32bF12d5AAHeXS5xI0QbIXMI=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/shirou/gopsutil v0.0.0-20190323131628-2cbc9195c892 h1:oiNB/s36DdNBEnulbhdj6zHS73U/wRQihnsoJwanqfM=
github.com/shirou/gopsutil v0.0.0-20190323131628-2cbc9195c892/go.mod h1:WWnYX4lzhCH5h/3YBfyVA3VbLYjlMZZAQcW9ojMexNc=
github.com/shirou/gopsutil v3.21.6+incompatible h1:mmZtAlWSd8U2HeRTjswbnDLPxqsEoK01NK+GZ1P+nEM=
github.com/shirou/gopsutil/v3 v3.21.6 h1:vU7jrp1Ic/2sHB7w6UNs7MIkn7ebVtTb5D9j45o9VYE=
github.com/shirou/gopsutil/v3 v3.21.6/go.mod h1:JfVbDpIBLVzT8oKbvMg9P3wEIMDDpVn+LwHTKj0ST88=
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q1u/4XEfrquwF8Lw7D7y5cD8CuHnfIc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/tidwall/gjson v1.8.0 h1:Qt+orfosKn0rbNTZqHYDqBrmm3UDA4KRkv70fDzG+PQ=
github.com/tidwall/gjson v1.8.0/go.mod h1:5/xDoumyyDNerp2U36lyolv46b3uF/9Bu6OfyQ9GImk=
github.com/tidwall/gjson v1.8.1 h1:8j5EE9Hrh3l9Od1OIEDAb7IpezNA20UdRngNAj5N0WU=
github.com/tidwall/gjson v1.8.1/go.mod h1:5/xDoumyyDNerp2U36lyolv46b3uF/9Bu6OfyQ9GImk=
github.com/tidwall/match v1.0.3 h1:FQUVvBImDutD8wJLN6c5eMzWtjgONK9MwIBCOrUJKeE=
github.com/tidwall/match v1.0.3/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.1.0 h1:K3hMW5epkdAVwibsQEfR/7Zj0Qgt4DxtNumTq/VloO8=
github.com/tidwall/pretty v1.1.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.1.7 h1:sgVPwu/yygHJ2m1pJDLgGM/h+1F5odx5Q9ljG3imRm8=
github.com/tidwall/sjson v1.1.7/go.mod h1:w/yG+ezBeTdUxiKs5NcPicO9diP38nk96QBAbIIGeFs=
github.com/tklauser/go-sysconf v0.3.6/go.mod h1:MkWzOF4RMCshBAMXuhXJs64Rte09mITnppBXY/rYEFI=
github.com/tklauser/numcpus v0.2.2/go.mod h1:x3qojaO3uyYt0i56EW/VUYs7uBvdl2fkfZFu0T9wgjM=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190419153524-e8e3143a4f4a h1:XCr/YX7O0uxRkLq2k1ApNQMims9eCioF9UpzIPBDmuo=
golang.org/x/sys v0.0.0-20190419153524-e8e3143a4f4a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa h1:ZYxPR6aca/uhfRJyaOAtflSHjJYiktO7QnJC5ut7iY4=
golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
10 changes: 10 additions & 0 deletions inMemoryQueue/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package inMemoryQueue

import (
"errors"
)

var (
// ErrEmpty is returned when the stack or queue is empty.
ErrEmpty = errors.New("inMemoryQueue.go: queue is empty")
)
93 changes: 93 additions & 0 deletions inMemoryQueue/inMemoryQueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package inMemoryQueue

import (
"fmt"
queue "github.com/beeker1121/goque"
"sync"
)

//ConcurrentQueue concurrent queue
type ConcurrentQueue struct {
//mutex lock
lock *sync.RWMutex
queue chan []byte
size int
maxLength int
length int
}

func NewConcurrentQueue(maxLength int) *ConcurrentQueue {
queue := ConcurrentQueue{}
//init mutexes
queue.lock = &sync.RWMutex{}
queue.queue = make(chan []byte, maxLength)
queue.size = 0
queue.length = 0
queue.maxLength = maxLength
return &queue
}

func (c *ConcurrentQueue) isEmpty() bool {
c.lock.Lock()
bool := c.size == 0
c.lock.Unlock()
return bool
}

func (c *ConcurrentQueue) Enqueue(data []byte) (*Item, error) {
if !c.IsFull() {
item := &Item{
Value: data,
}
c.queue <- data
c.lock.Lock()
c.size += len(data)
c.length++
c.lock.Unlock()
return item, nil
}
fmt.Printf("Queue is full dropping logs\n")
return nil, nil
}

type Item = queue.Item

func (c *ConcurrentQueue) Dequeue() (*Item, error) {
if c.isEmpty() {
return nil, ErrEmpty
}
data := <-c.queue
c.lock.Lock()
c.size -= len(data)
c.length--
c.lock.Unlock()

item := &Item{
Value: data,
}
return item, nil
}

func (c *ConcurrentQueue) Length() uint64 {
c.lock.RLock()
defer c.lock.RUnlock()
size := c.size
return uint64(size)
}
func (c *ConcurrentQueue) IsFull() bool {
c.lock.RLock()
defer c.lock.RUnlock()
isFull := c.maxLength-c.length == 0
return isFull
}

func (c *ConcurrentQueue) Close() {
var empty []byte
for empty != nil {
empty, _ := c.Dequeue()
if empty == nil {
break
}
}
close(c.queue)
}
Binary file added inMemoryQueue/inMemoryQueue.test
Binary file not shown.
Loading

0 comments on commit 9a230ca

Please sign in to comment.