Skip to content

Commit

Permalink
fix(cascadingfilter): retrofit sampling.rule handling fix
Browse files Browse the repository at this point in the history
  • Loading branch information
pmm-sumo committed May 17, 2022
1 parent d7c3b2d commit e4438a9
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 1 deletion.
3 changes: 2 additions & 1 deletion processor/cascadingfilterprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,8 @@ func (cfsp *cascadingFilterSpanProcessor) samplingPolicyOnTick() {

if trace.SelectedByProbabilisticFilter {
updateProbabilisticRateTag(allSpans, selectedByProbabilisticFilterSpans, totalSpans)
} else {
} else if len(cfsp.traceAcceptRules) > 0 {
// Set filtering tag only if there were actually any accept rules set otherwise
updateFilteringTag(allSpans)
}

Expand Down
55 changes: 55 additions & 0 deletions processor/cascadingfilterprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,56 @@ func TestMultipleBatchesAreCombinedIntoOne(t *testing.T) {
}
}

func TestSamplingPolicyOnlyReject(t *testing.T) {
const maxSize = 100
const decisionWaitSeconds = 5
// For this test explicitly control the timer calls and batcher, and set a mock
// sampling policy evaluator.
msp := new(consumertest.TracesSink)
mtt := &manualTTicker{}
mde := &mockDropFalseEvaluator{}
tsp := &cascadingFilterSpanProcessor{
ctx: context.Background(),
nextConsumer: msp,
maxNumTraces: maxSize,
logger: zap.NewNop(),
decisionBatcher: newSyncIDBatcher(decisionWaitSeconds),
traceRejectRules: []*TraceRejectEvaluator{{Name: "mock-drop-eval", Evaluator: mde, ctx: context.TODO()}},
deleteChan: make(chan traceKey, maxSize),
policyTicker: mtt,
maxSpansPerSecond: 10000,
filteringEnabled: true,
}

_, batches := generateIdsAndBatches(1)
if err := tsp.ConsumeTraces(context.Background(), batches[0]); err != nil {
t.Errorf("Failed consuming traces: %v", err)
}

// Count "decision wait" times
for i := 0; i < decisionWaitSeconds; i++ {
tsp.samplingPolicyOnTick()
}

tsp.samplingPolicyOnTick()

require.Equal(t, 1, msp.SpanCount(), "all spans were accounted for")
for _, trace := range msp.AllTraces() {
for i := 0; i < trace.ResourceSpans().Len(); i++ {
sss := trace.ResourceSpans().At(i).InstrumentationLibrarySpans()
for j := 0; j < sss.Len(); j++ {
spans := sss.At(j).Spans()
for k := 0; k < spans.Len(); k++ {
attrs := spans.At(k).Attributes()
println(attrs.Len())
_, found := attrs.Get("sampling.rule")
require.False(t, found, "sampling.rule value should not be set when only reject rules are applied")
}
}
}
}
}

//nolint:unused
func collectSpanIds(trace *pdata.Traces) []pdata.SpanID {
spanIDs := make([]pdata.SpanID, 0)
Expand Down Expand Up @@ -625,10 +675,15 @@ type mockPolicyEvaluator struct {
}

type mockDropEvaluator struct{}
type mockDropFalseEvaluator struct{}

var _ sampling.PolicyEvaluator = (*mockPolicyEvaluator)(nil)
var _ sampling.DropTraceEvaluator = (*mockDropEvaluator)(nil)

func (d *mockDropFalseEvaluator) ShouldDrop(_ pdata.TraceID, _ *sampling.TraceData) bool {
return false
}

func (m *mockPolicyEvaluator) Evaluate(_ pdata.TraceID, _ *sampling.TraceData) sampling.Decision {
m.EvaluationCount++
return m.NextDecision
Expand Down

0 comments on commit e4438a9

Please sign in to comment.