From 486436933e35be585e72e7f688ddebae23c36b70 Mon Sep 17 00:00:00 2001 From: Vilius Pranckaitis Date: Wed, 17 Jan 2024 09:20:23 +0200 Subject: [PATCH 1/4] Expose `inbound.cancels.{requested,honored}` metrics --- connection_test.go | 20 ++++++++++++++++++++ inbound.go | 4 ++++ stats_utils_test.go | 25 ++++++++++++++++++++++--- 3 files changed, 46 insertions(+), 3 deletions(-) diff --git a/connection_test.go b/connection_test.go index 5679ed9fe..a4aaf83fe 100644 --- a/connection_test.go +++ b/connection_test.go @@ -496,6 +496,9 @@ func TestServerClientCancellation(t *testing.T) { opts.DefaultConnectionOptions.SendCancelOnContextCanceled = true opts.DefaultConnectionOptions.PropagateCancel = true + serverStats := newRecordingStatsReporter() + opts.StatsReporter = serverStats + testutils.WithTestServer(t, opts, func(t testing.TB, ts *testutils.TestServer) { callReceived := make(chan struct{}) testutils.RegisterFunc(ts.Server(), "ctxWait", func(ctx context.Context, args *raw.Args) (*raw.Res, error) { @@ -519,16 +522,22 @@ func TestServerClientCancellation(t *testing.T) { _, _, _, err := raw.Call(ctx, ts.Server(), ts.HostPort(), ts.ServiceName(), "ctxWait", nil, nil) assert.Equal(t, ErrRequestCancelled, err, "client call result") + serverStats.Expected.IncCounter("inbound.cancels.requested", ts.Server().StatsTags(), 1) + serverStats.Expected.IncCounter("inbound.cancels.honored", ts.Server().StatsTags(), 1) + calls := relaytest.NewMockStats() calls.Add(ts.ServiceName(), ts.ServiceName(), "ctxWait").Failed("canceled").End() ts.AssertRelayStats(calls) }) + + serverStats.ValidateExpected(t) } func TestCancelWithoutSendCancelOnContextCanceled(t *testing.T) { tests := []struct { msg string sendCancelOnContextCanceled bool + wantCancelRequested bool }{ { msg: "no send or process cancel", @@ -537,6 +546,7 @@ func TestCancelWithoutSendCancelOnContextCanceled(t *testing.T) { { msg: "only enable cancels on outbounds", sendCancelOnContextCanceled: true, + wantCancelRequested: true, }, } @@ -545,6 +555,9 @@ func TestCancelWithoutSendCancelOnContextCanceled(t *testing.T) { opts := testutils.NewOpts() opts.DefaultConnectionOptions.SendCancelOnContextCanceled = tt.sendCancelOnContextCanceled + serverStats := newRecordingStatsReporter() + opts.StatsReporter = serverStats + testutils.WithTestServer(t, opts, func(t testing.TB, ts *testutils.TestServer) { callReceived := make(chan struct{}) testutils.RegisterFunc(ts.Server(), "ctxWait", func(ctx context.Context, args *raw.Args) (*raw.Res, error) { @@ -568,6 +581,13 @@ func TestCancelWithoutSendCancelOnContextCanceled(t *testing.T) { _, _, _, err := raw.Call(ctx, ts.Server(), ts.HostPort(), ts.ServiceName(), "ctxWait", nil, nil) assert.Equal(t, ErrRequestCancelled, err, "client call result") + if tt.wantCancelRequested { + serverStats.Expected.IncCounter("inbound.cancels.requested", ts.Server().StatsTags(), 1) + } else { + serverStats.EnsureNotPresent(t, "inbound.cancels.requested") + } + serverStats.EnsureNotPresent(t, "inbound.cancels.honored") + calls := relaytest.NewMockStats() calls.Add(ts.ServiceName(), ts.ServiceName(), "ctxWait").Failed("timeout").End() ts.AssertRelayStats(calls) diff --git a/inbound.go b/inbound.go index d6e70ab9c..dd525237a 100644 --- a/inbound.go +++ b/inbound.go @@ -144,6 +144,8 @@ func (c *Connection) handleCallReqContinue(frame *Frame) bool { } func (c *Connection) handleCancel(frame *Frame) bool { + c.statsReporter.IncCounter("inbound.cancels.requested", c.commonStatsTags, 1) + if !c.opts.PropagateCancel { if c.log.Enabled(LogLevelDebug) { c.log.Debugf("Ignoring cancel for %v", frame.Header.ID) @@ -151,6 +153,8 @@ func (c *Connection) handleCancel(frame *Frame) bool { return true } + c.statsReporter.IncCounter("inbound.cancels.honored", c.commonStatsTags, 1) + c.inbound.handleCancel(frame) // Free the frame, as it's consumed immediately. diff --git a/stats_utils_test.go b/stats_utils_test.go index 312eeb425..e820dc794 100644 --- a/stats_utils_test.go +++ b/stats_utils_test.go @@ -125,9 +125,28 @@ func (r *recordingStatsReporter) Validate(t *testing.T) { assert.Equal(t, keysMap(r.Expected.Values), keysMap(r.Values), "Metric keys are different") - for counterKey, counter := range r.Values { - expectedCounter, ok := r.Expected.Values[counterKey] - if !ok { + r.validateExpectedLocked(t) +} + +// ValidateExpected only validates metrics added to expected rather than all recorded metrics. +func (r *recordingStatsReporter) ValidateExpected(t testing.TB) { + r.Lock() + defer r.Unlock() + + r.validateExpectedLocked(t) +} + +func (r *recordingStatsReporter) EnsureNotPresent(t testing.TB, counter string) { + r.Lock() + defer r.Unlock() + + assert.NotContains(t, r.Values, counter, "metric should not be present") +} + +func (r *recordingStatsReporter) validateExpectedLocked(t testing.TB) { + for counterKey, expectedCounter := range r.Expected.Values { + counter, ok := r.Values[counterKey] + if !assert.True(t, ok, "expected %v not found", counterKey) { continue } From a839ca791306842f8968ba2ef4482644947f1e30 Mon Sep 17 00:00:00 2001 From: Vilius Pranckaitis Date: Fri, 9 Feb 2024 10:06:52 +0200 Subject: [PATCH 2/4] avoid unnecessary allocation --- connection_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/connection_test.go b/connection_test.go index a4aaf83fe..614982da0 100644 --- a/connection_test.go +++ b/connection_test.go @@ -522,8 +522,9 @@ func TestServerClientCancellation(t *testing.T) { _, _, _, err := raw.Call(ctx, ts.Server(), ts.HostPort(), ts.ServiceName(), "ctxWait", nil, nil) assert.Equal(t, ErrRequestCancelled, err, "client call result") - serverStats.Expected.IncCounter("inbound.cancels.requested", ts.Server().StatsTags(), 1) - serverStats.Expected.IncCounter("inbound.cancels.honored", ts.Server().StatsTags(), 1) + statsTags := ts.Server().StatsTags() + serverStats.Expected.IncCounter("inbound.cancels.requested", statsTags, 1) + serverStats.Expected.IncCounter("inbound.cancels.honored", statsTags, 1) calls := relaytest.NewMockStats() calls.Add(ts.ServiceName(), ts.ServiceName(), "ctxWait").Failed("canceled").End() From 93cd04f2a76436d186f37b0bc420a5504aec9812 Mon Sep 17 00:00:00 2001 From: Vilius Pranckaitis Date: Fri, 9 Feb 2024 12:34:52 +0200 Subject: [PATCH 3/4] fix metric validation --- connection_test.go | 20 +++++++++++++------- testutils/test_server.go | 4 ++++ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/connection_test.go b/connection_test.go index 614982da0..b6509a5b4 100644 --- a/connection_test.go +++ b/connection_test.go @@ -560,6 +560,8 @@ func TestCancelWithoutSendCancelOnContextCanceled(t *testing.T) { opts.StatsReporter = serverStats testutils.WithTestServer(t, opts, func(t testing.TB, ts *testutils.TestServer) { + serverStats.Reset() + callReceived := make(chan struct{}) testutils.RegisterFunc(ts.Server(), "ctxWait", func(ctx context.Context, args *raw.Args) (*raw.Res, error) { require.NoError(t, ctx.Err(), "context valid before cancellation") @@ -582,16 +584,20 @@ func TestCancelWithoutSendCancelOnContextCanceled(t *testing.T) { _, _, _, err := raw.Call(ctx, ts.Server(), ts.HostPort(), ts.ServiceName(), "ctxWait", nil, nil) assert.Equal(t, ErrRequestCancelled, err, "client call result") - if tt.wantCancelRequested { - serverStats.Expected.IncCounter("inbound.cancels.requested", ts.Server().StatsTags(), 1) - } else { - serverStats.EnsureNotPresent(t, "inbound.cancels.requested") - } - serverStats.EnsureNotPresent(t, "inbound.cancels.honored") - calls := relaytest.NewMockStats() calls.Add(ts.ServiceName(), ts.ServiceName(), "ctxWait").Failed("timeout").End() ts.AssertRelayStats(calls) + + ts.AddPostFn(func() { + // Validating these at the end of the test, when server has fully processed the cancellation. + if tt.wantCancelRequested && !ts.HasRelay() { + serverStats.Expected.IncCounter("inbound.cancels.requested", ts.Server().StatsTags(), 1) + serverStats.ValidateExpected(t) + } else { + serverStats.EnsureNotPresent(t, "inbound.cancels.requested") + } + serverStats.EnsureNotPresent(t, "inbound.cancels.honored") + }) }) }) } diff --git a/testutils/test_server.go b/testutils/test_server.go index f3abae47f..fa0320f40 100644 --- a/testutils/test_server.go +++ b/testutils/test_server.go @@ -400,6 +400,10 @@ func (ts *TestServer) verify(ch *tchannel.Channel) { assert.NoError(ts, errs, "Verification failed. Channel state:\n%v", IntrospectJSON(ch, nil /* opts */)) } +func (ts *TestServer) AddPostFn(fn func()) { + ts.postFns = append(ts.postFns, fn) +} + func (ts *TestServer) post() { if !ts.Failed() { for _, ch := range ts.channels { From e39a740978f958cbc7b2012edc4a27ec7bbde8d5 Mon Sep 17 00:00:00 2001 From: Vilius Pranckaitis Date: Wed, 14 Feb 2024 12:16:35 +0200 Subject: [PATCH 4/4] fix lint --- testutils/test_server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/testutils/test_server.go b/testutils/test_server.go index fa0320f40..c83538317 100644 --- a/testutils/test_server.go +++ b/testutils/test_server.go @@ -400,6 +400,7 @@ func (ts *TestServer) verify(ch *tchannel.Channel) { assert.NoError(ts, errs, "Verification failed. Channel state:\n%v", IntrospectJSON(ch, nil /* opts */)) } +// AddPostFn registers a function that will be executed after channels are closed. func (ts *TestServer) AddPostFn(fn func()) { ts.postFns = append(ts.postFns, fn) }