Skip to content

Commit

Permalink
Fixes for OpenTelemetry Context Propagation (#1454)
Browse files Browse the repository at this point in the history
1. Fixes an issue that caused context propagation to fail on message
retry (#1452)
Before, BeforeSubscribeInvoke removed the context so it was no longer
available on retry.

It also removes the _contexts dictionary, as during Subscribe this
handling didn't work during retry and during Publish, the dictionary
value was no longer used.

I have tested this with both aspnetcore services and console subscriber
that publishes during the subscribe and context propagation was working
fine. Thus #1100 is also closed, probably already in #1407.

Co-authored-by: Benjamin Schwehn <[email protected]>
  • Loading branch information
bschwehn and Benjamin Schwehn authored Dec 9, 2023
1 parent aa6091d commit 8c1c104
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 19 deletions.
36 changes: 35 additions & 1 deletion docs/content/user-guide/en/monitoring/opentelemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,40 @@ services.AddOpenTelemetryTracing((builder) => builder
);
```

If you don't use a framework that does this automatically for you (like aspnetcore), make sure you enable a listener, for example:

```C#
ActivitySource.AddActivityListener(new ActivityListener()
{
ShouldListenTo = _ => true,
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
ActivityStarted = activity => Console.WriteLine($"{activity.ParentId}:{activity.Id} - Start"),
ActivityStopped = activity => Console.WriteLine($"{activity.ParentId}:{activity.Id} - Stop")
});
```
Here is a diagram of CAP's tracking data in Zipkin:

<img src="/img/opentelemetry.png">
<img src="/img/opentelemetry.png">

### Context Propagation
CAP supports [Context
Propagation](https://opentelemetry.io/docs/instrumentation/js/propagation/) by
injecting `traceparent` and `baggage` headers when sending messages and
restoring the context from those headers when receiving messages.

CAP uses the configured Propagators.DefaultTextMapPropagator propagator, which
is usually set to both TraceContextPropagator and BaggagePropagator [by the
dotnet OpenTelemetry
SDK](https://github.com/open-telemetry/opentelemetry-dotnet/blob/main/src/OpenTelemetry/Sdk.cs#L21)
but can be set in your your client program. For example, to opt out of the
Baggage propagation, you can call:

```C#
OpenTelemetry.Sdk.SetDefaultTextMapPropagator(
new TraceContextPropagator());
```

See the [dotnet OpenTelemetry.Api
readme](https://github.com/open-telemetry/opentelemetry-dotnet/blob/main/src/OpenTelemetry.Api/README.md?plain=1#L455)
for more details.

45 changes: 27 additions & 18 deletions src/DotNetCore.CAP.OpenTelemetry/DiagnosticListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
Expand All @@ -23,9 +22,7 @@ internal class DiagnosticListener : IObserver<KeyValuePair<string, object?>>
private const string ProducerOperateNameSuffix = "/Publisher";
private const string ConsumerOperateNameSuffix = "/Subscriber";
private static readonly ActivitySource ActivitySource = new(SourceName, "1.0.0");
private static readonly TextMapPropagator Propagator = new TraceContextPropagator();

private readonly ConcurrentDictionary<string, ActivityContext> _contexts = new();
private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;

public void OnCompleted()
{
Expand All @@ -42,10 +39,7 @@ public void OnNext(KeyValuePair<string, object?> evt)
case CapEvents.BeforePublishMessageStore:
{
var eventData = (CapEventDataPubStore)evt.Value!;
ActivityContext parentContext = default;

if (Activity.Current != null)
_contexts.TryAdd(eventData.Message.GetId(), parentContext = Activity.Current.Context);
var parentContext = Activity.Current?.Context ?? default;

var activity = ActivitySource.StartActivity("Event Persistence: " + eventData.Operation,
ActivityKind.Internal, parentContext);
Expand All @@ -55,9 +49,8 @@ public void OnNext(KeyValuePair<string, object?> evt)
activity.AddEvent(new ActivityEvent("CAP message persistence start...",
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value)));

if (parentContext != default)
if (parentContext != default && Activity.Current != null)
{
_contexts[eventData.Message.GetId()] = Activity.Current!.Context;
Propagator.Inject(new PropagationContext(Activity.Current.Context, Baggage.Current),
eventData.Message,
(msg, key, value) => { msg.Headers[key] = value; });
Expand Down Expand Up @@ -94,9 +87,14 @@ public void OnNext(KeyValuePair<string, object?> evt)
var eventData = (CapEventDataPubSend)evt.Value!;
var parentContext = Propagator.Extract(default, eventData.TransportMessage, (msg, key) =>
{
return msg.Headers.TryGetValue(key, out var value) ? (new[] { value }) : Enumerable.Empty<string>();
if (msg.Headers.TryGetValue(key, out var value) && !string.IsNullOrEmpty(value))
{
return new string[] { value };
}
return Enumerable.Empty<string>();
});
_contexts.TryRemove(eventData.TransportMessage.GetId(), out var context);

var activity = ActivitySource.StartActivity(
OperateNamePrefix + eventData.Operation + ProducerOperateNameSuffix, ActivityKind.Producer,
parentContext.ActivityContext);
Expand All @@ -105,7 +103,7 @@ public void OnNext(KeyValuePair<string, object?> evt)
activity.SetTag("messaging.system", eventData.BrokerAddress.Name);
activity.SetTag("messaging.destination", eventData.Operation);
activity.SetTag("messaging.destination_kind", "topic");
activity.SetTag("messaging.url", eventData.BrokerAddress.Endpoint.Replace("-1", "5672"));
activity.SetTag("messaging.url", eventData.BrokerAddress.Endpoint?.Replace("-1", "5672"));
activity.SetTag("messaging.message_id", eventData.TransportMessage.GetId());
activity.SetTag("messaging.message_payload_size_bytes", eventData.TransportMessage.Body.Length);

Expand Down Expand Up @@ -152,6 +150,7 @@ public void OnNext(KeyValuePair<string, object?> evt)
return Enumerable.Empty<string>();
});

Baggage.Current = parentContext.Baggage;
var activity = ActivitySource.StartActivity(
OperateNamePrefix + eventData.Operation + ConsumerOperateNameSuffix,
ActivityKind.Consumer,
Expand All @@ -162,15 +161,12 @@ public void OnNext(KeyValuePair<string, object?> evt)
activity.SetTag("messaging.system", eventData.BrokerAddress.Name);
activity.SetTag("messaging.destination", eventData.Operation);
activity.SetTag("messaging.destination_kind", "topic");
activity.SetTag("messaging.url", eventData.BrokerAddress.Endpoint.Replace("-1", "5672"));
activity.SetTag("messaging.url", eventData.BrokerAddress.Endpoint?.Replace("-1", "5672"));
activity.SetTag("messaging.message_id", eventData.TransportMessage.GetId());
activity.SetTag("messaging.message_payload_size_bytes", eventData.TransportMessage.Body.Length);

activity.AddEvent(new ActivityEvent("CAP message persistence start...",
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value)));

_contexts[eventData.TransportMessage.GetId() + eventData.TransportMessage.GetGroup()] =
activity.Context;
}
}
break;
Expand Down Expand Up @@ -201,11 +197,24 @@ public void OnNext(KeyValuePair<string, object?> evt)
break;
case CapEvents.BeforeSubscriberInvoke:
{
ActivityContext context = default;
var eventData = (CapEventDataSubExecute)evt.Value!;
_contexts.TryRemove(eventData.Message.GetId() + eventData.Message.GetGroup(), out var context);
var propagatedContext = Propagator.Extract(default, eventData.Message, (msg, key) =>
{
if (msg.Headers.TryGetValue(key, out var value)) return new[] { value };
return Enumerable.Empty<string>();
});

if (propagatedContext != default)
{
context = propagatedContext.ActivityContext;
Baggage.Current = propagatedContext.Baggage;
}

var activity = ActivitySource.StartActivity("Subscriber Invoke: " + eventData.MethodInfo!.Name,
ActivityKind.Internal,
context);

if (activity != null)
{
activity.SetTag("messaging.operation", "process");
Expand Down

0 comments on commit 8c1c104

Please sign in to comment.