Skip to content

Commit

Permalink
Implement Consumers Pause (#432)
Browse files Browse the repository at this point in the history
* Implement consumer pause/resume

Signed-off-by: Maurice van Veen <[email protected]>

* Add (Pause/Resume)Consumer to MockJsContext

Signed-off-by: Maurice van Veen <[email protected]>

* Add test for TimeSpan? in TimeSpanJsonTests.cs

Signed-off-by: Maurice van Veen <[email protected]>

* Rename to IsPaused & make ConsumerPauseResponse immutable

Signed-off-by: Maurice van Veen <[email protected]>

* Test fixes

---------

Signed-off-by: Maurice van Veen <[email protected]>
Co-authored-by: Ziya Suzen <[email protected]>
  • Loading branch information
MauriceVanVeen and mtmk authored Mar 11, 2024
1 parent 271210f commit 1c11db3
Show file tree
Hide file tree
Showing 12 changed files with 273 additions and 0 deletions.
27 changes: 27 additions & 0 deletions src/NATS.Client.JetStream/INatsJSContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,33 @@ IAsyncEnumerable<string> ListConsumerNamesAsync(
/// <exception cref="ArgumentNullException">The <paramref name="stream"/> name is <c>null</c>.</exception>
ValueTask<bool> DeleteConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default);

/// <summary>
/// Pause a consumer.
/// </summary>
/// <param name="stream">Stream name where consumer is associated to.</param>
/// <param name="consumer">Consumer name to be paused.</param>
/// <param name="pauseUntil">Until when the consumer should be paused.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Result of pausing the consumer.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <exception cref="ArgumentException">The <paramref name="stream"/> name is invalid.</exception>
/// <exception cref="ArgumentNullException">The <paramref name="stream"/> name is <c>null</c>.</exception>
ValueTask<ConsumerPauseResponse> PauseConsumerAsync(string stream, string consumer, DateTimeOffset pauseUntil, CancellationToken cancellationToken = default);

/// <summary>
/// Resume a (paused) consumer.
/// </summary>
/// <param name="stream">Stream name where consumer is associated to.</param>
/// <param name="consumer">Consumer name to be resumed.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Result of resuming the (paused) consumer.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <exception cref="ArgumentException">The <paramref name="stream"/> name is invalid.</exception>
/// <exception cref="ArgumentNullException">The <paramref name="stream"/> name is <c>null</c>.</exception>
ValueTask<bool> ResumeConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default);

/// <summary>
/// Calls JetStream Account Info API.
/// </summary>
Expand Down
29 changes: 29 additions & 0 deletions src/NATS.Client.JetStream/Internal/NatsJSJsonSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ internal static class NatsJSJsonSerializer<T>
[JsonSerializable(typeof(ConsumerListResponse))]
[JsonSerializable(typeof(ConsumerNamesRequest))]
[JsonSerializable(typeof(ConsumerNamesResponse))]
[JsonSerializable(typeof(ConsumerPauseRequest))]
[JsonSerializable(typeof(ConsumerPauseResponse))]
[JsonSerializable(typeof(ErrorResponse))]
[JsonSerializable(typeof(ExternalStreamSource))]
[JsonSerializable(typeof(IterableRequest))]
Expand Down Expand Up @@ -335,3 +337,30 @@ public override TimeSpan Read(ref Utf8JsonReader reader, Type typeToConvert, Jso
public override void Write(Utf8JsonWriter writer, TimeSpan value, JsonSerializerOptions options) =>
writer.WriteNumberValue((long)(value.TotalMilliseconds * 1_000_000L));
}

internal class NatsJSJsonNullableNanosecondsConverter : JsonConverter<TimeSpan?>
{
private readonly NatsJSJsonNanosecondsConverter _converter = new();

public override TimeSpan? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
if (reader.TokenType == JsonTokenType.Null)
{
return null;
}

return _converter.Read(ref reader, typeToConvert, options);
}

public override void Write(Utf8JsonWriter writer, TimeSpan? value, JsonSerializerOptions options)
{
if (value == null)
{
writer.WriteNullValue();
}
else
{
_converter.Write(writer, value.Value, options);
}
}
}
7 changes: 7 additions & 0 deletions src/NATS.Client.JetStream/Models/ConsumerConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,13 @@ public ConsumerConfig(string name)
[System.ComponentModel.DataAnnotations.Range(long.MinValue, long.MaxValue)]
public long NumReplicas { get; set; }

