Skip to content

Commit

Permalink
Merge pull request #2459 from opencb/TASK-5895
Browse files Browse the repository at this point in the history
TASK-5895 - HBaseLockManager Inconsistent lock status
  • Loading branch information
j-coll authored Jun 17, 2024
2 parents 7fcd257 + 97c09c6 commit 63dec4c
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 215 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
Expand Down Expand Up @@ -191,14 +190,7 @@ public ObjectMap getConfiguration() {

public Lock lockGlobal(long lockDuration, long timeout, String lockName)
throws StorageEngineException {
try {
return projectDBAdaptor.lockProject(lockDuration, timeout, lockName);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new StorageEngineException("Unable to lock the Project", e);
} catch (TimeoutException e) {
throw new StorageEngineException("Unable to lock the Project", e);
}
return projectDBAdaptor.lockProject(lockDuration, timeout, lockName);
}

public Lock lockStudy(int studyId) throws StorageEngineException {
Expand Down Expand Up @@ -282,17 +274,14 @@ public <E extends Exception> StudyMetadata updateStudyMetadata(Object study, Upd
throws StorageEngineException, E {
int studyId = getStudyId(study);

Lock lock = lockStudy(studyId);
try {
try (Lock lock = lockStudy(studyId)) {
StudyMetadata sm = getStudyMetadata(studyId);

sm = updater.update(sm);

lock.checkLocked();
unsecureUpdateStudyMetadata(sm);
return sm;
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -557,16 +546,8 @@ public <E extends Exception> ProjectMetadata updateProjectMetadata(UpdateConsume
public <E extends Exception> ProjectMetadata updateProjectMetadata(UpdateFunction<ProjectMetadata, E> function)
throws StorageEngineException, E {
Objects.requireNonNull(function);
Lock lock;
try {
lock = projectDBAdaptor.lockProject(lockDuration, lockTimeout);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new StorageEngineException("Unable to lock the Project", e);
} catch (TimeoutException e) {
throw new StorageEngineException("Unable to lock the Project", e);
}
try {

try (Lock lock = projectDBAdaptor.lockProject(lockDuration, lockTimeout)) {
ProjectMetadata projectMetadata = getProjectMetadata();
int countersHash = (projectMetadata == null ? Collections.emptyMap() : projectMetadata.getCounters()).hashCode();

Expand All @@ -579,8 +560,6 @@ public <E extends Exception> ProjectMetadata updateProjectMetadata(UpdateFunctio
lock.checkLocked();
projectDBAdaptor.updateProjectMetadata(projectMetadata, updateCounters);
return projectMetadata;
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -673,16 +652,14 @@ public void unsecureUpdateFileMetadata(int studyId, FileMetadata file) {
public <E extends Exception> FileMetadata updateFileMetadata(int studyId, int fileId, UpdateConsumer<FileMetadata, E> update)
throws E, StorageEngineException {
getFileName(studyId, fileId); // Check file exists
Lock lock = fileDBAdaptor.lock(studyId, fileId, lockDuration, lockTimeout);
try {

try (Lock lock = fileDBAdaptor.lock(studyId, fileId, lockDuration, lockTimeout)) {
FileMetadata fileMetadata = getFileMetadata(studyId, fileId);
update.update(fileMetadata);
lock.checkLocked();
unsecureUpdateFileMetadata(studyId, fileMetadata);
fileIdIndexedCache.put(studyId, fileId, fileMetadata.isIndexed());
return fileMetadata;
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -875,15 +852,13 @@ public void unsecureUpdateSampleMetadata(int studyId, SampleMetadata sample) {
public <E extends Exception> SampleMetadata updateSampleMetadata(int studyId, int sampleId, UpdateConsumer<SampleMetadata, E> consumer)
throws E, StorageEngineException {
getSampleName(studyId, sampleId); // Check sample exists
Lock lock = sampleDBAdaptor.lock(studyId, sampleId, lockDuration, lockTimeout);
try {

try (Lock lock = sampleDBAdaptor.lock(studyId, sampleId, lockDuration, lockTimeout)) {
SampleMetadata sample = getSampleMetadata(studyId, sampleId);
sample = consumer.toFunction().update(sample);
lock.checkLocked();
unsecureUpdateSampleMetadata(studyId, sample);
return sample;
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -1054,15 +1029,12 @@ public void unsecureUpdateCohortMetadata(int studyId, CohortMetadata cohort) {
public <E extends Exception> CohortMetadata updateCohortMetadata(int studyId, int cohortId, UpdateConsumer<CohortMetadata, E> update)
throws E, StorageEngineException {
getCohortName(studyId, cohortId); // Check cohort exists
Lock lock = cohortDBAdaptor.lock(studyId, cohortId, lockDuration, lockTimeout);
try {
try (Lock lock = cohortDBAdaptor.lock(studyId, cohortId, lockDuration, lockTimeout)) {
CohortMetadata cohortMetadata = getCohortMetadata(studyId, cohortId);
update.update(cohortMetadata);
lock.checkLocked();
unsecureUpdateCohortMetadata(studyId, cohortMetadata);
return cohortMetadata;
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -1190,13 +1162,19 @@ private CohortMetadata updateCohortSamples(int studyId, String cohortName, Colle
for (Integer sampleId : sampleIds) {
Integer finalCohortId = cohortId;
if (secondaryIndexCohort) {
updateSampleMetadata(studyId, sampleId, sampleMetadata -> {
sampleMetadata.addSecondaryIndexCohort(finalCohortId);
});
if (!getSampleMetadata(studyId, sampleId).getSecondaryIndexCohorts().contains(finalCohortId)) {
// Avoid unnecessary updates
updateSampleMetadata(studyId, sampleId, sampleMetadata -> {
sampleMetadata.addSecondaryIndexCohort(finalCohortId);
});
}
} else {
updateSampleMetadata(studyId, sampleId, sampleMetadata -> {
sampleMetadata.addCohort(finalCohortId);
});
if (!getSampleMetadata(studyId, sampleId).getCohorts().contains(finalCohortId)) {
// Avoid unnecessary updates
updateSampleMetadata(studyId, sampleId, sampleMetadata -> {
sampleMetadata.addCohort(finalCohortId);
});
}
}
}

Expand All @@ -1209,13 +1187,19 @@ private CohortMetadata updateCohortSamples(int studyId, String cohortName, Colle
Integer finalCohortId = cohortId;
if (!sampleIds.contains(sampleFromCohort)) {
if (secondaryIndexCohort) {
updateSampleMetadata(studyId, sampleFromCohort, sampleMetadata -> {
sampleMetadata.getSecondaryIndexCohorts().remove(finalCohortId);
});
if (getSampleMetadata(studyId, sampleFromCohort).getSecondaryIndexCohorts().contains(finalCohortId)) {
// Avoid unnecessary updates
updateSampleMetadata(studyId, sampleFromCohort, sampleMetadata -> {
sampleMetadata.getSecondaryIndexCohorts().remove(finalCohortId);
});
}
} else {
updateSampleMetadata(studyId, sampleFromCohort, sampleMetadata -> {
sampleMetadata.getCohorts().remove(finalCohortId);
});
if (getSampleMetadata(studyId, sampleFromCohort).getCohorts().contains(finalCohortId)) {
// Avoid unnecessary updates
updateSampleMetadata(studyId, sampleFromCohort, sampleMetadata -> {
sampleMetadata.getCohorts().remove(finalCohortId);
});
}
}
}
}
Expand Down Expand Up @@ -1326,15 +1310,12 @@ public void unsecureUpdateTask(int studyId, TaskMetadata task) throws StorageEng
public <E extends Exception> TaskMetadata updateTask(int studyId, int taskId, UpdateConsumer<TaskMetadata, E> consumer)
throws E, StorageEngineException {
getTask(studyId, taskId); // Check task exists
Lock lock = taskDBAdaptor.lock(studyId, taskId, lockDuration, lockTimeout);
try {
try (Lock lock = taskDBAdaptor.lock(studyId, taskId, lockDuration, lockTimeout)) {
TaskMetadata task = getTask(studyId, taskId);
consumer.update(task);
lock.checkLocked();
unsecureUpdateTask(studyId, task);
return task;
} finally {
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.opencb.opencga.storage.core.metadata.models.ProjectMetadata;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* Created on 02/05/18.
Expand All @@ -17,14 +16,12 @@
public interface ProjectMetadataAdaptor extends AutoCloseable {

default Lock lockProject(long lockDuration, long timeout)
throws InterruptedException, TimeoutException, StorageEngineException {
throws StorageEngineException {
return lockProject(lockDuration, timeout, null);
}

Lock lockProject(long lockDuration, long timeout, String lockName)
throws InterruptedException, TimeoutException, StorageEngineException;

void unLockProject(long lockId) throws StorageEngineException;
throws StorageEngineException;

DataResult<ProjectMetadata> getProjectMetadata();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ public enum VariantStorageOptions implements ConfigurationOption {

INDEX_SEARCH("indexSearch", false), // Build secondary indexes using search engine.

METADATA_LOCK_DURATION("metadata.lock.duration", 5000),
METADATA_LOCK_TIMEOUT("metadata.lock.timeout", 60000),
METADATA_LOCK_DURATION("metadata.lock.duration", 60000),
METADATA_LOCK_TIMEOUT("metadata.lock.timeout", 600000),
METADATA_LOAD_BATCH_SIZE("metadata.load.batchSize", 10),
METADATA_LOAD_THREADS("metadata.load.numThreads", 4),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.opencb.opencga.storage.core.exceptions.StorageEngineException;
import org.opencb.opencga.storage.core.metadata.models.StudyMetadata;
import org.opencb.opencga.storage.core.metadata.models.TaskMetadata;
import org.opencb.opencga.storage.core.variant.VariantStorageBaseTest;
import org.opencb.opencga.storage.core.variant.VariantStorageTest;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -103,4 +105,36 @@ public List<String> getTasks(StudyMetadata study, List<TaskMetadata.Status> stat
.map(TaskMetadata::getName)
.collect(Collectors.toList());
}

@Test
public void testAddSampleToCohort() throws Exception {
StudyMetadata study = metadataManager.createStudy("study");

metadataManager.registerCohort(study.getName(), "cohort1", Collections.emptyList());

int numSamples = 100;
List<Integer> sampleIds = new ArrayList<>(numSamples);
for (int i = 0; i < numSamples; i++) {
sampleIds.add(metadataManager.registerSample(study.getId(), null, "sample_" + i));
}

metadataManager.addSamplesToCohort(study.getId(), "cohort1", sampleIds.subList(0, 10));
VariantStorageMetadataManager metadataManager = Mockito.spy(this.metadataManager);
metadataManager.addSamplesToCohort(study.getId(), "cohort1", sampleIds.subList(0, 11));
Mockito.verify(metadataManager, Mockito.times(1)).updateSampleMetadata(Mockito.anyInt(), Mockito.anyInt(), Mockito.any());

Mockito.reset(metadataManager);
metadataManager.addSamplesToCohort(study.getId(), "cohort1", sampleIds.subList(0, 11));
Mockito.verify(metadataManager, Mockito.never()).updateSampleMetadata(Mockito.anyInt(), Mockito.anyInt(), Mockito.any());
metadataManager.setSamplesToCohort(study.getId(), "cohort1", sampleIds.subList(0, 11));
Mockito.verify(metadataManager, Mockito.never()).updateSampleMetadata(Mockito.anyInt(), Mockito.anyInt(), Mockito.any());

metadataManager.setSamplesToCohort(study.getId(), "cohort1", sampleIds.subList(0, 12));
Mockito.verify(metadataManager, Mockito.times(1)).updateSampleMetadata(Mockito.anyInt(), Mockito.anyInt(), Mockito.any());

Mockito.reset(metadataManager);
metadataManager.setSamplesToCohort(study.getId(), "cohort1", sampleIds.subList(0, 6));
Mockito.verify(metadataManager, Mockito.times(6)).updateSampleMetadata(Mockito.anyInt(), Mockito.anyInt(), Mockito.any());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand All @@ -29,7 +28,7 @@ public class DummyProjectMetadataAdaptor implements ProjectMetadataAdaptor {
private static Map<String, Integer> counters = new HashMap<>();

@Override
public Lock lockProject(long lockDuration, long timeout, String lockName) throws InterruptedException, TimeoutException {
public Lock lockProject(long lockDuration, long timeout, String lockName) {
return new Lock(0) {
@Override
public void unlock0() {
Expand All @@ -43,10 +42,6 @@ public void refresh() {
};
}

@Override
public void unLockProject(long lockId) {
}

@Override
public synchronized DataResult<ProjectMetadata> getProjectMetadata() {
final DataResult<ProjectMetadata> result = new DataResult<>();
Expand Down
Loading

0 comments on commit 63dec4c

Please sign in to comment.