diff --git a/app/app_test.go b/app/app_test.go index 57a3be7ca1..c9bbafdd81 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -35,6 +35,7 @@ import ( "github.com/honeycombio/refinery/internal/peer" "github.com/honeycombio/refinery/logger" "github.com/honeycombio/refinery/metrics" + "github.com/honeycombio/refinery/pubsub" "github.com/honeycombio/refinery/sample" "github.com/honeycombio/refinery/sharder" "github.com/honeycombio/refinery/transmit" @@ -198,6 +199,7 @@ func newStartedApp( &inject.Object{Value: shrdr}, &inject.Object{Value: noop.NewTracerProvider().Tracer("test"), Name: "tracer"}, &inject.Object{Value: collector}, + &inject.Object{Value: &pubsub.LocalPubSub{}}, &inject.Object{Value: metricsr, Name: "metrics"}, &inject.Object{Value: metricsr, Name: "genericMetrics"}, &inject.Object{Value: metricsr, Name: "upstreamMetrics"}, @@ -778,7 +780,7 @@ func TestPeerRouting_TraceLocalityDisabled(t *testing.T) { Data: map[string]interface{}{ "trace_id": "2", "meta.refinery.min_span": true, - "meta.annotation_type": types.SpanAnnotationTypeUnknown, + "meta.annotation_type": types.SpanTypeUnknown, "meta.refinery.root": false, "meta.refinery.span_data_size": 175, }, diff --git a/collect/cache/cuckooSentCache.go b/collect/cache/cuckooSentCache.go index e03ee8ba93..f3ffd67a68 100644 --- a/collect/cache/cuckooSentCache.go +++ b/collect/cache/cuckooSentCache.go @@ -88,10 +88,10 @@ func (t *keptTraceCacheEntry) SpanCount() uint { // Count records additional spans in the cache record. func (t *keptTraceCacheEntry) Count(s *types.Span) { t.eventCount++ - switch s.AnnotationType() { - case types.SpanAnnotationTypeSpanEvent: + switch s.Type() { + case types.SpanTypeSpanEvent: t.spanEventCount++ - case types.SpanAnnotationTypeLink: + case types.SpanTypeLink: t.spanLinkCount++ default: t.spanCount++ diff --git a/collect/collect.go b/collect/collect.go index 34301ba500..bfe50c7abb 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -4,8 +4,6 @@ import ( "context" "errors" "fmt" - "math" - "math/rand" "os" "runtime" "sort" @@ -13,6 +11,7 @@ import ( "time" "go.opentelemetry.io/otel/trace" + "golang.org/x/sync/errgroup" "github.com/honeycombio/refinery/collect/cache" "github.com/honeycombio/refinery/config" @@ -22,6 +21,7 @@ import ( "github.com/honeycombio/refinery/internal/peer" "github.com/honeycombio/refinery/logger" "github.com/honeycombio/refinery/metrics" + "github.com/honeycombio/refinery/pubsub" "github.com/honeycombio/refinery/sample" "github.com/honeycombio/refinery/sharder" "github.com/honeycombio/refinery/transmit" @@ -30,6 +30,9 @@ import ( "github.com/sirupsen/logrus" ) +const traceDecisionKeptTopic = "trace_decision_kept" +const traceDecisionDroppedTopic = "trace_decision_dropped" + var ErrWouldBlock = errors.New("Dropping span as channel buffer is full. 
Span will not be processed and will be lost.") var CollectorHealthKey = "collector" @@ -69,6 +72,7 @@ type InMemCollector struct { Transmission transmit.Transmission `inject:"upstreamTransmission"` PeerTransmission transmit.Transmission `inject:"peerTransmission"` + PubSub pubsub.PubSub `inject:""` Metrics metrics.Metrics `inject:"genericMetrics"` SamplerFactory *sample.SamplerFactory `inject:""` StressRelief StressReliever `inject:"stressRelief"` @@ -84,13 +88,15 @@ type InMemCollector struct { datasetSamplers map[string]sample.Sampler - sampleTraceCache cache.TraceSentCache + TraceDecisionCache cache.TraceSentCache incoming chan *types.Span fromPeer chan *types.Span reload chan struct{} done chan struct{} redistributeTimer *redistributeNotifier + traceDecisions chan string + droppedDecisions chan string hostname string } @@ -148,7 +154,7 @@ func (i *InMemCollector) Start() error { sampleCacheConfig := i.Config.GetSampleCacheConfig() var err error - i.sampleTraceCache, err = cache.NewCuckooSentCache(sampleCacheConfig, i.Metrics) + i.TraceDecisionCache, err = cache.NewCuckooSentCache(sampleCacheConfig, i.Metrics) if err != nil { return err } @@ -173,9 +179,20 @@ func (i *InMemCollector) Start() error { i.Peers.RegisterUpdatedPeersCallback(i.redistributeTimer.Reset) } + if !i.Config.GetCollectionConfig().EnableTraceLocality { + i.PubSub.Subscribe(context.Background(), traceDecisionKeptTopic, i.signalTraceDecisions) + i.PubSub.Subscribe(context.Background(), traceDecisionDroppedTopic, i.signalTraceDecisions) + // TODO: make this configurable? + i.traceDecisions = make(chan string, 100) + i.droppedDecisions = make(chan string, 100) + } + // spin up one collector because this is a single threaded collector go i.collect() + // spin up a drop decision collector + go i.sendDropDecisions() + return nil } @@ -200,7 +217,11 @@ func (i *InMemCollector) reloadConfigs() { // pull the old cache contents into the new cache for j, trace := range i.cache.GetAll() { if j >= imcConfig.CacheCapacity { - i.send(trace, TraceSendEjectedFull) + td, err := i.makeDecision(trace, TraceSendEjectedFull) + if err != nil { + continue + } + i.send(trace, td) continue } c.Set(trace) @@ -210,7 +231,7 @@ func (i *InMemCollector) reloadConfigs() { i.Logger.Debug().Logf("skipping reloading the in-memory cache on config reload because it hasn't changed capacity") } - i.sampleTraceCache.Resize(i.Config.GetSampleCacheConfig()) + i.TraceDecisionCache.Resize(i.Config.GetSampleCacheConfig()) i.StressRelief.UpdateFromConfig(i.Config.GetStressReliefConfig()) @@ -264,7 +285,11 @@ func (i *InMemCollector) checkAlloc() { for _, trace := range allTraces { tracesSent.Add(trace.TraceID) totalDataSizeSent += trace.DataSize - i.send(trace, TraceSendEjectedMemsize) + td, err := i.makeDecision(trace, TraceSendEjectedMemsize) + if err != nil { + continue + } + i.send(trace, td) if totalDataSizeSent > int(totalToRemove) { break } @@ -354,6 +379,12 @@ func (i *InMemCollector) collect() { return case <-i.redistributeTimer.Notify(): i.redistributeTraces() + case msg, ok := <-i.traceDecisions: + if !ok { + return + } + + i.processTraceDecision(msg) case sp, ok := <-i.fromPeer: if !ok { // channel's been closed; we should shut down. 
@@ -376,6 +407,12 @@ func (i *InMemCollector) collect() { runtime.Gosched() i.mutex.Lock() } + case msg, ok := <-i.traceDecisions: + if !ok { + return + } + + i.processTraceDecision(msg) case <-i.redistributeTimer.Notify(): i.redistributeTraces() case sp, ok := <-i.incoming: @@ -476,12 +513,27 @@ func (i *InMemCollector) sendExpiredTracesInCache(now time.Time) { spanLimit := uint32(i.Config.GetTracesConfig().SpanLimit) for _, t := range traces { if t.RootSpan != nil { - i.send(t, TraceSendGotRoot) + td, err := i.makeDecision(t, TraceSendGotRoot) + if err != nil { + i.Logger.Error().WithFields(map[string]interface{}{ + "trace_id": t.TraceID, + }).Logf("error making decision for trace: %s", err.Error()) + continue + } + i.send(t, td) } else { if spanLimit > 0 && t.DescendantCount() > spanLimit { - i.send(t, TraceSendSpanLimit) + td, err := i.makeDecision(t, TraceSendSpanLimit) + if err != nil { + continue + } + i.send(t, td) } else { - i.send(t, TraceSendExpired) + td, err := i.makeDecision(t, TraceSendExpired) + if err != nil { + continue + } + i.send(t, td) } } } @@ -501,7 +553,7 @@ func (i *InMemCollector) processSpan(sp *types.Span) { trace := i.cache.Get(sp.TraceID) if trace == nil { // if the trace has already been sent, just pass along the span - if sr, keptReason, found := i.sampleTraceCache.CheckSpan(sp); found { + if sr, keptReason, found := i.TraceDecisionCache.CheckSpan(sp); found { i.Metrics.Increment("trace_sent_cache_hit") // bump the count of records on this trace -- if the root span isn't // the last late span, then it won't be perfect, but it will be better than @@ -531,13 +583,16 @@ func (i *InMemCollector) processSpan(sp *types.Span) { // push this into the cache and if we eject an unsent trace, send it ASAP ejectedTrace := i.cache.Set(trace) if ejectedTrace != nil { - i.send(ejectedTrace, TraceSendEjectedFull) + td, err := i.makeDecision(ejectedTrace, TraceSendEjectedFull) + if err == nil { + i.send(ejectedTrace, td) + } } } // if the trace we got back from the cache has already been sent, deal with the // span. 
if trace.Sent { - if sr, reason, found := i.sampleTraceCache.CheckSpan(sp); found { + if sr, reason, found := i.TraceDecisionCache.CheckSpan(sp); found { i.Metrics.Increment("trace_sent_cache_hit") i.dealWithSentTrace(ctx, sr, reason, sp) return @@ -614,7 +669,7 @@ func (i *InMemCollector) ProcessSpanImmediately(sp *types.Span) (processed bool, } var rate uint - record, reason, found := i.sampleTraceCache.CheckSpan(sp) + record, reason, found := i.TraceDecisionCache.CheckSpan(sp) if !found { rate, keep, reason = i.StressRelief.GetSampleRate(sp.TraceID) now := i.Clock.Now() @@ -629,7 +684,7 @@ func (i *InMemCollector) ProcessSpanImmediately(sp *types.Span) (processed bool, trace.SetSampleRate(rate) // we do want a record of how we disposed of traces in case more come in after we've // turned off stress relief (if stress relief is on we'll keep making the same decisions) - i.sampleTraceCache.Record(trace, keep, reason) + i.TraceDecisionCache.Record(trace, keep, reason) } else { rate = record.Rate() keep = record.Kept() @@ -671,7 +726,47 @@ func (i *InMemCollector) dealWithSentTrace(ctx context.Context, tr cache.TraceSe // if we receive a proxy span after a trace decision has been made, // we should just broadcast the decision again if sp.IsDecisionSpan() { - // TODO: broadcast the decision again + var ( + msg string + err error + topic string + ) + if tr.Kept() { + topic = traceDecisionKeptTopic + msg, err = newKeptDecisionMessage(TraceDecision{ + TraceID: sp.TraceID, + Kept: tr.Kept(), + KeptReason: keptReason, + }) + if err != nil { + i.Logger.Error().WithFields(map[string]interface{}{ + "trace_id": sp.TraceID, + "kept": tr.Kept(), + "late_span": true, + }).Logf("Failed to marshal trace decision") + return + } + } else { + topic = traceDecisionDroppedTopic + msg, err = newDroppedDecisionMessage([]string{sp.TraceID}) + if err != nil { + i.Logger.Error().WithFields(map[string]interface{}{ + "trace_id": sp.TraceID, + "kept": tr.Kept(), + "late_span": true, + }).Logf("Failed to marshal trace decision") + return + } + } + + err = i.PubSub.Publish(ctx, topic, msg) + if err != nil { + i.Logger.Error().WithFields(map[string]interface{}{ + "trace_id": sp.TraceID, + "kept": tr.Kept(), + "late_span": true, + }).Logf("Failed to publish trace decision") + } return } @@ -757,7 +852,8 @@ func mergeTraceAndSpanSampleRates(sp *types.Span, traceSampleRate uint, dryRunMo } } -func (i *InMemCollector) send(trace *types.Trace, sendReason string) { +// this is only called when a trace decision is received +func (i *InMemCollector) send(trace *types.Trace, td *TraceDecision) { if trace.Sent { // someone else already sent this so we shouldn't also send it. i.Logger.Debug(). @@ -770,73 +866,55 @@ func (i *InMemCollector) send(trace *types.Trace, sendReason string) { traceDur := i.Clock.Since(trace.ArrivalTime) i.Metrics.Histogram("trace_duration_ms", float64(traceDur.Milliseconds())) - i.Metrics.Histogram("trace_span_count", float64(trace.DescendantCount())) - if trace.RootSpan != nil { - i.Metrics.Increment("trace_send_has_root") - } else { - i.Metrics.Increment("trace_send_no_root") - } - - i.Metrics.Increment(sendReason) - - var sampler sample.Sampler - var found bool - - // get sampler key (dataset for legacy keys, environment for new keys) - samplerKey, isLegacyKey := trace.GetSamplerKey() + // "trace_span_count" + // "trace_send_has_root" + // "trace_send_no_root" + // do we care about these metrics for dropped traces? 
if we only publish trace ids for dropped traces + // if we're supposed to drop this trace, and dry run mode is not enabled, then we're done. logFields := logrus.Fields{ - "trace_id": trace.TraceID, + "trace_id": td.TraceID, } - if isLegacyKey { - logFields["dataset"] = samplerKey - } else { - logFields["environment"] = samplerKey + if !td.Kept && !i.Config.GetIsDryRun() { + i.Metrics.Increment("trace_send_dropped") + i.Logger.Info().WithFields(logFields).Logf("Dropping trace because of sampling decision") + return } - // If we have a root span, update it with the count before determining the SampleRate. - if trace.RootSpan != nil { + i.Metrics.Histogram("trace_span_count", float64(td.DescendantCount())) + if td.HasRoot { + i.Metrics.Increment("trace_send_has_root") rs := trace.RootSpan - if i.Config.GetAddCountsToRoot() { - rs.Data["meta.span_event_count"] = int64(trace.SpanEventCount()) - rs.Data["meta.span_link_count"] = int64(trace.SpanLinkCount()) - rs.Data["meta.span_count"] = int64(trace.SpanCount()) - rs.Data["meta.event_count"] = int64(trace.DescendantCount()) - } else if i.Config.GetAddSpanCountToRoot() { - rs.Data["meta.span_count"] = int64(trace.DescendantCount()) + if rs != nil { + if i.Config.GetAddCountsToRoot() { + rs.Data["meta.span_event_count"] = int64(td.EventCount) + rs.Data["meta.span_link_count"] = int64(td.LinkCount) + rs.Data["meta.span_count"] = int64(td.Count) + rs.Data["meta.event_count"] = int64(td.DescendantCount()) + } else if i.Config.GetAddSpanCountToRoot() { + rs.Data["meta.span_count"] = int64(td.DescendantCount()) + } } + } else { + i.Metrics.Increment("trace_send_no_root") } - // use sampler key to find sampler; create and cache if not found - if sampler, found = i.datasetSamplers[samplerKey]; !found { - sampler = i.SamplerFactory.GetSamplerImplementationForKey(samplerKey, isLegacyKey) - i.datasetSamplers[samplerKey] = sampler + i.Metrics.Increment(td.SendReason) + if types.IsLegacyAPIKey(trace.APIKey) { + logFields["dataset"] = td.SamplerSelector + } else { + logFields["environment"] = td.SamplerSelector } - - // make sampling decision and update the trace - rate, shouldSend, reason, key := sampler.GetSampleRate(trace) - trace.SetSampleRate(rate) - trace.KeepSample = shouldSend - logFields["reason"] = reason - if key != "" { - logFields["sample_key"] = key + logFields["reason"] = td.KeptReason + if td.SamplerKey != "" { + logFields["sample_key"] = td.SamplerKey } - // This will observe sample rate attempts even if the trace is dropped - i.Metrics.Histogram("trace_aggregate_sample_rate", float64(rate)) - i.sampleTraceCache.Record(trace, shouldSend, reason) - - // if we're supposed to drop this trace, and dry run mode is not enabled, then we're done. 
- if !shouldSend && !i.Config.GetIsDryRun() { - i.Metrics.Increment("trace_send_dropped") - i.Logger.Info().WithFields(logFields).Logf("Dropping trace because of sampling decision") - return - } i.Metrics.Increment("trace_send_kept") // This will observe sample rate decisions only if the trace is kept - i.Metrics.Histogram("trace_kept_sample_rate", float64(rate)) + i.Metrics.Histogram("trace_kept_sample_rate", float64(td.SampleRate)) // ok, we're not dropping this trace; send all the spans - if i.Config.GetIsDryRun() && !shouldSend { + if i.Config.GetIsDryRun() && !td.Kept { i.Logger.Info().WithFields(logFields).Logf("Trace would have been dropped, but sending because dry run mode is enabled") } else { i.Logger.Info().WithFields(logFields).Logf("Sending trace") @@ -847,10 +925,10 @@ func (i *InMemCollector) send(trace *types.Trace, sendReason string) { continue } if i.Config.GetAddRuleReasonToTrace() { - sp.Data["meta.refinery.reason"] = reason - sp.Data["meta.refinery.send_reason"] = sendReason - if key != "" { - sp.Data["meta.refinery.sample_key"] = key + sp.Data["meta.refinery.reason"] = td.KeptReason + sp.Data["meta.refinery.send_reason"] = td.SendReason + if td.SamplerKey != "" { + sp.Data["meta.refinery.sample_key"] = td.SamplerKey } } @@ -858,23 +936,23 @@ func (i *InMemCollector) send(trace *types.Trace, sendReason string) { // with the final total as of our send time if sp.IsRoot { if i.Config.GetAddCountsToRoot() { - sp.Data["meta.span_event_count"] = int64(trace.SpanEventCount()) - sp.Data["meta.span_link_count"] = int64(trace.SpanLinkCount()) - sp.Data["meta.span_count"] = int64(trace.SpanCount()) - sp.Data["meta.event_count"] = int64(trace.DescendantCount()) + sp.Data["meta.span_event_count"] = int64(td.EventCount) + sp.Data["meta.span_link_count"] = int64(td.LinkCount) + sp.Data["meta.span_count"] = int64(td.Count) + sp.Data["meta.event_count"] = int64(td.DescendantCount()) } else if i.Config.GetAddSpanCountToRoot() { - sp.Data["meta.span_count"] = int64(trace.DescendantCount()) + sp.Data["meta.span_count"] = int64(td.DescendantCount()) } } isDryRun := i.Config.GetIsDryRun() if isDryRun { - sp.Data[config.DryRunFieldName] = shouldSend + sp.Data[config.DryRunFieldName] = td.Kept } if i.hostname != "" { sp.Data["meta.refinery.local_hostname"] = i.hostname } - mergeTraceAndSpanSampleRates(sp, trace.SampleRate(), isDryRun) + mergeTraceAndSpanSampleRates(sp, td.SampleRate, isDryRun) i.addAdditionalAttributes(sp) i.Transmission.EnqueueSpan(sp) } @@ -904,7 +982,7 @@ func (i *InMemCollector) Stop() error { i.Transmission.Flush() } - i.sampleTraceCache.Stop() + i.TraceDecisionCache.Stop() i.mutex.Unlock() close(i.incoming) @@ -1002,7 +1080,7 @@ func (i *InMemCollector) distributeSpansOnShutdown(sentSpanChan chan sentRecord, if sp != nil && !sp.IsDecisionSpan() { // first check if there's a trace decision - record, reason, found := i.sampleTraceCache.CheckSpan(sp) + record, reason, found := i.TraceDecisionCache.CheckSpan(sp) if found { sentSpanChan <- sentRecord{sp, record, reason} continue @@ -1119,106 +1197,209 @@ func (i *InMemCollector) createDecisionSpan(sp *types.Span, trace *types.Trace, } dc.APIHost = targetShard.GetAddress() + i.Logger.Warn().WithFields(map[string]interface{}{ + "dc": dc, + "sp": sp.Data, + }).Logf("creating decision span") return dc } -func newRedistributeNotifier(logger logger.Logger, met metrics.Metrics, clock clockwork.Clock) *redistributeNotifier { - r := &redistributeNotifier{ - initialDelay: 3 * time.Second, - maxDelay: 30 * time.Second, - maxAttempts: 
5, - done: make(chan struct{}), - clock: clock, - logger: logger, - metrics: met, - triggered: make(chan struct{}), - reset: make(chan struct{}), +func (i *InMemCollector) signalTraceDecisions(ctx context.Context, msg string) { + select { + case <-ctx.Done(): + case i.traceDecisions <- msg: + default: + i.Logger.Warn().Logf("trace decision channel is full. Dropping message") } - - return r } +func (i *InMemCollector) processTraceDecision(msg string) { + tds, err := unmarshalTraceDecisionMessage(msg) + if err != nil { + i.Logger.Error().Logf("Failed to unmarshal trace decision message. %s", err) + } + toDelete := generics.NewSet[string]() + for _, td := range tds { + trace := i.cache.Get(td.TraceID) + // if we don't have the trace in the cache, we don't need to do anything for it, + // but keep processing the rest of the batch + if trace == nil { + i.Logger.Debug().Logf("trace not found in cache for trace decision") + continue + } + toDelete.Add(td.TraceID) -type redistributeNotifier struct { - clock clockwork.Clock - logger logger.Logger - initialDelay time.Duration - maxAttempts int - maxDelay time.Duration - metrics metrics.Metrics - - reset chan struct{} - done chan struct{} - triggered chan struct{} - once sync.Once -} + i.TraceDecisionCache.Record(trace, td.Kept, td.KeptReason) + + i.send(trace, &td) + } -func (r *redistributeNotifier) Notify() <-chan struct{} { - return r.triggered + i.cache.RemoveTraces(toDelete) } -func (r *redistributeNotifier) Reset() { - var started bool - r.once.Do(func() { - go r.run() - started = true +func (i *InMemCollector) makeDecision(trace *types.Trace, sendReason string) (*TraceDecision, error) { + if !i.IsMyTrace(trace.ID()) { + return nil, errors.New("cannot make a decision for traces owned by another peer") + } + + _, span := otelutil.StartSpan(context.Background(), i.Tracer, "makeDecision") + defer span.End() + + otelutil.AddSpanFields(span, map[string]interface{}{ + "trace_id": trace.ID(), + "root": trace.RootSpan, + "send_by": trace.SendBy, + "arrival": trace.ArrivalTime, }) - if started { - return + var sampler sample.Sampler + var found bool + // get sampler key (dataset for legacy keys, environment for new keys) + samplerSelector, isLegacyKey := trace.GetSamplerKey() + + // use sampler key to find sampler; create and cache if not found + if sampler, found = i.datasetSamplers[samplerSelector]; !found { + sampler = i.SamplerFactory.GetSamplerImplementationForKey(samplerSelector, isLegacyKey) + i.datasetSamplers[samplerSelector] = sampler } - select { - case r.reset <- struct{}{}: - case <-r.done: - return - default: - r.logger.Debug().Logf("A trace redistribution is ongoing. Ignoring reset.") + // make sampling decision and update the trace + rate, shouldSend, reason, key := sampler.GetSampleRate(trace) + trace.SetSampleRate(rate) + trace.KeepSample = shouldSend + // This will observe sample rate attempts even if the trace is dropped + i.Metrics.Histogram("trace_aggregate_sample_rate", float64(rate)) + + i.TraceDecisionCache.Record(trace, shouldSend, reason) + + // TODO: if we only send trace ids for dropped traces + // then dryrun mode won't work + if !shouldSend && !i.Config.GetCollectionConfig().EnableTraceLocality { + select { + case i.droppedDecisions <- trace.ID(): + default: + i.Logger.Warn().WithField("trace_id", trace.ID()).Logf("failed to send dropped decision. 
drop decision channel is full.") + } + + return &TraceDecision{ + TraceID: trace.ID(), + Kept: shouldSend, + }, nil + } -} -func (r *redistributeNotifier) Stop() { - close(r.done) + var hasRoot bool + if trace.RootSpan != nil { + hasRoot = true + } + otelutil.AddSpanFields(span, map[string]interface{}{ + "kept": shouldSend, + "reason": reason, + "sampler": key, + "selector": samplerSelector, + "rate": rate, + "send_reason": sendReason, + "hasRoot": hasRoot, + }) + i.Logger.Warn().WithField("key", key).Logf("making decision for trace") + td := TraceDecision{ + TraceID: trace.ID(), + Kept: shouldSend, + KeptReason: reason, + SamplerKey: key, + SamplerSelector: samplerSelector, + SampleRate: rate, + SendReason: sendReason, + Count: trace.SpanCount(), + EventCount: trace.SpanEventCount(), + LinkCount: trace.SpanLinkCount(), + HasRoot: hasRoot, + } + + if !i.Config.GetCollectionConfig().EnableTraceLocality { + decisionMsg, err := newKeptDecisionMessage(td) + if err != nil { + i.Logger.Error().WithFields(map[string]interface{}{ + "trace_id": trace.TraceID, + "kept": shouldSend, + "reason": reason, + "sampler": key, + "selector": samplerSelector, + "error": err.Error(), + }).Logf("Failed to marshal trace decision") + return nil, err + } + err = i.PubSub.Publish(context.Background(), traceDecisionKeptTopic, decisionMsg) + if err != nil { + i.Logger.Error().WithFields(map[string]interface{}{ + "trace_id": trace.TraceID, + "kept": shouldSend, + "reason": reason, + "sampler": key, + "selector": samplerSelector, + "error": err.Error(), + }).Logf("Failed to publish trace decision") + return nil, err + } + } + + return &td, nil } -func (r *redistributeNotifier) run() { - var attempts int - lastBackoff := r.initialDelay +func (i *InMemCollector) sendDropDecisions() { + //ToDO: make this configurable + maxCount := 500 + ticker := i.Clock.NewTicker(5 * time.Second) + defer ticker.Stop() + traceIDs := make([]string, 0, maxCount) + send := false + eg := &errgroup.Group{} for { - // if we've reached the max attempts, reset the backoff and attempts - // only when the reset signal is received. - if attempts >= r.maxAttempts { - r.metrics.Gauge("trace_redistribution_count", 0) - <-r.reset - lastBackoff = r.initialDelay - attempts = 0 - } select { - case <-r.done: + case <-i.done: + eg.Wait() return - case r.triggered <- struct{}{}: + case id := <-i.droppedDecisions: + // if we get a trace ID, add it to the list + traceIDs = append(traceIDs, id) + // if we exceeded the max count, we need to send + if len(traceIDs) >= maxCount { + send = true + } + case <-ticker.Chan(): + // ticker fired, so send what we have + send = true } - attempts++ - r.metrics.Gauge("trace_redistribution_count", attempts) + // if we need to send, do so + if send && len(traceIDs) > 0 { + // copy the traceIDs so we can clear the list + idsToProcess := make([]string, len(traceIDs)) + copy(idsToProcess, traceIDs) + // clear the list + traceIDs = traceIDs[:0] - // Calculate the backoff interval using exponential backoff with a base time. - backoff := time.Duration(math.Min(float64(lastBackoff)*2, float64(r.maxDelay))) - // Add jitter to the backoff to avoid retry collisions. 
- jitter := time.Duration(rand.Float64() * float64(backoff) * 0.5) - nextBackoff := backoff + jitter - lastBackoff = nextBackoff + // now process the result in a goroutine so we can keep listening + eg.Go(func() error { + select { + case <-i.done: + return nil + default: + msg, err := newDroppedDecisionMessage(idsToProcess) + if err != nil { + i.Logger.Error().Logf("Failed to marshal dropped trace decision") + } + err = i.PubSub.Publish(context.Background(), traceDecisionDroppedTopic, msg) + if err != nil { + i.Logger.Error().Logf("Failed to publish dropped trace decision") + } + } - timer := r.clock.NewTimer(nextBackoff) - select { - case <-timer.Chan(): - timer.Stop() - case <-r.reset: - lastBackoff = r.initialDelay - attempts = 0 - timer.Stop() - case <-r.done: - timer.Stop() - return + return nil + }) + send = false } } } + +func (i *InMemCollector) IsMyTrace(traceID string) bool { + return i.Sharder.WhichShard(traceID).Equals(i.Sharder.MyShard()) +} diff --git a/collect/collect_test.go b/collect/collect_test.go index 61d93f1abd..93840191b9 100644 --- a/collect/collect_test.go +++ b/collect/collect_test.go @@ -22,6 +22,7 @@ import ( "github.com/honeycombio/refinery/internal/peer" "github.com/honeycombio/refinery/logger" "github.com/honeycombio/refinery/metrics" + "github.com/honeycombio/refinery/pubsub" "github.com/honeycombio/refinery/sample" "github.com/honeycombio/refinery/sharder" "github.com/honeycombio/refinery/transmit" @@ -48,6 +49,11 @@ func newTestCollector(conf config.Config, transmission transmit.Transmission, pe Clock: clock, } healthReporter.Start() + localPubSub := &pubsub.LocalPubSub{ + Config: conf, + Metrics: s, + } + localPubSub.Start() return &InMemCollector{ Config: conf, @@ -57,6 +63,7 @@ func newTestCollector(conf config.Config, transmission transmit.Transmission, pe Health: healthReporter, Transmission: transmission, PeerTransmission: peerTransmission, + PubSub: localPubSub, Metrics: &metrics.NullMetrics{}, StressRelief: &MockStressReliever{}, SamplerFactory: &sample.SamplerFactory{ @@ -103,7 +110,7 @@ func TestAddRootSpan(t *testing.T) { coll.cache = c stc, err := newCache() assert.NoError(t, err, "lru cache should start") - coll.sampleTraceCache = stc + coll.TraceDecisionCache = stc coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) @@ -129,7 +136,6 @@ func TestAddRootSpan(t *testing.T) { // * create the trace in the cache // * send the trace // * remove the trace from the cache - // * remove the trace from the cache assert.Nil(t, coll.getFromCache(traceID1), "after sending the span, it should be removed from the cache") transmission.Mux.RLock() assert.Equal(t, 1, len(transmission.Events), "adding a root span should send the span") @@ -213,7 +219,7 @@ func TestOriginalSampleRateIsNotedInMetaField(t *testing.T) { coll.cache = c stc, err := newCache() assert.NoError(t, err, "lru cache should start") - coll.sampleTraceCache = stc + coll.TraceDecisionCache = stc coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) @@ -304,7 +310,7 @@ func TestTransmittedSpansShouldHaveASampleRateOfAtLeastOne(t *testing.T) { coll.cache = c stc, err := newCache() assert.NoError(t, err, "lru cache should start") - coll.sampleTraceCache = stc + coll.TraceDecisionCache = stc coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) @@ -372,7 +378,7 @@ func TestAddSpan(t *testing.T) { coll.cache = c stc, err := newCache() assert.NoError(t, err, "lru cache should start") - coll.sampleTraceCache = 
stc + coll.TraceDecisionCache = stc coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) @@ -451,7 +457,7 @@ func TestDryRunMode(t *testing.T) { coll.cache = c stc, err := newCache() assert.NoError(t, err, "lru cache should start") - coll.sampleTraceCache = stc + coll.TraceDecisionCache = stc coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) @@ -532,7 +538,6 @@ func TestDryRunMode(t *testing.T) { // check that meta value associated with dry run mode is properly applied assert.Equal(t, uint(10), transmission.Events[1].Data["meta.dryrun.sample_rate"]) // check expected sampleRate against span data - assert.Equal(t, sampleRate1, transmission.Events[0].Data["meta.dryrun.sample_rate"]) assert.Equal(t, sampleRate2, transmission.Events[1].Data["meta.dryrun.sample_rate"]) transmission.Mux.RUnlock() @@ -751,7 +756,7 @@ func TestStableMaxAlloc(t *testing.T) { coll.cache = c stc, err := newCache() assert.NoError(t, err, "lru cache should start") - coll.sampleTraceCache = stc + coll.TraceDecisionCache = stc coll.incoming = make(chan *types.Span, 1000) coll.fromPeer = make(chan *types.Span, 5) @@ -840,7 +845,7 @@ func TestAddSpanNoBlock(t *testing.T) { coll.cache = c stc, err := newCache() assert.NoError(t, err, "lru cache should start") - coll.sampleTraceCache = stc + coll.TraceDecisionCache = stc coll.incoming = make(chan *types.Span, 3) coll.fromPeer = make(chan *types.Span, 3) @@ -880,7 +885,9 @@ func TestDependencyInjection(t *testing.T) { &inject.Object{Value: &sharder.SingleServerSharder{}}, &inject.Object{Value: &transmit.MockTransmission{}, Name: "upstreamTransmission"}, &inject.Object{Value: &transmit.MockTransmission{}, Name: "peerTransmission"}, + &inject.Object{Value: &pubsub.LocalPubSub{}}, &inject.Object{Value: &metrics.NullMetrics{}, Name: "genericMetrics"}, + &inject.Object{Value: &metrics.NullMetrics{}, Name: "metrics"}, &inject.Object{Value: &sample.SamplerFactory{}}, &inject.Object{Value: &MockStressReliever{}, Name: "stressRelief"}, &inject.Object{Value: &peer.MockPeers{}}, @@ -924,7 +931,7 @@ func TestAddCountsToRoot(t *testing.T) { coll.cache = c stc, err := newCache() assert.NoError(t, err, "lru cache should start") - coll.sampleTraceCache = stc + coll.TraceDecisionCache = stc coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) @@ -1014,7 +1021,7 @@ func TestLateRootGetsCounts(t *testing.T) { coll.cache = c stc, err := newCache() assert.NoError(t, err, "lru cache should start") - coll.sampleTraceCache = stc + coll.TraceDecisionCache = stc coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) @@ -1104,7 +1111,7 @@ func TestAddSpanCount(t *testing.T) { coll.cache = c stc, err := newCache() assert.NoError(t, err, "lru cache should start") - coll.sampleTraceCache = stc + coll.TraceDecisionCache = stc coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) @@ -1190,7 +1197,7 @@ func TestLateRootGetsSpanCount(t *testing.T) { coll.cache = c stc, err := newCache() assert.NoError(t, err, "lru cache should start") - coll.sampleTraceCache = stc + coll.TraceDecisionCache = stc coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) @@ -1268,7 +1275,7 @@ func TestLateSpanNotDecorated(t *testing.T) { coll.cache = c stc, err := newCache() assert.NoError(t, err, "lru cache should start") - coll.sampleTraceCache = stc + coll.TraceDecisionCache = stc coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan 
*types.Span, 5) @@ -1338,7 +1345,7 @@ func TestAddAdditionalAttributes(t *testing.T) { coll.cache = c stc, err := newCache() assert.NoError(t, err, "lru cache should start") - coll.sampleTraceCache = stc + coll.TraceDecisionCache = stc coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) @@ -1403,7 +1410,7 @@ func TestStressReliefSampleRate(t *testing.T) { stc, err := newCache() assert.NoError(t, err, "lru cache should start") - coll.sampleTraceCache = stc + coll.TraceDecisionCache = stc var traceID = "traceABC" @@ -1427,7 +1434,7 @@ func TestStressReliefSampleRate(t *testing.T) { require.True(t, processed) require.True(t, kept) - tr, _, found := coll.sampleTraceCache.CheckTrace(traceID) + tr, _, found := coll.TraceDecisionCache.CheckTrace(traceID) require.True(t, found) require.NotNil(t, tr) assert.Equal(t, uint(100), tr.Rate()) @@ -1452,7 +1459,7 @@ func TestStressReliefSampleRate(t *testing.T) { require.True(t, processed2) require.True(t, kept2) - tr2, _, found2 := coll.sampleTraceCache.CheckTrace(traceID) + tr2, _, found2 := coll.TraceDecisionCache.CheckTrace(traceID) require.True(t, found2) require.NotNil(t, tr2) assert.Equal(t, uint(100), tr2.Rate()) @@ -1496,7 +1503,7 @@ func TestStressReliefDecorateHostname(t *testing.T) { coll.cache = c stc, err := newCache() assert.NoError(t, err, "lru cache should start") - coll.sampleTraceCache = stc + coll.TraceDecisionCache = stc coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) @@ -1601,7 +1608,7 @@ func TestSpanWithRuleReasons(t *testing.T) { coll.cache = c stc, err := newCache() assert.NoError(t, err, "lru cache should start") - coll.sampleTraceCache = stc + coll.TraceDecisionCache = stc coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) @@ -1796,7 +1803,7 @@ func TestDrainTracesOnShutdown(t *testing.T) { coll.cache = c stc, err := newCache() assert.NoError(t, err, "lru cache should start") - coll.sampleTraceCache = stc + coll.TraceDecisionCache = stc coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) @@ -1890,7 +1897,7 @@ func TestBigTracesGoEarly(t *testing.T) { coll.cache = c stc, err := newCache() assert.NoError(t, err, "lru cache should start") - coll.sampleTraceCache = stc + coll.TraceDecisionCache = stc coll.incoming = make(chan *types.Span, 500) coll.fromPeer = make(chan *types.Span, 500) @@ -2010,7 +2017,7 @@ func TestCreateDecisionSpan(t *testing.T) { APIHost: peerShard.Addr, APIKey: legacyAPIKey, Data: map[string]interface{}{ - "meta.annotation_type": types.SpanAnnotationTypeUnknown, + "meta.annotation_type": types.SpanTypeUnknown, "meta.refinery.min_span": true, "meta.refinery.root": false, "meta.refinery.span_data_size": 30, diff --git a/collect/redistributer.go b/collect/redistributer.go new file mode 100644 index 0000000000..4bfe7f3a29 --- /dev/null +++ b/collect/redistributer.go @@ -0,0 +1,113 @@ +package collect + +import ( + "math" + "math/rand/v2" + "sync" + "time" + + "github.com/honeycombio/refinery/logger" + "github.com/honeycombio/refinery/metrics" + "github.com/jonboulle/clockwork" +) + +type redistributeNotifier struct { + clock clockwork.Clock + logger logger.Logger + initialDelay time.Duration + maxAttempts int + maxDelay time.Duration + metrics metrics.Metrics + + reset chan struct{} + done chan struct{} + triggered chan struct{} + once sync.Once +} + +func newRedistributeNotifier(logger logger.Logger, met metrics.Metrics, clock clockwork.Clock) *redistributeNotifier { + r := 
&redistributeNotifier{ + initialDelay: 3 * time.Second, + maxDelay: 30 * time.Second, + maxAttempts: 5, + done: make(chan struct{}), + clock: clock, + logger: logger, + metrics: met, + triggered: make(chan struct{}), + reset: make(chan struct{}), + } + + return r +} + +func (r *redistributeNotifier) Notify() <-chan struct{} { + return r.triggered +} + +func (r *redistributeNotifier) Reset() { + var started bool + r.once.Do(func() { + go r.run() + started = true + }) + + if started { + return + } + + select { + case r.reset <- struct{}{}: + case <-r.done: + return + default: + r.logger.Debug().Logf("A trace redistribution is ongoing. Ignoring reset.") + } +} + +func (r *redistributeNotifier) Stop() { + close(r.done) +} + +func (r *redistributeNotifier) run() { + var attempts int + lastBackoff := r.initialDelay + for { + // if we've reached the max attempts, reset the backoff and attempts + // only when the reset signal is received. + if attempts >= r.maxAttempts { + r.metrics.Gauge("trace_redistribution_count", 0) + <-r.reset + lastBackoff = r.initialDelay + attempts = 0 + } + select { + case <-r.done: + return + case r.triggered <- struct{}{}: + } + + attempts++ + r.metrics.Gauge("trace_redistribution_count", attempts) + + // Calculate the backoff interval using exponential backoff with a base time. + backoff := time.Duration(math.Min(float64(lastBackoff)*2, float64(r.maxDelay))) + // Add jitter to the backoff to avoid retry collisions. + jitter := time.Duration(rand.Float64() * float64(backoff) * 0.5) + nextBackoff := backoff + jitter + lastBackoff = nextBackoff + + timer := r.clock.NewTimer(nextBackoff) + select { + case <-timer.Chan(): + timer.Stop() + case <-r.reset: + lastBackoff = r.initialDelay + attempts = 0 + timer.Stop() + case <-r.done: + timer.Stop() + return + } + } +} diff --git a/collect/trace_decision.go b/collect/trace_decision.go new file mode 100644 index 0000000000..e545ee2207 --- /dev/null +++ b/collect/trace_decision.go @@ -0,0 +1,75 @@ +package collect + +import ( + "encoding/json" + "fmt" + "strings" +) + +const ( + droppedPrefix = "drop:" + keptPrefix = "kept:" +) + +func newDroppedDecisionMessage(traceIDs []string) (string, error) { + if len(traceIDs) == 0 { + return "", fmt.Errorf("no traceIDs provided") + } + data := strings.Join(traceIDs, ",") + return droppedPrefix + data, nil +} +func newKeptDecisionMessage(td TraceDecision) (string, error) { + data, err := json.Marshal(td) + if err != nil { + return "", err + } + + return keptPrefix + string(data), nil +} + +func unmarshalTraceDecisionMessage(msg string) (td []TraceDecision, err error) { + data := strings.SplitN(msg, ":", 2) + if len(data) != 2 { + return nil, fmt.Errorf("invalid message format for trace decision") + } + + switch data[0] + ":" { + case droppedPrefix: + for _, traceID := range strings.Split(data[1], ",") { + td = append(td, TraceDecision{TraceID: traceID}) + } + return td, nil + case keptPrefix: + keptDecision := TraceDecision{} + err = json.Unmarshal([]byte(data[1]), &keptDecision) + if err != nil { + return nil, err + } + td = append(td, keptDecision) + + return td, nil + default: + return nil, fmt.Errorf("unexpected message prefix for trace decision") + } +} + +type TraceDecision struct { + TraceID string + // if we don't need to immediately eject traces from the trace cache, + // we could remove this field.
The TraceDecision type could be renamed to + // keptDecision + Kept bool + SampleRate uint + SamplerKey string + SamplerSelector string + KeptReason string + SendReason string + Count uint32 // number of spans in the trace + EventCount uint32 // number of span events in the trace + LinkCount uint32 // number of span links in the trace + HasRoot bool +} + +func (td *TraceDecision) DescendantCount() uint32 { + return td.Count + td.EventCount + td.LinkCount +} diff --git a/route/route.go b/route/route.go index 75dc90345b..0a9d37752f 100644 --- a/route/route.go +++ b/route/route.go @@ -636,6 +636,7 @@ func (r *Router) processEvent(ev *types.Event, reqID interface{}) error { if r.incomingOrPeer == "incoming" { err = r.Collector.AddSpan(span) } else { + // TODO: again, only do this if span proxy is disabled err = r.Collector.AddSpanFromPeer(span) } if err != nil { diff --git a/route/route_test.go b/route/route_test.go index ba3c9179f6..95e8a011e4 100644 --- a/route/route_test.go +++ b/route/route_test.go @@ -21,6 +21,7 @@ import ( "github.com/honeycombio/refinery/internal/peer" "github.com/honeycombio/refinery/logger" "github.com/honeycombio/refinery/metrics" + "github.com/honeycombio/refinery/pubsub" "github.com/honeycombio/refinery/sharder" "github.com/honeycombio/refinery/transmit" "github.com/honeycombio/refinery/types" @@ -485,6 +486,7 @@ func TestDependencyInjection(t *testing.T) { &inject.Object{Value: http.DefaultTransport, Name: "upstreamTransport"}, &inject.Object{Value: &transmit.MockTransmission{}, Name: "upstreamTransmission"}, &inject.Object{Value: &transmit.MockTransmission{}, Name: "peerTransmission"}, + &inject.Object{Value: &pubsub.LocalPubSub{}}, &inject.Object{Value: &sharder.MockSharder{}}, &inject.Object{Value: &collect.InMemCollector{}}, &inject.Object{Value: &metrics.NullMetrics{}, Name: "metrics"}, diff --git a/types/event.go b/types/event.go index dd891e4b14..852faec733 100644 --- a/types/event.go +++ b/types/event.go @@ -139,8 +139,8 @@ func (t *Trace) DescendantCount() uint32 { func (t *Trace) SpanCount() uint32 { var count uint32 for _, s := range t.spans { - switch s.AnnotationType() { - case SpanAnnotationTypeSpanEvent, SpanAnnotationTypeLink: + switch s.Type() { + case SpanTypeSpanEvent, SpanTypeLink: continue default: count++ @@ -154,7 +154,7 @@ func (t *Trace) SpanCount() uint32 { func (t *Trace) SpanLinkCount() uint32 { var count uint32 for _, s := range t.spans { - if s.AnnotationType() == SpanAnnotationTypeLink { + if s.Type() == SpanTypeLink { count++ } } @@ -165,7 +165,7 @@ func (t *Trace) SpanLinkCount() uint32 { func (t *Trace) SpanEventCount() uint32 { var count uint32 for _, s := range t.spans { - if s.AnnotationType() == SpanAnnotationTypeSpanEvent { + if s.Type() == SpanTypeSpanEvent { count++ } } @@ -227,7 +227,7 @@ func (sp *Span) ExtractDecisionContext() *Event { "trace_id": sp.TraceID, "meta.refinery.root": sp.IsRoot, "meta.refinery.min_span": true, - "meta.annotation_type": sp.AnnotationType(), + "meta.annotation_type": sp.Type(), "meta.refinery.span_data_size": dataSize, } return &decisionCtx @@ -269,28 +269,28 @@ func (sp *Span) GetDataSize() int { return total } -// SpanAnnotationType is an enum for the type of annotation this span is. -type SpanAnnotationType int +// SpanType is an enum for the type of annotation this span is. +type SpanType int const ( - // SpanAnnotationTypeUnknown is the default value for an unknown annotation type. 
- SpanAnnotationTypeUnknown SpanAnnotationType = iota - // SpanAnnotationTypeSpanEvent is the type for a span event. - SpanAnnotationTypeSpanEvent - // SpanAnnotationTypeLink is the type for a span link. - SpanAnnotationTypeLink + // SpanTypeUnknown is the default value for an unknown annotation type. + SpanTypeUnknown SpanType = iota + // SpanTypeSpanEvent is the type for a span event. + SpanTypeSpanEvent + // SpanTypeLink is the type for a span link. + SpanTypeLink ) -// AnnotationType returns the type of annotation this span is. -func (sp *Span) AnnotationType() SpanAnnotationType { +// Type returns the type of annotation this span is. +func (sp *Span) Type() SpanType { t := sp.Data["meta.annotation_type"] switch t { case "span_event": - return SpanAnnotationTypeSpanEvent + return SpanTypeSpanEvent case "link": - return SpanAnnotationTypeLink + return SpanTypeLink default: - return SpanAnnotationTypeUnknown + return SpanTypeUnknown } } diff --git a/types/event_test.go b/types/event_test.go index b1ac8881ac..c7051733c9 100644 --- a/types/event_test.go +++ b/types/event_test.go @@ -44,15 +44,15 @@ func TestSpan_GetDataSize(t *testing.T) { } } -func TestSpan_AnnotationType(t *testing.T) { +func TestSpan_Type(t *testing.T) { tests := []struct { name string data map[string]any - want SpanAnnotationType + want SpanType }{ - {"unknown", map[string]any{}, SpanAnnotationTypeUnknown}, - {"span_event", map[string]any{"meta.annotation_type": "span_event"}, SpanAnnotationTypeSpanEvent}, - {"link", map[string]any{"meta.annotation_type": "link"}, SpanAnnotationTypeLink}, + {"unknown", map[string]any{}, SpanTypeUnknown}, + {"span_event", map[string]any{"meta.annotation_type": "span_event"}, SpanTypeSpanEvent}, + {"link", map[string]any{"meta.annotation_type": "link"}, SpanTypeLink}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -61,7 +61,7 @@ func TestSpan_AnnotationType(t *testing.T) { Data: tt.data, }, } - if got := sp.AnnotationType(); got != tt.want { + if got := sp.Type(); got != tt.want { t.Errorf("Span.AnnotationType() = %v, want %v", got, tt.want) } }) @@ -99,7 +99,7 @@ func TestSpan_ExtractDecisionContext(t *testing.T) { "trace_id": sp.TraceID, "meta.refinery.root": true, "meta.refinery.min_span": true, - "meta.annotation_type": SpanAnnotationTypeSpanEvent, + "meta.annotation_type": SpanTypeSpanEvent, "meta.refinery.span_data_size": 14, }, got.Data) }