diff --git a/README.md b/README.md index a720f9c..139550d 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,8 @@ English | [简体中文](./README.zh-CN.md) Mocha is an application performance monitor tools based on [OpenTelemetry](https://opentelemetry.io), which also provides a scalable platform for observability data analysis and storage. +**Note: Use `git clone --recursive` to clone this repository with submodules.** + ## Quick Start In the beta phase, we provide a Docker Compose file for users to experience our system locally. diff --git a/README.zh-CN.md b/README.zh-CN.md index 2288c4e..210593f 100644 --- a/README.zh-CN.md +++ b/README.zh-CN.md @@ -7,6 +7,8 @@ Mocha Mocha 是一个基于 [OpenTelemetry](https://opentelemetry.io) 的 APM 系统,同时提供可伸缩的可观测性数据分析和存储平台。 +**注意:使用 `git clone --recursive` 克隆本仓库以及子模块。** + ## 快速开始 现阶段,我们提供了 Docker Compose 文件,方便用户在本地体验我们的系统。 diff --git a/src/Mocha.Core/Buffer/BufferConsumerOptions.cs b/src/Mocha.Core/Buffer/BufferConsumerOptions.cs index b00b81d..4dbc514 100644 --- a/src/Mocha.Core/Buffer/BufferConsumerOptions.cs +++ b/src/Mocha.Core/Buffer/BufferConsumerOptions.cs @@ -5,9 +5,9 @@ namespace Mocha.Core.Buffer; public class BufferConsumerOptions { - public string TopicName { get; init; } = default!; + public required string TopicName { get; init; } - public string GroupName { get; init; } = default!; + public required string GroupName { get; init; } public bool AutoCommit { get; init; } diff --git a/src/Mocha.Core/Buffer/BufferQueue.cs b/src/Mocha.Core/Buffer/BufferQueue.cs index 1d71861..5c4eaea 100644 --- a/src/Mocha.Core/Buffer/BufferQueue.cs +++ b/src/Mocha.Core/Buffer/BufferQueue.cs @@ -10,21 +10,24 @@ internal class BufferQueue(IServiceProvider serviceProvider) : IBufferQueue public IBufferProducer CreateProducer(string topicName) { ArgumentException.ThrowIfNullOrEmpty(topicName, nameof(topicName)); - var queue = serviceProvider.GetRequiredKeyedService>(topicName); + var queue = serviceProvider.GetKeyedService>(topicName) ?? + throw new ArgumentException($"The topic '{topicName}' has not been registered."); return queue.CreateProducer(); } public IBufferConsumer CreateConsumer(BufferConsumerOptions options) { ArgumentException.ThrowIfNullOrEmpty(options.TopicName, nameof(options.TopicName)); - var queue = serviceProvider.GetRequiredKeyedService>(options.TopicName); + var queue = serviceProvider.GetKeyedService>(options.TopicName) ?? + throw new ArgumentException($"The topic '{options.TopicName}' has not been registered."); return queue.CreateConsumer(options); } public IEnumerable> CreateConsumers(BufferConsumerOptions options, int consumerNumber) { ArgumentException.ThrowIfNullOrEmpty(options.TopicName, nameof(options.TopicName)); - var queue = serviceProvider.GetRequiredKeyedService>(options.TopicName); + var queue = serviceProvider.GetKeyedService>(options.TopicName) ?? + throw new ArgumentException($"The topic '{options.TopicName}' has not been registered."); return queue.CreateConsumers(options, consumerNumber); } } diff --git a/src/Mocha.Core/Buffer/IBufferQueue.cs b/src/Mocha.Core/Buffer/IBufferQueue.cs index eea0839..ae181a5 100644 --- a/src/Mocha.Core/Buffer/IBufferQueue.cs +++ b/src/Mocha.Core/Buffer/IBufferQueue.cs @@ -5,9 +5,38 @@ namespace Mocha.Core.Buffer; public interface IBufferQueue { + /// + /// Create a producer for the specified topic. + /// + /// The topic name. + /// The type of the item. + /// The producer. IBufferProducer CreateProducer(string topicName); + /// + /// Create a consumer for the specified topic. + /// This method can only be called once for each consumer group within the same topic. + /// Use the method to create multiple consumers. + /// + /// The consumer options. + /// The type of the item. + /// The consumer. + /// The topic name is. + /// The group name is empty. + /// The consumer group has been created. IBufferConsumer CreateConsumer(BufferConsumerOptions options); + /// + /// Create multiple consumers for the specified topic. + /// This method can only be called once for each consumer group within the same topic. + /// + /// The consumer options. + /// The number of consumers. + /// The type of the item. + /// The consumers. + /// The number of consumers must be greater than 0 and cannot be greater than the number of partitions. + /// The topic name is. + /// The group name is empty. + /// The consumer group has been created. IEnumerable> CreateConsumers(BufferConsumerOptions options, int consumerNumber); } diff --git a/src/Mocha.Core/Buffer/Memory/MemoryBufferSegment.cs b/src/Mocha.Core/Buffer/Memory/MemoryBufferSegment.cs index bc5117e..a68b6a7 100644 --- a/src/Mocha.Core/Buffer/Memory/MemoryBufferSegment.cs +++ b/src/Mocha.Core/Buffer/Memory/MemoryBufferSegment.cs @@ -66,7 +66,7 @@ public bool TryGet(MemoryBufferPartitionOffset offset, int count, [NotNullWhen(t if (_writePosition < 0 || readPosition > _writePosition) { - items = default!; + items = null; return false; } diff --git a/tests/Mocha.Core.Benchmarks/MemoryBufferQueueConsumeBenchmark.cs b/tests/Mocha.Core.Benchmarks/MemoryBufferQueueConsumeBenchmark.cs index b12d984..5b08558 100644 --- a/tests/Mocha.Core.Benchmarks/MemoryBufferQueueConsumeBenchmark.cs +++ b/tests/Mocha.Core.Benchmarks/MemoryBufferQueueConsumeBenchmark.cs @@ -31,7 +31,13 @@ public void Setup() } _consumers = _memoryBufferQueue!.CreateConsumers( - new BufferConsumerOptions { GroupName = "TestGroup", AutoCommit = true, BatchSize = BatchSize, }, + new BufferConsumerOptions + { + GroupName = "TestGroup", + TopicName = "test", + AutoCommit = true, + BatchSize = BatchSize, + }, Environment.ProcessorCount); } diff --git a/tests/Mocha.Core.Tests/Buffer/Memory/MemoryBufferQueueTests.cs b/tests/Mocha.Core.Tests/Buffer/Memory/MemoryBufferQueueTests.cs index 77216fb..ab81f89 100644 --- a/tests/Mocha.Core.Tests/Buffer/Memory/MemoryBufferQueueTests.cs +++ b/tests/Mocha.Core.Tests/Buffer/Memory/MemoryBufferQueueTests.cs @@ -25,6 +25,7 @@ public async Task Produce_And_Consume() var producer = queue.CreateProducer(); var consumer = queue.CreateConsumer(new BufferConsumerOptions { + TopicName = "test", GroupName = "TestGroup", AutoCommit = false, BatchSize = 2 @@ -64,7 +65,13 @@ public async Task Produce_And_Consume_AutoCommit() var queue = new MemoryBufferQueue("test", 1); var producer = queue.CreateProducer(); var consumer = queue.CreateConsumer( - new BufferConsumerOptions { GroupName = "TestGroup", AutoCommit = true, BatchSize = 2 }); + new BufferConsumerOptions + { + TopicName = "test", + GroupName = "TestGroup", + AutoCommit = true, + BatchSize = 2 + }); var expectedValues = new int[10]; for (var i = 0; i < 10; i++) @@ -94,7 +101,13 @@ public async Task Produce_And_Consume_With_Multiple_Partitions() var queue = new MemoryBufferQueue("test", 2); var producer = queue.CreateProducer(); var consumer = queue.CreateConsumer( - new BufferConsumerOptions { GroupName = "TestGroup", AutoCommit = false, BatchSize = 2 }); + new BufferConsumerOptions + { + TopicName = "test", + GroupName = "TestGroup", + AutoCommit = false, + BatchSize = 2 + }); var expectedValues = new int[10]; for (var i = 0; i < 10; i++) @@ -129,7 +142,14 @@ public async Task Produce_And_Consume_With_Multiple_Consumers() var queue = new MemoryBufferQueue("test", 2); var producer = queue.CreateProducer(); var consumers = queue - .CreateConsumers(new BufferConsumerOptions { GroupName = "TestGroup", AutoCommit = false, BatchSize = 6 }, + .CreateConsumers( + new BufferConsumerOptions + { + TopicName = "test", + GroupName = "TestGroup", + AutoCommit = false, + BatchSize = 6 + }, 2).ToList(); var consumer1 = consumers[0]; var consumer2 = consumers[1]; @@ -152,13 +172,46 @@ public async Task Produce_And_Consume_With_Multiple_Consumers() } } + [Fact] + public void Throw_If_Wrong_Consumer_Number() + { + var queue = new MemoryBufferQueue("test", 2); + Assert.Throws(() => + queue.CreateConsumers( + new BufferConsumerOptions + { + TopicName = "test", + GroupName = "TestGroup", + AutoCommit = false, + BatchSize = 6 + }, + 3).ToList()); + + Assert.Throws(() => + queue.CreateConsumers( + new BufferConsumerOptions + { + TopicName = "test", + GroupName = "TestGroup", + AutoCommit = false, + BatchSize = 6 + }, + 0).ToList()); + } + [Fact] public async Task Offset_Will_Not_Change_If_Consumer_Not_Commit() { var queue = new MemoryBufferQueue("test", 1); var producer = queue.CreateProducer(); var consumer = queue.CreateConsumer( - new BufferConsumerOptions { GroupName = "TestGroup", AutoCommit = false, BatchSize = 7 }); + new BufferConsumerOptions + { + TopicName = "test", + GroupName = "TestGroup", + AutoCommit = false, + BatchSize = 7 + }); for (var i = 0; i < 10; i++) { @@ -195,7 +248,13 @@ public async Task Consumer_Will_Wait_Until_Produce() { var queue = new MemoryBufferQueue("test", 1); var producer = queue.CreateProducer(); - var consumer = queue.CreateConsumer(new BufferConsumerOptions { GroupName = "TestGroup", AutoCommit = false }); + var consumer = + queue.CreateConsumer(new BufferConsumerOptions + { + TopicName = "test", + GroupName = "TestGroup", + AutoCommit = false + }); var task = Task.Run(async () => { @@ -221,19 +280,23 @@ public void Equal_Distribution_Load_Balancing_Strategy_For_Consumers() var assignedPartitionsFieldInfo = typeof(MemoryBufferConsumer) .GetField("_assignedPartitions", BindingFlags.Instance | BindingFlags.NonPublic)!; var group1Consumers = - queue.CreateConsumers(new BufferConsumerOptions { GroupName = "TestGroup1", AutoCommit = false }, 3) + queue.CreateConsumers( + new BufferConsumerOptions { TopicName = "test", GroupName = "TestGroup1", AutoCommit = false }, 3) .ToList(); var group2Consumers = queue - .CreateConsumers(new BufferConsumerOptions { GroupName = "TestGroup2", AutoCommit = false }, 4) + .CreateConsumers( + new BufferConsumerOptions { TopicName = "test", GroupName = "TestGroup2", AutoCommit = false }, 4) .ToList(); var group3Consumers = queue - .CreateConsumers(new BufferConsumerOptions { GroupName = "TestGroup3", AutoCommit = false }, 5) + .CreateConsumers( + new BufferConsumerOptions { TopicName = "test", GroupName = "TestGroup3", AutoCommit = false }, 5) .ToList(); var group4Consumers = queue - .CreateConsumers(new BufferConsumerOptions { GroupName = "TestGroup4", AutoCommit = false }, 16) + .CreateConsumers( + new BufferConsumerOptions { TopicName = "test", GroupName = "TestGroup4", AutoCommit = false }, 16) .ToList(); for (var i = 0; i < 3; i++) @@ -273,7 +336,13 @@ public void Concurrent_Producer_Single_Partition() var queue = new MemoryBufferQueue("test", 1); var countDownEvent = new CountdownEvent(messageSize); - var consumer = queue.CreateConsumer(new BufferConsumerOptions { GroupName = "TestGroup", AutoCommit = true }); + var consumer = + queue.CreateConsumer(new BufferConsumerOptions + { + TopicName = "test", + GroupName = "TestGroup", + AutoCommit = true + }); _ = Task.Run(async () => { await foreach (var items in consumer.ConsumeAsync()) @@ -313,7 +382,13 @@ public void Concurrent_Producer_Multiple_Partition() var queue = new MemoryBufferQueue("test", Environment.ProcessorCount); - var consumer = queue.CreateConsumer(new BufferConsumerOptions { GroupName = "TestGroup", AutoCommit = true }); + var consumer = + queue.CreateConsumer(new BufferConsumerOptions + { + TopicName = "test", + GroupName = "TestGroup", + AutoCommit = true + }); var countDownEvent = new CountdownEvent(messageSize); _ = Task.Run(async () => { @@ -379,6 +454,7 @@ public void Concurrent_Consumer_Multiple_Groups(int groupNumber, int batchSize) .CreateConsumers( new BufferConsumerOptions { + TopicName = "test", GroupName = "TestGroup" + (groupIndex + 1), AutoCommit = true, BatchSize = batchSize @@ -436,7 +512,12 @@ public void Concurrent_Producer_And_Concurrent_Consumer_Multiple_Groups(int grou { var consumers = queue .CreateConsumers( - new BufferConsumerOptions { GroupName = "TestGroup" + (groupIndex + 1), AutoCommit = true }, + new BufferConsumerOptions + { + TopicName = "test", + GroupName = "TestGroup" + (groupIndex + 1), + AutoCommit = true + }, consumerNumberPerGroup) .ToList(); diff --git a/tests/Mocha.Core.Tests/Buffer/Memory/MemoryBufferServiceCollectionExtensionsTests.cs b/tests/Mocha.Core.Tests/Buffer/Memory/MemoryBufferServiceCollectionExtensionsTests.cs index 99148d2..497a97e 100644 --- a/tests/Mocha.Core.Tests/Buffer/Memory/MemoryBufferServiceCollectionExtensionsTests.cs +++ b/tests/Mocha.Core.Tests/Buffer/Memory/MemoryBufferServiceCollectionExtensionsTests.cs @@ -38,4 +38,64 @@ public void AddMemoryBuffer() Assert.Equal("topic2", consumer.TopicName); } } + + [Fact] + public async Task No_Consumption_If_TopicName_Not_Match() + { + var services = new ServiceCollection(); + services.AddBuffer(options => options.UseMemory(builder => + builder + .AddTopic("topic1", 1) + .AddTopic("topic2", 2) + )); + + var provider = services.BuildServiceProvider(); + var bufferQueue = provider.GetRequiredService(); + + var topic1Producer = bufferQueue.CreateProducer("topic1"); + var topic1Consumer = + bufferQueue.CreateConsumer(new BufferConsumerOptions { TopicName = "topic1", GroupName = "test" }); + var topic2Consumer = + bufferQueue.CreateConsumer(new BufferConsumerOptions { TopicName = "topic2", GroupName = "test" }); + + await topic1Producer.ProduceAsync(1); + + await foreach (var item in topic1Consumer.ConsumeAsync()) + { + Assert.Equal(1, item.Single()); + break; + } + + _ = Task.Run(async () => + { + await foreach (var item in topic2Consumer.ConsumeAsync()) + { + Assert.True(false, "Should not consume any item."); + } + }); + + await Task.Delay(100); + } + + [Fact] + public void Throw_If_TopicName_Not_Registered() + { + var services = new ServiceCollection(); + services.AddBuffer(options => options.UseMemory(builder => + builder + .AddTopic("topic1", 1) + .AddTopic("topic2", 2) + )); + + var provider = services.BuildServiceProvider(); + var bufferQueue = provider.GetRequiredService(); + + Assert.Throws(() => bufferQueue.CreateProducer("topic3")); + Assert.Throws(() => + bufferQueue.CreateConsumer(new BufferConsumerOptions { TopicName = "topic3", GroupName = "test" })); + Assert.Throws(() => + bufferQueue.CreateConsumers( + new BufferConsumerOptions { TopicName = "topic3", GroupName = "test" }, + 2)); + } }