Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix span leak for inbound & outbound #922

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"time"

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -173,6 +172,8 @@ func (call *InboundCall) createStatsTags(connectionTags map[string]string) {

// dispatchInbound dispatches an inbound call to the appropriate handler
func (c *Connection) dispatchInbound(_ uint32, _ uint32, call *InboundCall, frame *Frame) {
defer call.Response().finishSpanWithOptions(opentracing.FinishOptions{})

if call.log.Enabled(LogLevelDebug) {
call.log.Debugf("Received incoming call for %s from %s", call.ServiceName(), c.remotePeerInfo)
}
Expand Down Expand Up @@ -359,8 +360,10 @@ type InboundCallResponse struct {
systemError bool
headers transportHeaders
span opentracing.Span
statsReporter StatsReporter
commonStatsTags map[string]string
// spanFinished is a bool flag for avoiding finishing a span multiple times
spanFinished bool
statsReporter StatsReporter
commonStatsTags map[string]string
}

// SendSystemError returns a system error response to the peer. The call is considered
Expand Down Expand Up @@ -432,18 +435,23 @@ func (response *InboundCallResponse) setSpanErrorDetails(err error) {
}
}

// finishSpanWithOptions finishes the span attached to the InboundCallResponse
// using opts. It does nothing when the response is nil, when no span is
// attached, or when spanFinished is already set, so the span is finished at
// most once through this method.
func (response *InboundCallResponse) finishSpanWithOptions(opts opentracing.FinishOptions) {
	if response == nil {
		return
	}
	if span := response.span; span != nil && !response.spanFinished {
		// Record the finish before delegating to the tracer so that later
		// calls become no-ops.
		response.spanFinished = true
		span.FinishWithOptions(opts)
	}
}

// doneSending shuts down the message exchange for this call.
// For incoming calls, the last message is sending the call response.
func (response *InboundCallResponse) doneSending() {
// TODO(prashant): Move this to when the message is actually being sent.
now := response.timeNow()

if span := response.span; span != nil {
if response.applicationError || response.systemError {
ext.Error.Set(span, true)
}
span.FinishWithOptions(opentracing.FinishOptions{FinishTime: now})
}
UpdateSpanWithError(response.span, response.applicationError || response.systemError, nil)
response.finishSpanWithOptions(opentracing.FinishOptions{FinishTime: now})

latency := now.Sub(response.calledAt)
response.statsReporter.RecordTimer("inbound.calls.latency", response.commonStatsTags, latency)
Expand Down
3 changes: 3 additions & 0 deletions json/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func (c *Client) Call(ctx Context, method string, arg, resp interface{}) error {
if err != nil {
return err
}
defer call.Response().Done()

isOK, errAt, err = makeCall(call, headers, arg, &respHeaders, resp, &respErr)
return err
Expand Down Expand Up @@ -157,6 +158,7 @@ func CallPeer(ctx Context, peer *tchannel.Peer, serviceName, method string, arg,
if err != nil {
return err
}
defer call.Response().Done()

return wrapCall(ctx, call, method, arg, resp)
}
Expand All @@ -167,6 +169,7 @@ func CallSC(ctx Context, sc *tchannel.SubChannel, method string, arg, resp inter
if err != nil {
return err
}
defer call.Response().Done()

return wrapCall(ctx, call, method, arg, resp)
}
36 changes: 23 additions & 13 deletions outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/uber/tchannel-go/typed"

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -132,6 +131,8 @@ func (c *Connection) beginCall(ctx context.Context, serviceName, methodName stri
call.response = response

if err := call.writeMethod([]byte(methodName)); err != nil {
UpdateSpanWithError(response.span, true, err)
response.Done()
return nil, err
}
return call, nil
Expand Down Expand Up @@ -231,9 +232,11 @@ type OutboundCallResponse struct {

requestState *RequestState
// startedAt is the time at which the outbound call was started.
startedAt time.Time
timeNow func() time.Time
span opentracing.Span
startedAt time.Time
timeNow func() time.Time
span opentracing.Span
// spanFinished is a bool flag for avoiding finishing a span multiple times
spanFinished bool
statsReporter StatsReporter
commonStatsTags map[string]string
}
Expand Down Expand Up @@ -269,6 +272,20 @@ func (response *OutboundCallResponse) Arg3Reader() (ArgReader, error) {
return response.arg3Reader()
}

// Done performs cleanup for the OutboundCallResponse. It finishes the
// response's span with zero-valued FinishOptions via finishSpanWithOptions.
func (response *OutboundCallResponse) Done() {
	var defaultOpts opentracing.FinishOptions
	response.finishSpanWithOptions(defaultOpts)
}

// finishSpanWithOptions finishes the span attached to the OutboundCallResponse
// using opts. The call is ignored when the response is nil, when it carries no
// span, or when spanFinished has already been set by an earlier call.
func (response *OutboundCallResponse) finishSpanWithOptions(opts opentracing.FinishOptions) {
	if response == nil {
		return
	}
	if response.span == nil {
		return
	}
	if response.spanFinished {
		return
	}

	// Set the flag first so the span is never finished twice through this path.
	response.spanFinished = true
	response.span.FinishWithOptions(opts)
}

// handleError handles an error coming back from the peer. If the error is a
// protocol level error, the entire connection will be closed. If the error is
// a request specific error, it will be written to the request's response
Expand Down Expand Up @@ -330,15 +347,8 @@ func (response *OutboundCallResponse) doneReading(unexpected error) {
lastAttempt := isSuccess || !response.requestState.HasRetries(unexpected)

// TODO how should this work with retries?
if span := response.span; span != nil {
if unexpected != nil {
span.LogEventWithPayload("error", unexpected)
}
if !isSuccess && lastAttempt {
ext.Error.Set(span, true)
}
span.FinishWithOptions(opentracing.FinishOptions{FinishTime: now})
}
UpdateSpanWithError(response.span, !isSuccess && lastAttempt, unexpected)
response.finishSpanWithOptions(opentracing.FinishOptions{FinishTime: now})

latency := now.Sub(response.startedAt)
response.statsReporter.RecordTimer("outbound.calls.per-attempt.latency", response.commonStatsTags, latency)
Expand Down
3 changes: 3 additions & 0 deletions raw/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func Call(ctx context.Context, ch *tchannel.Channel, hostPort string, serviceNam
if err != nil {
return nil, nil, nil, err
}
defer call.Response().Done()

return WriteArgs(call, arg2, arg3)
}
Expand All @@ -90,6 +91,7 @@ func CallSC(ctx context.Context, sc *tchannel.SubChannel, method string, arg2, a
if err != nil {
return nil, nil, nil, err
}
defer call.Response().Done()

return WriteArgs(call, arg2, arg3)
}
Expand All @@ -115,6 +117,7 @@ func CallV2(ctx context.Context, sc *tchannel.SubChannel, cArgs CArgs) (*CRes, e
if err != nil {
return nil, err
}
defer call.Response().Done()

arg2, arg3, res, err := WriteArgs(call, cArgs.Arg2, cArgs.Arg3)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions thrift/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (c *client) Call(ctx Context, thriftService, methodName string, req, resp t
if err != nil {
return err
}
defer call.Response().Done()

if err := writeArgs(call, headers, req); err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion thrift/server_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package thrift
package thrift_test

import (
"errors"
Expand All @@ -9,6 +9,7 @@ import (
"github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/testutils"
athrift "github.com/uber/tchannel-go/thirdparty/github.com/apache/thrift/lib/go/thrift"
. "github.com/uber/tchannel-go/thrift"
)

var errIO = errors.New("IO Error")
Expand Down
80 changes: 79 additions & 1 deletion thrift/tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@ package thrift_test
import (
json_encoding "encoding/json"
"testing"
"time"

"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/mocktracer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/testutils"
. "github.com/uber/tchannel-go/testutils/testtracing"
"github.com/uber/tchannel-go/thrift"
. "github.com/uber/tchannel-go/thrift"
gen "github.com/uber/tchannel-go/thrift/gen-go/test"

"golang.org/x/net/context"
)

Expand Down Expand Up @@ -108,3 +114,75 @@ func TestThriftTracingPropagation(t *testing.T) {
}
suite.Run(t)
}

// TestClientSpanFinish issues a Thrift call with a badTStruct{Err: errIO}
// request, expects the call to fail with an error of errIO's type, and then
// checks that the mock tracer recorded exactly one finished span whose
// span.kind tag is "client".
func TestClientSpanFinish(t *testing.T) {
	mockTracer := mocktracer.New()

	testServer := testutils.NewTestServer(t, testutils.NewOpts())
	defer testServer.CloseAndVerify()

	serverCh := testServer.Server()
	NewServer(serverCh).Register(&thriftStruction{})

	// The mock tracer is passed to the client channel only.
	clientOpts := &testutils.ChannelOpts{
		ChannelOptions: tchannel.ChannelOptions{Tracer: mockTracer},
	}
	clientCh := testServer.NewClient(clientOpts)
	defer clientCh.Close()

	thriftClient := NewClient(
		clientCh,
		serverCh.ServiceName(),
		&ClientOptions{HostPort: testServer.HostPort()},
	)

	ctx, cancel := NewContext(time.Second)
	defer cancel()

	_, callErr := thriftClient.Call(ctx, "destruct", "partialDestruct", &badTStruct{Err: errIO}, &nullTStruct{})
	assert.Error(t, callErr)
	assert.IsType(t, errIO, callErr)

	// Pause briefly before inspecting the spans the tracer has collected.
	time.Sleep(testutils.Timeout(time.Millisecond))
	finished := mockTracer.FinishedSpans()
	require.Equal(t, 1, len(finished))
	assert.Equal(t, ext.SpanKindEnum("client"), finished[0].Tag("span.kind"))
}

// TestServerSpanSpanFinish issues a Thrift call that is expected to fail with
// a tchannel.SystemError and then checks that the mock tracer, installed on
// the server channel, recorded exactly one finished span whose span.kind tag
// is "server".
func TestServerSpanSpanFinish(t *testing.T) {
	mockTracer := mocktracer.New()

	serverOpts := &testutils.ChannelOpts{
		ChannelOptions: tchannel.ChannelOptions{Tracer: mockTracer},
	}
	// Register a log filter for the server-side error message this call produces.
	serverOpts.AddLogFilter(
		"Thrift server error.", 1,
		"error", "IO Error",
		"method", "destruct::partialDestruct")

	testServer := testutils.NewTestServer(t, serverOpts)
	defer testServer.CloseAndVerify()

	serverCh := testServer.Server()
	NewServer(serverCh).Register(&thriftStruction{})

	// The client channel is created without the tracer so that only the
	// server span is checked.
	clientCh := testServer.NewClient(nil)
	thriftClient := NewClient(
		clientCh,
		serverCh.ServiceName(),
		&ClientOptions{HostPort: testServer.HostPort()},
	)

	ctx, cancel := NewContext(time.Second)
	defer cancel()

	_, callErr := thriftClient.Call(ctx, "destruct", "partialDestruct", &nullTStruct{}, &nullTStruct{})
	assert.Error(t, callErr)
	assert.IsType(t, tchannel.SystemError{}, callErr)

	// Pause briefly before inspecting the spans the tracer has collected.
	time.Sleep(testutils.Timeout(time.Millisecond))
	finished := mockTracer.FinishedSpans()
	require.Equal(t, 1, len(finished))
	assert.Equal(t, ext.SpanKindEnum("server"), finished[0].Tag("span.kind"))
}
33 changes: 28 additions & 5 deletions tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"time"

"github.com/opentracing/opentracing-go/log"
"github.com/uber/tchannel-go/trand"
"github.com/uber/tchannel-go/typed"

Expand All @@ -32,11 +33,14 @@ import (
"golang.org/x/net/context"
)

// zipkinSpanFormat defines a name for OpenTracing carrier format that tracer may support.
// It is used to extract zipkin-style trace/span IDs from the OpenTracing Span, which are
// otherwise not exposed explicitly.
// NB: the string value is what's actually shared between implementations
const zipkinSpanFormat = "zipkin-span-format"
const (
// zipkinSpanFormat defines a name for OpenTracing carrier format that tracer may support.
// It is used to extract zipkin-style trace/span IDs from the OpenTracing Span, which are
// otherwise not exposed explicitly.
// NB: the string value is what's actually shared between implementations
zipkinSpanFormat = "zipkin-span-format"
spanComponentName = "tchannel-go"
)

// Span is an internal representation of Zipkin-compatible OpenTracing Span.
// It is used as OpenTracing inject/extract Carrier with ZipkinSpanFormat.
Expand Down Expand Up @@ -151,6 +155,7 @@ func (c *Connection) startOutboundSpan(ctx context.Context, serviceName, methodN
}
ext.SpanKindRPCClient.Set(span)
ext.PeerService.Set(span, serviceName)
ext.Component.Set(span, spanComponentName)
c.setPeerHostPort(span)
span.SetTag("as", call.callReq.Headers[ArgScheme])
var injectable injectableSpan
Expand Down Expand Up @@ -214,6 +219,7 @@ func (c *Connection) extractInboundSpan(callReq *callReq) opentracing.Span {
span := c.Tracer().StartSpan(operationName, ext.RPCServerOption(spanCtx))
span.SetTag("as", callReq.Headers[ArgScheme])
ext.PeerService.Set(span, callReq.Headers[CallerName])
ext.Component.Set(span, spanComponentName)
c.setPeerHostPort(span)
return span
}
Expand Down Expand Up @@ -252,6 +258,7 @@ func ExtractInboundSpan(ctx context.Context, call *InboundCall, headers map[stri
}
span = tracer.StartSpan(call.MethodString(), ext.RPCServerOption(parent))
ext.PeerService.Set(span, call.CallerName())
ext.Component.Set(span, spanComponentName)
span.SetTag("as", string(call.Format()))
call.conn.setPeerHostPort(span)
call.Response().span = span
Expand Down Expand Up @@ -286,3 +293,19 @@ func TracerFromRegistrar(registrar Registrar) opentracing.Tracer {
}
return opentracing.GlobalTracer()
}

// UpdateSpanWithError sets the error tag and writes an error log on span.
// When hasError is true the OpenTracing error tag is set to true. When err is
// non-nil, an "error" event carrying err's message is logged on the span.
// A nil span is ignored.
func UpdateSpanWithError(span opentracing.Span, hasError bool, err error) {
	if span == nil {
		return
	}

	if hasError {
		ext.Error.Set(span, true)
	}

	if err == nil {
		return
	}
	event := log.String("event", "error")
	message := log.String("message", err.Error())
	span.LogFields(event, message)
}
Loading
Loading