Refactor Sink Writer to accept blocks in place of bytes (#86)
* change Write to use Stream API and impl Stream for bq Sink

* basic refactoring to support list of sinks and single encoder

* bump go version in dockerfile

* added Filter for slice of rows

* added pubsub compaction implementation with Manoj

* Fix test cases

* Fix all test cases

* Remove print line

* Add yaml inline tag to BaseSink (#3)

* Add yaml inline tag to BaseSink

* Add inline tag to BaseSink

* Revert sample yaml configuration

* Refactor/encoder (#4)

* Add yaml inline tag to BaseSink

* Add inline tag to BaseSink

* Revert sample yaml configuration

* fix conflict

* Refactor/encoder (#5)

* Add yaml inline tag to BaseSink

* Add inline tag to BaseSink

* Revert sample yaml configuration

* fix conflict

* add contributors in readme

Co-authored-by: sosoonyuan <[email protected]>
Co-authored-by: JeffreyLean <[email protected]>
Co-authored-by: Jeffrey lean <[email protected]>
4 people authored Jun 7, 2022
1 parent c6fb73c commit d40b2a4
Showing 41 changed files with 868 additions and 320 deletions.
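The commit message above summarizes the refactor; the sketch below is a minimal, self-contained illustration of the idea — each sink now receives decoded blocks and applies its own encoder, instead of being handed pre-encoded bytes. All names here (Sink, Encoder, fileSink, Row, Block) are hypothetical stand-ins, not the repository's actual types.

```go
package main

import "fmt"

// Hypothetical stand-ins for talaria's row/block types, for illustration only.
type Row map[string]interface{}
type Block struct{ Rows []Row }

// Encoder turns a slice of blocks into a payload; each sink owns exactly one.
type Encoder func([]Block) ([]byte, error)

// Sink sketches the refactored writer contract: Write accepts blocks, not bytes.
type Sink interface {
	Write(blocks []Block) error
}

// fileSink shows a sink applying its own encoder before persisting the payload.
type fileSink struct {
	encode Encoder
}

var _ Sink = fileSink{} // compile-time check that fileSink satisfies the sketched contract

func (s fileSink) Write(blocks []Block) error {
	payload, err := s.encode(blocks)
	if err != nil {
		return err
	}
	fmt.Printf("would persist %d bytes\n", len(payload))
	return nil
}

func main() {
	sink := fileSink{encode: func(bs []Block) ([]byte, error) {
		return []byte(fmt.Sprintf("%d block(s)", len(bs))), nil
	}}
	_ = sink.Write([]Block{{Rows: []Row{{"event": "table1.update"}}}})
}
```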
3 changes: 2 additions & 1 deletion .gitignore
@@ -13,5 +13,6 @@ testdata-*
.envrc
logs/
*.venv
.vscode/
talaria
*.so
15 changes: 15 additions & 0 deletions .vscode/launch.json
@@ -0,0 +1,15 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Test current file",
"type": "go",
"request": "launch",
"mode": "test",
"program": "${file}"
}
]
}
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,4 +1,4 @@
FROM golang:1.16 AS builder
FROM golang:1.17 AS builder

ARG GO111MODULE="on"
ARG GOOS="linux"
14 changes: 8 additions & 6 deletions README.md
@@ -1,12 +1,12 @@
# Talaria

![Test](https://github.com/kelindar/talaria/workflows/Test/badge.svg)
![Release](https://github.com/kelindar/talaria/workflows/Release/badge.svg)
[![Go Report Card](https://goreportcard.com/badge/github.com/kelindar/talaria)](https://goreportcard.com/report/github.com/kelindar/talaria)
[![Docker Pulls](https://img.shields.io/docker/pulls/kelindar/talaria)](https://hub.docker.com/repository/docker/kelindar/talaria/general)


This repository contains a fork of TalariaDB, a distributed, highly available, and low-latency time-series database for Big Data systems. It was originally [designed and implemented in Grab](https://engineering.grab.com/big-data-real-time-presto-talariadb), where millions of transactions and connections take place every day, which requires a platform for scalable, data-driven decision making.

<p align="center">
<img width="746" height="348" src=".github/images/query-event.png">
@@ -96,7 +96,7 @@ An example of weighted choice is shown below:
- azure:
container: a_container
prefix: a_prefix
blobServiceURL: .storage.microsoft.net
storageAccounts:
- a_storage_account
- b_storage_account
@@ -107,11 +107,11 @@ Random choice and weighted choice are particularly useful for some key scenarios:
- High throughput deployments where the I/O generated by Talaria exceeds the limits of the storage accounts.
- When deploying on internal endpoints with multiple VPN links and you want to split the network traffic across multiple
network links.
## Hot Data Query with Talaria
If your organisation requires querying of either hot data (e.g. last n hours) or in-flight data (i.e. as ingested), you can also configure Talaria to serve it to Presto using the built-in [Presto Thrift](https://prestodb.io/docs/current/connector/thrift.html) connector.
![alt text](.github/images/query.png)
@@ -150,7 +150,7 @@ Once you have set up Talaria, you'll need to configure Presto to talk to it usin
Once this is done, you should be able to query your data via Presto.

```sql
select *
from talaria.data.eventlog
where event = 'table1.update'
limit 1000
@@ -176,6 +176,8 @@ We are open to contributions, feel free to submit a pull request and we'll revie
* [Atri Sharma](https://www.linkedin.com/in/atrisharma/)
* [Qiao Wei](https://www.linkedin.com/in/qiao-wei-3aa22480)
* [Oscar Cassetti](https://www.linkedin.com/in/ocassetti/)
* [Manoj Babu Katragadda](https://www.linkedin.com/in/manojbabuiit/)
* [Jeffrey Lean](https://www.linkedin.com/in/jeffrey-lean-4089a1198/)

## License

35 changes: 22 additions & 13 deletions internal/config/config.go
@@ -120,19 +120,23 @@ type Computed struct {
FuncName string `json:"funcname"`
}

type BaseSink struct {
Encoder string `json:"encoder" yaml:"encoder"` // The default encoder for the compaction
Filter string `json:"filter" yaml:"filter"` // The default filter for the compaction
}

// Compaction represents a configuration for compaction sinks
type Compaction struct {
Sinks `yaml:",inline"`
Encoder string `json:"encoder" yaml:"encoder"` // The default encoder for the compaction
Sinks []Sink `yaml:"sinks"`
NameFunc string `json:"nameFunc" yaml:"nameFunc" env:"NAMEFUNC"` // The lua script to compute file name given a row
Interval int `json:"interval" yaml:"interval" env:"INTERVAL"` // The compaction interval, in seconds
}

// Streams are lists of sinks to be streamed to
type Streams []Sinks
type Streams []Sink

// Sinks represents a configuration for writer sinks
type Sinks struct {
type Sink struct {
S3 *S3Sink `json:"s3" yaml:"s3"` // The S3 writer configuration
Azure *AzureSink `json:"azure" yaml:"azure"` // The Azure writer configuration
BigQuery *BigQuerySink `json:"bigquery" yaml:"bigquery" ` // The Big Query writer configuration
@@ -144,6 +148,7 @@ type Sinks struct {

// S3Sink represents a sink for AWS S3 and compatible stores.
type S3Sink struct {
BaseSink `yaml:",inline"`
Region string `json:"region" yaml:"region" env:"REGION"` // The region of AWS bucket
Bucket string `json:"bucket" yaml:"bucket" env:"BUCKET"` // The name of AWS bucket
Prefix string `json:"prefix" yaml:"prefix" env:"PREFIX"` // The prefix to add
@@ -156,6 +161,7 @@ type S3Sink struct {

// AzureSink represents a sink to Azure
type AzureSink struct {
BaseSink `yaml:",inline"`
Container string `json:"container" yaml:"container" env:"CONTAINER"` // The container name
Prefix string `json:"prefix" yaml:"prefix" env:"PREFIX"` // The prefix to add
Parallelism uint16 `json:"parallelism" yaml:"parallelism" env:"PARALLELISM"` // The BlockBlob upload parallelism
@@ -167,32 +173,35 @@ type AzureSink struct {

// BigQuerySink represents a sink to Google Big Query
type BigQuerySink struct {
Project string `json:"project" yaml:"project" env:"PROJECT"` // The project ID
Dataset string `json:"dataset" yaml:"dataset" env:"DATASET"` // The dataset ID
Table string `json:"table" yaml:"table" env:"TABLE"` // The table ID
BaseSink `yaml:",inline"`
Project string `json:"project" yaml:"project" env:"PROJECT"` // The project ID
Dataset string `json:"dataset" yaml:"dataset" env:"DATASET"` // The dataset ID
Table string `json:"table" yaml:"table" env:"TABLE"` // The table ID
}

// GCSSink represents a sink to Google Cloud Storage
type GCSSink struct {
Bucket string `json:"bucket" yaml:"bucket" env:"BUCKET"` // The name of the bucket
Prefix string `json:"prefix" yaml:"prefix" env:"PREFIX"` // The prefix to add
BaseSink `yaml:",inline"`
Bucket string `json:"bucket" yaml:"bucket" env:"BUCKET"` // The name of the bucket
Prefix string `json:"prefix" yaml:"prefix" env:"PREFIX"` // The prefix to add
}

// FileSink represents a sink to the local file system
type FileSink struct {
BaseSink `yaml:",inline"`
Directory string `json:"dir" yaml:"dir" env:"DIR"`
}

// PubSubSink represents a stream to Google Pub/Sub
type PubSubSink struct {
Project string `json:"project" yaml:"project" env:"PROJECT"`
Topic string `json:"topic" yaml:"topic" env:"TOPIC"`
Filter string `json:"filter" yaml:"filter" env:"FILTER"`
Encoder string `json:"encoder" yaml:"encoder" env:"ENCODER"`
BaseSink `yaml:",inline"`
Project string `json:"project" yaml:"project" env:"PROJECT"`
Topic string `json:"topic" yaml:"topic" env:"TOPIC"`
}

// TalariaSink represents a sink to an instance of Talaria
type TalariaSink struct {
BaseSink `yaml:",inline"`
Endpoint string `json:"endpoint" yaml:"endpoint" env:"ENDPOINT"` // The second Talaria endpoint
CircuitTimeout *time.Duration `json:"timeout" yaml:"timeout" env:"TIMEOUT"` // The timeout (in seconds) for requests to the second Talaria
MaxConcurrent *int `json:"concurrency" yaml:"concurrency" env:"CONCURRENCY"` // The number of concurrent requests permissible
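The embedded BaseSink with the `yaml:",inline"` tag is what allows encoder and filter to sit directly under each sink entry, and Compaction now holds a list of sinks rather than a single inlined one. Below is a minimal sketch of that decoding behaviour using gopkg.in/yaml.v2 and local stand-in structs (not the package's real types), just to show how the inline tag flattens BaseSink's fields into the sink mapping.

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// Local stand-ins mirroring the shapes above, for illustration only.
type BaseSink struct {
	Encoder string `yaml:"encoder"`
	Filter  string `yaml:"filter"`
}

type FileSink struct {
	BaseSink  `yaml:",inline"` // inline: encoder/filter appear at the sink level
	Directory string `yaml:"dir"`
}

type Sink struct {
	File *FileSink `yaml:"file"`
}

type Compaction struct {
	Sinks    []Sink `yaml:"sinks"`
	Interval int    `yaml:"interval"`
}

func main() {
	doc := `
interval: 300
sinks:
  - file:
      dir: "output/"
      encoder: json
`
	var c Compaction
	if err := yaml.Unmarshal([]byte(doc), &c); err != nil {
		panic(err)
	}
	fmt.Println(c.Sinks[0].File.Directory, c.Sinks[0].File.Encoder)
}
```

Running it prints `output/ json`: the encoder declared under `file:` lands on the embedded BaseSink, which is the behaviour the configurer test below relies on.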
11 changes: 6 additions & 5 deletions internal/config/env/configurer_test.go
@@ -13,7 +13,6 @@ import (
)

func TestConfigure(t *testing.T) {

c := &config.Config{}
st := static.New()
st.Configure(c)
@@ -76,8 +75,10 @@ tables:
data: json
compact:
interval: 300
file:
dir: "output/"
sinks:
- file:
dir: "output/"
encoder: json
streams:
- pubsub:
project: my-gcp-project
@@ -91,7 +92,7 @@ tables:
- azure:
container: a_container
prefix: a_prefix
blobServiceURL: .storage.microsoft.net
storageAccounts:
- a_storage_account
- b_storage_account
@@ -119,6 +120,6 @@ computed:
assert.Len(t, c.Tables, 1)

assert.Equal(t, "my-gcp-project", c.Tables["eventlog"].Streams[0].PubSub.Project)
assert.Equal(t, "output/", c.Tables["eventlog"].Compact.File.Directory)
assert.Equal(t, "output/", c.Tables["eventlog"].Compact.Sinks[0].File.Directory)
assert.Equal(t, []uint{1, 2}, c.Tables["eventlog"].Streams[2].Azure.StorageAccountWeights)
}
2 changes: 1 addition & 1 deletion internal/config/sample_config.yaml
@@ -49,4 +49,4 @@ tables:
users:
hashBy: "user_id"
sortBy: "ingested_at"
schema: "gcs://k8s-default-stg-configs/ingestor/schema2.yaml"
30 changes: 28 additions & 2 deletions internal/encoding/block/block.go
@@ -8,12 +8,14 @@ import (
"errors"
"fmt"

eorc "github.com/crphang/orc"
"github.com/golang/snappy"
"github.com/kelindar/binary"
"github.com/kelindar/binary/nocopy"
"github.com/kelindar/talaria/internal/column"
"github.com/kelindar/talaria/internal/encoding/orc"
"github.com/kelindar/talaria/internal/encoding/typeof"
"github.com/kelindar/talaria/internal/presto"
"github.com/kelindar/binary"
"github.com/kelindar/binary/nocopy"
)

var (
@@ -33,6 +35,30 @@ type Block struct {
schema typeof.Schema `binary:"-"` // The cached schema of the block
}

// Base creates a base block for testing purposes
func Base() ([]Block, error) {
schema := typeof.Schema{
"col0": typeof.String,
"col1": typeof.Int64,
"col2": typeof.Float64,
}
orcSchema, _ := orc.SchemaFor(schema)

orcBuffer1 := &bytes.Buffer{}
writer, _ := eorc.NewWriter(orcBuffer1,
eorc.SetSchema(orcSchema))
_ = writer.Write("eventName", 1, 1.0)
_ = writer.Close()

apply := Transform(nil)

block, err := FromOrcBy(orcBuffer1.Bytes(), "col0", nil, apply)
if err != nil {
return nil, err
}
return block, nil
}

// Read decodes the block and selects the columns
func Read(buffer []byte, desiredSchema typeof.Schema) (column.Columns, error) {
block, err := FromBuffer(buffer)
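A minimal usage sketch for the Base helper added to block.go above — an illustrative test (not part of this commit) that just checks the helper yields at least one block:

```go
package block

import "testing"

// TestBase is illustrative only: Base builds an in-memory ORC buffer and
// converts it into blocks, so the call needs no external test data.
func TestBase(t *testing.T) {
	blocks, err := Base()
	if err != nil {
		t.Fatalf("Base() returned error: %v", err)
	}
	if len(blocks) == 0 {
		t.Fatal("expected at least one block")
	}
}
```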
31 changes: 31 additions & 0 deletions internal/encoding/block/from_block.go
@@ -0,0 +1,31 @@
// Copyright 2019-2020 Grabtaxi Holdings PTE LTE (GRAB), All rights reserved.
// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file

package block

import (
"github.com/kelindar/talaria/internal/encoding/typeof"
)

// FromBlockBy creates and returns a list of new block.Row for a block.
func FromBlockBy(blk Block, schema typeof.Schema) ([]Row, error) {
cols, err := blk.Select(schema)
if err != nil {
return nil, err
}
rowCount := cols.Any().Count()
rows := make([]Row, rowCount)
for i := 0; i < rowCount; i++ {
row := NewRow(schema, len(schema))
for name := range schema {
col := cols[name]
val := col.At(i)
if val == nil {
continue
}
row.Set(name, val)
}
rows[i] = row
}
return rows, nil
}
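To make the purpose of FromBlockBy concrete, here is an illustrative helper (not part of this commit) that converts a columnar block back into rows and emits them as JSON lines — the kind of row-wise encoding a per-sink encoder can now perform. It assumes the Row type exposes the Values map used in the test that follows.

```go
package block

import (
	"encoding/json"

	"github.com/kelindar/talaria/internal/encoding/typeof"
)

// encodeRowsAsJSON is a sketch: rebuild rows from a columnar block with
// FromBlockBy, then marshal each row's values as one JSON line.
func encodeRowsAsJSON(blk Block, schema typeof.Schema) ([]byte, error) {
	rows, err := FromBlockBy(blk, schema)
	if err != nil {
		return nil, err
	}
	var out []byte
	for _, r := range rows {
		line, err := json.Marshal(r.Values) // Values holds the row's column values
		if err != nil {
			return nil, err
		}
		out = append(out, append(line, '\n')...)
	}
	return out, nil
}
```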
48 changes: 48 additions & 0 deletions internal/encoding/block/from_block_test.go
@@ -0,0 +1,48 @@
// Copyright (c) Roman Atachiants and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.

package block

import (
"io/ioutil"
"testing"

"github.com/kelindar/talaria/internal/encoding/typeof"
"github.com/stretchr/testify/assert"
)

func setup() (Block, typeof.Schema) {

const testFile = "../../../test/test4.csv"
o, _ := ioutil.ReadFile(testFile)
schema := &typeof.Schema{
"raisedCurrency": typeof.String,
"raisedAmt": typeof.Float64,
}
apply := Transform(schema)
b, _ := FromCSVBy(o, "raisedCurrency", &typeof.Schema{
"raisedCurrency": typeof.String,
"raisedAmt": typeof.Float64,
}, apply)
if len(b) > 0 {
return b[0], *schema
}
return Block{}, *schema
}

func TestFromBlock(t *testing.T) {
blk, schema := setup()
rows, err := FromBlockBy(blk, schema)
assert.NoError(t, err)
cols, err := blk.Select(schema)
assert.NoError(t, err)
rowCount := cols.Any().Count()
// verify row count.
assert.Equal(t, rowCount, len(rows))
for _, row := range rows {
// verify values
assert.Contains(t, []string{"EUR", "CAD", "USD"}, row.Values["raisedCurrency"])
// verify type
assert.Equal(t, typeof.String, row.Schema["raisedCurrency"])
}
}
2 changes: 1 addition & 1 deletion internal/encoding/block/from_parquet.go
@@ -123,4 +123,4 @@ func parquetJsonHandler(s interface{}) (interface{}, error) {
}

return nil, fmt.Errorf("Failed to convert to JSON")
}