From 6e0853b6544831f1c5a2d90dec3f10124000076b Mon Sep 17 00:00:00 2001 From: Chen Xu Date: Sun, 25 Aug 2024 16:13:25 -0700 Subject: [PATCH] Fix span leak for inbound & outbound --- inbound.go | 26 +++++++++----- json/call.go | 3 ++ outbound.go | 36 ++++++++++++------- raw/call.go | 3 ++ thrift/client.go | 1 + thrift/server_test.go | 3 +- thrift/tracing_test.go | 80 +++++++++++++++++++++++++++++++++++++++++- tracing.go | 33 ++++++++++++++--- tracing_test.go | 53 ++++++++++++++++++++++++++++ 9 files changed, 209 insertions(+), 29 deletions(-) diff --git a/inbound.go b/inbound.go index dd525237a..feef2d1e9 100644 --- a/inbound.go +++ b/inbound.go @@ -27,7 +27,6 @@ import ( "time" "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/ext" "golang.org/x/net/context" ) @@ -173,6 +172,8 @@ func (call *InboundCall) createStatsTags(connectionTags map[string]string) { // dispatchInbound ispatches an inbound call to the appropriate handler func (c *Connection) dispatchInbound(_ uint32, _ uint32, call *InboundCall, frame *Frame) { + defer call.Response().finishSpanWithOptions(opentracing.FinishOptions{}) + if call.log.Enabled(LogLevelDebug) { call.log.Debugf("Received incoming call for %s from %s", call.ServiceName(), c.remotePeerInfo) } @@ -359,8 +360,10 @@ type InboundCallResponse struct { systemError bool headers transportHeaders span opentracing.Span - statsReporter StatsReporter - commonStatsTags map[string]string + // spanFinished is a bool flag for avoiding finishing a span multiple times + spanFinished bool + statsReporter StatsReporter + commonStatsTags map[string]string } // SendSystemError returns a system error response to the peer. The call is considered @@ -432,18 +435,23 @@ func (response *InboundCallResponse) setSpanErrorDetails(err error) { } } +// finishSpanWithOptions finishes the span in InboundCallResponse if it is not nil and hasn't been finished yet +func (response *InboundCallResponse) finishSpanWithOptions(opts opentracing.FinishOptions) { + if response == nil || response.span == nil || response.spanFinished { + return + } + response.spanFinished = true + response.span.FinishWithOptions(opts) +} + // doneSending shuts down the message exchange for this call. // For incoming calls, the last message is sending the call response. func (response *InboundCallResponse) doneSending() { // TODO(prashant): Move this to when the message is actually being sent. now := response.timeNow() - if span := response.span; span != nil { - if response.applicationError || response.systemError { - ext.Error.Set(span, true) - } - span.FinishWithOptions(opentracing.FinishOptions{FinishTime: now}) - } + UpdateSpanWithError(response.span, response.applicationError || response.systemError, nil) + response.finishSpanWithOptions(opentracing.FinishOptions{FinishTime: now}) latency := now.Sub(response.calledAt) response.statsReporter.RecordTimer("inbound.calls.latency", response.commonStatsTags, latency) diff --git a/json/call.go b/json/call.go index 65eb8baf8..a248d5c26 100644 --- a/json/call.go +++ b/json/call.go @@ -120,6 +120,7 @@ func (c *Client) Call(ctx Context, method string, arg, resp interface{}) error { if err != nil { return err } + defer call.Response().Done() isOK, errAt, err = makeCall(call, headers, arg, &respHeaders, resp, &respErr) return err @@ -157,6 +158,7 @@ func CallPeer(ctx Context, peer *tchannel.Peer, serviceName, method string, arg, if err != nil { return err } + defer call.Response().Done() return wrapCall(ctx, call, method, arg, resp) } @@ -167,6 +169,7 @@ func CallSC(ctx Context, sc *tchannel.SubChannel, method string, arg, resp inter if err != nil { return err } + defer call.Response().Done() return wrapCall(ctx, call, method, arg, resp) } diff --git a/outbound.go b/outbound.go index 6e3bf1820..a437c6516 100644 --- a/outbound.go +++ b/outbound.go @@ -27,7 +27,6 @@ import ( "github.com/uber/tchannel-go/typed" "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/ext" "golang.org/x/net/context" ) @@ -132,6 +131,8 @@ func (c *Connection) beginCall(ctx context.Context, serviceName, methodName stri call.response = response if err := call.writeMethod([]byte(methodName)); err != nil { + UpdateSpanWithError(response.span, true, err) + response.Done() return nil, err } return call, nil @@ -231,9 +232,11 @@ type OutboundCallResponse struct { requestState *RequestState // startedAt is the time at which the outbound call was started. - startedAt time.Time - timeNow func() time.Time - span opentracing.Span + startedAt time.Time + timeNow func() time.Time + span opentracing.Span + // spanFinished is a bool flag for avoiding finishing a span multiple times + spanFinished bool statsReporter StatsReporter commonStatsTags map[string]string } @@ -269,6 +272,20 @@ func (response *OutboundCallResponse) Arg3Reader() (ArgReader, error) { return response.arg3Reader() } +// Done does the cleanup job for OutboundCallResponse. +func (response *OutboundCallResponse) Done() { + response.finishSpanWithOptions(opentracing.FinishOptions{}) +} + +// finishSpanWithOptions finishes the span in OutboundCallResponse if it is not nil and hasn't been finished yet +func (response *OutboundCallResponse) finishSpanWithOptions(opts opentracing.FinishOptions) { + if response == nil || response.span == nil || response.spanFinished { + return + } + response.spanFinished = true + response.span.FinishWithOptions(opts) +} + // handleError handles an error coming back from the peer. If the error is a // protocol level error, the entire connection will be closed. If the error is // a request specific error, it will be written to the request's response @@ -330,15 +347,8 @@ func (response *OutboundCallResponse) doneReading(unexpected error) { lastAttempt := isSuccess || !response.requestState.HasRetries(unexpected) // TODO how should this work with retries? - if span := response.span; span != nil { - if unexpected != nil { - span.LogEventWithPayload("error", unexpected) - } - if !isSuccess && lastAttempt { - ext.Error.Set(span, true) - } - span.FinishWithOptions(opentracing.FinishOptions{FinishTime: now}) - } + UpdateSpanWithError(response.span, !isSuccess && lastAttempt, unexpected) + response.finishSpanWithOptions(opentracing.FinishOptions{FinishTime: now}) latency := now.Sub(response.startedAt) response.statsReporter.RecordTimer("outbound.calls.per-attempt.latency", response.commonStatsTags, latency) diff --git a/raw/call.go b/raw/call.go index 4debbc768..516d6ff67 100644 --- a/raw/call.go +++ b/raw/call.go @@ -78,6 +78,7 @@ func Call(ctx context.Context, ch *tchannel.Channel, hostPort string, serviceNam if err != nil { return nil, nil, nil, err } + defer call.Response().Done() return WriteArgs(call, arg2, arg3) } @@ -90,6 +91,7 @@ func CallSC(ctx context.Context, sc *tchannel.SubChannel, method string, arg2, a if err != nil { return nil, nil, nil, err } + defer call.Response().Done() return WriteArgs(call, arg2, arg3) } @@ -115,6 +117,7 @@ func CallV2(ctx context.Context, sc *tchannel.SubChannel, cArgs CArgs) (*CRes, e if err != nil { return nil, err } + defer call.Response().Done() arg2, arg3, res, err := WriteArgs(call, cArgs.Arg2, cArgs.Arg3) if err != nil { diff --git a/thrift/client.go b/thrift/client.go index fe52262f0..eabadfa79 100644 --- a/thrift/client.go +++ b/thrift/client.go @@ -143,6 +143,7 @@ func (c *client) Call(ctx Context, thriftService, methodName string, req, resp t if err != nil { return err } + defer call.Response().Done() if err := writeArgs(call, headers, req); err != nil { return err diff --git a/thrift/server_test.go b/thrift/server_test.go index 6e1d50216..2addd8b57 100644 --- a/thrift/server_test.go +++ b/thrift/server_test.go @@ -1,4 +1,4 @@ -package thrift +package thrift_test import ( "errors" @@ -9,6 +9,7 @@ import ( "github.com/uber/tchannel-go" "github.com/uber/tchannel-go/testutils" athrift "github.com/uber/tchannel-go/thirdparty/github.com/apache/thrift/lib/go/thrift" + . "github.com/uber/tchannel-go/thrift" ) var errIO = errors.New("IO Error") diff --git a/thrift/tracing_test.go b/thrift/tracing_test.go index 3363717b6..c0a29cca8 100644 --- a/thrift/tracing_test.go +++ b/thrift/tracing_test.go @@ -3,12 +3,18 @@ package thrift_test import ( json_encoding "encoding/json" "testing" + "time" + "github.com/opentracing/opentracing-go/ext" + "github.com/opentracing/opentracing-go/mocktracer" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/uber/tchannel-go" + "github.com/uber/tchannel-go/testutils" . "github.com/uber/tchannel-go/testutils/testtracing" "github.com/uber/tchannel-go/thrift" + . "github.com/uber/tchannel-go/thrift" gen "github.com/uber/tchannel-go/thrift/gen-go/test" - "golang.org/x/net/context" ) @@ -108,3 +114,75 @@ func TestThriftTracingPropagation(t *testing.T) { } suite.Run(t) } + +func TestClientSpanFinish(t *testing.T) { + tracer := mocktracer.New() + + server := testutils.NewTestServer(t, testutils.NewOpts()) + defer server.CloseAndVerify() + + sCh := server.Server() + NewServer(sCh).Register(&thriftStruction{}) + + ch := server.NewClient(&testutils.ChannelOpts{ + ChannelOptions: tchannel.ChannelOptions{ + Tracer: tracer, + }, + }) + defer ch.Close() + client := NewClient( + ch, + sCh.ServiceName(), + &ClientOptions{HostPort: server.HostPort()}, + ) + + ctx, cancel := NewContext(time.Second) + defer cancel() + + _, err := client.Call(ctx, "destruct", "partialDestruct", &badTStruct{Err: errIO}, &nullTStruct{}) + assert.Error(t, err) + assert.IsType(t, errIO, err) + + time.Sleep(testutils.Timeout(time.Millisecond)) + spans := tracer.FinishedSpans() + require.Equal(t, 1, len(spans)) + assert.Equal(t, ext.SpanKindEnum("client"), spans[0].Tag("span.kind")) +} + +func TestServerSpanSpanFinish(t *testing.T) { + tracer := mocktracer.New() + + opts := &testutils.ChannelOpts{ + ChannelOptions: tchannel.ChannelOptions{ + Tracer: tracer, + }, + } + opts.AddLogFilter( + "Thrift server error.", 1, + "error", "IO Error", + "method", "destruct::partialDestruct") + + server := testutils.NewTestServer(t, opts) + defer server.CloseAndVerify() + + sCh := server.Server() + NewServer(sCh).Register(&thriftStruction{}) + + client := NewClient( + server.NewClient(nil), // not provide the tracer to check server span only + sCh.ServiceName(), + &ClientOptions{HostPort: server.HostPort()}, + ) + + ctx, cancel := NewContext(time.Second) + defer cancel() + + _, err := client.Call(ctx, "destruct", "partialDestruct", &nullTStruct{}, &nullTStruct{}) + assert.Error(t, err) + assert.IsType(t, tchannel.SystemError{}, err) + + time.Sleep(testutils.Timeout(time.Millisecond)) + spans := tracer.FinishedSpans() + require.Equal(t, 1, len(spans)) + assert.Equal(t, ext.SpanKindEnum("server"), spans[0].Tag("span.kind")) +} diff --git a/tracing.go b/tracing.go index 57e90852f..7842f8d95 100644 --- a/tracing.go +++ b/tracing.go @@ -24,6 +24,7 @@ import ( "fmt" "time" + "github.com/opentracing/opentracing-go/log" "github.com/uber/tchannel-go/trand" "github.com/uber/tchannel-go/typed" @@ -32,11 +33,14 @@ import ( "golang.org/x/net/context" ) -// zipkinSpanFormat defines a name for OpenTracing carrier format that tracer may support. -// It is used to extract zipkin-style trace/span IDs from the OpenTracing Span, which are -// otherwise not exposed explicitly. -// NB: the string value is what's actually shared between implementations -const zipkinSpanFormat = "zipkin-span-format" +const ( + // zipkinSpanFormat defines a name for OpenTracing carrier format that tracer may support. + // It is used to extract zipkin-style trace/span IDs from the OpenTracing Span, which are + // otherwise not exposed explicitly. + // NB: the string value is what's actually shared between implementations + zipkinSpanFormat = "zipkin-span-format" + spanComponentName = "tchannel-go" +) // Span is an internal representation of Zipkin-compatible OpenTracing Span. // It is used as OpenTracing inject/extract Carrier with ZipkinSpanFormat. @@ -151,6 +155,7 @@ func (c *Connection) startOutboundSpan(ctx context.Context, serviceName, methodN } ext.SpanKindRPCClient.Set(span) ext.PeerService.Set(span, serviceName) + ext.Component.Set(span, spanComponentName) c.setPeerHostPort(span) span.SetTag("as", call.callReq.Headers[ArgScheme]) var injectable injectableSpan @@ -214,6 +219,7 @@ func (c *Connection) extractInboundSpan(callReq *callReq) opentracing.Span { span := c.Tracer().StartSpan(operationName, ext.RPCServerOption(spanCtx)) span.SetTag("as", callReq.Headers[ArgScheme]) ext.PeerService.Set(span, callReq.Headers[CallerName]) + ext.Component.Set(span, spanComponentName) c.setPeerHostPort(span) return span } @@ -252,6 +258,7 @@ func ExtractInboundSpan(ctx context.Context, call *InboundCall, headers map[stri } span = tracer.StartSpan(call.MethodString(), ext.RPCServerOption(parent)) ext.PeerService.Set(span, call.CallerName()) + ext.Component.Set(span, spanComponentName) span.SetTag("as", string(call.Format())) call.conn.setPeerHostPort(span) call.Response().span = span @@ -286,3 +293,19 @@ func TracerFromRegistrar(registrar Registrar) opentracing.Tracer { } return opentracing.GlobalTracer() } + +// UpdateSpanWithError set error tag & error log on a span +func UpdateSpanWithError(span opentracing.Span, hasError bool, err error) { + if span == nil { + return + } + if hasError { + ext.Error.Set(span, true) + } + if err != nil { + span.LogFields( + log.String("event", "error"), + log.String("message", err.Error()), + ) + } +} diff --git a/tracing_test.go b/tracing_test.go index f1a3da4e4..4f262d32a 100644 --- a/tracing_test.go +++ b/tracing_test.go @@ -21,6 +21,7 @@ package tchannel_test import ( + "errors" "sync" "testing" "time" @@ -162,6 +163,8 @@ func TestTracingSpanAttributes(t *testing.T) { assert.Equal(t, "testService", child.Tag("peer.service")) assert.Equal(t, "json", parent.Tag("as")) assert.Equal(t, "json", child.Tag("as")) + assert.Equal(t, "tchannel-go", parent.Tag("component")) + assert.Equal(t, "tchannel-go", child.Tag("component")) assert.NotNil(t, parent.Tag("peer.ipv4")) assert.NotNil(t, child.Tag("peer.ipv4")) assert.NotNil(t, parent.Tag("peer.port")) @@ -209,3 +212,53 @@ func TestReusableHeaders(t *testing.T) { assert.Equal(t, map[string]string{"life": "42"}, sharedHeaders, "headers unchanged") }) } + +func TestUpdateSpanWithError(t *testing.T) { + var ( + tracer = mocktracer.New() + err = errors.New("test error") + ) + t.Run("nil span", func(t *testing.T) { + UpdateSpanWithError(nil, true, nil) + }) + + t.Run("error tag and error log", func(t *testing.T) { + span := tracer.StartSpan("test") + UpdateSpanWithError(span, true, err) + + mSpan, ok := span.(*mocktracer.MockSpan) + require.True(t, ok) + assert.Equal(t, true, mSpan.Tag("error")) + assert.Equal(t, 1, len(mSpan.Logs())) + }) + + t.Run("error tag and no error log", func(t *testing.T) { + span := tracer.StartSpan("test") + UpdateSpanWithError(span, true, nil) + + mSpan, ok := span.(*mocktracer.MockSpan) + require.True(t, ok) + assert.Equal(t, true, mSpan.Tag("error")) + assert.Equal(t, 0, len(mSpan.Logs())) + }) + + t.Run("no error tag and error log", func(t *testing.T) { + span := tracer.StartSpan("test") + UpdateSpanWithError(span, false, err) + + mSpan, ok := span.(*mocktracer.MockSpan) + require.True(t, ok) + assert.Equal(t, nil, mSpan.Tag("error")) + assert.Equal(t, 1, len(mSpan.Logs())) + }) + + t.Run("no error tag and no error log", func(t *testing.T) { + span := tracer.StartSpan("test") + UpdateSpanWithError(span, false, nil) + + mSpan, ok := span.(*mocktracer.MockSpan) + require.True(t, ok) + assert.Equal(t, nil, mSpan.Tag("error")) + assert.Equal(t, 0, len(mSpan.Logs())) + }) +}