From 8c8a4eac92f03e66cc26fe724a80483498021921 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joaqu=C3=ADn=20T=C3=A1rraga=20Gim=C3=A9nez?= Date: Tue, 1 Oct 2024 09:51:09 +0200 Subject: [PATCH] lib: improve PGS builder, #TASK-5407, #TASK-5387 --- .../lib/builders/PolygenicScoreBuilder.java | 128 +++++++++++------- 1 file changed, 77 insertions(+), 51 deletions(-) diff --git a/cellbase-lib/src/main/java/org/opencb/cellbase/lib/builders/PolygenicScoreBuilder.java b/cellbase-lib/src/main/java/org/opencb/cellbase/lib/builders/PolygenicScoreBuilder.java index 9ad89c093..660758fe8 100644 --- a/cellbase-lib/src/main/java/org/opencb/cellbase/lib/builders/PolygenicScoreBuilder.java +++ b/cellbase-lib/src/main/java/org/opencb/cellbase/lib/builders/PolygenicScoreBuilder.java @@ -174,73 +174,89 @@ public void parse() throws Exception { logger.info(BUILDING_LOG_MESSAGE, getDataName(PGS_DATA)); + int numFiles; + int counter; + String endsWith; + File[] files = downloadPath.toFile().listFiles(); + + // First, process metadata files try (BufferedWriter bw = FileUtils.newBufferedWriter(serializer.getOutdir().resolve(PGS_COMMON_COLLECTION + JSON_GZ_EXTENSION))) { - int counter = 0; - File[] files = downloadPath.toFile().listFiles(); + counter = 0; + endsWith = "_metadata" + TAR_GZ_EXTENSION; + numFiles = getNumFiles(files, endsWith); for (File file : files) { - if (file.isFile()) { - if (file.getName().endsWith(TXT_GZ_EXTENSION)) { - // E.g.: PGS004905_hmPOS_GRCh38.txt.gz: it contains the variants - logger.info(PARSING_LOG_MESSAGE, file.getName()); - - String pgsId = null; - Map columnPos = new HashMap<>(); - - try (BufferedReader br = FileUtils.newBufferedReader(file.toPath())) { - String line; - while ((line = br.readLine()) != null) { - if (line.startsWith("#")) { - if (line.startsWith("#pgs_id=")) { - pgsId = line.split("=")[1].trim(); - // Sanity check - if (!file.getName().startsWith(pgsId)) { - throw new CellBaseException("Error parsing file " + file.getName() + ": pgs_id mismatch"); - } - // Add PGS ID to the set - pgsIdSet.add(pgsId); - } - } else if (line.startsWith(RSID_COL) || line.startsWith(CHR_NAME_COL)) { - String[] fields = line.split("\t"); - for (int i = 0; i < fields.length; i++) { - columnPos.put(fields[i], i); - } - } else { - // Sanity check - if (pgsId == null) { - throw new CellBaseException("Error parsing file " + file.getName() + ": pgs_id is null"); - } - saveVariantPolygenicScore(line, columnPos, pgsId); + if (file.isFile() && file.getName().endsWith(endsWith)) { + // E.g.: PGS004905_metadata.tar.gz: it contains a set of files about metadata + logger.info(PARSING_LOG_MESSAGE, file.getName()); + processPgsMetadataFile(file, bw); + logger.info(PARSING_DONE_LOG_MESSAGE, file.getName()); + logger.info("Progress: {} of {} meta files", ++counter, numFiles); + } + } + } + + // Second, process variant files + counter = 0; + endsWith = TXT_GZ_EXTENSION; + numFiles = getNumFiles(files, endsWith); + for (File file : files) { + if (file.isFile() && file.getName().endsWith(endsWith)) { + // E.g.: PGS004905_hmPOS_GRCh38.txt.gz: it contains the variants + logger.info(PARSING_LOG_MESSAGE, file.getName()); + + String pgsId = null; + Map columnPos = new HashMap<>(); + + try (BufferedReader br = FileUtils.newBufferedReader(file.toPath())) { + String line; + while ((line = br.readLine()) != null) { + if (line.startsWith("#")) { + if (line.startsWith("#pgs_id=")) { + pgsId = line.split("=")[1].trim(); + // Sanity check + if (!file.getName().startsWith(pgsId)) { + throw new CellBaseException("Error parsing file " + file.getName() + ": pgs_id mismatch"); } + // Add PGS ID to the set + pgsIdSet.add(pgsId); + } + } else if (line.startsWith(RSID_COL) || line.startsWith(CHR_NAME_COL)) { + String[] fields = line.split("\t"); + for (int i = 0; i < fields.length; i++) { + columnPos.put(fields[i], i); } + } else { + // Sanity check + if (pgsId == null) { + throw new CellBaseException("Error parsing file " + file.getName() + ": pgs_id is null"); + } + saveVariantPolygenicScore(line, columnPos, pgsId); } - logger.info(PARSING_DONE_LOG_MESSAGE, file.getName()); - } else if (file.getName().endsWith("_metadata" + TAR_GZ_EXTENSION)) { - // E.g.: PGS004905_metadata.tar.gz: it contains a set of files about metadata - logger.info(PARSING_LOG_MESSAGE, file.getName()); - processPgsMetadataFile(file, bw); - logger.info(PARSING_DONE_LOG_MESSAGE, file.getName()); } } - logger.info("Progress {} of {} files", ++counter, files.length); + logger.info(PARSING_DONE_LOG_MESSAGE, file.getName()); + logger.info("Progress: {} of {} variant files", ++counter, numFiles); } + } - // Write remaining variant ID batch - RocksDB rdb = (RocksDB) varRDBConn[0]; -// logger.info("Writing variant ID batch with {} items, {} KB", varBatch.count(), varBatchSize / 1024); + RocksDB rdb; + // Write remaining variant ID batch + if (varBatchCounter > 0) { + rdb = (RocksDB) varRDBConn[0]; rdb.write(new WriteOptions(), varBatch); varBatch.clear(); - // Write remaining PGS/variant batch + } + // Write remaining PGS/variant batch + if (varPgsBatchCounter > 0) { rdb = (RocksDB) varPgsRDBConn[0]; -// logger.info("Writing PGS batch with {} items, {} KB", varPgsBatch.count(), varPgsBatchSize / 1024); rdb.write(new WriteOptions(), varPgsBatch); varPgsBatch.clear(); - - - // Serialize/write the saved variant polygenic scores in the RocksDB - serializeRDB(); - serializer.close(); } + // Serialize/write the saved variant polygenic scores in the RocksDB + serializeRDB(); + serializer.close(); + logger.info(BUILDING_DONE_LOG_MESSAGE, getDataName(PGS_DATA)); } @@ -691,4 +707,14 @@ private Object[] getDBConnection(String dbLocation, boolean forceCreate) { return new Object[]{db, options, dbLocation, indexingNeeded}; } + + private int getNumFiles(File[] files, String endsWith) { + int numFiles = 0; + for (File file : files) { + if (file.isFile() && file.getName().endsWith(endsWith)) { + ++numFiles; + } + } + return numFiles; + } }