diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest.java index 02d28eaad586..82a31b9d037e 100644 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest.java +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest.java @@ -111,12 +111,13 @@ public void testReadFromShards() { @Test public void testReadWithEFOFromShards() { SubscribeToShardEvent shard0event = eventWithRecords(3); - SubscribeToShardEvent shard1event = eventWithRecords(3); - SubscribeToShardEvent shard2event = eventWithRecords(3); + SubscribeToShardEvent shard1event = eventWithRecords(4); + SubscribeToShardEvent shard2event = eventWithRecords(5); EFOStubbedKinesisAsyncClient asyncClientStub = new EFOStubbedKinesisAsyncClient(10); asyncClientStub.stubSubscribeToShard("0", shard0event); asyncClientStub.stubSubscribeToShard("1", shard1event); - asyncClientStub.stubSubscribeToShard("2", shard1event); + asyncClientStub.stubSubscribeToShard("2", shard2event); + MockClientBuilderFactory.set(p, KinesisAsyncClientBuilder.class, asyncClientStub); Iterable expectedRecords = concat(shard0event.records(), shard1event.records(), shard2event.records()); @@ -128,7 +129,7 @@ public void testReadWithEFOFromShards() { .withConsumerArn("consumer") .withInitialPositionInStream(TRIM_HORIZON) .withArrivalTimeWatermarkPolicy() - .withMaxNumRecords(9); + .withMaxNumRecords(12); PCollection result = p.apply(read).apply(ParDo.of(new KinesisIOReadTest.ToRecord())); PAssert.that(result).containsInAnyOrder(expectedRecords);