From 3083db55781104b4069902943d0807035fbe8ac1 Mon Sep 17 00:00:00 2001 From: mtmk Date: Mon, 7 Oct 2024 15:38:23 +0100 Subject: [PATCH] Add other client extensions (#637) * Add client extension methods and refactor interfaces Refactor to use interfaces for enhanced flexibility and testability across various components like NatsJSContext and NatsConnection. Added new extension methods to easily create contexts for Object Store, Key-Value Store, and Services on NATS client and connection instances. * Add public `Context` properties to store interfaces Updated INatsObjStore, INatsKVContext, and INatsSvcContext interfaces to include public `Context` properties. This change ensures consistent access to the underlying context objects across various components. * dotnet format * Refactor context creation to use JetStream directly * Make extensions namespace NATS.Net * Removed debug print * Rename to JetStreamContext * Fix inheritdoc * Fix build warnings * Fix build warnings and add test * Fix test --- sandbox/Example.Client/Program.cs | 19 +++ src/NATS.Client.JetStream/INatsJSContext.cs | 27 +++ .../Internal/NatsJSOrderedPushConsumer.cs | 10 +- .../NatsClientExtensions.cs | 14 +- src/NATS.Client.JetStream/NatsJSConsumer.cs | 6 +- src/NATS.Client.JetStream/NatsJSContext.cs | 33 ++-- src/NATS.Client.JetStream/NatsJSMsg.cs | 4 +- .../INatsKVContext.cs | 5 + src/NATS.Client.KeyValueStore/INatsKVStore.cs | 6 + .../Internal/NatsKVWatchSub.cs | 6 +- .../Internal/NatsKVWatcher.cs | 4 +- .../NatsClientExtensions.cs | 33 ++++ .../NatsKVContext.cs | 91 +++------- src/NATS.Client.KeyValueStore/NatsKVStore.cs | 22 ++- .../INatsObjContext.cs | 7 + src/NATS.Client.ObjectStore/INatsObjStore.cs | 5 + .../NatsClientExtensions.cs | 33 ++++ src/NATS.Client.ObjectStore/NatsObjContext.cs | 45 ++--- src/NATS.Client.ObjectStore/NatsObjStore.cs | 155 ++++-------------- src/NATS.Client.Services/INatsSvcContext.cs | 7 + .../Internal/SvcListener.cs | 4 +- .../NatsClientExtensions.cs | 24 +++ src/NATS.Client.Services/NatsSvcContext.cs | 9 +- src/NATS.Client.Services/NatsSvcEndPoint.cs | 6 +- src/NATS.Client.Services/NatsSvcServer.cs | 4 +- .../NatsClientDefaultSerializer.cs | 14 +- .../NatsKVContextFactoryTest.cs | 7 + .../NatsObjContextFactoryTest.cs | 7 + 28 files changed, 321 insertions(+), 286 deletions(-) create mode 100644 src/NATS.Client.KeyValueStore/NatsClientExtensions.cs create mode 100644 src/NATS.Client.ObjectStore/NatsClientExtensions.cs create mode 100644 src/NATS.Client.Services/NatsClientExtensions.cs diff --git a/sandbox/Example.Client/Program.cs b/sandbox/Example.Client/Program.cs index 4bac3d56a..c72d89ba8 100644 --- a/sandbox/Example.Client/Program.cs +++ b/sandbox/Example.Client/Program.cs @@ -2,6 +2,9 @@ using System.Text; using NATS.Client.JetStream; +using NATS.Client.KeyValueStore; +using NATS.Client.ObjectStore; +using NATS.Client.Services; using NATS.Net; CancellationTokenSource cts = new(); @@ -95,6 +98,22 @@ Console.WriteLine($"JetStream Stream: {stream.Info.Config.Name}"); } +// Use KeyValueStore by referencing NATS.Client.KeyValueStore package +var kv1 = client.CreateKeyValueStoreContext(); +var kv2 = js.CreateKeyValueStoreContext(); +await kv1.CreateStoreAsync("store1"); +await kv2.CreateStoreAsync("store1"); + +// Use ObjectStore by referencing NATS.Client.ObjectStore package +var obj1 = client.CreateObjectStoreContext(); +var obj2 = js.CreateObjectStoreContext(); +await obj1.CreateObjectStoreAsync("store1"); +await obj2.CreateObjectStoreAsync("store1"); + +// Use Services by referencing NATS.Client.Services package +var svc = client.CreateServicesContext(); +await svc.AddServiceAsync("service1", "1.0.0"); + await cts.CancelAsync(); await Task.WhenAll(tasks); diff --git a/src/NATS.Client.JetStream/INatsJSContext.cs b/src/NATS.Client.JetStream/INatsJSContext.cs index 2c51a20cb..a23eff2c8 100644 --- a/src/NATS.Client.JetStream/INatsJSContext.cs +++ b/src/NATS.Client.JetStream/INatsJSContext.cs @@ -11,6 +11,11 @@ public interface INatsJSContext /// INatsConnection Connection { get; } + /// + /// Provides configuration options for the JetStream context. + /// + NatsJSOpts Opts { get; } + /// /// Creates new ordered consumer. /// @@ -296,4 +301,26 @@ ValueTask PublishConcurrentAsync( NatsJSPubOpts? opts = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default); + + /// + /// Generates a new base inbox string using the connection's inbox prefix. + /// + /// A new inbox string. + string NewBaseInbox(); + + /// + /// Sends a request message to a JetStream subject and waits for a response. + /// + /// The JetStream API subject to send the request to. + /// The request message object. + /// A used to cancel the API call. + /// The type of the request message. + /// The type of the response message. + /// A task representing the asynchronous operation, with a result of type . + ValueTask JSRequestResponseAsync( + string subject, + TRequest? request, + CancellationToken cancellationToken = default) + where TRequest : class + where TResponse : class; } diff --git a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs index 03e8e51bf..d10b1ac09 100644 --- a/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs +++ b/src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs @@ -45,7 +45,7 @@ internal class NatsJSOrderedPushConsumer { private readonly ILogger _logger; private readonly bool _debug; - private readonly NatsJSContext _context; + private readonly INatsJSContext _context; private readonly string _stream; private readonly string _filter; private readonly INatsDeserialize _serializer; @@ -68,7 +68,7 @@ internal class NatsJSOrderedPushConsumer private int _done; public NatsJSOrderedPushConsumer( - NatsJSContext context, + INatsJSContext context, string stream, string filter, INatsDeserialize serializer, @@ -417,7 +417,7 @@ private void CreateSub(string origin) internal class NatsJSOrderedPushConsumerSub : NatsSubBase { - private readonly NatsJSContext _context; + private readonly INatsJSContext _context; private readonly CancellationToken _cancellationToken; private readonly INatsConnection _nats; private readonly NatsHeaderParser _headerParser; @@ -425,7 +425,7 @@ internal class NatsJSOrderedPushConsumerSub : NatsSubBase private readonly ChannelWriter> _commands; public NatsJSOrderedPushConsumerSub( - NatsJSContext context, + INatsJSContext context, Channel> commandChannel, INatsDeserialize serializer, NatsSubOpts? opts, @@ -433,7 +433,7 @@ public NatsJSOrderedPushConsumerSub( : base( connection: context.Connection, manager: context.Connection.SubscriptionManager, - subject: context.NewInbox(), + subject: context.NewBaseInbox(), queueGroup: default, opts) { diff --git a/src/NATS.Client.JetStream/NatsClientExtensions.cs b/src/NATS.Client.JetStream/NatsClientExtensions.cs index 31e4e9809..3c8c0f75b 100644 --- a/src/NATS.Client.JetStream/NatsClientExtensions.cs +++ b/src/NATS.Client.JetStream/NatsClientExtensions.cs @@ -1,12 +1,24 @@ using NATS.Client.Core; +using NATS.Client.JetStream; -namespace NATS.Client.JetStream; +// ReSharper disable once CheckNamespace +namespace NATS.Net; public static class NatsClientExtensions { + /// + /// Creates a JetStream context using the provided NATS client. + /// + /// The NATS client used to create the JetStream context. + /// Returns an instance of for interacting with JetStream. public static INatsJSContext CreateJetStreamContext(this INatsClient client) => CreateJetStreamContext(client.Connection); + /// + /// Creates a JetStream context using the provided NATS connection. + /// + /// The NATS connection used to create the JetStream context. + /// Returns an instance of for interacting with JetStream. public static INatsJSContext CreateJetStreamContext(this INatsConnection connection) => new NatsJSContext(connection); } diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index 9f21017ec..6d8fd736f 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -284,7 +284,7 @@ internal async ValueTask> ConsumeInternalAsync(INatsDeserial opts ??= new NatsJSConsumeOpts(); serializer ??= _context.Connection.Opts.SerializerRegistry.GetDeserializer(); - var inbox = _context.NewInbox(); + var inbox = _context.NewBaseInbox(); var max = NatsJSOptsDefaults.SetMax(opts.MaxMsgs, opts.MaxBytes, opts.ThresholdMsgs, opts.ThresholdBytes); var timeouts = NatsJSOptsDefaults.SetTimeouts(opts.Expires, opts.IdleHeartbeat); @@ -332,7 +332,7 @@ internal async ValueTask> OrderedConsumeInternalAsync ThrowIfDeleted(); serializer ??= _context.Connection.Opts.SerializerRegistry.GetDeserializer(); - var inbox = _context.NewInbox(); + var inbox = _context.NewBaseInbox(); var max = NatsJSOptsDefaults.SetMax(opts.MaxMsgs, opts.MaxBytes, opts.ThresholdMsgs, opts.ThresholdBytes); var timeouts = NatsJSOptsDefaults.SetTimeouts(opts.Expires, opts.IdleHeartbeat); @@ -382,7 +382,7 @@ internal async ValueTask> FetchInternalAsync( ThrowIfDeleted(); serializer ??= _context.Connection.Opts.SerializerRegistry.GetDeserializer(); - var inbox = _context.NewInbox(); + var inbox = _context.NewBaseInbox(); var max = NatsJSOptsDefaults.SetMax(opts.MaxMsgs, opts.MaxBytes); var timeouts = NatsJSOptsDefaults.SetTimeouts(opts.Expires, opts.IdleHeartbeat); diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index c0c6f9094..8076dccec 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -32,7 +32,8 @@ public NatsJSContext(INatsConnection connection, NatsJSOpts opts) public INatsConnection Connection { get; } - internal NatsJSOpts Opts { get; } + /// + public NatsJSOpts Opts { get; } /// /// Calls JetStream Account Info API. @@ -238,6 +239,22 @@ public async ValueTask PublishConcurrentAsync( return new NatsJSPublishConcurrentFuture(sub); } + /// + public string NewBaseInbox() => NatsConnection.NewInbox(Connection.Opts.InboxPrefix); + + /// + public async ValueTask JSRequestResponseAsync( + string subject, + TRequest? request, + CancellationToken cancellationToken = default) + where TRequest : class + where TResponse : class + { + var response = await JSRequestAsync(subject, request, cancellationToken); + response.EnsureSuccess(); + return response.Response!; + } + internal static void ThrowIfInvalidStreamName([NotNull] string? name, [CallerArgumentExpression("name")] string? paramName = null) { #if NETSTANDARD @@ -262,20 +279,6 @@ internal static void ThrowIfInvalidStreamName([NotNull] string? name, [CallerArg } } - internal string NewInbox() => NatsConnection.NewInbox(Connection.Opts.InboxPrefix); - - internal async ValueTask JSRequestResponseAsync( - string subject, - TRequest? request, - CancellationToken cancellationToken = default) - where TRequest : class - where TResponse : class - { - var response = await JSRequestAsync(subject, request, cancellationToken); - response.EnsureSuccess(); - return response.Response!; - } - internal async ValueTask> JSRequestAsync( string subject, TRequest? request, diff --git a/src/NATS.Client.JetStream/NatsJSMsg.cs b/src/NATS.Client.JetStream/NatsJSMsg.cs index e5a0bd151..cc6830054 100644 --- a/src/NATS.Client.JetStream/NatsJSMsg.cs +++ b/src/NATS.Client.JetStream/NatsJSMsg.cs @@ -140,11 +140,11 @@ public interface INatsJSMsg /// User message type public readonly struct NatsJSMsg : INatsJSMsg { - private readonly NatsJSContext _context; + private readonly INatsJSContext _context; private readonly NatsMsg _msg; private readonly Lazy _replyToDateTimeAndSeq; - public NatsJSMsg(NatsMsg msg, NatsJSContext context) + public NatsJSMsg(NatsMsg msg, INatsJSContext context) { _msg = msg; _context = context; diff --git a/src/NATS.Client.KeyValueStore/INatsKVContext.cs b/src/NATS.Client.KeyValueStore/INatsKVContext.cs index 2c00bf951..cdf371037 100644 --- a/src/NATS.Client.KeyValueStore/INatsKVContext.cs +++ b/src/NATS.Client.KeyValueStore/INatsKVContext.cs @@ -4,6 +4,11 @@ namespace NATS.Client.KeyValueStore; public interface INatsKVContext { + /// + /// Provides access to the JetStream context associated with the Key-Value Store operations. + /// + INatsJSContext JetStreamContext { get; } + /// /// Create a new Key Value Store or get an existing one /// diff --git a/src/NATS.Client.KeyValueStore/INatsKVStore.cs b/src/NATS.Client.KeyValueStore/INatsKVStore.cs index a1ed8ce37..1c99a0a3c 100644 --- a/src/NATS.Client.KeyValueStore/INatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/INatsKVStore.cs @@ -1,9 +1,15 @@ using NATS.Client.Core; +using NATS.Client.JetStream; namespace NATS.Client.KeyValueStore; public interface INatsKVStore { + /// + /// Provides access to the JetStream context associated with the Object Store operations. + /// + INatsJSContext JetStreamContext { get; } + /// /// Name of the Key Value Store bucket /// diff --git a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs index e2f7f1d3b..7b7f59335 100644 --- a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs +++ b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatchSub.cs @@ -7,7 +7,7 @@ namespace NATS.Client.KeyValueStore.Internal; internal class NatsKVWatchSub : NatsSubBase { - private readonly NatsJSContext _context; + private readonly INatsJSContext _context; private readonly CancellationToken _cancellationToken; private readonly INatsConnection _nats; private readonly NatsHeaderParser _headerParser; @@ -15,7 +15,7 @@ internal class NatsKVWatchSub : NatsSubBase private readonly ChannelWriter> _commands; public NatsKVWatchSub( - NatsJSContext context, + INatsJSContext context, Channel> commandChannel, INatsDeserialize serializer, NatsSubOpts? opts, @@ -23,7 +23,7 @@ public NatsKVWatchSub( : base( connection: context.Connection, manager: context.Connection.SubscriptionManager, - subject: context.NewInbox(), + subject: context.NewBaseInbox(), queueGroup: default, opts) { diff --git a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs index 30f54dd00..ad2812eeb 100644 --- a/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs +++ b/src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs @@ -28,7 +28,7 @@ internal sealed class NatsKVWatcher : IAsyncDisposable { private readonly ILogger _logger; private readonly bool _debug; - private readonly NatsJSContext _context; + private readonly INatsJSContext _context; private readonly string _bucket; private readonly INatsDeserialize _serializer; private readonly NatsKVWatchOpts _opts; @@ -53,7 +53,7 @@ internal sealed class NatsKVWatcher : IAsyncDisposable private INatsJSConsumer? _initialConsumer; public NatsKVWatcher( - NatsJSContext context, + INatsJSContext context, string bucket, IEnumerable keys, INatsDeserialize serializer, diff --git a/src/NATS.Client.KeyValueStore/NatsClientExtensions.cs b/src/NATS.Client.KeyValueStore/NatsClientExtensions.cs new file mode 100644 index 000000000..3c616065a --- /dev/null +++ b/src/NATS.Client.KeyValueStore/NatsClientExtensions.cs @@ -0,0 +1,33 @@ +using NATS.Client.Core; +using NATS.Client.JetStream; +using NATS.Client.KeyValueStore; + +// ReSharper disable once CheckNamespace +namespace NATS.Net; + +public static class NatsClientExtensions +{ + /// + /// Creates a NATS Key-Value Store context using the specified NATS client. + /// + /// The NATS client instance. + /// An instance of which can be used to interact with the Key-Value Store. + public static INatsKVContext CreateKeyValueStoreContext(this INatsClient client) + => CreateKeyValueStoreContext(client.CreateJetStreamContext()); + + /// + /// Creates a NATS Key-Value Store context using the specified NATS connection. + /// + /// The NATS connection instance. + /// An instance of which can be used to interact with the Key-Value Store. + public static INatsKVContext CreateKeyValueStoreContext(this INatsConnection connection) + => CreateKeyValueStoreContext(connection.CreateJetStreamContext()); + + /// + /// Creates a NATS Key-Value Store context using the specified NATS JetStream context. + /// + /// The NATS JetStream context instance. + /// An instance of which can be used to interact with the Key-Value Store. + public static INatsKVContext CreateKeyValueStoreContext(this INatsJSContext context) + => new NatsKVContext(context); +} diff --git a/src/NATS.Client.KeyValueStore/NatsKVContext.cs b/src/NATS.Client.KeyValueStore/NatsKVContext.cs index cd0b1871c..501a261aa 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVContext.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVContext.cs @@ -23,59 +23,37 @@ public class NatsKVContext : INatsKVContext private static readonly int KvStreamNamePrefixLen = KvStreamNamePrefix.Length; private static readonly Regex ValidBucketRegex = new(pattern: @"\A[a-zA-Z0-9_-]+\z", RegexOptions.Compiled); - private readonly NatsJSContext _context; - /// /// Create a new Key Value Store context /// /// JetStream context - public NatsKVContext(NatsJSContext context) => _context = context; + public NatsKVContext(INatsJSContext context) => JetStreamContext = context; - /// - /// Create a new Key Value Store or get an existing one - /// - /// Name of the bucket - /// A used to cancel the API call. - /// Key Value Store - /// There was an issue retrieving the response. - /// Server responded with an error. + /// + public INatsJSContext JetStreamContext { get; } + + /// public ValueTask CreateStoreAsync(string bucket, CancellationToken cancellationToken = default) => CreateStoreAsync(new NatsKVConfig(bucket), cancellationToken); - /// - /// Create a new Key Value Store or get an existing one - /// - /// Key Value Store configuration - /// A used to cancel the API call. - /// Key Value Store - /// There was an issue with configuration - /// There was an issue retrieving the response. - /// Server responded with an error. + /// public async ValueTask CreateStoreAsync(NatsKVConfig config, CancellationToken cancellationToken = default) { ValidateBucketName(config.Bucket); var streamConfig = NatsKVContext.CreateStreamConfig(config); - var stream = await _context.CreateStreamAsync(streamConfig, cancellationToken); + var stream = await JetStreamContext.CreateStreamAsync(streamConfig, cancellationToken); - return new NatsKVStore(config.Bucket, _context, stream); + return new NatsKVStore(config.Bucket, JetStreamContext, stream); } - /// - /// Get a Key Value Store - /// - /// Name of the bucjet - /// A used to cancel the API call. - /// Key Value Store - /// There was an issue with configuration - /// There was an issue retrieving the response. - /// Server responded with an error. + /// public async ValueTask GetStoreAsync(string bucket, CancellationToken cancellationToken = default) { ValidateBucketName(bucket); - var stream = await _context.GetStreamAsync(BucketToStream(bucket), cancellationToken: cancellationToken); + var stream = await JetStreamContext.GetStreamAsync(BucketToStream(bucket), cancellationToken: cancellationToken); if (stream.Info.Config.MaxMsgsPerSubject < 1) { @@ -83,53 +61,32 @@ public async ValueTask GetStoreAsync(string bucket, CancellationTo } // TODO: KV mirror - return new NatsKVStore(bucket, _context, stream); + return new NatsKVStore(bucket, JetStreamContext, stream); } - /// - /// Update a key value store configuration. Storage type cannot change. - /// - /// Key Value Store configuration - /// used to cancel the API call. - /// Key Value Store - /// There was an issue with configuration - /// There was an issue retrieving the response. - /// Server responded with an error. + /// public async ValueTask UpdateStoreAsync(NatsKVConfig config, CancellationToken cancellationToken = default) { ValidateBucketName(config.Bucket); var streamConfig = NatsKVContext.CreateStreamConfig(config); - var stream = await _context.UpdateStreamAsync(streamConfig, cancellationToken); + var stream = await JetStreamContext.UpdateStreamAsync(streamConfig, cancellationToken); - return new NatsKVStore(config.Bucket, _context, stream); + return new NatsKVStore(config.Bucket, JetStreamContext, stream); } - /// - /// Delete a Key Value Store - /// - /// Name of the bucket - /// A used to cancel the API call. - /// True for success - /// There was an issue retrieving the response. - /// Server responded with an error. + /// public ValueTask DeleteStoreAsync(string bucket, CancellationToken cancellationToken = default) { ValidateBucketName(bucket); - return _context.DeleteStreamAsync(BucketToStream(bucket), cancellationToken); + return JetStreamContext.DeleteStreamAsync(BucketToStream(bucket), cancellationToken); } - /// - /// Get a list of bucket names - /// - /// used to cancel the API call. - /// Async enumerable of bucket names. Can be used in a await foreach loop. - /// There was an issue retrieving the response. - /// Server responded with an error. + /// public async IAsyncEnumerable GetBucketNamesAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) { - await foreach (var name in _context.ListStreamNamesAsync(cancellationToken: cancellationToken)) + await foreach (var name in JetStreamContext.ListStreamNamesAsync(cancellationToken: cancellationToken)) { if (!name.StartsWith(KvStreamNamePrefix)) { @@ -140,18 +97,12 @@ public async IAsyncEnumerable GetBucketNamesAsync([EnumeratorCancellatio } } - /// - /// Gets the status for all buckets - /// - /// used to cancel the API call. - /// Async enumerable of Key/Value statuses. Can be used in a await foreach loop. - /// There was an issue retrieving the response. - /// Server responded with an error. + /// public async IAsyncEnumerable GetStatusesAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) { - await foreach (var name in _context.ListStreamNamesAsync(cancellationToken: cancellationToken)) + await foreach (var name in JetStreamContext.ListStreamNamesAsync(cancellationToken: cancellationToken)) { - var stream = await _context.GetStreamAsync(name, cancellationToken: cancellationToken); + var stream = await JetStreamContext.GetStreamAsync(name, cancellationToken: cancellationToken); var isCompressed = stream.Info.Config.Compression != StreamConfigCompression.None; yield return new NatsKVStatus(name, isCompressed, stream.Info); } diff --git a/src/NATS.Client.KeyValueStore/NatsKVStore.cs b/src/NATS.Client.KeyValueStore/NatsKVStore.cs index d42c9b6f2..cd2a32fc9 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVStore.cs @@ -44,23 +44,26 @@ public class NatsKVStore : INatsKVStore private const string NatsSequence = "Nats-Sequence"; private const string NatsTimeStamp = "Nats-Time-Stamp"; private static readonly Regex ValidKeyRegex = new(pattern: @"\A[-/_=\.a-zA-Z0-9]+\z", RegexOptions.Compiled); - private readonly NatsJSContext _context; private readonly INatsJSStream _stream; - internal NatsKVStore(string bucket, NatsJSContext context, INatsJSStream stream) + internal NatsKVStore(string bucket, INatsJSContext context, INatsJSStream stream) { Bucket = bucket; - _context = context; + JetStreamContext = context; _stream = stream; } + /// + public INatsJSContext JetStreamContext { get; } + + /// public string Bucket { get; } /// public async ValueTask PutAsync(string key, T value, INatsSerialize? serializer = default, CancellationToken cancellationToken = default) { ValidateKey(key); - var ack = await _context.PublishAsync($"$KV.{Bucket}.{key}", value, serializer: serializer, cancellationToken: cancellationToken); + var ack = await JetStreamContext.PublishAsync($"$KV.{Bucket}.{key}", value, serializer: serializer, cancellationToken: cancellationToken); ack.EnsureSuccess(); return ack.Seq; } @@ -100,7 +103,7 @@ public async ValueTask UpdateAsync(string key, T value, ulong revision try { - var ack = await _context.PublishAsync($"$KV.{Bucket}.{key}", value, headers: headers, serializer: serializer, cancellationToken: cancellationToken); + var ack = await JetStreamContext.PublishAsync($"$KV.{Bucket}.{key}", value, headers: headers, serializer: serializer, cancellationToken: cancellationToken); ack.EnsureSuccess(); return ack.Seq; @@ -143,7 +146,7 @@ public async ValueTask DeleteAsync(string key, NatsKVDeleteOpts? opts = default, try { - var ack = await _context.PublishAsync(subject, null, headers: headers, cancellationToken: cancellationToken); + var ack = await JetStreamContext.PublishAsync(subject, null, headers: headers, cancellationToken: cancellationToken); ack.EnsureSuccess(); } catch (NatsJSApiException e) @@ -157,6 +160,7 @@ public async ValueTask DeleteAsync(string key, NatsKVDeleteOpts? opts = default, } } + /// public ValueTask PurgeAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default) => DeleteAsync(key, (opts ?? new NatsKVDeleteOpts()) with { Purge = true }, cancellationToken); @@ -164,7 +168,7 @@ public ValueTask PurgeAsync(string key, NatsKVDeleteOpts? opts = default, Cancel public async ValueTask> GetEntryAsync(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) { ValidateKey(key); - serializer ??= _context.Connection.Opts.SerializerRegistry.GetDeserializer(); + serializer ??= JetStreamContext.Connection.Opts.SerializerRegistry.GetDeserializer(); var request = new StreamMsgGetRequest(); var keySubject = $"$KV.{Bucket}.{key}"; @@ -425,12 +429,12 @@ public async IAsyncEnumerable GetKeysAsync(IEnumerable filters, internal async ValueTask> WatchInternalAsync(IEnumerable keys, INatsDeserialize? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default) { opts ??= NatsKVWatchOpts.Default; - serializer ??= _context.Connection.Opts.SerializerRegistry.GetDeserializer(); + serializer ??= JetStreamContext.Connection.Opts.SerializerRegistry.GetDeserializer(); opts.ThrowIfInvalid(); var watcher = new NatsKVWatcher( - context: _context, + context: JetStreamContext, bucket: Bucket, keys: keys, opts: opts, diff --git a/src/NATS.Client.ObjectStore/INatsObjContext.cs b/src/NATS.Client.ObjectStore/INatsObjContext.cs index 7d31337cc..f5dfd00a0 100644 --- a/src/NATS.Client.ObjectStore/INatsObjContext.cs +++ b/src/NATS.Client.ObjectStore/INatsObjContext.cs @@ -1,3 +1,5 @@ +using NATS.Client.JetStream; + namespace NATS.Client.ObjectStore; /// @@ -5,6 +7,11 @@ namespace NATS.Client.ObjectStore; /// public interface INatsObjContext { + /// + /// Provides access to the JetStream context associated with the Object Store operations. + /// + INatsJSContext JetStreamContext { get; } + /// /// Create a new object store. /// diff --git a/src/NATS.Client.ObjectStore/INatsObjStore.cs b/src/NATS.Client.ObjectStore/INatsObjStore.cs index 255a5618b..68bb23e6d 100644 --- a/src/NATS.Client.ObjectStore/INatsObjStore.cs +++ b/src/NATS.Client.ObjectStore/INatsObjStore.cs @@ -8,6 +8,11 @@ namespace NATS.Client.ObjectStore; /// public interface INatsObjStore { + /// + /// Provides access to the JetStream context associated with the Object Store operations. + /// + INatsJSContext JetStreamContext { get; } + /// /// Object store bucket name. /// diff --git a/src/NATS.Client.ObjectStore/NatsClientExtensions.cs b/src/NATS.Client.ObjectStore/NatsClientExtensions.cs new file mode 100644 index 000000000..104c7a2e0 --- /dev/null +++ b/src/NATS.Client.ObjectStore/NatsClientExtensions.cs @@ -0,0 +1,33 @@ +using NATS.Client.Core; +using NATS.Client.JetStream; +using NATS.Client.ObjectStore; + +// ReSharper disable once CheckNamespace +namespace NATS.Net; + +public static class NatsClientExtensions +{ + /// + /// Creates a NATS Object Store context for the given NATS client. + /// + /// The NATS client instance. + /// An instance of used for interacting with the NATS Object Store. + public static INatsObjContext CreateObjectStoreContext(this INatsClient client) + => CreateObjectStoreContext(client.CreateJetStreamContext()); + + /// + /// Creates a NATS Object Store context for the given NATS connection. + /// + /// The NATS connection instance. + /// An instance of used for interacting with the NATS Object Store. + public static INatsObjContext CreateObjectStoreContext(this INatsConnection connection) + => CreateObjectStoreContext(connection.CreateJetStreamContext()); + + /// + /// Creates a NATS Object Store context for the given NATS JetStream context. + /// + /// The NATS JetStream context instance. + /// An instance of used for interacting with the NATS Object Store. + public static INatsObjContext CreateObjectStoreContext(this INatsJSContext context) + => new NatsObjContext(context); +} diff --git a/src/NATS.Client.ObjectStore/NatsObjContext.cs b/src/NATS.Client.ObjectStore/NatsObjContext.cs index c2afb850f..969239e17 100644 --- a/src/NATS.Client.ObjectStore/NatsObjContext.cs +++ b/src/NATS.Client.ObjectStore/NatsObjContext.cs @@ -12,29 +12,20 @@ public class NatsObjContext : INatsObjContext { private static readonly Regex ValidBucketRegex = new(pattern: @"\A[a-zA-Z0-9_-]+\z", RegexOptions.Compiled); - private readonly NatsJSContext _context; - /// /// Create a new object store context. /// /// JetStream context. - public NatsObjContext(NatsJSContext context) => _context = context; + public NatsObjContext(INatsJSContext context) => JetStreamContext = context; - /// - /// Create a new object store. - /// - /// Bucket name. - /// A used to cancel the API call. - /// Object store object. + /// + public INatsJSContext JetStreamContext { get; } + + /// public ValueTask CreateObjectStoreAsync(string bucket, CancellationToken cancellationToken = default) => CreateObjectStoreAsync(new NatsObjConfig(bucket), cancellationToken); - /// - /// Create a new object store. - /// - /// Object store configuration. - /// A used to cancel the API call. - /// Object store object. + /// public async ValueTask CreateObjectStoreAsync(NatsObjConfig config, CancellationToken cancellationToken = default) { ValidateBucketName(config.Bucket); @@ -61,33 +52,23 @@ public async ValueTask CreateObjectStoreAsync(NatsObjConfig confi Compression = config.Compression ? StreamConfigCompression.S2 : StreamConfigCompression.None, }; - var stream = await _context.CreateStreamAsync(streamConfig, cancellationToken); - return new NatsObjStore(config, this, _context, stream); + var stream = await JetStreamContext.CreateStreamAsync(streamConfig, cancellationToken); + return new NatsObjStore(config, this, JetStreamContext, stream); } - /// - /// Get an existing object store. - /// - /// Bucket name - /// A used to cancel the API call. - /// The Object Store object + /// public async ValueTask GetObjectStoreAsync(string bucket, CancellationToken cancellationToken = default) { ValidateBucketName(bucket); - var stream = await _context.GetStreamAsync($"OBJ_{bucket}", cancellationToken: cancellationToken); - return new NatsObjStore(new NatsObjConfig(bucket), this, _context, stream); + var stream = await JetStreamContext.GetStreamAsync($"OBJ_{bucket}", cancellationToken: cancellationToken); + return new NatsObjStore(new NatsObjConfig(bucket), this, JetStreamContext, stream); } - /// - /// Delete an object store. - /// - /// Name of the bucket. - /// A used to cancel the API call. - /// Whether delete was successful or not. + /// public ValueTask DeleteObjectStore(string bucket, CancellationToken cancellationToken) { ValidateBucketName(bucket); - return _context.DeleteStreamAsync($"OBJ_{bucket}", cancellationToken); + return JetStreamContext.DeleteStreamAsync($"OBJ_{bucket}", cancellationToken); } private void ValidateBucketName(string bucket) diff --git a/src/NATS.Client.ObjectStore/NatsObjStore.cs b/src/NATS.Client.ObjectStore/NatsObjStore.cs index 5e6c8006a..7fdc28f67 100644 --- a/src/NATS.Client.ObjectStore/NatsObjStore.cs +++ b/src/NATS.Client.ObjectStore/NatsObjStore.cs @@ -28,28 +28,23 @@ public class NatsObjStore : INatsObjStore private static readonly NatsHeaders NatsRollupHeaders = new() { { NatsRollup, RollupSubject } }; private readonly NatsObjContext _objContext; - private readonly NatsJSContext _context; private readonly INatsJSStream _stream; - internal NatsObjStore(NatsObjConfig config, NatsObjContext objContext, NatsJSContext context, INatsJSStream stream) + internal NatsObjStore(NatsObjConfig config, NatsObjContext objContext, INatsJSContext context, INatsJSStream stream) { Bucket = config.Bucket; _objContext = objContext; - _context = context; + JetStreamContext = context; _stream = stream; } - /// - /// Object store bucket name. - /// + /// + public INatsJSContext JetStreamContext { get; } + + /// public string Bucket { get; } - /// - /// Get object by key. - /// - /// Object key. - /// A used to cancel the API call. - /// Object value as a byte array. + /// public async ValueTask GetBytesAsync(string key, CancellationToken cancellationToken = default) { using var memoryStream = new MemoryStream(); @@ -57,15 +52,7 @@ public async ValueTask GetBytesAsync(string key, CancellationToken cance return memoryStream.ToArray(); } - /// - /// Get object by key. - /// - /// Object key. - /// Stream to write the object value to. - /// true to not close the underlying stream when async method returns; otherwise, false - /// A used to cancel the API call. - /// Object metadata. - /// Metadata didn't match the value retrieved e.g. the SHA digest. + /// public async ValueTask GetAsync(string key, Stream stream, bool leaveOpen = false, CancellationToken cancellationToken = default) { ValidateObjectName(key); @@ -84,7 +71,7 @@ public async ValueTask GetAsync(string key, Stream stream, bool } await using var pushConsumer = new NatsJSOrderedPushConsumer>( - context: _context, + context: JetStreamContext, stream: $"OBJ_{Bucket}", filter: GetChunkSubject(info.Nuid), serializer: NatsDefaultSerializer>.Default, @@ -158,39 +145,15 @@ public async ValueTask GetAsync(string key, Stream stream, bool return info; } - /// - /// Put an object by key. - /// - /// Object key. - /// Object value as a byte array. - /// A used to cancel the API call. - /// Object metadata. + /// public ValueTask PutAsync(string key, byte[] value, CancellationToken cancellationToken = default) => PutAsync(new ObjectMetadata { Name = key }, new MemoryStream(value), cancellationToken: cancellationToken); - /// - /// Put an object by key. - /// - /// Object key. - /// Stream to read the value from. - /// true to not close the underlying stream when async method returns; otherwise, false - /// A used to cancel the API call. - /// Object metadata. - /// There was an error calculating SHA digest. - /// Server responded with an error. + /// public ValueTask PutAsync(string key, Stream stream, bool leaveOpen = false, CancellationToken cancellationToken = default) => PutAsync(new ObjectMetadata { Name = key }, stream, leaveOpen, cancellationToken); - /// - /// Put an object by key. - /// - /// Object metadata. - /// Stream to read the value from. - /// true to not close the underlying stream when async method returns; otherwise, false - /// A used to cancel the API call. - /// Object metadata. - /// There was an error calculating SHA digest. - /// Server responded with an error. + /// public async ValueTask PutAsync(ObjectMetadata meta, Stream stream, bool leaveOpen = false, CancellationToken cancellationToken = default) { ValidateObjectName(meta.Name); @@ -294,7 +257,7 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre var buffer = memoryOwner.Slice(0, currentChunkSize); // Chunks - var ack = await _context.PublishAsync(GetChunkSubject(nuid), buffer, serializer: NatsRawSerializer>.Default, cancellationToken: cancellationToken); + var ack = await JetStreamContext.PublishAsync(GetChunkSubject(nuid), buffer, serializer: NatsRawSerializer>.Default, cancellationToken: cancellationToken); ack.EnsureSuccess(); if (eof) @@ -320,8 +283,8 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre { try { - await _context.JSRequestResponseAsync( - subject: $"{_context.Opts.Prefix}.STREAM.PURGE.OBJ_{Bucket}", + await JetStreamContext.JSRequestResponseAsync( + subject: $"{JetStreamContext.Opts.Prefix}.STREAM.PURGE.OBJ_{Bucket}", request: new StreamPurgeRequest { Filter = GetChunkSubject(info.Nuid), @@ -338,14 +301,7 @@ await _context.JSRequestResponseAsync( return meta; } - /// - /// Update object metadata - /// - /// Object key - /// Object metadata - /// A used to cancel the API call. - /// Object metadata - /// There is already an object with the same name + /// public async ValueTask UpdateMetaAsync(string key, ObjectMetadata meta, CancellationToken cancellationToken = default) { ValidateObjectName(meta.Name); @@ -375,23 +331,11 @@ public async ValueTask UpdateMetaAsync(string key, ObjectMetadat return info; } - /// - /// Add a link to another object - /// - /// Link name - /// Target object's name - /// A used to cancel the API call. - /// Metadata of the new link object + /// public ValueTask AddLinkAsync(string link, string target, CancellationToken cancellationToken = default) => AddLinkAsync(link, new ObjectMetadata { Name = target, Bucket = Bucket }, cancellationToken); - /// - /// Add a link to another object - /// - /// Link name - /// Target object's metadata - /// A used to cancel the API call. - /// Metadata of the new link object + /// public async ValueTask AddLinkAsync(string link, ObjectMetadata target, CancellationToken cancellationToken = default) { ValidateObjectName(link); @@ -444,14 +388,7 @@ public async ValueTask AddLinkAsync(string link, ObjectMetadata return info; } - /// - /// Add a link to another object store - /// - /// Object's name to be linked - /// Target object store - /// A used to cancel the API call. - /// Metadata of the new link object - /// Object with the same name already exists + /// public async ValueTask AddBucketLinkAsync(string link, INatsObjStore target, CancellationToken cancellationToken = default) { ValidateObjectName(link); @@ -488,23 +425,19 @@ public async ValueTask AddBucketLinkAsync(string link, INatsObjS return info; } - /// - /// Seal the object store. No further modifications will be allowed. - /// - /// A used to cancel the API call. - /// Update operation failed + /// public async ValueTask SealAsync(CancellationToken cancellationToken = default) { - var info = await _context.JSRequestResponseAsync( - subject: $"{_context.Opts.Prefix}.STREAM.INFO.{_stream.Info.Config.Name}", + var info = await JetStreamContext.JSRequestResponseAsync( + subject: $"{JetStreamContext.Opts.Prefix}.STREAM.INFO.{_stream.Info.Config.Name}", request: null, cancellationToken).ConfigureAwait(false); var config = info.Config; config.Sealed = true; - var response = await _context.JSRequestResponseAsync( - subject: $"{_context.Opts.Prefix}.STREAM.UPDATE.{_stream.Info.Config.Name}", + var response = await JetStreamContext.JSRequestResponseAsync( + subject: $"{JetStreamContext.Opts.Prefix}.STREAM.UPDATE.{_stream.Info.Config.Name}", request: config, cancellationToken); @@ -514,14 +447,7 @@ public async ValueTask SealAsync(CancellationToken cancellationToken = default) } } - /// - /// Get object metadata by key. - /// - /// Object key. - /// Also retrieve deleted objects. - /// A used to cancel the API call. - /// Object metadata. - /// Object was not found. + /// public async ValueTask GetInfoAsync(string key, bool showDeleted = false, CancellationToken cancellationToken = default) { ValidateObjectName(key); @@ -562,12 +488,7 @@ public async ValueTask GetInfoAsync(string key, bool showDeleted } } - /// - /// List all the objects in this store. - /// - /// List options - /// A used to cancel the API call. - /// An async enumerable object metadata to be used in an await foreach + /// public IAsyncEnumerable ListAsync(NatsObjListOpts? opts = default, CancellationToken cancellationToken = default) { opts ??= new NatsObjListOpts(); @@ -581,11 +502,7 @@ public IAsyncEnumerable ListAsync(NatsObjListOpts? opts = defaul return WatchAsync(watchOpts, cancellationToken); } - /// - /// Retrieves run-time status about the backing store of the bucket. - /// - /// A used to cancel the API call. - /// Object store status + /// public async ValueTask GetStatusAsync(CancellationToken cancellationToken = default) { await _stream.RefreshAsync(cancellationToken); @@ -593,12 +510,7 @@ public async ValueTask GetStatusAsync(CancellationToken cancellat return new NatsObjStatus(Bucket, isCompressed, _stream.Info); } - /// - /// Watch for changes in the underlying store and receive meta information updates. - /// - /// Watch options - /// A used to cancel the API call. - /// An async enumerable object metadata to be used in an await foreach + /// public async IAsyncEnumerable WatchAsync(NatsObjWatchOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) { opts ??= new NatsObjWatchOpts(); @@ -616,7 +528,7 @@ public async IAsyncEnumerable WatchAsync(NatsObjWatchOpts? opts } await using var pushConsumer = new NatsJSOrderedPushConsumer>( - context: _context, + context: JetStreamContext, stream: $"OBJ_{Bucket}", filter: $"$O.{Bucket}.M.>", serializer: NatsDefaultSerializer>.Default, @@ -662,12 +574,7 @@ public async IAsyncEnumerable WatchAsync(NatsObjWatchOpts? opts } } - /// - /// Delete an object by key. - /// - /// Object key. - /// A used to cancel the API call. - /// Object metadata was invalid or chunks can't be purged. + /// public async ValueTask DeleteAsync(string key, CancellationToken cancellationToken = default) { ValidateObjectName(key); @@ -696,7 +603,7 @@ public async ValueTask DeleteAsync(string key, CancellationToken cancellationTok private async ValueTask PublishMeta(ObjectMetadata meta, CancellationToken cancellationToken) { - var ack = await _context.PublishAsync(GetMetaSubject(meta.Name), meta, serializer: NatsObjJsonSerializer.Default, headers: NatsRollupHeaders, cancellationToken: cancellationToken); + var ack = await JetStreamContext.PublishAsync(GetMetaSubject(meta.Name), meta, serializer: NatsObjJsonSerializer.Default, headers: NatsRollupHeaders, cancellationToken: cancellationToken); ack.EnsureSuccess(); } diff --git a/src/NATS.Client.Services/INatsSvcContext.cs b/src/NATS.Client.Services/INatsSvcContext.cs index b06513dcf..1e8a683fa 100644 --- a/src/NATS.Client.Services/INatsSvcContext.cs +++ b/src/NATS.Client.Services/INatsSvcContext.cs @@ -1,3 +1,5 @@ +using NATS.Client.Core; + namespace NATS.Client.Services; /// @@ -5,6 +7,11 @@ namespace NATS.Client.Services; /// public interface INatsSvcContext { + /// + /// Gets the associated NATS connection. + /// + INatsConnection Connection { get; } + /// /// Adds a new service. /// diff --git a/src/NATS.Client.Services/Internal/SvcListener.cs b/src/NATS.Client.Services/Internal/SvcListener.cs index 6b3edb220..72ce5fd16 100644 --- a/src/NATS.Client.Services/Internal/SvcListener.cs +++ b/src/NATS.Client.Services/Internal/SvcListener.cs @@ -7,7 +7,7 @@ namespace NATS.Client.Services.Internal; internal class SvcListener : IAsyncDisposable { private readonly ILogger _logger; - private readonly NatsConnection _nats; + private readonly INatsConnection _nats; private readonly Channel _channel; private readonly SvcMsgType _type; private readonly string _subject; @@ -16,7 +16,7 @@ internal class SvcListener : IAsyncDisposable private INatsSub>? _sub; private Task? _readLoop; - public SvcListener(ILogger logger, NatsConnection nats, Channel channel, SvcMsgType type, string subject, string? queueGroup, CancellationToken cancellationToken) + public SvcListener(ILogger logger, INatsConnection nats, Channel channel, SvcMsgType type, string subject, string? queueGroup, CancellationToken cancellationToken) { _logger = logger; _nats = nats; diff --git a/src/NATS.Client.Services/NatsClientExtensions.cs b/src/NATS.Client.Services/NatsClientExtensions.cs new file mode 100644 index 000000000..ba0015af1 --- /dev/null +++ b/src/NATS.Client.Services/NatsClientExtensions.cs @@ -0,0 +1,24 @@ +using NATS.Client.Core; +using NATS.Client.Services; + +// ReSharper disable once CheckNamespace +namespace NATS.Net; + +public static class NatsClientExtensions +{ + /// + /// Creates a NATS Services context for the given NATS client. + /// + /// The NATS client for which to create the services context. + /// An instance of used for interacting with the NATS Services. + public static INatsSvcContext CreateServicesContext(this INatsClient client) + => CreateServicesContext(client.Connection); + + /// + /// Creates a NATS Services context for the given NATS connection. + /// + /// The NATS connection for which to create the services context. + /// An instance of used for interacting with the NATS Services. + public static INatsSvcContext CreateServicesContext(this INatsConnection connection) + => new NatsSvcContext(connection); +} diff --git a/src/NATS.Client.Services/NatsSvcContext.cs b/src/NATS.Client.Services/NatsSvcContext.cs index cfcfb7eb0..9a450c31e 100644 --- a/src/NATS.Client.Services/NatsSvcContext.cs +++ b/src/NATS.Client.Services/NatsSvcContext.cs @@ -7,13 +7,14 @@ namespace NATS.Client.Services; /// public class NatsSvcContext : INatsSvcContext { - private readonly NatsConnection _nats; - /// /// Creates a new instance of . /// /// NATS connection. - public NatsSvcContext(NatsConnection nats) => _nats = nats; + public NatsSvcContext(INatsConnection nats) => Connection = nats; + + /// + public INatsConnection Connection { get; } /// /// Adds a new service. @@ -34,7 +35,7 @@ public ValueTask AddServiceAsync(string name, string version, st /// NATS Service instance. public async ValueTask AddServiceAsync(NatsSvcConfig config, CancellationToken cancellationToken = default) { - var service = new NatsSvcServer(_nats, config, cancellationToken); + var service = new NatsSvcServer(Connection, config, cancellationToken); await service.StartAsync().ConfigureAwait(false); return service; } diff --git a/src/NATS.Client.Services/NatsSvcEndPoint.cs b/src/NATS.Client.Services/NatsSvcEndPoint.cs index 8e6947c2f..6db64067a 100644 --- a/src/NATS.Client.Services/NatsSvcEndPoint.cs +++ b/src/NATS.Client.Services/NatsSvcEndPoint.cs @@ -69,7 +69,7 @@ public interface INatsSvcEndpoint : IAsyncDisposable /// public abstract class NatsSvcEndpointBase : NatsSubBase, INatsSvcEndpoint { - protected NatsSvcEndpointBase(NatsConnection connection, string subject, string? queueGroup, NatsSubOpts? opts) + protected NatsSvcEndpointBase(INatsConnection connection, string subject, string? queueGroup, NatsSubOpts? opts) : base(connection, connection.SubscriptionManager, subject, queueGroup, opts) { } @@ -108,7 +108,7 @@ public class NatsSvcEndpoint : NatsSvcEndpointBase { private readonly ILogger _logger; private readonly Func, ValueTask> _handler; - private readonly NatsConnection _nats; + private readonly INatsConnection _nats; private readonly CancellationToken _cancellationToken; private readonly Channel> _channel; private readonly INatsDeserialize _serializer; @@ -131,7 +131,7 @@ public class NatsSvcEndpoint : NatsSvcEndpointBase /// Serializer to use for the message type. /// Subscription options. /// A used to cancel the API call. - public NatsSvcEndpoint(NatsConnection nats, string? queueGroup, string name, Func, ValueTask> handler, string subject, IDictionary? metadata, INatsDeserialize serializer, NatsSubOpts? opts, CancellationToken cancellationToken) + public NatsSvcEndpoint(INatsConnection nats, string? queueGroup, string name, Func, ValueTask> handler, string subject, IDictionary? metadata, INatsDeserialize serializer, NatsSubOpts? opts, CancellationToken cancellationToken) : base(nats, subject, queueGroup, opts) { _logger = nats.Opts.LoggerFactory.CreateLogger>(); diff --git a/src/NATS.Client.Services/NatsSvcServer.cs b/src/NATS.Client.Services/NatsSvcServer.cs index 08f4379b4..1b30c21c8 100644 --- a/src/NATS.Client.Services/NatsSvcServer.cs +++ b/src/NATS.Client.Services/NatsSvcServer.cs @@ -16,7 +16,7 @@ public class NatsSvcServer : INatsSvcServer { private readonly ILogger _logger; private readonly string _id; - private readonly NatsConnection _nats; + private readonly INatsConnection _nats; private readonly NatsSvcConfig _config; private readonly Channel _channel; private readonly Task _taskMsgLoop; @@ -31,7 +31,7 @@ public class NatsSvcServer : INatsSvcServer /// NATS connection. /// Service configuration. /// A used to cancel the service creation requests. - public NatsSvcServer(NatsConnection nats, NatsSvcConfig config, CancellationToken cancellationToken) + public NatsSvcServer(INatsConnection nats, NatsSvcConfig config, CancellationToken cancellationToken) { _logger = nats.Opts.LoggerFactory.CreateLogger(); _id = Nuid.NewNuid(); diff --git a/src/NATS.Client.Simplified/NatsClientDefaultSerializer.cs b/src/NATS.Client.Simplified/NatsClientDefaultSerializer.cs index 811c9571c..7dee8b0cb 100644 --- a/src/NATS.Client.Simplified/NatsClientDefaultSerializer.cs +++ b/src/NATS.Client.Simplified/NatsClientDefaultSerializer.cs @@ -14,13 +14,9 @@ public static class NatsClientDefaultSerializer /// public static readonly INatsSerializer Default; - static NatsClientDefaultSerializer() - { - Default = new NatsSerializerBuilder() - .Add(new NatsRawSerializer()) - .Add(new NatsUtf8PrimitivesSerializer()) - .Add(new NatsJsonSerializer()) - .Build(); - Console.WriteLine($"Default serializer for {typeof(T).Name} is {Default.GetType().Name}"); - } + static NatsClientDefaultSerializer() => Default = new NatsSerializerBuilder() + .Add(new NatsRawSerializer()) + .Add(new NatsUtf8PrimitivesSerializer()) + .Add(new NatsJsonSerializer()) + .Build(); } diff --git a/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs b/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs index 9f63aeada..f3700d222 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs @@ -93,5 +93,12 @@ public class MockJsContext : INatsJSContext public IAsyncEnumerable ListStreamNamesAsync(string? subject = default, CancellationToken cancellationToken = default) => throw new NotImplementedException(); public ValueTask PublishConcurrentAsync(string subject, T? data, INatsSerialize? serializer = default, NatsJSPubOpts? opts = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default) => throw new NotImplementedException(); + + public string NewBaseInbox() => throw new NotImplementedException(); + + public ValueTask JSRequestResponseAsync(string subject, TRequest? request, CancellationToken cancellationToken = default) + where TRequest : class + where TResponse : class + => throw new NotImplementedException(); } } diff --git a/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs b/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs index 1ac2c470b..adc5eef32 100644 --- a/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs +++ b/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs @@ -93,5 +93,12 @@ public class MockJsContext : INatsJSContext public IAsyncEnumerable ListStreamNamesAsync(string? subject = default, CancellationToken cancellationToken = default) => throw new NotImplementedException(); public ValueTask PublishConcurrentAsync(string subject, T? data, INatsSerialize? serializer = default, NatsJSPubOpts? opts = default, NatsHeaders? headers = default, CancellationToken cancellationToken = default) => throw new NotImplementedException(); + + public string NewBaseInbox() => throw new NotImplementedException(); + + public ValueTask JSRequestResponseAsync(string subject, TRequest? request, CancellationToken cancellationToken = default) + where TRequest : class + where TResponse : class + => throw new NotImplementedException(); } }