diff --git a/src/NATS.Client.JetStream/INatsJSContext.cs b/src/NATS.Client.JetStream/INatsJSContext.cs index a7aa77525..a4148314f 100644 --- a/src/NATS.Client.JetStream/INatsJSContext.cs +++ b/src/NATS.Client.JetStream/INatsJSContext.cs @@ -90,6 +90,33 @@ IAsyncEnumerable ListConsumerNamesAsync( /// The name is null. ValueTask DeleteConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default); + /// + /// Pause a consumer. + /// + /// Stream name where consumer is associated to. + /// Consumer name to be paused. + /// Until when the consumer should be paused. + /// A used to cancel the API call. + /// Result of pausing the consumer. + /// There was an issue retrieving the response. + /// Server responded with an error. + /// The name is invalid. + /// The name is null. + ValueTask PauseConsumerAsync(string stream, string consumer, DateTimeOffset pauseUntil, CancellationToken cancellationToken = default); + + /// + /// Resume a (paused) consumer. + /// + /// Stream name where consumer is associated to. + /// Consumer name to be resumed. + /// A used to cancel the API call. + /// Result of resuming the (paused) consumer. + /// There was an issue retrieving the response. + /// Server responded with an error. + /// The name is invalid. + /// The name is null. + ValueTask ResumeConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default); + /// /// Calls JetStream Account Info API. /// diff --git a/src/NATS.Client.JetStream/Internal/NatsJSJsonSerializer.cs b/src/NATS.Client.JetStream/Internal/NatsJSJsonSerializer.cs index 54fe79ab6..0cb0a0857 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSJsonSerializer.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSJsonSerializer.cs @@ -45,6 +45,8 @@ internal static class NatsJSJsonSerializer [JsonSerializable(typeof(ConsumerListResponse))] [JsonSerializable(typeof(ConsumerNamesRequest))] [JsonSerializable(typeof(ConsumerNamesResponse))] +[JsonSerializable(typeof(ConsumerPauseRequest))] +[JsonSerializable(typeof(ConsumerPauseResponse))] [JsonSerializable(typeof(ErrorResponse))] [JsonSerializable(typeof(ExternalStreamSource))] [JsonSerializable(typeof(IterableRequest))] @@ -335,3 +337,30 @@ public override TimeSpan Read(ref Utf8JsonReader reader, Type typeToConvert, Jso public override void Write(Utf8JsonWriter writer, TimeSpan value, JsonSerializerOptions options) => writer.WriteNumberValue((long)(value.TotalMilliseconds * 1_000_000L)); } + +internal class NatsJSJsonNullableNanosecondsConverter : JsonConverter +{ + private readonly NatsJSJsonNanosecondsConverter _converter = new(); + + public override TimeSpan? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType == JsonTokenType.Null) + { + return null; + } + + return _converter.Read(ref reader, typeToConvert, options); + } + + public override void Write(Utf8JsonWriter writer, TimeSpan? value, JsonSerializerOptions options) + { + if (value == null) + { + writer.WriteNullValue(); + } + else + { + _converter.Write(writer, value.Value, options); + } + } +} diff --git a/src/NATS.Client.JetStream/Models/ConsumerConfig.cs b/src/NATS.Client.JetStream/Models/ConsumerConfig.cs index 1656ef9a7..7c64e0cbd 100644 --- a/src/NATS.Client.JetStream/Models/ConsumerConfig.cs +++ b/src/NATS.Client.JetStream/Models/ConsumerConfig.cs @@ -234,6 +234,13 @@ public ConsumerConfig(string name) [System.ComponentModel.DataAnnotations.Range(long.MinValue, long.MaxValue)] public long NumReplicas { get; set; } + /// + /// If the consumer is paused, this contains until which time it is paused. + /// + [System.Text.Json.Serialization.JsonPropertyName("pause_until")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)] + public DateTimeOffset? PauseUntil { get; set; } + /// /// Force the consumer state to be kept in memory rather than inherit the setting from the stream /// diff --git a/src/NATS.Client.JetStream/Models/ConsumerInfo.cs b/src/NATS.Client.JetStream/Models/ConsumerInfo.cs index cb2c3ce0a..e3c342f3c 100644 --- a/src/NATS.Client.JetStream/Models/ConsumerInfo.cs +++ b/src/NATS.Client.JetStream/Models/ConsumerInfo.cs @@ -1,3 +1,5 @@ +using NATS.Client.JetStream.Internal; + namespace NATS.Client.JetStream.Models; public record ConsumerInfo @@ -97,6 +99,21 @@ public record ConsumerInfo [System.ComponentModel.DataAnnotations.Range(ulong.MinValue, ulong.MaxValue)] public ulong NumPending { get; set; } + /// + /// Whether the consumer is paused. + /// + [System.Text.Json.Serialization.JsonPropertyName("paused")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)] + public bool IsPaused { get; set; } + + /// + /// If the consumer is , this contains how much time is remaining until this consumer is unpaused. + /// + [System.Text.Json.Serialization.JsonPropertyName("pause_remaining")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)] + [System.Text.Json.Serialization.JsonConverter(typeof(NatsJSJsonNullableNanosecondsConverter))] + public TimeSpan? PauseRemaining { get; set; } + [System.Text.Json.Serialization.JsonPropertyName("cluster")] [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)] public ClusterInfo? Cluster { get; set; } diff --git a/src/NATS.Client.JetStream/Models/ConsumerPauseRequest.cs b/src/NATS.Client.JetStream/Models/ConsumerPauseRequest.cs new file mode 100644 index 000000000..44da8b082 --- /dev/null +++ b/src/NATS.Client.JetStream/Models/ConsumerPauseRequest.cs @@ -0,0 +1,11 @@ +namespace NATS.Client.JetStream.Models; + +/// +/// A request to the JetStream $JS.API.CONSUMER.PAUSE API +/// +internal record ConsumerPauseRequest +{ + [System.Text.Json.Serialization.JsonPropertyName("pause_until")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)] + public DateTimeOffset? PauseUntil { get; set; } +} diff --git a/src/NATS.Client.JetStream/Models/ConsumerPauseResponse.cs b/src/NATS.Client.JetStream/Models/ConsumerPauseResponse.cs new file mode 100644 index 000000000..9f42ffea0 --- /dev/null +++ b/src/NATS.Client.JetStream/Models/ConsumerPauseResponse.cs @@ -0,0 +1,34 @@ +using NATS.Client.JetStream.Internal; + +namespace NATS.Client.JetStream.Models; + +/// +/// A response from the JetStream $JS.API.CONSUMER.PAUSE API +/// +public record ConsumerPauseResponse +{ + [System.Text.Json.Serialization.JsonPropertyName("paused")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.Never)] +#if NET6_0 + public bool IsPaused { get; set; } +#else + public bool IsPaused { get; init; } +#endif + + [System.Text.Json.Serialization.JsonPropertyName("pause_until")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)] +#if NET6_0 + public DateTimeOffset PauseUntil { get; set; } +#else + public DateTimeOffset PauseUntil { get; init; } +#endif + + [System.Text.Json.Serialization.JsonPropertyName("pause_remaining")] + [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)] + [System.Text.Json.Serialization.JsonConverter(typeof(NatsJSJsonNullableNanosecondsConverter))] +#if NET6_0 + public TimeSpan? PauseRemaining { get; set; } +#else + public TimeSpan? PauseRemaining { get; init; } +#endif +} diff --git a/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs b/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs index e361cd5c8..4ba602e81 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs @@ -152,6 +152,49 @@ public async ValueTask DeleteConsumerAsync(string stream, string consumer, return response.Success; } + /// + /// Pause a consumer. + /// + /// Stream name where consumer is associated to. + /// Consumer name to be paused. + /// Until when the consumer should be paused. + /// A used to cancel the API call. + /// Result of pausing the consumer. + /// There was an issue retrieving the response. + /// Server responded with an error. + /// The name is invalid. + /// The name is null. + public async ValueTask PauseConsumerAsync(string stream, string consumer, DateTimeOffset pauseUntil, CancellationToken cancellationToken = default) + { + ThrowIfInvalidStreamName(stream); + var response = await JSRequestResponseAsync( + subject: $"{Opts.Prefix}.CONSUMER.PAUSE.{stream}.{consumer}", + request: new ConsumerPauseRequest { PauseUntil = pauseUntil }, + cancellationToken); + return response; + } + + /// + /// Resume a (paused) consumer. + /// + /// Stream name where consumer is associated to. + /// Consumer name to be resumed. + /// A used to cancel the API call. + /// Result of resuming the (paused) consumer. + /// There was an issue retrieving the response. + /// Server responded with an error. + /// The name is invalid. + /// The name is null. + public async ValueTask ResumeConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default) + { + ThrowIfInvalidStreamName(stream); + var response = await JSRequestResponseAsync( + subject: $"{Opts.Prefix}.CONSUMER.PAUSE.{stream}.{consumer}", + request: null, + cancellationToken); + return !response.IsPaused; + } + internal ValueTask CreateOrderedConsumerInternalAsync( string stream, NatsJSOrderedConsumerOpts opts, diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerSetupTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerSetupTest.cs index 4b16372e2..395857db8 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerSetupTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerSetupTest.cs @@ -36,4 +36,35 @@ await js.CreateOrUpdateConsumerAsync( Assert.Equal("i1", config.DeliverSubject); Assert.Equal("q1", config.DeliverGroup); } + + [SkipIfNatsServer(versionEarlierThan: "2.11")] + public async Task Create_paused_consumer() + { + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + var js = new NatsJSContextFactory().CreateContext(nats); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + await js.CreateStreamAsync(new StreamConfig("s1", new[] { "s1.*" }), cts.Token); + + var pauseUntil = DateTimeOffset.Now.AddHours(1); + + await js.CreateOrUpdateConsumerAsync( + stream: "s1", + config: new ConsumerConfig + { + Name = "c1", + PauseUntil = pauseUntil, + }, + cancellationToken: cts.Token); + + var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); + + var info = consumer.Info; + Assert.True(info.IsPaused); + + var config = info.Config; + Assert.Equal(pauseUntil, config.PauseUntil); + } } diff --git a/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs b/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs index 68c74a5e1..c29a2f3c7 100644 --- a/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs @@ -81,4 +81,41 @@ public async Task List_delete_consumer() Assert.DoesNotContain(list, c => c.Info.Config.Name == "c1"); } } + + [SkipIfNatsServer(versionEarlierThan: "2.11")] + public async Task Pause_resume_consumer() + { + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + var js = new NatsJSContextFactory().CreateContext(nats); + + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + await js.CreateStreamAsync(new StreamConfig("s1", new[] { "s1.*" }), cts.Token); + await js.CreateOrUpdateConsumerAsync("s1", new ConsumerConfig("c1"), cts.Token); + + var pauseUntil = DateTimeOffset.Now.AddHours(1); + + // Pause + { + var consumerPauseResponse = await js.PauseConsumerAsync("s1", "c1", pauseUntil, cts.Token); + + Assert.True(consumerPauseResponse.IsPaused); + Assert.Equal(pauseUntil, consumerPauseResponse.PauseUntil); + + var consumerInfo = await js.GetConsumerAsync("s1", "c1", cts.Token); + Assert.True(consumerInfo.Info.IsPaused); + Assert.Equal(pauseUntil, consumerInfo.Info.Config.PauseUntil); + } + + // Resume + { + var isResumed = await js.ResumeConsumerAsync("s1", "c1", cts.Token); + Assert.True(isResumed); + + var consumerInfo = await js.GetConsumerAsync("s1", "c1", cts.Token); + Assert.False(consumerInfo.Info.IsPaused); + Assert.Null(consumerInfo.Info.Config.PauseUntil); + } + } } diff --git a/tests/NATS.Client.JetStream.Tests/TimeSpanJsonTests.cs b/tests/NATS.Client.JetStream.Tests/TimeSpanJsonTests.cs index d0abcc10c..08a927a2f 100644 --- a/tests/NATS.Client.JetStream.Tests/TimeSpanJsonTests.cs +++ b/tests/NATS.Client.JetStream.Tests/TimeSpanJsonTests.cs @@ -206,4 +206,33 @@ public void StreamSourceInfoActive_test(string value, string expected) Assert.NotNull(result); Assert.Equal(time, result.Active); } + + [Theory] + [InlineData(null, "\"pause_remaining\":null\\b")] + [InlineData("00:00:00.001", "\"pause_remaining\":1000000\\b")] + [InlineData("00:00:01.000", "\"pause_remaining\":1000000000\\b")] + [InlineData("00:00:01.234", "\"pause_remaining\":1234000000\\b")] + public void ConsumerInfoPauseRemaining_test(string? value, string expected) + { + TimeSpan? time = value != null ? TimeSpan.Parse(value) : null; + var serializer = NatsJSJsonSerializer.Default; + + var bw = new NatsBufferWriter(); + serializer.Serialize(bw, new ConsumerInfo { StreamName = "test", Name = "test", PauseRemaining = time }); + + var json = Encoding.UTF8.GetString(bw.WrittenSpan); + if (value != null) + { + Assert.Matches(expected, json); + } + else + { + // PauseRemaining should not be serialized, if the value is null. + Assert.DoesNotMatch(expected, "pause_remaining"); + } + + var result = serializer.Deserialize(new ReadOnlySequence(bw.WrittenMemory)); + Assert.NotNull(result); + Assert.Equal(time, result.PauseRemaining); + } } diff --git a/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs b/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs index 6b173149e..2de27b095 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs @@ -60,6 +60,10 @@ public class MockJsContext : INatsJSContext public ValueTask DeleteConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default) => throw new NotImplementedException(); + public ValueTask PauseConsumerAsync(string stream, string consumer, DateTimeOffset pauseUntil, CancellationToken cancellationToken = default) => throw new NotImplementedException(); + + public ValueTask ResumeConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default) => throw new NotImplementedException(); + public ValueTask GetAccountInfoAsync(CancellationToken cancellationToken = default) => throw new NotImplementedException(); public ValueTask PublishAsync(string subject, T? data, INatsSerialize? serializer = default, NatsJSPubOpts? opts = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default) => throw new NotImplementedException(); diff --git a/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs b/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs index fc4d43ea8..44a3f0ead 100644 --- a/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs +++ b/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs @@ -60,6 +60,10 @@ public class MockJsContext : INatsJSContext public ValueTask DeleteConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default) => throw new NotImplementedException(); + public ValueTask PauseConsumerAsync(string stream, string consumer, DateTimeOffset pauseUntil, CancellationToken cancellationToken = default) => throw new NotImplementedException(); + + public ValueTask ResumeConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default) => throw new NotImplementedException(); + public ValueTask GetAccountInfoAsync(CancellationToken cancellationToken = default) => throw new NotImplementedException(); public ValueTask PublishAsync(string subject, T? data, INatsSerialize? serializer = default, NatsJSPubOpts? opts = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default) => throw new NotImplementedException();