Skip to content

Commit

Permalink
[AWS] Fix bug in KinesisIOReadTest (closes #27666) (#27686)
Browse files Browse the repository at this point in the history
  • Loading branch information
Moritz Mack authored Jul 26, 2023
1 parent d822fc9 commit 21e040c
Showing 1 changed file with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,13 @@ public void testReadFromShards() {
@Test
public void testReadWithEFOFromShards() {
SubscribeToShardEvent shard0event = eventWithRecords(3);
SubscribeToShardEvent shard1event = eventWithRecords(3);
SubscribeToShardEvent shard2event = eventWithRecords(3);
SubscribeToShardEvent shard1event = eventWithRecords(4);
SubscribeToShardEvent shard2event = eventWithRecords(5);
EFOStubbedKinesisAsyncClient asyncClientStub = new EFOStubbedKinesisAsyncClient(10);
asyncClientStub.stubSubscribeToShard("0", shard0event);
asyncClientStub.stubSubscribeToShard("1", shard1event);
asyncClientStub.stubSubscribeToShard("2", shard1event);
asyncClientStub.stubSubscribeToShard("2", shard2event);

MockClientBuilderFactory.set(p, KinesisAsyncClientBuilder.class, asyncClientStub);
Iterable<Record> expectedRecords =
concat(shard0event.records(), shard1event.records(), shard2event.records());
Expand All @@ -128,7 +129,7 @@ public void testReadWithEFOFromShards() {
.withConsumerArn("consumer")
.withInitialPositionInStream(TRIM_HORIZON)
.withArrivalTimeWatermarkPolicy()
.withMaxNumRecords(9);
.withMaxNumRecords(12);

PCollection<Record> result = p.apply(read).apply(ParDo.of(new KinesisIOReadTest.ToRecord()));
PAssert.that(result).containsInAnyOrder(expectedRecords);
Expand Down

0 comments on commit 21e040c

Please sign in to comment.