diff --git a/.gitignore b/.gitignore index 1d23e55fa..9ceba8657 100644 --- a/.gitignore +++ b/.gitignore @@ -119,3 +119,4 @@ nuget/*.unitypackage # MacOS folder attributes .DS_Store +/tests/NATS.Client.TestUtilities/Properties/launchSettings.json diff --git a/src/NATS.Client.ObjectStore/NatsObjStore.cs b/src/NATS.Client.ObjectStore/NatsObjStore.cs index 7fdc28f67..f20dfefcd 100644 --- a/src/NATS.Client.ObjectStore/NatsObjStore.cs +++ b/src/NATS.Client.ObjectStore/NatsObjStore.cs @@ -25,8 +25,6 @@ public class NatsObjStore : INatsObjStore private const string NatsRollup = "Nats-Rollup"; private const string RollupSubject = "sub"; - private static readonly NatsHeaders NatsRollupHeaders = new() { { NatsRollup, RollupSubject } }; - private readonly NatsObjContext _objContext; private readonly INatsJSStream _stream; @@ -603,7 +601,8 @@ public async ValueTask DeleteAsync(string key, CancellationToken cancellationTok private async ValueTask PublishMeta(ObjectMetadata meta, CancellationToken cancellationToken) { - var ack = await JetStreamContext.PublishAsync(GetMetaSubject(meta.Name), meta, serializer: NatsObjJsonSerializer.Default, headers: NatsRollupHeaders, cancellationToken: cancellationToken); + var natsRollupHeaders = new NatsHeaders { { NatsRollup, RollupSubject } }; + var ack = await JetStreamContext.PublishAsync(GetMetaSubject(meta.Name), meta, serializer: NatsObjJsonSerializer.Default, headers: natsRollupHeaders, cancellationToken: cancellationToken); ack.EnsureSuccess(); } diff --git a/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs b/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs index 45a9c74f1..aa0b686e0 100644 --- a/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs +++ b/tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs @@ -1,3 +1,4 @@ +using System.Diagnostics; using System.Security.Cryptography; using System.Text; using NATS.Client.Core.Tests; @@ -484,4 +485,70 @@ public async Task Put_get_serialization_when_default_serializer_is_not_used() var info = await store.GetInfoAsync("k1", cancellationToken: cancellationToken); Assert.Equal("k1", info.Name); } + + [Fact] + public async Task Put_with_activity() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var cancellationToken = cts.Token; + + using var activitySource = new ActivitySource($"NATS-debug-{nameof(Put_with_activity)}"); + using var activityListener = new ActivityListener + { + ShouldListenTo = _ => true, + SampleUsingParentId = (ref ActivityCreationOptions _) => ActivitySamplingResult.AllData, + Sample = (ref ActivityCreationOptions _) => ActivitySamplingResult.AllData, + }; + using var activity = activitySource.StartActivity(ActivityKind.Client); + ActivitySource.AddActivityListener(activityListener); + + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + var obj = new NatsObjContext(js); + + var store = await obj.CreateObjectStoreAsync(new NatsObjConfig("b1"), cancellationToken); + + var data = new byte[1024]; + Random.Shared.NextBytes(data); + + const string filename = $"_tmp_test_file_{nameof(Put_with_activity)}.bin"; + await File.WriteAllBytesAsync(filename, data, cancellationToken); + + await store.PutAsync("my/random/data_1.bin", File.OpenRead(filename), cancellationToken: cancellationToken); + } + + [Fact] + public async Task Put_multiple_times_with_activity() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var cancellationToken = cts.Token; + + using var activitySource = new ActivitySource($"NATS-debug-{nameof(Put_multiple_times_with_activity)}"); + using var activityListener = new ActivityListener + { + ShouldListenTo = _ => true, + SampleUsingParentId = (ref ActivityCreationOptions _) => ActivitySamplingResult.AllData, + Sample = (ref ActivityCreationOptions _) => ActivitySamplingResult.AllData, + }; + using var activity = activitySource.StartActivity(ActivityKind.Client); + ActivitySource.AddActivityListener(activityListener); + + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + var obj = new NatsObjContext(js); + + var store = await obj.CreateObjectStoreAsync(new NatsObjConfig("b1"), cancellationToken); + + var data = new byte[1024]; + Random.Shared.NextBytes(data); + + const string filename = $"_tmp_test_file_{nameof(Put_multiple_times_with_activity)}.bin"; + await File.WriteAllBytesAsync(filename, data, cancellationToken); + + await store.PutAsync("my/random/data_1.bin", File.OpenRead(filename), cancellationToken: cancellationToken); + await store.PutAsync("my/random/data_2.bin", File.OpenRead(filename), cancellationToken: cancellationToken); + await store.PutAsync("my/random/data_3.bin", File.OpenRead(filename), cancellationToken: cancellationToken); + } }