/// <summary>
/// If the consumer is paused, this contains until which time it is paused.
/// </summary>
[System.Text.Json.Serialization.JsonPropertyName("pause_until")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
public DateTimeOffset? PauseUntil { get; set; }

/// <summary>
/// Force the consumer state to be kept in memory rather than inherit the setting from the stream
/// </summary>
Expand Down
17 changes: 17 additions & 0 deletions src/NATS.Client.JetStream/Models/ConsumerInfo.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using NATS.Client.JetStream.Internal;

namespace NATS.Client.JetStream.Models;

public record ConsumerInfo
Expand Down Expand Up @@ -97,6 +99,21 @@ public record ConsumerInfo
[System.ComponentModel.DataAnnotations.Range(ulong.MinValue, ulong.MaxValue)]
public ulong NumPending { get; set; }

/// <summary>
/// Whether the consumer is paused.
/// </summary>
[System.Text.Json.Serialization.JsonPropertyName("paused")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
public bool IsPaused { get; set; }

/// <summary>
/// If the consumer is <see cref="IsPaused"/>, this contains how much time is remaining until this consumer is unpaused.
/// </summary>
[System.Text.Json.Serialization.JsonPropertyName("pause_remaining")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
[System.Text.Json.Serialization.JsonConverter(typeof(NatsJSJsonNullableNanosecondsConverter))]
public TimeSpan? PauseRemaining { get; set; }

[System.Text.Json.Serialization.JsonPropertyName("cluster")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
public ClusterInfo? Cluster { get; set; }
Expand Down
11 changes: 11 additions & 0 deletions src/NATS.Client.JetStream/Models/ConsumerPauseRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace NATS.Client.JetStream.Models;

/// <summary>
/// A request to the JetStream $JS.API.CONSUMER.PAUSE API
/// </summary>
internal record ConsumerPauseRequest
{
[System.Text.Json.Serialization.JsonPropertyName("pause_until")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
public DateTimeOffset? PauseUntil { get; set; }
}
34 changes: 34 additions & 0 deletions src/NATS.Client.JetStream/Models/ConsumerPauseResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using NATS.Client.JetStream.Internal;

namespace NATS.Client.JetStream.Models;

/// <summary>
/// A response from the JetStream $JS.API.CONSUMER.PAUSE API
/// </summary>
public record ConsumerPauseResponse
{
[System.Text.Json.Serialization.JsonPropertyName("paused")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.Never)]
#if NET6_0
public bool IsPaused { get; set; }
#else
public bool IsPaused { get; init; }
#endif

[System.Text.Json.Serialization.JsonPropertyName("pause_until")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
#if NET6_0
public DateTimeOffset PauseUntil { get; set; }
#else
public DateTimeOffset PauseUntil { get; init; }
#endif

[System.Text.Json.Serialization.JsonPropertyName("pause_remaining")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
[System.Text.Json.Serialization.JsonConverter(typeof(NatsJSJsonNullableNanosecondsConverter))]
#if NET6_0
public TimeSpan? PauseRemaining { get; set; }
#else
public TimeSpan? PauseRemaining { get; init; }
#endif
}
43 changes: 43 additions & 0 deletions src/NATS.Client.JetStream/NatsJSContext.Consumers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,49 @@ public async ValueTask<bool> DeleteConsumerAsync(string stream, string consumer,
return response.Success;
}

/// <summary>
/// Pause a consumer.
/// </summary>
/// <param name="stream">Stream name where consumer is associated to.</param>
/// <param name="consumer">Consumer name to be paused.</param>
/// <param name="pauseUntil">Until when the consumer should be paused.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Result of pausing the consumer.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <exception cref="ArgumentException">The <paramref name="stream"/> name is invalid.</exception>
/// <exception cref="ArgumentNullException">The <paramref name="stream"/> name is <c>null</c>.</exception>
public async ValueTask<ConsumerPauseResponse> PauseConsumerAsync(string stream, string consumer, DateTimeOffset pauseUntil, CancellationToken cancellationToken = default)
{
ThrowIfInvalidStreamName(stream);
var response = await JSRequestResponseAsync<ConsumerPauseRequest, ConsumerPauseResponse>(
subject: $"{Opts.Prefix}.CONSUMER.PAUSE.{stream}.{consumer}",
request: new ConsumerPauseRequest { PauseUntil = pauseUntil },
cancellationToken);
return response;
}

/// <summary>
/// Resume a (paused) consumer.
/// </summary>
/// <param name="stream">Stream name where consumer is associated to.</param>
/// <param name="consumer">Consumer name to be resumed.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Result of resuming the (paused) consumer.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <exception cref="ArgumentException">The <paramref name="stream"/> name is invalid.</exception>
/// <exception cref="ArgumentNullException">The <paramref name="stream"/> name is <c>null</c>.</exception>
public async ValueTask<bool> ResumeConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default)
{
ThrowIfInvalidStreamName(stream);
var response = await JSRequestResponseAsync<object, ConsumerPauseResponse>(
subject: $"{Opts.Prefix}.CONSUMER.PAUSE.{stream}.{consumer}",
request: null,
cancellationToken);
return !response.IsPaused;
}

internal ValueTask<ConsumerInfo> CreateOrderedConsumerInternalAsync(
string stream,
NatsJSOrderedConsumerOpts opts,
Expand Down
31 changes: 31 additions & 0 deletions tests/NATS.Client.JetStream.Tests/ConsumerSetupTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,35 @@ await js.CreateOrUpdateConsumerAsync(
Assert.Equal("i1", config.DeliverSubject);
Assert.Equal("q1", config.DeliverGroup);
}

[SkipIfNatsServer(versionEarlierThan: "2.11")]
public async Task Create_paused_consumer()
{
await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection();
var js = new NatsJSContextFactory().CreateContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

await js.CreateStreamAsync(new StreamConfig("s1", new[] { "s1.*" }), cts.Token);

var pauseUntil = DateTimeOffset.Now.AddHours(1);

await js.CreateOrUpdateConsumerAsync(
stream: "s1",
config: new ConsumerConfig
{
Name = "c1",
PauseUntil = pauseUntil,
},
cancellationToken: cts.Token);

var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token);

var info = consumer.Info;
Assert.True(info.IsPaused);

var config = info.Config;
Assert.Equal(pauseUntil, config.PauseUntil);
}
}
37 changes: 37 additions & 0 deletions tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,41 @@ public async Task List_delete_consumer()
Assert.DoesNotContain(list, c => c.Info.Config.Name == "c1");
}
}

[SkipIfNatsServer(versionEarlierThan: "2.11")]
public async Task Pause_resume_consumer()
{
await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection();
var js = new NatsJSContextFactory().CreateContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

await js.CreateStreamAsync(new StreamConfig("s1", new[] { "s1.*" }), cts.Token);
await js.CreateOrUpdateConsumerAsync("s1", new ConsumerConfig("c1"), cts.Token);

var pauseUntil = DateTimeOffset.Now.AddHours(1);

// Pause
{
var consumerPauseResponse = await js.PauseConsumerAsync("s1", "c1", pauseUntil, cts.Token);

Assert.True(consumerPauseResponse.IsPaused);
Assert.Equal(pauseUntil, consumerPauseResponse.PauseUntil);

var consumerInfo = await js.GetConsumerAsync("s1", "c1", cts.Token);
Assert.True(consumerInfo.Info.IsPaused);
Assert.Equal(pauseUntil, consumerInfo.Info.Config.PauseUntil);
}

// Resume
{
var isResumed = await js.ResumeConsumerAsync("s1", "c1", cts.Token);
Assert.True(isResumed);

var consumerInfo = await js.GetConsumerAsync("s1", "c1", cts.Token);
Assert.False(consumerInfo.Info.IsPaused);
Assert.Null(consumerInfo.Info.Config.PauseUntil);
}
}
}
29 changes: 29 additions & 0 deletions tests/NATS.Client.JetStream.Tests/TimeSpanJsonTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,33 @@ public void StreamSourceInfoActive_test(string value, string expected)
Assert.NotNull(result);
Assert.Equal(time, result.Active);
}

