From 1a4739c2db236029fa171067500714f74dedc5d2 Mon Sep 17 00:00:00 2001 From: umang01-hash Date: Fri, 27 Sep 2024 15:50:50 +0530 Subject: [PATCH] add custom spans for tracing in clickhouse datasource --- pkg/gofr/datasource/clickhouse/clickhouse.go | 49 ++++++++++++++++--- .../datasource/clickhouse/clickhouse_test.go | 24 ++++++--- pkg/gofr/external_db.go | 5 ++ 3 files changed, 63 insertions(+), 15 deletions(-) diff --git a/pkg/gofr/datasource/clickhouse/clickhouse.go b/pkg/gofr/datasource/clickhouse/clickhouse.go index 9ab49d2e5..c60b3c23a 100644 --- a/pkg/gofr/datasource/clickhouse/clickhouse.go +++ b/pkg/gofr/datasource/clickhouse/clickhouse.go @@ -3,9 +3,12 @@ package clickhouse import ( "context" "errors" + "fmt" "strings" "time" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/ClickHouse/clickhouse-go/v2" @@ -120,9 +123,13 @@ func pushDBMetrics(conn Conn, metrics Metrics) { // Exec should be used for DDL and simple statements. // It should not be used for larger inserts or query iterations. func (c *client) Exec(ctx context.Context, query string, args ...any) error { - defer c.sendOperationStats(time.Now(), "Exec", query, args...) + tracedCtx, span := c.addTraces(ctx, "exec", query) + + err := c.conn.Exec(tracedCtx, query, args...) - return c.conn.Exec(ctx, query, args...) + defer c.sendOperationStats(time.Now(), "Exec", query, "exec", span, args...) + + return err } // Select method allows a set of response rows to be marshaled into a slice of structs with a single invocation.. @@ -139,20 +146,29 @@ func (c *client) Exec(ctx context.Context, query string, args ...any) error { // // err = ctx.Clickhouse.Select(ctx, &user, "SELECT * FROM users") . func (c *client) Select(ctx context.Context, dest any, query string, args ...any) error { - defer c.sendOperationStats(time.Now(), "Select", query, args...) + tracedCtx, span := c.addTraces(ctx, "select", query) + + err := c.conn.Select(tracedCtx, dest, query, args...) - return c.conn.Select(ctx, dest, query, args...) + defer c.sendOperationStats(time.Now(), "Select", query, "select", span, args...) + + return err } // AsyncInsert allows the user to specify whether the client should wait for the server to complete the insert or // respond once the data has been received. func (c *client) AsyncInsert(ctx context.Context, query string, wait bool, args ...any) error { - defer c.sendOperationStats(time.Now(), "AsyncInsert", query, args...) + tracedCtx, span := c.addTraces(ctx, "async-insert", query) + + err := c.conn.AsyncInsert(tracedCtx, query, wait, args...) - return c.conn.AsyncInsert(ctx, query, wait, args...) + defer c.sendOperationStats(time.Now(), "AsyncInsert", query, "async-insert", span, args...) + + return err } -func (c *client) sendOperationStats(start time.Time, methodType, query string, args ...interface{}) { +func (c *client) sendOperationStats(start time.Time, methodType, query string, method string, + span trace.Span, args ...interface{}) { duration := time.Since(start).Milliseconds() c.logger.Debug(&Log{ @@ -162,6 +178,11 @@ func (c *client) sendOperationStats(start time.Time, methodType, query string, a Args: args, }) + if span != nil { + defer span.End() + span.SetAttributes(attribute.Int64(fmt.Sprintf("clickhouse.%v.duration", method), duration)) + } + c.metrics.RecordHistogram(context.Background(), "app_clickhouse_stats", float64(duration), "hosts", c.config.Hosts, "database", c.config.Database, "type", getOperationType(query)) } @@ -198,3 +219,17 @@ func (c *client) HealthCheck(ctx context.Context) (any, error) { return &h, nil } + +func (c *client) addTraces(ctx context.Context, method, query string) (context.Context, trace.Span) { + if c.tracer != nil { + contextWithTrace, span := c.tracer.Start(ctx, fmt.Sprintf("clickhouse-%v", method)) + + span.SetAttributes( + attribute.String("clickhouse.query", query), + ) + + return contextWithTrace, span + } + + return ctx, nil +} diff --git a/pkg/gofr/datasource/clickhouse/clickhouse_test.go b/pkg/gofr/datasource/clickhouse/clickhouse_test.go index 4bd4861d7..05ad2fedf 100644 --- a/pkg/gofr/datasource/clickhouse/clickhouse_test.go +++ b/pkg/gofr/datasource/clickhouse/clickhouse_test.go @@ -14,7 +14,7 @@ import ( "go.uber.org/mock/gomock" ) -func getClickHouseTestConnection(t *testing.T) (*MockConn, *MockMetrics, client) { +func getClickHouseTestConnection(t *testing.T) (*MockConn, *MockMetrics, *MockLogger, client) { t.Helper() ctrl := gomock.NewController(t) @@ -30,12 +30,12 @@ func getClickHouseTestConnection(t *testing.T) (*MockConn, *MockMetrics, client) Database: "test", }, logger: mockLogger, metrics: mockMetric} - return mockConn, mockMetric, c + return mockConn, mockMetric, mockLogger, c } func Test_ClickHouse_ConnectAndMetricRegistrationAndPingFailure(t *testing.T) { logs := stderrOutputForFunc(func() { - _, mockMetric, _ := getClickHouseTestConnection(t) + _, mockMetric, _, _ := getClickHouseTestConnection(t) mockLogger := NewMockLogger(gomock.NewController(t)) cl := New(Config{ @@ -53,6 +53,8 @@ func Test_ClickHouse_ConnectAndMetricRegistrationAndPingFailure(t *testing.T) { mockMetric.EXPECT().NewGauge("app_clickhouse_idle_connections", "Number of idle Clickhouse connections.") mockMetric.EXPECT().SetGauge("app_clickhouse_open_connections", gomock.Any()).AnyTimes() mockMetric.EXPECT().SetGauge("app_clickhouse_idle_connections", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Logf("connecting to clickhouse db at %v to database %v", "localhost:8000", "test") + mockLogger.EXPECT().Errorf("ping failed with error %v", gomock.Any()) cl.Connect() @@ -78,7 +80,7 @@ func stderrOutputForFunc(f func()) string { } func Test_ClickHouse_HealthUP(t *testing.T) { - mockConn, _, c := getClickHouseTestConnection(t) + mockConn, _, _, c := getClickHouseTestConnection(t) mockConn.EXPECT().Ping(gomock.Any()).Return(nil) @@ -88,7 +90,7 @@ func Test_ClickHouse_HealthUP(t *testing.T) { } func Test_ClickHouse_HealthDOWN(t *testing.T) { - mockConn, _, c := getClickHouseTestConnection(t) + mockConn, _, _, c := getClickHouseTestConnection(t) mockConn.EXPECT().Ping(gomock.Any()).Return(sql.ErrConnDone) @@ -100,13 +102,15 @@ func Test_ClickHouse_HealthDOWN(t *testing.T) { } func Test_ClickHouse_Exec(t *testing.T) { - mockConn, mockMetric, c := getClickHouseTestConnection(t) + mockConn, mockMetric, mockLogger, c := getClickHouseTestConnection(t) ctx := context.Background() mockConn.EXPECT().Exec(ctx, "INSERT INTO users (id, name, age) VALUES (?, ?, ?)", "8f165e2d-feef-416c-95f6-913ce3172e15", "gofr", "10").Return(nil) + mockLogger.EXPECT().Debug(gomock.Any()) + mockMetric.EXPECT().RecordHistogram(ctx, "app_clickhouse_stats", float64(0), "hosts", c.config.Hosts, "database", c.config.Database, "type", "INSERT") @@ -116,7 +120,7 @@ func Test_ClickHouse_Exec(t *testing.T) { } func Test_ClickHouse_Select(t *testing.T) { - mockConn, mockMetric, c := getClickHouseTestConnection(t) + mockConn, mockMetric, mockLogger, c := getClickHouseTestConnection(t) type User struct { ID string `ch:"id"` @@ -130,6 +134,8 @@ func Test_ClickHouse_Select(t *testing.T) { mockConn.EXPECT().Select(ctx, &user, "SELECT * FROM users").Return(nil) + mockLogger.EXPECT().Debug(gomock.Any()) + mockMetric.EXPECT().RecordHistogram(ctx, "app_clickhouse_stats", float64(0), "hosts", c.config.Hosts, "database", c.config.Database, "type", "SELECT") @@ -139,7 +145,7 @@ func Test_ClickHouse_Select(t *testing.T) { } func Test_ClickHouse_AsyncInsert(t *testing.T) { - mockConn, mockMetric, c := getClickHouseTestConnection(t) + mockConn, mockMetric, mockLogger, c := getClickHouseTestConnection(t) ctx := context.Background() @@ -149,6 +155,8 @@ func Test_ClickHouse_AsyncInsert(t *testing.T) { mockMetric.EXPECT().RecordHistogram(ctx, "app_clickhouse_stats", float64(0), "hosts", c.config.Hosts, "database", c.config.Database, "type", "INSERT") + mockLogger.EXPECT().Debug(gomock.Any()) + err := c.AsyncInsert(ctx, "INSERT INTO users (id, name, age) VALUES (?, ?, ?)", true, "8f165e2d-feef-416c-95f6-913ce3172e15", "user", "10") diff --git a/pkg/gofr/external_db.go b/pkg/gofr/external_db.go index f5c64b4b5..ba20f4bbf 100644 --- a/pkg/gofr/external_db.go +++ b/pkg/gofr/external_db.go @@ -1,6 +1,7 @@ package gofr import ( + "go.opentelemetry.io/otel" "gofr.dev/pkg/gofr/container" "gofr.dev/pkg/gofr/datasource/file" ) @@ -42,6 +43,10 @@ func (a *App) AddClickhouse(db container.ClickhouseProvider) { db.UseLogger(a.Logger()) db.UseMetrics(a.Metrics()) + tracer := otel.GetTracerProvider().Tracer("gofr-clickhouse") + + db.UseTracer(tracer) + db.Connect() a.container.Clickhouse = db