Skip to content

Commit

Permalink
JetStream consumer options (#107)
Browse files Browse the repository at this point in the history
* JetStream consumer options

* JetStream consumer event handling

* Added header parsing with codes.
* Subscription exception to be thrown on dispose

* Consumer notification channel

* JS consumer tidy-up

* JS consumer control handle prep

* JS consume and fetch opts

* Fixed tests

* Pull termination handling

* Update src/NATS.Client.Core/Commands/CommandConstants.cs

Co-authored-by: Caleb Lloyd <[email protected]>

* NatsSub redesign

* Heartbeat timer and Inbox bug fixes

* User can specify channel options

* Code reformat

---------

Co-authored-by: Caleb Lloyd <[email protected]>
  • Loading branch information
mtmk and caleblloyd authored Aug 9, 2023
1 parent 90ee113 commit 920a3ef
Show file tree
Hide file tree
Showing 41 changed files with 1,665 additions and 833 deletions.
3 changes: 3 additions & 0 deletions src/NATS.Client.Core/Commands/CommandConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ internal static class CommandConstants
// string.Join(",", Encoding.ASCII.GetBytes("PONG\r\n"))
public static ReadOnlySpan<byte> PongNewLine => new byte[] { 80, 79, 78, 71, 13, 10 };

// string.Join(",", Encoding.ASCII.GetBytes("NATS/1.0"))
public static ReadOnlySpan<byte> NatsHeaders10 => new byte[] { 78, 65, 84, 83, 47, 49, 46, 48 };

// string.Join(",", Encoding.ASCII.GetBytes("NATS/1.0\r\n"))
public static ReadOnlySpan<byte> NatsHeaders10NewLine => new byte[] { 78, 65, 84, 83, 47, 49, 46, 48, 13, 10 };
}
94 changes: 83 additions & 11 deletions src/NATS.Client.Core/HeaderParser.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
// Adapted from https://github.com/dotnet/aspnetcore/blob/v6.0.18/src/Servers/Kestrel/Core/src/Internal/Http/HttpParser.cs

using System.Buffers;
using System.Buffers.Text;
using System.Diagnostics;
using System.Text;
using Microsoft.Extensions.Primitives;
using NATS.Client.Core.Commands;
using NATS.Client.Core.Internal;

// ReSharper disable ConditionIsAlwaysTrueOrFalse
// ReSharper disable PossiblyImpureMethodCallOnReadonlyVariable
namespace NATS.Client.Core;

public class HeaderParser
Expand All @@ -18,13 +22,12 @@ public class HeaderParser

private readonly Encoding _encoding;

public HeaderParser(Encoding encoding)
{
_encoding = encoding;
}
public HeaderParser(Encoding encoding) => _encoding = encoding;

public bool ParseHeaders(in SequenceReader<byte> reader, NatsHeaders headers)
{
var isVersionLineRead = false;

while (!reader.End)
{
var span = reader.UnreadSpan;
Expand Down Expand Up @@ -73,7 +76,7 @@ public bool ParseHeaders(in SequenceReader<byte> reader, NatsHeaders headers)
// Headers don't end in CRLF line.
Debug.Assert(readAhead == 0 || readAhead == 2, "readAhead == 0 || readAhead == 2");

throw new NatsException($"Protocol error: invalid headers, no ending CRLFCRLF");
throw new NatsException($"Protocol error: invalid headers, no ending CRLF+CRLF");
}

var length = 0;
Expand Down Expand Up @@ -113,7 +116,7 @@ public bool ParseHeaders(in SequenceReader<byte> reader, NatsHeaders headers)
length < 5 ||

// Exclude the CRLF from the headerLine and parse the header name:value pair
!TryTakeSingleHeader(span[..(length - 2)], headers))
!TryTakeSingleHeader(span[..(length - 2)], headers, ref isVersionLineRead))
{
// Sequence needs to be CRLF and not contain an inner CR not part of terminator.
// Less than min possible headerSpan of 5 bytes a:b\r\n
Expand Down Expand Up @@ -145,7 +148,7 @@ public bool ParseHeaders(in SequenceReader<byte> reader, NatsHeaders headers)
reader.Rewind(readAhead);
}

length = ParseMultiSpanHeader(reader, headers);
length = ParseMultiSpanHeader(reader, headers, ref isVersionLineRead);
if (length < 0)
{
// Not there
Expand All @@ -163,7 +166,7 @@ public bool ParseHeaders(in SequenceReader<byte> reader, NatsHeaders headers)
return false;
}

