From 62dd9022d07c20d6c7e9a5572668b800198b03f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joaqu=C3=ADn=20T=C3=A1rraga=20Gim=C3=A9nez?= Date: Tue, 13 Aug 2024 14:08:09 +0200 Subject: [PATCH] lib: improve thread management in data release singleton, #TASK-6565 --- .../core/singleton/DataReleaseSingleton.java | 90 +++++++++++-------- .../lib/impl/core/singleton/DatabaseInfo.java | 22 ++--- 2 files changed, 66 insertions(+), 46 deletions(-) diff --git a/cellbase-lib/src/main/java/org/opencb/cellbase/lib/impl/core/singleton/DataReleaseSingleton.java b/cellbase-lib/src/main/java/org/opencb/cellbase/lib/impl/core/singleton/DataReleaseSingleton.java index b0b401b27..3b884242d 100644 --- a/cellbase-lib/src/main/java/org/opencb/cellbase/lib/impl/core/singleton/DataReleaseSingleton.java +++ b/cellbase-lib/src/main/java/org/opencb/cellbase/lib/impl/core/singleton/DataReleaseSingleton.java @@ -39,13 +39,17 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; public final class DataReleaseSingleton { // Map where the key is dbname and the value is the DatabaseInfo private Map dbInfoMap = new HashMap<>(); + private ExecutorService executorService; private CellBaseManagerFactory managerFactory; private static DataReleaseSingleton instance; @@ -58,6 +62,21 @@ public final class DataReleaseSingleton { // Private constructor to prevent instantiation private DataReleaseSingleton(CellBaseManagerFactory managerFactory) throws CellBaseException { + // Create the executor service with a 4-thread pool and add shutdown hook to terminate thread pool + this.executorService = Executors.newFixedThreadPool(4); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + LOGGER.info("Shutting down thread pool..."); + executorService.shutdownNow(); + try { + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + LOGGER.info("Thread pool did not terminate in the specified time."); + } + } catch (InterruptedException e) { + LOGGER.info("Thread pool termination interrupted."); + Thread.currentThread().interrupt(); + } + })); + this.managerFactory = managerFactory; // Support multi species and assemblies @@ -67,7 +86,7 @@ private DataReleaseSingleton(CellBaseManagerFactory managerFactory) throws CellB String databaseName = MongoDBManager.getDatabaseName(vertebrate.getId(), assembly.getName(), configuration.getVersion()); // This is necessary, before creating the database name the assembly is "cleaned", and we need to get the data release // manager from the species and the assembly - DatabaseInfo dbInfo = new DatabaseInfo(databaseName, vertebrate.getId(), assembly.getName(), new ReentrantReadWriteLock(), + DatabaseInfo dbInfo = new DatabaseInfo(databaseName, vertebrate.getId(), assembly.getName(), new ReentrantLock(), new HashMap<>()); dbInfoMap.put(databaseName, dbInfo); @@ -78,33 +97,31 @@ private DataReleaseSingleton(CellBaseManagerFactory managerFactory) throws CellB LOGGER.info("Setting listener for database {} and collection {}", database.getName(), collection.getNamespace() .getCollectionName()); // Set up the change stream for the collection - new Thread(() -> { - while (true) { + executorService.submit(() -> { + while (!Thread.currentThread().isInterrupted()) { try { collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).forEach(changeStreamDocument -> { try { handleDocumentChange(changeStreamDocument); } catch (CellBaseException e) { LOGGER.warn("Exception from handle document change function (database = {}, collection = {}): {}", - collection.getNamespace().getDatabaseName(), collection.getNamespace().getCollectionName(), - Arrays.toString(e.getStackTrace())); + collection.getNamespace().getDatabaseName(), collection.getNamespace().getCollectionName(), e); } }); } catch (Exception e) { LOGGER.error("Failed to watch collection (database = {}, collection = {}): {}", - collection.getNamespace().getDatabaseName(), collection.getNamespace().getCollectionName(), - Arrays.toString(e.getStackTrace())); + collection.getNamespace().getDatabaseName(), collection.getNamespace().getCollectionName(), e); try { // Sleep 5 sec before retrying to avoid tight loop Thread.sleep(5000); } catch (InterruptedException ie) { - // Restore interrupt status + // Restore interrupt status, and exit loop if interrupted Thread.currentThread().interrupt(); - break; // Exit loop if interrupted + break; } } } - }).start(); + }); } } } @@ -154,30 +171,33 @@ public void checkDataRelease(String dbname, int release) throws CellBaseExceptio public void checkDataRelease(String dbname, int release, String data) throws CellBaseException { DatabaseInfo dbInfo = getDatabaseInfo(dbname); - // Lock and load data if necessary - dbInfo.getRwLock().writeLock().lock(); - try { - if (!dbInfo.getCacheData().containsKey(release) - || (StringUtils.isNotEmpty(data) && !dbInfo.getCacheData().get(release).containsKey(data))) { - // Load the data releases from the MongoDB collection for that database name - loadData(dbname); - - // Check after loading - if (!dbInfo.getCacheData().containsKey(release)) { - // If the release is invalid, throw an exception - String msg = INVALID_RELEASE_MSG_PREFIX + release + ". The available data releases are: " - + dbInfo.getCacheData().keySet(); - throw new CellBaseException(msg); - } - if (StringUtils.isNotEmpty(data) && !dbInfo.getCacheData().get(release).containsKey(data)) { - // If the data is invalid, throw an exception - String msg = INVALID_DATA_MSG_PREFIX + " '" + data + "', it's not present in release " + release - + ". The available data are: " + dbInfo.getCacheData().get(release).keySet(); - throw new CellBaseException(msg); + if (!dbInfo.getCacheData().containsKey(release) + || (StringUtils.isNotEmpty(data) && !dbInfo.getCacheData().get(release).containsKey(data))) { + try { + // Lock and load data if necessary + dbInfo.getLock().lock(); + if (!dbInfo.getCacheData().containsKey(release) + || (StringUtils.isNotEmpty(data) && !dbInfo.getCacheData().get(release).containsKey(data))) { + // Load the data releases from the MongoDB collection for that database name + loadData(dbname); } + } finally { + dbInfo.getLock().unlock(); + } + + // Check after loading + if (!dbInfo.getCacheData().containsKey(release)) { + // If the release is invalid, throw an exception + String msg = INVALID_RELEASE_MSG_PREFIX + release + ". The available data releases are: " + + dbInfo.getCacheData().keySet(); + throw new CellBaseException(msg); + } + if (StringUtils.isNotEmpty(data) && !dbInfo.getCacheData().get(release).containsKey(data)) { + // If the data is invalid, throw an exception + String msg = INVALID_DATA_MSG_PREFIX + " '" + data + "', it's not present in release " + release + + ". The available data are: " + dbInfo.getCacheData().get(release).keySet(); + throw new CellBaseException(msg); } - } finally { - dbInfo.getRwLock().writeLock().unlock(); } } @@ -198,11 +218,11 @@ private void handleDocumentChange(ChangeStreamDocument changeStreamDoc DatabaseInfo dbInfo = getDatabaseInfo(dbname); // Handle the change event - dbInfo.getRwLock().writeLock().lock(); + dbInfo.getLock().lock(); try { loadData(dbname); } finally { - dbInfo.getRwLock().writeLock().unlock(); + dbInfo.getLock().unlock(); } } diff --git a/cellbase-lib/src/main/java/org/opencb/cellbase/lib/impl/core/singleton/DatabaseInfo.java b/cellbase-lib/src/main/java/org/opencb/cellbase/lib/impl/core/singleton/DatabaseInfo.java index 85eb32eb1..d953f278d 100644 --- a/cellbase-lib/src/main/java/org/opencb/cellbase/lib/impl/core/singleton/DatabaseInfo.java +++ b/cellbase-lib/src/main/java/org/opencb/cellbase/lib/impl/core/singleton/DatabaseInfo.java @@ -20,26 +20,26 @@ import java.util.HashMap; import java.util.Map; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; public class DatabaseInfo { private String dbName; private String species; private String assembly; - private ReentrantReadWriteLock rwLock; + private ReentrantLock lock; private Map> cacheData; public DatabaseInfo() { - this.rwLock = new ReentrantReadWriteLock(); + this.lock = new ReentrantLock(); this.cacheData = new HashMap<>(); } - public DatabaseInfo(String dbName, String species, String assembly, ReentrantReadWriteLock rwLock, Map> cacheData) { + public DatabaseInfo(String dbName, String species, String assembly, ReentrantLock lock, + Map> cacheData) { this.dbName = dbName; this.species = species; this.assembly = assembly; - this.rwLock = rwLock; + this.lock = lock; this.cacheData = cacheData; } @@ -49,7 +49,7 @@ public String toString() { sb.append("dbName='").append(dbName).append('\''); sb.append(", species='").append(species).append('\''); sb.append(", assembly='").append(assembly).append('\''); - sb.append(", rwLock=").append(rwLock); + sb.append(", lock=").append(lock); sb.append(", cacheData=").append(cacheData); sb.append('}'); return sb.toString(); @@ -82,12 +82,12 @@ public DatabaseInfo setAssembly(String assembly) { return this; } - public ReentrantReadWriteLock getRwLock() { - return rwLock; + public ReentrantLock getLock() { + return lock; } - public DatabaseInfo setRwLock(ReentrantReadWriteLock rwLock) { - this.rwLock = rwLock; + public DatabaseInfo setRwLock(ReentrantLock lock) { + this.lock = lock; return this; }