Skip to content

Commit

Permalink
add comments for IBufferQueue.CreateConsumer (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
eventhorizon-cli authored Jan 21, 2024
1 parent 0e430cf commit a6910fe
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 19 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ English | [简体中文](./README.zh-CN.md)

Mocha is an application performance monitor tools based on [OpenTelemetry](https://opentelemetry.io), which also provides a scalable platform for observability data analysis and storage.

**Note: Use `git clone --recursive` to clone this repository with submodules.**

## Quick Start
In the beta phase, we provide a Docker Compose file for users to experience our system locally.

Expand Down
2 changes: 2 additions & 0 deletions README.zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ Mocha

Mocha 是一个基于 [OpenTelemetry](https://opentelemetry.io) 的 APM 系统,同时提供可伸缩的可观测性数据分析和存储平台。

**注意:使用 `git clone --recursive` 克隆本仓库以及子模块。**

## 快速开始
现阶段,我们提供了 Docker Compose 文件,方便用户在本地体验我们的系统。

Expand Down
4 changes: 2 additions & 2 deletions src/Mocha.Core/Buffer/BufferConsumerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ namespace Mocha.Core.Buffer;

public class BufferConsumerOptions
{
public string TopicName { get; init; } = default!;
public required string TopicName { get; init; }

public string GroupName { get; init; } = default!;
public required string GroupName { get; init; }

public bool AutoCommit { get; init; }

Expand Down
9 changes: 6 additions & 3 deletions src/Mocha.Core/Buffer/BufferQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,24 @@ internal class BufferQueue(IServiceProvider serviceProvider) : IBufferQueue
public IBufferProducer<T> CreateProducer<T>(string topicName)
{
ArgumentException.ThrowIfNullOrEmpty(topicName, nameof(topicName));
var queue = serviceProvider.GetRequiredKeyedService<IBufferQueue<T>>(topicName);
var queue = serviceProvider.GetKeyedService<IBufferQueue<T>>(topicName) ??
throw new ArgumentException($"The topic '{topicName}' has not been registered.");
return queue.CreateProducer();
}

public IBufferConsumer<T> CreateConsumer<T>(BufferConsumerOptions options)
{
ArgumentException.ThrowIfNullOrEmpty(options.TopicName, nameof(options.TopicName));
var queue = serviceProvider.GetRequiredKeyedService<IBufferQueue<T>>(options.TopicName);
var queue = serviceProvider.GetKeyedService<IBufferQueue<T>>(options.TopicName) ??
throw new ArgumentException($"The topic '{options.TopicName}' has not been registered.");
return queue.CreateConsumer(options);
}

public IEnumerable<IBufferConsumer<T>> CreateConsumers<T>(BufferConsumerOptions options, int consumerNumber)
{
ArgumentException.ThrowIfNullOrEmpty(options.TopicName, nameof(options.TopicName));
var queue = serviceProvider.GetRequiredKeyedService<IBufferQueue<T>>(options.TopicName);
var queue = serviceProvider.GetKeyedService<IBufferQueue<T>>(options.TopicName) ??
throw new ArgumentException($"The topic '{options.TopicName}' has not been registered.");
return queue.CreateConsumers(options, consumerNumber);
}
}
29 changes: 29 additions & 0 deletions src/Mocha.Core/Buffer/IBufferQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,38 @@ namespace Mocha.Core.Buffer;

public interface IBufferQueue
{
/// <summary>
/// Create a producer for the specified topic.
/// </summary>
/// <param name="topicName">The topic name.</param>
/// <typeparam name="T">The type of the item.</typeparam>
/// <returns>The producer.</returns>
IBufferProducer<T> CreateProducer<T>(string topicName);

/// <summary>
/// Create a consumer for the specified topic.
/// This method can only be called once for each consumer group within the same topic.
/// Use the <see cref="CreateConsumers{T}"/> method to create multiple consumers.
/// </summary>
/// <param name="options">The consumer options.</param>
/// <typeparam name="T">The type of the item.</typeparam>
/// <returns>The consumer.</returns>
/// <exception cref="ArgumentNullException">The topic name is.</exception>
/// <exception cref="ArgumentException">The group name is empty.</exception>
/// <exception cref="InvalidOperationException">The consumer group has been created.</exception>
IBufferConsumer<T> CreateConsumer<T>(BufferConsumerOptions options);

/// <summary>
/// Create multiple consumers for the specified topic.
/// This method can only be called once for each consumer group within the same topic.
/// </summary>
/// <param name="options">The consumer options.</param>
/// <param name="consumerNumber">The number of consumers.</param>
/// <typeparam name="T">The type of the item.</typeparam>
/// <returns>The consumers.</returns>
/// <exception cref="ArgumentOutOfRangeException">The number of consumers must be greater than 0 and cannot be greater than the number of partitions.</exception>
/// <exception cref="ArgumentNullException">The topic name is.</exception>
/// <exception cref="ArgumentException">The group name is empty.</exception>
/// <exception cref="InvalidOperationException">The consumer group has been created.</exception>
IEnumerable<IBufferConsumer<T>> CreateConsumers<T>(BufferConsumerOptions options, int consumerNumber);
}
2 changes: 1 addition & 1 deletion src/Mocha.Core/Buffer/Memory/MemoryBufferSegment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public bool TryGet(MemoryBufferPartitionOffset offset, int count, [NotNullWhen(t

if (_writePosition < 0 || readPosition > _writePosition)
{
items = default!;
items = null;
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ public void Setup()
}

_consumers = _memoryBufferQueue!.CreateConsumers(
new BufferConsumerOptions { GroupName = "TestGroup", AutoCommit = true, BatchSize = BatchSize, },
new BufferConsumerOptions
{
GroupName = "TestGroup",
TopicName = "test",
AutoCommit = true,
BatchSize = BatchSize,
},
Environment.ProcessorCount);
}

Expand Down
105 changes: 93 additions & 12 deletions tests/Mocha.Core.Tests/Buffer/Memory/MemoryBufferQueueTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public async Task Produce_And_Consume()
var producer = queue.CreateProducer();
var consumer = queue.CreateConsumer(new BufferConsumerOptions
{
TopicName = "test",
GroupName = "TestGroup",
AutoCommit = false,
BatchSize = 2
Expand Down Expand Up @@ -64,7 +65,13 @@ public async Task Produce_And_Consume_AutoCommit()
var queue = new MemoryBufferQueue<int>("test", 1);
var producer = queue.CreateProducer();
var consumer = queue.CreateConsumer(
new BufferConsumerOptions { GroupName = "TestGroup", AutoCommit = true, BatchSize = 2 });
new BufferConsumerOptions
{
TopicName = "test",
GroupName = "TestGroup",
AutoCommit = true,
BatchSize = 2
});

var expectedValues = new int[10];
for (var i = 0; i < 10; i++)
Expand Down Expand Up @@ -94,7 +101,13 @@ public async Task Produce_And_Consume_With_Multiple_Partitions()
var queue = new MemoryBufferQueue<int>("test", 2);
var producer = queue.CreateProducer();
var consumer = queue.CreateConsumer(
new BufferConsumerOptions { GroupName = "TestGroup", AutoCommit = false, BatchSize = 2 });
new BufferConsumerOptions
{
TopicName = "test",
GroupName = "TestGroup",
AutoCommit = false,
BatchSize = 2
});

var expectedValues = new int[10];
for (var i = 0; i < 10; i++)
Expand Down Expand Up @@ -129,7 +142,14 @@ public async Task Produce_And_Consume_With_Multiple_Consumers()
var queue = new MemoryBufferQueue<int>("test", 2);
var producer = queue.CreateProducer();
var consumers = queue
.CreateConsumers(new BufferConsumerOptions { GroupName = "TestGroup", AutoCommit = false, BatchSize = 6 },
.CreateConsumers(
new BufferConsumerOptions
{
TopicName = "test",
GroupName = "TestGroup",
AutoCommit = false,
BatchSize = 6
},
2).ToList();
var consumer1 = consumers[0];
var consumer2 = consumers[1];
Expand All @@ -152,13 +172,46 @@ public async Task Produce_And_Consume_With_Multiple_Consumers()
}
}

[Fact]
public void Throw_If_Wrong_Consumer_Number()
{
var queue = new MemoryBufferQueue<int>("test", 2);
Assert.Throws<ArgumentOutOfRangeException>(() =>
queue.CreateConsumers(
new BufferConsumerOptions
{
TopicName = "test",
GroupName = "TestGroup",
AutoCommit = false,
BatchSize = 6
},
3).ToList());

Assert.Throws<ArgumentOutOfRangeException>(() =>
queue.CreateConsumers(
new BufferConsumerOptions
{
TopicName = "test",
GroupName = "TestGroup",
AutoCommit = false,
BatchSize = 6
},
0).ToList());
}

[Fact]
public async Task Offset_Will_Not_Change_If_Consumer_Not_Commit()
{
var queue = new MemoryBufferQueue<int>("test", 1);
var producer = queue.CreateProducer();
var consumer = queue.CreateConsumer(
new BufferConsumerOptions { GroupName = "TestGroup", AutoCommit = false, BatchSize = 7 });
new BufferConsumerOptions
{
TopicName = "test",
GroupName = "TestGroup",
AutoCommit = false,
BatchSize = 7
});

for (var i = 0; i < 10; i++)
{
Expand Down Expand Up @@ -195,7 +248,13 @@ public async Task Consumer_Will_Wait_Until_Produce()
{
var queue = new MemoryBufferQueue<int>("test", 1);
var producer = queue.CreateProducer();
var consumer = queue.CreateConsumer(new BufferConsumerOptions { GroupName = "TestGroup", AutoCommit = false });
var consumer =
queue.CreateConsumer(new BufferConsumerOptions
{
TopicName = "test",
GroupName = "TestGroup",
AutoCommit = false
});

var task = Task.Run(async () =>
{
Expand All @@ -221,19 +280,23 @@ public void Equal_Distribution_Load_Balancing_Strategy_For_Consumers()
var assignedPartitionsFieldInfo = typeof(MemoryBufferConsumer<int>)
.GetField("_assignedPartitions", BindingFlags.Instance | BindingFlags.NonPublic)!;
var group1Consumers =
queue.CreateConsumers(new BufferConsumerOptions { GroupName = "TestGroup1", AutoCommit = false }, 3)
queue.CreateConsumers(
new BufferConsumerOptions { TopicName = "test", GroupName = "TestGroup1", AutoCommit = false }, 3)
.ToList();

var group2Consumers = queue
.CreateConsumers(new BufferConsumerOptions { GroupName = "TestGroup2", AutoCommit = false }, 4)
.CreateConsumers(
new BufferConsumerOptions { TopicName = "test", GroupName = "TestGroup2", AutoCommit = false }, 4)
.ToList();

var group3Consumers = queue
.CreateConsumers(new BufferConsumerOptions { GroupName = "TestGroup3", AutoCommit = false }, 5)
.CreateConsumers(
new BufferConsumerOptions { TopicName = "test", GroupName = "TestGroup3", AutoCommit = false }, 5)
.ToList();

var group4Consumers = queue
.CreateConsumers(new BufferConsumerOptions { GroupName = "TestGroup4", AutoCommit = false }, 16)
.CreateConsumers(
new BufferConsumerOptions { TopicName = "test", GroupName = "TestGroup4", AutoCommit = false }, 16)
.ToList();

for (var i = 0; i < 3; i++)
Expand Down Expand Up @@ -273,7 +336,13 @@ public void Concurrent_Producer_Single_Partition()
var queue = new MemoryBufferQueue<int>("test", 1);

var countDownEvent = new CountdownEvent(messageSize);
var consumer = queue.CreateConsumer(new BufferConsumerOptions { GroupName = "TestGroup", AutoCommit = true });
var consumer =
queue.CreateConsumer(new BufferConsumerOptions
{
TopicName = "test",
GroupName = "TestGroup",
AutoCommit = true
});
_ = Task.Run(async () =>
{
await foreach (var items in consumer.ConsumeAsync())
Expand Down Expand Up @@ -313,7 +382,13 @@ public void Concurrent_Producer_Multiple_Partition()

var queue = new MemoryBufferQueue<int>("test", Environment.ProcessorCount);

var consumer = queue.CreateConsumer(new BufferConsumerOptions { GroupName = "TestGroup", AutoCommit = true });
var consumer =
queue.CreateConsumer(new BufferConsumerOptions
{
TopicName = "test",
GroupName = "TestGroup",
AutoCommit = true
});
var countDownEvent = new CountdownEvent(messageSize);
_ = Task.Run(async () =>
{
Expand Down Expand Up @@ -379,6 +454,7 @@ public void Concurrent_Consumer_Multiple_Groups(int groupNumber, int batchSize)
.CreateConsumers(
new BufferConsumerOptions
{
TopicName = "test",
GroupName = "TestGroup" + (groupIndex + 1),
AutoCommit = true,
BatchSize = batchSize
Expand Down Expand Up @@ -436,7 +512,12 @@ public void Concurrent_Producer_And_Concurrent_Consumer_Multiple_Groups(int grou
{
var consumers = queue
.CreateConsumers(
new BufferConsumerOptions { GroupName = "TestGroup" + (groupIndex + 1), AutoCommit = true },
new BufferConsumerOptions
{
TopicName = "test",
GroupName = "TestGroup" + (groupIndex + 1),
AutoCommit = true
},
consumerNumberPerGroup)
.ToList();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,64 @@ public void AddMemoryBuffer()
Assert.Equal("topic2", consumer.TopicName);
}
}

[Fact]
public async Task No_Consumption_If_TopicName_Not_Match()
{
var services = new ServiceCollection();
services.AddBuffer(options => options.UseMemory(builder =>
builder
.AddTopic<int>("topic1", 1)
.AddTopic<int>("topic2", 2)
));

var provider = services.BuildServiceProvider();
var bufferQueue = provider.GetRequiredService<IBufferQueue>();

var topic1Producer = bufferQueue.CreateProducer<int>("topic1");
var topic1Consumer =
bufferQueue.CreateConsumer<int>(new BufferConsumerOptions { TopicName = "topic1", GroupName = "test" });
var topic2Consumer =
bufferQueue.CreateConsumer<int>(new BufferConsumerOptions { TopicName = "topic2", GroupName = "test" });

await topic1Producer.ProduceAsync(1);

await foreach (var item in topic1Consumer.ConsumeAsync())
{
Assert.Equal(1, item.Single());
break;
}

_ = Task.Run(async () =>
{
await foreach (var item in topic2Consumer.ConsumeAsync())
{
Assert.True(false, "Should not consume any item.");
}
});

await Task.Delay(100);
}

[Fact]
public void Throw_If_TopicName_Not_Registered()
{
var services = new ServiceCollection();
services.AddBuffer(options => options.UseMemory(builder =>
builder
.AddTopic<int>("topic1", 1)
.AddTopic<int>("topic2", 2)
));

var provider = services.BuildServiceProvider();
var bufferQueue = provider.GetRequiredService<IBufferQueue>();

Assert.Throws<ArgumentException>(() => bufferQueue.CreateProducer<int>("topic3"));
Assert.Throws<ArgumentException>(() =>
bufferQueue.CreateConsumer<int>(new BufferConsumerOptions { TopicName = "topic3", GroupName = "test" }));
Assert.Throws<ArgumentException>(() =>
bufferQueue.CreateConsumers<int>(
new BufferConsumerOptions { TopicName = "topic3", GroupName = "test" },
2));
}
}

0 comments on commit a6910fe

Please sign in to comment.