Skip to content

Commit

Permalink
Support configuring/resolving IAdminClient (#10)
Browse files Browse the repository at this point in the history
* Example: Resolve admin client
  • Loading branch information
kmcclellan authored Feb 17, 2023
1 parent 723c1ba commit e1b8b2a
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
namespace Confluent.Kafka.DependencyInjection.Clients;

using Confluent.Kafka.DependencyInjection.Handlers;

using System;
using System.Collections.Generic;
using System.Linq;

[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1812", Justification = "Instantiated by container")]
sealed class DIAdminClientBuilder : AdminClientBuilder
{
readonly IEnumerable<IErrorHandler> errorHandlers;
readonly IEnumerable<IStatisticsHandler> statisticsHandlers;
readonly IEnumerable<ILogHandler> logHandlers;

public DIAdminClientBuilder(
AdminClientConfig config,
IEnumerable<IErrorHandler> errorHandlers,
IEnumerable<IStatisticsHandler> statisticsHandlers,
IEnumerable<ILogHandler> logHandlers)
: base(config)
{
this.errorHandlers = errorHandlers;
this.statisticsHandlers = statisticsHandlers;
this.logHandlers = logHandlers;
}

public override IAdminClient Build()
{
ErrorHandler ??= errorHandlers.Aggregate(default(Action<IClient, Error>), (x, y) => x + y.OnError);

StatisticsHandler ??= statisticsHandlers.Aggregate(
default(Action<IClient, string>),
(x, y) => x + y.OnStatistics);

LogHandler ??= logHandlers.Aggregate(default(Action<IClient, LogMessage>), (x, y) => x + y.OnLog);

return base.Build();
}
}
164 changes: 164 additions & 0 deletions Confluent.Kafka.DependencyInjection/Clients/ScopedAdminClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
namespace Confluent.Kafka.DependencyInjection.Clients;

using Confluent.Kafka.Admin;

using Microsoft.Extensions.DependencyInjection;

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

class ScopedAdminClient : IAdminClient
{
readonly IAdminClient client;
readonly IDisposable scope;

public ScopedAdminClient(IServiceScopeFactory scopes, IEnumerable<KeyValuePair<string, string>>? config)
{
IServiceScope scope;
this.scope = scope = scopes.CreateScope();

if (config != null)
{
var merged = scope.ServiceProvider.GetRequiredService<AdminClientConfig>();

foreach (var kvp in config)
{
merged.Set(kvp.Key, kvp.Value);
}
}

this.client = scope.ServiceProvider.GetRequiredService<AdminClientBuilder>().Build();
}

public Handle Handle => client.Handle;

public string Name => client.Name;

public int AddBrokers(string brokers)
{
return client.AddBrokers(brokers);
}

public Metadata GetMetadata(TimeSpan timeout)
{
return client.GetMetadata(timeout);
}

public Metadata GetMetadata(string topic, TimeSpan timeout)
{
return client.GetMetadata(topic, timeout);
}

public Task CreateTopicsAsync(IEnumerable<TopicSpecification> topics, CreateTopicsOptions? options = null)
{
return client.CreateTopicsAsync(topics, options);
}

public Task DeleteTopicsAsync(IEnumerable<string> topics, DeleteTopicsOptions? options = null)
{
return client.DeleteTopicsAsync(topics, options);
}

public Task CreatePartitionsAsync(
IEnumerable<PartitionsSpecification> partitionsSpecifications,
CreatePartitionsOptions? options = null)
{
return client.CreatePartitionsAsync(partitionsSpecifications, options);
}

public Task<List<DeleteRecordsResult>> DeleteRecordsAsync(
IEnumerable<TopicPartitionOffset> topicPartitionOffsets,
DeleteRecordsOptions? options = null)
{
return client.DeleteRecordsAsync(topicPartitionOffsets, options);
}

public GroupInfo ListGroup(string group, TimeSpan timeout)
{
return client.ListGroup(group, timeout);
}

public List<GroupInfo> ListGroups(TimeSpan timeout)
{
return client.ListGroups(timeout);
}

public Task<ListConsumerGroupsResult> ListConsumerGroupsAsync(ListConsumerGroupsOptions? options = null)
{
return client.ListConsumerGroupsAsync(options);
}

public Task<DescribeConsumerGroupsResult> DescribeConsumerGroupsAsync(
IEnumerable<string> groups,
DescribeConsumerGroupsOptions? options = null)
{
return client.DescribeConsumerGroupsAsync(groups, options);
}

public Task DeleteGroupsAsync(IList<string> groups, DeleteGroupsOptions? options = null)
{
return client.DeleteGroupsAsync(groups, options);
}

public Task<List<ListConsumerGroupOffsetsResult>> ListConsumerGroupOffsetsAsync(
IEnumerable<ConsumerGroupTopicPartitions> groupPartitions,
ListConsumerGroupOffsetsOptions? options = null)
{
return client.ListConsumerGroupOffsetsAsync(groupPartitions, options);
}

public Task<List<AlterConsumerGroupOffsetsResult>> AlterConsumerGroupOffsetsAsync(
IEnumerable<ConsumerGroupTopicPartitionOffsets> groupPartitions,
AlterConsumerGroupOffsetsOptions? options = null)
{
return client.AlterConsumerGroupOffsetsAsync(groupPartitions, options);
}

public Task<DeleteConsumerGroupOffsetsResult> DeleteConsumerGroupOffsetsAsync(
string group,
IEnumerable<TopicPartition> partitions,
DeleteConsumerGroupOffsetsOptions? options = null)
{
return client.DeleteConsumerGroupOffsetsAsync(group, partitions, options);
}

public Task<List<DescribeConfigsResult>> DescribeConfigsAsync(
IEnumerable<ConfigResource> resources,
DescribeConfigsOptions? options = null)
{
return client.DescribeConfigsAsync(resources, options);
}

public Task AlterConfigsAsync(
Dictionary<ConfigResource, List<ConfigEntry>> configs,
AlterConfigsOptions? options = null)
{
return client.AlterConfigsAsync(configs, options);
}

public Task<DescribeAclsResult> DescribeAclsAsync(
AclBindingFilter aclBindingFilter,
DescribeAclsOptions? options = null)
{
return client.DescribeAclsAsync(aclBindingFilter, options);
}

public Task CreateAclsAsync(IEnumerable<AclBinding> aclBindings, CreateAclsOptions? options = null)
{
return client.CreateAclsAsync(aclBindings, options);
}

public Task<List<DeleteAclsResult>> DeleteAclsAsync(
IEnumerable<AclBindingFilter> aclBindingFilters,
DeleteAclsOptions? options = null)
{
return client.DeleteAclsAsync(aclBindingFilters, options);
}

public void Dispose()
{
client.Dispose();
scope.Dispose();
}
}
12 changes: 12 additions & 0 deletions Confluent.Kafka.DependencyInjection/Clients/ServiceAdminClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace Confluent.Kafka.DependencyInjection.Clients;

using Microsoft.Extensions.DependencyInjection;

[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1812", Justification = "Instantiated by container")]
sealed class ServiceAdminClient : ScopedAdminClient
{
public ServiceAdminClient(IServiceScopeFactory scopes, AdminClientConfig config)
: base(scopes, config)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public static IServiceCollection AddKafkaClient(

AddConfig<ProducerConfig>(services);
AddConfig<ConsumerConfig>(services);
AddConfig<AdminClientConfig>(services);

services.TryAddSingleton<IErrorHandler, GlobalHandler>();
services.TryAddSingleton<ILogHandler, GlobalHandler>();
Expand All @@ -48,10 +49,12 @@ public static IServiceCollection AddKafkaClient(

services.TryAddTransient(typeof(ProducerBuilder<,>), typeof(DIProducerBuilder<,>));
services.TryAddTransient(typeof(ConsumerBuilder<,>), typeof(DIConsumerBuilder<,>));
services.TryAddTransient<AdminClientBuilder, DIAdminClientBuilder>();

// These must be scoped to consume scoped config.
services.TryAddScoped(typeof(IProducer<,>), typeof(ServiceProducer<,>));
services.TryAddScoped(typeof(IConsumer<,>), typeof(ServiceConsumer<,>));
services.TryAddScoped<IAdminClient, ServiceAdminClient>();

services.TryAddSingleton<IKafkaFactory, KafkaFactory>();

Expand Down Expand Up @@ -162,6 +165,7 @@ public KafkaScope(IServiceScope scope, IEnumerable<KeyValuePair<string, string>>

Configure<ProducerConfig>(config);
Configure<ConsumerConfig>(config);
Configure<AdminClientConfig>(config);
}

void Configure<T>(IEnumerable<KeyValuePair<string, string>> config)
Expand Down
14 changes: 13 additions & 1 deletion Example/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

var producer = provider.GetRequiredService<IProducer<Null, byte[]>>();
var consumer = provider.GetRequiredService<IConsumer<Null, byte[]>>();
var adminClient = provider.GetRequiredService<IAdminClient>();

using var producer2 = provider.GetRequiredService<ProducerBuilder<Null, byte[]>>().Build();
using var consumer2 = provider.GetRequiredService<ConsumerBuilder<Null, byte[]>>().Build();
Expand All @@ -36,7 +37,18 @@
var producer4 = provider.GetRequiredService<TestClient1>().Producer;
var consumer4 = provider.GetRequiredService<TestClient2>().Consumer;

var clients = new IClient[] { producer, producer2, producer3, producer4, consumer, consumer2, consumer3, consumer4 };
var clients = new IClient[]
{
producer,
producer2,
producer3,
producer4,
consumer,
consumer2,
consumer3,
consumer4,
adminClient,
};

foreach (var client in clients)
{
Expand Down

0 comments on commit e1b8b2a

Please sign in to comment.