diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/annotations/FlakyTest.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/annotations/FlakyTest.java index 27c26b123d6..e0a9db07d38 100644 --- a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/annotations/FlakyTest.java +++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/testing/annotations/FlakyTest.java @@ -19,14 +19,21 @@ package org.apache.bookkeeper.common.testing.annotations; import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; /** * Intended for marking a test case as flaky. */ @Documented -@Retention(RetentionPolicy.SOURCE) +@Target({ ElementType.TYPE, ElementType.METHOD }) +@Retention(RetentionPolicy.RUNTIME) +@Tag("flaky") +@Test public @interface FlakyTest { /** diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java index b242a0d8663..818e1c7795f 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java @@ -148,8 +148,8 @@ public void testStorageThresholdCompaction() throws Exception { File ledgerDir2 = tmpDirs.createNew("ledger", "test2"); File journalDir = tmpDirs.createNew("journal", "test"); String[] ledgerDirNames = new String[]{ - ledgerDir1.getPath(), - ledgerDir2.getPath() + ledgerDir1.getPath(), + ledgerDir2.getPath() }; conf.setLedgerDirNames(ledgerDirNames); conf.setJournalDirName(journalDir.getPath()); @@ -224,7 +224,7 @@ public void diskFull(File disk) { // there are no writableLedgerDirs for (File ledgerDir : bookie.getLedgerDirsManager().getAllLedgerDirs()) { assertFalse("Found entry log file ([0,1,2].log. They should have been compacted" + ledgerDir, - TestUtils.hasLogFiles(ledgerDir.getParentFile(), true, 0, 1, 2)); + TestUtils.hasLogFiles(ledgerDir.getParentFile(), true, 0, 1, 2)); } try { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java index 4b92df528fe..23649d38f7b 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/DefaultEntryLogTest.java @@ -22,12 +22,13 @@ import static org.apache.bookkeeper.bookie.storage.EntryLogTestUtils.assertEntryEquals; import static org.apache.bookkeeper.bookie.storage.EntryLogTestUtils.makeEntry; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -41,7 +42,6 @@ import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.util.ReferenceCountUtil; import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.channels.FileChannel; @@ -72,20 +72,21 @@ import org.apache.bookkeeper.util.IOUtils; import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap; import org.apache.commons.io.FileUtils; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; -import org.junit.FixMethodOrder; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.runners.MethodSorters; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestMethodOrder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Tests for EntryLog. */ -@FixMethodOrder(MethodSorters.NAME_ASCENDING) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@TestMethodOrder(MethodOrderer.MethodName.class) public class DefaultEntryLogTest { private static final Logger LOG = LoggerFactory.getLogger(DefaultEntryLogTest.class); @@ -104,7 +105,7 @@ File createTempDir(String prefix, String suffix) throws IOException { private LedgerDirsManager dirsMgr; private DefaultEntryLogger entryLogger; - @Before + @BeforeAll public void setUp() throws Exception { this.rootDir = createTempDir("bkTest", ".dir"); this.curDir = BookieImpl.getCurrentDirectory(rootDir); @@ -119,7 +120,7 @@ public void setUp() throws Exception { this.entryLogger = new DefaultEntryLogger(conf, dirsMgr); } - @After + @AfterAll public void tearDown() throws Exception { if (null != this.entryLogger) { entryLogger.close(); @@ -332,28 +333,6 @@ public void testMissingLogId() throws Exception { } } - /** - * Test that EntryLogger Should fail with FNFE, if entry logger directories does not exist. - */ - @Ignore // no longer valid as LedgerDirsManager creates the directory as needed - public void testEntryLoggerShouldThrowFNFEIfDirectoriesDoesNotExist() - throws Exception { - File tmpDir = createTempDir("bkTest", ".dir"); - DefaultEntryLogger entryLogger = null; - try { - entryLogger = new DefaultEntryLogger(conf, new LedgerDirsManager(conf, new File[] { tmpDir }, - new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()))); - fail("Expecting FileNotFoundException"); - } catch (FileNotFoundException e) { - assertEquals("Entry log directory '" + tmpDir + "/current' does not exist", e - .getLocalizedMessage()); - } finally { - if (entryLogger != null) { - entryLogger.close(); - } - } - } - /** * Test to verify the DiskFull during addEntry. */ @@ -384,9 +363,9 @@ public void testAddEntryFailureOnDiskFull() throws Exception { .getCurrentLogForLedger(DefaultEntryLogger.UNASSIGNED_LEDGERID).getLogFile().getParentFile()); ledgerStorage.addEntry(generateEntry(3, 1)); // Verify written entries - Assert.assertTrue(0 == generateEntry(1, 1).compareTo(ledgerStorage.getEntry(1, 1))); - Assert.assertTrue(0 == generateEntry(2, 1).compareTo(ledgerStorage.getEntry(2, 1))); - Assert.assertTrue(0 == generateEntry(3, 1).compareTo(ledgerStorage.getEntry(3, 1))); + assertEquals(0, generateEntry(1, 1).compareTo(ledgerStorage.getEntry(1, 1))); + assertEquals(0, generateEntry(2, 1).compareTo(ledgerStorage.getEntry(2, 1))); + assertEquals(0, generateEntry(3, 1).compareTo(ledgerStorage.getEntry(3, 1))); } /** @@ -1014,10 +993,9 @@ public void testEntryLogManagerInterfaceForEntryLogPerLedger() throws Exception /* * since new entryLogs are set for all the ledgers, previous entrylogs would be added to rotatedLogChannels */ - Assert.assertEquals("Number of current active EntryLogs ", numOfLedgers, - entryLogManager.getCopyOfCurrentLogs().size()); - Assert.assertEquals("Number of Rotated Logs ", numOfLedgers, - entryLogManager.getRotatedLogChannels().size()); + assertEquals(numOfLedgers, entryLogManager.getCopyOfCurrentLogs().size(), + "Number of current active EntryLogs "); + assertEquals(numOfLedgers, entryLogManager.getRotatedLogChannels().size(), "Number of Rotated Logs "); for (long i = 0; i < numOfLedgers; i++) { entryLogManager.setCurrentLogForLedgerAndAddToRotate(i, @@ -1028,25 +1006,24 @@ public void testEntryLogManagerInterfaceForEntryLogPerLedger() throws Exception * again since new entryLogs are set for all the ledgers, previous entrylogs would be added to * rotatedLogChannels */ - Assert.assertEquals("Number of current active EntryLogs ", numOfLedgers, - entryLogManager.getCopyOfCurrentLogs().size()); - Assert.assertEquals("Number of Rotated Logs ", 2 * numOfLedgers, - entryLogManager.getRotatedLogChannels().size()); + assertEquals(numOfLedgers, entryLogManager.getCopyOfCurrentLogs().size(), + "Number of current active EntryLogs "); + assertEquals(2 * numOfLedgers, entryLogManager.getRotatedLogChannels().size(), "Number of Rotated Logs "); for (BufferedLogChannel logChannel : entryLogManager.getRotatedLogChannels()) { entryLogManager.getRotatedLogChannels().remove(logChannel); } - Assert.assertEquals("Number of Rotated Logs ", 0, entryLogManager.getRotatedLogChannels().size()); + assertEquals(0, entryLogManager.getRotatedLogChannels().size(), "Number of Rotated Logs "); // entrylogid is sequential for (long i = 0; i < numOfLedgers; i++) { - assertEquals("EntryLogid for Ledger " + i, 2 * numOfLedgers + i, - entryLogManager.getCurrentLogForLedger(i).getLogId()); + assertEquals(2 * numOfLedgers + i, entryLogManager.getCurrentLogForLedger(i).getLogId(), + "EntryLogId for Ledger " + i); } for (long i = 2 * numOfLedgers; i < (3 * numOfLedgers); i++) { - assertTrue("EntryLog with logId: " + i + " should be present", - entryLogManager.getCurrentLogIfPresent(i) != null); + assertNotNull(entryLogManager.getCurrentLogIfPresent(i), + "EntryLog with logId: " + i + " should be present"); } } @@ -1076,7 +1053,6 @@ private void validateLockAcquireAndRelease(int numOfLedgers, int numOfThreadsPer CountDownLatch latchToWait = new CountDownLatch(1); AtomicInteger numberOfThreadsAcquiredLock = new AtomicInteger(0); AtomicBoolean irptExceptionHappened = new AtomicBoolean(false); - Random rand = new Random(); for (int i = 0; i < numOfLedgers * numOfThreadsPerLedger; i++) { long ledgerId = i % numOfLedgers; @@ -1094,7 +1070,7 @@ private void validateLockAcquireAndRelease(int numOfLedgers, int numOfThreadsPer }); } - assertEquals("Number Of Threads acquired Lock", 0, numberOfThreadsAcquiredLock.get()); + assertEquals(0, numberOfThreadsAcquiredLock.get(), "Number Of Threads acquired Lock"); latchToStart.countDown(); Thread.sleep(1000); /* @@ -1107,13 +1083,14 @@ private void validateLockAcquireAndRelease(int numOfLedgers, int numOfThreadsPer * After acquiring the lock there must be waiting on 'latchToWait' latch */ int currentNumberOfThreadsAcquiredLock = numberOfThreadsAcquiredLock.get(); - assertTrue("Number Of Threads acquired Lock " + currentNumberOfThreadsAcquiredLock, - (currentNumberOfThreadsAcquiredLock > 0) && (currentNumberOfThreadsAcquiredLock <= numOfLedgers)); + assertTrue((currentNumberOfThreadsAcquiredLock > 0) && (currentNumberOfThreadsAcquiredLock <= numOfLedgers), + "Number Of Threads acquired Lock " + currentNumberOfThreadsAcquiredLock); + latchToWait.countDown(); Thread.sleep(2000); - assertEquals("Number Of Threads acquired Lock", numOfLedgers * numOfThreadsPerLedger, - numberOfThreadsAcquiredLock.get()); - } + + assertEquals(numOfLedgers * numOfThreadsPerLedger, numberOfThreadsAcquiredLock.get(), + "Number Of Threads acquired Lock"); } /* * test EntryLogManager.EntryLogManagerForEntryLogPerLedger removes the @@ -1144,7 +1121,7 @@ public void testEntryLogManagerExpiryRemoval() throws Exception { entryLogManager.setCurrentLogForLedgerAndAddToRotate(ledgerId, logChannel); BufferedLogChannel currentLogForLedger = entryLogManager.getCurrentLogForLedger(ledgerId); - assertEquals("LogChannel for ledger " + ledgerId + " should match", logChannel, currentLogForLedger); + assertEquals(logChannel, currentLogForLedger, "LogChannel for ledger " + ledgerId + " should match"); Thread.sleep(evictionPeriod * 1000 + 100); entryLogManager.doEntryLogMapCleanup(); @@ -1154,15 +1131,15 @@ public void testEntryLogManagerExpiryRemoval() throws Exception { * ledger should not be available anymore */ currentLogForLedger = entryLogManager.getCurrentLogForLedger(ledgerId); - assertEquals("LogChannel for ledger " + ledgerId + " should be null", null, currentLogForLedger); - Assert.assertEquals("Number of current active EntryLogs ", 0, entryLogManager.getCopyOfCurrentLogs().size()); - Assert.assertEquals("Number of rotated EntryLogs ", 1, entryLogManager.getRotatedLogChannels().size()); - Assert.assertTrue("CopyOfRotatedLogChannels should contain the created LogChannel", - entryLogManager.getRotatedLogChannels().contains(logChannel)); - - Assert.assertTrue("since mapentry must have been evicted, it should be null", - (entryLogManager.getCacheAsMap().get(ledgerId) == null) - || (entryLogManager.getCacheAsMap().get(ledgerId).getEntryLogWithDirInfo() == null)); + assertNull(currentLogForLedger, "LogChannel for ledger " + ledgerId + " should be null"); + assertEquals(0, entryLogManager.getCopyOfCurrentLogs().size(), "Number of current active EntryLogs "); + assertEquals(1, entryLogManager.getRotatedLogChannels().size(), "Number of rotated EntryLogs "); + assertTrue(entryLogManager.getRotatedLogChannels().contains(logChannel), + "CopyOfRotatedLogChannels should contain the created LogChannel"); + + assertTrue((entryLogManager.getCacheAsMap().get(ledgerId) == null) + || (entryLogManager.getCacheAsMap().get(ledgerId).getEntryLogWithDirInfo() == null), + "since map entry must have been evicted, it should be null"); } /* @@ -1233,8 +1210,8 @@ public void testLongLedgerIdsWithEntryLogPerLedger() throws Exception { long readEntryId = buf.readLong(); byte[] readData = new byte[buf.readableBytes()]; buf.readBytes(readData); - assertEquals("LedgerId ", ledgerId, readLedgerId); - assertEquals("EntryId ", entryId, readEntryId); + assertEquals(ledgerId, readLedgerId, "LedgerId "); + assertEquals(entryId, readEntryId, "EntryId "); assertEquals("Entry Data ", expectedValue, new String(readData)); } } @@ -1347,9 +1324,9 @@ public void run() { * eviction period time, so it should not be evicted. */ BufferedLogChannel currentLogForLedger = entryLogManager.getCurrentLogForLedger(ledgerId); - assertEquals("LogChannel for ledger " + ledgerId, newLogChannel, currentLogForLedger); - Assert.assertEquals("Number of current active EntryLogs ", 1, entryLogManager.getCopyOfCurrentLogs().size()); - Assert.assertEquals("Number of rotated EntryLogs ", 0, entryLogManager.getRotatedLogChannels().size()); + assertEquals(newLogChannel, currentLogForLedger, "LogChannel for ledger " + ledgerId); + assertEquals(1, entryLogManager.getCopyOfCurrentLogs().size(), "Number of current active EntryLogs "); + assertEquals(0, entryLogManager.getRotatedLogChannels().size(), "Number of rotated EntryLogs "); } /** @@ -1410,23 +1387,23 @@ public void run() { t.start(); Thread.sleep(evictionPeriod * 1000 + 100); entryLogManager.doEntryLogMapCleanup(); - Assert.assertFalse("Exception occurred in thread, which is not expected", exceptionOccurred.get()); + assertFalse(exceptionOccurred.get(), "Exception occurred in thread, which is not expected"); /* * since for more than evictionPeriod, that ledger is not accessed and cache is cleaned up, mapping for that * ledger should not be available anymore */ BufferedLogChannel currentLogForLedger = entryLogManager.getCurrentLogForLedger(ledgerId); - assertEquals("LogChannel for ledger " + ledgerId + " should be null", null, currentLogForLedger); + assertNull(currentLogForLedger, "LogChannel for ledger " + ledgerId + " should be null"); // expected number of current active entryLogs is 1 since we created entrylog for 'newLedgerId' - Assert.assertEquals("Number of current active EntryLogs ", 1, entryLogManager.getCopyOfCurrentLogs().size()); - Assert.assertEquals("Number of rotated EntryLogs ", 1, entryLogManager.getRotatedLogChannels().size()); - Assert.assertTrue("CopyOfRotatedLogChannels should contain the created LogChannel", - entryLogManager.getRotatedLogChannels().contains(newLogChannel)); - - Assert.assertTrue("since mapentry must have been evicted, it should be null", - (entryLogManager.getCacheAsMap().get(ledgerId) == null) - || (entryLogManager.getCacheAsMap().get(ledgerId).getEntryLogWithDirInfo() == null)); + assertEquals(1, entryLogManager.getCopyOfCurrentLogs().size(), "Number of current active EntryLogs "); + assertEquals(1, entryLogManager.getRotatedLogChannels().size(), "Number of rotated EntryLogs "); + assertTrue(entryLogManager.getRotatedLogChannels().contains(newLogChannel), + "CopyOfRotatedLogChannels should contain the created LogChannel"); + + assertTrue((entryLogManager.getCacheAsMap().get(ledgerId) == null) + || (entryLogManager.getCacheAsMap().get(ledgerId).getEntryLogWithDirInfo() == null), + "since mapentry must have been evicted, it should be null"); } /* @@ -1502,8 +1479,8 @@ public void testEntryLogManagerForEntryLogPerLedger() throws Exception { * rotatedlogchannel and also leastUnflushedLogId should be advanced to numOfActiveLedgers */ entryLogger.flush(); - Assert.assertEquals("Number of rotated entrylogs", 0, entryLogManager.getRotatedLogChannels().size()); - Assert.assertEquals("LeastUnflushedloggerID", numOfActiveLedgers, entryLogger.getLeastUnflushedLogId()); + assertEquals(0, entryLogManager.getRotatedLogChannels().size(), "Number of rotated entrylogs"); + assertEquals(numOfActiveLedgers, entryLogger.getLeastUnflushedLogId(), "LeastUnflushedloggerID"); /* * after flush (flushCurrentLogs) unpersistedBytes should be 0. @@ -1516,12 +1493,12 @@ public void testEntryLogManagerForEntryLogPerLedger() throws Exception { @Test public void testSingleEntryLogCreateNewLog() throws Exception { - Assert.assertTrue(entryLogger.getEntryLogManager() instanceof EntryLogManagerForSingleEntryLog); + assertInstanceOf(EntryLogManagerForSingleEntryLog.class, entryLogger.getEntryLogManager()); EntryLogManagerForSingleEntryLog singleEntryLog = (EntryLogManagerForSingleEntryLog) entryLogger.getEntryLogManager(); EntryLogManagerForSingleEntryLog mockSingleEntryLog = spy(singleEntryLog); BufferedLogChannel activeLogChannel = mockSingleEntryLog.getCurrentLogForLedgerForAddEntry(1, 1024, true); - Assert.assertTrue(activeLogChannel != null); + assertNotNull(activeLogChannel); verify(mockSingleEntryLog, times(1)).createNewLog(anyLong(), anyString()); // `readEntryLogHardLimit` and `reachEntryLogLimit` should not call if new create log @@ -1580,8 +1557,8 @@ public void testReadAddCallsOfMultipleEntryLogs() throws Exception { long entryId = buf.readLong(); byte[] data = new byte[buf.readableBytes()]; buf.readBytes(data); - assertEquals("LedgerId ", i, ledgerId); - assertEquals("EntryId ", j, entryId); + assertEquals(i, ledgerId, "LedgerId "); + assertEquals(j, entryId, "EntryId "); assertEquals("Entry Data ", expectedValue, new String(data)); } } @@ -1601,8 +1578,8 @@ public void testReadAddCallsOfMultipleEntryLogs() throws Exception { long entryId = buf.readLong(); byte[] data = new byte[buf.readableBytes()]; buf.readBytes(data); - assertEquals("LedgerId ", i, ledgerId); - assertEquals("EntryId ", j, entryId); + assertEquals(i, ledgerId, "LedgerId "); + assertEquals(j, entryId, "EntryId "); assertEquals("Entry Data ", expectedValue, new String(data)); } } @@ -1735,8 +1712,8 @@ public void testEntryLoggerAddEntryWhenLedgerDirsAreFull() throws Exception { DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager); EntryLogManagerForEntryLogPerLedger entryLogManager = (EntryLogManagerForEntryLogPerLedger) entryLogger.getEntryLogManager(); - Assert.assertEquals("EntryLogManager class type", EntryLogManagerForEntryLogPerLedger.class, - entryLogManager.getClass()); + assertEquals(EntryLogManagerForEntryLogPerLedger.class, entryLogManager.getClass(), + "EntryLogManager class type"); entryLogger.addEntry(0L, generateEntry(0, 1)); entryLogger.addEntry(1L, generateEntry(1, 1)); @@ -1917,8 +1894,8 @@ public void testSwappingEntryLogManager(boolean initialEntryLogPerLedgerEnabled, long entryId = buf.readLong(); byte[] data = new byte[buf.readableBytes()]; buf.readBytes(data); - assertEquals("LedgerId ", i, ledgerId); - assertEquals("EntryId ", j, entryId); + assertEquals(i, ledgerId, "LedgerId "); + assertEquals(j, entryId, "EntryId "); assertEquals("Entry Data ", expectedValue, new String(data)); } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java index 91612ec5c79..c31edabba0e 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java @@ -179,7 +179,7 @@ public void testDiskSpaceWeightedBookieSelection() throws Exception { /** * Test to show that weight based selection honors the disk weight of bookies and also adapts - * when the bookies's weight changes. + * when the bookies' weight changes. */ @FlakyTest("https://github.com/apache/bookkeeper/issues/503") public void testDiskSpaceWeightedBookieSelectionWithChangingWeights() throws Exception { diff --git a/pom.xml b/pom.xml index b84fc1b9208..3e40c84ffb1 100644 --- a/pom.xml +++ b/pom.xml @@ -918,6 +918,9 @@ org.apache.maven.plugins maven-surefire-plugin ${maven-surefire-plugin.version} + + flaky + org.apache.maven.plugins diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java index cbd135de894..d1170701e14 100644 --- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java +++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java @@ -61,6 +61,12 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInfo; +import org.junit.rules.TestName; import org.junit.rules.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +78,9 @@ public class TestDistributedLogBase { static final Logger LOG = LoggerFactory.getLogger(TestDistributedLogBase.class); + @Rule + public final TestName runtime = new TestName(); + @Rule public Timeout globalTimeout = Timeout.seconds(120); @@ -105,7 +114,20 @@ public class TestDistributedLogBase { protected static int zkPort; protected static final List TMP_DIRS = new ArrayList(); + protected String testName; + + @Before + public void setTestNameJunit4() { + testName = runtime.getMethodName(); + } + + @BeforeEach + void setTestNameJunit5(TestInfo testInfo) { + testName = testInfo.getDisplayName(); + } + @BeforeClass + @BeforeAll public static void setupCluster() throws Exception { setupCluster(numBookies); } @@ -134,6 +156,7 @@ public void uncaughtException(Thread t, Throwable e) { } @AfterClass + @AfterAll public static void teardownCluster() throws Exception { bkutil.teardown(); zks.stop(); @@ -143,6 +166,7 @@ public static void teardownCluster() throws Exception { } @Before + @BeforeEach public void setup() throws Exception { try { zkc = LocalDLMEmulator.connectZooKeeper("127.0.0.1", zkPort); @@ -153,6 +177,7 @@ public void setup() throws Exception { } @After + @AfterEach public void teardown() throws Exception { if (null != zkc) { zkc.close(); diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java index 02cd7feccb0..f42ab1b85aa 100644 --- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java +++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java @@ -18,11 +18,13 @@ package org.apache.distributedlog.bk; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTimeout; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import java.net.URI; +import java.time.Duration; import java.util.Enumeration; import java.util.HashSet; import java.util.Map; @@ -53,17 +55,17 @@ import org.apache.zookeeper.Op; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.Stat; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * TestLedgerAllocator. */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) public class TestLedgerAllocator extends TestDistributedLogBase { private static final Logger logger = LoggerFactory.getLogger(TestLedgerAllocator.class); @@ -81,9 +83,6 @@ public void onAbort(Throwable t) { } }; - @Rule - public TestName runtime = new TestName(); - private ZooKeeperClient zkc; private BookKeeperClient bkc; private DistributedLogConfiguration dlConf = new DistributedLogConfiguration(); @@ -92,7 +91,7 @@ private URI createURI(String path) { return URI.create("distributedlog://" + zkServers + path); } - @Before + @BeforeAll public void setup() throws Exception { zkc = TestZooKeeperClientBuilder.newBuilder() .uri(createURI("/")) @@ -102,7 +101,7 @@ public void setup() throws Exception { .dlConfig(dlConf).ledgersPath(ledgersPath).zkc(zkc).build(); } - @After + @AfterAll public void teardown() throws Exception { bkc.close(); zkc.close(); @@ -164,152 +163,162 @@ public void testAllocation() throws Exception { Utils.close(allocator); } - @Test(timeout = 60000) + @Test public void testBadVersionOnTwoAllocators() throws Exception { - String allocationPath = "/allocation-bad-version"; - zkc.get().create(allocationPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - Stat stat = new Stat(); - byte[] data = zkc.get().getData(allocationPath, false, stat); - Versioned allocationData = new Versioned(data, new LongVersion(stat.getVersion())); - - SimpleLedgerAllocator allocator1 = - new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf), zkc, bkc); - SimpleLedgerAllocator allocator2 = - new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf), zkc, bkc); - allocator1.allocate(); - // wait until allocated - ZKTransaction txn1 = newTxn(); - LedgerHandle lh = Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER)); - allocator2.allocate(); - ZKTransaction txn2 = newTxn(); - try { - Utils.ioResult(allocator2.tryObtain(txn2, NULL_LISTENER)); - fail("Should fail allocating on second allocator as allocator1 is starting allocating something."); - } catch (ZKException ke) { - assertEquals(KeeperException.Code.BADVERSION, ke.getKeeperExceptionCode()); - } - Utils.ioResult(txn1.execute()); - Utils.close(allocator1); - Utils.close(allocator2); - - long eid = lh.addEntry("hello world".getBytes()); - lh.close(); - LedgerHandle readLh = bkc.get().openLedger(lh.getId(), - BookKeeper.DigestType.CRC32, dlConf.getBKDigestPW().getBytes()); - Enumeration entries = readLh.readEntries(eid, eid); - int i = 0; - while (entries.hasMoreElements()) { - LedgerEntry entry = entries.nextElement(); - assertEquals("hello world", new String(entry.getEntry(), UTF_8)); - ++i; - } - assertEquals(1, i); + assertTimeout(Duration.ofMinutes(1), () -> { + String allocationPath = "/allocation-bad-version"; + zkc.get().create(allocationPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + Stat stat = new Stat(); + byte[] data = zkc.get().getData(allocationPath, false, stat); + Versioned allocationData = new Versioned(data, new LongVersion(stat.getVersion())); + + SimpleLedgerAllocator allocator1 = + new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf), + zkc, bkc); + SimpleLedgerAllocator allocator2 = + new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf), + zkc, bkc); + allocator1.allocate(); + // wait until allocated + ZKTransaction txn1 = newTxn(); + LedgerHandle lh = Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER)); + allocator2.allocate(); + ZKTransaction txn2 = newTxn(); + try { + Utils.ioResult(allocator2.tryObtain(txn2, NULL_LISTENER)); + fail("Should fail allocating on second allocator as allocator1 is starting allocating something."); + } catch (ZKException ke) { + assertEquals(KeeperException.Code.BADVERSION, ke.getKeeperExceptionCode()); + } + Utils.ioResult(txn1.execute()); + Utils.close(allocator1); + Utils.close(allocator2); + + long eid = lh.addEntry("hello world".getBytes()); + lh.close(); + LedgerHandle readLh = bkc.get().openLedger(lh.getId(), + BookKeeper.DigestType.CRC32, dlConf.getBKDigestPW().getBytes()); + Enumeration entries = readLh.readEntries(eid, eid); + int i = 0; + while (entries.hasMoreElements()) { + LedgerEntry entry = entries.nextElement(); + assertEquals("hello world", new String(entry.getEntry(), UTF_8)); + ++i; + } + assertEquals(1, i); + }); } - @Test(timeout = 60000) + @Test public void testAllocatorWithoutEnoughBookies() throws Exception { - String allocationPath = "/allocator-without-enough-bookies"; - - DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); - confLocal.addConfiguration(conf); - confLocal.setEnsembleSize(numBookies * 2); - confLocal.setWriteQuorumSize(numBookies * 2); - - SimpleLedgerAllocator allocator1 = createAllocator(allocationPath, confLocal); - allocator1.allocate(); - ZKTransaction txn1 = newTxn(); - - try { - Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER)); - fail("Should fail allocating ledger if there aren't enough bookies"); - } catch (AllocationException ioe) { - // expected - assertEquals(Phase.ERROR, ioe.getPhase()); - } - byte[] data = zkc.get().getData(allocationPath, false, null); - assertEquals(0, data.length); + assertTimeout(Duration.ofMinutes(1), () -> { + String allocationPath = "/allocator-without-enough-bookies"; + + DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); + confLocal.addConfiguration(conf); + confLocal.setEnsembleSize(numBookies * 2); + confLocal.setWriteQuorumSize(numBookies * 2); + + SimpleLedgerAllocator allocator1 = createAllocator(allocationPath, confLocal); + allocator1.allocate(); + ZKTransaction txn1 = newTxn(); + + try { + Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER)); + fail("Should fail allocating ledger if there aren't enough bookies"); + } catch (AllocationException ioe) { + // expected + assertEquals(Phase.ERROR, ioe.getPhase()); + } + byte[] data = zkc.get().getData(allocationPath, false, null); + assertEquals(0, data.length); + }); } - @Test(timeout = 60000) - public void testSuccessAllocatorShouldDeleteUnusedledger() throws Exception { - String allocationPath = "/allocation-delete-unused-ledger"; - zkc.get().create(allocationPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - Stat stat = new Stat(); - byte[] data = zkc.get().getData(allocationPath, false, stat); - - Versioned allocationData = new Versioned(data, new LongVersion(stat.getVersion())); - - SimpleLedgerAllocator allocator1 = - new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf), zkc, bkc); - allocator1.allocate(); - // wait until allocated - ZKTransaction txn1 = newTxn(); - LedgerHandle lh1 = Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER)); - - // Second allocator kicks in - stat = new Stat(); - data = zkc.get().getData(allocationPath, false, stat); - allocationData = new Versioned(data, new LongVersion(stat.getVersion())); - SimpleLedgerAllocator allocator2 = - new SimpleLedgerAllocator(allocationPath, allocationData, newQuorumConfigProvider(dlConf), zkc, bkc); - allocator2.allocate(); - // wait until allocated - ZKTransaction txn2 = newTxn(); - LedgerHandle lh2 = Utils.ioResult(allocator2.tryObtain(txn2, NULL_LISTENER)); - - // should fail to commit txn1 as version is changed by second allocator - try { - Utils.ioResult(txn1.execute()); - fail("Should fail commit obtaining ledger handle from first allocator" - + " as allocator is modified by second allocator."); - } catch (ZKException ke) { - // as expected - } - Utils.ioResult(txn2.execute()); - Utils.close(allocator1); - Utils.close(allocator2); - - // ledger handle should be deleted - try { - lh1.close(); - fail("LedgerHandle allocated by allocator1 should be deleted."); - } catch (BKException bke) { - // as expected - } - try { - bkc.get().openLedger(lh1.getId(), BookKeeper.DigestType.CRC32, dlConf.getBKDigestPW().getBytes()); - fail("LedgerHandle allocated by allocator1 should be deleted."); - } catch (BKException.BKNoSuchLedgerExistsOnMetadataServerException nslee) { - // as expected - } - long eid = lh2.addEntry("hello world".getBytes()); - lh2.close(); - LedgerHandle readLh = bkc.get().openLedger(lh2.getId(), - BookKeeper.DigestType.CRC32, dlConf.getBKDigestPW().getBytes()); - Enumeration entries = readLh.readEntries(eid, eid); - int i = 0; - while (entries.hasMoreElements()) { - LedgerEntry entry = entries.nextElement(); - assertEquals("hello world", new String(entry.getEntry(), UTF_8)); - ++i; - } - assertEquals(1, i); + @Test + public void testSuccessAllocatorShouldDeleteUnusedLedger() throws Exception { + assertTimeout(Duration.ofMinutes(1), () -> { + String allocationPath = "/allocation-delete-unused-ledger"; + zkc.get().create(allocationPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + Stat stat = new Stat(); + byte[] data = zkc.get().getData(allocationPath, false, stat); + + Versioned allocationData = new Versioned(data, new LongVersion(stat.getVersion())); + + SimpleLedgerAllocator allocator1 = new SimpleLedgerAllocator(allocationPath, allocationData, + newQuorumConfigProvider(dlConf), zkc, bkc); + allocator1.allocate(); + // wait until allocated + ZKTransaction txn1 = newTxn(); + LedgerHandle lh1 = Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER)); + + // Second allocator kicks in + stat = new Stat(); + data = zkc.get().getData(allocationPath, false, stat); + allocationData = new Versioned(data, new LongVersion(stat.getVersion())); + SimpleLedgerAllocator allocator2 = new SimpleLedgerAllocator(allocationPath, allocationData, + newQuorumConfigProvider(dlConf), zkc, bkc); + allocator2.allocate(); + // wait until allocated + ZKTransaction txn2 = newTxn(); + LedgerHandle lh2 = Utils.ioResult(allocator2.tryObtain(txn2, NULL_LISTENER)); + + // should fail to commit txn1 as version is changed by second allocator + try { + Utils.ioResult(txn1.execute()); + fail("Should fail commit obtaining ledger handle from first allocator" + + " as allocator is modified by second allocator."); + } catch (ZKException ke) { + // as expected + } + Utils.ioResult(txn2.execute()); + Utils.close(allocator1); + Utils.close(allocator2); + + // ledger handle should be deleted + try { + lh1.close(); + fail("LedgerHandle allocated by allocator1 should be deleted."); + } catch (BKException bke) { + // as expected + } + try { + bkc.get().openLedger(lh1.getId(), BookKeeper.DigestType.CRC32, dlConf.getBKDigestPW().getBytes()); + fail("LedgerHandle allocated by allocator1 should be deleted."); + } catch (BKException.BKNoSuchLedgerExistsOnMetadataServerException nslee) { + // as expected + } + long eid = lh2.addEntry("hello world".getBytes()); + lh2.close(); + LedgerHandle readLh = bkc.get().openLedger(lh2.getId(), + BookKeeper.DigestType.CRC32, dlConf.getBKDigestPW().getBytes()); + Enumeration entries = readLh.readEntries(eid, eid); + int i = 0; + while (entries.hasMoreElements()) { + LedgerEntry entry = entries.nextElement(); + assertEquals("hello world", new String(entry.getEntry(), UTF_8)); + ++i; + } + assertEquals(1, i); + }); } - @Test(timeout = 60000) + @Test public void testCloseAllocatorDuringObtaining() throws Exception { - String allocationPath = "/allocation2"; - SimpleLedgerAllocator allocator = createAllocator(allocationPath); - allocator.allocate(); - ZKTransaction txn = newTxn(); - // close during obtaining ledger. - LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER)); - Utils.close(allocator); - byte[] data = zkc.get().getData(allocationPath, false, null); - assertEquals((Long) lh.getId(), Long.valueOf(new String(data, UTF_8))); - // the ledger is not deleted - bkc.get().openLedger(lh.getId(), BookKeeper.DigestType.CRC32, - dlConf.getBKDigestPW().getBytes(UTF_8)); + assertTimeout(Duration.ofMinutes(1), () -> { + String allocationPath = "/allocation2"; + SimpleLedgerAllocator allocator = createAllocator(allocationPath); + allocator.allocate(); + ZKTransaction txn = newTxn(); + // close during obtaining ledger. + LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER)); + Utils.close(allocator); + byte[] data = zkc.get().getData(allocationPath, false, null); + assertEquals((Long) lh.getId(), Long.valueOf(new String(data, UTF_8))); + // the ledger is not deleted + bkc.get().openLedger(lh.getId(), BookKeeper.DigestType.CRC32, + dlConf.getBKDigestPW().getBytes(UTF_8)); + }); } @FlakyTest("https://issues.apache.org/jira/browse/DL-26") @@ -329,84 +338,93 @@ public void testCloseAllocatorAfterConfirm() throws Exception { dlConf.getBKDigestPW().getBytes(UTF_8)); } - @Test(timeout = 60000) + @Test public void testCloseAllocatorAfterAbort() throws Exception { - String allocationPath = "/allocation3"; - SimpleLedgerAllocator allocator = createAllocator(allocationPath); - allocator.allocate(); - ZKTransaction txn = newTxn(); - // close during obtaining ledger. - LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER)); - txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1), null)); - try { - Utils.ioResult(txn.execute()); - fail("Should fail the transaction when setting unexisted path"); - } catch (ZKException ke) { - // expected - } - Utils.close(allocator); - byte[] data = zkc.get().getData(allocationPath, false, null); - assertEquals((Long) lh.getId(), Long.valueOf(new String(data, UTF_8))); - // the ledger is not deleted. - bkc.get().openLedger(lh.getId(), BookKeeper.DigestType.CRC32, - dlConf.getBKDigestPW().getBytes(UTF_8)); + assertTimeout(Duration.ofMinutes(1), () -> { + String allocationPath = "/allocation3"; + SimpleLedgerAllocator allocator = createAllocator(allocationPath); + allocator.allocate(); + ZKTransaction txn = newTxn(); + // close during obtaining ledger. + LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER)); + txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1), null)); + try { + Utils.ioResult(txn.execute()); + fail("Should fail the transaction when setting unexisted path"); + } catch (ZKException ke) { + // expected + } + Utils.close(allocator); + byte[] data = zkc.get().getData(allocationPath, false, null); + assertEquals((Long) lh.getId(), Long.valueOf(new String(data, UTF_8))); + // the ledger is not deleted. + bkc.get().openLedger(lh.getId(), BookKeeper.DigestType.CRC32, + dlConf.getBKDigestPW().getBytes(UTF_8)); + }); } - @Test(timeout = 60000) + @Test public void testConcurrentAllocation() throws Exception { - String allocationPath = "/" + runtime.getMethodName(); - SimpleLedgerAllocator allocator = createAllocator(allocationPath); - allocator.allocate(); - ZKTransaction txn1 = newTxn(); - CompletableFuture obtainFuture1 = allocator.tryObtain(txn1, NULL_LISTENER); - ZKTransaction txn2 = newTxn(); - CompletableFuture obtainFuture2 = allocator.tryObtain(txn2, NULL_LISTENER); - assertTrue(obtainFuture2.isDone()); - assertTrue(obtainFuture2.isCompletedExceptionally()); - try { - Utils.ioResult(obtainFuture2); - fail("Should fail the concurrent obtain since there is already a transaction obtaining the ledger handle"); - } catch (SimpleLedgerAllocator.ConcurrentObtainException cbe) { - // expected - } + assertTimeout(Duration.ofMinutes(1), () -> { + String allocationPath = "/" + testName; + SimpleLedgerAllocator allocator = createAllocator(allocationPath); + allocator.allocate(); + ZKTransaction txn1 = newTxn(); + CompletableFuture obtainFuture1 = allocator.tryObtain(txn1, NULL_LISTENER); + ZKTransaction txn2 = newTxn(); + CompletableFuture obtainFuture2 = allocator.tryObtain(txn2, NULL_LISTENER); + assertTrue(obtainFuture2.isDone()); + assertTrue(obtainFuture2.isCompletedExceptionally()); + try { + Utils.ioResult(obtainFuture2); + fail("Should fail the concurrent obtain since there is " + + "already a transaction obtaining the ledger handle"); + } catch (SimpleLedgerAllocator.ConcurrentObtainException cbe) { + // expected + } + }); } - @Test(timeout = 60000) + @Test public void testObtainMultipleLedgers() throws Exception { - String allocationPath = "/" + runtime.getMethodName(); - SimpleLedgerAllocator allocator = createAllocator(allocationPath); - int numLedgers = 10; - Set allocatedLedgers = new HashSet(); - for (int i = 0; i < numLedgers; i++) { - allocator.allocate(); - ZKTransaction txn = newTxn(); - LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER)); - Utils.ioResult(txn.execute()); - allocatedLedgers.add(lh); - } - assertEquals(numLedgers, allocatedLedgers.size()); + assertTimeout(Duration.ofMinutes(1), () -> { + String allocationPath = "/" + testName; + SimpleLedgerAllocator allocator = createAllocator(allocationPath); + int numLedgers = 10; + Set allocatedLedgers = new HashSet(); + for (int i = 0; i < numLedgers; i++) { + allocator.allocate(); + ZKTransaction txn = newTxn(); + LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER)); + Utils.ioResult(txn.execute()); + allocatedLedgers.add(lh); + } + assertEquals(numLedgers, allocatedLedgers.size()); + }); } - @Test(timeout = 60000) + @Test public void testAllocationWithMetadata() throws Exception { - String allocationPath = "/" + runtime.getMethodName(); - - String application = "testApplicationMetadata"; - String component = "testComponentMetadata"; - String custom = "customMetadata"; - LedgerMetadata ledgerMetadata = new LedgerMetadata(); - ledgerMetadata.setApplication(application); - ledgerMetadata.setComponent(component); - ledgerMetadata.addCustomMetadata("custom", custom); - - SimpleLedgerAllocator allocator = createAllocator(allocationPath, dlConf, ledgerMetadata); - allocator.allocate(); + assertTimeout(Duration.ofMinutes(1), () -> { + String allocationPath = "/" + testName; + + String application = "testApplicationMetadata"; + String component = "testComponentMetadata"; + String custom = "customMetadata"; + LedgerMetadata ledgerMetadata = new LedgerMetadata(); + ledgerMetadata.setApplication(application); + ledgerMetadata.setComponent(component); + ledgerMetadata.addCustomMetadata("custom", custom); + + SimpleLedgerAllocator allocator = createAllocator(allocationPath, dlConf, ledgerMetadata); + allocator.allocate(); - ZKTransaction txn = newTxn(); - LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER)); - Map customMeta = lh.getCustomMetadata(); - assertEquals(application, new String(customMeta.get("application"), UTF_8)); - assertEquals(component, new String(customMeta.get("component"), UTF_8)); - assertEquals(custom, new String(customMeta.get("custom"), UTF_8)); + ZKTransaction txn = newTxn(); + LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER)); + Map customMeta = lh.getCustomMetadata(); + assertEquals(application, new String(customMeta.get("application"), UTF_8)); + assertEquals(component, new String(customMeta.get("component"), UTF_8)); + assertEquals(custom, new String(customMeta.get("custom"), UTF_8)); + }); } } diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java index 1ebab1e5457..3a80633bd9e 100644 --- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java +++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java @@ -17,16 +17,18 @@ */ package org.apache.distributedlog.feature; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTimeout; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.time.Duration; import org.apache.bookkeeper.common.testing.annotations.FlakyTest; import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.common.config.PropertiesWriter; -import org.junit.Test; +import org.junit.jupiter.api.Test; /** * Test case for dynamic configuration based feature provider. @@ -45,35 +47,37 @@ private void ensureConfigReloaded() throws InterruptedException { Thread.sleep(1); } - @Test(timeout = 60000) + @Test public void testLoadFeaturesFromBase() throws Exception { - PropertiesWriter writer = new PropertiesWriter(); - writer.setProperty("feature_1", "10000"); - writer.setProperty("feature_2", "5000"); - writer.save(); - - DistributedLogConfiguration conf = new DistributedLogConfiguration() - .setDynamicConfigReloadIntervalSec(Integer.MAX_VALUE) - .setFileFeatureProviderBaseConfigPath(writer.getFile().toURI().toURL().getPath()); - DynamicConfigurationFeatureProvider provider = - new DynamicConfigurationFeatureProvider("", conf, NullStatsLogger.INSTANCE); - provider.start(); - ensureConfigReloaded(); - - Feature feature1 = provider.getFeature("feature_1"); - assertTrue(feature1.isAvailable()); - assertEquals(10000, feature1.availability()); - Feature feature2 = provider.getFeature("feature_2"); - assertTrue(feature2.isAvailable()); - assertEquals(5000, feature2.availability()); - Feature feature3 = provider.getFeature("feature_3"); - assertFalse(feature3.isAvailable()); - assertEquals(0, feature3.availability()); - Feature feature4 = provider.getFeature("unknown_feature"); - assertFalse(feature4.isAvailable()); - assertEquals(0, feature4.availability()); - - provider.stop(); + assertTimeout(Duration.ofMinutes(1), () -> { + PropertiesWriter writer = new PropertiesWriter(); + writer.setProperty("feature_1", "10000"); + writer.setProperty("feature_2", "5000"); + writer.save(); + + DistributedLogConfiguration conf = new DistributedLogConfiguration() + .setDynamicConfigReloadIntervalSec(Integer.MAX_VALUE) + .setFileFeatureProviderBaseConfigPath(writer.getFile().toURI().toURL().getPath()); + DynamicConfigurationFeatureProvider provider = + new DynamicConfigurationFeatureProvider("", conf, NullStatsLogger.INSTANCE); + provider.start(); + ensureConfigReloaded(); + + Feature feature1 = provider.getFeature("feature_1"); + assertTrue(feature1.isAvailable()); + assertEquals(10000, feature1.availability()); + Feature feature2 = provider.getFeature("feature_2"); + assertTrue(feature2.isAvailable()); + assertEquals(5000, feature2.availability()); + Feature feature3 = provider.getFeature("feature_3"); + assertFalse(feature3.isAvailable()); + assertEquals(0, feature3.availability()); + Feature feature4 = provider.getFeature("unknown_feature"); + assertFalse(feature4.isAvailable()); + assertEquals(0, feature4.availability()); + + provider.stop(); + }); } @FlakyTest("https://issues.apache.org/jira/browse/DL-40") @@ -116,7 +120,7 @@ public void testLoadFeaturesFromOverlay() throws Exception { provider.stop(); } - @Test(timeout = 60000) + @Test public void testReloadFeaturesFromOverlay() throws Exception { PropertiesWriter writer = new PropertiesWriter(); writer.setProperty("feature_1", "10000"); diff --git a/testtools/pom.xml b/testtools/pom.xml index f63deecd52b..8bdb6ec47ef 100644 --- a/testtools/pom.xml +++ b/testtools/pom.xml @@ -35,5 +35,9 @@ org.apache.logging.log4j log4j-slf4j2-impl + + org.junit.jupiter + junit-jupiter-engine +