Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: publish trace decisions through redis pubsub #1362

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 95 additions & 39 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ import (
"github.com/honeycombio/refinery/internal/peer"
"github.com/honeycombio/refinery/logger"
"github.com/honeycombio/refinery/metrics"
"github.com/honeycombio/refinery/pubsub"
"github.com/honeycombio/refinery/sample"
"github.com/honeycombio/refinery/sharder"
"github.com/honeycombio/refinery/transmit"
"github.com/honeycombio/refinery/types"
)

const legacyAPIKey = "c9945edf5d245834089a1bd6cc9ad01e"
Expand Down Expand Up @@ -89,25 +89,35 @@ func (w *countingWriterSender) waitForCount(t testing.TB, target int) {
}
}

func defaultConfig(basePort int) *config.MockConfig {
func defaultConfig(basePort int, redisDB int) *config.MockConfig {
return &config.MockConfig{
GetTracesConfigVal: config.TracesConfig{
SendTicker: config.Duration(2 * time.Millisecond),
SendDelay: config.Duration(1 * time.Millisecond),
TraceTimeout: config.Duration(10 * time.Millisecond),
MaxBatchSize: 500,
},
GetSamplerTypeVal: &config.DeterministicSamplerConfig{SampleRate: 1},
PeerManagementType: "file",
GetSamplerTypeVal: &config.DeterministicSamplerConfig{SampleRate: 1},
PeerManagementType: "redis",
GetRedisPeerManagementVal: config.RedisPeerManagementConfig{
Prefix: "refinery-app-test",
Timeout: config.Duration(1 * time.Second),
Database: redisDB,
},
GetUpstreamBufferSizeVal: 10000,
GetPeerBufferSizeVal: 10000,
GetListenAddrVal: "127.0.0.1:" + strconv.Itoa(basePort),
GetPeerListenAddrVal: "127.0.0.1:" + strconv.Itoa(basePort+1),
GetHoneycombAPIVal: "http://api.honeycomb.io",
GetCollectionConfigVal: config.CollectionConfig{CacheCapacity: 10000, ShutdownDelay: config.Duration(1 * time.Second), EnableTraceLocality: true},
TraceIdFieldNames: []string{"trace.trace_id"},
ParentIdFieldNames: []string{"trace.parent_id"},
SampleCache: config.SampleCacheConfig{KeptSize: 10000, DroppedSize: 100000, SizeCheckInterval: config.Duration(10 * time.Second)},
GetCollectionConfigVal: config.CollectionConfig{
CacheCapacity: 10000,
ShutdownDelay: config.Duration(1 * time.Second),
HealthCheckTimeout: config.Duration(3 * time.Second),
EnableTraceLocality: true,
},
TraceIdFieldNames: []string{"trace.trace_id"},
ParentIdFieldNames: []string{"trace.parent_id"},
SampleCache: config.SampleCacheConfig{KeptSize: 10000, DroppedSize: 100000, SizeCheckInterval: config.Duration(10 * time.Second)},
GetAccessKeyConfigVal: config.AccessKeyConfig{
ReceiveKeys: []string{legacyAPIKey, nonLegacyAPIKey},
AcceptOnlyListedKeys: true,
Expand Down Expand Up @@ -196,6 +206,7 @@ func newStartedApp(
&inject.Object{Value: shrdr},
&inject.Object{Value: noop.NewTracerProvider().Tracer("test"), Name: "tracer"},
&inject.Object{Value: collector},
&inject.Object{Value: &pubsub.GoRedisPubSub{}},
&inject.Object{Value: metricsr, Name: "metrics"},
&inject.Object{Value: metricsr, Name: "genericMetrics"},
&inject.Object{Value: metricsr, Name: "upstreamMetrics"},
Expand Down Expand Up @@ -232,9 +243,10 @@ func post(t testing.TB, req *http.Request) {
func TestAppIntegration(t *testing.T) {
t.Parallel()
port := 10500
redisDB := 2

sender := &transmission.MockSender{}
cfg := defaultConfig(port)
cfg := defaultConfig(port, redisDB)
app, graph := newStartedApp(t, sender, nil, nil, cfg)

// Send a root span, it should be sent in short order.
Expand Down Expand Up @@ -270,9 +282,10 @@ func TestAppIntegrationWithNonLegacyKey(t *testing.T) {
// Parallel integration tests need different ports!
t.Parallel()
port := 10600
redisDB := 3

sender := &transmission.MockSender{}
cfg := defaultConfig(port)
cfg := defaultConfig(port, redisDB)
a, graph := newStartedApp(t, sender, nil, nil, cfg)
a.IncomingRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil })
a.PeerRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil })
Expand Down Expand Up @@ -311,9 +324,10 @@ func TestAppIntegrationWithUnauthorizedKey(t *testing.T) {
// Parallel integration tests need different ports!
t.Parallel()
port := 10700
redisDB := 4

sender := &transmission.MockSender{}
cfg := defaultConfig(port)
cfg := defaultConfig(port, redisDB)
a, graph := newStartedApp(t, sender, nil, nil, cfg)
a.IncomingRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil })
a.PeerRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil })
Expand Down Expand Up @@ -355,7 +369,8 @@ func TestPeerRouting(t *testing.T) {
Peers: peerList,
ID: peerList[i],
}
cfg := defaultConfig(basePort)
redisDB := 5 + i
cfg := defaultConfig(basePort, redisDB)

apps[i], graph = newStartedApp(t, senders[i], nil, peers, cfg)
defer startstop.Stop(graph.Objects(), nil)
Expand All @@ -378,7 +393,7 @@ func TestPeerRouting(t *testing.T) {
post(t, req)
assert.Eventually(t, func() bool {
return len(senders[0].Events()) == 1
}, 2*time.Second, 2*time.Millisecond)
}, 5*time.Second, 2*time.Millisecond)

