Skip to content

Commit

Permalink
Merge Sumo tail-sampling extensions
Browse files Browse the repository at this point in the history
  • Loading branch information
pmm-sumo committed Nov 4, 2020
1 parent 0ebee3e commit 58c9e34
Show file tree
Hide file tree
Showing 23 changed files with 1,113 additions and 82 deletions.
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ require (
honnef.co/go/tools v0.0.1-2020.1.6
)

replace go.opentelemetry.io/collector => github.com/SumoLogic/opentelemetry-collector v0.13.0

// Replace references to modules that are in this repository with their relative paths
// so that we always build with current (latest) version of the source code.

Expand Down
2 changes: 0 additions & 2 deletions processor/k8sprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,3 @@ require (
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig => ./../../internal/k8sconfig

replace go.opentelemetry.io/collector => github.com/SumoLogic/opentelemetry-collector v0.13.0
4 changes: 1 addition & 3 deletions processor/sourceprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,5 @@ go 1.14
require (
github.com/stretchr/testify v1.6.1
go.opencensus.io v0.22.4
go.opentelemetry.io/collector v0.8.0
go.opentelemetry.io/collector v0.13.1-0.20201103180041-ab843b20c2fb
)

replace go.opentelemetry.io/collector => github.com/SumoLogic/opentelemetry-collector v0.13.0
39 changes: 38 additions & 1 deletion processor/tailsamplingprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ Multiple policies exist today and it is straight forward to add more. These incl
- `numeric_attribute`: Sample based on number attributes
- `string_attribute`: Sample based on string attributes
- `rate_limiting`: Sample based on rate
- `cascading`: Sample based on a set of cascading rules
- `properties`: Sample based on properties (duration, operation name, number of spans)

The following configuration options can also be modified:
- `decision_wait` (default = 30s): Wait time since the first span of a trace before making a sampling decision
Expand Down Expand Up @@ -50,7 +52,42 @@ processors:
name: test-policy-4,
type: rate_limiting,
rate_limiting: {spans_per_second: 35}
}
},
{
name: test-policy-5,
type: cascading,

# This is the total budget available for the policy
spans_per_second: 1000,
rules: [
{
# This rule will consume no more than 150 spans_per_second for the traces with matching spans
name: "some-name",
spans_per_second: 150,
numeric_attribute: {key: key1, min_value: 50, max_value: 100}
},
{
name: "some-other-name",
spans_per_second: 50,
properties: {min_duration_micros: 9000000 }
},
{
# This rule will match anything and take any remaining bandwidth, up to
# the spans_per_second defined at the top level
name: "capture-anything-else",
spans_per_second: -1
}
]
},
{
name: test-policy-6,
type: properties,
properties: {
name_pattern: "foo.*",
min_number_of_spans: 10,
min_duration_micros: 100000
}
},
]
```

Expand Down
129 changes: 129 additions & 0 deletions processor/tailsamplingprocessor/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import (
"time"

"go.opentelemetry.io/collector/config/configmodels"
)

// PolicyType indicates the type of sampling policy. It is the string value
// read from the "type" key of a policy entry (see PolicyCfg.Type); the
// recognized values are the PolicyType constants declared in this package.
type PolicyType string

// Supported sampling policy types, as written in the "type" key of a policy.
const (
// AlwaysSample samples all traces, typically used for debugging.
AlwaysSample PolicyType = "always_sample"
// NumericAttribute samples traces that have a given numeric attribute in a specified
// range, e.g.: attribute "http.status_code" >= 399 and <= 999.
NumericAttribute PolicyType = "numeric_attribute"
// StringAttribute samples traces that have an attribute, of type string, matching
// one of the listed values.
StringAttribute PolicyType = "string_attribute"
// RateLimiting allows all traces until the specified limits are satisfied.
RateLimiting PolicyType = "rate_limiting"
// Properties allows all traces that conform to the specified properties.
Properties PolicyType = "properties"
// Cascading provides the ability to specify several rules, organized by priority,
// each with its own ingestion budget.
Cascading PolicyType = "cascading"
)

// PolicyCfg holds the configuration common to all policies, plus one
// sub-configuration section per policy type. Type indicates which of the
// sections is meant to be used for a given policy entry.
type PolicyCfg struct {
// Name given to the instance of the policy to make it easy to identify in metrics and logs.
Name string `mapstructure:"name"`
// Type of the policy; this is used to match the proper configuration of the policy.
Type PolicyType `mapstructure:"type"`
// Configs for numeric attribute filter sampling policy evaluator.
NumericAttributeCfg NumericAttributeCfg `mapstructure:"numeric_attribute"`
// Configs for string attribute filter sampling policy evaluator.
StringAttributeCfg StringAttributeCfg `mapstructure:"string_attribute"`
// Configs for rate limiting filter sampling policy evaluator.
RateLimitingCfg RateLimitingCfg `mapstructure:"rate_limiting"`
// Configs for properties sampling policy evaluator.
PropertiesCfg PropertiesCfg `mapstructure:"properties"`
// SpansPerSecond specifies the total budget that should never be exceeded by a cascading policy.
SpansPerSecond int64 `mapstructure:"spans_per_second"`
// Rules provides the list of prioritized rules used by a cascading policy to fill its budget.
Rules []CascadingRuleCfg `mapstructure:"rules"`
}

// CascadingRuleCfg holds the specification of a single rule of a cascading
// policy together with the budget assigned to that rule. The filter sections
// are pointers, so a section that is not configured can stay nil.
type CascadingRuleCfg struct {
// Name identifies the rule.
Name string `mapstructure:"name"`
// SpansPerSecond specifies the budget available for this cascading policy rule.
// NOTE(review): the README example uses -1 for a catch-all rule that takes any
// remaining budget of the policy; the handling of that value is not in this file.
SpansPerSecond int64 `mapstructure:"spans_per_second"`
// Configs for numeric attribute filter sampling policy evaluator (optional).
NumericAttributeCfg *NumericAttributeCfg `mapstructure:"numeric_attribute"`
// Configs for string attribute filter sampling policy evaluator (optional).
StringAttributeCfg *StringAttributeCfg `mapstructure:"string_attribute"`
// Configs for properties sampling policy evaluator (optional).
PropertiesCfg *PropertiesCfg `mapstructure:"properties"`
}

// PropertiesCfg holds the configurable settings to create a properties filter
// sampling policy evaluator. Every field is an optional pointer, so a criterion
// that is not configured can stay nil.
type PropertiesCfg struct {
// NamePattern (optional) describes a regular expression that must be met by any span operation name.
NamePattern *string `mapstructure:"name_pattern"`
// MinDurationMicros (optional) is the minimum duration, in microseconds, of a trace to be considered a match.
MinDurationMicros *int64 `mapstructure:"min_duration_micros"`
// MinNumberOfSpans (optional) is the minimum number of spans that must be present in a matching trace.
MinNumberOfSpans *int `mapstructure:"min_number_of_spans"`
}

// NumericAttributeCfg holds the configurable settings to create a numeric attribute filter
// sampling policy evaluator.
type NumericAttributeCfg struct {
// Key is the name of the attribute (tag) that the filter is going to be matching against.
Key string `mapstructure:"key"`
// MinValue is the minimum value of the attribute to be considered a match.
MinValue int64 `mapstructure:"min_value"`
// MaxValue is the maximum value of the attribute to be considered a match.
MaxValue int64 `mapstructure:"max_value"`
}

// StringAttributeCfg holds the configurable settings to create a string attribute filter
// sampling policy evaluator.
type StringAttributeCfg struct {
// Key is the name of the attribute (tag) that the filter is going to be matching against.
Key string `mapstructure:"key"`
// Values is the set of attribute values; the attribute is considered a match
// if its actual value is equal to any of them.
Values []string `mapstructure:"values"`
}

// RateLimitingCfg holds the configurable settings to create a rate limiting
// sampling policy evaluator.
type RateLimitingCfg struct {
// SpansPerSecond sets the limit on the maximum number of spans that can be processed each second.
SpansPerSecond int64 `mapstructure:"spans_per_second"`
}

// Config holds the configuration for tail-based sampling.
type Config struct {
// ProcessorSettings is embedded and squashed, so its keys are read from the
// same level as the fields below.
configmodels.ProcessorSettings `mapstructure:",squash"`
// DecisionWait is the desired wait time from the arrival of the first span of
// a trace until the decision about sampling it or not is evaluated.
DecisionWait time.Duration `mapstructure:"decision_wait"`
// NumTraces is the number of traces kept in memory. Typically most of the data
// of a trace is released after a sampling decision is taken.
NumTraces uint64 `mapstructure:"num_traces"`
// ExpectedNewTracesPerSec sets the expected number of new traces sent to the tail sampling processor
// per second. This helps with allocating data structures with a size closer to actual usage.
ExpectedNewTracesPerSec uint64 `mapstructure:"expected_new_traces_per_sec"`
// PolicyCfgs sets the tail-based sampling policies which make a sampling decision
// for a given trace when requested.
PolicyCfgs []PolicyCfg `mapstructure:"policies"`
}
57 changes: 48 additions & 9 deletions processor/tailsamplingprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtest"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/config"
)

func TestLoadConfig(t *testing.T) {
Expand All @@ -37,34 +39,71 @@ func TestLoadConfig(t *testing.T) {
require.Nil(t, err)
require.NotNil(t, cfg)

minDurationValue := int64(9000000)
minSpansValue := 10
namePatternValue := "foo.*"

assert.Equal(t, cfg.Processors["tail_sampling"],
&Config{
&config.Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "tail_sampling",
NameVal: "tail_sampling",
},
DecisionWait: 10 * time.Second,
NumTraces: 100,
ExpectedNewTracesPerSec: 10,
PolicyCfgs: []PolicyCfg{
PolicyCfgs: []config.PolicyCfg{
{
Name: "test-policy-1",
Type: AlwaysSample,
Type: config.AlwaysSample,
},
{
Name: "test-policy-2",
Type: NumericAttribute,
NumericAttributeCfg: NumericAttributeCfg{Key: "key1", MinValue: 50, MaxValue: 100},
Type: config.NumericAttribute,
NumericAttributeCfg: config.NumericAttributeCfg{Key: "key1", MinValue: 50, MaxValue: 100},
},
{
Name: "test-policy-3",
Type: StringAttribute,
StringAttributeCfg: StringAttributeCfg{Key: "key2", Values: []string{"value1", "value2"}},
Type: config.StringAttribute,
StringAttributeCfg: config.StringAttributeCfg{Key: "key2", Values: []string{"value1", "value2"}},
},
{
Name: "test-policy-4",
Type: RateLimiting,
RateLimitingCfg: RateLimitingCfg{SpansPerSecond: 35},
Type: config.RateLimiting,
RateLimitingCfg: config.RateLimitingCfg{SpansPerSecond: 35},
},
{
Name: "test-policy-5",
Type: config.Cascading,
SpansPerSecond: 1000,
Rules: []config.CascadingRuleCfg{
{
Name: "num",
SpansPerSecond: 123,
NumericAttributeCfg: &config.NumericAttributeCfg{
Key: "key1", MinValue: 50, MaxValue: 100},
},
{
Name: "dur",
SpansPerSecond: 50,
PropertiesCfg: &config.PropertiesCfg{
MinDurationMicros: &minDurationValue,
},
},
{
Name: "everything_else",
SpansPerSecond: -1,
},
},
},
{
Name: "test-policy-6",
Type: config.Properties,
PropertiesCfg: config.PropertiesCfg{
NamePattern: &namePatternValue,
MinDurationMicros: &minDurationValue,
MinNumberOfSpans: &minSpansValue,
},
},
},
})
Expand Down
6 changes: 4 additions & 2 deletions processor/tailsamplingprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor/processorhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/config"
)

const (
Expand All @@ -38,7 +40,7 @@ func NewFactory() component.ProcessorFactory {
}

func createDefaultConfig() configmodels.Processor {
return &Config{
return &config.Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: typeStr,
NameVal: typeStr,
Expand All @@ -54,6 +56,6 @@ func createTraceProcessor(
cfg configmodels.Processor,
nextConsumer consumer.TracesConsumer,
) (component.TracesProcessor, error) {
tCfg := cfg.(*Config)
tCfg := cfg.(*config.Config)
return newTraceProcessor(params.Logger, nextConsumer, *tCfg)
}
8 changes: 5 additions & 3 deletions processor/tailsamplingprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/config"
)

func TestCreateDefaultConfig(t *testing.T) {
Expand All @@ -34,13 +36,13 @@ func TestCreateDefaultConfig(t *testing.T) {
func TestCreateProcessor(t *testing.T) {
factory := NewFactory()

cfg := factory.CreateDefaultConfig().(*Config)
cfg := factory.CreateDefaultConfig().(*config.Config)
// Manually set required fields
cfg.ExpectedNewTracesPerSec = 64
cfg.PolicyCfgs = []PolicyCfg{
cfg.PolicyCfgs = []config.PolicyCfg{
{
Name: "test-policy",
Type: AlwaysSample,
Type: config.AlwaysSample,
},
}

Expand Down
1 change: 1 addition & 0 deletions processor/tailsamplingprocessor/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,7 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opentelemetry.io/collector v0.13.0 h1:w5DywMfxHIoGhomH382SJn95+TGQDTz5r9n4bBACe4Y=
go.opentelemetry.io/collector v0.13.1-0.20201103180041-ab843b20c2fb h1:3M71pl4LBA6t3cEenTRvB9i0W7BAwoO7u5eE6C1LMds=
go.opentelemetry.io/collector v0.13.1-0.20201103180041-ab843b20c2fb/go.mod h1:itblxiZ5r454TNNQVvcAp7vj7LbwCdeNRtodo2t+lGM=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
Expand Down
5 changes: 5 additions & 0 deletions processor/tailsamplingprocessor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ import (
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"

"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/obsreport"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/sampling"
)

// Variables related to metrics specific to tail sampling.
Expand Down Expand Up @@ -130,6 +133,8 @@ func SamplingProcessorMetricViews(level configtelemetry.Level) []*view.View {
countTraceDroppedTooEarlyView,
countTraceIDArrivalView,
trackTracesOnMemorylView,

sampling.CountTracesSampledRulesView,
}

return obsreport.ProcessorMetricViews(typeStr, legacyViews)
Expand Down
Loading

0 comments on commit 58c9e34

Please sign in to comment.