Skip to content

Commit

Permalink
start the connection before calling receive in JmsIOTest (#31334)
Browse files Browse the repository at this point in the history
  • Loading branch information
tilgalas authored May 20, 2024
1 parent 092f769 commit 2e4a152
Showing 1 changed file with 18 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ public void testReadMessages() throws Exception {
producer.send(message);
producer.send(message);
producer.send(message);
producer.send(message);
producer.close();
session.close();
connection.close();
Expand All @@ -232,11 +231,22 @@ public void testReadMessages() throws Exception {
PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(5L);
pipeline.run();

connection = connectionFactory.createConnection(USERNAME, PASSWORD);
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE));
Message msg = consumer.receiveNoWait();
assertNull(msg);
assertQueueIsEmpty();
}

private void assertQueueIsEmpty() throws JMSException {
// we need to disable the prefetch, otherwise the receiveNoWait could still return null even
// when there are messages waiting on the queue
Connection connection =
connectionFactoryWithSyncAcksAndWithoutPrefetch.createConnection(USERNAME, PASSWORD);
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE));
connection.start();
assertNull(consumer.receiveNoWait());
} finally {
connection.close();
}
}

@Test
Expand Down Expand Up @@ -268,11 +278,7 @@ public void testReadBytesMessages() throws Exception {
PAssert.thatSingleton(output.apply("Count", Count.<String>globally())).isEqualTo(1L);
pipeline.run();

connection = connectionFactory.createConnection(USERNAME, PASSWORD);
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE));
Message msg = consumer.receiveNoWait();
assertNull(msg);
assertQueueIsEmpty();
}

@Test
Expand Down Expand Up @@ -921,11 +927,7 @@ public void testWriteMessageWithRetryPolicyReachesLimit() throws Exception {
"%s:%s",
JMS_IO_PRODUCER_METRIC_NAME, PUBLICATION_RETRIES_METRIC_NAME)))))));

Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE));
assertNull(consumer.receiveNoWait());
assertQueueIsEmpty();
}

@Test
Expand Down

0 comments on commit 2e4a152

Please sign in to comment.