Skip to content

Commit

Permalink
Svc info subs do not use queue group (#308)
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk authored Jan 5, 2024
1 parent b432a2f commit 8e2fbbd
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 3 deletions.
4 changes: 2 additions & 2 deletions src/NATS.Client.Services/Internal/SvcListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ internal class SvcListener : IAsyncDisposable
private readonly Channel<SvcMsg> _channel;
private readonly SvcMsgType _type;
private readonly string _subject;
private readonly string _queueGroup;
private readonly string? _queueGroup;
private readonly CancellationToken _cancellationToken;
private Task? _readLoop;
private CancellationTokenSource? _cts;

public SvcListener(NatsConnection nats, Channel<SvcMsg> channel, SvcMsgType type, string subject, string queueGroup, CancellationToken cancellationToken)
public SvcListener(NatsConnection nats, Channel<SvcMsg> channel, SvcMsgType type, string subject, string? queueGroup, CancellationToken cancellationToken)
{
_nats = nats;
_channel = channel;
Expand Down
3 changes: 2 additions & 1 deletion src/NATS.Client.Services/NatsSvcServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ internal async ValueTask StartAsync()
var type = svcType.ToString().ToUpper();
foreach (var subject in new[] { $"$SRV.{type}", $"$SRV.{type}.{name}", $"$SRV.{type}.{name}.{_id}" })
{
var svcListener = new SvcListener(_nats, _channel, svcType, subject, _config.QueueGroup, _cancellationToken);
// for discovery subjects do not use a queue group
var svcListener = new SvcListener(_nats, _channel, svcType, subject, default, _cancellationToken);
await svcListener.StartAsync();
_svcListeners.Add(svcListener);
}
Expand Down
33 changes: 33 additions & 0 deletions tests/NATS.Client.Services.Tests/ServicesTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,37 @@ await s2.AddEndpointAsync<int>(
Assert.Equal("s2baz", eps.Data["ep_name"]?.GetValue<string>());
}
}

[Fact]
public async Task Add_multiple_service_listeners_ping_info_and_stats()
{
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection();
var svc = new NatsSvcContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

await using var s1 = await svc.AddServiceAsync("s1", "1.0.0", cancellationToken: cancellationToken);
await using var s2 = await svc.AddServiceAsync("s2", "2.0.0", cancellationToken: cancellationToken);

var pingsTask = nats.FindServicesAsync("$SRV.PING", 2, NatsSrvJsonSerializer<PingResponse>.Default, cancellationToken);
var infosTask = nats.FindServicesAsync("$SRV.INFO", 2, NatsSrvJsonSerializer<InfoResponse>.Default, cancellationToken);
var statsTask = nats.FindServicesAsync("$SRV.STATS", 2, NatsSrvJsonSerializer<StatsResponse>.Default, cancellationToken);

var pings = await pingsTask;
Assert.Equal(2, pings.Count);
Assert.Equal("1.0.0", pings.First(s => s.Name == "s1").Version);
Assert.Equal("2.0.0", pings.First(s => s.Name == "s2").Version);

var infos = await infosTask;
Assert.Equal(2, infos.Count);
Assert.Equal("1.0.0", infos.First(s => s.Name == "s1").Version);
Assert.Equal("2.0.0", infos.First(s => s.Name == "s2").Version);

var stats = await statsTask;
Assert.Equal(2, stats.Count);
Assert.Equal("1.0.0", stats.First(s => s.Name == "s1").Version);
Assert.Equal("2.0.0", stats.First(s => s.Name == "s2").Version);
}
}

0 comments on commit 8e2fbbd

Please sign in to comment.