expectedEvent := &transmission.Event{
APIKey: legacyAPIKey,
Expand Down Expand Up @@ -429,23 +444,24 @@ func TestPeerRouting(t *testing.T) {
req.Body = io.NopCloser(strings.NewReader(blob))
post(t, req)
require.Eventually(t, func() bool {
return len(senders[0].Events()) == 2
}, 2*time.Second, 2*time.Millisecond)
return len(senders[0].Events()) == 1
}, 5*time.Second, 2*time.Millisecond)
expectedEvent.Metadata = map[string]any{
"api_host": "http://api.honeycomb.io",
"dataset": "dataset",
"environment": "",
"enqueued_at": senders[0].Events()[1].Metadata.(map[string]any)["enqueued_at"],
"enqueued_at": senders[0].Events()[0].Metadata.(map[string]any)["enqueued_at"],
}
assert.Equal(t, expectedEvent, senders[0].Events()[1])
assert.Equal(t, expectedEvent, senders[0].Events()[0])
}

func TestHostMetadataSpanAdditions(t *testing.T) {
t.Parallel()
port := 14000
redisDB := 7

sender := &transmission.MockSender{}
cfg := defaultConfig(port)
cfg := defaultConfig(port, redisDB)
cfg.AddHostMetadataToTrace = true
app, graph := newStartedApp(t, sender, nil, nil, cfg)

Expand Down Expand Up @@ -500,8 +516,9 @@ func TestEventsEndpoint(t *testing.T) {
Peers: peerList,
ID: peerList[i],
}
redisDB := 8 + i

cfg := defaultConfig(basePort)
cfg := defaultConfig(basePort, redisDB)
apps[i], graph = newStartedApp(t, senders[i], nil, peers, cfg)
defer startstop.Stop(graph.Objects(), nil)
}
Expand Down Expand Up @@ -618,7 +635,8 @@ func TestEventsEndpointWithNonLegacyKey(t *testing.T) {
ID: peerList[i],
}

cfg := defaultConfig(basePort)
redisDB := 10 + i
cfg := defaultConfig(basePort, redisDB)

