From 612d609d3cb669deeedc376839d362c8124bb7a7 Mon Sep 17 00:00:00 2001 From: Cristian Goina Date: Tue, 6 Aug 2024 14:16:19 -0400 Subject: [PATCH] replaced stream with forloop in masks processing --- .../cmd/CalculateGradientScoresCmd.java | 144 +++++++++++------- 1 file changed, 86 insertions(+), 58 deletions(-) diff --git a/colormipsearch-tools/src/main/java/org/janelia/colormipsearch/cmd/CalculateGradientScoresCmd.java b/colormipsearch-tools/src/main/java/org/janelia/colormipsearch/cmd/CalculateGradientScoresCmd.java index 9a305df9..4c769832 100644 --- a/colormipsearch-tools/src/main/java/org/janelia/colormipsearch/cmd/CalculateGradientScoresCmd.java +++ b/colormipsearch-tools/src/main/java/org/janelia/colormipsearch/cmd/CalculateGradientScoresCmd.java @@ -136,8 +136,6 @@ private void calculateAllGradientScores() { excludedRegions ); NeuronMatchesReader> cdMatchesReader = getCDMatchesReader(); - NeuronMatchesWriter> matchesWriter = getCDMatchesWriter(); - CDMIPsWriter cdmipsWriter = getCDMipsWriter(); Collection matchesMasksToProcess = cdMatchesReader.listMatchesLocations( args.masksLibraries.stream() .map(larg -> new DataSourceParam() @@ -169,55 +167,13 @@ private void calculateAllGradientScores() { masksPartitionedStream.forEach(indexedPartition -> { int partitionId = indexedPartition.getKey(); // unbox it List partionMasks = indexedPartition.getValue(); - LOG.info("Start processing partition {} ({} items)", - partitionId, - partionMasks.size()); - long startProcessingPartitionTime = System.currentTimeMillis(); - // process each item from the current partition sequentially - partionMasks.forEach(maskIdToProcess -> { - // read all matches for the current mask - List> cdMatchesForMask = getCDMatchesForMask(cdMatchesReader, maskIdToProcess); - long nPublishedLines = cdMatchesForMask.stream() - .map(cdm -> cdm.getMatchedImage().getPublishedName()) - .distinct() - .count(); - // calculate the grad scores - LOG.info("Partition {} - calculate grad scores for {} matches ({} lines) of {} - memory usage {}M out of {}M", - partitionId, - cdMatchesForMask.size(), nPublishedLines, maskIdToProcess, - (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / _1M + 1, // round up - (Runtime.getRuntime().totalMemory() / _1M)); - List> cdMatchesWithGradScores = calculateGradientScores( - gradScoreAlgorithmProvider, - cdMatchesForMask, - args.gradScoreParallelism, - executor); - LOG.info("Partition {} - completed grad scores for {} matches of {} - memory usage {}M out of {}M", - partitionId, - cdMatchesWithGradScores.size(), maskIdToProcess, - (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / _1M + 1, // round up - (Runtime.getRuntime().totalMemory() / _1M)); - long writtenUpdates = updateCDMatches(cdMatchesWithGradScores, matchesWriter); - LOG.info("Partition {} - updated {} grad scores for {} matches of {} - memory usage {}M out of {}M", - partitionId, - writtenUpdates, cdMatchesWithGradScores.size(), maskIdToProcess, - (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / _1M + 1, // round up - (Runtime.getRuntime().totalMemory() / _1M)); - if (StringUtils.isNotBlank(args.processingTag)) { - long updatesWithProcessedTag = updateProcessingTag(cdMatchesForMask, cdmipsWriter); - LOG.info("Partition {} - set processing tag {} for {} mips - memory usage {}M out of {}M", - partitionId, args.getProcessingTag(), updatesWithProcessedTag, - (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / _1M + 1, // round up - (Runtime.getRuntime().totalMemory() / _1M)); - } - System.gc(); // explicitly garbage collect - }); - LOG.info("Finished partition {} ({} items) in {}s - memory usage {}M out of {}M", - partitionId, - partionMasks.size(), - (System.currentTimeMillis() - startProcessingPartitionTime) / 1000., - (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / _1M + 1, // round up - (Runtime.getRuntime().totalMemory() / _1M)); + processMasks( + partionMasks, + cdMatchesReader, + gradScoreAlgorithmProvider, + executor, + String.format("Partition %d", partitionId) + ); }); LOG.info("Finished calculating gradient scores for {} items in {}s - memory usage {}M out of {}M", size, @@ -226,6 +182,78 @@ private void calculateAllGradientScores() { (Runtime.getRuntime().totalMemory() / _1M)); } + private void processMasks(List masksIds, + NeuronMatchesReader> cdMatchesReader, + ColorDepthSearchAlgorithmProvider gradScoreAlgorithmProvider, + Executor executor, + String processingContext) { + LOG.info("Start {} - process {} masks", processingContext, masksIds.size()); + long startProcessingPartitionTime = System.currentTimeMillis(); + for (String maskId : masksIds) { + processMask(maskId, cdMatchesReader, gradScoreAlgorithmProvider, executor, processingContext); + } + LOG.info("Finished {} - completed {} masks in {}s - memory usage {}M out of {}M", + processingContext, + masksIds.size(), + (System.currentTimeMillis() - startProcessingPartitionTime) / 1000., + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / _1M + 1, // round up + (Runtime.getRuntime().totalMemory() / _1M)); + } + + private void processMask(String maskId, + NeuronMatchesReader> cdMatchesReader, + ColorDepthSearchAlgorithmProvider gradScoreAlgorithmProvider, + Executor executor, + String processingContext) { + // read all matches for the current mask + List> cdMatchesForMask = getCDMatchesForMask(cdMatchesReader, maskId); + long nPublishedNames = cdMatchesForMask.stream() + .map(cdm -> cdm.getMatchedImage().getPublishedName()) + .distinct() + .count(); + long nSourceSamples = cdMatchesForMask.stream() + .map(cdm -> cdm.getMatchedImage().getSourceRefId()) + .distinct() + .count(); + // calculate the grad scores + LOG.info("{} - calculate grad scores for {} matches ({}/{} published names/source samples) of {} - memory usage {}M out of {}M", + processingContext, + cdMatchesForMask.size(), + maskId, + nPublishedNames, + nSourceSamples, + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / _1M + 1, // round up + (Runtime.getRuntime().totalMemory() / _1M)); + List> cdMatchesWithGradScores = calculateGradientScores( + gradScoreAlgorithmProvider, + cdMatchesForMask, + args.gradScoreParallelism, + executor); + LOG.info("{} - completed grad scores for {} matches of {} - memory usage {}M out of {}M", + processingContext, + cdMatchesWithGradScores.size(), + maskId, + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / _1M + 1, // round up + (Runtime.getRuntime().totalMemory() / _1M)); + long writtenUpdates = updateCDMatches(cdMatchesWithGradScores, getCDMatchesWriter()); + LOG.info("{} - updated {} grad scores for {} matches of {} - memory usage {}M out of {}M", + processingContext, + writtenUpdates, cdMatchesWithGradScores.size(), + maskId, + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / _1M + 1, // round up + (Runtime.getRuntime().totalMemory() / _1M)); + if (StringUtils.isNotBlank(args.processingTag)) { + long updatesWithProcessedTag = updateProcessingTag(cdMatchesForMask); + LOG.info("{} - set processing tag {} for {} mips - memory usage {}M out of {}M", + processingContext, + args.getProcessingTag(), + updatesWithProcessedTag, + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / _1M + 1, // round up + (Runtime.getRuntime().totalMemory() / _1M)); + } + System.gc(); // explicitly garbage collect + } + /** * The ROI mask is typically the hemibrain mask that should be applied when the color depth search is done from LM to EM. * @@ -332,8 +360,8 @@ private long up )); } - private long updateProcessingTag(List> cdMatches, - CDMIPsWriter cdmipsWriter) { + private long updateProcessingTag(List> cdMatches) { + CDMIPsWriter cdmipsWriter = getCDMipsWriter(); if (cdmipsWriter != null) { Set processingTags = Collections.singleton(args.getProcessingTag()); Set masksToUpdate = cdMatches.stream() @@ -403,11 +431,11 @@ List>>> runGradScoreComputations(M ma LOG.info("Prepare gradient score computations for {} with {} matches", mask, selectedMatches.size()); LOG.info("Load query image {}", mask); NeuronMIP maskImage = NeuronMIPUtils.loadComputeFile(mask, ComputeFileType.InputColorDepthImage); - if (NeuronMIPUtils.hasNoImageArray(maskImage)) { - LOG.error("No image found for {}", mask); - return Collections.emptyList(); - } try { + if (NeuronMIPUtils.hasNoImageArray(maskImage)) { + LOG.error("No image found for {}", mask); + return Collections.emptyList(); + } ColorDepthSearchAlgorithm gradScoreAlgorithm = gradScoreAlgorithmProvider.createColorDepthQuerySearchAlgorithmWithDefaultParams( maskImage.getImageArray(), @@ -449,6 +477,7 @@ List>>> runGradScoreComputations(M ma cdsMatch.setGradientAreaGap(-1L); } } + System.gc(); return cdsMatches; }, executor)) .collect(Collectors.toList()); @@ -456,7 +485,6 @@ List>>> runGradScoreComputations(M ma maskImage = null; // I am explicitly nullifying it because the method returns promises // so I think the var may appear as it is still used even though // the promises use only the imageArray inside it - System.gc(); } }