Skip to content

Commit

Permalink
replaced stream with forloop in masks processing
Browse files Browse the repository at this point in the history
  • Loading branch information
Cristian Goina committed Aug 6, 2024
1 parent 602f5ad commit 612d609
Showing 1 changed file with 86 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,6 @@ private void calculateAllGradientScores() {
excludedRegions
);
NeuronMatchesReader<CDMatchEntity<EMNeuronEntity, LMNeuronEntity>> cdMatchesReader = getCDMatchesReader();
NeuronMatchesWriter<CDMatchEntity<EMNeuronEntity, LMNeuronEntity>> matchesWriter = getCDMatchesWriter();
CDMIPsWriter cdmipsWriter = getCDMipsWriter();
Collection<String> matchesMasksToProcess = cdMatchesReader.listMatchesLocations(
args.masksLibraries.stream()
.map(larg -> new DataSourceParam()
Expand Down Expand Up @@ -169,55 +167,13 @@ private void calculateAllGradientScores() {
masksPartitionedStream.forEach(indexedPartition -> {
int partitionId = indexedPartition.getKey(); // unbox it
List<String> partionMasks = indexedPartition.getValue();
LOG.info("Start processing partition {} ({} items)",
partitionId,
partionMasks.size());
long startProcessingPartitionTime = System.currentTimeMillis();
// process each item from the current partition sequentially
partionMasks.forEach(maskIdToProcess -> {
// read all matches for the current mask
List<CDMatchEntity<EMNeuronEntity, LMNeuronEntity>> cdMatchesForMask = getCDMatchesForMask(cdMatchesReader, maskIdToProcess);
long nPublishedLines = cdMatchesForMask.stream()
.map(cdm -> cdm.getMatchedImage().getPublishedName())
.distinct()
.count();
// calculate the grad scores
LOG.info("Partition {} - calculate grad scores for {} matches ({} lines) of {} - memory usage {}M out of {}M",
partitionId,
cdMatchesForMask.size(), nPublishedLines, maskIdToProcess,
(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / _1M + 1, // round up
(Runtime.getRuntime().totalMemory() / _1M));
List<CDMatchEntity<EMNeuronEntity, LMNeuronEntity>> cdMatchesWithGradScores = calculateGradientScores(
gradScoreAlgorithmProvider,
cdMatchesForMask,
args.gradScoreParallelism,
executor);
LOG.info("Partition {} - completed grad scores for {} matches of {} - memory usage {}M out of {}M",
partitionId,
cdMatchesWithGradScores.size(), maskIdToProcess,
(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / _1M + 1, // round up
(Runtime.getRuntime().totalMemory() / _1M));
long writtenUpdates = updateCDMatches(cdMatchesWithGradScores, matchesWriter);
LOG.info("Partition {} - updated {} grad scores for {} matches of {} - memory usage {}M out of {}M",
partitionId,
writtenUpdates, cdMatchesWithGradScores.size(), maskIdToProcess,
(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / _1M + 1, // round up
(Runtime.getRuntime().totalMemory() / _1M));
if (StringUtils.isNotBlank(args.processingTag)) {
long updatesWithProcessedTag = updateProcessingTag(cdMatchesForMask, cdmipsWriter);
LOG.info("Partition {} - set processing tag {} for {} mips - memory usage {}M out of {}M",
partitionId, args.getProcessingTag(), updatesWithProcessedTag,
(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / _1M + 1, // round up
(Runtime.getRuntime().totalMemory() / _1M));
}
System.gc(); // explicitly garbage collect
});
LOG.info("Finished partition {} ({} items) in {}s - memory usage {}M out of {}M",
partitionId,
partionMasks.size(),
(System.currentTimeMillis() - startProcessingPartitionTime) / 1000.,
(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / _1M + 1, // round up
(Runtime.getRuntime().totalMemory() / _1M));
processMasks(
partionMasks,
cdMatchesReader,
gradScoreAlgorithmProvider,
executor,
String.format("Partition %d", partitionId)
);
});
LOG.info("Finished calculating gradient scores for {} items in {}s - memory usage {}M out of {}M",
size,
Expand All @@ -226,6 +182,78 @@ private void calculateAllGradientScores() {
(Runtime.getRuntime().totalMemory() / _1M));
}

private void processMasks(List<String> masksIds,
NeuronMatchesReader<CDMatchEntity<EMNeuronEntity, LMNeuronEntity>> cdMatchesReader,
ColorDepthSearchAlgorithmProvider<ShapeMatchScore> gradScoreAlgorithmProvider,
Executor executor,
String processingContext) {
LOG.info("Start {} - process {} masks", processingContext, masksIds.size());
long startProcessingPartitionTime = System.currentTimeMillis();
for (String maskId : masksIds) {
processMask(maskId, cdMatchesReader, gradScoreAlgorithmProvider, executor, processingContext);
}
LOG.info("Finished {} - completed {} masks in {}s - memory usage {}M out of {}M",
processingContext,
masksIds.size(),
(System.currentTimeMillis() - startProcessingPartitionTime) / 1000.,
(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / _1M + 1, // round up
(Runtime.getRuntime().totalMemory() / _1M));
}

private void processMask(String maskId,
NeuronMatchesReader<CDMatchEntity<EMNeuronEntity, LMNeuronEntity>> cdMatchesReader,
ColorDepthSearchAlgorithmProvider<ShapeMatchScore> gradScoreAlgorithmProvider,
Executor executor,
String processingContext) {
// read all matches for the current mask
List<CDMatchEntity<EMNeuronEntity, LMNeuronEntity>> cdMatchesForMask = getCDMatchesForMask(cdMatchesReader, maskId);
long nPublishedNames = cdMatchesForMask.stream()
.map(cdm -> cdm.getMatchedImage().getPublishedName())
.distinct()
.count();
long nSourceSamples = cdMatchesForMask.stream()
.map(cdm -> cdm.getMatchedImage().getSourceRefId())
.distinct()
.count();
// calculate the grad scores
LOG.info("{} - calculate grad scores for {} matches ({}/{} published names/source samples) of {} - memory usage {}M out of {}M",
processingContext,
cdMatchesForMask.size(),
maskId,
nPublishedNames,
nSourceSamples,
(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / _1M + 1, // round up
(Runtime.getRuntime().totalMemory() / _1M));
List<CDMatchEntity<EMNeuronEntity, LMNeuronEntity>> cdMatchesWithGradScores = calculateGradientScores(
gradScoreAlgorithmProvider,
cdMatchesForMask,
args.gradScoreParallelism,
executor);
LOG.info("{} - completed grad scores for {} matches of {} - memory usage {}M out of {}M",
processingContext,
cdMatchesWithGradScores.size(),
maskId,
(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / _1M + 1, // round up
(Runtime.getRuntime().totalMemory() / _1M));
long writtenUpdates = updateCDMatches(cdMatchesWithGradScores, getCDMatchesWriter());
LOG.info("{} - updated {} grad scores for {} matches of {} - memory usage {}M out of {}M",
processingContext,
writtenUpdates, cdMatchesWithGradScores.size(),
maskId,
(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / _1M + 1, // round up
(Runtime.getRuntime().totalMemory() / _1M));
if (StringUtils.isNotBlank(args.processingTag)) {
long updatesWithProcessedTag = updateProcessingTag(cdMatchesForMask);
LOG.info("{} - set processing tag {} for {} mips - memory usage {}M out of {}M",
processingContext,
args.getProcessingTag(),
updatesWithProcessedTag,
(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / _1M + 1, // round up
(Runtime.getRuntime().totalMemory() / _1M));
}
System.gc(); // explicitly garbage collect
}

/**
* The ROI mask is typically the hemibrain mask that should be applied when the color depth search is done from LM to EM.
*
Expand Down Expand Up @@ -332,8 +360,8 @@ private <M extends AbstractNeuronEntity, T extends AbstractNeuronEntity> long up
));
}

private <M extends AbstractNeuronEntity, T extends AbstractNeuronEntity> long updateProcessingTag(List<CDMatchEntity<M, T>> cdMatches,
CDMIPsWriter cdmipsWriter) {
private <M extends AbstractNeuronEntity, T extends AbstractNeuronEntity> long updateProcessingTag(List<CDMatchEntity<M, T>> cdMatches) {
CDMIPsWriter cdmipsWriter = getCDMipsWriter();
if (cdmipsWriter != null) {
Set<String> processingTags = Collections.singleton(args.getProcessingTag());
Set<M> masksToUpdate = cdMatches.stream()
Expand Down Expand Up @@ -403,11 +431,11 @@ List<CompletableFuture<List<CDMatchEntity<M, T>>>> runGradScoreComputations(M ma
LOG.info("Prepare gradient score computations for {} with {} matches", mask, selectedMatches.size());
LOG.info("Load query image {}", mask);
NeuronMIP<M> maskImage = NeuronMIPUtils.loadComputeFile(mask, ComputeFileType.InputColorDepthImage);
if (NeuronMIPUtils.hasNoImageArray(maskImage)) {
LOG.error("No image found for {}", mask);
return Collections.emptyList();
}
try {
if (NeuronMIPUtils.hasNoImageArray(maskImage)) {
LOG.error("No image found for {}", mask);
return Collections.emptyList();
}
ColorDepthSearchAlgorithm<ShapeMatchScore> gradScoreAlgorithm =
gradScoreAlgorithmProvider.createColorDepthQuerySearchAlgorithmWithDefaultParams(
maskImage.getImageArray(),
Expand Down Expand Up @@ -449,14 +477,14 @@ List<CompletableFuture<List<CDMatchEntity<M, T>>>> runGradScoreComputations(M ma
cdsMatch.setGradientAreaGap(-1L);
}
}
System.gc();
return cdsMatches;
}, executor))
.collect(Collectors.toList());
} finally {
maskImage = null; // I am explicitly nullifying it because the method returns promises
// so I think the var may appear as it is still used even though
// the promises use only the imageArray inside it
System.gc();
}
}

Expand Down

0 comments on commit 612d609

Please sign in to comment.