diff --git a/src/NATS.Client.Core/Internal/CancellationTimer.cs b/src/NATS.Client.Core/Internal/CancellationTimer.cs index c79e6670d..537184ec6 100644 --- a/src/NATS.Client.Core/Internal/CancellationTimer.cs +++ b/src/NATS.Client.Core/Internal/CancellationTimer.cs @@ -1,4 +1,4 @@ -namespace NATS.Client.Core.Internal; +namespace NATS.Client.Core.Internal; // Support efficiently cancellation support for connection-dispose/timeout/cancel-per-operation internal sealed class CancellationTimerPool diff --git a/src/NATS.Client.Core/Internal/DebuggingExtensions.cs b/src/NATS.Client.Core/Internal/DebuggingExtensions.cs index 2f1e40b57..4d25d6b99 100644 --- a/src/NATS.Client.Core/Internal/DebuggingExtensions.cs +++ b/src/NATS.Client.Core/Internal/DebuggingExtensions.cs @@ -1,7 +1,4 @@ -#if DEBUG - using System.Buffers; -using System.Diagnostics; using System.Text; namespace NATS.Client.Core.Internal; @@ -61,5 +58,3 @@ public static string Dump(this NatsHeaders? headers) return sb.ToString(); } } - -#endif diff --git a/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs b/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs index 14c9e0e16..d9333aec7 100644 --- a/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs +++ b/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs @@ -21,13 +21,14 @@ internal sealed class NatsReadProtocolProcessor : IAsyncDisposable private readonly Task _infoParsed; // wait for an upgrade private readonly ConcurrentQueue _pingCommands; // wait for pong private readonly ILogger _logger; + private readonly bool _trace; private int _disposed; public NatsReadProtocolProcessor(ISocketConnection socketConnection, NatsConnection connection, TaskCompletionSource waitForInfoSignal, TaskCompletionSource waitForPongOrErrorSignal, Task infoParsed) { _connection = connection; _logger = connection.Options.LoggerFactory.CreateLogger(); - _logger.IsEnabled(LogLevel.Trace); + _trace = _logger.IsEnabled(LogLevel.Trace); _waitForInfoSignal = waitForInfoSignal; _waitForPongOrErrorSignal = waitForPongOrErrorSignal; _infoParsed = infoParsed; @@ -237,7 +238,19 @@ private async Task ReadLoopAsync() } var msgHeader = buffer.Slice(0, positionBeforeNatsHeader.Value); + + if (_trace) + { + _logger.LogTrace("HMSG trace dump: {MsgHeader}", msgHeader.Dump()); + } + var (subject, sid, replyTo, headersLength, totalLength) = ParseHMessageHeader(msgHeader); + + if (_trace) + { + _logger.LogTrace("HMSG trace parsed: {Subject} {Sid} {ReplyTo} {HeadersLength} {TotalLength}", subject, sid, replyTo, headersLength, totalLength); + } + var payloadLength = totalLength - headersLength; Debug.Assert(payloadLength >= 0, "Protocol error: illogical header and total lengths"); diff --git a/src/NATS.Client.Core/Internal/StringExtensions.cs b/src/NATS.Client.Core/Internal/StringExtensions.cs index 025040fd2..621e3c66b 100644 --- a/src/NATS.Client.Core/Internal/StringExtensions.cs +++ b/src/NATS.Client.Core/Internal/StringExtensions.cs @@ -1,4 +1,4 @@ -namespace NATS.Client.Core.Internal; +namespace NATS.Client.Core.Internal; internal static class StringExtensions { diff --git a/src/NATS.Client.Core/NatsHeaders.cs b/src/NATS.Client.Core/NatsHeaders.cs index ff2b4591c..7644b44fc 100644 --- a/src/NATS.Client.Core/NatsHeaders.cs +++ b/src/NATS.Client.Core/NatsHeaders.cs @@ -1,4 +1,4 @@ -using System.Collections; +using System.Collections; using System.Diagnostics.CodeAnalysis; using Microsoft.Extensions.Primitives; diff --git a/src/NATS.Client.Core/NatsMsg.cs b/src/NATS.Client.Core/NatsMsg.cs index eac2a0be3..9a830c0d3 100644 --- a/src/NATS.Client.Core/NatsMsg.cs +++ b/src/NATS.Client.Core/NatsMsg.cs @@ -6,6 +6,7 @@ namespace NATS.Client.Core; public readonly record struct NatsMsg( string Subject, string? ReplyTo, + int Size, NatsHeaders? Headers, ReadOnlyMemory Data, INatsConnection? Connection) @@ -31,7 +32,12 @@ internal static NatsMsg Build( headers.SetReadOnly(); } - return new NatsMsg(subject, replyTo, headers, payloadBuffer.ToArray(), connection); + var size = subject.Length + + replyTo?.Length ?? 0 + + headersBuffer?.Length ?? 0 + + payloadBuffer.Length; + + return new NatsMsg(subject, replyTo, (int)size, headers, payloadBuffer.ToArray(), connection); } public ValueTask ReplyAsync(ReadOnlySequence payload = default, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) @@ -64,6 +70,7 @@ private void CheckReplyPreconditions() public readonly record struct NatsMsg( string Subject, string? ReplyTo, + int Size, NatsHeaders? Headers, T? Data, INatsConnection? Connection) @@ -97,7 +104,12 @@ internal static NatsMsg Build( headers.SetReadOnly(); } - return new NatsMsg(subject, replyTo, headers, data, connection); + var size = subject.Length + + replyTo?.Length ?? 0 + + headersBuffer?.Length ?? 0 + + payloadBuffer.Length; + + return new NatsMsg(subject, replyTo, (int)size, headers, data, connection); } public ValueTask ReplyAsync(TReply data, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default) diff --git a/src/NATS.Client.Core/NatsSubChannelOpts.cs b/src/NATS.Client.Core/NatsSubChannelOpts.cs index 9c8f50229..fc63aa973 100644 --- a/src/NATS.Client.Core/NatsSubChannelOpts.cs +++ b/src/NATS.Client.Core/NatsSubChannelOpts.cs @@ -1,4 +1,4 @@ -using System.Threading.Channels; +using System.Threading.Channels; namespace NATS.Client.Core; diff --git a/src/NATS.Client.JetStream/Internal/JSErrorAwareJsonSerializer.cs b/src/NATS.Client.JetStream/Internal/JSErrorAwareJsonSerializer.cs index 8fc5e42da..582825996 100644 --- a/src/NATS.Client.JetStream/Internal/JSErrorAwareJsonSerializer.cs +++ b/src/NATS.Client.JetStream/Internal/JSErrorAwareJsonSerializer.cs @@ -1,4 +1,4 @@ -using System.Buffers; +using System.Buffers; using System.Text.Json; using NATS.Client.Core; using NATS.Client.JetStream.Models; @@ -21,9 +21,7 @@ public int Serialize(ICountableBufferWriter bufferWriter, T? value) => var jsonDocument = JsonDocument.Parse(buffer); if (jsonDocument.RootElement.TryGetProperty("error", out var errorElement)) { - var error = errorElement.Deserialize(); - if (error == null) - throw new NatsJetStreamException("Can't parse JetStream error JSON payload"); + var error = errorElement.Deserialize() ?? throw new NatsJSException("Can't parse JetStream error JSON payload"); throw new JSErrorException(error); } diff --git a/src/NATS.Client.JetStream/JSContext.cs b/src/NATS.Client.JetStream/JSContext.cs deleted file mode 100644 index 326484fe4..000000000 --- a/src/NATS.Client.JetStream/JSContext.cs +++ /dev/null @@ -1,177 +0,0 @@ -using System.ComponentModel.DataAnnotations; -using System.Runtime.CompilerServices; -using NATS.Client.Core; -using NATS.Client.Core.Internal; -using NATS.Client.JetStream.Internal; -using NATS.Client.JetStream.Models; - -namespace NATS.Client.JetStream; - -public class JSContext -{ - private readonly NatsConnection _nats; - private readonly JSOptions _options; - - public JSContext(NatsConnection nats, JSOptions options) - { - _nats = nats; - _options = options; - } - - public ValueTask> CreateStreamAsync( - StreamConfiguration request, - CancellationToken cancellationToken = default) => - JSRequestAsync( - subject: $"{_options.Prefix}.STREAM.CREATE.{request.Name}", - request, - cancellationToken); - - public ValueTask> DeleteStreamAsync( - string stream, - CancellationToken cancellationToken = default) => - JSRequestAsync( - subject: $"{_options.Prefix}.STREAM.DELETE.{stream}", - null, - cancellationToken); - - public ValueTask> CreateConsumerAsync( - ConsumerCreateRequest request, - CancellationToken cancellationToken = default) => - JSRequestAsync( - subject: $"{_options.Prefix}.CONSUMER.CREATE.{request.StreamName}.{request.Config.Name}", - request, - cancellationToken); - - public async ValueTask PublishAsync( - string subject, - T? data, - NatsPubOpts opts = default, - CancellationToken cancellationToken = default) - { - await using var sub = await _nats.RequestSubAsync( - subject: subject, - data: data, - requestOpts: opts, - replyOpts: default, - cancellationToken) - .ConfigureAwait(false); - - if (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) - { - if (sub.Msgs.TryRead(out var msg)) - { - if (msg.Data == null) - { - throw new NatsJetStreamException("No response data received"); - } - - return msg.Data; - } - } - - if (sub is { EndReason: NatsSubEndReason.Exception, Exception: not null }) - { - throw sub.Exception; - } - - throw new NatsJetStreamException("No response received"); - } - - public async IAsyncEnumerable ConsumeAsync( - string stream, - string consumer, - ConsumerGetnextRequest request, - NatsSubOpts requestOpts = default, - [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - var inbox = $"_INBOX.{Guid.NewGuid():n}"; - - await using var sub = await _nats.SubAsync( - subject: inbox, - opts: requestOpts, - builder: NatsSubBuilder.Default, - cancellationToken); - - await _nats.PubModelAsync( - subject: $"$JS.API.CONSUMER.MSG.NEXT.{stream}.{consumer}", - data: request, - serializer: JsonNatsSerializer.Default, - replyTo: inbox, - headers: default, - cancellationToken); - - await foreach (var msg in sub.Msgs.ReadAllAsync(cancellationToken)) - { - yield return msg; - } - - if (sub is { EndReason: NatsSubEndReason.Exception, Exception: not null }) - { - throw sub.Exception; - } - } - - internal async ValueTask> JSRequestAsync( - string subject, - TRequest? request, - CancellationToken cancellationToken = default) - where TRequest : class - where TResponse : class - { - if (request != null) - { - Validator.ValidateObject(request, new ValidationContext(request)); - } - - await using var sub = await _nats.RequestSubAsync( - subject: subject, - data: request, - requestOpts: default, - replyOpts: new NatsSubOpts { Serializer = JSErrorAwareJsonSerializer.Default }, - cancellationToken) - .ConfigureAwait(false); - - if (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) - { - if (sub.Msgs.TryRead(out var msg)) - { - if (msg.Data == null) - { - throw new NatsJetStreamException("No response data received"); - } - - return new JSResponse(msg.Data, default); - } - } - - if (sub is { EndReason: NatsSubEndReason.Exception, Exception: not null }) - { - if (sub.Exception is NatsSubException { InnerException: JSErrorException jsError }) - { - return new JSResponse(default, jsError.Error); - } - - throw sub.Exception; - } - - throw new NatsJetStreamException("No response received"); - } -} - -public class NatsJetStreamException : NatsException -{ - public NatsJetStreamException(string message) - : base(message) - { - } - - public NatsJetStreamException(string message, Exception exception) - : base(message, exception) - { - } -} - -public record JSOptions -{ - public string Prefix { get; init; } = "$JS.API"; -} diff --git a/src/NATS.Client.JetStream/JSResponse.cs b/src/NATS.Client.JetStream/JSResponse.cs deleted file mode 100644 index fbd6b143d..000000000 --- a/src/NATS.Client.JetStream/JSResponse.cs +++ /dev/null @@ -1,22 +0,0 @@ -using NATS.Client.JetStream.Models; - -namespace NATS.Client.JetStream; - -/// -/// JetStream response including an optional error property encapsulating both successful and failed calls. -/// -/// JetStream response type -public readonly struct JSResponse -{ - internal JSResponse(T? response, ApiError? error) - { - Response = response; - Error = error; - } - - public T? Response { get; } - - public ApiError? Error { get; } - - public bool Success => Error == null && Response != null; -} diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs new file mode 100644 index 000000000..ac79faaf1 --- /dev/null +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -0,0 +1,212 @@ +using System.Runtime.CompilerServices; +using NATS.Client.Core; +using NATS.Client.JetStream.Models; + +namespace NATS.Client.JetStream; + +public class NatsJSConsumer +{ + private readonly NatsJSContext _context; + private readonly string _stream; + private readonly string _consumer; + private bool _deleted; + + public NatsJSConsumer(NatsJSContext context, ConsumerInfo info) + { + _context = context; + Info = info; + _stream = Info.StreamName; + _consumer = Info.Name; + } + + public ConsumerInfo Info { get; } + + public async ValueTask DeleteAsync(CancellationToken cancellationToken = default) + { + ThrowIfDeleted(); + return _deleted = await _context.DeleteConsumerAsync(_stream, _consumer, cancellationToken); + } + + public async IAsyncEnumerable> ConsumeAsync(int maxMsgs, ConsumerOpts opts, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + ThrowIfDeleted(); + var prefetch = opts.Prefetch; + var lowWatermark = opts.LowWatermark; + var shouldPrefetch = true; + + if (maxMsgs <= prefetch) + { + prefetch = maxMsgs; + lowWatermark = maxMsgs; + shouldPrefetch = false; + } + + var inbox = $"_INBOX.{Guid.NewGuid():n}"; + + var requestOpts = default(NatsSubOpts); + var request = new ConsumerGetnextRequest { Batch = prefetch }; + + ConsumerGetnextRequest? fetch = default; + if (shouldPrefetch) + { + fetch = new ConsumerGetnextRequest { Batch = prefetch - lowWatermark }; + } + + await using var sub = await _context.Nats.SubAsync( + subject: inbox, + opts: requestOpts, + builder: NatsJSSubModelBuilder.For(requestOpts.Serializer ?? _context.Nats.Options.Serializer), + cancellationToken); + + static async ValueTask MsgNextAsync(NatsJSContext context, string stream, string consumer, ConsumerGetnextRequest request, string inbox, CancellationToken cancellationtoken) + { + await context.Nats.PubModelAsync( + subject: $"$JS.API.CONSUMER.MSG.NEXT.{stream}.{consumer}", + data: request, + serializer: JsonNatsSerializer.Default, + replyTo: inbox, + headers: default, + cancellationtoken); + } + + await MsgNextAsync(_context, _stream, _consumer, request, inbox, cancellationToken); + + var count = 0; + await foreach (var msg in sub.Msgs.ReadAllAsync(cancellationToken)) + { + if (msg.IsControlMsg) + { + // TODO: Heartbeats etc. + } + else + { + yield return msg.JSMsg!.Value; + + if (++count == maxMsgs) + { + break; + } + + if (shouldPrefetch && count % lowWatermark == 0) + { + await MsgNextAsync(_context, _stream, _consumer, fetch!, inbox, cancellationToken); + } + } + } + + if (sub is { EndReason: NatsSubEndReason.Exception, Exception: not null }) + { + throw sub.Exception; + } + } + + public async ValueTask> NextAsync(CancellationToken cancellationToken = default) + { + await foreach (var natsJSMsg in FetchAsync(1, cancellationToken)) + { + return natsJSMsg; + } + + throw new NatsJSException("No data"); + } + + public async IAsyncEnumerable> FetchAsync( + int maxMsgs, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + var request = new ConsumerGetnextRequest { Batch = maxMsgs, }; + + var count = 0; + await foreach (var msg in ConsumeRawAsync(request, default, cancellationToken).ConfigureAwait(false)) + { + if (msg.IsControlMsg) + { + // TODO: Heartbeats etc. + } + else + { + yield return msg.JSMsg!.Value; + + if (++count == maxMsgs) + break; + } + } + } + + internal async IAsyncEnumerable ConsumeRawAsync( + ConsumerGetnextRequest request, + NatsSubOpts requestOpts = default, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var inbox = $"_INBOX.{Guid.NewGuid():n}"; + + await using var sub = await _context.Nats.SubAsync( + subject: inbox, + opts: requestOpts, + builder: NatsJSSubBuilder.Default, + cancellationToken); + + await _context.Nats.PubModelAsync( + subject: $"$JS.API.CONSUMER.MSG.NEXT.{_stream}.{_consumer}", + data: request, + serializer: JsonNatsSerializer.Default, + replyTo: inbox, + headers: default, + cancellationToken); + + await foreach (var msg in sub.Msgs.ReadAllAsync(cancellationToken)) + { + yield return msg; + } + + if (sub is { EndReason: NatsSubEndReason.Exception, Exception: not null }) + { + throw sub.Exception; + } + } + + internal async IAsyncEnumerable> ConsumeRawAsync( + ConsumerGetnextRequest request, + NatsSubOpts requestOpts = default, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var inbox = $"_INBOX.{Guid.NewGuid():n}"; + + await using var sub = await _context.Nats.SubAsync( + subject: inbox, + opts: requestOpts, + builder: NatsJSSubModelBuilder.For(requestOpts.Serializer ?? _context.Nats.Options.Serializer), + cancellationToken); + + await _context.Nats.PubModelAsync( + subject: $"$JS.API.CONSUMER.MSG.NEXT.{_stream}.{_consumer}", + data: request, + serializer: JsonNatsSerializer.Default, + replyTo: inbox, + headers: default, + cancellationToken); + + await foreach (var msg in sub.Msgs.ReadAllAsync(cancellationToken)) + { + yield return msg; + } + + if (sub is { EndReason: NatsSubEndReason.Exception, Exception: not null }) + { + throw sub.Exception; + } + } + + private void ThrowIfDeleted() + { + if (_deleted) + throw new NatsJSException($"Consumer '{_stream}:{_consumer}' is deleted"); + } +} + +public record ConsumerOpts +{ + public int Prefetch { get; set; } = 1_000; + + public int LowWatermark { get; set; } = 500; +} diff --git a/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs b/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs new file mode 100644 index 000000000..b459592e2 --- /dev/null +++ b/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs @@ -0,0 +1,78 @@ +using System.Runtime.CompilerServices; +using NATS.Client.JetStream.Models; + +namespace NATS.Client.JetStream; + +public partial class NatsJSContext +{ + public ValueTask CreateConsumerAsync( + string stream, + string consumer, + ConsumerConfigurationAckPolicy ackPolicy = ConsumerConfigurationAckPolicy.@explicit, + CancellationToken cancellationToken = default) => + CreateConsumerAsync( + new ConsumerCreateRequest + { + StreamName = stream, + Config = new ConsumerConfiguration + { + Name = consumer, + DurableName = consumer, + AckPolicy = ackPolicy, + }, + }, + cancellationToken); + + public async ValueTask CreateConsumerAsync( + ConsumerCreateRequest request, + CancellationToken cancellationToken = default) + { + if (!string.IsNullOrEmpty(request.Config.DeliverSubject)) + { + throw new NatsJSException("This API only support pull consumers. " + + "'deliver_subject' option applies to push consumers"); + } + + if (request.Config.AckPolicy == ConsumerConfigurationAckPolicy.none) + { + throw new NatsJSException("This API only support pull consumers. " + + "'ack_policy' must be set to 'explicit' or 'all' for pull consumers"); + } + + var response = await JSRequestResponseAsync( + subject: $"{Options.Prefix}.CONSUMER.CREATE.{request.StreamName}.{request.Config.Name}", + request, + cancellationToken); + return new NatsJSConsumer(this, response); + } + + public async ValueTask GetConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default) + { + var response = await JSRequestResponseAsync( + subject: $"{Options.Prefix}.CONSUMER.INFO.{stream}.{consumer}", + request: null, + cancellationToken); + return new NatsJSConsumer(this, response); + } + + public async IAsyncEnumerable ListConsumersAsync(string stream, ConsumerListRequest request, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var response = await JSRequestResponseAsync( + subject: $"{Options.Prefix}.CONSUMER.LIST.{stream}", + request, + cancellationToken); + foreach (var consumer in response.Consumers) + { + yield return new NatsJSConsumer(this, consumer); + } + } + + public async ValueTask DeleteConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default) + { + var response = await JSRequestResponseAsync( + subject: $"{Options.Prefix}.CONSUMER.DELETE.{stream}.{consumer}", + request: null, + cancellationToken); + return response.Success; + } +} diff --git a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs new file mode 100644 index 000000000..a9cd49dbe --- /dev/null +++ b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs @@ -0,0 +1,68 @@ +using System.Runtime.CompilerServices; +using NATS.Client.JetStream.Models; + +namespace NATS.Client.JetStream; + +public partial class NatsJSContext +{ + public ValueTask CreateStreamAsync(string stream, string[] subjects, CancellationToken cancellationToken = default) => + CreateStreamAsync(new StreamCreateRequest { Name = stream, Subjects = subjects }, cancellationToken); + + public async ValueTask CreateStreamAsync( + StreamConfiguration request, + CancellationToken cancellationToken = default) + { + var response = await JSRequestResponseAsync( + subject: $"{Options.Prefix}.STREAM.CREATE.{request.Name}", + request, + cancellationToken); + return new NatsJSStream(this, response); + } + + public async ValueTask DeleteStreamAsync( + string stream, + CancellationToken cancellationToken = default) + { + var response = await JSRequestResponseAsync( + subject: $"{Options.Prefix}.STREAM.DELETE.{stream}", + request: null, + cancellationToken); + return response.Success; + } + + public async ValueTask GetStreamAsync( + string stream, + CancellationToken cancellationToken = default) + { + var response = await JSRequestResponseAsync( + subject: $"{Options.Prefix}.STREAM.INFO.{stream}", + request: null, + cancellationToken); + return new NatsJSStream(this, response); + } + + public async ValueTask UpdateStreamAsync( + StreamUpdateRequest request, + CancellationToken cancellationToken = default) + { + var response = await JSRequestResponseAsync( + subject: $"{Options.Prefix}.STREAM.UPDATE.{request.Name}", + request: request, + cancellationToken); + return new NatsJSStream(this, response); + } + + public async IAsyncEnumerable ListStreamsAsync( + StreamListRequest request, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var response = await JSRequestResponseAsync( + subject: $"{Options.Prefix}.STREAM.LIST", + request: request, + cancellationToken); + foreach (var stream in response.Streams) + { + yield return new NatsJSStream(this, stream); + } + } +} diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs new file mode 100644 index 000000000..11fd81c95 --- /dev/null +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -0,0 +1,132 @@ +using System.ComponentModel.DataAnnotations; +using NATS.Client.Core; +using NATS.Client.JetStream.Internal; +using NATS.Client.JetStream.Models; + +namespace NATS.Client.JetStream; + +public partial class NatsJSContext +{ + public NatsJSContext(NatsConnection nats) + : this(nats, new NatsJSOptions()) + { + } + + public NatsJSContext(NatsConnection nats, NatsJSOptions options) + { + Nats = nats; + Options = options; + } + + internal NatsConnection Nats { get; } + + internal NatsJSOptions Options { get; } + + public ValueTask GetAccountInfoAsync(CancellationToken cancellationToken = default) => + JSRequestResponseAsync( + subject: $"{Options.Prefix}.INFO", + request: null, + cancellationToken); + + public async ValueTask PublishAsync( + string subject, + T? data, + NatsPubOpts opts = default, + CancellationToken cancellationToken = default) + { + await using var sub = await Nats.RequestSubAsync( + subject: subject, + data: data, + requestOpts: opts, + replyOpts: default, + cancellationToken) + .ConfigureAwait(false); + + if (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) + { + if (sub.Msgs.TryRead(out var msg)) + { + if (msg.Data == null) + { + throw new NatsJSException("No response data received"); + } + + return msg.Data; + } + } + + if (sub is { EndReason: NatsSubEndReason.Exception, Exception: not null }) + { + throw sub.Exception; + } + + throw new NatsJSException("No response received"); + } + + internal async ValueTask JSRequestResponseAsync( + string subject, + TRequest? request, + CancellationToken cancellationToken = default) + where TRequest : class + where TResponse : class + { + var response = await JSRequestAsync(subject, request, cancellationToken); + response.EnsureSuccess(); + return response.Response!; + } + + internal async ValueTask> JSRequestAsync( + string subject, + TRequest? request, + CancellationToken cancellationToken = default) + where TRequest : class + where TResponse : class + { + if (request != null) + { + Validator.ValidateObject(request, new ValidationContext(request)); + } + + await using var sub = await Nats.RequestSubAsync( + subject: subject, + data: request, + requestOpts: default, + replyOpts: new NatsSubOpts { Serializer = JSErrorAwareJsonSerializer.Default }, + cancellationToken) + .ConfigureAwait(false); + + if (await sub.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) + { + if (sub.Msgs.TryRead(out var msg)) + { + if (msg.Data == null) + { + throw new NatsJSException("No response data received"); + } + + return new NatsJSResponse(msg.Data, default); + } + } + + if (sub is { EndReason: NatsSubEndReason.Exception, Exception: not null }) + { + if (sub.Exception is NatsSubException { InnerException: JSErrorException jsError }) + { + return new NatsJSResponse(default, jsError.Error); + } + + throw sub.Exception; + } + + throw new NatsJSException("No response received"); + } +} + +public class NatsJSDuplicateMessageException : NatsJSException +{ + public NatsJSDuplicateMessageException(long sequence) + : base($"Duplicate of {sequence}") => + Sequence = sequence; + + public long Sequence { get; } +} diff --git a/src/NATS.Client.JetStream/NatsJSControlMsg.cs b/src/NATS.Client.JetStream/NatsJSControlMsg.cs new file mode 100644 index 000000000..6b17d2b36 --- /dev/null +++ b/src/NATS.Client.JetStream/NatsJSControlMsg.cs @@ -0,0 +1,33 @@ +using System.Buffers; +using NATS.Client.Core; + +namespace NATS.Client.JetStream; + +internal enum NatsJSControlMsgType +{ + None, + Heartbeat, +} + +internal readonly struct NatsJSControlMsg +{ + public NatsJSMsg? JSMsg { get; init; } + + public bool IsControlMsg => ControlMsgType != NatsJSControlMsgType.None; + + public NatsJSControlMsgType ControlMsgType { get; init; } +} + +internal readonly struct NatsJSControlMsg +{ + public NatsJSMsg? JSMsg { get; init; } + + public bool IsControlMsg => ControlMsgType == NatsJSControlMsgType.None; + + public NatsJSControlMsgType ControlMsgType { get; init; } +} + +public static class NatsJSConstants +{ + public static readonly ReadOnlySequence Ack = new("+ACK"u8.ToArray()); +} diff --git a/src/NATS.Client.JetStream/NatsJSException.cs b/src/NATS.Client.JetStream/NatsJSException.cs new file mode 100644 index 000000000..4c4225bf1 --- /dev/null +++ b/src/NATS.Client.JetStream/NatsJSException.cs @@ -0,0 +1,16 @@ +using NATS.Client.Core; + +namespace NATS.Client.JetStream; + +public class NatsJSException : NatsException +{ + public NatsJSException(string message) + : base(message) + { + } + + public NatsJSException(string message, Exception exception) + : base(message, exception) + { + } +} diff --git a/src/NATS.Client.JetStream/NatsJSMsg.cs b/src/NATS.Client.JetStream/NatsJSMsg.cs new file mode 100644 index 000000000..e38ac5b20 --- /dev/null +++ b/src/NATS.Client.JetStream/NatsJSMsg.cs @@ -0,0 +1,34 @@ +using NATS.Client.Core; + +namespace NATS.Client.JetStream; + +/// +/// NATS JetStream message with and control messages. +/// +public readonly struct NatsJSMsg +{ + public NatsMsg Msg { get; init; } + + public ValueTask Ack(CancellationToken cancellationToken = default) + { + if (Msg == default) + throw new NatsJSException("No user message, can't acknowledge"); + return Msg.ReplyAsync(NatsJSConstants.Ack, cancellationToken: cancellationToken); + } +} + +/// +/// NATS JetStream message with and control messages. +/// +/// User message type +public readonly struct NatsJSMsg +{ + public NatsMsg Msg { get; init; } + + public ValueTask Ack(CancellationToken cancellationToken = default) + { + if (Msg == default) + throw new NatsJSException("No user message, can't acknowledge"); + return Msg.ReplyAsync(NatsJSConstants.Ack, cancellationToken: cancellationToken); + } +} diff --git a/src/NATS.Client.JetStream/NatsJSOptions.cs b/src/NATS.Client.JetStream/NatsJSOptions.cs new file mode 100644 index 000000000..c45d1f03c --- /dev/null +++ b/src/NATS.Client.JetStream/NatsJSOptions.cs @@ -0,0 +1,6 @@ +namespace NATS.Client.JetStream; + +public record NatsJSOptions +{ + public string Prefix { get; init; } = "$JS.API"; +} diff --git a/src/NATS.Client.JetStream/NatsJSResponse.cs b/src/NATS.Client.JetStream/NatsJSResponse.cs new file mode 100644 index 000000000..1237e42c8 --- /dev/null +++ b/src/NATS.Client.JetStream/NatsJSResponse.cs @@ -0,0 +1,39 @@ +using NATS.Client.JetStream.Models; + +namespace NATS.Client.JetStream; + +/// +/// JetStream response including an optional error property encapsulating both successful and failed calls. +/// +/// JetStream response type +public readonly struct NatsJSResponse +{ + internal NatsJSResponse(T? response, ApiError? error) + { + Response = response; + Error = error; + } + + public T? Response { get; } + + public ApiError? Error { get; } + + public bool Success => Error == null && Response != null; + + public void EnsureSuccess() + { + if (!Success) + { + throw new NatsJSApiException(Error ?? new ApiError { Description = "Unknown state" }); + } + } +} + +public class NatsJSApiException : NatsJSException +{ + public NatsJSApiException(ApiError error) + : base(error.Description) => + Error = error; + + public ApiError Error { get; } +} diff --git a/src/NATS.Client.JetStream/NatsJSStream.cs b/src/NATS.Client.JetStream/NatsJSStream.cs new file mode 100644 index 000000000..65f8221bc --- /dev/null +++ b/src/NATS.Client.JetStream/NatsJSStream.cs @@ -0,0 +1,66 @@ +using System.Runtime.CompilerServices; +using NATS.Client.JetStream.Models; + +namespace NATS.Client.JetStream; + +public class NatsJSStream +{ + private readonly NatsJSContext _context; + private readonly string _name; + private bool _deleted; + + public NatsJSStream(NatsJSContext context, StreamInfo info) + { + _context = context; + Info = info; + _name = info.Config.Name; + } + + public StreamInfo Info { get; private set; } + + public async ValueTask DeleteAsync(string stream, CancellationToken cancellationToken = default) + { + ThrowIfDeleted(); + return _deleted = await _context.DeleteStreamAsync(_name, cancellationToken); + } + + public async ValueTask UpdateAsync( + StreamUpdateRequest request, + CancellationToken cancellationToken = default) + { + ThrowIfDeleted(); + var response = await _context.UpdateStreamAsync(request, cancellationToken); + Info = response.Info; + } + + public async IAsyncEnumerable ListConsumersAsync(string stream, ConsumerListRequest request, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + ThrowIfDeleted(); + await foreach (var consumer in _context.ListConsumersAsync(_name, request, cancellationToken)) + yield return consumer; + } + + public ValueTask CreateConsumerAsync(ConsumerCreateRequest request, CancellationToken cancellationToken = default) + { + ThrowIfDeleted(); + return _context.CreateConsumerAsync(request, cancellationToken); + } + + public ValueTask GetConsumerAsync(string consumer, CancellationToken cancellationToken = default) + { + ThrowIfDeleted(); + return _context.GetConsumerAsync(_name, consumer, cancellationToken); + } + + public ValueTask DeleteConsumerAsync(string consumer, CancellationToken cancellationToken = default) + { + ThrowIfDeleted(); + return _context.DeleteConsumerAsync(_name, consumer, cancellationToken); + } + + private void ThrowIfDeleted() + { + if (_deleted) + throw new NatsJSException($"Stream '{_name}' is deleted"); + } +} diff --git a/src/NATS.Client.JetStream/NatsJSSub.cs b/src/NATS.Client.JetStream/NatsJSSub.cs new file mode 100644 index 000000000..129f82140 --- /dev/null +++ b/src/NATS.Client.JetStream/NatsJSSub.cs @@ -0,0 +1,178 @@ +using System.Buffers; +using System.Collections.Concurrent; +using System.Threading.Channels; +using NATS.Client.Core; +using NATS.Client.Core.Internal; + +namespace NATS.Client.JetStream; + +/// +/// NATS JetStream Subscription with JetStream control message support. +/// +/// User message type +internal class NatsJSSub : NatsSubBase +{ + private readonly Channel> _msgs; + + internal NatsJSSub( + NatsConnection connection, + ISubscriptionManager manager, + string subject, + NatsSubOpts? opts, + INatsSerializer serializer) + : base(connection, manager, subject, opts) + { + _msgs = Channel.CreateBounded>( + NatsSub.GetChannelOptions(opts?.ChannelOptions)); + + Serializer = serializer; + } + + public ChannelReader> Msgs => _msgs.Reader; + + private INatsSerializer Serializer { get; } + + protected override async ValueTask ReceiveInternalAsync(string subject, string? replyTo, ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer) + { + if (subject == Subject) + { + // TODO: introspect JS control messages + await _msgs.Writer.WriteAsync(new NatsJSControlMsg + { + JSMsg = default, + ControlMsgType = NatsJSControlMsgType.Heartbeat, + }).ConfigureAwait(false); + } + else + { + try + { + var msg = NatsMsg.Build( + subject, + replyTo, + headersBuffer, + payloadBuffer, + Connection, + Connection.HeaderParser, + Serializer); + + await _msgs.Writer.WriteAsync(new NatsJSControlMsg + { + JSMsg = new NatsJSMsg { Msg = msg }, + ControlMsgType = NatsJSControlMsgType.None, + }).ConfigureAwait(false); + + DecrementMaxMsgs(); + } + catch (Exception e) + { + var payload = new Memory(new byte[payloadBuffer.Length]); + payloadBuffer.CopyTo(payload.Span); + + Memory headers = default; + if (headersBuffer != null) + { + headers = new Memory(new byte[headersBuffer.Value.Length]); + } + + SetException(new NatsSubException($"Message error: {e.Message}", e, payload, headers)); + } + } + } + + protected override void TryComplete() => _msgs.Writer.TryComplete(); +} + +internal class NatsJSSubModelBuilder : INatsSubBuilder> +{ + private static readonly ConcurrentDictionary> Cache = new(); + private readonly INatsSerializer _serializer; + + public NatsJSSubModelBuilder(INatsSerializer serializer) => _serializer = serializer; + + public static NatsJSSubModelBuilder For(INatsSerializer serializer) => + Cache.GetOrAdd(serializer, static s => new NatsJSSubModelBuilder(s)); + + public NatsJSSub Build(string subject, NatsSubOpts? opts, NatsConnection connection, ISubscriptionManager manager) + { + return new NatsJSSub(connection, manager, subject, opts, _serializer); + } +} + +/// +/// NATS JetStream Subscription with JetStream control message support. +/// +internal class NatsJSSub : NatsSubBase +{ + private readonly Channel _msgs; + + internal NatsJSSub( + NatsConnection connection, + ISubscriptionManager manager, + string subject, + NatsSubOpts? opts) + : base(connection, manager, subject, opts) => + _msgs = Channel.CreateBounded( + NatsSub.GetChannelOptions(opts?.ChannelOptions)); + + public ChannelReader Msgs => _msgs.Reader; + + protected override async ValueTask ReceiveInternalAsync(string subject, string? replyTo, ReadOnlySequence? headersBuffer, ReadOnlySequence payloadBuffer) + { + if (subject == Subject) + { + // TODO: introspect JS control messages + await _msgs.Writer.WriteAsync(new NatsJSControlMsg + { + JSMsg = default, + ControlMsgType = NatsJSControlMsgType.Heartbeat, + }).ConfigureAwait(false); + } + else + { + try + { + var msg = NatsMsg.Build( + subject, + replyTo, + headersBuffer, + payloadBuffer, + Connection, + Connection.HeaderParser); + + await _msgs.Writer.WriteAsync(new NatsJSControlMsg + { + JSMsg = new NatsJSMsg { Msg = msg }, + ControlMsgType = NatsJSControlMsgType.None, + }).ConfigureAwait(false); + + DecrementMaxMsgs(); + } + catch (Exception e) + { + var payload = new Memory(new byte[payloadBuffer.Length]); + payloadBuffer.CopyTo(payload.Span); + + Memory headers = default; + if (headersBuffer != null) + { + headers = new Memory(new byte[headersBuffer.Value.Length]); + } + + SetException(new NatsSubException($"Message error: {e.Message}", e, payload, headers)); + } + } + } + + protected override void TryComplete() => _msgs.Writer.TryComplete(); +} + +internal class NatsJSSubBuilder : INatsSubBuilder +{ + public static readonly NatsJSSubBuilder Default = new(); + + public NatsJSSub Build(string subject, NatsSubOpts? opts, NatsConnection connection, ISubscriptionManager manager) + { + return new NatsJSSub(connection, manager, subject, opts); + } +} diff --git a/src/NATS.Client.JetStream/NatsJSUtilities.cs b/src/NATS.Client.JetStream/NatsJSUtilities.cs new file mode 100644 index 000000000..0400e7bf5 --- /dev/null +++ b/src/NATS.Client.JetStream/NatsJSUtilities.cs @@ -0,0 +1,18 @@ +using NATS.Client.JetStream.Models; + +namespace NATS.Client.JetStream; + +public static class NatsJSUtilities +{ + public static void EnsureSuccess(this PubAckResponse ack) + { + if (ack == null) + throw new ArgumentNullException(nameof(ack)); + + if (ack.Error != null) + throw new NatsJSApiException(ack.Error); + + if (ack.Duplicate) + throw new NatsJSDuplicateMessageException(ack.Seq); + } +} diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs new file mode 100644 index 000000000..7c222e2e3 --- /dev/null +++ b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs @@ -0,0 +1,71 @@ +using NATS.Client.Core.Tests; + +namespace NATS.Client.JetStream.Tests; + +public class ConsumerConsumeTest +{ + private readonly ITestOutputHelper _output; + + public ConsumerConsumeTest(ITestOutputHelper output) => _output = output; + + [Fact] + public async Task Consume_test() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + await using var server = NatsServer.Start( + outputHelper: _output, + options: new NatsServerOptionsBuilder() + .UseTransport(TransportType.Tcp) + .Trace() + .UseJetStream() + .Build()); + + var (nats, proxy) = server.CreateProxiedClientConnection(); + var js = new NatsJSContext(nats); + await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); + await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + + for (var i = 0; i < 30; i++) + { + var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, cancellationToken: cts.Token); + ack.EnsureSuccess(); + } + + var consumerOpts = new ConsumerOpts + { + Prefetch = 10, + LowWatermark = 5, + }; + var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); + var count = 0; + await foreach (var msg in consumer.ConsumeAsync(25, consumerOpts, cts.Token)) + { + await msg.Ack(cts.Token); + Assert.Equal(count, msg.Msg.Data!.Test); + count++; + } + + Assert.Equal(25, count); + + var msgNextRequests = proxy + .ClientFrames + .Where(f => f.Message.StartsWith("PUB $JS.API.CONSUMER.MSG.NEXT.s1.c1")) + .ToList(); + + Assert.Equal(5, msgNextRequests.Count); + + // Prefetch + Assert.Matches(@"^PUB.*{""batch"":10}", msgNextRequests.First().Message); + + foreach (var frame in msgNextRequests.Skip(1)) + { + // Consequent fetches should top up to the prefetch value + Assert.Matches(@"^PUB.*{""batch"":5}", frame.Message); + } + } + + private record TestData + { + public int Test { get; init; } + } +} diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs new file mode 100644 index 000000000..cc5249a1d --- /dev/null +++ b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs @@ -0,0 +1,43 @@ +using NATS.Client.Core.Tests; + +namespace NATS.Client.JetStream.Tests; + +public class ConsumerFetchTest +{ + private readonly ITestOutputHelper _output; + + public ConsumerFetchTest(ITestOutputHelper output) => _output = output; + + [Fact] + public async Task Fetch_test() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); + await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + + for (var i = 0; i < 10; i++) + { + var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, cancellationToken: cts.Token); + ack.EnsureSuccess(); + } + + var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); + var count = 0; + await foreach (var msg in consumer.FetchAsync(10, cts.Token)) + { + await msg.Ack(cts.Token); + Assert.Equal(count, msg.Msg.Data!.Test); + count++; + } + + Assert.Equal(10, count); + } + + private record TestData + { + public int Test { get; init; } + } +} diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerNextTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerNextTest.cs new file mode 100644 index 000000000..c26812a4d --- /dev/null +++ b/tests/NATS.Client.JetStream.Tests/ConsumerNextTest.cs @@ -0,0 +1,36 @@ +using NATS.Client.Core.Tests; + +namespace NATS.Client.JetStream.Tests; + +public class ConsumerNextTest +{ + private readonly ITestOutputHelper _output; + + public ConsumerNextTest(ITestOutputHelper output) => _output = output; + + [Fact] + public async Task Next_test() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); + var consumer = await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + + for (var i = 0; i < 10; i++) + { + var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, cancellationToken: cts.Token); + ack.EnsureSuccess(); + var msg = await consumer.NextAsync(cts.Token); + await msg.Ack(cts.Token); + Assert.Equal(i, msg.Msg.Data!.Test); + } + } + + private record TestData + { + public int Test { get; init; } + } +} diff --git a/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs b/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs index 141a0a7e1..82e692892 100644 --- a/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs +++ b/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs @@ -1,10 +1,7 @@ -using System.Buffers; -using System.Text; -using Microsoft.Extensions.Logging; -using NATS.Client.JetStream; +using NATS.Client.Core.Tests; using NATS.Client.JetStream.Models; -namespace NATS.Client.Core.Tests; +namespace NATS.Client.JetStream.Tests; public class JetStreamTest { @@ -15,92 +12,88 @@ public class JetStreamTest [Fact] public async Task Create_stream_test() { - await using var server = NatsServer.Start(new NullOutputHelper(), new NatsServerOptionsBuilder().UseTransport(TransportType.Tcp).UseJetStream().Build()); + await using var server = NatsServer.StartJS(); var nats = server.CreateClientConnection(); // Happy user { - var context = new JSContext(nats, new JSOptions()); + var cts1 = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + var js = new NatsJSContext(nats, new NatsJSOptions()); // Create stream - var response = await context.CreateStreamAsync(request: new StreamConfiguration - { - Name = "events", Subjects = new[] { "events.*" }, - }); - Assert.True(response.Success); - Assert.Null(response.Error); - Assert.NotNull(response.Response); - Assert.Equal("events", response.Response.Config.Name); + var stream = await js.CreateStreamAsync( + request: new StreamConfiguration { Name = "events", Subjects = new[] { "events.*" }, }, + cancellationToken: cts1.Token); + Assert.Equal("events", stream.Info.Config.Name); // Create consumer - var consumerInfo = await context.CreateConsumerAsync(new ConsumerCreateRequest - { - StreamName = "events", - Config = new ConsumerConfiguration + var consumer = await js.CreateConsumerAsync( + new ConsumerCreateRequest { - Name = "consumer1", - DurableName = "consumer1", - - // Turn on ACK so we can test them below - AckPolicy = ConsumerConfigurationAckPolicy.@explicit, - - // Effectively set message expiry for the consumer - // so that unacknowledged messages can be put back into - // the consumer to be delivered again (in a sense). - // This is to make below consumer tests work. - AckWait = 2_000_000_000, // 2 seconds + StreamName = "events", + Config = new ConsumerConfiguration + { + Name = "consumer1", + DurableName = "consumer1", + + // Turn on ACK so we can test them below + AckPolicy = ConsumerConfigurationAckPolicy.@explicit, + + // Effectively set message expiry for the consumer + // so that unacknowledged messages can be put back into + // the consumer to be delivered again (in a sense). + // This is to make below consumer tests work. + AckWait = 2_000_000_000, // 2 seconds + }, }, - }); - Assert.True(consumerInfo.Success); - Assert.Null(consumerInfo.Error); - Assert.NotNull(consumerInfo.Response); - Assert.Equal("events", consumerInfo.Response.StreamName); - Assert.Equal("consumer1", consumerInfo.Response.Config.Name); + cts1.Token); + Assert.Equal("events", consumer.Info.StreamName); + Assert.Equal("consumer1", consumer.Info.Config.Name); // Publish - PubAckResponse ack; - ack = await context.PublishAsync("events.foo", new TestData { Test = 1 }); + var ack = await js.PublishAsync("events.foo", new TestData { Test = 1 }, cancellationToken: cts1.Token); Assert.Null(ack.Error); Assert.Equal("events", ack.Stream); Assert.Equal(1, ack.Seq); Assert.False(ack.Duplicate); // Message ID - ack = await context.PublishAsync("events.foo", new TestData { Test = 2 }, new NatsPubOpts - { - Headers = new NatsHeaders { { "Nats-Msg-Id", "test2" } }, - }); + ack = await js.PublishAsync( + "events.foo", + new TestData { Test = 2 }, + new NatsPubOpts { Headers = new NatsHeaders { { "Nats-Msg-Id", "test2" } }, }, + cts1.Token); Assert.Null(ack.Error); Assert.Equal("events", ack.Stream); Assert.Equal(2, ack.Seq); Assert.False(ack.Duplicate); // Duplicate - ack = await context.PublishAsync("events.foo", new TestData { Test = 2 }, new NatsPubOpts - { - Headers = new NatsHeaders { { "Nats-Msg-Id", "test2" } }, - }); + ack = await js.PublishAsync( + "events.foo", + new TestData { Test = 2 }, + new NatsPubOpts { Headers = new NatsHeaders { { "Nats-Msg-Id", "test2" } }, }, + cts1.Token); Assert.Null(ack.Error); Assert.Equal("events", ack.Stream); Assert.Equal(2, ack.Seq); Assert.True(ack.Duplicate); // Consume - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - var messages = new List(); - await foreach (var msg in context.ConsumeAsync( - stream: "events", - consumer: "consumer1", + var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var messages = new List(); + await foreach (var msg in consumer.ConsumeRawAsync( request: new ConsumerGetnextRequest { Batch = 100 }, requestOpts: default, - cancellationToken: cts.Token)) + cancellationToken: cts2.Token)) { messages.Add(msg); // Only ACK one message so we can consume again if (messages.Count == 1) { - await msg.ReplyAsync(new ReadOnlySequence("+ACK"u8.ToArray()), cancellationToken: cts.Token); + await msg.JSMsg!.Value.Ack(cts2.Token); } if (messages.Count == 2) @@ -110,61 +103,65 @@ public async Task Create_stream_test() } Assert.Equal(2, messages.Count); - Assert.Equal("events.foo", messages[0].Subject); - Assert.Equal("events.foo", messages[1].Subject); + Assert.Equal("events.foo", messages[0].JSMsg!.Value.Msg.Subject); + Assert.Equal("events.foo", messages[1].JSMsg!.Value.Msg.Subject); // Consume the unacknowledged message - await foreach (var msg in context.ConsumeAsync( - stream: "events", - consumer: "consumer1", + await foreach (var msg in consumer.ConsumeRawAsync( request: new ConsumerGetnextRequest { Batch = 100 }, requestOpts: default, - cancellationToken: cts.Token)) + cancellationToken: cts2.Token)) { - Assert.Equal("events.foo", msg.Subject); + Assert.Equal("events.foo", msg.JSMsg!.Value.Msg.Subject); break; } } // Handle errors { - var context = new JSContext(nats, new JSOptions()); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - var response = await context.CreateStreamAsync(request: new StreamConfiguration + var js = new NatsJSContext(nats, new NatsJSOptions()); + var exception = await Assert.ThrowsAsync(async () => { - Name = "events2", Subjects = new[] { "events.*" }, + await js.CreateStreamAsync( + request: new StreamConfiguration + { + Name = "events2", + Subjects = new[] { "events.*" }, + }, + cancellationToken: cts.Token); }); - Assert.False(response.Success); - Assert.Null(response.Response); - Assert.NotNull(response.Error); - Assert.Equal(400, response.Error.Code); + Assert.Equal(400, exception.Error.Code); // subjects overlap with an existing stream - Assert.Equal(10065, response.Error.ErrCode); + Assert.Equal(10065, exception.Error.ErrCode); } // Delete stream { - var context = new JSContext(nats, new JSOptions()); - JSResponse response; + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + var js = new NatsJSContext(nats, new NatsJSOptions()); // Success - response = await context.DeleteStreamAsync("events"); - Assert.True(response.Success); - Assert.True(response.Response!.Success); + await js.DeleteStreamAsync("events", cts.Token); // Error - response = await context.DeleteStreamAsync("events2"); - Assert.False(response.Success); - Assert.Equal(404, response.Error!.Code); + var exception = await Assert.ThrowsAsync(async () => + { + await js.DeleteStreamAsync("events2", cts.Token); + }); + + Assert.Equal(404, exception.Error.Code); // stream not found - Assert.Equal(10059, response.Error!.ErrCode); + Assert.Equal(10059, exception.Error.ErrCode); } } -} -public class TestData -{ - public int Test { get; set; } + private record TestData + { + public int Test { get; init; } + } } diff --git a/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs b/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs new file mode 100644 index 000000000..7e8cba109 --- /dev/null +++ b/tests/NATS.Client.JetStream.Tests/ManageConsumerTest.cs @@ -0,0 +1,91 @@ +using NATS.Client.Core.Tests; +using NATS.Client.JetStream.Models; + +namespace NATS.Client.JetStream.Tests; + +public class ManageConsumerTest +{ + private readonly ITestOutputHelper _output; + + public ManageConsumerTest(ITestOutputHelper output) => _output = output; + + [Fact] + public async Task Create_get_consumer() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + await using var server = NatsServer.StartJS(); + var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats, new NatsJSOptions()); + await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); + + // Create + { + var consumer = await js.CreateConsumerAsync( + new ConsumerCreateRequest + { + StreamName = "s1", + Config = new ConsumerConfiguration + { + Name = "c1", + DurableName = "c1", + AckPolicy = ConsumerConfigurationAckPolicy.@explicit, + }, + }, + cts.Token); + Assert.Equal("s1", consumer.Info.StreamName); + Assert.Equal("c1", consumer.Info.Config.Name); + } + + // Get + { + var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); + Assert.Equal("s1", consumer.Info.StreamName); + Assert.Equal("c1", consumer.Info.Config.Name); + } + } + + [Fact] + public async Task List_delete_consumer() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + await using var server = NatsServer.StartJS(); + var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats, new NatsJSOptions()); + await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); + await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + await js.CreateConsumerAsync("s1", "c2", cancellationToken: cts.Token); + await js.CreateConsumerAsync("s1", "c3", cancellationToken: cts.Token); + + // List + { + var list = new List(); + await foreach (var consumer in js.ListConsumersAsync("s1", new ConsumerListRequest(), cts.Token)) + { + list.Add(consumer); + } + + Assert.Equal(3, list.Count); + Assert.True(list.All(c => c.Info.StreamName == "s1")); + Assert.Contains(list, c => c.Info.Config.Name == "c1"); + Assert.Contains(list, c => c.Info.Config.Name == "c2"); + Assert.Contains(list, c => c.Info.Config.Name == "c3"); + } + + // Delete + { + var response = await js.DeleteConsumerAsync("s1", "c1", cts.Token); + Assert.True(response); + + var list = new List(); + await foreach (var consumer in js.ListConsumersAsync("s1", new ConsumerListRequest(), cts.Token)) + { + list.Add(consumer); + } + + Assert.Equal(2, list.Count); + Assert.DoesNotContain(list, c => c.Info.Config.Name == "c1"); + } + } +} diff --git a/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs b/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs new file mode 100644 index 000000000..63126891d --- /dev/null +++ b/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs @@ -0,0 +1,99 @@ +using NATS.Client.Core.Tests; +using NATS.Client.JetStream.Models; + +namespace NATS.Client.JetStream.Tests; + +public class ManageStreamTest +{ + private readonly ITestOutputHelper _output; + + public ManageStreamTest(ITestOutputHelper output) => _output = output; + + [Fact] + public async Task Account_info_create_get_update_stream() + { + await using var server = NatsServer.StartJS(); + var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats, new NatsJSOptions()); + + // Account Info + { + var accountInfo = await js.GetAccountInfoAsync(); + Assert.Equal(0, accountInfo.Streams); + } + + // Create + { + var stream = await js.CreateStreamAsync(request: new StreamConfiguration + { + Name = "events", + Subjects = new[] { "events.*" }, + }); + Assert.Equal("events", stream.Info.Config.Name); + + var accountInfo = await js.GetAccountInfoAsync(); + Assert.Equal(1, accountInfo.Streams); + } + + // Get + { + var stream = await js.GetStreamAsync("events"); + Assert.Equal("events", stream.Info.Config.Name); + Assert.Equal(new[] { "events.*" }, stream.Info.Config.Subjects); + } + + // Update + { + var stream1 = await js.GetStreamAsync("events"); + Assert.Equal(-1, stream1.Info.Config.MaxMsgs); + + var stream2 = await js.UpdateStreamAsync(new StreamUpdateRequest { Name = "events", MaxMsgs = 10 }); + Assert.Equal(10, stream2.Info.Config.MaxMsgs); + + var stream3 = await js.GetStreamAsync("events"); + Assert.Equal(10, stream3.Info.Config.MaxMsgs); + } + } + + [Fact] + public async Task List_delete_stream() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + await using var server = NatsServer.StartJS(); + var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats, new NatsJSOptions()); + + await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); + await js.CreateStreamAsync("s2", new[] { "s2.*" }, cts.Token); + await js.CreateStreamAsync("s3", new[] { "s3.*" }, cts.Token); + + // List + { + var list = new List(); + await foreach (var stream in js.ListStreamsAsync(new StreamListRequest(), cts.Token)) + { + list.Add(stream); + } + + Assert.Equal(3, list.Count); + Assert.Contains(list, s => s.Info.Config.Name == "s1"); + Assert.Contains(list, s => s.Info.Config.Name == "s2"); + Assert.Contains(list, s => s.Info.Config.Name == "s3"); + } + + // Delete + { + var deleteResponse = await js.DeleteStreamAsync("s1", cts.Token); + Assert.True(deleteResponse); + + var list = new List(); + await foreach (var stream in js.ListStreamsAsync(new StreamListRequest(), cts.Token)) + { + list.Add(stream); + } + + Assert.DoesNotContain(list, s => s.Info.Config.Name == "s1"); + } + } +} diff --git a/tests/NATS.Client.JetStream.Tests/PublishTest.cs b/tests/NATS.Client.JetStream.Tests/PublishTest.cs new file mode 100644 index 000000000..ffdc2aba3 --- /dev/null +++ b/tests/NATS.Client.JetStream.Tests/PublishTest.cs @@ -0,0 +1,56 @@ +using NATS.Client.Core.Tests; + +namespace NATS.Client.JetStream.Tests; + +public class PublishTest +{ + private readonly ITestOutputHelper _output; + + public PublishTest(ITestOutputHelper output) => _output = output; + + [Fact] + public async Task Publish_test() + { + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + var js = new NatsJSContext(nats); + await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); + await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + + // Publish + { + var ack = await js.PublishAsync("s1.foo", new TestData { Test = 1 }, cancellationToken: cts.Token); + Assert.Null(ack.Error); + Assert.Equal(1, ack.Seq); + Assert.Equal("s1", ack.Stream); + Assert.False(ack.Duplicate); + } + + // Duplicate + { + var ack1 = await js.PublishAsync( + subject: "s1.foo", + data: new TestData { Test = 2 }, + opts: new NatsPubOpts { Headers = new NatsHeaders { { "Nats-Msg-Id", "2" } } }, + cancellationToken: cts.Token); + Assert.Null(ack1.Error); + Assert.Equal(2, ack1.Seq); + Assert.False(ack1.Duplicate); + + var ack2 = await js.PublishAsync( + subject: "s1.foo", + data: new TestData { Test = 2 }, + opts: new NatsPubOpts { Headers = new NatsHeaders { { "Nats-Msg-Id", "2" } } }, + cancellationToken: cts.Token); + Assert.Null(ack2.Error); + Assert.True(ack2.Duplicate); + } + } + + private record TestData + { + public int Test { get; init; } + } +} diff --git a/tests/NATS.Client.Perf/Program.cs b/tests/NATS.Client.Perf/Program.cs index 9a093fe76..36fb9f142 100644 --- a/tests/NATS.Client.Perf/Program.cs +++ b/tests/NATS.Client.Perf/Program.cs @@ -1,4 +1,4 @@ -using System.Buffers; +using System.Buffers; using System.Diagnostics; using System.Text.RegularExpressions; using NATS.Client.Core; @@ -126,7 +126,8 @@ public static int Eval() var test = result._test(); var ok = test ? "OK" : "NOT OK"; Console.WriteLine($"[{ok}] {result._message}"); - if (test == false) failed++; + if (test == false) + failed++; } Console.WriteLine(failed == 0 ? "PASS" : "FAILED"); diff --git a/tests/NATS.Client.TestUtilities/NatsServer.cs b/tests/NATS.Client.TestUtilities/NatsServer.cs index ee451df25..2213dfb37 100644 --- a/tests/NATS.Client.TestUtilities/NatsServer.cs +++ b/tests/NATS.Client.TestUtilities/NatsServer.cs @@ -137,6 +137,15 @@ public int ConnectionPort } } + public static NatsServer StartJS() => StartJS(new NullOutputHelper(), TransportType.Tcp); + + public static NatsServer StartJS(ITestOutputHelper outputHelper, TransportType transportType) => Start( + outputHelper: outputHelper, + options: new NatsServerOptionsBuilder() + .UseTransport(transportType) + .UseJetStream() + .Build()); + public static NatsServer Start() => Start(new NullOutputHelper(), TransportType.Tcp); public static NatsServer Start(ITestOutputHelper outputHelper) => Start(outputHelper, TransportType.Tcp); @@ -148,7 +157,7 @@ public static NatsServer Start(ITestOutputHelper outputHelper, NatsServerOptions { NatsServer? server = null; NatsConnection? nats = null; - for (int i = 0; i < 10; i++) + for (var i = 0; i < 10; i++) { try {