Skip to content

Commit

Permalink
Update README
Browse files Browse the repository at this point in the history
  • Loading branch information
kmcclellan committed Mar 27, 2023
1 parent b41a87a commit ddd7ed4
Showing 1 changed file with 50 additions and 62 deletions.
112 changes: 50 additions & 62 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
An extension of [Confluent's Kafka client](https://github.com/confluentinc/confluent-kafka-dotnet) for use with `Microsoft.Extensions.DependencyInjection` (and friends).

### Features
* Configure Kafka producers/consumers using `Microsoft.Extensions.DependencyInjection.IServiceCollection`.
* Default logging of asynchronous Kafka events through `Microsoft.Extensions.Logging.ILogger`.
* Inject/resolve Kafka clients using the service container.
* Configure Kafka clients using the options pattern.
* Load client config properties using `Microsoft.Extensions.Configuration`.
* Automatically log client events using `Microsoft.Extensions.Logging`.

## Installation

Expand All @@ -13,83 +15,69 @@ Add the NuGet package to your project:

## Usage

Add a global Kafka client:
### Resolving clients

Kafka DI works out-of-the-box after registering services with an `IServiceCollection`.

```c#
services.AddKafkaClient(new Dictionary<string, string>
{
{ "bootstrap.servers", "localhost:9092" },
{ "enable.idempotence", "true" },
{ "group.id", "group1" }
});
services.AddKafkaClient();
services.AddSingleton<MyService>();
```

Alternatively, add typed clients with distinct configurations:
Inject Kafka clients via constructor.

```c#
services.AddKafkaClient<MyService>(new ProducerConfig
{
BootstrapServers = "localhost:9092",
EnableIdempotence = true
});

services.AddKafkaClient<MyOtherService>(new ConsumerConfig
public MyService(IProducer<string, byte[]> producer, IConsumer<Ignore, MyType> consumer, IAdminClient adminClient)
{
BootstrapServers = "somewhere.else:9092",
GroupId = "group1"
});
// Clients are singletons managed by the container.
Producer = producer;
Consumer = consumer;
AdminClient = adminClient;
}
```

Optionally, configure message serialization:
### Configuring clients

```c#
// Use open generics to apply to all keys and values.
services.AddSingleton(typeof(IAsyncDeserializer<>), typeof(AvroDeserializer<>));
Client config properties are bound to the `Kafka` section of .NET configuration providers, such as `appsettings.json`.

// Use closed generics to select type-specific serializers.
services.AddSingleton<IAsyncSerializer<MyType>, JsonSerializer<MyType>>();

// Synchronous serializers take precedence, if present.
services.AddSingleton(sp => sp.GetRequiredService<IAsyncSerializer<MyType>>().AsSyncOverAsync());

// Configure schema registry (required by some serializers).
services.AddSingleton<ISchemaRegistryClient>(sp =>
new CachedSchemaRegistryClient(new SchemaRegistryConfig
{
Url = "localhost:8081"
}));
```json
{
"Kafka": {
"Producer": {
"bootstrap.servers": "localhost:9092",
"transactional.id": "example"
},
"Consumer": {
"bootstrap.servers": "localhost:9092",
"group.id": "example"
},
"Admin": {
"bootstrap.servers": "localhost:9092"
}
}
}
```

Optionally, configure custom handlers for Kafka events:
You can also configure `KafkaClientOptions` directly, including serialization and event handlers.

```c#
services.AddTransient<IErrorHandler, MyHandler>()
.AddTransient<IStatisticsHandler, MyHandler>()
.AddTransient<ILogHandler, MyHandler>()
.AddTransient<IPartitionsAssignedHandler, MyHandler>()
.AddTransient<IPartitionsRevokedHandler, MyHandler>()
.AddTransient<IOffsetsCommittedHandler, MyHandler>();
```

Inject producers/consumers via constructor:
var builder = services.AddKafkaClient()

```c#
public MyService(IProducer<Null, string> producer)
{
// Producer is a singleton managed by the container.
this.producer = producer;
}
```
builder.Configure(
options =>
{
// Config properties apply to all clients with a matching type (consumers, in this case).
options.Configure(new ConsumerConfig { StatisticsIntervalMs = 5000 });

Alternatively, inject `IKafkaFactory` to override configuration and control lifespan:
// Optionally, configure handlers for asynchronous client events.
options.OnStatistics((x, y) => Console.WriteLine(y));
});

```c#
using var consumer = factory.CreateConsumer<MyType, MyOtherType>(new ConsumerConfig
{
GroupId = "group2"
});
// Optionally, configure serialization for specific types.
builder.Configure<JsonDeserializer<MyType>>((x, y) => x.Deserialize(y));
services.AddSingleton(typeof(JsonDeserializer<>));

// ...
// Remember to close manually created consumers.
consumer.Close();
// Configure schema registry (required by some serializers).
services.AddSingleton<ISchemaRegistryClient>(
x => new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = "localhost:8081" }));
```

0 comments on commit ddd7ed4

Please sign in to comment.