Skip to content

Commit

Permalink
Validate stream name parameters in JSContext methods. (#379)
Browse files Browse the repository at this point in the history
* Add MacOS .DS_Store

* Add stream name parameter validation in NatsJSContext.Streams

* Fix test name

* Add stream name parameter validation for consumer methods

* Move validation methods

* Fix warning

* Add wilcard tests

* Remove checking for > and *

* Remove tests of * and > that were left behind

* Align .net6 and .net8 exception messages

---------

Co-authored-by: Niklas Petersen <[email protected]>
  • Loading branch information
niklasfp and niklasfp authored Feb 20, 2024
1 parent b68542e commit f693efe
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 2 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,5 @@ nuget/*.unitypackage
# NuGet package
/dist

# MacOS folder attributes
.DS_Store
12 changes: 12 additions & 0 deletions src/NATS.Client.JetStream/INatsJSContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ public interface INatsJSContext
/// <param name="opts">Ordered consumer options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The NATS JetStream consumer object which can be used retrieving ordered data from the stream.</returns>
/// <exception cref="ArgumentException">The <paramref name="stream"/> name is invalid.</exception>
/// <exception cref="ArgumentNullException">The <paramref name="stream"/> name is <c>null</c>.</exception>
ValueTask<INatsJSConsumer> CreateOrderedConsumerAsync(
string stream,
NatsJSOrderedConsumerOpts? opts = default,
Expand All @@ -27,6 +29,8 @@ ValueTask<INatsJSConsumer> CreateOrderedConsumerAsync(
/// <returns>The NATS JetStream consumer object which can be used retrieving data from the stream.</returns>
/// <exception cref="NatsJSException">Ack policy is set to <c>none</c> or there was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <exception cref="ArgumentException">The <paramref name="stream"/> name is invalid.</exception>
/// <exception cref="ArgumentNullException">The <paramref name="stream"/> name is <c>null</c>.</exception>
ValueTask<INatsJSConsumer> CreateOrUpdateConsumerAsync(
string stream,
ConsumerConfig config,
Expand All @@ -41,6 +45,8 @@ ValueTask<INatsJSConsumer> CreateOrUpdateConsumerAsync(
/// <returns>The NATS JetStream consumer object which can be used retrieving data from the stream.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <exception cref="ArgumentException">The <paramref name="stream"/> name is invalid.</exception>
/// <exception cref="ArgumentNullException">The <paramref name="stream"/> name is <c>null</c>.</exception>
ValueTask<INatsJSConsumer> GetConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default);

/// <summary>
Expand All @@ -51,6 +57,8 @@ ValueTask<INatsJSConsumer> CreateOrUpdateConsumerAsync(
/// <returns>Async enumerable of consumer info objects. Can be used in a <c>await foreach</c> loop.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <exception cref="ArgumentException">The <paramref name="stream"/> name is invalid.</exception>
/// <exception cref="ArgumentNullException">The <paramref name="stream"/> name is <c>null</c>.</exception>
IAsyncEnumerable<INatsJSConsumer> ListConsumersAsync(
string stream,
CancellationToken cancellationToken = default);
Expand All @@ -63,6 +71,8 @@ IAsyncEnumerable<INatsJSConsumer> ListConsumersAsync(
/// <returns>Async enumerable of consumer info objects. Can be used in a <c>await foreach</c> loop.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <exception cref="ArgumentException">The <paramref name="stream"/> name is invalid.</exception>
/// <exception cref="ArgumentNullException">The <paramref name="stream"/> name is <c>null</c>.</exception>
IAsyncEnumerable<string> ListConsumerNamesAsync(
string stream,
CancellationToken cancellationToken = default);
Expand All @@ -76,6 +86,8 @@ IAsyncEnumerable<string> ListConsumerNamesAsync(
/// <returns>Whether the deletion was successful.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <exception cref="ArgumentException">The <paramref name="stream"/> name is invalid.</exception>
/// <exception cref="ArgumentNullException">The <paramref name="stream"/> name is <c>null</c>.</exception>
ValueTask<bool> DeleteConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default);

/// <summary>
Expand Down
13 changes: 13 additions & 0 deletions src/NATS.Client.JetStream/NatsJSContext.Consumers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ public partial class NatsJSContext : INatsJSContext
/// <param name="opts">Ordered consumer options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The NATS JetStream consumer object which can be used retrieving ordered data from the stream.</returns>
/// <exception cref="ArgumentException">The <paramref name="stream"/> name is invalid.</exception>
/// <exception cref="ArgumentNullException">The <paramref name="stream"/> name is <c>null</c>.</exception>
public ValueTask<INatsJSConsumer> CreateOrderedConsumerAsync(
string stream,
NatsJSOrderedConsumerOpts? opts = default,
CancellationToken cancellationToken = default)
{
ThrowIfInvalidStreamName(stream);
opts ??= NatsJSOrderedConsumerOpts.Default;
return new ValueTask<INatsJSConsumer>(new NatsJSOrderedConsumer(stream, this, opts, cancellationToken));
}
Expand All @@ -29,6 +32,8 @@ public async ValueTask<INatsJSConsumer> CreateOrUpdateConsumerAsync(
ConsumerConfig config,
CancellationToken cancellationToken = default)
{
ThrowIfInvalidStreamName(stream);

// TODO: Adjust API subject according to server version and filter subject
var subject = $"{Opts.Prefix}.CONSUMER.CREATE.{stream}";

Expand Down Expand Up @@ -64,8 +69,11 @@ public async ValueTask<INatsJSConsumer> CreateOrUpdateConsumerAsync(
/// <returns>The NATS JetStream consumer object which can be used retrieving data from the stream.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <exception cref="ArgumentException">The <paramref name="stream"/> name is invalid.</exception>
/// <exception cref="ArgumentNullException">The <paramref name="stream"/> name is <c>null</c>.</exception>
public async ValueTask<INatsJSConsumer> GetConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default)
{
ThrowIfInvalidStreamName(stream);
var response = await JSRequestResponseAsync<object, ConsumerInfo>(
subject: $"{Opts.Prefix}.CONSUMER.INFO.{stream}.{consumer}",
request: null,
Expand All @@ -78,6 +86,7 @@ public async IAsyncEnumerable<INatsJSConsumer> ListConsumersAsync(
string stream,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
ThrowIfInvalidStreamName(stream);
var offset = 0;
while (!cancellationToken.IsCancellationRequested)
{
Expand All @@ -103,6 +112,7 @@ public async IAsyncEnumerable<string> ListConsumerNamesAsync(
string stream,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
ThrowIfInvalidStreamName(stream);
var offset = 0;
while (!cancellationToken.IsCancellationRequested)
{
Expand Down Expand Up @@ -130,8 +140,11 @@ public async IAsyncEnumerable<string> ListConsumerNamesAsync(
/// <returns>Whether the deletion was successful.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <exception cref="ArgumentException">The <paramref name="stream"/> name is invalid.</exception>
/// <exception cref="ArgumentNullException">The <paramref name="stream"/> name is <c>null</c>.</exception>
public async ValueTask<bool> DeleteConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default)
{
ThrowIfInvalidStreamName(stream);
var response = await JSRequestResponseAsync<object, ConsumerDeleteResponse>(
subject: $"{Opts.Prefix}.CONSUMER.DELETE.{stream}.{consumer}",
request: null,
Expand Down
20 changes: 18 additions & 2 deletions src/NATS.Client.JetStream/NatsJSContext.Streams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ public partial class NatsJSContext
/// <returns>The NATS JetStream stream object which can be used to manage the stream.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <exception cref="ArgumentException">The stream name in <paramref name="config"/> is invalid.</exception>
/// <exception cref="ArgumentNullException">The name in <paramref name="config"/> is <c>null</c>.</exception>
public async ValueTask<INatsJSStream> CreateStreamAsync(
StreamConfig config,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(config.Name, nameof(config.Name));
ThrowIfInvalidStreamName(config.Name, nameof(config.Name));
var response = await JSRequestResponseAsync<StreamConfig, StreamInfo>(
subject: $"{Opts.Prefix}.STREAM.CREATE.{config.Name}",
config,
Expand All @@ -33,10 +35,13 @@ public async ValueTask<INatsJSStream> CreateStreamAsync(
/// <returns>Whether delete was successful or not.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <exception cref="ArgumentException">The <paramref name="stream"/> name is invalid.</exception>
/// <exception cref="ArgumentNullException">The <paramref name="stream"/> name is <c>null</c>.</exception>
public async ValueTask<bool> DeleteStreamAsync(
string stream,
CancellationToken cancellationToken = default)
{
ThrowIfInvalidStreamName(stream);
var response = await JSRequestResponseAsync<object, StreamMsgDeleteResponse>(
subject: $"{Opts.Prefix}.STREAM.DELETE.{stream}",
request: null,
Expand All @@ -53,11 +58,14 @@ public async ValueTask<bool> DeleteStreamAsync(
/// <returns>Purge response</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <exception cref="ArgumentException">The <paramref name="stream"/> name is invalid.</exception>
/// <exception cref="ArgumentNullException">The <paramref name="stream"/> name is <c>null</c>.</exception>
public async ValueTask<StreamPurgeResponse> PurgeStreamAsync(
string stream,
StreamPurgeRequest request,
CancellationToken cancellationToken = default)
{
ThrowIfInvalidStreamName(stream);
var response = await JSRequestResponseAsync<StreamPurgeRequest, StreamPurgeResponse>(
subject: $"{Opts.Prefix}.STREAM.PURGE.{stream}",
request: request,
Expand All @@ -74,11 +82,14 @@ public async ValueTask<StreamPurgeResponse> PurgeStreamAsync(
/// <returns>Delete message response</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <exception cref="ArgumentException">The <paramref name="stream"/> name is invalid.</exception>
/// <exception cref="ArgumentNullException">The <paramref name="stream"/> name is <c>null</c>.</exception>
public async ValueTask<StreamMsgDeleteResponse> DeleteMessageAsync(
string stream,
StreamMsgDeleteRequest request,
CancellationToken cancellationToken = default)
{
ThrowIfInvalidStreamName(stream);
var response = await JSRequestResponseAsync<StreamMsgDeleteRequest, StreamMsgDeleteResponse>(
subject: $"{Opts.Prefix}.STREAM.MSG.DELETE.{stream}",
request: request,
Expand All @@ -95,11 +106,14 @@ public async ValueTask<StreamMsgDeleteResponse> DeleteMessageAsync(
/// <returns>The NATS JetStream stream object which can be used to manage the stream.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <exception cref="ArgumentException">The <paramref name="stream"/> name is invalid.</exception>
/// <exception cref="ArgumentNullException">The <paramref name="stream"/> name is <c>null</c>.</exception>
public async ValueTask<INatsJSStream> GetStreamAsync(
string stream,
StreamInfoRequest? request = null,
CancellationToken cancellationToken = default)
{
ThrowIfInvalidStreamName(stream);
var response = await JSRequestResponseAsync<StreamInfoRequest, StreamInfoResponse>(
subject: $"{Opts.Prefix}.STREAM.INFO.{stream}",
request: request,
Expand All @@ -115,11 +129,13 @@ public async ValueTask<INatsJSStream> GetStreamAsync(
/// <returns>The updated NATS JetStream stream object.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <exception cref="ArgumentException">The stream name in <paramref name="request"/> is invalid.</exception>
/// <exception cref="ArgumentNullException">The name in <paramref name="request"/> is <c>null</c>.</exception>
public async ValueTask<NatsJSStream> UpdateStreamAsync(
StreamConfig request,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request.Name, nameof(request.Name));
ThrowIfInvalidStreamName(request.Name, nameof(request.Name));
var response = await JSRequestResponseAsync<StreamConfig, StreamUpdateResponse>(
subject: $"{Opts.Prefix}.STREAM.UPDATE.{request.Name}",
request: request,
Expand Down
25 changes: 25 additions & 0 deletions src/NATS.Client.JetStream/NatsJSContext.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
Expand Down Expand Up @@ -172,6 +173,22 @@ public async ValueTask<PubAckResponse> PublishAsync<T>(
throw new NatsJSPublishNoResponseException();
}

internal static void ThrowIfInvalidStreamName([NotNull] string? name, [CallerArgumentExpression("name")] string? paramName = null)
{
ArgumentNullException.ThrowIfNull(name, paramName);

if (name.Length == 0)
{
ThrowEmptyException(paramName);
}

var nameSpan = name.AsSpan();
if (nameSpan.IndexOfAny(" .") >= 0)
{
ThrowInvalidStreamNameException(paramName);
}
}

internal string NewInbox() => NatsConnection.NewInbox(Connection.Opts.InboxPrefix);

internal async ValueTask<TResponse> JSRequestResponseAsync<TRequest, TResponse>(
Expand Down Expand Up @@ -237,4 +254,12 @@ internal async ValueTask<NatsJSResponse<TResponse>> JSRequestAsync<TRequest, TRe

throw new NatsJSApiNoResponseException();
}

[DoesNotReturn]
private static void ThrowInvalidStreamNameException(string? paramName) =>
throw new ArgumentException("Stream name cannot contain ' ', '.'", paramName);

[DoesNotReturn]
private static void ThrowEmptyException(string? paramName) =>
throw new ArgumentException("The value cannot be an empty string.", paramName);
}
39 changes: 39 additions & 0 deletions tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,45 @@ public class ConsumerConsumeTest

public ConsumerConsumeTest(ITestOutputHelper output) => _output = output;

[Theory]
[InlineData("Invalid.DotName")]
[InlineData("Invalid SpaceName")]
[InlineData(null)]
public async Task Consumer_stream_invalid_name_test(string? streamName)
{
var jsmContext = new NatsJSContext(new NatsConnection());

var consumerConfig = new ConsumerConfig("aconsumer");

// Create ordered consumer
await Assert.ThrowsAnyAsync<ArgumentException>(async () => await jsmContext.CreateOrderedConsumerAsync(streamName!, cancellationToken: CancellationToken.None));

// Create or update consumer
await Assert.ThrowsAnyAsync<ArgumentException>(async () => await jsmContext.CreateOrUpdateConsumerAsync(streamName!, consumerConfig));

// Get consumer
await Assert.ThrowsAnyAsync<ArgumentException>(async () => await jsmContext.GetConsumerAsync(streamName!, "aconsumer"));

// List consumers
await Assert.ThrowsAnyAsync<ArgumentException>(async () =>
{
await foreach (var unused in jsmContext.ListConsumersAsync(streamName!, CancellationToken.None))
{
}
});

// List consumer names
await Assert.ThrowsAnyAsync<ArgumentException>(async () =>
{
await foreach (var unused in jsmContext.ListConsumerNamesAsync(streamName!, CancellationToken.None))
{
}
});

// Delete consumer
await Assert.ThrowsAnyAsync<ArgumentException>(async () => await jsmContext.DeleteConsumerAsync(streamName!, "aconsumer"));
}

[Fact]
public async Task Consume_msgs_test()
{
Expand Down
36 changes: 36 additions & 0 deletions tests/NATS.Client.JetStream.Tests/JetStreamTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,42 @@ public class JetStreamTest

public JetStreamTest(ITestOutputHelper output) => _output = output;

[Theory]
[InlineData("Invalid.DotName")]
[InlineData("Invalid SpaceName")]
[InlineData(null)]
public async Task Stream_invalid_name_test(string? streamName)
{
var jsmContext = new NatsJSContext(new NatsConnection());

var cfg = new StreamConfig()
{
Name = streamName,
Subjects = new[] { "events.*" },
};

// Create stream
await Assert.ThrowsAnyAsync<ArgumentException>(async () => await jsmContext.CreateStreamAsync(cfg));

// Delete stream
await Assert.ThrowsAnyAsync<ArgumentException>(async () => await jsmContext.DeleteStreamAsync(streamName!));

// Get stream
await Assert.ThrowsAnyAsync<ArgumentException>(async () => await jsmContext.GetStreamAsync(streamName!, null));

// Update stream
await Assert.ThrowsAnyAsync<ArgumentException>(async () => await jsmContext.UpdateStreamAsync(cfg));

// Purge stream
await Assert.ThrowsAnyAsync<ArgumentException>(async () => await jsmContext.PurgeStreamAsync(streamName!, new StreamPurgeRequest()));

// Get stream
await Assert.ThrowsAnyAsync<ArgumentException>(async () => await jsmContext.GetStreamAsync(streamName!));

// Delete Messages
await Assert.ThrowsAnyAsync<ArgumentException>(async () => await jsmContext.DeleteMessageAsync(streamName!, new StreamMsgDeleteRequest()));
}

[Fact]
public async Task Create_stream_test()
{
Expand Down
8 changes: 8 additions & 0 deletions tests/NATS.Client.JetStream.Tests/NatsJSContextTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,12 @@ public void InterfaceShouldHaveSamePublicPropertiesEventsAndMethodAsClass()
interfaceMethods.Select(m => m.Name).Should().Contain(name);
}
}

[Fact]
public void Invalid_stream_validation_test()
{
Assert.Throws<ArgumentNullException>(() => NatsJSContext.ThrowIfInvalidStreamName(null!));
Assert.Throws<ArgumentException>(() => NatsJSContext.ThrowIfInvalidStreamName("Invalid.DotName"));
Assert.Throws<ArgumentException>(() => NatsJSContext.ThrowIfInvalidStreamName("Invalid SpaceName"));
}
}

0 comments on commit f693efe

Please sign in to comment.