Skip to content

Commit

Permalink
add custom spans for tracing in clickhouse datasource
Browse files Browse the repository at this point in the history
  • Loading branch information
Umang01-hash committed Sep 27, 2024
1 parent 5ebbcae commit 1a4739c
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 15 deletions.
49 changes: 42 additions & 7 deletions pkg/gofr/datasource/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package clickhouse
import (
"context"
"errors"
"fmt"
"strings"
"time"

"go.opentelemetry.io/otel/attribute"

"go.opentelemetry.io/otel/trace"

"github.com/ClickHouse/clickhouse-go/v2"
Expand Down Expand Up @@ -120,9 +123,13 @@ func pushDBMetrics(conn Conn, metrics Metrics) {
// Exec should be used for DDL and simple statements.
// It should not be used for larger inserts or query iterations.
func (c *client) Exec(ctx context.Context, query string, args ...any) error {
defer c.sendOperationStats(time.Now(), "Exec", query, args...)
tracedCtx, span := c.addTraces(ctx, "exec", query)

err := c.conn.Exec(tracedCtx, query, args...)

return c.conn.Exec(ctx, query, args...)
defer c.sendOperationStats(time.Now(), "Exec", query, "exec", span, args...)

return err
}

// Select method allows a set of response rows to be marshaled into a slice of structs with a single invocation..
Expand All @@ -139,20 +146,29 @@ func (c *client) Exec(ctx context.Context, query string, args ...any) error {
//
// err = ctx.Clickhouse.Select(ctx, &user, "SELECT * FROM users") .
func (c *client) Select(ctx context.Context, dest any, query string, args ...any) error {
defer c.sendOperationStats(time.Now(), "Select", query, args...)
tracedCtx, span := c.addTraces(ctx, "select", query)

err := c.conn.Select(tracedCtx, dest, query, args...)

return c.conn.Select(ctx, dest, query, args...)
defer c.sendOperationStats(time.Now(), "Select", query, "select", span, args...)

return err
}

// AsyncInsert allows the user to specify whether the client should wait for the server to complete the insert or
// respond once the data has been received.
func (c *client) AsyncInsert(ctx context.Context, query string, wait bool, args ...any) error {
defer c.sendOperationStats(time.Now(), "AsyncInsert", query, args...)
tracedCtx, span := c.addTraces(ctx, "async-insert", query)

err := c.conn.AsyncInsert(tracedCtx, query, wait, args...)

return c.conn.AsyncInsert(ctx, query, wait, args...)
defer c.sendOperationStats(time.Now(), "AsyncInsert", query, "async-insert", span, args...)

return err
}

func (c *client) sendOperationStats(start time.Time, methodType, query string, args ...interface{}) {
func (c *client) sendOperationStats(start time.Time, methodType, query string, method string,
span trace.Span, args ...interface{}) {
duration := time.Since(start).Milliseconds()

c.logger.Debug(&Log{
Expand All @@ -162,6 +178,11 @@ func (c *client) sendOperationStats(start time.Time, methodType, query string, a
Args: args,
})

if span != nil {
defer span.End()
span.SetAttributes(attribute.Int64(fmt.Sprintf("clickhouse.%v.duration", method), duration))
}

c.metrics.RecordHistogram(context.Background(), "app_clickhouse_stats", float64(duration), "hosts", c.config.Hosts,
"database", c.config.Database, "type", getOperationType(query))
}
Expand Down Expand Up @@ -198,3 +219,17 @@ func (c *client) HealthCheck(ctx context.Context) (any, error) {

return &h, nil
}

func (c *client) addTraces(ctx context.Context, method, query string) (context.Context, trace.Span) {
if c.tracer != nil {
contextWithTrace, span := c.tracer.Start(ctx, fmt.Sprintf("clickhouse-%v", method))

span.SetAttributes(
attribute.String("clickhouse.query", query),
)

return contextWithTrace, span
}

return ctx, nil
}
24 changes: 16 additions & 8 deletions pkg/gofr/datasource/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"go.uber.org/mock/gomock"
)

func getClickHouseTestConnection(t *testing.T) (*MockConn, *MockMetrics, client) {
func getClickHouseTestConnection(t *testing.T) (*MockConn, *MockMetrics, *MockLogger, client) {
t.Helper()

ctrl := gomock.NewController(t)
Expand All @@ -30,12 +30,12 @@ func getClickHouseTestConnection(t *testing.T) (*MockConn, *MockMetrics, client)
Database: "test",
}, logger: mockLogger, metrics: mockMetric}

return mockConn, mockMetric, c
return mockConn, mockMetric, mockLogger, c
}

func Test_ClickHouse_ConnectAndMetricRegistrationAndPingFailure(t *testing.T) {
logs := stderrOutputForFunc(func() {
_, mockMetric, _ := getClickHouseTestConnection(t)
_, mockMetric, _, _ := getClickHouseTestConnection(t)
mockLogger := NewMockLogger(gomock.NewController(t))

cl := New(Config{
Expand All @@ -53,6 +53,8 @@ func Test_ClickHouse_ConnectAndMetricRegistrationAndPingFailure(t *testing.T) {
mockMetric.EXPECT().NewGauge("app_clickhouse_idle_connections", "Number of idle Clickhouse connections.")
mockMetric.EXPECT().SetGauge("app_clickhouse_open_connections", gomock.Any()).AnyTimes()
mockMetric.EXPECT().SetGauge("app_clickhouse_idle_connections", gomock.Any()).AnyTimes()
mockLogger.EXPECT().Logf("connecting to clickhouse db at %v to database %v", "localhost:8000", "test")
mockLogger.EXPECT().Errorf("ping failed with error %v", gomock.Any())

cl.Connect()

Expand All @@ -78,7 +80,7 @@ func stderrOutputForFunc(f func()) string {
}

func Test_ClickHouse_HealthUP(t *testing.T) {
mockConn, _, c := getClickHouseTestConnection(t)
mockConn, _, _, c := getClickHouseTestConnection(t)

mockConn.EXPECT().Ping(gomock.Any()).Return(nil)

Expand All @@ -88,7 +90,7 @@ func Test_ClickHouse_HealthUP(t *testing.T) {
}

func Test_ClickHouse_HealthDOWN(t *testing.T) {
mockConn, _, c := getClickHouseTestConnection(t)
mockConn, _, _, c := getClickHouseTestConnection(t)

mockConn.EXPECT().Ping(gomock.Any()).Return(sql.ErrConnDone)

Expand All @@ -100,13 +102,15 @@ func Test_ClickHouse_HealthDOWN(t *testing.T) {
}

func Test_ClickHouse_Exec(t *testing.T) {
mockConn, mockMetric, c := getClickHouseTestConnection(t)
mockConn, mockMetric, mockLogger, c := getClickHouseTestConnection(t)

ctx := context.Background()

mockConn.EXPECT().Exec(ctx, "INSERT INTO users (id, name, age) VALUES (?, ?, ?)",
"8f165e2d-feef-416c-95f6-913ce3172e15", "gofr", "10").Return(nil)

mockLogger.EXPECT().Debug(gomock.Any())

mockMetric.EXPECT().RecordHistogram(ctx, "app_clickhouse_stats", float64(0), "hosts", c.config.Hosts,
"database", c.config.Database, "type", "INSERT")

Expand All @@ -116,7 +120,7 @@ func Test_ClickHouse_Exec(t *testing.T) {
}

func Test_ClickHouse_Select(t *testing.T) {
mockConn, mockMetric, c := getClickHouseTestConnection(t)
mockConn, mockMetric, mockLogger, c := getClickHouseTestConnection(t)

type User struct {
ID string `ch:"id"`
Expand All @@ -130,6 +134,8 @@ func Test_ClickHouse_Select(t *testing.T) {

mockConn.EXPECT().Select(ctx, &user, "SELECT * FROM users").Return(nil)

mockLogger.EXPECT().Debug(gomock.Any())

mockMetric.EXPECT().RecordHistogram(ctx, "app_clickhouse_stats", float64(0), "hosts", c.config.Hosts,
"database", c.config.Database, "type", "SELECT")

Expand All @@ -139,7 +145,7 @@ func Test_ClickHouse_Select(t *testing.T) {
}

func Test_ClickHouse_AsyncInsert(t *testing.T) {
mockConn, mockMetric, c := getClickHouseTestConnection(t)
mockConn, mockMetric, mockLogger, c := getClickHouseTestConnection(t)

ctx := context.Background()

Expand All @@ -149,6 +155,8 @@ func Test_ClickHouse_AsyncInsert(t *testing.T) {
mockMetric.EXPECT().RecordHistogram(ctx, "app_clickhouse_stats", float64(0), "hosts", c.config.Hosts,
"database", c.config.Database, "type", "INSERT")

mockLogger.EXPECT().Debug(gomock.Any())

err := c.AsyncInsert(ctx, "INSERT INTO users (id, name, age) VALUES (?, ?, ?)", true,
"8f165e2d-feef-416c-95f6-913ce3172e15", "user", "10")

Expand Down
5 changes: 5 additions & 0 deletions pkg/gofr/external_db.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gofr

import (
"go.opentelemetry.io/otel"
"gofr.dev/pkg/gofr/container"
"gofr.dev/pkg/gofr/datasource/file"
)
Expand Down Expand Up @@ -42,6 +43,10 @@ func (a *App) AddClickhouse(db container.ClickhouseProvider) {
db.UseLogger(a.Logger())
db.UseMetrics(a.Metrics())

tracer := otel.GetTracerProvider().Tracer("gofr-clickhouse")

db.UseTracer(tracer)

db.Connect()

a.container.Clickhouse = db
Expand Down

0 comments on commit 1a4739c

Please sign in to comment.