From b247d067e4cdaac581f21ca12e0830116f02892e Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Wed, 18 Sep 2024 14:17:25 -0400 Subject: [PATCH 1/2] Emit warning when Matt waiting for connection for extended period of time --- .../org/apache/beam/sdk/io/mqtt/MqttIO.java | 24 +++++++++++++++---- .../apache/beam/sdk/io/mqtt/MqttIOTest.java | 5 ++-- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java index 8b7f0991c2dd..666a138a59bb 100644 --- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java +++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java @@ -30,6 +30,7 @@ import java.util.Objects; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; @@ -45,6 +46,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.checkerframework.checker.nullness.qual.Nullable; import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.FutureConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.Message; import org.fusesource.mqtt.client.QoS; @@ -431,8 +433,7 @@ public boolean start() throws IOException { client = spec.connectionConfiguration().createClient(); LOG.debug("Reader client ID is {}", client.getClientId()); checkpointMark.clientId = client.getClientId().toString(); - connection = client.blockingConnection(); - connection.connect(); + connection = createConnection(client); connection.subscribe( new Topic[] {new Topic(spec.connectionConfiguration().getTopic(), QoS.AT_LEAST_ONCE)}); return advance(); @@ -569,8 +570,7 @@ public void createMqttClient() throws Exception { LOG.debug("Starting MQTT writer"); client = spec.connectionConfiguration().createClient(); LOG.debug("MQTT writer client ID is {}", client.getClientId()); - connection = client.blockingConnection(); - connection.connect(); + connection = createConnection(client); } @ProcessElement @@ -590,4 +590,20 @@ public void closeMqttClient() throws Exception { } } } + + /** Create a connected MQTT BlockingConnection from given client, aware of connection timeout. */ + static BlockingConnection createConnection(MQTT client) throws Exception { + FutureConnection futureConnection = client.futureConnection(); + org.fusesource.mqtt.client.Future connecting = futureConnection.connect(); + while (true) { + try { + connecting.await(1, TimeUnit.MINUTES); + } catch (TimeoutException e) { + LOG.warn("Connection to MQTT broker pending after waiting for 1 minute"); + continue; + } + break; + } + return new BlockingConnection(futureConnection); + } } diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java index 30adad708f8d..52c1b80a846a 100644 --- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java +++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java @@ -142,8 +142,7 @@ public void testReadNoClientId() throws Exception { publisherThread.join(); } - @Test(timeout = 30 * 1000) - @Ignore("https://github.com/apache/beam/issues/19092 Flake Non-deterministic output.") + @Test(timeout = 40 * 1000) public void testRead() throws Exception { PCollection output = pipeline.apply( @@ -180,12 +179,12 @@ public void testRead() throws Exception { + "messages ..."); boolean pipelineConnected = false; while (!pipelineConnected) { - Thread.sleep(1000); for (Connection connection : brokerService.getBroker().getClients()) { if (connection.getConnectionId().startsWith("READ_PIPELINE")) { pipelineConnected = true; } } + Thread.sleep(2000); } for (int i = 0; i < 10; i++) { publishConnection.publish( From 3437acee056d9b3a3d9a62451028d985d859b688 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Wed, 18 Sep 2024 18:25:02 -0400 Subject: [PATCH 2/2] address comment; adjust timeout --- .../src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java | 2 +- .../src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java index 666a138a59bb..0e584d564b5c 100644 --- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java +++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java @@ -599,7 +599,7 @@ static BlockingConnection createConnection(MQTT client) throws Exception { try { connecting.await(1, TimeUnit.MINUTES); } catch (TimeoutException e) { - LOG.warn("Connection to MQTT broker pending after waiting for 1 minute"); + LOG.warn("Connection to {} pending after waiting for 1 minute", client.getHost()); continue; } break; diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java index 52c1b80a846a..7d60d6d65780 100644 --- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java +++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java @@ -150,7 +150,7 @@ public void testRead() throws Exception { .withConnectionConfiguration( MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, "READ_TOPIC") .withClientId("READ_PIPELINE")) - .withMaxReadTime(Duration.standardSeconds(3))); + .withMaxReadTime(Duration.standardSeconds(5))); PAssert.that(output) .containsInAnyOrder( "This is test 0".getBytes(StandardCharsets.UTF_8), @@ -184,7 +184,7 @@ public void testRead() throws Exception { pipelineConnected = true; } } - Thread.sleep(2000); + Thread.sleep(1000); } for (int i = 0; i < 10; i++) { publishConnection.publish(