Skip to content

Commit

Permalink
Added deliver_group to consumer config (#366)
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk authored Jan 30, 2024
1 parent ed1dca8 commit 9b049d6
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/NATS.Client.JetStream/Models/ConsumerConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public ConsumerConfig(string name)
[System.ComponentModel.DataAnnotations.StringLength(int.MaxValue, MinimumLength = 1)]
public string? DeliverSubject { get; set; }

[System.Text.Json.Serialization.JsonPropertyName("deliver_group")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
[System.ComponentModel.DataAnnotations.StringLength(int.MaxValue, MinimumLength = 1)]
public string? DeliverGroup { get; set; }

[System.Text.Json.Serialization.JsonPropertyName("ack_policy")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.Never)]
[System.ComponentModel.DataAnnotations.Required(AllowEmptyStrings = true)]
Expand Down
39 changes: 39 additions & 0 deletions tests/NATS.Client.JetStream.Tests/ConsumerSetupTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using NATS.Client.Core.Tests;
using NATS.Client.JetStream.Models;

namespace NATS.Client.JetStream.Tests;

public class ConsumerSetupTest
{
[Fact]
public async Task Create_push_consumer()
{
await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);

await js.CreateOrUpdateConsumerAsync(
stream: "s1",
config: new ConsumerConfig
{
Name = "c1",
DeliverSubject = "i1",
DeliverGroup = "q1",
},
cancellationToken: cts.Token);

var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token);

var info = consumer.Info;
Assert.Equal("s1", info.StreamName);

var config = info.Config;
Assert.Equal("c1", config.Name);
Assert.Equal("i1", config.DeliverSubject);
Assert.Equal("q1", config.DeliverGroup);
}
}

0 comments on commit 9b049d6

Please sign in to comment.