Skip to content

Commit

Permalink
change partition id to a constructor parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
eventhorizon-cli committed Jan 29, 2024
1 parent e502c7f commit 717581d
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/Mocha.Core/Buffer/Memory/MemoryBuferQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public MemoryBufferQueue(string topicName, int partitionNumber)
_partitions = new MemoryBufferPartition<T>[partitionNumber];
for (var i = 0; i < partitionNumber; i++)
{
_partitions[i] = new MemoryBufferPartition<T>();
_partitions[i] = new MemoryBufferPartition<T>(i);
}

_producer = new MemoryBufferProducer<T>(topicName, _partitions);
Expand Down
6 changes: 2 additions & 4 deletions src/Mocha.Core/Buffer/Memory/MemoryBufferPartition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ internal sealed class MemoryBufferPartition<T>
// internal for test
internal readonly int _segmentLength;

private static int _idIncreacement;

private volatile MemoryBufferSegment<T> _head;
private volatile MemoryBufferSegment<T> _tail;

Expand All @@ -25,10 +23,10 @@ internal sealed class MemoryBufferPartition<T>

private readonly object _createSegmentLock;

public MemoryBufferPartition(int segmentLength = 1024)
public MemoryBufferPartition(int id, int segmentLength = 1024)
{
_segmentLength = segmentLength;
PartitionId = _idIncreacement++;
PartitionId = id;
_head = _tail = new MemoryBufferSegment<T>(_segmentLength, default);
_consumerReaders = new ConcurrentDictionary<string, Reader>();
_consumers = new HashSet<MemoryBufferConsumer<T>>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class MemoryBufferPartitionTests
[Fact]
public void Enqueue_And_TryPull()
{
var partition = new MemoryBufferPartition<int>(2);
var partition = new MemoryBufferPartition<int>(0, 2);

for (var i = 0; i < 12; i++)
{
Expand Down Expand Up @@ -45,7 +45,7 @@ public void Enqueue_And_TryPull()
[Fact]
public void Repeatable_Pull_If_Not_Commit()
{
var partition = new MemoryBufferPartition<int>(2);
var partition = new MemoryBufferPartition<int>(0, 2);

for (var i = 0; i < 11; i++)
{
Expand Down Expand Up @@ -82,7 +82,7 @@ public void Repeatable_Pull_If_Not_Commit()
[Fact]
public void Segment_Will_Be_Recycled_If_All_Consumers_Consumed_Single_Group()
{
var partition = new MemoryBufferPartition<int>(3);
var partition = new MemoryBufferPartition<int>(0, 3);

for (var i = 0; i < 9; i++)
{
Expand Down Expand Up @@ -116,7 +116,7 @@ public void Segment_Will_Be_Recycled_If_All_Consumers_Consumed_Single_Group()
[Fact]
public void Segment_Will_Be_Recycled_If_All_Consumers_Consumed_MultipleGroup()
{
var partition = new MemoryBufferPartition<int>(3);
var partition = new MemoryBufferPartition<int>(0, 3);

for (var i = 0; i < 9; i++)
{
Expand Down Expand Up @@ -148,7 +148,7 @@ public void Segment_Will_Be_Recycled_If_All_Consumers_Consumed_MultipleGroup()
[Fact]
public void Segment_Will_Not_Be_Recycled_If_Not_All_Consumers_Consumed_MultipleGroup()
{
var partition = new MemoryBufferPartition<int>(3);
var partition = new MemoryBufferPartition<int>(0, 3);

for (var i = 0; i < 6; i++)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Mocha.Core.Tests.Buffer.Memory;

public class MemoryBufferQueueTests
{
private static int MemoryBufferPartitionSegmentLength => new MemoryBufferPartition<int>()._segmentLength;
private static int MemoryBufferPartitionSegmentLength => new MemoryBufferPartition<int>(0)._segmentLength;

[Fact]
public async Task Produce_And_Consume()
Expand Down

0 comments on commit 717581d

Please sign in to comment.