diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 6cd6505e680b..4e9672767463 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -214,7 +214,6 @@ public void testReadMessages() throws Exception { producer.send(message); producer.send(message); producer.send(message); - producer.send(message); producer.close(); session.close(); connection.close(); @@ -232,11 +231,22 @@ public void testReadMessages() throws Exception { PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(5L); pipeline.run(); - connection = connectionFactory.createConnection(USERNAME, PASSWORD); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE)); - Message msg = consumer.receiveNoWait(); - assertNull(msg); + assertQueueIsEmpty(); + } + + private void assertQueueIsEmpty() throws JMSException { + // we need to disable the prefetch, otherwise the receiveNoWait could still return null even + // when there are messages waiting on the queue + Connection connection = + connectionFactoryWithSyncAcksAndWithoutPrefetch.createConnection(USERNAME, PASSWORD); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE)); + connection.start(); + assertNull(consumer.receiveNoWait()); + } finally { + connection.close(); + } } @Test @@ -268,11 +278,7 @@ public void testReadBytesMessages() throws Exception { PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(1L); pipeline.run(); - connection = connectionFactory.createConnection(USERNAME, PASSWORD); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE)); - Message msg = consumer.receiveNoWait(); - assertNull(msg); + assertQueueIsEmpty(); } @Test @@ -921,11 +927,7 @@ public void testWriteMessageWithRetryPolicyReachesLimit() throws Exception { "%s:%s", JMS_IO_PRODUCER_METRIC_NAME, PUBLICATION_RETRIES_METRIC_NAME))))))); - Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE)); - assertNull(consumer.receiveNoWait()); + assertQueueIsEmpty(); } @Test