Skip to content

Commit

Permalink
Added TalariaSink (#40)
Browse files Browse the repository at this point in the history
* Add Talaria sink, include build in Dockerfile

* Change email

* Revert Dockerfile

* Update readme for talaria writer

* Function for getting client

* Update config in writer

* Test on blocking client

* Update config for TalariaSink

* Set defaults for all except endpoint

* Remove log

* Update config

* check for unavailable, redial and retry

* Try and close existing connection before creating new client

* Refactor

* Update README

* Change values to default
  • Loading branch information
TiewKH authored Oct 15, 2020
1 parent 754bb38 commit 1d356dc
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 2 deletions.
9 changes: 9 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ type Compaction struct {
BigQuery *BigQuerySink `json:"bigquery" yaml:"bigquery" env:"BIGQUERY"` // The Big Query writer configuration
GCS *GCSSink `json:"gcs" yaml:"gcs" env:"GCS"` // The Google Cloud Storage writer configuration
File *FileSink `json:"file" yaml:"file" env:"FILE"` // The local file system writer configuration
Talaria *TalariaSink `json:"talaria" yaml:"talaria" env:"TALARIA"` // The Talaria writer configuration
}

// S3Sink represents a sink for AWS S3 and compatible stores.
Expand Down Expand Up @@ -175,6 +176,14 @@ type FileSink struct {
Directory string `json:"dir" yaml:"dir" env:"DIR"`
}

// TalariaSink represents a sink to an instance of Talaria
type TalariaSink struct {
Endpoint string `json:"endpoint" yaml:"endpoint" env:"ENDPOINT"` // The second Talaria endpoint
CircuitTimeout *time.Duration `json:"timeout" yaml:"timeout" env:"TIMEOUT"` // The timeout (in seconds) for requests to the second Talaria
MaxConcurrent *int `json:"concurrency" yaml:"concurrency" env:"CONCURRENCY"` // The number of concurrent requests permissible
ErrorPercentThreshold *int `json:"errorThreshold" yaml:"errorThreshold" env:"ERROR_THRESHOLD"` // The percentage of failed requests tolerated
}

// Func represents a config function
type Func func() *Config

Expand Down
18 changes: 18 additions & 0 deletions internal/storage/writer/talaria/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Talaria

This sink implements sending data to a second Talaria. It can be used when there is one Talaria for **event ingestion**, and needs to write data to a second Talaria, which is used **purely for querying data.**

This sink can be enabled by adding the following configuration in the `storage` section:

```yaml
storage:
compact: # enable compaction
interval: 60 # compact every 60 seconds
nameFunc: "s3://bucket/namefunc.lua" # file name function
talaria: # sink to Talaria
endpoint: "127.0.0.1:8043" # Talaria endpoint to write data to
timeout: 5 # Timeout for requests to Talaria
maxConcurrent: 10 # Number of concurrent requests to Talaria
errorThreshold: 50 # Percentage of errors before no more requests are sent
...
```
110 changes: 110 additions & 0 deletions internal/storage/writer/talaria/talaria.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package talaria

import (
"context"
"sync"
"time"

talaria "github.com/kelindar/talaria/client/golang"
"github.com/kelindar/talaria/internal/encoding/key"
"github.com/kelindar/talaria/internal/monitor/errors"
"github.com/myteksi/hystrix-go/hystrix"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// getClient will create a Talaria client
func getClient(endpoint string, options ...talaria.Option) (*talaria.Client, error) {
client, err := talaria.Dial(endpoint, options...)

if err != nil {
return nil, err
}
return client, nil
}

// Writer to write to TalariaDB
type Writer struct {
lock sync.Mutex
endpoint string
client *talaria.Client
options []talaria.Option
}

// New initializes a new Talaria writer.
func New(endpoint string, circuitTimeout *time.Duration, maxConcurrent *int, errorPercentThreshold *int) (*Writer, error) {

var newTimeout = 5 * time.Second
var newMaxConcurrent = hystrix.DefaultMaxConcurrent
var newErrorPercentThreshold = hystrix.DefaultErrorPercentThreshold

// Set defaults for variables if there aren't any
if circuitTimeout != nil {
newTimeout = *circuitTimeout * time.Second
}

if maxConcurrent != nil {
newMaxConcurrent = *maxConcurrent
}

if errorPercentThreshold != nil {
newErrorPercentThreshold = *errorPercentThreshold
}

dialOptions := []talaria.Option{}
dialOptions = append(dialOptions, talaria.WithCircuit(newTimeout, newMaxConcurrent, newErrorPercentThreshold))

client, err := getClient(endpoint, dialOptions...)

// Return writer with nil client
if err != nil {
return nil, errors.Internal("talaria: unable to create a client", err)
}

return &Writer{
client: client,
endpoint: endpoint,
options: dialOptions,
}, nil
}

// Write will write the ORC data to Talaria
func (w *Writer) Write(key key.Key, val []byte) error {

// Check if client is nil
if w.client == nil {
if err := w.tryConnect(); err != nil {
return errors.Internal("talaria: client is nil, unable to connect", err)
}
}

// Check error status if it needs to redial
if err := w.client.IngestORC(context.Background(), val); err != nil {
errStatus, _ := status.FromError(err)
if codes.Unavailable == errStatus.Code() {
// Server unavailable, redial
if err := w.tryConnect(); err != nil {
return errors.Internal("talaria: unable to redial", err)
}
// Send again after redial
if err := w.client.IngestORC(context.Background(), val); err != nil {
return errors.Internal("talaria: unable to write after redial", err)
}
}
return errors.Internal("talaria: unable to write", err)
}
return nil
}

// tryConnect will reconnect to Talaria if needed
func (w *Writer) tryConnect() error {
w.lock.Lock()
defer w.lock.Unlock()

client, err := getClient(w.endpoint, w.options...)
if err != nil {
return err
}
w.client = client
return nil
}
24 changes: 24 additions & 0 deletions internal/storage/writer/talaria/talaria_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package talaria

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestTalariaWriter(t *testing.T) {
var timeout time.Duration = 5
var concurrency int = 10
var errorPercentage int = 50
c, err := New("www.talaria.net:8043", &timeout, &concurrency, &errorPercentage)

// TODO: Impove test
assert.Nil(t, c)
assert.Error(t, err)

assert.Panics(t, func() {
c.Write([]byte("abc"), []byte("hello"))
})

}
12 changes: 11 additions & 1 deletion internal/storage/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/kelindar/talaria/internal/encoding/typeof"
"github.com/kelindar/talaria/internal/monitor"
"github.com/kelindar/talaria/internal/monitor/errors"
"github.com/kelindar/talaria/internal/scripting"
script "github.com/kelindar/talaria/internal/scripting"
"github.com/kelindar/talaria/internal/storage"
"github.com/kelindar/talaria/internal/storage/compact"
"github.com/kelindar/talaria/internal/storage/flush"
Expand All @@ -22,6 +22,7 @@ import (
"github.com/kelindar/talaria/internal/storage/writer/multi"
"github.com/kelindar/talaria/internal/storage/writer/noop"
"github.com/kelindar/talaria/internal/storage/writer/s3"
"github.com/kelindar/talaria/internal/storage/writer/talaria"
)

var seed = maphash.MakeSeed()
Expand Down Expand Up @@ -109,6 +110,15 @@ func newWriter(config *config.Compaction) (flush.Writer, error) {
writers = append(writers, w)
}

// Configure Talaria writer if present
if config.Talaria != nil {
w, err := talaria.New(config.Talaria.Endpoint, config.Talaria.CircuitTimeout, config.Talaria.MaxConcurrent, config.Talaria.ErrorPercentThreshold)
if err != nil {
return nil, err
}
writers = append(writers, w)
}

// If no writers were configured, error out
if len(writers) == 0 {
return noop.New(), errors.New("compact: writer was not configured")
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/writer/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/kelindar/talaria/internal/monitor"
"github.com/kelindar/talaria/internal/monitor/logging"
"github.com/kelindar/talaria/internal/monitor/statsd"
"github.com/kelindar/talaria/internal/scripting"
script "github.com/kelindar/talaria/internal/scripting"
"github.com/kelindar/talaria/internal/storage/disk"
"github.com/stretchr/testify/assert"
)
Expand Down

0 comments on commit 1d356dc

Please sign in to comment.