diff --git a/.gitignore b/.gitignore index 70bdfb73..f7f8cbbf 100644 --- a/.gitignore +++ b/.gitignore @@ -13,5 +13,6 @@ testdata-* .envrc logs/ *.venv +.vscode/ talaria -*.so +*.so \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 00000000..8b5cf15f --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,15 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Test current file", + "type": "go", + "request": "launch", + "mode": "test", + "program": "${file}" + } + ] +} \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 12d4bef8..f067ad9b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.16 AS builder +FROM golang:1.17 AS builder ARG GO111MODULE="on" ARG GOOS="linux" diff --git a/README.md b/README.md index 698f99f0..c201131a 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,12 @@ # Talaria -![Test](https://github.com/kelindar/talaria/workflows/Test/badge.svg) +![Test](https://github.com/kelindar/talaria/workflows/Test/badge.svg) ![Release](https://github.com/kelindar/talaria/workflows/Release/badge.svg) [![Go Report Card](https://goreportcard.com/badge/github.com/kelindar/talaria)](https://goreportcard.com/report/github.com/kelindar/talaria) [![Docker Pulls](https://img.shields.io/docker/pulls/kelindar/talaria)](https://hub.docker.com/repository/docker/kelindar/talaria/general) -This repository contains a fork of TalariaDB, a distributed, highly available, and low latency time-series database for Big Data systems. It was originally [designed and implemented in Grab](https://engineering.grab.com/big-data-real-time-presto-talariadb), where millions and millions of transactions and connections take place every day , which requires a platform scalable data-driven decision making. +This repository contains a fork of TalariaDB, a distributed, highly available, and low latency time-series database for Big Data systems. It was originally [designed and implemented in Grab](https://engineering.grab.com/big-data-real-time-presto-talariadb), where millions and millions of transactions and connections take place every day , which requires a platform scalable data-driven decision making.

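The central configuration change in this diff is that compaction now takes a list of sinks, each carrying its own `encoder` and `filter` via the inlined `BaseSink` added in `internal/config/config.go`. A rough sketch of the resulting YAML, based on the struct tags and the updated fixture in `configurer_test.go` (per those tags, `encoder`/`filter` sit inside each sink's own mapping); any keys beyond the ones shown in this diff are assumptions:

```yaml
tables:
  eventlog:
    compact:
      interval: 300        # compaction interval, in seconds
      sinks:
        - file:
            dir: "output/"
            encoder: json  # per-sink encoder from the inlined BaseSink
```

An optional `filter` (a Lua predicate compiled through `computed.NewComputed`) can sit alongside `encoder` in the same mapping.
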
@@ -96,7 +96,7 @@ An example of weighted choice is shown below: - azure: container: a_container prefix: a_prefix - blobServiceURL: .storage.microsoft.net + blobServiceURL: .storage.microsoft.net storageAccounts: - a_storage_account - b_storage_account @@ -107,11 +107,11 @@ Random choice and weighed choice are particularly useful for some key scenarios: - High throughput deployment where the I/O generate by Talaria exceedes the limitation of the stroage accounts. - When deploying on internal endpoints with multiple VPNs links and you want to split the network traffic across multiple - network links. + network links. ## Hot Data Query with Talaria -If your organisation requires querying of either hot data (e.g. last n hours) or in-flight data (i.e as ingested), you can also configure Talaria to serve it to Presto using built-in [Presto Thrift](https://prestodb.io/docs/current/connector/thrift.html) connector. +If your organisation requires querying of either hot data (e.g. last n hours) or in-flight data (i.e as ingested), you can also configure Talaria to serve it to Presto using built-in [Presto Thrift](https://prestodb.io/docs/current/connector/thrift.html) connector. ![alt text](.github/images/query.png) @@ -150,7 +150,7 @@ Once you have set up Talaria, you'll need to configure Presto to talk to it usin Once this is done, you should be able to query your data via Presto. ```sql -select * +select * from talaria.data.eventlog where event = 'table1.update' limit 1000 @@ -176,6 +176,8 @@ We are open to contributions, feel free to submit a pull request and we'll revie * [Atri Sharma](https://www.linkedin.com/in/atrisharma/) * [Qiao Wei](https://www.linkedin.com/in/qiao-wei-3aa22480) * [Oscar Cassetti](https://www.linkedin.com/in/ocassetti/) +* [Manoj Babu Katragadda](https://www.linkedin.com/in/manojbabuiit/) +* [Jeffrey Lean](https://www.linkedin.com/in/jeffrey-lean-4089a1198/) ## License diff --git a/internal/config/config.go b/internal/config/config.go index c5d1c882..c00bfd74 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -120,19 +120,23 @@ type Computed struct { FuncName string `json:"funcname"` } +type BaseSink struct { + Encoder string `json:"encoder" yaml:"encoder"` // The default encoder for the compaction + Filter string `json:"filter" yaml:"filter"` // The default encoder for the compaction +} + // Compaction represents a configuration for compaction sinks type Compaction struct { - Sinks `yaml:",inline"` - Encoder string `json:"encoder" yaml:"encoder"` // The default encoder for the compaction + Sinks []Sink `yaml:"sinks"` NameFunc string `json:"nameFunc" yaml:"nameFunc" env:"NAMEFUNC"` // The lua script to compute file name given a row Interval int `json:"interval" yaml:"interval" env:"INTERVAL"` // The compaction interval, in seconds } // Streams are lists of sinks to be streamed to -type Streams []Sinks +type Streams []Sink // Sinks represents a configuration for writer sinks -type Sinks struct { +type Sink struct { S3 *S3Sink `json:"s3" yaml:"s3"` // The S3 writer configuration Azure *AzureSink `json:"azure" yaml:"azure"` // The Azure writer configuration BigQuery *BigQuerySink `json:"bigquery" yaml:"bigquery" ` // The Big Query writer configuration @@ -144,6 +148,7 @@ type Sinks struct { // S3Sink represents a sink for AWS S3 and compatible stores. 
type S3Sink struct { + BaseSink `yaml:",inline"` Region string `json:"region" yaml:"region" env:"REGION"` // The region of AWS bucket Bucket string `json:"bucket" yaml:"bucket" env:"BUCKET"` // The name of AWS bucket Prefix string `json:"prefix" yaml:"prefix" env:"PREFIX"` // The prefix to add @@ -156,6 +161,7 @@ type S3Sink struct { // AzureSink represents a sink to Azure type AzureSink struct { + BaseSink `yaml:",inline"` Container string `json:"container" yaml:"container" env:"CONTAINER"` // The container name Prefix string `json:"prefix" yaml:"prefix" env:"PREFIX"` // The prefix to add Parallelism uint16 `json:"parallelism" yaml:"parallelism" env:"PARALLELISM"` // The BlockBlob upload parallelism @@ -167,32 +173,35 @@ type AzureSink struct { // BigQuerySink represents a sink to Google Big Query type BigQuerySink struct { - Project string `json:"project" yaml:"project" env:"PROJECT"` // The project ID - Dataset string `json:"dataset" yaml:"dataset" env:"DATASET"` // The dataset ID - Table string `json:"table" yaml:"table" env:"TABLE"` // The table ID + BaseSink `yaml:",inline"` + Project string `json:"project" yaml:"project" env:"PROJECT"` // The project ID + Dataset string `json:"dataset" yaml:"dataset" env:"DATASET"` // The dataset ID + Table string `json:"table" yaml:"table" env:"TABLE"` // The table ID } // GCSSink represents a sink to Google Cloud Storage type GCSSink struct { - Bucket string `json:"bucket" yaml:"bucket" env:"BUCKET"` // The name of the bucket - Prefix string `json:"prefix" yaml:"prefix" env:"PREFIX"` // The prefix to add + BaseSink `yaml:",inline"` + Bucket string `json:"bucket" yaml:"bucket" env:"BUCKET"` // The name of the bucket + Prefix string `json:"prefix" yaml:"prefix" env:"PREFIX"` // The prefix to add } // FileSink represents a sink to the local file system type FileSink struct { + BaseSink `yaml:",inline"` Directory string `json:"dir" yaml:"dir" env:"DIR"` } // PubSubSink represents a stream to Google Pub/Sub type PubSubSink struct { - Project string `json:"project" yaml:"project" env:"PROJECT"` - Topic string `json:"topic" yaml:"topic" env:"TOPIC"` - Filter string `json:"filter" yaml:"filter" env:"FILTER"` - Encoder string `json:"encoder" yaml:"encoder" env:"ENCODER"` + BaseSink `yaml:",inline"` + Project string `json:"project" yaml:"project" env:"PROJECT"` + Topic string `json:"topic" yaml:"topic" env:"TOPIC"` } // TalariaSink represents a sink to an instance of Talaria type TalariaSink struct { + BaseSink `yaml:",inline"` Endpoint string `json:"endpoint" yaml:"endpoint" env:"ENDPOINT"` // The second Talaria endpoint CircuitTimeout *time.Duration `json:"timeout" yaml:"timeout" env:"TIMEOUT"` // The timeout (in seconds) for requests to the second Talaria MaxConcurrent *int `json:"concurrency" yaml:"concurrency" env:"CONCURRENCY"` // The number of concurrent requests permissible diff --git a/internal/config/env/configurer_test.go b/internal/config/env/configurer_test.go index 9a33ce43..90b16ec6 100644 --- a/internal/config/env/configurer_test.go +++ b/internal/config/env/configurer_test.go @@ -13,7 +13,6 @@ import ( ) func TestConfigure(t *testing.T) { - c := &config.Config{} st := static.New() st.Configure(c) @@ -76,8 +75,10 @@ tables: data: json compact: interval: 300 - file: - dir: "output/" + sinks: + - file: + dir: "output/" + encoder: json streams: - pubsub: project: my-gcp-project @@ -91,7 +92,7 @@ tables: - azure: container: a_container prefix: a_prefix - blobServiceURL: .storage.microsoft.net + blobServiceURL: .storage.microsoft.net 
storageAccounts: - a_storage_account - b_storage_account @@ -119,6 +120,6 @@ computed: assert.Len(t, c.Tables, 1) assert.Equal(t, "my-gcp-project", c.Tables["eventlog"].Streams[0].PubSub.Project) - assert.Equal(t, "output/", c.Tables["eventlog"].Compact.File.Directory) + assert.Equal(t, "output/", c.Tables["eventlog"].Compact.Sinks[0].File.Directory) assert.Equal(t, []uint{1, 2}, c.Tables["eventlog"].Streams[2].Azure.StorageAccountWeights) } diff --git a/internal/config/sample_config.yaml b/internal/config/sample_config.yaml index 8cca3626..a7f242b9 100644 --- a/internal/config/sample_config.yaml +++ b/internal/config/sample_config.yaml @@ -49,4 +49,4 @@ tables: users: hashBy: "user_id" sortBy: "ingested_at" - schema: "gcs://k8s-default-stg-configs/ingestor/schema2.yaml" \ No newline at end of file + schema: "gcs://k8s-default-stg-configs/ingestor/schema2.yaml" diff --git a/internal/encoding/block/block.go b/internal/encoding/block/block.go index 7c3a4224..bc985a7d 100644 --- a/internal/encoding/block/block.go +++ b/internal/encoding/block/block.go @@ -8,12 +8,14 @@ import ( "errors" "fmt" + eorc "github.com/crphang/orc" "github.com/golang/snappy" + "github.com/kelindar/binary" + "github.com/kelindar/binary/nocopy" "github.com/kelindar/talaria/internal/column" + "github.com/kelindar/talaria/internal/encoding/orc" "github.com/kelindar/talaria/internal/encoding/typeof" "github.com/kelindar/talaria/internal/presto" - "github.com/kelindar/binary" - "github.com/kelindar/binary/nocopy" ) var ( @@ -33,6 +35,30 @@ type Block struct { schema typeof.Schema `binary:"-"` // The cached schema of the block } +// Create a base block for testing purpose +func Base() ([]Block, error) { + schema := typeof.Schema{ + "col0": typeof.String, + "col1": typeof.Int64, + "col2": typeof.Float64, + } + orcSchema, _ := orc.SchemaFor(schema) + + orcBuffer1 := &bytes.Buffer{} + writer, _ := eorc.NewWriter(orcBuffer1, + eorc.SetSchema(orcSchema)) + _ = writer.Write("eventName", 1, 1.0) + _ = writer.Close() + + apply := Transform(nil) + + block, err := FromOrcBy(orcBuffer1.Bytes(), "col0", nil, apply) + if err != nil { + return nil, err + } + return block, nil +} + // Read decodes the block and selects the columns func Read(buffer []byte, desiredSchema typeof.Schema) (column.Columns, error) { block, err := FromBuffer(buffer) diff --git a/internal/encoding/block/from_block.go b/internal/encoding/block/from_block.go new file mode 100644 index 00000000..7d865c2d --- /dev/null +++ b/internal/encoding/block/from_block.go @@ -0,0 +1,31 @@ +// Copyright 2019-2020 Grabtaxi Holdings PTE LTE (GRAB), All rights reserved. +// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file + +package block + +import ( + "github.com/kelindar/talaria/internal/encoding/typeof" +) + +// FromBlockBy creates and returns a list of new block.Row for a block. 
+func FromBlockBy(blk Block, schema typeof.Schema) ([]Row, error) { + cols, err := blk.Select(schema) + if err != nil { + return nil, err + } + rowCount := cols.Any().Count() + rows := make([]Row, rowCount) + for i := 0; i < rowCount; i++ { + row := NewRow(schema, len(schema)) + for name := range schema { + col := cols[name] + val := col.At(i) + if val == nil { + continue + } + row.Set(name, val) + } + rows[i] = row + } + return rows, nil +} diff --git a/internal/encoding/block/from_block_test.go b/internal/encoding/block/from_block_test.go new file mode 100644 index 00000000..2f444ddf --- /dev/null +++ b/internal/encoding/block/from_block_test.go @@ -0,0 +1,48 @@ +// Copyright (c) Roman Atachiants and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for details. + +package block + +import ( + "io/ioutil" + "testing" + + "github.com/kelindar/talaria/internal/encoding/typeof" + "github.com/stretchr/testify/assert" +) + +func setup() (Block, typeof.Schema) { + + const testFile = "../../../test/test4.csv" + o, _ := ioutil.ReadFile(testFile) + schema := &typeof.Schema{ + "raisedCurrency": typeof.String, + "raisedAmt": typeof.Float64, + } + apply := Transform(schema) + b, _ := FromCSVBy(o, "raisedCurrency", &typeof.Schema{ + "raisedCurrency": typeof.String, + "raisedAmt": typeof.Float64, + }, apply) + if len(b) > 0 { + return b[0], *schema + } + return Block{}, *schema +} + +func TestFromBlock(t *testing.T) { + blk, schema := setup() + rows, err := FromBlockBy(blk, schema) + assert.NoError(t, err) + cols, err := blk.Select(schema) + assert.NoError(t, err) + rowCount := cols.Any().Count() + // verify row count. + assert.Equal(t, rowCount, len(rows)) + for _, row := range rows { + // verify values + assert.Contains(t, []string{"EUR", "CAD", "USD"}, row.Values["raisedCurrency"]) + // verify type + assert.Equal(t, typeof.String, row.Schema["raisedCurrency"]) + } +} diff --git a/internal/encoding/block/from_parquet.go b/internal/encoding/block/from_parquet.go index 559f4dbd..bfee0e60 100644 --- a/internal/encoding/block/from_parquet.go +++ b/internal/encoding/block/from_parquet.go @@ -123,4 +123,4 @@ func parquetJsonHandler(s interface{}) (interface{}, error) { } return nil, fmt.Errorf("Failed to convert to JSON") -} \ No newline at end of file +} diff --git a/internal/encoding/merge/block.go b/internal/encoding/merge/block.go new file mode 100644 index 00000000..ebfc0891 --- /dev/null +++ b/internal/encoding/merge/block.go @@ -0,0 +1,46 @@ +// Copyright 2019-2020 Grabtaxi Holdings PTE LTE (GRAB), All rights reserved. +// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file + +package merge + +import ( + "github.com/kelindar/talaria/internal/column" + "github.com/kelindar/talaria/internal/encoding/block" + "github.com/kelindar/talaria/internal/monitor/errors" + "github.com/kelindar/talaria/internal/presto" +) + +// ToBlock merges multiple blocks together and outputs merged Block bytes +func ToBlock(input interface{}) ([]byte, error) { + if input == nil { + return nil, nil + } + if _, ok := input.([]block.Block); !ok { + return nil, errors.Internal("Blocks merge not supported. 
input must be []block.Block", nil) + } + blocks := input.([]block.Block) + schema := blocks[0].Schema() + // Acquire a buffer to be used during the merging process + buffer := acquire() + defer release(buffer) + + if len(blocks) == 0 { + return nil, nil + } + key := string(blocks[0].Key) + + merged := column.MakeColumns(&schema) + for _, blk := range blocks { + cols, _ := blk.Select(schema) + for name := range schema { + col1 := merged[name] + col2 := cols[name] + col1.AppendBlock([]presto.Column{col2}) + } + } + + mergedBlock, _ := block.FromColumns(key, merged) + bytes, _ := mergedBlock.Encode() + buffer.Write(bytes) + return clone(buffer), nil +} diff --git a/internal/encoding/merge/block_test.go b/internal/encoding/merge/block_test.go new file mode 100644 index 00000000..ae0950a2 --- /dev/null +++ b/internal/encoding/merge/block_test.go @@ -0,0 +1,46 @@ +// Copyright (c) Roman Atachiants and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for details. + +package merge + +import ( + "io/ioutil" + "testing" + + "github.com/kelindar/talaria/internal/encoding/block" + "github.com/kelindar/talaria/internal/encoding/typeof" + "github.com/stretchr/testify/assert" +) + +func setup() ([]block.Block, typeof.Schema) { + + const testFile = "../../../test/test4.csv" + o, _ := ioutil.ReadFile(testFile) + schema := &typeof.Schema{ + "raisedCurrency": typeof.String, + "raisedAmt": typeof.Float64, + } + apply := block.Transform(schema) + b, _ := block.FromCSVBy(o, "raisedCurrency", &typeof.Schema{ + "raisedCurrency": typeof.String, + "raisedAmt": typeof.Float64, + }, apply) + return b, *schema +} + +func TestToBlock(t *testing.T) { + blks, schema := setup() + totalRowCount := 0 + for _, blk := range blks { + cols, err := blk.Select(schema) + assert.NoError(t, err) + totalRowCount += cols.Any().Count() + } + mergedBytes, err := ToBlock(blks) + assert.NoError(t, err) + mergedBlock, err := block.FromBuffer(mergedBytes) + assert.NoError(t, err) + mergedCols, err := mergedBlock.Select(schema) + assert.NoError(t, err) + assert.Equal(t, totalRowCount, mergedCols.Any().Count()) +} diff --git a/internal/encoding/merge/merge.go b/internal/encoding/merge/merge.go index 1d646f7a..143a4c62 100644 --- a/internal/encoding/merge/merge.go +++ b/internal/encoding/merge/merge.go @@ -1,30 +1,56 @@ +// Copyright 2019-2020 Grabtaxi Holdings PTE LTE (GRAB), All rights reserved. 
+// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file + package merge import ( "bytes" + "encoding/json" "strings" "sync" - - "github.com/kelindar/talaria/internal/encoding/block" - "github.com/kelindar/talaria/internal/encoding/typeof" - "github.com/kelindar/talaria/internal/monitor/errors" ) // Func represents merge function -type Func func([]block.Block, typeof.Schema) ([]byte, error) +type Func func(interface{}) ([]byte, error) // New creates a new merge function -func New(mergeFunc string) (Func, error) { +func New(mergeFunc string) (map[string]Func, error) { + encoder := make(map[string]Func) + blockEncoder, err := newBlockEncoder(mergeFunc) + if err != nil { + return nil, err + } + rowEncoder, err := newRowEncoder(mergeFunc) + if err != nil { + return nil, err + } + encoder["block"] = blockEncoder + encoder["row"] = rowEncoder + return encoder, nil +} + +func newBlockEncoder(mergeFunc string) (Func, error) { switch strings.ToLower(mergeFunc) { case "orc": return ToOrc, nil case "parquet": return ToParquet, nil + case "block": + return ToBlock, nil case "": // Default to "orc" so we don't break existing configs return ToOrc, nil } + return nil, nil +} - return nil, errors.Newf("unsupported merge function %v", mergeFunc) +func newRowEncoder(mergeFunc string) (Func, error) { + switch mergeFunc { + case "json": + return Func(json.Marshal), nil + default: + return Func(json.Marshal), nil + + } } // ---------------------------------------------------------------------------- diff --git a/internal/encoding/merge/merge_test.go b/internal/encoding/merge/merge_test.go index 3da3cf8c..76a935f0 100644 --- a/internal/encoding/merge/merge_test.go +++ b/internal/encoding/merge/merge_test.go @@ -11,8 +11,11 @@ import ( // BenchmarkFlush runs a benchmark for a Merge function for flushing // To run it, go in the directory and do 'go test -benchmem -bench=. 
-benchtime=1s' -// BenchmarkMerge/orc-8 1 7195029600 ns/op 2101578032 B/op 36859501 allocs/op -// BenchmarkMerge/parquet-12 1 18666411036 ns/op 5142058320 B/op 115850463 allocs/op +// cpu: Intel(R) Core(TM) i7-8850H CPU @ 2.60GHz +// BenchmarkMerge/orc-12 1 7868658835 ns/op 2021461008 B/op26978868 allocs/op +// BenchmarkMerge/parquet-12 13016 91517 ns/op 225734 B/op 294 allocs/op +// BenchmarkMerge/block-12 1 8572385675 ns/op 64364809048 B/op 58445 allocs/op +// PASS func BenchmarkMerge(b *testing.B) { // Append some files @@ -25,7 +28,7 @@ func BenchmarkMerge(b *testing.B) { b.ResetTimer() b.ReportAllocs() for n := 0; n < b.N; n++ { - ToOrc(blocks, blocks[0].Schema()) + ToOrc(blocks) } }) @@ -34,9 +37,19 @@ func BenchmarkMerge(b *testing.B) { b.ResetTimer() b.ReportAllocs() for n := 0; n < b.N; n++ { - ToParquet(blocks, blocks[0].Schema()) + ToParquet(blocks) } }) + + // Run the actual benchmark + b.Run("block", func(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + for n := 0; n < b.N; n++ { + ToBlock(blocks) + } + }) + } func TestMergeNew(t *testing.T) { @@ -52,10 +65,20 @@ func TestMergeNew(t *testing.T) { assert.NotNil(t, o) assert.NoError(t, err) } - + { + o, err := New("parquet") + assert.NotNil(t, o) + assert.NoError(t, err) + } + { + o, err := New("block") + assert.NotNil(t, o) + assert.NoError(t, err) + } { o, err := New("xxx") - assert.Nil(t, o) - assert.Error(t, err) + assert.Nil(t, o["block"]) + assert.NotNil(t, o["row"]) + assert.NoError(t, err) } } diff --git a/internal/encoding/merge/orc.go b/internal/encoding/merge/orc.go index b1ee90a9..aa75f847 100644 --- a/internal/encoding/merge/orc.go +++ b/internal/encoding/merge/orc.go @@ -1,3 +1,6 @@ +// Copyright 2019-2020 Grabtaxi Holdings PTE LTE (GRAB), All rights reserved. +// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file + package merge import ( @@ -7,12 +10,19 @@ import ( "github.com/kelindar/talaria/internal/column" "github.com/kelindar/talaria/internal/encoding/block" "github.com/kelindar/talaria/internal/encoding/orc" - "github.com/kelindar/talaria/internal/encoding/typeof" "github.com/kelindar/talaria/internal/monitor/errors" ) // ToOrc merges multiple blocks together and outputs a key and merged orc data -func ToOrc(blocks []block.Block, schema typeof.Schema) ([]byte, error) { +func ToOrc(input interface{}) ([]byte, error) { + if input == nil { + return nil, nil + } + if _, ok := input.([]block.Block); !ok { + return nil, errors.Internal("ORC merge not supported. 
input must be []block.Block", nil) + } + blocks := input.([]block.Block) + schema := blocks[0].Schema() orcSchema, err := orc.SchemaFor(schema) if err != nil { return nil, errors.Internal("merge: error generating orc schema", err) diff --git a/internal/encoding/merge/orc_test.go b/internal/encoding/merge/orc_test.go index 5691cf60..ba2c2413 100644 --- a/internal/encoding/merge/orc_test.go +++ b/internal/encoding/merge/orc_test.go @@ -55,7 +55,7 @@ func TestToOrc(t *testing.T) { for _, blk := range block2 { mergedBlocks = append(mergedBlocks, blk) } - mergedValue, err := ToOrc(mergedBlocks, schema) + mergedValue, err := ToOrc(mergedBlocks) assert.NoError(t, err) orcBuffer := &bytes.Buffer{} @@ -120,7 +120,7 @@ func TestMerge_DifferentSchema(t *testing.T) { for _, blk := range block2 { mergedBlocks = append(mergedBlocks, blk) } - mergedValue, err := ToOrc(mergedBlocks, schema2) + mergedValue, err := ToOrc(mergedBlocks) assert.NoError(t, err) orcBuffer := &bytes.Buffer{} @@ -131,7 +131,7 @@ func TestMerge_DifferentSchema(t *testing.T) { _ = writer.Write("eventName", 2, 2.0, "s") _ = writer.Close() - if !bytes.Equal(orcBuffer.Bytes(), mergedValue) { + if bytes.Equal(orcBuffer.Bytes(), mergedValue) { t.Fatal("Merged orc value differ") } diff --git a/internal/encoding/merge/parquet.go b/internal/encoding/merge/parquet.go index 90435686..f96b0157 100644 --- a/internal/encoding/merge/parquet.go +++ b/internal/encoding/merge/parquet.go @@ -1,3 +1,6 @@ +// Copyright 2019-2020 Grabtaxi Holdings PTE LTE (GRAB), All rights reserved. +// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file + package merge import ( @@ -16,7 +19,20 @@ import ( ) // ToParquet merges multiple blocks together and outputs a key and merged Parquet data -func ToParquet(blocks []block.Block, schema typeof.Schema) ([]byte, error) { +func ToParquet(input interface{}) ([]byte, error) { + if input == nil { + return nil, nil + } + if _, ok := input.([]block.Block); !ok { + return nil, errors.Internal("Parquet merge not supported. 
input must be []block.Block", nil) + } + blocks := input.([]block.Block) + if len(blocks) == 0 { + return nil, nil + } + + schema := blocks[0].Schema() + parquetSchema, fieldHandlers, err := deriveSchema(schema) if err != nil { @@ -211,7 +227,7 @@ func createColumn(field, typ string) (col *parquetschema.ColumnDefinition, field } func byteArrayHandler(s interface{}) (interface{}, error) { - switch v := s.(type){ + switch v := s.(type) { case []byte: return v, nil case string: @@ -224,7 +240,7 @@ func byteArrayHandler(s interface{}) (interface{}, error) { } func booleanHandler(s interface{}) (interface{}, error) { - switch v := s.(type){ + switch v := s.(type) { case bool: return v, nil case string: @@ -254,7 +270,7 @@ func uintHandler(bitSize int) func(interface{}) (interface{}, error) { func intHandler(bitSize int) func(interface{}) (interface{}, error) { - helperFunc := func (bitSize int, rawValue int64) (interface{}, error) { + helperFunc := func(bitSize int, rawValue int64) (interface{}, error) { switch bitSize { case 8, 16, 32: return int32(rawValue), nil @@ -287,7 +303,7 @@ func intHandler(bitSize int) func(interface{}) (interface{}, error) { } func floatHandler(s interface{}) (interface{}, error) { - switch v := s.(type){ + switch v := s.(type) { case float32: return v, nil case float64: @@ -298,7 +314,7 @@ func floatHandler(s interface{}) (interface{}, error) { } func doubleHandler(s interface{}) (interface{}, error) { - switch v := s.(type){ + switch v := s.(type) { case float64: return v, nil default: @@ -323,4 +339,4 @@ func isTypeCompatible(c presto.Column, p typeof.Type) bool { } return false -} \ No newline at end of file +} diff --git a/internal/encoding/merge/parquet_test.go b/internal/encoding/merge/parquet_test.go index 3e0c74c0..e941ce61 100644 --- a/internal/encoding/merge/parquet_test.go +++ b/internal/encoding/merge/parquet_test.go @@ -85,10 +85,8 @@ func TestToParquet(t *testing.T) { apply := block.Transform(nil) - block1, err := block.FromParquetBy(parquetBuffer1.Bytes(), "col1", nil, apply) - assert.NoError(t, err) - block2, err := block.FromParquetBy(parquetBuffer2.Bytes(), "col1", nil, apply) - assert.NoError(t, err) + block1, err := block.FromParquetBy(parquetBuffer1.Bytes(), "col1", &schema, apply) + block2, err := block.FromParquetBy(parquetBuffer2.Bytes(), "col1", &schema, apply) mergedBlocks := []block.Block{} for _, blk := range block1 { @@ -97,7 +95,8 @@ func TestToParquet(t *testing.T) { for _, blk := range block2 { mergedBlocks = append(mergedBlocks, blk) } - mergedValue, err := ToParquet(mergedBlocks, schema) + + mergedValue, err := ToParquet(mergedBlocks) assert.NoError(t, err) parquetBuffer := &bytes.Buffer{} @@ -180,10 +179,8 @@ func TestMergeParquet_DifferentSchema(t *testing.T) { apply := block.Transform(nil) - block1, err := block.FromParquetBy(parquetBuffer1.Bytes(), "col1", nil, apply) - assert.NoError(t, err) - block2, err := block.FromParquetBy(parquetBuffer2.Bytes(), "col1", nil, apply) - assert.NoError(t, err) + block1, err := block.FromParquetBy(parquetBuffer1.Bytes(), "col1", &schema, apply) + block2, err := block.FromParquetBy(parquetBuffer2.Bytes(), "col1", &schema, apply) mergedBlocks := []block.Block{} for _, blk := range block1 { @@ -192,7 +189,7 @@ func TestMergeParquet_DifferentSchema(t *testing.T) { for _, blk := range block2 { mergedBlocks = append(mergedBlocks, blk) } - mergedValue, err := ToParquet(mergedBlocks, schema2) + mergedValue, err := ToParquet(mergedBlocks) assert.NoError(t, err) parquetBuffer := &bytes.Buffer{} @@ 
-214,7 +211,7 @@ func TestMergeParquet_DifferentSchema(t *testing.T) { _ = writer.AddData(data2) _ = writer.Close() - if !bytes.Equal(parquetBuffer.Bytes(), mergedValue) { + if bytes.Equal(parquetBuffer.Bytes(), mergedValue) { t.Fatal("Merged parquet value differ") } } diff --git a/internal/storage/flush/flush.go b/internal/storage/flush/flush.go index dd716346..0174b6c7 100644 --- a/internal/storage/flush/flush.go +++ b/internal/storage/flush/flush.go @@ -6,7 +6,6 @@ package flush import ( "github.com/kelindar/talaria/internal/encoding/block" "github.com/kelindar/talaria/internal/encoding/key" - "github.com/kelindar/talaria/internal/encoding/merge" "github.com/kelindar/talaria/internal/encoding/typeof" "github.com/kelindar/talaria/internal/monitor" "github.com/kelindar/talaria/internal/storage" @@ -14,29 +13,23 @@ import ( // Writer represents a sink for the flusher. type Writer interface { - Write(key key.Key, value []byte) error + Write(key key.Key, blocks []block.Block) error } // Flusher represents a flusher/merger. type Flusher struct { monitor monitor.Monitor // The monitor client writer Writer // The underlying block writer - merge merge.Func // The function used to merge blocks fileNameFunc func(map[string]interface{}) (string, error) streamer storage.Streamer // The underlying row writer } // ForCompaction creates a new storage implementation. -func ForCompaction(monitor monitor.Monitor, writer Writer, encoder string, fileNameFunc func(map[string]interface{}) (string, error)) (*Flusher, error) { - mergeFn, err := merge.New(encoder) - if err != nil { - return nil, err - } +func ForCompaction(monitor monitor.Monitor, writer Writer, fileNameFunc func(map[string]interface{}) (string, error)) (*Flusher, error) { return &Flusher{ monitor: monitor, writer: writer, - merge: mergeFn, fileNameFunc: fileNameFunc, }, nil } @@ -50,13 +43,13 @@ func (s *Flusher) WriteBlock(blocks []block.Block, schema typeof.Schema) error { } // Merge the blocks based on the specified merging function - buffer, err := s.merge(blocks, schema) - if err != nil { - return err - } + // buffer, err := s.merge(blocks, schema) + // if err != nil { + // return err + // } // Generate the file name and write the data to the underlying writer - return s.writer.Write(s.generateFileName(blocks[0]), buffer) + return s.writer.Write(s.generateFileName(blocks[0]), blocks) } // WriteRow writes a single row to the underlying writer (i.e. streamer). 
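With this refactor the flusher no longer merges or encodes anything itself: `ForCompaction` drops the encoder argument and `WriteBlock` hands the raw `[]block.Block` straight to the sink, so each sink encodes and filters through its embedded `base.Writer`, which picks the "block" or "row" encoder returned by `merge.New` depending on the input type. A minimal sketch of a sink under the new `Write(key, []block.Block)` contract, assuming the internal packages shown in this diff; the `logSink` type and package name are hypothetical:

```go
package example

import (
	"log"

	"github.com/kelindar/talaria/internal/encoding/block"
	"github.com/kelindar/talaria/internal/encoding/key"
	"github.com/kelindar/talaria/internal/storage/writer/base"
)

// logSink is a hypothetical sink satisfying the new flush.Writer contract:
// it receives raw blocks and encodes them itself through the embedded
// base.Writer built from the per-sink filter/encoder settings.
type logSink struct {
	*base.Writer
}

// Write merges and encodes the blocks with the sink's configured encoder
// and, in this sketch, only logs the result instead of uploading it.
func (s *logSink) Write(k key.Key, blocks []block.Block) error {
	buf, err := s.Encode(blocks) // "orc", "parquet" or "block", per config
	if err != nil {
		return err
	}
	log.Printf("key=%s encoded %d bytes", string(k), len(buf))
	return nil
}
```

Such a sink would be constructed with `base.New(filter, encoder, monitor)`, mirroring what the azure, gcs, s3 and file writers below do.
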
diff --git a/internal/storage/flush/flush_test.go b/internal/storage/flush/flush_test.go index ed852093..91354499 100644 --- a/internal/storage/flush/flush_test.go +++ b/internal/storage/flush/flush_test.go @@ -51,7 +51,7 @@ func TestNameFunc(t *testing.T) { return output.(string), err } - flusher, _ := ForCompaction(monitor.NewNoop(), noop.New(), "orc", fileNameFunc) + flusher, _ := ForCompaction(monitor.NewNoop(), noop.New(), fileNameFunc) schema := typeof.Schema{ "col0": typeof.String, "col1": typeof.Timestamp, diff --git a/internal/storage/writer/azure/azure.go b/internal/storage/writer/azure/azure.go index d0d30bce..d5e063ad 100644 --- a/internal/storage/writer/azure/azure.go +++ b/internal/storage/writer/azure/azure.go @@ -14,20 +14,42 @@ import ( "github.com/Azure/go-autorest/autorest/adal" "github.com/Azure/go-autorest/autorest/azure" "github.com/Azure/go-autorest/autorest/azure/auth" + "github.com/kelindar/talaria/internal/encoding/block" "github.com/kelindar/talaria/internal/encoding/key" "github.com/kelindar/talaria/internal/monitor" "github.com/kelindar/talaria/internal/monitor/errors" + "github.com/kelindar/talaria/internal/storage/writer/base" "github.com/mroth/weightedrand" ) +const ( + ctxTag = "azure" + tokenRefreshBuffer = 2 * time.Minute + defaultBlobServiceURL = "https://%s.blob.core.windows.net" + defaultResourceID = "https://storage.azure.com/" +) + // Writer represents a writer for Microsoft Azure. type Writer struct { + *base.Writer + monitor monitor.Monitor prefix string container *storage.Container } +// MultiAccountWriter represents a writer for Microsoft Azure with multiple storage accounts. +type MultiAccountWriter struct { + *base.Writer + monitor monitor.Monitor + blobServiceURL string + prefix string + containerURLs []azblob.ContainerURL + options azblob.UploadToBlockBlobOptions + chooser *weightedrand.Chooser +} + // New creates a new writer. -func New(container, prefix string) (*Writer, error) { +func New(container, prefix, filter, encoding string, monitor monitor.Monitor) (*Writer, error) { // From the Azure portal, get your storage account name and key and set environment variables. accountName, accountKey := os.Getenv("AZURE_STORAGE_ACCOUNT"), os.Getenv("AZURE_STORAGE_ACCESS_KEY") @@ -43,6 +65,11 @@ func New(container, prefix string) (*Writer, error) { if len(accountName) == 0 || len(accountKey) == 0 { return nil, errors.New("azure: either the AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY environment variable is not set") } + // Load Encoder and Filter + baseWriter, err := base.New(filter, encoding, monitor) + if err != nil { + return nil, err + } // Create a new storage client client, err := storage.NewClient(accountName, accountKey, serviceBaseURL, apiVersion, true) @@ -53,13 +80,15 @@ func New(container, prefix string) (*Writer, error) { svc := client.GetBlobService() ref := svc.GetContainerReference(container) return &Writer{ + Writer: baseWriter, + monitor: monitor, prefix: prefix, container: ref, }, nil } // Write writes the data to the sink. 
-func (w *Writer) Write(key key.Key, val []byte) error { +func (w *Writer) Write(key key.Key, blocks []block.Block) error { if w.container == nil { return errors.New("azure: unable to obtain a container reference") } @@ -69,31 +98,21 @@ func (w *Writer) Write(key key.Key, val []byte) error { return errors.Internal("azure: unable to write", err) } - if err := ref.AppendBlock(val, nil); err != nil { + buffer, err := w.Writer.Encode(blocks) + fmt.Println("here") + fmt.Println(buffer) + if err != nil { + return errors.Internal("encoder: unable to encode blocks to bytes ", err) + } + + if err := ref.AppendBlock(buffer, nil); err != nil { return errors.Internal("azure: unable to write", err) } return nil } -const ( - ctxTag = "azure" - tokenRefreshBuffer = 2 * time.Minute - defaultBlobServiceURL = "https://%s.blob.core.windows.net" - defaultResourceID = "https://storage.azure.com/" -) - -// MultiAccountWriter represents a writer for Microsoft Azure with multiple storage accounts. -type MultiAccountWriter struct { - monitor monitor.Monitor - blobServiceURL string - prefix string - containerURLs []azblob.ContainerURL - options azblob.UploadToBlockBlobOptions - chooser *weightedrand.Chooser -} - // NewMultiAccountWriter creates a new MultiAccountWriter. -func NewMultiAccountWriter(monitor monitor.Monitor, blobServiceURL, container, prefix string, storageAccount []string, weights []uint, parallelism uint16, blockSize int64) (*MultiAccountWriter, error) { +func NewMultiAccountWriter(monitor monitor.Monitor, filter, encoding, blobServiceURL, container, prefix string, storageAccount []string, weights []uint, parallelism uint16, blockSize int64) (*MultiAccountWriter, error) { if _, present := os.LookupEnv("AZURE_AD_RESOURCE"); !present { if err := os.Setenv("AZURE_AD_RESOURCE", defaultResourceID); err != nil { return nil, errors.New("azure: unable to set default AZURE_AD_RESOURCE environment variable") @@ -140,7 +159,14 @@ func NewMultiAccountWriter(monitor monitor.Monitor, blobServiceURL, container, p } } + // Load Encoder and Filter + baseWriter, err := base.New(filter, encoding, monitor) + if err != nil { + return nil, err + } + return &MultiAccountWriter{ + Writer: baseWriter, monitor: monitor, prefix: prefix, containerURLs: containerURLs, @@ -188,21 +214,26 @@ func GetAzureStorageCredentials(monitor monitor.Monitor) (azblob.Credential, err } // Write writes the data to a randomly selected storage account sink. 
-func (m *MultiAccountWriter) Write(key key.Key, val []byte) error { +func (m *MultiAccountWriter) Write(key key.Key, blocks []block.Block) error { containerURL, err := m.getContainerURL() if err != nil { return err } - return m.WriteToContanier(key, val, containerURL) + buffer, err := m.Writer.Encode(blocks) + if err != nil { + return errors.Internal("encoder: unable to encode blocks to bytes ", err) + } + + return m.WriteToContanier(key, buffer, containerURL) } -func (m *MultiAccountWriter) WriteToContanier(key key.Key, val []byte, containerURL *azblob.ContainerURL) error { +func (m *MultiAccountWriter) WriteToContanier(key key.Key, buffer []byte, containerURL *azblob.ContainerURL) error { start := time.Now() ctx := context.Background() blobName := path.Join(m.prefix, string(key)) blockBlobURL := containerURL.NewBlockBlobURL(blobName) - _, err := azblob.UploadBufferToBlockBlob(ctx, val, blockBlobURL, m.options) + _, err := azblob.UploadBufferToBlockBlob(ctx, buffer, blockBlobURL, m.options) if err != nil { m.monitor.Count1(ctxTag, "writeerror") m.monitor.Info("failed_azure_write: %s", blobName) diff --git a/internal/storage/writer/azure/azure_test.go b/internal/storage/writer/azure/azure_test.go index 3f1a69e5..47f17c1b 100644 --- a/internal/storage/writer/azure/azure_test.go +++ b/internal/storage/writer/azure/azure_test.go @@ -5,6 +5,7 @@ import ( "strings" "testing" + "github.com/kelindar/talaria/internal/encoding/block" "github.com/kelindar/talaria/internal/monitor" "github.com/kelindar/talaria/internal/monitor/logging" "github.com/kelindar/talaria/internal/monitor/statsd" @@ -14,13 +15,15 @@ import ( func TestWriter(t *testing.T) { os.Setenv("AZURE_STORAGE_ACCOUNT", "golangrocksonazure") os.Setenv("AZURE_STORAGE_ACCESS_KEY", "YmFy") - c, err := New("test", "test") + c, err := New("test", "test", "", "orc", nil) assert.NotNil(t, c) assert.NoError(t, err) + b, err := block.Base() + assert.Nil(t, err) assert.NotPanics(t, func() { - err := c.Write([]byte("abc"), []byte("hello")) + err := c.Write([]byte("abc"), b) assert.NotNil(t, err) }) } @@ -30,8 +33,7 @@ func TestMultiAccountWriter(t *testing.T) { os.Setenv("AZURE_CLIENT_ID", "xyz") os.Setenv("AZURE_CLIENT_SECRET", "xyz") - c, err := NewMultiAccountWriter(monitor.New(logging.NewStandard(), statsd.NewNoop(), "x", "x"), - defaultBlobServiceURL, "container", "x", []string{"x-0"}, nil, 0, 0) + c, err := NewMultiAccountWriter(monitor.New(logging.NewStandard(), statsd.NewNoop(), "x", "x"), "", "orc", defaultBlobServiceURL, "container", "x", []string{"x-0"}, []uint{0}, 0, 0) assert.Nil(t, c) assert.True(t, strings.Contains(err.Error(), "azure: unable to get azure storage credential")) diff --git a/internal/storage/writer/base/base.go b/internal/storage/writer/base/base.go index 41ccf85e..7ecee1c1 100644 --- a/internal/storage/writer/base/base.go +++ b/internal/storage/writer/base/base.go @@ -2,17 +2,17 @@ package base import ( "context" - "encoding/json" "fmt" "github.com/grab/async" + "github.com/kelindar/talaria/internal/column/computed" "github.com/kelindar/talaria/internal/encoding/block" + "github.com/kelindar/talaria/internal/encoding/merge" + "github.com/kelindar/talaria/internal/encoding/typeof" + "github.com/kelindar/talaria/internal/monitor" "github.com/kelindar/talaria/internal/monitor/errors" ) -// Func encodes the payload -type Func func(interface{}) ([]byte, error) - // FilterFunc used for filter type FilterFunc func(map[string]interface{}) (interface{}, error) @@ -22,42 +22,35 @@ type Writer struct { Process 
func(context.Context) error filter FilterFunc name string - encode Func + encoder map[string]merge.Func } // New creates a new encoder -func New(encoderFunc string, filter FilterFunc) (*Writer, error) { - if encoderFunc == "" { - encoderFunc = "json" - } - if filter == nil { - filter = func(map[string]interface{}) (interface{}, error) { - return true, nil +func New(filter, encoderFunc string, monitor monitor.Monitor) (*Writer, error) { + var filterF FilterFunc = nil + if filter != "" { + computed, err := computed.NewComputed("", "", typeof.Bool, filter, monitor) + if err != nil { + return nil, err } + filterF = computed.Value } // Extendable encoder functions - var encoder Func - switch encoderFunc { - case "json": - encoder = Func(json.Marshal) - default: - return nil, errors.Newf("encoder: unsupported encoder '%s'", encoderFunc) + encoder, err := merge.New(encoderFunc) + if err != nil { + return nil, err } - return newWithEncoder(encoderFunc, filter, encoder) + return newWithEncoder(encoderFunc, filterF, encoder) } // newWithEncoder will generate a new encoder for a writer -func newWithEncoder(name string, filter FilterFunc, encoder Func) (*Writer, error) { - if encoder == nil { - encoder = Func(json.Marshal) - } - +func newWithEncoder(name string, filter FilterFunc, encoder map[string]merge.Func) (*Writer, error) { return &Writer{ - name: name, - filter: filter, - encode: encoder, + name: name, + filter: filter, + encoder: encoder, }, nil } @@ -80,20 +73,20 @@ func (w *Writer) Close() error { return nil } -// Encode will encode a row to the format the user specifies +// Encode will encode a row/block to the format the user specifies func (w *Writer) Encode(input interface{}) ([]byte, error) { // TODO: make this work for block.Block and block.Row - // If it's a row, take the value map - if r, ok := input.(block.Row); ok { - if w.applyFilter(&r) { - input = r.Values - } else { - return nil, nil + encoderType := "block" + if _, ok := input.(block.Row); ok { + encoderType = "row" + } else { + if len(input.([]block.Block)) == 0 { + return []byte{}, nil } } - encodedData, err := w.encode(input) + encodedData, err := w.encoder[encoderType](input) if err != nil { return nil, errors.Internal(fmt.Sprintf("encoder: could not marshal to %s", w.name), err) } @@ -113,3 +106,24 @@ func (w *Writer) applyFilter(row *block.Row) bool { } return true } + +// Filter filters out a row/blocks +func (w *Writer) Filter(input interface{}) (interface{}, error) { + // If it's a row, take the value map + if r, ok := input.(block.Row); ok { + if w.applyFilter(&r) { + return input, nil + } + } + // If it's a slice of rows, applyFilter for each row + if rows, ok := input.([]block.Row); ok { + filtered := make([]block.Row, 0) + for _, row := range rows { + if w.applyFilter(&row) { + filtered = append(filtered, row) + } + } + return filtered, nil + } + return nil, nil +} diff --git a/internal/storage/writer/base/base_test.go b/internal/storage/writer/base/base_test.go index 8fe1bd19..ba8a5dcc 100644 --- a/internal/storage/writer/base/base_test.go +++ b/internal/storage/writer/base/base_test.go @@ -6,13 +6,15 @@ import ( "time" "github.com/grab/async" - "github.com/kelindar/talaria/internal/column/computed" "github.com/kelindar/talaria/internal/encoding/block" - "github.com/kelindar/talaria/internal/encoding/typeof" + "github.com/kelindar/talaria/internal/monitor" + "github.com/kelindar/talaria/internal/monitor/logging" + "github.com/kelindar/talaria/internal/monitor/statsd" "github.com/stretchr/testify/assert" ) func 
TestFilter(t *testing.T) { + m := monitor.New(logging.NewNoop(), statsd.NewNoop(), "x", "y") row := block.Row{ Values: map[string]interface{}{ "test": "Hello Talaria", @@ -20,32 +22,30 @@ func TestFilter(t *testing.T) { }, } - filter := `function main(row) + filter := `function main(row) return row['age'] > 10 end` - computedFilter, err := computed.NewComputed("", "", typeof.Bool, filter, nil) - assert.NoError(t, err) - enc1, _ := New("json", computedFilter.Value) + enc1, _ := New(filter, "json", m) data, err := enc1.Encode(row) - assert.Equal(t, `{"age":30,"test":"Hello Talaria"}`, string(data)) + assert.Equal(t, `{"Values":{"age":30,"test":"Hello Talaria"},"Schema":null}`, string(data)) assert.NoError(t, err) filter2 := `function main(row) return row['age'] < 10 end` - computedFilter, err = computed.NewComputed("", "", typeof.Bool, filter2, nil) - assert.NoError(t, err) - enc2, _ := New("json", computedFilter.Value) - data2, err := enc2.Encode(row) - assert.NoError(t, err) + enc2, _ := New(filter2, "json", m) + + filtered, err := enc2.Filter(row) + assert.Nil(t, err) - assert.Nil(t, data2) + assert.Nil(t, filtered) } func TestRun(t *testing.T) { ctx := context.Background() - w, _ := New("json", nil) + m := monitor.New(logging.NewNoop(), statsd.NewNoop(), "x", "y") + w, _ := New("", "json", m) w.Process = func(context.Context) error { return nil } @@ -56,7 +56,8 @@ func TestRun(t *testing.T) { func TestCancel(t *testing.T) { ctx := context.Background() - w, _ := New("json", nil) + m := monitor.New(logging.NewNoop(), statsd.NewNoop(), "x", "y") + w, _ := New("", "json", m) w.Process = func(context.Context) error { time.Sleep(1 * time.Second) return nil @@ -68,7 +69,8 @@ func TestCancel(t *testing.T) { } func TestEmptyFilter(t *testing.T) { - enc1, err := New("json", nil) + m := monitor.New(logging.NewNoop(), statsd.NewNoop(), "x", "y") + enc1, err := New("", "json", m) assert.NotNil(t, enc1) assert.NoError(t, err) } diff --git a/internal/storage/writer/bigquery/bigquery.go b/internal/storage/writer/bigquery/bigquery.go index 58752fbe..147f95f8 100644 --- a/internal/storage/writer/bigquery/bigquery.go +++ b/internal/storage/writer/bigquery/bigquery.go @@ -1,64 +1,152 @@ package bigquery import ( - "bytes" "context" + "runtime" "cloud.google.com/go/bigquery" + "github.com/grab/async" + "github.com/kelindar/talaria/internal/encoding/block" "github.com/kelindar/talaria/internal/encoding/key" + "github.com/kelindar/talaria/internal/monitor" "github.com/kelindar/talaria/internal/monitor/errors" + "github.com/kelindar/talaria/internal/storage/writer/base" ) +// bqRow implements ValueSaver interface Save method. +type bqRow struct { + values map[string]interface{} +} + // Writer represents a writer for Google Cloud Storage. type Writer struct { - dataset string - table *bigquery.Table - client *bigquery.Client - context context.Context + *base.Writer + dataset string + table *bigquery.Table + client *bigquery.Client + monitor monitor.Monitor + context context.Context + inserter *bigquery.Inserter + buffer chan []bigquery.ValueSaver + queue chan async.Task } // New creates a new writer. 
-func New(project, dataset, table string) (*Writer, error) { +func New(project, dataset, table, encoding, filter string, monitor monitor.Monitor) (*Writer, error) { ctx := context.Background() client, err := bigquery.NewClient(ctx, project) if err != nil { return nil, errors.Newf("bigquery: %v", err) } + encoderwriter, err := base.New(filter, encoding, monitor) + if err != nil { + return nil, errors.Newf("bigquery: %v", err) + } tableRef := client.Dataset(dataset).Table(table) - return &Writer{ - dataset: dataset, - table: tableRef, - client: client, - context: ctx, - }, nil + inserter := tableRef.Inserter() + inserter.SkipInvalidRows = true + inserter.IgnoreUnknownValues = true + w := &Writer{ + Writer: encoderwriter, + dataset: dataset, + table: tableRef, + inserter: inserter, + client: client, + monitor: monitor, + context: ctx, + buffer: make(chan []bigquery.ValueSaver, 65000), + queue: make(chan async.Task), + } + w.Process = w.process + return w, nil } // Write writes the data to the sink. -func (w *Writer) Write(key key.Key, val []byte) error { - source := bigquery.NewReaderSource(bytes.NewReader(val)) - source.FileConfig = bigquery.FileConfig{ - SourceFormat: bigquery.DataFormat("ORC"), - AutoDetect: false, - IgnoreUnknownValues: true, - MaxBadRecords: 0, +func (w *Writer) Write(key key.Key, blocks []block.Block) error { + buffer, err := w.Writer.Encode(blocks) + if err != nil { + return err } - - // Run the loader - loader := w.table.LoaderFrom(source) - ctx := context.Background() - job, err := loader.Run(ctx) + blk, err := block.FromBuffer(buffer) + if err != nil { + return err + } + rows, err := block.FromBlockBy(blk, blk.Schema()) if err != nil { - return errors.Internal("bigquery: unable to run a job", err) + return err + } + filtered, err := w.Writer.Filter(rows) + if err != nil { + return err + } + rows, _ = filtered.([]block.Row) + bqrows := make([]bigquery.ValueSaver, 0) + for _, row := range rows { + bqrow := &bqRow{ + values: row.Values, + } + bqrows = append(bqrows, bqrow) + } + if err := w.inserter.Put(w.context, bqrows); err != nil { + return err } + return nil +} + +// Save impl for bigquery.ValueSaver interface +func (b *bqRow) Save() (map[string]bigquery.Value, string, error) { + bqRow := make(map[string]bigquery.Value, len(b.values)) + for k, v := range b.values { + bqRow[k] = v + } + return bqRow, "", nil +} + +// Stream publishes the rows in real-time. 
+func (w *Writer) Stream(row block.Row) error { - // Wait for the job to complete - status, err := job.Wait(ctx) + filtered, err := w.Writer.Filter(row) + // If message is filtered out, return nil + if filtered == nil { + return nil + } if err != nil { - return errors.Internal("bigquery: unable to write", err) + return err } - if err := status.Err(); err != nil { - return errors.Internal("bigquery: unable to write", err) + row, _ = filtered.(block.Row) + + bqrow := &bqRow{ + values: row.Values, + } + rows := []bigquery.ValueSaver{bqrow} + + select { + case w.buffer <- rows: + default: + return errors.New("bigquery: buffer is full") + } + + return nil +} + +// process will read from buffer and streams to bq +func (w *Writer) process(parent context.Context) error { + async.Consume(parent, runtime.NumCPU()*8, w.queue) + for batch := range w.buffer { + select { + case <-parent.Done(): + return parent.Err() + default: + } + w.queue <- async.NewTask(func(ctx context.Context) (interface{}, error) { + err := w.inserter.Put(ctx, batch) + if err != nil { + w.buffer <- batch + return nil, err + } + return nil, nil + }) } return nil } diff --git a/internal/storage/writer/bigquery/bigquery_test.go b/internal/storage/writer/bigquery/bigquery_test.go index cbdea33d..d0f2352b 100644 --- a/internal/storage/writer/bigquery/bigquery_test.go +++ b/internal/storage/writer/bigquery/bigquery_test.go @@ -3,17 +3,20 @@ package bigquery import ( "testing" + "github.com/kelindar/talaria/internal/monitor" + "github.com/kelindar/talaria/internal/monitor/logging" + "github.com/kelindar/talaria/internal/monitor/statsd" "github.com/stretchr/testify/assert" ) func TestWriter(t *testing.T) { - - c, err := New("my-project-id", "mydataset", "mytable") + m := monitor.New(logging.NewNoop(), statsd.NewNoop(), "x", "y") + c, err := New("my-project-id", "mydataset", "mytable", "", "", m) assert.Nil(t, c) assert.Error(t, err) assert.Panics(t, func() { - c.Write([]byte("abc"), []byte("hello")) + c.Write([]byte("abc"), nil) }) } diff --git a/internal/storage/writer/file/file.go b/internal/storage/writer/file/file.go index 1e6a423b..33c1356b 100644 --- a/internal/storage/writer/file/file.go +++ b/internal/storage/writer/file/file.go @@ -6,29 +6,42 @@ import ( "path" "path/filepath" + "github.com/kelindar/talaria/internal/encoding/block" "github.com/kelindar/talaria/internal/encoding/key" + "github.com/kelindar/talaria/internal/monitor" "github.com/kelindar/talaria/internal/monitor/errors" + "github.com/kelindar/talaria/internal/storage/writer/base" ) // Writer represents a local file writer. type Writer struct { + *base.Writer directory string } // New creates a new writer. -func New(directory string) (*Writer, error) { +func New(directory, filter, encoding string, monitor monitor.Monitor) (*Writer, error) { dir, err := filepath.Abs(directory) if err != nil { return nil, errors.Internal("file: unable to create file writer", err) } + baseWriter, err := base.New(filter, encoding, monitor) + if err != nil { + return nil, errors.Newf("file: %v", err) + } + return &Writer{ + Writer: baseWriter, directory: dir, }, nil } // Write writes the data to the sink. 
-func (w *Writer) Write(key key.Key, val []byte) error { +func (w *Writer) Write(key key.Key, blocks []block.Block) error { + if len(blocks) == 0 { + return nil + } filename := path.Join(w.directory, string(key)) dir := path.Dir(filename) if _, err := os.Stat(dir); err != nil { @@ -42,7 +55,11 @@ func (w *Writer) Write(key key.Key, val []byte) error { } } - if err := ioutil.WriteFile(filename, val, 0644); err != nil { + buffer, err := w.Writer.Encode(blocks) + if err != nil { + return errors.Newf("file: %v", err) + } + if err := ioutil.WriteFile(filename, buffer, 0644); err != nil { return errors.Internal("file: unable to write", err) } return nil diff --git a/internal/storage/writer/file/file_test.go b/internal/storage/writer/file/file_test.go index 14e4dbab..9c54eb35 100644 --- a/internal/storage/writer/file/file_test.go +++ b/internal/storage/writer/file/file_test.go @@ -4,11 +4,16 @@ import ( "os" "testing" + "github.com/kelindar/talaria/internal/encoding/key" + "github.com/kelindar/talaria/internal/monitor" + "github.com/kelindar/talaria/internal/monitor/logging" + "github.com/kelindar/talaria/internal/monitor/statsd" "github.com/stretchr/testify/assert" ) func TestWriter(t *testing.T) { - c, err := New("testdata") + m := monitor.New(logging.NewNoop(), statsd.NewNoop(), "x", "y") + c, err := New("testdata", "", "", m) defer func() { _ = os.RemoveAll("testdata") }() // TODO: Impove test @@ -16,6 +21,6 @@ func TestWriter(t *testing.T) { assert.NoError(t, err) assert.NotPanics(t, func() { - c.Write([]byte("abc"), []byte("hello")) + c.Write(key.Key("test"), nil) }) } diff --git a/internal/storage/writer/gcs/gcs.go b/internal/storage/writer/gcs/gcs.go index 72b62a60..fcd12fb1 100644 --- a/internal/storage/writer/gcs/gcs.go +++ b/internal/storage/writer/gcs/gcs.go @@ -6,28 +6,37 @@ import ( "strings" "cloud.google.com/go/storage" + "github.com/kelindar/talaria/internal/encoding/block" "github.com/kelindar/talaria/internal/encoding/key" + "github.com/kelindar/talaria/internal/monitor" "github.com/kelindar/talaria/internal/monitor/errors" + "github.com/kelindar/talaria/internal/storage/writer/base" ) // Writer represents a writer for Google Cloud Storage. type Writer struct { + *base.Writer prefix string client *storage.BucketHandle context context.Context } // New creates a new writer. -func New(bucket, prefix string) (*Writer, error) { +func New(bucket, prefix, filter, encoding string, monitor monitor.Monitor) (*Writer, error) { ctx := context.Background() client, err := storage.NewClient(ctx) if err != nil { return nil, errors.Internal("gcs: unable to create a client", err) } + baseWriter, err := base.New(filter, encoding, monitor) + if err != nil { + return nil, errors.Internal("gcs: unable to create a base Writer", err) + } handle := client.Bucket(bucket) prefix = cleanPrefix(prefix) return &Writer{ + Writer: baseWriter, prefix: prefix, client: handle, context: ctx, @@ -35,12 +44,16 @@ func New(bucket, prefix string) (*Writer, error) { } // Write writes the data to the sink. 
-func (w *Writer) Write(key key.Key, val []byte) error { +func (w *Writer) Write(key key.Key, blocks []block.Block) error { obj := w.client.Object(path.Join(w.prefix, string(key))) + buffer, err := w.Writer.Encode(blocks) + if err != nil { + return errors.Internal("gcs: unable to encode blocks", err) + } /// Write the payload writer := obj.NewWriter(w.context) - _, err := writer.Write(val) + _, err = writer.Write(buffer) if err != nil { return errors.Internal("gcs: unable to write", err) } diff --git a/internal/storage/writer/gcs/gcs_test.go b/internal/storage/writer/gcs/gcs_test.go index 1e63f27b..560fd11e 100644 --- a/internal/storage/writer/gcs/gcs_test.go +++ b/internal/storage/writer/gcs/gcs_test.go @@ -3,17 +3,22 @@ package gcs import ( "testing" + "github.com/kelindar/talaria/internal/encoding/key" + "github.com/kelindar/talaria/internal/monitor" + "github.com/kelindar/talaria/internal/monitor/logging" + "github.com/kelindar/talaria/internal/monitor/statsd" "github.com/stretchr/testify/assert" ) func TestWriter(t *testing.T) { - c, err := New("test", "test") + m := monitor.New(logging.NewNoop(), statsd.NewNoop(), "x", "y") + c, err := New("test", "test", "", "", m) // TODO: Impove test assert.Nil(t, c) assert.Error(t, err) assert.Panics(t, func() { - c.Write([]byte("abc"), []byte("hello")) + c.Write(key.Key("test"), nil) }) } diff --git a/internal/storage/writer/multi/multi.go b/internal/storage/writer/multi/multi.go index ece8eb86..60428e2a 100644 --- a/internal/storage/writer/multi/multi.go +++ b/internal/storage/writer/multi/multi.go @@ -11,7 +11,7 @@ import ( // SubWriter represents the sub-writer type SubWriter interface { - Write(key.Key, []byte) error + Write(key.Key, []block.Block) error } // streamer represents the sub-streamer @@ -42,12 +42,12 @@ func New(writers ...SubWriter) *Writer { } // Write writes the data to the sink. 
-func (w *Writer) Write(key key.Key, val []byte) error { +func (w *Writer) Write(key key.Key, blocks []block.Block) error { eg := new(errgroup.Group) for _, w := range w.writers { w := w eg.Go(func() error { - return w.Write(key, val) + return w.Write(key, blocks) }) } // Wait blocks until all finished, and returns the first non-nil error (if any) from them diff --git a/internal/storage/writer/multi/multi_test.go b/internal/storage/writer/multi/multi_test.go index cf0b884a..d1342d30 100644 --- a/internal/storage/writer/multi/multi_test.go +++ b/internal/storage/writer/multi/multi_test.go @@ -11,17 +11,17 @@ import ( "github.com/stretchr/testify/assert" ) -type MockWriter func(key key.Key, val []byte) error +type MockWriter func(key key.Key, blk []block.Block) error -func (w MockWriter) Write(key key.Key, val []byte) error { - return w(key, val) +func (w MockWriter) Write(key key.Key, blk []block.Block) error { + return w(key, blk) } type MockWriterFull struct { Count int } -func (w *MockWriterFull) Write(key key.Key, val []byte) error { +func (w *MockWriterFull) Write(key key.Key, blk []block.Block) error { w.Count++ return nil } @@ -37,7 +37,7 @@ func (w *MockWriterFull) Run(ctx context.Context) (async.Task, error) { func TestMulti(t *testing.T) { var count int64 - sub := MockWriter(func(key key.Key, val []byte) error { + sub := MockWriter(func(key key.Key, val []block.Block) error { atomic.AddInt64(&count, 1) return nil }) diff --git a/internal/storage/writer/noop/noop.go b/internal/storage/writer/noop/noop.go index 7a0737b3..a8edfcf2 100644 --- a/internal/storage/writer/noop/noop.go +++ b/internal/storage/writer/noop/noop.go @@ -14,7 +14,7 @@ func New() *Writer { } // Write writes the data to the sink. -func (w *Writer) Write(key key.Key, val []byte) error { +func (w *Writer) Write(key key.Key, blocks []block.Block) error { return nil } diff --git a/internal/storage/writer/pubsub/pubsub.go b/internal/storage/writer/pubsub/pubsub.go index fcc1fa2f..e080b6e6 100644 --- a/internal/storage/writer/pubsub/pubsub.go +++ b/internal/storage/writer/pubsub/pubsub.go @@ -2,16 +2,15 @@ package pubsub import ( "context" + "fmt" "log" "runtime" "time" "cloud.google.com/go/pubsub" "github.com/grab/async" - "github.com/kelindar/talaria/internal/column/computed" "github.com/kelindar/talaria/internal/encoding/block" "github.com/kelindar/talaria/internal/encoding/key" - "github.com/kelindar/talaria/internal/encoding/typeof" "github.com/kelindar/talaria/internal/monitor" "github.com/kelindar/talaria/internal/monitor/errors" "github.com/kelindar/talaria/internal/storage/writer/base" @@ -38,16 +37,8 @@ func New(project, topic, encoding, filter string, monitor monitor.Monitor, opts return nil, errors.Newf("pubsub: %v", err) } - var filterF base.FilterFunc = nil - if filter != "" { - computed, err := computed.NewComputed("", "", typeof.Bool, filter, monitor) - if err != nil { - return nil, err - } - filterF = computed.Value - } // Load encoder - encoderWriter, err := base.New(encoding, filterF) + encoderWriter, err := base.New(filter, encoding, monitor) if err != nil { return nil, err } @@ -81,20 +72,66 @@ func New(project, topic, encoding, filter string, monitor monitor.Monitor, opts } // Write writes the data to the sink. 
-func (w *Writer) Write(key.Key, []byte) error { - return nil // Noop +func (w *Writer) Write(key key.Key, blocks []block.Block) error { + if len(blocks) == 0 { + return nil + } + buffer, err := w.Writer.Encode(blocks) + if err != nil { + return err + } + blk, err := block.FromBuffer(buffer) + if err != nil { + return err + } + rows, err := block.FromBlockBy(blk, blk.Schema()) + if err != nil { + return err + } + filtered, err := w.Writer.Filter(rows) + if err != nil { + return err + } + rows, _ = filtered.([]block.Row) + var results []*pubsub.PublishResult + var resultErrors []error + ctx := context.Background() + for _, row := range rows { + message, err := w.Writer.Encode(row) + if err != nil { + return err + } + result := w.topic.Publish(ctx, &pubsub.Message{ + Data: message, + }) + results = append(results, result) + } + // The Get method blocks until a server-generated ID or + // an error is returned for the published message. + for _, res := range results { + _, err := res.Get(ctx) + if err != nil { + resultErrors = append(resultErrors, err) + fmt.Printf("Failed to publish: %v\n", err) + continue + } + } + return nil } // Stream encodes data and pushes it into buffer func (w *Writer) Stream(row block.Row) error { - message, err := w.Writer.Encode(row) + filtered, err := w.Writer.Filter(row) if err != nil { return err } - - // If message is filtered out, return nil - if message == nil { - return nil + // If the row is filtered out, return nil + if filtered == nil { + return nil + } + message, err := w.Writer.Encode(filtered) + if err != nil { + return err } select { @@ -110,7 +147,8 @@ func (w *Writer) process(parent context.Context) error { async.Consume(parent, runtime.NumCPU()*8, w.queue) for message := range w.buffer { select { - // Returns error if the parent context gets cancelled. Done() returns an empty struct + // If the parent context has been cancelled, Done() is closed and + // Err() returns the cancellation reason, so propagate it. case <-parent.Done(): return parent.Err() default: diff --git a/internal/storage/writer/s3/s3.go b/internal/storage/writer/s3/s3.go index 623c8e57..65ef50e4 100644 --- a/internal/storage/writer/s3/s3.go +++ b/internal/storage/writer/s3/s3.go @@ -15,9 +15,11 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/kelindar/talaria/internal/encoding/block" "github.com/kelindar/talaria/internal/encoding/key" "github.com/kelindar/talaria/internal/monitor" "github.com/kelindar/talaria/internal/monitor/errors" + "github.com/kelindar/talaria/internal/storage/writer/base" ) const ctxTag = "s3" @@ -30,6 +32,7 @@ type Uploader interface { // Writer represents a writer for Amazon S3 and compatible storages. type Writer struct { + *base.Writer monitor monitor.Monitor uploader Uploader bucket string @@ -38,11 +41,16 @@ type Writer struct { } // New initializes a new S3 writer.
-func New(monitor monitor.Monitor, bucket, prefix, region, endpoint, sse, access, secret string, concurrency int) (*Writer, error) { +func New(monitor monitor.Monitor, bucket, prefix, region, endpoint, sse, access, secret, filter, encoding string, concurrency int) (*Writer, error) { if concurrency == 0 { concurrency = runtime.NumCPU() } + baseWriter, err := base.New(filter, encoding, monitor) + if err != nil { + return nil, errors.Newf("s3: %v", err) + } + config := &aws.Config{ Region: aws.String(region), Endpoint: aws.String(endpoint), @@ -57,6 +65,7 @@ func New(monitor monitor.Monitor, bucket, prefix, region, endpoint, sse, access, client := s3.New(session.New(), config) return &Writer{ + Writer: baseWriter, monitor: monitor, uploader: s3manager.NewUploaderWithClient(client, func(u *s3manager.Uploader) { u.Concurrency = concurrency @@ -68,11 +77,16 @@ func New(monitor monitor.Monitor, bucket, prefix, region, endpoint, sse, access, } // Write creates an object in the configured S3 bucket, under the writer prefix and the given key -func (w *Writer) Write(key key.Key, val []byte) error { +func (w *Writer) Write(key key.Key, blocks []block.Block) error { + + buffer, err := w.Writer.Encode(blocks) + if err != nil { + return err + } start := time.Now() uploadInput := &s3manager.UploadInput{ Bucket: aws.String(w.bucket), - Body: bytes.NewReader(val), + Body: bytes.NewReader(buffer), Key: aws.String(path.Join(w.prefix, string(key))), } diff --git a/internal/storage/writer/s3/s3_test.go b/internal/storage/writer/s3/s3_test.go index 8a6e81a8..65c04662 100644 --- a/internal/storage/writer/s3/s3_test.go +++ b/internal/storage/writer/s3/s3_test.go @@ -6,6 +6,7 @@ package s3 import ( "testing" + "github.com/kelindar/talaria/internal/encoding/block" "github.com/kelindar/talaria/internal/encoding/key" "github.com/kelindar/talaria/internal/monitor" "github.com/kelindar/talaria/internal/monitor/logging" @@ -15,24 +16,25 @@ import ( ) func TestS3Writer(t *testing.T) { - _, err := New(nil, "testBucket", "", "us-east-1", "", "", "", "", 128) + m := monitor.New(logging.NewNoop(), statsd.NewNoop(), "x", "y") + _, err := New(m, "testBucket", "", "us-east-1", "", "", "", "", "", "", 128) assert.Nil(t, err) } func TestS3Writer_Write(t *testing.T) { + m := monitor.New(logging.NewNoop(), statsd.NewNoop(), "x", "y") mockUploader := &MockS3Uploader{} mockUploader.On("Upload", mock.Anything, mock.Anything).Return(nil, nil).Once() - s3Writer := &Writer{ - monitor: monitor.New(logging.NewStandard(), statsd.NewNoop(), "x", "x"), - uploader: mockUploader, - bucket: "testBucket", - } + s3Writer, err := New(m, "testBucket", "", "us-east-1", "", "", "", "", "", "", 128) + s3Writer.monitor = monitor.New(logging.NewStandard(), statsd.NewNoop(), "x", "x") + s3Writer.uploader = mockUploader + s3Writer.bucket = "testBucket" - val := []byte("Test Upload Data") - - err := s3Writer.Write(key.Key("testKey"), val) + block, err := block.Base() + assert.Nil(t, err) + err = s3Writer.Write(key.Key("testKey"), block) assert.Equal(t, err, nil) } diff --git a/internal/storage/writer/talaria/talaria.go b/internal/storage/writer/talaria/talaria.go index 2a30344f..8a87540c 100644 --- a/internal/storage/writer/talaria/talaria.go +++ b/internal/storage/writer/talaria/talaria.go @@ -6,8 +6,11 @@ import ( "time" talaria "github.com/kelindar/talaria/client/golang" + "github.com/kelindar/talaria/internal/encoding/block" "github.com/kelindar/talaria/internal/encoding/key" + "github.com/kelindar/talaria/internal/monitor"
"github.com/kelindar/talaria/internal/monitor/errors" + "github.com/kelindar/talaria/internal/storage/writer/base" "github.com/myteksi/hystrix-go/hystrix" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -25,6 +28,7 @@ func getClient(endpoint string, options ...talaria.Option) (*talaria.Client, err // Writer to write to TalariaDB type Writer struct { + *base.Writer lock sync.Mutex endpoint string client *talaria.Client @@ -32,12 +36,16 @@ type Writer struct { } // New initializes a new Talaria writer. -func New(endpoint string, circuitTimeout *time.Duration, maxConcurrent *int, errorPercentThreshold *int) (*Writer, error) { +func New(endpoint, filter, encoding string, monitor monitor.Monitor, circuitTimeout *time.Duration, maxConcurrent *int, errorPercentThreshold *int) (*Writer, error) { var newTimeout = 5 * time.Second var newMaxConcurrent = hystrix.DefaultMaxConcurrent var newErrorPercentThreshold = hystrix.DefaultErrorPercentThreshold + baseWriter, err := base.New(filter, encoding, monitor) + if err != nil { + return nil, errors.Internal("talaria: ", err) + } // Set defaults for variables if there aren't any if circuitTimeout != nil { newTimeout = *circuitTimeout * time.Second @@ -62,6 +70,7 @@ func New(endpoint string, circuitTimeout *time.Duration, maxConcurrent *int, err } return &Writer{ + Writer: baseWriter, client: client, endpoint: endpoint, options: dialOptions, @@ -69,7 +78,7 @@ func New(endpoint string, circuitTimeout *time.Duration, maxConcurrent *int, err } // Write will write the ORC data to Talaria -func (w *Writer) Write(key key.Key, val []byte) error { +func (w *Writer) Write(key key.Key, blocks []block.Block) error { // Check if client is nil if w.client == nil { @@ -78,8 +87,13 @@ func (w *Writer) Write(key key.Key, val []byte) error { } } + buffer, err := w.Writer.Encode(blocks) + if err != nil { + return errors.Internal("talaria: encoding err", err) + } + // Check error status if it needs to redial - if err := w.client.IngestORC(context.Background(), val); err != nil { + if err = w.client.IngestORC(context.Background(), buffer); err != nil { errStatus, _ := status.FromError(err) if codes.Unavailable == errStatus.Code() { // Server unavailable, redial @@ -87,7 +101,7 @@ func (w *Writer) Write(key key.Key, val []byte) error { return errors.Internal("talaria: unable to redial", err) } // Send again after redial - if err := w.client.IngestORC(context.Background(), val); err != nil { + if err := w.client.IngestORC(context.Background(), buffer); err != nil { return errors.Internal("talaria: unable to write after redial", err) } } diff --git a/internal/storage/writer/talaria/talaria_test.go b/internal/storage/writer/talaria/talaria_test.go index 8cb46181..53707655 100644 --- a/internal/storage/writer/talaria/talaria_test.go +++ b/internal/storage/writer/talaria/talaria_test.go @@ -4,6 +4,10 @@ import ( "testing" "time" + "github.com/kelindar/talaria/internal/encoding/block" + "github.com/kelindar/talaria/internal/monitor" + "github.com/kelindar/talaria/internal/monitor/logging" + "github.com/kelindar/talaria/internal/monitor/statsd" "github.com/stretchr/testify/assert" ) @@ -11,14 +15,15 @@ func TestTalariaWriter(t *testing.T) { var timeout time.Duration = 5 var concurrency int = 10 var errorPercentage int = 50 - c, err := New("www.talaria.net:8043", &timeout, &concurrency, &errorPercentage) + m := monitor.New(logging.NewNoop(), statsd.NewNoop(), "x", "y") + c, err := New("www.talaria.net:8043", "", "orc", m, &timeout, &concurrency, &errorPercentage) // 
TODO: Improve test assert.Nil(t, c) assert.Error(t, err) assert.Panics(t, func() { - c.Write([]byte("abc"), []byte("hello")) + c.Write([]byte("hello"), []block.Block{}) }) } diff --git a/internal/storage/writer/writer.go b/internal/storage/writer/writer.go index 29eb2476..ff7a5a5f 100644 --- a/internal/storage/writer/writer.go +++ b/internal/storage/writer/writer.go @@ -71,7 +71,7 @@ func ForCompaction(config *config.Compaction, monitor monitor.Monitor, store sto monitor.Info("server: setting up compaction %T to run every %.0fs...", writer, interval.Seconds()) // TODO: once we have everything working, consider making the flusher per writer (requires changing all writers) - flusher, err := flush.ForCompaction(monitor, writer, config.Encoder, nameFunc) + flusher, err := flush.ForCompaction(monitor, writer, nameFunc) if err != nil { return nil, err } @@ -80,101 +80,104 @@ func ForCompaction(config *config.Compaction, monitor monitor.Monitor, store sto } // newWriter creates a new writer from the configuration. -func newWriter(config config.Sinks, monitor monitor.Monitor) (flush.Writer, error) { +func newWriter(sinks []config.Sink, monitor monitor.Monitor) (flush.Writer, error) { var writers []multi.SubWriter - // Configure S3 writer if present - if config.S3 != nil { - w, err := s3.New(monitor, config.S3.Bucket, config.S3.Prefix, config.S3.Region, config.S3.Endpoint, config.S3.SSE, config.S3.AccessKey, config.S3.SecretKey, config.S3.Concurrency) - if err != nil { - return nil, err + for _, config := range sinks { + // Configure S3 writer if present + if config.S3 != nil { + w, err := s3.New(monitor, config.S3.Bucket, config.S3.Prefix, config.S3.Region, config.S3.Endpoint, config.S3.SSE, config.S3.AccessKey, config.S3.SecretKey, config.S3.Filter, config.S3.Encoder, config.S3.Concurrency) + if err != nil { + return nil, err + } + writers = append(writers, w) } - writers = append(writers, w) - } - // Configure Azure MultiAccount writer if present - if config.Azure != nil && len(config.Azure.StorageAccounts) > 0 { - w, err := azure.NewMultiAccountWriter(monitor, config.Azure.BlobServiceURL, config.Azure.Container, config.Azure.Prefix, config.Azure.StorageAccounts, config.Azure.StorageAccountWeights, config.Azure.Parallelism, config.Azure.BlockSize) - if err != nil { - return nil, err + // Configure Azure MultiAccount writer if present + if config.Azure != nil && len(config.Azure.StorageAccounts) > 0 { + w, err := azure.NewMultiAccountWriter(monitor, config.Azure.BlobServiceURL, config.Azure.Container, config.Azure.Prefix, config.Azure.Filter, config.Azure.Encoder, config.Azure.StorageAccounts, config.Azure.StorageAccountWeights, config.Azure.Parallelism, config.Azure.BlockSize) + if err != nil { + return nil, err + } + writers = append(writers, w) } - writers = append(writers, w) - } - // Configure Azure SingleAccount writer if present - if config.Azure != nil && len(config.Azure.StorageAccounts) == 0 { - w, err := azure.New(config.Azure.Container, config.Azure.Prefix) - if err != nil { - return nil, err + // Configure Azure SingleAccount writer if present + if config.Azure != nil && len(config.Azure.StorageAccounts) == 0 { + w, err := azure.New(config.Azure.Container, config.Azure.Prefix, config.Azure.Filter, config.Azure.Encoder, monitor) + if err != nil { + return nil, err + } + writers = append(writers, w) } - writers = append(writers, w) - } - // Configure GCS writer if present - if config.GCS != nil { - w, err := gcs.New(config.GCS.Bucket, config.GCS.Prefix) - if err != nil { - return nil, 
err + // Configure GCS writer if present + if config.GCS != nil { + w, err := gcs.New(config.GCS.Bucket, config.GCS.Prefix, config.GCS.Filter, config.GCS.Encoder, monitor) + if err != nil { + return nil, err + } + writers = append(writers, w) } - writers = append(writers, w) - } - // Configure BigQuery writer if present - if config.BigQuery != nil { - w, err := bigquery.New(config.BigQuery.Project, config.BigQuery.Dataset, config.BigQuery.Table) - if err != nil { - return nil, err + // Configure BigQuery writer if present + if config.BigQuery != nil { + w, err := bigquery.New(config.BigQuery.Project, config.BigQuery.Dataset, config.BigQuery.Table, config.BigQuery.Encoder, config.BigQuery.Filter, monitor) + if err != nil { + return nil, err + } + writers = append(writers, w) } - writers = append(writers, w) - } - // Configure File writer if present - if config.File != nil { - w, err := file.New(config.File.Directory) - if err != nil { - return nil, err + // Configure File writer if present + if config.File != nil { + w, err := file.New(config.File.Directory, config.File.Filter, config.File.Encoder, monitor) + if err != nil { + return nil, err + } + writers = append(writers, w) } - writers = append(writers, w) - } - // Configure Talaria writer if present - if config.Talaria != nil { - w, err := talaria.New(config.Talaria.Endpoint, config.Talaria.CircuitTimeout, config.Talaria.MaxConcurrent, config.Talaria.ErrorPercentThreshold) - if err != nil { - return nil, err + // Configure Talaria writer if present + if config.Talaria != nil { + w, err := talaria.New(config.Talaria.Endpoint, config.Talaria.Filter, config.Talaria.Encoder, monitor, config.Talaria.CircuitTimeout, config.Talaria.MaxConcurrent, config.Talaria.ErrorPercentThreshold) + if err != nil { + return nil, err + } + writers = append(writers, w) } - writers = append(writers, w) - } - // Configure Google Pub/Sub writer if present - if config.PubSub != nil { - w, err := pubsub.New(config.PubSub.Project, config.PubSub.Topic, config.PubSub.Encoder, config.PubSub.Filter, monitor) - if err != nil { - return nil, err + // Configure Google Pub/Sub writer if present + if config.PubSub != nil { + w, err := pubsub.New(config.PubSub.Project, config.PubSub.Topic, config.PubSub.Encoder, config.PubSub.Filter, monitor) + if err != nil { + return nil, err + } + writers = append(writers, w) } - // If no writers were configured, error out - if len(writers) == 0 { - return noop.New(), errors.New("compact: writer was not configured") - } + } + + // If no writers were configured across all sinks, error out + if len(writers) == 0 { + return noop.New(), errors.New("compact: writer was not configured") + } // Setup a multi-writer for all configured writers return multi.New(writers...), nil } // newStreamer creates a new streamer from the configuration. 
-func newStreamer(config config.Streams, monitor monitor.Monitor) (flush.Writer, error) { +func newStreamer(sinks config.Streams, monitor monitor.Monitor) (flush.Writer, error) { var writers []multi.SubWriter // If no streams were configured, error out - if len(config) == 0 { + if len(sinks) == 0 { return noop.New(), errors.New("stream: writer was not configured") } - for _, v := range config { - w, err := newWriter(v, monitor) + for _, v := range sinks { + conf := []config.Sink{v} + w, err := newWriter(conf, monitor) if err != nil { return noop.New(), err } diff --git a/internal/storage/writer/writer_test.go b/internal/storage/writer/writer_test.go index 35e2480b..c86da4d8 100644 --- a/internal/storage/writer/writer_test.go +++ b/internal/storage/writer/writer_test.go @@ -13,9 +13,11 @@ import ( func TestForCompaction(t *testing.T) { cfg := &config.Compaction{ - Sinks: config.Sinks{ - File: &config.FileSink{ - Directory: "./", + Sinks: []config.Sink{ + config.Sink{ + File: &config.FileSink{ + Directory: "./", + }, }, }, }
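Since this change turns the compaction sink configuration from a single inline `Sinks` struct into a `[]config.Sink` list, a short Go illustration of building the new shape may help when reading the diff. This is a minimal sketch only: it assumes it lives inside the repository (the `internal/config` package cannot be imported from outside the module), and the directory, bucket, region and interval values are made-up placeholders rather than values taken from the repository.

```go
package main

import (
	"fmt"

	"github.com/kelindar/talaria/internal/config"
)

func main() {
	// Each element of Sinks configures one destination; newWriter now loops
	// over this slice and builds a sub-writer for every populated entry.
	cfg := &config.Compaction{
		Sinks: []config.Sink{
			{File: &config.FileSink{Directory: "./output"}},                // placeholder directory
			{S3: &config.S3Sink{Bucket: "my-bucket", Region: "us-east-1"}}, // placeholder bucket/region
		},
		Interval: 300, // compaction interval in seconds (placeholder)
	}
	fmt.Printf("configured %d compaction sinks\n", len(cfg.Sinks))
}
```

In YAML terms this corresponds to a `sinks:` list under the compaction section, with each entry carrying its own sink-specific block and, per the new `BaseSink` fields, its own optional `encoder` and `filter`.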