Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed an exception which happens when PutAsync is used more than once and activity logging is enabled in main project #675

Merged
merged 1 commit into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,4 @@ nuget/*.unitypackage

# MacOS folder attributes
.DS_Store
/tests/NATS.Client.TestUtilities/Properties/launchSettings.json
5 changes: 2 additions & 3 deletions src/NATS.Client.ObjectStore/NatsObjStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ public class NatsObjStore : INatsObjStore
private const string NatsRollup = "Nats-Rollup";
private const string RollupSubject = "sub";

private static readonly NatsHeaders NatsRollupHeaders = new() { { NatsRollup, RollupSubject } };

private readonly NatsObjContext _objContext;
private readonly INatsJSStream _stream;

Expand Down Expand Up @@ -603,7 +601,8 @@ public async ValueTask DeleteAsync(string key, CancellationToken cancellationTok

private async ValueTask PublishMeta(ObjectMetadata meta, CancellationToken cancellationToken)
{
var ack = await JetStreamContext.PublishAsync(GetMetaSubject(meta.Name), meta, serializer: NatsObjJsonSerializer<ObjectMetadata>.Default, headers: NatsRollupHeaders, cancellationToken: cancellationToken);
var natsRollupHeaders = new NatsHeaders { { NatsRollup, RollupSubject } };
var ack = await JetStreamContext.PublishAsync(GetMetaSubject(meta.Name), meta, serializer: NatsObjJsonSerializer<ObjectMetadata>.Default, headers: natsRollupHeaders, cancellationToken: cancellationToken);
ack.EnsureSuccess();
}

Expand Down
67 changes: 67 additions & 0 deletions tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Diagnostics;
using System.Security.Cryptography;
using System.Text;
using NATS.Client.Core.Tests;
Expand Down Expand Up @@ -484,4 +485,70 @@ public async Task Put_get_serialization_when_default_serializer_is_not_used()
var info = await store.GetInfoAsync("k1", cancellationToken: cancellationToken);
Assert.Equal("k1", info.Name);
}

[Fact]
public async Task Put_with_activity()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

using var activitySource = new ActivitySource($"NATS-debug-{nameof(Put_with_activity)}");
using var activityListener = new ActivityListener
{
ShouldListenTo = _ => true,
SampleUsingParentId = (ref ActivityCreationOptions<string> _) => ActivitySamplingResult.AllData,
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
};
using var activity = activitySource.StartActivity(ActivityKind.Client);
ActivitySource.AddActivityListener(activityListener);

await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);
var obj = new NatsObjContext(js);

var store = await obj.CreateObjectStoreAsync(new NatsObjConfig("b1"), cancellationToken);

var data = new byte[1024];
Random.Shared.NextBytes(data);

const string filename = $"_tmp_test_file_{nameof(Put_with_activity)}.bin";
await File.WriteAllBytesAsync(filename, data, cancellationToken);

await store.PutAsync("my/random/data_1.bin", File.OpenRead(filename), cancellationToken: cancellationToken);
}

[Fact]
public async Task Put_multiple_times_with_activity()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

using var activitySource = new ActivitySource($"NATS-debug-{nameof(Put_multiple_times_with_activity)}");
using var activityListener = new ActivityListener
{
ShouldListenTo = _ => true,
SampleUsingParentId = (ref ActivityCreationOptions<string> _) => ActivitySamplingResult.AllData,
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
};
using var activity = activitySource.StartActivity(ActivityKind.Client);
ActivitySource.AddActivityListener(activityListener);

await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);
var obj = new NatsObjContext(js);

var store = await obj.CreateObjectStoreAsync(new NatsObjConfig("b1"), cancellationToken);

var data = new byte[1024];
Random.Shared.NextBytes(data);

const string filename = $"_tmp_test_file_{nameof(Put_multiple_times_with_activity)}.bin";
await File.WriteAllBytesAsync(filename, data, cancellationToken);

await store.PutAsync("my/random/data_1.bin", File.OpenRead(filename), cancellationToken: cancellationToken);
await store.PutAsync("my/random/data_2.bin", File.OpenRead(filename), cancellationToken: cancellationToken);
await store.PutAsync("my/random/data_3.bin", File.OpenRead(filename), cancellationToken: cancellationToken);
}
}