[Theory]
[InlineData(null, "\"pause_remaining\":null\\b")]
[InlineData("00:00:00.001", "\"pause_remaining\":1000000\\b")]
[InlineData("00:00:01.000", "\"pause_remaining\":1000000000\\b")]
[InlineData("00:00:01.234", "\"pause_remaining\":1234000000\\b")]
public void ConsumerInfoPauseRemaining_test(string? value, string expected)
{
TimeSpan? time = value != null ? TimeSpan.Parse(value) : null;
var serializer = NatsJSJsonSerializer<ConsumerInfo>.Default;

var bw = new NatsBufferWriter<byte>();
serializer.Serialize(bw, new ConsumerInfo { StreamName = "test", Name = "test", PauseRemaining = time });

var json = Encoding.UTF8.GetString(bw.WrittenSpan);
if (value != null)
{
Assert.Matches(expected, json);
}
else
{
// PauseRemaining should not be serialized, if the value is null.
Assert.DoesNotMatch(expected, "pause_remaining");
}

var result = serializer.Deserialize(new ReadOnlySequence<byte>(bw.WrittenMemory));
Assert.NotNull(result);
Assert.Equal(time, result.PauseRemaining);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ public class MockJsContext : INatsJSContext

public ValueTask<bool> DeleteConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default) => throw new NotImplementedException();

public ValueTask<ConsumerPauseResponse> PauseConsumerAsync(string stream, string consumer, DateTimeOffset pauseUntil, CancellationToken cancellationToken = default) => throw new NotImplementedException();

public ValueTask<bool> ResumeConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default) => throw new NotImplementedException();

public ValueTask<AccountInfoResponse> GetAccountInfoAsync(CancellationToken cancellationToken = default) => throw new NotImplementedException();

public ValueTask<PubAckResponse> PublishAsync<T>(string subject, T? data, INatsSerialize<T>? serializer = default, NatsJSPubOpts? opts = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default) => throw new NotImplementedException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ public class MockJsContext : INatsJSContext

public ValueTask<bool> DeleteConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default) => throw new NotImplementedException();

public ValueTask<ConsumerPauseResponse> PauseConsumerAsync(string stream, string consumer, DateTimeOffset pauseUntil, CancellationToken cancellationToken = default) => throw new NotImplementedException();

public ValueTask<bool> ResumeConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default) => throw new NotImplementedException();

public ValueTask<AccountInfoResponse> GetAccountInfoAsync(CancellationToken cancellationToken = default) => throw new NotImplementedException();

public ValueTask<PubAckResponse> PublishAsync<T>(string subject, T? data, INatsSerialize<T>? serializer = default, NatsJSPubOpts? opts = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default) => throw new NotImplementedException();
Expand Down

0 comments on commit 1c11db3

Please sign in to comment.