diff --git a/src/Mocha.Core/Buffer/Memory/MemoryBuferQueue.cs b/src/Mocha.Core/Buffer/Memory/MemoryBuferQueue.cs index e9e26a3..a2f5f69 100644 --- a/src/Mocha.Core/Buffer/Memory/MemoryBuferQueue.cs +++ b/src/Mocha.Core/Buffer/Memory/MemoryBuferQueue.cs @@ -22,7 +22,7 @@ public MemoryBufferQueue(string topicName, int partitionNumber) _partitions = new MemoryBufferPartition[partitionNumber]; for (var i = 0; i < partitionNumber; i++) { - _partitions[i] = new MemoryBufferPartition(); + _partitions[i] = new MemoryBufferPartition(i); } _producer = new MemoryBufferProducer(topicName, _partitions); diff --git a/src/Mocha.Core/Buffer/Memory/MemoryBufferPartition.cs b/src/Mocha.Core/Buffer/Memory/MemoryBufferPartition.cs index 59d2610..b68f3b4 100644 --- a/src/Mocha.Core/Buffer/Memory/MemoryBufferPartition.cs +++ b/src/Mocha.Core/Buffer/Memory/MemoryBufferPartition.cs @@ -14,8 +14,6 @@ internal sealed class MemoryBufferPartition // internal for test internal readonly int _segmentLength; - private static int _idIncreacement; - private volatile MemoryBufferSegment _head; private volatile MemoryBufferSegment _tail; @@ -25,10 +23,10 @@ internal sealed class MemoryBufferPartition private readonly object _createSegmentLock; - public MemoryBufferPartition(int segmentLength = 1024) + public MemoryBufferPartition(int id, int segmentLength = 1024) { _segmentLength = segmentLength; - PartitionId = _idIncreacement++; + PartitionId = id; _head = _tail = new MemoryBufferSegment(_segmentLength, default); _consumerReaders = new ConcurrentDictionary(); _consumers = new HashSet>(); diff --git a/tests/Mocha.Core.Tests/Buffer/Memory/MemoryBufferPartitionTests.cs b/tests/Mocha.Core.Tests/Buffer/Memory/MemoryBufferPartitionTests.cs index f2d0ef6..3e757d3 100644 --- a/tests/Mocha.Core.Tests/Buffer/Memory/MemoryBufferPartitionTests.cs +++ b/tests/Mocha.Core.Tests/Buffer/Memory/MemoryBufferPartitionTests.cs @@ -11,7 +11,7 @@ public class MemoryBufferPartitionTests [Fact] public void Enqueue_And_TryPull() { - var partition = new MemoryBufferPartition(2); + var partition = new MemoryBufferPartition(0, 2); for (var i = 0; i < 12; i++) { @@ -45,7 +45,7 @@ public void Enqueue_And_TryPull() [Fact] public void Repeatable_Pull_If_Not_Commit() { - var partition = new MemoryBufferPartition(2); + var partition = new MemoryBufferPartition(0, 2); for (var i = 0; i < 11; i++) { @@ -82,7 +82,7 @@ public void Repeatable_Pull_If_Not_Commit() [Fact] public void Segment_Will_Be_Recycled_If_All_Consumers_Consumed_Single_Group() { - var partition = new MemoryBufferPartition(3); + var partition = new MemoryBufferPartition(0, 3); for (var i = 0; i < 9; i++) { @@ -116,7 +116,7 @@ public void Segment_Will_Be_Recycled_If_All_Consumers_Consumed_Single_Group() [Fact] public void Segment_Will_Be_Recycled_If_All_Consumers_Consumed_MultipleGroup() { - var partition = new MemoryBufferPartition(3); + var partition = new MemoryBufferPartition(0, 3); for (var i = 0; i < 9; i++) { @@ -148,7 +148,7 @@ public void Segment_Will_Be_Recycled_If_All_Consumers_Consumed_MultipleGroup() [Fact] public void Segment_Will_Not_Be_Recycled_If_Not_All_Consumers_Consumed_MultipleGroup() { - var partition = new MemoryBufferPartition(3); + var partition = new MemoryBufferPartition(0, 3); for (var i = 0; i < 6; i++) { diff --git a/tests/Mocha.Core.Tests/Buffer/Memory/MemoryBufferQueueTests.cs b/tests/Mocha.Core.Tests/Buffer/Memory/MemoryBufferQueueTests.cs index c3324b7..d4cf85c 100644 --- a/tests/Mocha.Core.Tests/Buffer/Memory/MemoryBufferQueueTests.cs +++ b/tests/Mocha.Core.Tests/Buffer/Memory/MemoryBufferQueueTests.cs @@ -9,7 +9,7 @@ namespace Mocha.Core.Tests.Buffer.Memory; public class MemoryBufferQueueTests { - private static int MemoryBufferPartitionSegmentLength => new MemoryBufferPartition()._segmentLength; + private static int MemoryBufferPartitionSegmentLength => new MemoryBufferPartition(0)._segmentLength; [Fact] public async Task Produce_And_Consume()