From 3437acee056d9b3a3d9a62451028d985d859b688 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Wed, 18 Sep 2024 18:25:02 -0400 Subject: [PATCH] address comment; adjust timeout --- .../src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java | 2 +- .../src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java index 666a138a59bb..0e584d564b5c 100644 --- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java +++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java @@ -599,7 +599,7 @@ static BlockingConnection createConnection(MQTT client) throws Exception { try { connecting.await(1, TimeUnit.MINUTES); } catch (TimeoutException e) { - LOG.warn("Connection to MQTT broker pending after waiting for 1 minute"); + LOG.warn("Connection to {} pending after waiting for 1 minute", client.getHost()); continue; } break; diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java index 52c1b80a846a..7d60d6d65780 100644 --- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java +++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java @@ -150,7 +150,7 @@ public void testRead() throws Exception { .withConnectionConfiguration( MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, "READ_TOPIC") .withClientId("READ_PIPELINE")) - .withMaxReadTime(Duration.standardSeconds(3))); + .withMaxReadTime(Duration.standardSeconds(5))); PAssert.that(output) .containsInAnyOrder( "This is test 0".getBytes(StandardCharsets.UTF_8), @@ -184,7 +184,7 @@ public void testRead() throws Exception { pipelineConnected = true; } } - Thread.sleep(2000); + Thread.sleep(1000); } for (int i = 0; i < 10; i++) { publishConnection.publish(