Skip to content

Commit

Permalink
JetStream API initial implementation of all major calls (#104)
Browse files Browse the repository at this point in the history
* JetStream API management functions

* Consumer fetch, next, consume implementations

* Fixed format warnings

* Ran dotnet format

* Trace logging for failing tests

* Merged management APIs

* Wrappers for managing streams and consumers

* Trace test flapper
  • Loading branch information
mtmk authored Aug 1, 2023
1 parent b00bbbc commit 9c60b06
Show file tree
Hide file tree
Showing 31 changed files with 1,401 additions and 299 deletions.
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/Internal/CancellationTimer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace NATS.Client.Core.Internal;
namespace NATS.Client.Core.Internal;

// Support efficiently cancellation support for connection-dispose/timeout/cancel-per-operation
internal sealed class CancellationTimerPool
Expand Down
5 changes: 0 additions & 5 deletions src/NATS.Client.Core/Internal/DebuggingExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
#if DEBUG

using System.Buffers;
using System.Diagnostics;
using System.Text;

namespace NATS.Client.Core.Internal;
Expand Down Expand Up @@ -61,5 +58,3 @@ public static string Dump(this NatsHeaders? headers)
return sb.ToString();
}
}

#endif
15 changes: 14 additions & 1 deletion src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ internal sealed class NatsReadProtocolProcessor : IAsyncDisposable
private readonly Task _infoParsed; // wait for an upgrade
private readonly ConcurrentQueue<AsyncPingCommand> _pingCommands; // wait for pong
private readonly ILogger<NatsReadProtocolProcessor> _logger;
private readonly bool _trace;
private int _disposed;

public NatsReadProtocolProcessor(ISocketConnection socketConnection, NatsConnection connection, TaskCompletionSource waitForInfoSignal, TaskCompletionSource waitForPongOrErrorSignal, Task infoParsed)
{
_connection = connection;
_logger = connection.Options.LoggerFactory.CreateLogger<NatsReadProtocolProcessor>();
_logger.IsEnabled(LogLevel.Trace);
_trace = _logger.IsEnabled(LogLevel.Trace);
_waitForInfoSignal = waitForInfoSignal;
_waitForPongOrErrorSignal = waitForPongOrErrorSignal;
_infoParsed = infoParsed;
Expand Down Expand Up @@ -237,7 +238,19 @@ private async Task ReadLoopAsync()
}

var msgHeader = buffer.Slice(0, positionBeforeNatsHeader.Value);

if (_trace)
{
_logger.LogTrace("HMSG trace dump: {MsgHeader}", msgHeader.Dump());
}

var (subject, sid, replyTo, headersLength, totalLength) = ParseHMessageHeader(msgHeader);

if (_trace)
{
_logger.LogTrace("HMSG trace parsed: {Subject} {Sid} {ReplyTo} {HeadersLength} {TotalLength}", subject, sid, replyTo, headersLength, totalLength);
}

var payloadLength = totalLength - headersLength;
Debug.Assert(payloadLength >= 0, "Protocol error: illogical header and total lengths");

Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/Internal/StringExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace NATS.Client.Core.Internal;
namespace NATS.Client.Core.Internal;

internal static class StringExtensions
{
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsHeaders.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Collections;
using System.Collections;
using System.Diagnostics.CodeAnalysis;
using Microsoft.Extensions.Primitives;

Expand Down
16 changes: 14 additions & 2 deletions src/NATS.Client.Core/NatsMsg.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace NATS.Client.Core;
public readonly record struct NatsMsg(
string Subject,
string? ReplyTo,
int Size,
NatsHeaders? Headers,
ReadOnlyMemory<byte> Data,
INatsConnection? Connection)
Expand All @@ -31,7 +32,12 @@ internal static NatsMsg Build(
headers.SetReadOnly();
}

return new NatsMsg(subject, replyTo, headers, payloadBuffer.ToArray(), connection);
var size = subject.Length
+ replyTo?.Length ?? 0
+ headersBuffer?.Length ?? 0
+ payloadBuffer.Length;

return new NatsMsg(subject, replyTo, (int)size, headers, payloadBuffer.ToArray(), connection);
}

public ValueTask ReplyAsync(ReadOnlySequence<byte> payload = default, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -64,6 +70,7 @@ private void CheckReplyPreconditions()
public readonly record struct NatsMsg<T>(
string Subject,
string? ReplyTo,
int Size,
NatsHeaders? Headers,
T? Data,
INatsConnection? Connection)
Expand Down Expand Up @@ -97,7 +104,12 @@ internal static NatsMsg<T> Build(
headers.SetReadOnly();
}

return new NatsMsg<T>(subject, replyTo, headers, data, connection);
var size = subject.Length
+ replyTo?.Length ?? 0
+ headersBuffer?.Length ?? 0
+ payloadBuffer.Length;

return new NatsMsg<T>(subject, replyTo, (int)size, headers, data, connection);
}

public ValueTask ReplyAsync<TReply>(TReply data, in NatsPubOpts? opts = default, CancellationToken cancellationToken = default)
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsSubChannelOpts.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Threading.Channels;
using System.Threading.Channels;

namespace NATS.Client.Core;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Buffers;
using System.Buffers;
using System.Text.Json;
using NATS.Client.Core;
using NATS.Client.JetStream.Models;
Expand All @@ -21,9 +21,7 @@ public int Serialize<T>(ICountableBufferWriter bufferWriter, T? value) =>
var jsonDocument = JsonDocument.Parse(buffer);
if (jsonDocument.RootElement.TryGetProperty("error", out var errorElement))
{
var error = errorElement.Deserialize<ApiError>();
if (error == null)
throw new NatsJetStreamException("Can't parse JetStream error JSON payload");
var error = errorElement.Deserialize<ApiError>() ?? throw new NatsJSException("Can't parse JetStream error JSON payload");
throw new JSErrorException(error);
}

Expand Down
177 changes: 0 additions & 177 deletions src/NATS.Client.JetStream/JSContext.cs

This file was deleted.

22 changes: 0 additions & 22 deletions src/NATS.Client.JetStream/JSResponse.cs

This file was deleted.

Loading

0 comments on commit 9c60b06

Please sign in to comment.