From e1b8b2af6925fed9fe9bb54450606dc9bb41bb96 Mon Sep 17 00:00:00 2001 From: Kyle McClellan Date: Thu, 16 Feb 2023 23:21:35 -0500 Subject: [PATCH] Support configuring/resolving `IAdminClient` (#10) * Example: Resolve admin client --- .../Clients/DIAdminClientBuilder.cs | 40 +++++ .../Clients/ScopedAdminClient.cs | 164 ++++++++++++++++++ .../Clients/ServiceAdminClient.cs | 12 ++ .../ServiceCollectionExtensions.cs | 4 + Example/Program.cs | 14 +- 5 files changed, 233 insertions(+), 1 deletion(-) create mode 100644 Confluent.Kafka.DependencyInjection/Clients/DIAdminClientBuilder.cs create mode 100644 Confluent.Kafka.DependencyInjection/Clients/ScopedAdminClient.cs create mode 100644 Confluent.Kafka.DependencyInjection/Clients/ServiceAdminClient.cs diff --git a/Confluent.Kafka.DependencyInjection/Clients/DIAdminClientBuilder.cs b/Confluent.Kafka.DependencyInjection/Clients/DIAdminClientBuilder.cs new file mode 100644 index 0000000..03944ca --- /dev/null +++ b/Confluent.Kafka.DependencyInjection/Clients/DIAdminClientBuilder.cs @@ -0,0 +1,40 @@ +namespace Confluent.Kafka.DependencyInjection.Clients; + +using Confluent.Kafka.DependencyInjection.Handlers; + +using System; +using System.Collections.Generic; +using System.Linq; + +[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1812", Justification = "Instantiated by container")] +sealed class DIAdminClientBuilder : AdminClientBuilder +{ + readonly IEnumerable errorHandlers; + readonly IEnumerable statisticsHandlers; + readonly IEnumerable logHandlers; + + public DIAdminClientBuilder( + AdminClientConfig config, + IEnumerable errorHandlers, + IEnumerable statisticsHandlers, + IEnumerable logHandlers) + : base(config) + { + this.errorHandlers = errorHandlers; + this.statisticsHandlers = statisticsHandlers; + this.logHandlers = logHandlers; + } + + public override IAdminClient Build() + { + ErrorHandler ??= errorHandlers.Aggregate(default(Action), (x, y) => x + y.OnError); + + StatisticsHandler ??= statisticsHandlers.Aggregate( + default(Action), + (x, y) => x + y.OnStatistics); + + LogHandler ??= logHandlers.Aggregate(default(Action), (x, y) => x + y.OnLog); + + return base.Build(); + } +} diff --git a/Confluent.Kafka.DependencyInjection/Clients/ScopedAdminClient.cs b/Confluent.Kafka.DependencyInjection/Clients/ScopedAdminClient.cs new file mode 100644 index 0000000..d91a5b3 --- /dev/null +++ b/Confluent.Kafka.DependencyInjection/Clients/ScopedAdminClient.cs @@ -0,0 +1,164 @@ +namespace Confluent.Kafka.DependencyInjection.Clients; + +using Confluent.Kafka.Admin; + +using Microsoft.Extensions.DependencyInjection; + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +class ScopedAdminClient : IAdminClient +{ + readonly IAdminClient client; + readonly IDisposable scope; + + public ScopedAdminClient(IServiceScopeFactory scopes, IEnumerable>? config) + { + IServiceScope scope; + this.scope = scope = scopes.CreateScope(); + + if (config != null) + { + var merged = scope.ServiceProvider.GetRequiredService(); + + foreach (var kvp in config) + { + merged.Set(kvp.Key, kvp.Value); + } + } + + this.client = scope.ServiceProvider.GetRequiredService().Build(); + } + + public Handle Handle => client.Handle; + + public string Name => client.Name; + + public int AddBrokers(string brokers) + { + return client.AddBrokers(brokers); + } + + public Metadata GetMetadata(TimeSpan timeout) + { + return client.GetMetadata(timeout); + } + + public Metadata GetMetadata(string topic, TimeSpan timeout) + { + return client.GetMetadata(topic, timeout); + } + + public Task CreateTopicsAsync(IEnumerable topics, CreateTopicsOptions? options = null) + { + return client.CreateTopicsAsync(topics, options); + } + + public Task DeleteTopicsAsync(IEnumerable topics, DeleteTopicsOptions? options = null) + { + return client.DeleteTopicsAsync(topics, options); + } + + public Task CreatePartitionsAsync( + IEnumerable partitionsSpecifications, + CreatePartitionsOptions? options = null) + { + return client.CreatePartitionsAsync(partitionsSpecifications, options); + } + + public Task> DeleteRecordsAsync( + IEnumerable topicPartitionOffsets, + DeleteRecordsOptions? options = null) + { + return client.DeleteRecordsAsync(topicPartitionOffsets, options); + } + + public GroupInfo ListGroup(string group, TimeSpan timeout) + { + return client.ListGroup(group, timeout); + } + + public List ListGroups(TimeSpan timeout) + { + return client.ListGroups(timeout); + } + + public Task ListConsumerGroupsAsync(ListConsumerGroupsOptions? options = null) + { + return client.ListConsumerGroupsAsync(options); + } + + public Task DescribeConsumerGroupsAsync( + IEnumerable groups, + DescribeConsumerGroupsOptions? options = null) + { + return client.DescribeConsumerGroupsAsync(groups, options); + } + + public Task DeleteGroupsAsync(IList groups, DeleteGroupsOptions? options = null) + { + return client.DeleteGroupsAsync(groups, options); + } + + public Task> ListConsumerGroupOffsetsAsync( + IEnumerable groupPartitions, + ListConsumerGroupOffsetsOptions? options = null) + { + return client.ListConsumerGroupOffsetsAsync(groupPartitions, options); + } + + public Task> AlterConsumerGroupOffsetsAsync( + IEnumerable groupPartitions, + AlterConsumerGroupOffsetsOptions? options = null) + { + return client.AlterConsumerGroupOffsetsAsync(groupPartitions, options); + } + + public Task DeleteConsumerGroupOffsetsAsync( + string group, + IEnumerable partitions, + DeleteConsumerGroupOffsetsOptions? options = null) + { + return client.DeleteConsumerGroupOffsetsAsync(group, partitions, options); + } + + public Task> DescribeConfigsAsync( + IEnumerable resources, + DescribeConfigsOptions? options = null) + { + return client.DescribeConfigsAsync(resources, options); + } + + public Task AlterConfigsAsync( + Dictionary> configs, + AlterConfigsOptions? options = null) + { + return client.AlterConfigsAsync(configs, options); + } + + public Task DescribeAclsAsync( + AclBindingFilter aclBindingFilter, + DescribeAclsOptions? options = null) + { + return client.DescribeAclsAsync(aclBindingFilter, options); + } + + public Task CreateAclsAsync(IEnumerable aclBindings, CreateAclsOptions? options = null) + { + return client.CreateAclsAsync(aclBindings, options); + } + + public Task> DeleteAclsAsync( + IEnumerable aclBindingFilters, + DeleteAclsOptions? options = null) + { + return client.DeleteAclsAsync(aclBindingFilters, options); + } + + public void Dispose() + { + client.Dispose(); + scope.Dispose(); + } +} diff --git a/Confluent.Kafka.DependencyInjection/Clients/ServiceAdminClient.cs b/Confluent.Kafka.DependencyInjection/Clients/ServiceAdminClient.cs new file mode 100644 index 0000000..bc61462 --- /dev/null +++ b/Confluent.Kafka.DependencyInjection/Clients/ServiceAdminClient.cs @@ -0,0 +1,12 @@ +namespace Confluent.Kafka.DependencyInjection.Clients; + +using Microsoft.Extensions.DependencyInjection; + +[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1812", Justification = "Instantiated by container")] +sealed class ServiceAdminClient : ScopedAdminClient +{ + public ServiceAdminClient(IServiceScopeFactory scopes, AdminClientConfig config) + : base(scopes, config) + { + } +} diff --git a/Confluent.Kafka.DependencyInjection/ServiceCollectionExtensions.cs b/Confluent.Kafka.DependencyInjection/ServiceCollectionExtensions.cs index 89df7fa..e088e34 100644 --- a/Confluent.Kafka.DependencyInjection/ServiceCollectionExtensions.cs +++ b/Confluent.Kafka.DependencyInjection/ServiceCollectionExtensions.cs @@ -39,6 +39,7 @@ public static IServiceCollection AddKafkaClient( AddConfig(services); AddConfig(services); + AddConfig(services); services.TryAddSingleton(); services.TryAddSingleton(); @@ -48,10 +49,12 @@ public static IServiceCollection AddKafkaClient( services.TryAddTransient(typeof(ProducerBuilder<,>), typeof(DIProducerBuilder<,>)); services.TryAddTransient(typeof(ConsumerBuilder<,>), typeof(DIConsumerBuilder<,>)); + services.TryAddTransient(); // These must be scoped to consume scoped config. services.TryAddScoped(typeof(IProducer<,>), typeof(ServiceProducer<,>)); services.TryAddScoped(typeof(IConsumer<,>), typeof(ServiceConsumer<,>)); + services.TryAddScoped(); services.TryAddSingleton(); @@ -162,6 +165,7 @@ public KafkaScope(IServiceScope scope, IEnumerable> Configure(config); Configure(config); + Configure(config); } void Configure(IEnumerable> config) diff --git a/Example/Program.cs b/Example/Program.cs index 5067bd9..b25868e 100644 --- a/Example/Program.cs +++ b/Example/Program.cs @@ -23,6 +23,7 @@ var producer = provider.GetRequiredService>(); var consumer = provider.GetRequiredService>(); +var adminClient = provider.GetRequiredService(); using var producer2 = provider.GetRequiredService>().Build(); using var consumer2 = provider.GetRequiredService>().Build(); @@ -36,7 +37,18 @@ var producer4 = provider.GetRequiredService().Producer; var consumer4 = provider.GetRequiredService().Consumer; -var clients = new IClient[] { producer, producer2, producer3, producer4, consumer, consumer2, consumer3, consumer4 }; +var clients = new IClient[] +{ + producer, + producer2, + producer3, + producer4, + consumer, + consumer2, + consumer3, + consumer4, + adminClient, +}; foreach (var client in clients) {