From 7c5c79f93c5b9a8022b1dd11519fc68754a75761 Mon Sep 17 00:00:00 2001 From: Taylor Smock Date: Thu, 27 Oct 2022 11:09:47 -0600 Subject: [PATCH] Fix #22471: NPE in MapillaryExportDownloadThread.loadingFinished This was due to killing old download threads as quickly as possible, to avoid filling up the queue and making a user wait. Signed-off-by: Taylor Smock --- .../plugins/mapillary/cache/CacheUtils.java | 15 +- .../mapillary/cache/MapillaryCache.java | 18 ++- .../export/MapillaryExportDownloadThread.java | 44 +++++- .../io/export/MapillaryExportManager.java | 20 ++- .../export/MapillaryExportWriterThread.java | 34 ++++- .../io/export/MapillaryExportManagerTest.java | 133 ++++++++++++++++++ .../testutils/annotations/LoggingHandler.java | 104 ++++++++++++++ .../annotations/MapillaryURLWireMock.java | 16 ++- 8 files changed, 361 insertions(+), 23 deletions(-) create mode 100644 test/unit/org/openstreetmap/josm/plugins/mapillary/io/export/MapillaryExportManagerTest.java create mode 100644 test/unit/org/openstreetmap/josm/plugins/mapillary/testutils/annotations/LoggingHandler.java diff --git a/src/main/java/org/openstreetmap/josm/plugins/mapillary/cache/CacheUtils.java b/src/main/java/org/openstreetmap/josm/plugins/mapillary/cache/CacheUtils.java index 0ab5adc14..73aafbf13 100644 --- a/src/main/java/org/openstreetmap/josm/plugins/mapillary/cache/CacheUtils.java +++ b/src/main/java/org/openstreetmap/josm/plugins/mapillary/cache/CacheUtils.java @@ -64,10 +64,23 @@ public static void downloadPicture(INode img, MapillaryCache.Type type) { * The listener that is going to receive the picture. */ public static void submit(INode image, MapillaryCache.Type type, ICachedLoaderListener lis) { + submit(image, type, true, lis); + } + + /** + * Requests the picture with the given key and quality and uses the given + * listener. + * + * @param image + * The picture to be requested. + * @param lis + * The listener that is going to receive the picture. + */ + public static void submit(INode image, MapillaryCache.Type type, boolean removeCurrent, ICachedLoaderListener lis) { try { final MapillaryCache cache = new MapillaryCache(image, type); if (cache.getUrl() != null) { - cache.submit(lis != null ? lis : IGNORE_DOWNLOAD, false); + cache.submit(lis != null ? lis : IGNORE_DOWNLOAD, false, removeCurrent); } else { Logging.error("Mapillary: {0} has no url. Maybe API limits have been reached?", MapillaryImageUtils.getKey(image)); diff --git a/src/main/java/org/openstreetmap/josm/plugins/mapillary/cache/MapillaryCache.java b/src/main/java/org/openstreetmap/josm/plugins/mapillary/cache/MapillaryCache.java index 2395d90d2..5f3d45734 100644 --- a/src/main/java/org/openstreetmap/josm/plugins/mapillary/cache/MapillaryCache.java +++ b/src/main/java/org/openstreetmap/josm/plugins/mapillary/cache/MapillaryCache.java @@ -243,15 +243,27 @@ protected BufferedImageCacheEntry createCacheEntry(byte[] content) { return new BufferedImageCacheEntry(content); } - @Override - public void submit(ICachedLoaderListener listener, boolean force) throws IOException { + /** + * Submit a new task + * + * @param listener The listener to notify + * @param force {@code true} if the load should skip all caches + * @param removeCurrent {@code true} if any outstanding tasks should be canceled + * @throws IOException If something happens during fetch ore read + */ + public void submit(ICachedLoaderListener listener, boolean force, boolean removeCurrent) throws IOException { // Clear the queue for larger images - if (this.type == Type.ORIGINAL || this.type == Type.THUMB_2048) { + if (removeCurrent && (this.type == Type.ORIGINAL || this.type == Type.THUMB_2048)) { this.cancelOutstandingTasks(); } super.submit(listener, force); } + @Override + public void submit(ICachedLoaderListener listener, boolean force) throws IOException { + this.submit(listener, force, true); + } + @Override protected boolean isObjectLoadable() { if (this.cacheData == null) { diff --git a/src/main/java/org/openstreetmap/josm/plugins/mapillary/io/export/MapillaryExportDownloadThread.java b/src/main/java/org/openstreetmap/josm/plugins/mapillary/io/export/MapillaryExportDownloadThread.java index b27d1a977..8bd503841 100644 --- a/src/main/java/org/openstreetmap/josm/plugins/mapillary/io/export/MapillaryExportDownloadThread.java +++ b/src/main/java/org/openstreetmap/josm/plugins/mapillary/io/export/MapillaryExportDownloadThread.java @@ -7,10 +7,12 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import javax.imageio.ImageIO; +import org.openstreetmap.josm.data.cache.BufferedImageCacheEntry; import org.openstreetmap.josm.data.cache.CacheEntry; import org.openstreetmap.josm.data.cache.CacheEntryAttributes; import org.openstreetmap.josm.data.cache.ICachedLoaderListener; @@ -29,15 +31,19 @@ * @see MapillaryExportWriterThread */ public class MapillaryExportDownloadThread implements Runnable, ICachedLoaderListener { + private static final AtomicInteger THREAD_COUNT = new AtomicInteger(); private final ArrayBlockingQueue queue; private final ArrayBlockingQueue queueImages; private final INode image; + private final MapillaryExportWriterThread exportWriterThread; /** * Main constructor. * + * @param exportWriterThread + * The thread to notify of failures ({@code queue}/{@code queueImages} should both be in the thread) * @param image * Image to be downloaded. * @param queue @@ -47,17 +53,30 @@ public class MapillaryExportDownloadThread implements Runnable, ICachedLoaderLis * Queue of {@link INode} objects for the * {@link MapillaryExportWriterThread}. */ - public MapillaryExportDownloadThread(INode image, ArrayBlockingQueue queue, - ArrayBlockingQueue queueImages) { + public MapillaryExportDownloadThread(MapillaryExportWriterThread exportWriterThread, INode image, + ArrayBlockingQueue queue, ArrayBlockingQueue queueImages) { this.queue = queue; this.image = image; this.queueImages = queueImages; + this.exportWriterThread = exportWriterThread; } @Override public void run() { if (MapillaryImageUtils.getKey(this.image) != 0) { - CacheUtils.submit(this.image, MapillaryCache.Type.ORIGINAL, this); + final int threadCount = MapillaryCache.THREAD_LIMIT.get(); + synchronized (THREAD_COUNT) { + while (THREAD_COUNT.get() > threadCount - 1) { + try { + THREAD_COUNT.wait(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + THREAD_COUNT.incrementAndGet(); + } + CacheUtils.submit(this.image, MapillaryCache.Type.ORIGINAL, false, this); } else { throw new UnsupportedOperationException(tr("We cannot export {0}", image.getInterestingTags().entrySet().stream() @@ -68,9 +87,26 @@ public void run() { @Override public synchronized void loadingFinished(CacheEntry data, CacheEntryAttributes attributes, LoadResult result) { + THREAD_COUNT.decrementAndGet(); + synchronized (THREAD_COUNT) { + THREAD_COUNT.notifyAll(); + } + if (result != LoadResult.SUCCESS) { + this.exportWriterThread.decrementSize(); + return; + } try { - this.queue.put(ImageIO.read(new ByteArrayInputStream(data.getContent()))); + final BufferedImage bufferedImage; + if (data instanceof BufferedImageCacheEntry) { + bufferedImage = ((BufferedImageCacheEntry) data).getImage(); + } else { + bufferedImage = ImageIO.read(new ByteArrayInputStream(data.getContent())); + } + this.queue.put(bufferedImage); this.queueImages.put(this.image); + synchronized (this.queue) { + this.queue.notifyAll(); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); Logging.error(e); diff --git a/src/main/java/org/openstreetmap/josm/plugins/mapillary/io/export/MapillaryExportManager.java b/src/main/java/org/openstreetmap/josm/plugins/mapillary/io/export/MapillaryExportManager.java index 92b849836..825b29ed0 100644 --- a/src/main/java/org/openstreetmap/josm/plugins/mapillary/io/export/MapillaryExportManager.java +++ b/src/main/java/org/openstreetmap/josm/plugins/mapillary/io/export/MapillaryExportManager.java @@ -15,6 +15,7 @@ import org.openstreetmap.josm.data.osm.INode; import org.openstreetmap.josm.gui.PleaseWaitRunnable; import org.openstreetmap.josm.gui.progress.swing.PleaseWaitProgressMonitor; +import org.openstreetmap.josm.plugins.mapillary.cache.MapillaryCache; import org.openstreetmap.josm.plugins.mapillary.utils.MapillaryImageUtils; import org.openstreetmap.josm.tools.Logging; @@ -33,14 +34,14 @@ */ public class MapillaryExportManager extends PleaseWaitRunnable { - private final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(10); - private final ArrayBlockingQueue queueImages = new ArrayBlockingQueue<>(10); + private final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(MapillaryCache.THREAD_LIMIT.get()); + private final ArrayBlockingQueue queueImages = new ArrayBlockingQueue<>(MapillaryCache.THREAD_LIMIT.get()); private final int amount; private final Set images; private final String path; - private Thread writer; + private MapillaryExportWriterThread writer; private ThreadPoolExecutor ex; /** @@ -77,25 +78,30 @@ protected void realRun() { } return; } - ArrayBlockingQueue executionQueue = new ArrayBlockingQueue<>(10); - this.ex = new ThreadPoolExecutor(20, 35, 25, TimeUnit.SECONDS, executionQueue); + ArrayBlockingQueue executionQueue = new ArrayBlockingQueue<>(MapillaryCache.THREAD_LIMIT.get()); + this.ex = new ThreadPoolExecutor(1, 1, 25, TimeUnit.SECONDS, executionQueue); for (INode image : this.images) { if (MapillaryImageUtils.isImage(image)) { synchronized (this) { while (this.ex.getQueue().remainingCapacity() == 0) { try { - this.wait(1000); + this.wait(10); } catch (InterruptedException e) { Logging.error(e); Thread.currentThread().interrupt(); + return; } } } try { - this.ex.execute(new MapillaryExportDownloadThread(image, this.queue, this.queueImages)); + this.ex + .execute(new MapillaryExportDownloadThread(this.writer, image, this.queue, this.queueImages)); } catch (RejectedExecutionException e) { Logging.error(e); } + } else { + // We need to ensure that the writer thread gets the number of "images" expected. + this.writer.decrementSize(); } } try { diff --git a/src/main/java/org/openstreetmap/josm/plugins/mapillary/io/export/MapillaryExportWriterThread.java b/src/main/java/org/openstreetmap/josm/plugins/mapillary/io/export/MapillaryExportWriterThread.java index bdb08b641..ae552c981 100644 --- a/src/main/java/org/openstreetmap/josm/plugins/mapillary/io/export/MapillaryExportWriterThread.java +++ b/src/main/java/org/openstreetmap/josm/plugins/mapillary/io/export/MapillaryExportWriterThread.java @@ -41,8 +41,9 @@ public class MapillaryExportWriterThread extends Thread { private final String path; private final ArrayBlockingQueue queue; private final ArrayBlockingQueue queueImages; - private final int amount; private final ProgressMonitor monitor; + private int amount; + private int written; /** * Main constructor. @@ -74,6 +75,19 @@ public void run() { INode mimg; String finalPath; for (int i = 0; i < this.amount; i++) { + while (this.queue.peek() == null) { + if (this.amount == this.written) { + return; + } + synchronized (this.queue) { + try { + this.queue.wait(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + } try { img = this.queue.take(); mimg = this.queueImages.take(); @@ -138,8 +152,12 @@ public void run() { Logging.info("Unable to set last modified time for {0} to {1}", file, MapillaryImageUtils.getDate(mimg)); } + this.written++; } catch (InterruptedException e) { - Logging.info("Mapillary export cancelled"); + if (this.written != this.amount) { + Logging.info("Mapillary export cancelled"); + Logging.trace(e); + } Thread.currentThread().interrupt(); return; } catch (IOException | ImageReadException | ImageWriteException e) { @@ -151,4 +169,16 @@ public void run() { this.monitor.setCustomText("Downloaded " + (i + 1) + "/" + this.amount); } } + + /** + * Called when the size is decreased + */ + public void decrementSize() { + this.amount = this.amount - 1; + if (this.amount == this.written) { + synchronized (this.queue) { + this.queue.notifyAll(); + } + } + } } diff --git a/test/unit/org/openstreetmap/josm/plugins/mapillary/io/export/MapillaryExportManagerTest.java b/test/unit/org/openstreetmap/josm/plugins/mapillary/io/export/MapillaryExportManagerTest.java new file mode 100644 index 000000000..c44c7d2ba --- /dev/null +++ b/test/unit/org/openstreetmap/josm/plugins/mapillary/io/export/MapillaryExportManagerTest.java @@ -0,0 +1,133 @@ +// License: GPL. For details, see LICENSE file. +package org.openstreetmap.josm.plugins.mapillary.io.export; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.awt.Graphics2D; +import java.awt.Image; +import java.awt.image.BufferedImage; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.logging.LogRecord; +import java.util.stream.Collectors; + +import javax.imageio.ImageIO; + +import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; +import com.github.tomakehurst.wiremock.junit5.WireMockTest; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.openstreetmap.josm.data.coor.LatLon; +import org.openstreetmap.josm.gui.MainApplication; +import org.openstreetmap.josm.plugins.mapillary.cache.MapillaryCache; +import org.openstreetmap.josm.plugins.mapillary.data.mapillary.MapillaryNode; +import org.openstreetmap.josm.plugins.mapillary.spi.preferences.MapillaryConfig; +import org.openstreetmap.josm.plugins.mapillary.testutils.annotations.LoggingHandler; +import org.openstreetmap.josm.plugins.mapillary.testutils.annotations.MapillaryURLWireMock; +import org.openstreetmap.josm.plugins.mapillary.utils.MapillaryImageUtils; +import org.openstreetmap.josm.testutils.annotations.BasicPreferences; +import org.openstreetmap.josm.testutils.annotations.HTTP; +import org.openstreetmap.josm.tools.ImageProvider; + +/** + * Test class for {@link MapillaryExportManager} + */ +@BasicPreferences +@WireMockTest +@HTTP +class MapillaryExportManagerTest { + @TempDir + Path temporaryDirectory; + + @AfterEach + void tearDown() { + MapillaryConfig.setUrlsProvider(new MapillaryURLWireMock.NullMapillaryUrl()); + } + + /** + * Non-regression test for #22471 + */ + @Test + @LoggingHandler + void testNonRegression22471(WireMockRuntimeInfo wireMockRuntimeInfo, LoggingHandler.TestHandler handler) + throws IOException { + MapillaryConfig + .setUrlsProvider(new MapillaryURLWireMock.WireMockServerMapillaryUrl(wireMockRuntimeInfo.getHttpBaseUrl())); + // This needs to be a bit more than the queue limit. Which is the thread limit. + int images = 200 * MapillaryCache.THREAD_LIMIT.get(); + List nodes = generateMapillaryNodes(wireMockRuntimeInfo, images); + MapillaryExportManager manager = new MapillaryExportManager<>(nodes, + temporaryDirectory.toString()); + manager.getProgressMonitor().beginTask("testNonRegression22471"); + manager.realRun(); + List records = new ArrayList<>(handler.getRecords()); + records.removeIf(record -> record.getMessage() != null && record.getMessage().contains("HTTP/1.1 200")); + assertTrue(records.isEmpty(), + records.stream().map(LogRecord::getMessage).filter(Objects::nonNull).collect(Collectors.joining(", "))); + List files = new ArrayList<>(); + Files.walkFileTree(this.temporaryDirectory, new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + if (file.toFile().isFile()) { + files.add(file); + } + return super.visitFile(file, attrs); + } + }); + assertEquals(images, files.size(), files.stream().map(Path::toString).collect(Collectors.joining(", "))); + } + + @Test + @LoggingHandler + void testNoBlocking(WireMockRuntimeInfo wireMockRuntimeInfo, LoggingHandler.TestHandler handler) { + MapillaryConfig + .setUrlsProvider(new MapillaryURLWireMock.WireMockServerMapillaryUrl(wireMockRuntimeInfo.getHttpBaseUrl())); + List nodes = generateMapillaryNodes(wireMockRuntimeInfo, 10); + nodes.forEach(node -> node.setKeys(null)); + MapillaryExportManager manager = new MapillaryExportManager<>(nodes, + temporaryDirectory.toString()); + Future future = MainApplication.worker.submit(manager); + assertDoesNotThrow(() -> future.get(10, TimeUnit.SECONDS)); + assertTrue(handler.getRecords().isEmpty()); + } + + private static List generateMapillaryNodes(WireMockRuntimeInfo wireMockRuntimeInfo, int count) { + List nodes = new ArrayList<>(count); + Image image = ImageProvider.getEmpty(ImageProvider.ImageSizes.SMALLICON).getImage(); + BufferedImage bufferedImage = new BufferedImage(image.getWidth(null), image.getHeight(null), + BufferedImage.TYPE_BYTE_GRAY); + Graphics2D g2d = bufferedImage.createGraphics(); + g2d.drawImage(image, 0, 0, null); + g2d.dispose(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + assertDoesNotThrow(() -> ImageIO.write(bufferedImage, "jpg", baos)); + byte[] emptyImage = baos.toByteArray(); + for (int i = 1; i <= count; i++) { + MapillaryNode node = new MapillaryNode(); + node.setOsmId(i, 1); + node.setCoor(new LatLon(i / 100d, i / 100d)); + node.put(MapillaryImageUtils.ImageProperties.ID.toString(), Integer.toString(i)); + node.put(MapillaryImageUtils.ImageProperties.THUMB_ORIGINAL_URL.toString(), + wireMockRuntimeInfo.getHttpBaseUrl() + "/image/" + i); + node.put(MapillaryImageUtils.ImageProperties.SEQUENCE_ID.toString(), "test-sequence"); + wireMockRuntimeInfo.getWireMock() + .register(WireMock.get("/image/" + i).willReturn(WireMock.aResponse().withBody(emptyImage))); + nodes.add(node); + } + return nodes; + } +} diff --git a/test/unit/org/openstreetmap/josm/plugins/mapillary/testutils/annotations/LoggingHandler.java b/test/unit/org/openstreetmap/josm/plugins/mapillary/testutils/annotations/LoggingHandler.java new file mode 100644 index 000000000..ee67e4ef2 --- /dev/null +++ b/test/unit/org/openstreetmap/josm/plugins/mapillary/testutils/annotations/LoggingHandler.java @@ -0,0 +1,104 @@ +// License: GPL. For details, see LICENSE file. +package org.openstreetmap.josm.plugins.mapillary.testutils.annotations; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.logging.Handler; +import java.util.logging.LogRecord; + +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; +import org.junit.jupiter.api.extension.ParameterResolver; +import org.openstreetmap.josm.tools.Logging; + +/** + * + */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ ElementType.METHOD }) +@ExtendWith(LoggingHandler.LoggingHandlerImplementation.class) +public @interface LoggingHandler { + class LoggingHandlerImplementation implements BeforeEachCallback, AfterEachCallback, ParameterResolver { + + @Override + public void afterEach(ExtensionContext context) { + ExtensionContext.Store store = context.getStore(ExtensionContext.Namespace.create(LoggingHandler.class)); + Logging.getLogger().removeHandler(store.get(TestHandler.class, TestHandler.class)); + Handler[] handlers = store.get(Logging.class, Handler[].class); + for (Handler handler : handlers) { + Logging.getLogger().addHandler(handler); + } + } + + @Override + public void beforeEach(ExtensionContext context) { + ExtensionContext.Store store = context.getStore(ExtensionContext.Namespace.create(LoggingHandler.class)); + store.put(Logging.class, Logging.getLogger().getHandlers()); + for (Handler handler : Logging.getLogger().getHandlers()) { + Logging.getLogger().removeHandler(handler); + } + TestHandler testHandler = new TestHandler(); + store.put(TestHandler.class, testHandler); + Logging.getLogger().addHandler(testHandler); + } + + @Override + public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) + throws ParameterResolutionException { + return TestHandler.class.isAssignableFrom(parameterContext.getParameter().getType()); + } + + @Override + public TestHandler resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) + throws ParameterResolutionException { + return extensionContext.getStore(ExtensionContext.Namespace.create(LoggingHandler.class)) + .get(TestHandler.class, TestHandler.class); + } + } + + class TestHandler extends Handler { + private final List records = new ArrayList<>(); + + /** + * Get the records saved by this handler + * + * @return The saved records + */ + public List getRecords() { + return Collections.unmodifiableList(this.records); + } + + /** + * Clear stored records + */ + public void clearRecords() { + this.records.clear(); + } + + @Override + public void publish(LogRecord record) { + this.records.add(record); + } + + @Override + public void flush() { + // Do nothing + } + + @Override + public void close() throws SecurityException { + // Do nothing + } + } +} diff --git a/test/unit/org/openstreetmap/josm/plugins/mapillary/testutils/annotations/MapillaryURLWireMock.java b/test/unit/org/openstreetmap/josm/plugins/mapillary/testutils/annotations/MapillaryURLWireMock.java index 450a99490..9f748c365 100644 --- a/test/unit/org/openstreetmap/josm/plugins/mapillary/testutils/annotations/MapillaryURLWireMock.java +++ b/test/unit/org/openstreetmap/josm/plugins/mapillary/testutils/annotations/MapillaryURLWireMock.java @@ -370,25 +370,29 @@ private static void filterFiles(final JsonObjectBuilder builder, final List