private int ParseMultiSpanHeader(in SequenceReader<byte> reader, NatsHeaders headers)
private int ParseMultiSpanHeader(in SequenceReader<byte> reader, NatsHeaders headers, ref bool isVersionLineRead)
{
var currentSlice = reader.UnreadSequence;
var lineEndPosition = currentSlice.PositionOfAny(ByteCR, ByteLF);
Expand Down Expand Up @@ -211,7 +214,7 @@ private int ParseMultiSpanHeader(in SequenceReader<byte> reader, NatsHeaders hea
if (headerSpan[^1] != ByteLF ||

// Exclude the CRLF from the headerLine and parse the header name:value pair
!TryTakeSingleHeader(headerSpan[..^2], headers))
!TryTakeSingleHeader(headerSpan[..^2], headers, ref isVersionLineRead))
{
// Sequence needs to be CRLF and not contain an inner CR not part of terminator.
// Not parsable as a valid name:value header pair.
Expand All @@ -221,8 +224,77 @@ private int ParseMultiSpanHeader(in SequenceReader<byte> reader, NatsHeaders hea
return headerSpan.Length;
}

private bool TryTakeSingleHeader(ReadOnlySpan<byte> headerLine, NatsHeaders headers)
private bool TryTakeSingleHeader(ReadOnlySpan<byte> headerLine, NatsHeaders headers, ref bool isVersionLineRead)
{
// We are first looking for a version line
// e.g. NATS/1.0 100 Idle Heartbeat
if (!isVersionLineRead)
{
headerLine.Split(out var versionBytes, out headerLine);

if (!versionBytes.SequenceEqual(CommandConstants.NatsHeaders10))
{
throw new NatsException("Protocol error: header version mismatch");
}

if (headerLine.Length != 0)
{
headerLine.Split(out var codeBytes, out headerLine);
if (!Utf8Parser.TryParse(codeBytes, out int code, out _))
throw new NatsException("Protocol error: header code is not a number");
headers.Code = code;
}

if (headerLine.Length != 0)
{
// We can reduce string allocations by detecting commonly used
// header messages.
if (headerLine.SequenceEqual(NatsHeaders.MessageIdleHeartbeat))
{
headers.Message = NatsHeaders.Messages.IdleHeartbeat;
headers.MessageText = NatsHeaders.MessageIdleHeartbeatStr;
}
else if (headerLine.SequenceEqual(NatsHeaders.MessageBadRequest))
{
headers.Message = NatsHeaders.Messages.BadRequest;
headers.MessageText = NatsHeaders.MessageBadRequestStr;
}
else if (headerLine.SequenceEqual(NatsHeaders.MessageConsumerDeleted))
{
headers.Message = NatsHeaders.Messages.ConsumerDeleted;
headers.MessageText = NatsHeaders.MessageConsumerDeletedStr;
}
else if (headerLine.SequenceEqual(NatsHeaders.MessageConsumerIsPushBased))
{
headers.Message = NatsHeaders.Messages.ConsumerIsPushBased;
headers.MessageText = NatsHeaders.MessageConsumerIsPushBasedStr;
}
else if (headerLine.SequenceEqual(NatsHeaders.MessageNoMessages))
{
headers.Message = NatsHeaders.Messages.NoMessages;
headers.MessageText = NatsHeaders.MessageNoMessagesStr;
}
else if (headerLine.SequenceEqual(NatsHeaders.MessageRequestTimeout))
{
headers.Message = NatsHeaders.Messages.RequestTimeout;
headers.MessageText = NatsHeaders.MessageRequestTimeoutStr;
}
else if (headerLine.SequenceEqual(NatsHeaders.MessageMessageSizeExceedsMaxBytes))
{
headers.Message = NatsHeaders.Messages.MessageSizeExceedsMaxBytes;
headers.MessageText = NatsHeaders.MessageMessageSizeExceedsMaxBytesStr;
}
else
{
headers.Message = NatsHeaders.Messages.Text;
headers.MessageText = _encoding.GetString(headerLine);
}
}

isVersionLineRead = true;
return true;
}

// We are looking for a colon to terminate the header name.
// However, the header name cannot contain a space or tab so look for all three
// and see which is found first.
Expand Down Expand Up @@ -332,5 +404,5 @@ private bool TryTakeSingleHeader(ReadOnlySpan<byte> headerLine, NatsHeaders head
[StackTraceHidden]
private void RejectRequestHeader(ReadOnlySpan<byte> headerLine)
=> throw new NatsException(
$"Protocol error: invalid request header line '{_encoding.GetString(headerLine)}'");
$"Protocol error: invalid request header line '{headerLine.Dump()}'");
}
15 changes: 15 additions & 0 deletions src/NATS.Client.Core/Internal/BufferExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,21 @@ namespace NATS.Client.Core.Internal;

internal static class BufferExtensions
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void Split(this ReadOnlySpan<byte> span, out ReadOnlySpan<byte> left, out ReadOnlySpan<byte> right)
{
var i = span.IndexOf((byte)' ');
if (i == -1)
{
left = span;
right = default;
return;
}

left = span.Slice(0, i);
right = span.Slice(i + 1);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ReadOnlySpan<byte> ToSpan(this ReadOnlySequence<byte> buffer)
{
Expand Down
1 change: 1 addition & 0 deletions src/NATS.Client.Core/Internal/DebuggingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public static string Dump(this NatsHeaders? headers)
return "<NULL>";

var sb = new StringBuilder();
sb.AppendLine($"{headers.Version} {headers.Code} {headers.Message} {headers.MessageText}");
foreach (var (key, stringValues) in headers)
{
foreach (var value in stringValues)
Expand Down
15 changes: 8 additions & 7 deletions src/NATS.Client.Core/Internal/InboxSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ public InboxSub(
_connection = connection;
}

protected override ValueTask ReceiveInternalAsync(
string subject,
string? replyTo,
ReadOnlySequence<byte>? headersBuffer,
ReadOnlySequence<byte> payloadBuffer) =>
// Avoid base class error handling since inboxed subscribers will be responsible for that.
public override ValueTask ReceiveAsync(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer) =>
_inbox.ReceivedAsync(subject, replyTo, headersBuffer, payloadBuffer, _connection);

// Not used. Dummy implementation to keep base happy.
protected override ValueTask ReceiveInternalAsync(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer)
=> ValueTask.CompletedTask;

protected override void TryComplete()
{
}
Expand All @@ -46,7 +47,7 @@ public InboxSub Build(string subject, NatsSubOpts? opts, NatsConnection connecti
return new InboxSub(this, subject, opts, connection, manager);
}

public void Register(NatsSubBase sub)
public ValueTask RegisterAsync(NatsSubBase sub)
{
_bySubject.AddOrUpdate(
sub.Subject,
Expand All @@ -70,7 +71,7 @@ public void Register(NatsSubBase sub)
},
sub);

sub.Ready();
return sub.ReadyAsync();
}

public async ValueTask ReceivedAsync(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer, NatsConnection connection)
Expand Down
40 changes: 9 additions & 31 deletions src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,21 +116,6 @@ private static string ParseError(in ReadOnlySequence<byte> errorSlice)
return Encoding.UTF8.GetString(errorSlice.Slice(5));
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void Split(ReadOnlySpan<byte> span, out ReadOnlySpan<byte> left, out ReadOnlySpan<byte> right)
{
var i = span.IndexOf((byte)' ');
if (i == -1)
{
left = span;
right = default;
return;
}

left = span.Slice(0, i);
right = span.Slice(i + 1);
}

private async Task ReadLoopAsync()
{
while (true)
Expand Down Expand Up @@ -276,14 +261,7 @@ private async Task ReadLoopAsync()
// Prepare buffer for the next message by removing 'headers + payload + \r\n' from it
buffer = buffer.Slice(buffer.GetPosition(2, totalSlice.End));

var versionLength = CommandConstants.NatsHeaders10NewLine.Length;
var versionSlice = totalSlice.Slice(0, versionLength);
if (!versionSlice.ToSpan().SequenceEqual(CommandConstants.NatsHeaders10NewLine))
{
throw new NatsException("Protocol error: header version mismatch");
}

var headerSlice = totalSlice.Slice(versionLength, headersLength - versionLength);
var headerSlice = totalSlice.Slice(0, headersLength);
var payloadSlice = totalSlice.Slice(headersLength, payloadLength);

await _connection.PublishToClientHandlersAsync(subject, replyTo, sid, headerSlice, payloadSlice)
Expand Down Expand Up @@ -447,9 +425,9 @@ private async ValueTask<ReadOnlySequence<byte>> DispatchCommandAsync(int code, R
private (string subject, int sid, int payloadLength, string? replyTo) ParseMessageHeader(ReadOnlySpan<byte> msgHeader)
{
msgHeader = msgHeader.Slice(4);
Split(msgHeader, out var subjectBytes, out msgHeader);
Split(msgHeader, out var sidBytes, out msgHeader);
Split(msgHeader, out var replyToOrSizeBytes, out msgHeader);
msgHeader.Split(out var subjectBytes, out msgHeader);
msgHeader.Split(out var sidBytes, out msgHeader);
msgHeader.Split(out var replyToOrSizeBytes, out msgHeader);

var subject = Encoding.ASCII.GetString(subjectBytes);

Expand Down Expand Up @@ -499,12 +477,12 @@ private async ValueTask<ReadOnlySequence<byte>> DispatchCommandAsync(int code, R
private (string subject, int sid, string? replyTo, int headersLength, int totalLength) ParseHMessageHeader(ReadOnlySpan<byte> msgHeader)
{
// 'HMSG' literal
Split(msgHeader, out _, out msgHeader);
msgHeader.Split(out _, out msgHeader);

Split(msgHeader, out var subjectBytes, out msgHeader);
Split(msgHeader, out var sidBytes, out msgHeader);
Split(msgHeader, out var replyToOrHeaderLenBytes, out msgHeader);
Split(msgHeader, out var headerLenOrTotalLenBytes, out msgHeader);
msgHeader.Split(out var subjectBytes, out msgHeader);
msgHeader.Split(out var sidBytes, out msgHeader);
msgHeader.Split(out var replyToOrHeaderLenBytes, out msgHeader);
msgHeader.Split(out var headerLenOrTotalLenBytes, out msgHeader);

var subject = Encoding.ASCII.GetString(subjectBytes);
var sid = GetInt32(sidBytes);
Expand Down
17 changes: 13 additions & 4 deletions src/NATS.Client.Core/Internal/SubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public SubscriptionManager(NatsConnection connection, string inboxPrefix)

public async ValueTask SubscribeAsync(string subject, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken)
{
if (subject.StartsWith(_inboxPrefix, StringComparison.Ordinal))
if (IsInboxSubject(subject))
{
await SubscribeInboxAsync(subject, opts, sub, cancellationToken).ConfigureAwait(false);
}
Expand Down Expand Up @@ -140,11 +140,18 @@ public async ValueTask ReconnectAsync(CancellationToken cancellationToken)
await _connection
.SubscribeCoreAsync(sid, sub.Subject, sub.QueueGroup, sub.PendingMsgs, cancellationToken)
.ConfigureAwait(false);
sub.Ready();
await sub.ReadyAsync().ConfigureAwait(false);
}
}
}

public ISubscriptionManager GetManagerFor(string subject)
{
if (IsInboxSubject(subject))
return InboxSubBuilder;
return this;
}

private async ValueTask SubscribeInboxAsync(string subject, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken)
{
if (Interlocked.CompareExchange(ref _inboxSub, _inboxSubSentinel, _inboxSubSentinel) == _inboxSubSentinel)
Expand All @@ -169,7 +176,7 @@ await SubscribeInternalAsync(
}
}

InboxSubBuilder.Register(sub);
await InboxSubBuilder.RegisterAsync(sub).ConfigureAwait(false);
}

private async ValueTask SubscribeInternalAsync(string subject, NatsSubOpts? opts, NatsSubBase sub, CancellationToken cancellationToken)
Expand All @@ -185,7 +192,7 @@ private async ValueTask SubscribeInternalAsync(string subject, NatsSubOpts? opts
{
await _connection.SubscribeCoreAsync(sid, subject, opts?.QueueGroup, opts?.MaxMsgs, cancellationToken)
.ConfigureAwait(false);
sub.Ready();
await sub.ReadyAsync().ConfigureAwait(false);
}
catch
{
Expand Down Expand Up @@ -241,4 +248,6 @@ private async ValueTask UnsubscribeSidsAsync(List<int> sids)
}
}
}

private bool IsInboxSubject(string subject) => subject.StartsWith(_inboxPrefix, StringComparison.Ordinal);
}
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsConnection.Subscribe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public async ValueTask<INatsSub> SubscribeAsync(string subject, NatsSubOpts? opt
public async ValueTask<INatsSub<T>> SubscribeAsync<T>(string subject, NatsSubOpts? opts = default, CancellationToken cancellationToken = default)
{
var serializer = opts?.Serializer ?? Options.Serializer;
var sub = new NatsSub<T>(this, SubscriptionManager, subject, opts, serializer);
var sub = new NatsSub<T>(this, SubscriptionManager.GetManagerFor(subject), subject, opts, serializer);
await SubAsync(subject, opts, sub, cancellationToken).ConfigureAwait(false);
return sub;
}
Expand Down
Loading

0 comments on commit 920a3ef

Please sign in to comment.