app, graph := newStartedApp(t, senders[i], nil, peers, cfg)
app.IncomingRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil })
Expand Down Expand Up @@ -730,22 +748,21 @@ func TestPeerRouting_TraceLocalityDisabled(t *testing.T) {

var apps [2]*App
var senders [2]*transmission.MockSender
var peerSenders [2]*transmission.MockSender
for i := range apps {
var graph inject.Graph
basePort := 17000 + (i * 2)
senders[i] = &transmission.MockSender{}
peerSenders[i] = &transmission.MockSender{}
peers := &peer.MockPeers{
Peers: peerList,
ID: peerList[i],
}
cfg := defaultConfig(basePort)
collectionCfg := cfg.GetCollectionConfigVal
redisDB := 12 + i
cfg := defaultConfig(basePort, redisDB)
collectionCfg := cfg.GetCollectionConfig()
collectionCfg.EnableTraceLocality = false
cfg.GetCollectionConfigVal = collectionCfg

apps[i], graph = newStartedApp(t, senders[i], peerSenders[i], peers, cfg)
apps[i], graph = newStartedApp(t, senders[i], nil, peers, cfg)
defer startstop.Stop(graph.Objects(), nil)
}

Expand All @@ -764,31 +781,68 @@ func TestPeerRouting_TraceLocalityDisabled(t *testing.T) {
blob := `[` + string(spans[10]) + `]`
req.Body = io.NopCloser(strings.NewReader(blob))
post(t, req)
require.Eventually(t, func() bool {
return len(peerSenders[1].Events()) == 1
}, 2*time.Second, 2*time.Millisecond)
assert.Eventually(t, func() bool {
return len(senders[1].Events()) == 1
}, 5*time.Second, 2*time.Millisecond)

expectedEvent := &transmission.Event{
APIKey: legacyAPIKey,
Dataset: "dataset",
SampleRate: 2,
APIHost: "http://localhost:17001",
APIHost: "http://api.honeycomb.io",
Timestamp: now,
Data: map[string]interface{}{
"trace_id": "2",
"meta.refinery.min_span": true,
"meta.annotation_type": types.SpanAnnotationTypeUnknown,
"meta.refinery.root": false,
"meta.refinery.span_data_size": 168,
"trace.trace_id": "2",
"trace.span_id": "10",
"trace.parent_id": "0000000000",
"key": "value",
"field0": float64(0),
"field1": float64(1),
"field2": float64(2),
"field3": float64(3),
"field4": float64(4),
"field5": float64(5),
"field6": float64(6),
"field7": float64(7),
"field8": float64(8),
"field9": float64(9),
"field10": float64(10),
"long": "this is a test of the emergency broadcast system",
"meta.refinery.original_sample_rate": uint(2),
"meta.refinery.incoming_user_agent": "Test-Client",
"foo": "bar",
},
Metadata: map[string]any{
"api_host": "http://localhost:17001",
"api_host": "http://api.honeycomb.io",
"dataset": "dataset",
"environment": "",
"enqueued_at": peerSenders[1].Events()[0].Metadata.(map[string]any)["enqueued_at"],
"enqueued_at": senders[1].Events()[0].Metadata.(map[string]any)["enqueued_at"],
},
}
assert.Equal(t, expectedEvent, peerSenders[1].Events()[0])
assert.Equal(t, expectedEvent, senders[1].Events()[0])
// Repeat, but deliver to host 1 on the peer channel; with trace locality
// disabled the span is not forwarded to host 0, so host 1 sends it itself.
req, err = http.NewRequest(
"POST",
"http://localhost:17003/1/batch/dataset",
nil,
)
assert.NoError(t, err)
req.Header.Set("X-Honeycomb-Team", legacyAPIKey)
req.Header.Set("Content-Type", "application/json")

req.Body = io.NopCloser(strings.NewReader(blob))
post(t, req)
require.Eventually(t, func() bool {
return len(senders[1].Events()) == 2
}, 5*time.Second, 2*time.Millisecond)
expectedEvent.Metadata = map[string]any{
"api_host": "http://api.honeycomb.io",
"dataset": "dataset",
"environment": "",
"enqueued_at": senders[1].Events()[1].Metadata.(map[string]any)["enqueued_at"],
}
assert.Equal(t, expectedEvent, senders[1].Events()[1])
}

var (
Expand Down Expand Up @@ -851,7 +905,8 @@ func BenchmarkTraces(b *testing.B) {
W: io.Discard,
},
}
cfg := defaultConfig(11000)
redisDB := 1
cfg := defaultConfig(11000, redisDB)
_, graph := newStartedApp(b, sender, nil, nil, cfg)

req, err := http.NewRequest(
Expand Down Expand Up @@ -954,7 +1009,8 @@ func BenchmarkDistributedTraces(b *testing.B) {
ID: peerList[i],
}

cfg := defaultConfig(basePort)
redisDB := 2 + i
cfg := defaultConfig(basePort, redisDB)
apps[i], graph = newStartedApp(b, sender, nil, peers, cfg)
defer startstop.Stop(graph.Objects(), nil)

Expand Down
4 changes: 1 addition & 3 deletions collect/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,9 @@ func (d *DefaultInMemCache) TakeExpiredTraces(now time.Time) []*types.Trace {
d.Metrics.Histogram("collect_cache_entries", float64(len(d.cache)))

var res []*types.Trace
for i, t := range d.traceBuffer {
for _, t := range d.traceBuffer {
if t != nil && now.After(t.SendBy) {
res = append(res, t)
d.traceBuffer[i] = nil
delete(d.cache, t.TraceID)
}
}
return res
Expand Down
10 changes: 1 addition & 9 deletions collect/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,6 @@ func TestTakeExpiredTraces(t *testing.T) {
assert.Equal(t, traces[0], expired[0])
assert.Equal(t, traces[1], expired[1])
assert.Equal(t, traces[3], expired[2])

assert.Equal(t, 1, len(c.cache))

all := c.GetAll()
assert.Equal(t, 1, len(all))
for i := range all {
assert.Equal(t, traces[2], all[i])
}
}

func TestRemoveSentTraces(t *testing.T) {
Expand Down Expand Up @@ -123,7 +115,7 @@ func TestSkipOldUnsentTraces(t *testing.T) {
assert.Equal(t, traces[0], expired[0])
assert.Equal(t, traces[2], expired[1])

assert.Equal(t, 2, len(c.cache))
c.RemoveTraces(generics.NewSet(expired[0].TraceID, expired[1].TraceID))

// fill up those slots now, which requires skipping over the old traces
newTraces := []*types.Trace{
Expand Down
Loading
Loading