diff --git a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/VariantStorageMetadataManager.java b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/VariantStorageMetadataManager.java index 89d86a63a89..a27f908173b 100644 --- a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/VariantStorageMetadataManager.java +++ b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/VariantStorageMetadataManager.java @@ -56,7 +56,6 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.BiPredicate; @@ -191,14 +190,7 @@ public ObjectMap getConfiguration() { public Lock lockGlobal(long lockDuration, long timeout, String lockName) throws StorageEngineException { - try { - return projectDBAdaptor.lockProject(lockDuration, timeout, lockName); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new StorageEngineException("Unable to lock the Project", e); - } catch (TimeoutException e) { - throw new StorageEngineException("Unable to lock the Project", e); - } + return projectDBAdaptor.lockProject(lockDuration, timeout, lockName); } public Lock lockStudy(int studyId) throws StorageEngineException { @@ -282,8 +274,7 @@ public StudyMetadata updateStudyMetadata(Object study, Upd throws StorageEngineException, E { int studyId = getStudyId(study); - Lock lock = lockStudy(studyId); - try { + try (Lock lock = lockStudy(studyId)) { StudyMetadata sm = getStudyMetadata(studyId); sm = updater.update(sm); @@ -291,8 +282,6 @@ public StudyMetadata updateStudyMetadata(Object study, Upd lock.checkLocked(); unsecureUpdateStudyMetadata(sm); return sm; - } finally { - lock.unlock(); } } @@ -557,16 +546,8 @@ public ProjectMetadata updateProjectMetadata(UpdateConsume public ProjectMetadata updateProjectMetadata(UpdateFunction function) throws StorageEngineException, E { Objects.requireNonNull(function); - Lock lock; - try { - lock = projectDBAdaptor.lockProject(lockDuration, lockTimeout); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new StorageEngineException("Unable to lock the Project", e); - } catch (TimeoutException e) { - throw new StorageEngineException("Unable to lock the Project", e); - } - try { + + try (Lock lock = projectDBAdaptor.lockProject(lockDuration, lockTimeout)) { ProjectMetadata projectMetadata = getProjectMetadata(); int countersHash = (projectMetadata == null ? Collections.emptyMap() : projectMetadata.getCounters()).hashCode(); @@ -579,8 +560,6 @@ public ProjectMetadata updateProjectMetadata(UpdateFunctio lock.checkLocked(); projectDBAdaptor.updateProjectMetadata(projectMetadata, updateCounters); return projectMetadata; - } finally { - lock.unlock(); } } @@ -673,16 +652,14 @@ public void unsecureUpdateFileMetadata(int studyId, FileMetadata file) { public FileMetadata updateFileMetadata(int studyId, int fileId, UpdateConsumer update) throws E, StorageEngineException { getFileName(studyId, fileId); // Check file exists - Lock lock = fileDBAdaptor.lock(studyId, fileId, lockDuration, lockTimeout); - try { + + try (Lock lock = fileDBAdaptor.lock(studyId, fileId, lockDuration, lockTimeout)) { FileMetadata fileMetadata = getFileMetadata(studyId, fileId); update.update(fileMetadata); lock.checkLocked(); unsecureUpdateFileMetadata(studyId, fileMetadata); fileIdIndexedCache.put(studyId, fileId, fileMetadata.isIndexed()); return fileMetadata; - } finally { - lock.unlock(); } } @@ -875,15 +852,13 @@ public void unsecureUpdateSampleMetadata(int studyId, SampleMetadata sample) { public SampleMetadata updateSampleMetadata(int studyId, int sampleId, UpdateConsumer consumer) throws E, StorageEngineException { getSampleName(studyId, sampleId); // Check sample exists - Lock lock = sampleDBAdaptor.lock(studyId, sampleId, lockDuration, lockTimeout); - try { + + try (Lock lock = sampleDBAdaptor.lock(studyId, sampleId, lockDuration, lockTimeout)) { SampleMetadata sample = getSampleMetadata(studyId, sampleId); sample = consumer.toFunction().update(sample); lock.checkLocked(); unsecureUpdateSampleMetadata(studyId, sample); return sample; - } finally { - lock.unlock(); } } @@ -1054,15 +1029,12 @@ public void unsecureUpdateCohortMetadata(int studyId, CohortMetadata cohort) { public CohortMetadata updateCohortMetadata(int studyId, int cohortId, UpdateConsumer update) throws E, StorageEngineException { getCohortName(studyId, cohortId); // Check cohort exists - Lock lock = cohortDBAdaptor.lock(studyId, cohortId, lockDuration, lockTimeout); - try { + try (Lock lock = cohortDBAdaptor.lock(studyId, cohortId, lockDuration, lockTimeout)) { CohortMetadata cohortMetadata = getCohortMetadata(studyId, cohortId); update.update(cohortMetadata); lock.checkLocked(); unsecureUpdateCohortMetadata(studyId, cohortMetadata); return cohortMetadata; - } finally { - lock.unlock(); } } @@ -1190,13 +1162,19 @@ private CohortMetadata updateCohortSamples(int studyId, String cohortName, Colle for (Integer sampleId : sampleIds) { Integer finalCohortId = cohortId; if (secondaryIndexCohort) { - updateSampleMetadata(studyId, sampleId, sampleMetadata -> { - sampleMetadata.addSecondaryIndexCohort(finalCohortId); - }); + if (!getSampleMetadata(studyId, sampleId).getSecondaryIndexCohorts().contains(finalCohortId)) { + // Avoid unnecessary updates + updateSampleMetadata(studyId, sampleId, sampleMetadata -> { + sampleMetadata.addSecondaryIndexCohort(finalCohortId); + }); + } } else { - updateSampleMetadata(studyId, sampleId, sampleMetadata -> { - sampleMetadata.addCohort(finalCohortId); - }); + if (!getSampleMetadata(studyId, sampleId).getCohorts().contains(finalCohortId)) { + // Avoid unnecessary updates + updateSampleMetadata(studyId, sampleId, sampleMetadata -> { + sampleMetadata.addCohort(finalCohortId); + }); + } } } @@ -1209,13 +1187,19 @@ private CohortMetadata updateCohortSamples(int studyId, String cohortName, Colle Integer finalCohortId = cohortId; if (!sampleIds.contains(sampleFromCohort)) { if (secondaryIndexCohort) { - updateSampleMetadata(studyId, sampleFromCohort, sampleMetadata -> { - sampleMetadata.getSecondaryIndexCohorts().remove(finalCohortId); - }); + if (getSampleMetadata(studyId, sampleFromCohort).getSecondaryIndexCohorts().contains(finalCohortId)) { + // Avoid unnecessary updates + updateSampleMetadata(studyId, sampleFromCohort, sampleMetadata -> { + sampleMetadata.getSecondaryIndexCohorts().remove(finalCohortId); + }); + } } else { - updateSampleMetadata(studyId, sampleFromCohort, sampleMetadata -> { - sampleMetadata.getCohorts().remove(finalCohortId); - }); + if (getSampleMetadata(studyId, sampleFromCohort).getCohorts().contains(finalCohortId)) { + // Avoid unnecessary updates + updateSampleMetadata(studyId, sampleFromCohort, sampleMetadata -> { + sampleMetadata.getCohorts().remove(finalCohortId); + }); + } } } } @@ -1326,15 +1310,12 @@ public void unsecureUpdateTask(int studyId, TaskMetadata task) throws StorageEng public TaskMetadata updateTask(int studyId, int taskId, UpdateConsumer consumer) throws E, StorageEngineException { getTask(studyId, taskId); // Check task exists - Lock lock = taskDBAdaptor.lock(studyId, taskId, lockDuration, lockTimeout); - try { + try (Lock lock = taskDBAdaptor.lock(studyId, taskId, lockDuration, lockTimeout)) { TaskMetadata task = getTask(studyId, taskId); consumer.update(task); lock.checkLocked(); unsecureUpdateTask(studyId, task); return task; - } finally { - lock.unlock(); } } diff --git a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/adaptors/ProjectMetadataAdaptor.java b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/adaptors/ProjectMetadataAdaptor.java index dc88a85d338..3045ee8f3cd 100644 --- a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/adaptors/ProjectMetadataAdaptor.java +++ b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/adaptors/ProjectMetadataAdaptor.java @@ -7,7 +7,6 @@ import org.opencb.opencga.storage.core.metadata.models.ProjectMetadata; import java.io.IOException; -import java.util.concurrent.TimeoutException; /** * Created on 02/05/18. @@ -17,14 +16,12 @@ public interface ProjectMetadataAdaptor extends AutoCloseable { default Lock lockProject(long lockDuration, long timeout) - throws InterruptedException, TimeoutException, StorageEngineException { + throws StorageEngineException { return lockProject(lockDuration, timeout, null); } Lock lockProject(long lockDuration, long timeout, String lockName) - throws InterruptedException, TimeoutException, StorageEngineException; - - void unLockProject(long lockId) throws StorageEngineException; + throws StorageEngineException; DataResult getProjectMetadata(); diff --git a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/VariantStorageOptions.java b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/VariantStorageOptions.java index caefbb5260e..bc1be055466 100644 --- a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/VariantStorageOptions.java +++ b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/VariantStorageOptions.java @@ -83,8 +83,8 @@ public enum VariantStorageOptions implements ConfigurationOption { INDEX_SEARCH("indexSearch", false), // Build secondary indexes using search engine. - METADATA_LOCK_DURATION("metadata.lock.duration", 5000), - METADATA_LOCK_TIMEOUT("metadata.lock.timeout", 60000), + METADATA_LOCK_DURATION("metadata.lock.duration", 60000), + METADATA_LOCK_TIMEOUT("metadata.lock.timeout", 600000), METADATA_LOAD_BATCH_SIZE("metadata.load.batchSize", 10), METADATA_LOAD_THREADS("metadata.load.numThreads", 4), diff --git a/opencga-storage/opencga-storage-core/src/test/java/org/opencb/opencga/storage/core/metadata/VariantStorageMetadataManagerTest.java b/opencga-storage/opencga-storage-core/src/test/java/org/opencb/opencga/storage/core/metadata/VariantStorageMetadataManagerTest.java index 1b3311958f4..71ea72de3c0 100644 --- a/opencga-storage/opencga-storage-core/src/test/java/org/opencb/opencga/storage/core/metadata/VariantStorageMetadataManagerTest.java +++ b/opencga-storage/opencga-storage-core/src/test/java/org/opencb/opencga/storage/core/metadata/VariantStorageMetadataManagerTest.java @@ -4,12 +4,14 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import org.opencb.opencga.storage.core.exceptions.StorageEngineException; import org.opencb.opencga.storage.core.metadata.models.StudyMetadata; import org.opencb.opencga.storage.core.metadata.models.TaskMetadata; import org.opencb.opencga.storage.core.variant.VariantStorageBaseTest; import org.opencb.opencga.storage.core.variant.VariantStorageTest; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -103,4 +105,36 @@ public List getTasks(StudyMetadata study, List stat .map(TaskMetadata::getName) .collect(Collectors.toList()); } + + @Test + public void testAddSampleToCohort() throws Exception { + StudyMetadata study = metadataManager.createStudy("study"); + + metadataManager.registerCohort(study.getName(), "cohort1", Collections.emptyList()); + + int numSamples = 100; + List sampleIds = new ArrayList<>(numSamples); + for (int i = 0; i < numSamples; i++) { + sampleIds.add(metadataManager.registerSample(study.getId(), null, "sample_" + i)); + } + + metadataManager.addSamplesToCohort(study.getId(), "cohort1", sampleIds.subList(0, 10)); + VariantStorageMetadataManager metadataManager = Mockito.spy(this.metadataManager); + metadataManager.addSamplesToCohort(study.getId(), "cohort1", sampleIds.subList(0, 11)); + Mockito.verify(metadataManager, Mockito.times(1)).updateSampleMetadata(Mockito.anyInt(), Mockito.anyInt(), Mockito.any()); + + Mockito.reset(metadataManager); + metadataManager.addSamplesToCohort(study.getId(), "cohort1", sampleIds.subList(0, 11)); + Mockito.verify(metadataManager, Mockito.never()).updateSampleMetadata(Mockito.anyInt(), Mockito.anyInt(), Mockito.any()); + metadataManager.setSamplesToCohort(study.getId(), "cohort1", sampleIds.subList(0, 11)); + Mockito.verify(metadataManager, Mockito.never()).updateSampleMetadata(Mockito.anyInt(), Mockito.anyInt(), Mockito.any()); + + metadataManager.setSamplesToCohort(study.getId(), "cohort1", sampleIds.subList(0, 12)); + Mockito.verify(metadataManager, Mockito.times(1)).updateSampleMetadata(Mockito.anyInt(), Mockito.anyInt(), Mockito.any()); + + Mockito.reset(metadataManager); + metadataManager.setSamplesToCohort(study.getId(), "cohort1", sampleIds.subList(0, 6)); + Mockito.verify(metadataManager, Mockito.times(6)).updateSampleMetadata(Mockito.anyInt(), Mockito.anyInt(), Mockito.any()); + } + } \ No newline at end of file diff --git a/opencga-storage/opencga-storage-core/src/test/java/org/opencb/opencga/storage/core/variant/dummy/DummyProjectMetadataAdaptor.java b/opencga-storage/opencga-storage-core/src/test/java/org/opencb/opencga/storage/core/variant/dummy/DummyProjectMetadataAdaptor.java index bed8d419666..d223180d9d1 100644 --- a/opencga-storage/opencga-storage-core/src/test/java/org/opencb/opencga/storage/core/variant/dummy/DummyProjectMetadataAdaptor.java +++ b/opencga-storage/opencga-storage-core/src/test/java/org/opencb/opencga/storage/core/variant/dummy/DummyProjectMetadataAdaptor.java @@ -16,7 +16,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; /** @@ -29,7 +28,7 @@ public class DummyProjectMetadataAdaptor implements ProjectMetadataAdaptor { private static Map counters = new HashMap<>(); @Override - public Lock lockProject(long lockDuration, long timeout, String lockName) throws InterruptedException, TimeoutException { + public Lock lockProject(long lockDuration, long timeout, String lockName) { return new Lock(0) { @Override public void unlock0() { @@ -43,10 +42,6 @@ public void refresh() { }; } - @Override - public void unLockProject(long lockId) { - } - @Override public synchronized DataResult getProjectMetadata() { final DataResult result = new DataResult<>(); diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/utils/HBaseLockManager.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/utils/HBaseLockManager.java index 6fc7609b998..b1f827b2baf 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/utils/HBaseLockManager.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/utils/HBaseLockManager.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.solr.common.StringUtils; import org.opencb.opencga.core.common.TimeUtils; import org.opencb.opencga.storage.core.metadata.models.Lock; import org.slf4j.Logger; @@ -134,18 +133,17 @@ public Lock lock(byte[] row, byte[] column, long lockDuration, long timeout) // Minimum lock duration of 100ms lockDuration = Math.max(lockDuration, 100); - byte[] lockValue; - String readToken = ""; + LockToken lockToken; StopWatch stopWatch = new StopWatch(); stopWatch.start(); do { - lockValue = readLockValue(row, column); + lockToken = readLockToken(row, column); // If the lock is taken, wait - while (isLockTaken(lockValue)) { + while (lockToken.isTaken()) { Thread.sleep(100); - lockValue = readLockValue(row, column); + lockToken = readLockToken(row, column); //Check if the lock is still valid if (stopWatch.getTime() > timeout) { throw new TimeoutException("Unable to get the lock"); @@ -157,19 +155,19 @@ public Lock lock(byte[] row, byte[] column, long lockDuration, long timeout) } // Try to lock cell - if (tryToPutToken(token, lockDuration, row, column, lockValue, CURRENT)) { - readToken = parseValidLockToken(readLockValue(row, column)); + if (tryToPutToken(token, lockDuration, row, column, lockToken, CURRENT)) { + lockToken = readLockToken(row, column); } - // You win the lock if the first available lock is yours. - } while (!token.equals(readToken)); + // You win the lock if you manage to write your lock. + } while (!lockToken.equals(token)); - boolean prevTokenExpired = lockValue != null && lockValue.length > 0; + boolean prevTokenExpired = !lockToken.isEmpty() && lockToken.isExpired(); boolean slowQuery = stopWatch.getTime() > 60000; if (prevTokenExpired || slowQuery) { StringBuilder msg = new StringBuilder("Lock column '").append(Bytes.toStringBinary(column)).append("'"); if (prevTokenExpired) { - long expireDate = parseExpireDate(lockValue); + long expireDate = lockToken.getExpireDate(); msg.append(". Previous token expired ") .append(TimeUtils.durationToString(System.currentTimeMillis() - expireDate)) .append(" ago"); @@ -181,105 +179,65 @@ public Lock lock(byte[] row, byte[] column, long lockDuration, long timeout) logger.warn(msg.toString()); } - long tokenHash = token.hashCode(); - logger.debug("Won the lock with token " + token + " (" + tokenHash + ")"); - - long finalLockDuration = lockDuration; - return new Lock(threadPool, (int) (finalLockDuration / 4), tokenHash) { - @Override - public void unlock0() { - try { - HBaseLockManager.this.unlock(row, column, tokenHash); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } + logger.debug("Won the lock with token " + token + " (" + token.hashCode() + ")"); - @Override - public synchronized void refresh() throws IOException { - HBaseLockManager.this.refresh(row, column, tokenHash, finalLockDuration); - } - }; + return new HBaseLock(lockDuration, token, row, column); } - /** - * Refreshes the lock. - * - * @param column Column to find the lock cell - * @param lockToken Lock token - * @param lockDuration Duration un milliseconds of the token. After this time the token is expired. - * @throws IOException if there is an error writing or reading from HBase. - */ - public void refresh(byte[] column, long lockToken, int lockDuration) throws IOException { - refresh(defaultRow, column, lockToken, lockDuration); - } - - /** * Refreshes the lock. * * @param row Row to find the lock cell * @param column Column to find the lock cell - * @param lockToken Lock token + * @param lockTokenHash Lock token * @param lockDuration Duration un milliseconds of the token. After this time the token is expired. * @throws IOException if there is an error writing or reading from HBase. */ - public void refresh(byte[] row, byte[] column, long lockToken, long lockDuration) throws IOException { + private void refresh(byte[] row, byte[] column, long lockTokenHash, long lockDuration) throws IOException { // Check token is valid - byte[] lockValue = readLockValue(row, column); - String currentLockToken = parseValidLockToken(lockValue); - if (currentLockToken == null || currentLockToken.hashCode() != lockToken) { - throw IllegalLockStatusException.inconsistentLock(row, column, lockToken, currentLockToken, lockValue); + LockToken currentLockToken = readLockToken(row, column); + if (currentLockToken.isEmpty() || currentLockToken.isExpired() || !currentLockToken.equals(lockTokenHash)) { + throw IllegalLockStatusException.inconsistentLock(row, column, lockTokenHash, currentLockToken); + } + if (currentLockToken.getRemainingTime() < lockDuration / 2) { + logger.warn("Refreshing lock with less than half of the duration remaining. Expected duration: {} Remaining time: {}ms", + lockDuration, + currentLockToken.getRemainingTime()); } - if (!tryToPutToken(currentLockToken, lockDuration, row, column, lockValue, REFRESH)) { + if (!tryToPutToken(currentLockToken.token, lockDuration, row, column, currentLockToken, REFRESH)) { // Error refreshing! - lockValue = readLockValue(row, column); - String newLockToken = parseValidLockToken(lockValue); + LockToken newLockToken = readLockToken(row, column); - logger.error("Current lock token:" + currentLockToken); - logger.error("New lock token: " + newLockToken); - throw IllegalLockStatusException.inconsistentLock(row, column, lockToken, currentLockToken, lockValue); + logger.error("Current lock token:" + currentLockToken.token); + logger.error("New lock token: " + newLockToken.token); + throw IllegalLockStatusException.inconsistentLock(row, column, lockTokenHash, currentLockToken); } } - /** - * Releases the lock. - * - * @param column Column to find the lock cell - * @param lockToken Lock token - * @throws IOException if there is an error writing or reading from HBase. - * @throws IllegalLockStatusException if the lockToken does not match with the current lockToken - */ - public void unlock(byte[] column, long lockToken) throws IOException, IllegalLockStatusException { - unlock(defaultRow, column, lockToken); - } - /** * Releases the lock. * * @param row Row to find the lock cell * @param column Column to find the lock cell - * @param lockToken Lock token + * @param lockTokenHash Lock token * @throws IOException if there is an error writing or reading from HBase. * @throws IllegalLockStatusException if the lockToken does not match with the current lockToken */ - public void unlock(byte[] row, byte[] column, long lockToken) throws IOException, IllegalLockStatusException { - byte[] lockValue = readLockValue(row, column); + private void unlock(byte[] row, byte[] column, long lockTokenHash) throws IOException, IllegalLockStatusException { + LockToken currentToken = readLockToken(row, column); - String currentToken = parseValidLockToken(lockValue); - - if (currentToken == null || currentToken.hashCode() != lockToken) { - throw IllegalLockStatusException.inconsistentLock(row, column, lockToken, currentToken, lockValue); + if (currentToken.isEmpty() || currentToken.isExpired() || !currentToken.equals(lockTokenHash)) { + throw IllegalLockStatusException.inconsistentLock(row, column, lockTokenHash, currentToken); } - logger.debug("Unlock lock with token " + lockToken); - if (!clearLock(row, column, lockValue)) { - throw IllegalLockStatusException.inconsistentLock(row, column, lockToken, currentToken, lockValue); + logger.debug("Unlock lock with token " + lockTokenHash); + if (!clearLock(row, column, currentToken)) { + throw IllegalLockStatusException.inconsistentLock(row, column, lockTokenHash, currentToken); } } - private Boolean tryToPutToken(String token, long lockDuration, byte[] row, byte[] qualifier, byte[] lockValue, String type) + private Boolean tryToPutToken(String token, long lockDuration, byte[] row, byte[] qualifier, LockToken currentLock, String type) throws IOException { return hbaseManager.act(tableName, table -> { Put put = new Put(row) @@ -288,30 +246,31 @@ private Boolean tryToPutToken(String token, long lockDuration, byte[] row, byte[ + token + LOCK_EXPIRING_DATE_SEPARATOR_STR + (System.currentTimeMillis() + lockDuration))); - return table.checkAndPut(row, columnFamily, qualifier, CompareFilter.CompareOp.EQUAL, lockValue, put); + return table.checkAndPut(row, columnFamily, qualifier, CompareFilter.CompareOp.EQUAL, currentLock.lockValue, put); }); } - private boolean clearLock(byte[] row, byte[] qualifier, byte[] lockValue) throws IOException { + private boolean clearLock(byte[] row, byte[] qualifier, LockToken lockToken) throws IOException { return hbaseManager.act(tableName, table -> { Put put = new Put(row) .addColumn(columnFamily, qualifier, Bytes.toBytes("")); - return table.checkAndPut(row, columnFamily, qualifier, CompareFilter.CompareOp.EQUAL, lockValue, put); + return table.checkAndPut(row, columnFamily, qualifier, CompareFilter.CompareOp.EQUAL, lockToken.lockValue, put); }); } /** - * Parse non-expired lock token. + * Parse lock token. * @param lockValue lock values - * @return Current lock token, if any + * @return Current lock token. */ - protected static String parseValidLockToken(byte[] lockValue) { + protected static LockToken parseLockToken(byte[] lockValue) { if (lockValue == null || lockValue.length == 0) { - return null; + return new LockToken(); } int idx1 = Bytes.indexOf(lockValue, LOCK_PREFIX_SEPARATOR_BYTE); int idx2 = Bytes.indexOf(lockValue, LOCK_EXPIRING_DATE_SEPARATOR_BYTE); + String type = Bytes.toString(lockValue, 0, idx1); String token = Bytes.toString(lockValue, idx1 + 1, idx2 - idx1 - 1); long expireDate; try { @@ -319,45 +278,82 @@ protected static String parseValidLockToken(byte[] lockValue) { } catch (NumberFormatException e) { // Deprecated token. Assume expired token if (Bytes.contains(lockValue, DEPRECATED_LOCK_SEPARATOR_BYTE)) { - return null; + return new LockToken(); } throw e; } + return new LockToken(lockValue, type, token, expireDate); + } + + protected static final class LockToken { + protected final byte[] lockValue; + protected final String type; + protected final String token; + protected final Long expireDate; + + private LockToken() { + this.lockValue = new byte[0]; + this.type = null; + this.token = null; + this.expireDate = null; + } + + private LockToken(byte[] lockValue, String type, String token, long expireDate) { + this.lockValue = lockValue; + this.type = type; + this.token = token; + this.expireDate = expireDate; + } + + /** + * A lock is taken if there is any lockValue, and + * the token has not expired. + * + * @return if the lock is taken + */ + public boolean isTaken() { + return token != null && !isExpired(); + } + + public boolean isExpired() { + return expireDate != null && expireDate < System.currentTimeMillis(); + } - if (isExpired(expireDate)) { - return null; - } else { + public boolean isEmpty() { + return token == null; + } + + public boolean equals(String token) { + return !isEmpty() && this.token.equals(token); + } + + public boolean equals(long tokenHash) { + return !isEmpty() && this.token.hashCode() == tokenHash; + } + + public byte[] getLockValue() { + return lockValue; + } + + public String getType() { + return type; + } + + public String getToken() { return token; } - } - protected static long parseExpireDate(byte[] lockValue) { - int idx2 = Bytes.indexOf(lockValue, LOCK_EXPIRING_DATE_SEPARATOR_BYTE); - try { - return Long.parseLong(Bytes.toString(lockValue, idx2 + 1)); - } catch (NumberFormatException e) { - // Deprecated token. Assume expired token - if (Bytes.contains(lockValue, DEPRECATED_LOCK_SEPARATOR_BYTE)) { - return -1; - } - throw e; + public Long getExpireDate() { + return expireDate; } - } - /** - * A lock is taken if there is any lockValue in the array, and - * the token has not expired. - * - * - * @param lockValue lock values - * @return if the lock is taken - */ - protected static boolean isLockTaken(byte[] lockValue) { - return parseValidLockToken(lockValue) != null; + public long getRemainingTime() { + return expireDate == null ? 0 : expireDate - System.currentTimeMillis(); + } } - private static boolean isExpired(long expireDate) { - return expireDate < System.currentTimeMillis(); + private LockToken readLockToken(byte[] row, byte[] qualifier) throws IOException { + return parseLockToken(readLockValue(row, qualifier)); } private byte[] readLockValue(byte[] row, byte[] qualifier) throws IOException { @@ -380,18 +376,22 @@ public IllegalLockStatusException(String s) { super(s); } - public static IllegalLockStatusException inconsistentLock(byte[] row, byte[] column, long lockToken, String currentLock, - byte[] lockValue) { - if (StringUtils.isEmpty(currentLock)) { - return new IllegalLockStatusException("Inconsistent lock status. You don't have the lock! " + private static IllegalLockStatusException inconsistentLock(byte[] row, byte[] column, long lockTokenHash, LockToken currentLock) { + if (currentLock.isEmpty()) { + return new IllegalLockStatusException("Inconsistent lock status. You don't have the lock! Empty lock. " + + "Row: '" + Bytes.toStringBinary(row) + "', " + + "column: '" + Bytes.toStringBinary(column) + "'. " + + "Lock: " + Bytes.toString(currentLock.lockValue) + "."); + } else if (currentLock.isExpired()) { + return new IllegalLockStatusException("Inconsistent lock status. You don't have the lock! Expired lock. " + "Row: '" + Bytes.toStringBinary(row) + "', " + "column: '" + Bytes.toStringBinary(column) + "'. " - + "Lock: " + Bytes.toString(lockValue) + "."); + + "Lock: " + Bytes.toString(currentLock.lockValue) + "."); } else { - return new IllegalLockStatusException("Inconsistent lock status. You don't have the lock! " + return new IllegalLockStatusException("Inconsistent lock status. You don't have the lock! Lock is taken. " + "Row: '" + Bytes.toStringBinary(row) + "', " + "column: '" + Bytes.toStringBinary(column) + "'. " - + lockToken + " != " + currentLock.hashCode() + " from " + Bytes.toString(lockValue)); + + lockTokenHash + " != " + currentLock.token.hashCode() + " from " + Bytes.toString(currentLock.lockValue)); } } } @@ -403,4 +403,38 @@ protected static ExecutorService buildThreadPool() { .build()); } + private final class HBaseLock extends Lock { + private final long lockDuration; + private final String token; + private final long tokenHash; + private final byte[] row; + private final byte[] column; + + private HBaseLock(long lockDuration, String token, byte[] row, byte[] column) { + super(HBaseLockManager.threadPool, (int) (lockDuration / 4), token.hashCode()); + this.lockDuration = lockDuration; + this.token = token; + this.tokenHash = token.hashCode(); + this.row = row; + this.column = column; + } + + @Override + public void unlock0() { + try { + synchronized (this) { + HBaseLockManager.this.unlock(row, column, tokenHash); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void refresh() throws IOException { + synchronized (this) { + HBaseLockManager.this.refresh(row, column, tokenHash, lockDuration); + } + } + } } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/metadata/AbstractHBaseDBAdaptor.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/metadata/AbstractHBaseDBAdaptor.java index 3f6a23c5abf..fbdfdf920ec 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/metadata/AbstractHBaseDBAdaptor.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/metadata/AbstractHBaseDBAdaptor.java @@ -279,14 +279,5 @@ protected Lock lockToken(byte[] rowKey, byte[] lockName, long lockDuration, long } } - protected void unLock(byte[] rowKey, byte[] lockName, long token) { - try { - this.lock.unlock(rowKey, lockName, token); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/metadata/HBaseProjectMetadataDBAdaptor.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/metadata/HBaseProjectMetadataDBAdaptor.java index 85a1d0cec0c..6ddae38be65 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/metadata/HBaseProjectMetadataDBAdaptor.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/metadata/HBaseProjectMetadataDBAdaptor.java @@ -49,20 +49,14 @@ public HBaseProjectMetadataDBAdaptor(HBaseManager hBaseManager, String metaTable @Override public Lock lockProject(long lockDuration, long timeout, String lockName) - throws InterruptedException, TimeoutException, StorageEngineException { + throws StorageEngineException { try { ensureTableExists(); return lock.lock(getProjectRowKey(), getLockColumn(lockName), lockDuration, timeout); - } catch (IOException e) { - throw new StorageEngineException("Error locking project in HBase", e); - } - } - - @Override - public void unLockProject(long lockId) throws StorageEngineException { - try { - lock.unlock(getProjectRowKey(), getLockColumn(), lockId); - } catch (IOException e) { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new StorageEngineException("Unable to lock the Project", e); + } catch (IOException | TimeoutException e) { throw new StorageEngineException("Error locking project in HBase", e); } } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/utils/HBaseLockManagerTest.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/utils/HBaseLockManagerTest.java index 0f421081ff6..5943a43519f 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/utils/HBaseLockManagerTest.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/utils/HBaseLockManagerTest.java @@ -214,23 +214,47 @@ public void testLockRefreshExpiredRefresh() throws Exception { @Test public void testGetCurrent() { long e = System.currentTimeMillis() + 1000; - String s; + HBaseLockManager.LockToken s; + + // null token + s = HBaseLockManager.parseLockToken(null); + assertTrue(s.isEmpty()); + assertFalse(s.isTaken()); + assertEquals(null, s.getType()); + assertArrayEquals(new byte[0], s.getLockValue()); + + // Empty token + s = HBaseLockManager.parseLockToken(Bytes.toBytes("")); + assertTrue(s.isEmpty()); + assertFalse(s.isTaken()); + assertEquals(null, s.getType()); + assertArrayEquals(new byte[0], s.getLockValue()); // Expired current token - s = HBaseLockManager.parseValidLockToken(Bytes.toBytes("CURRENT-abc:123")); - assertNull(s); + s = HBaseLockManager.parseLockToken(Bytes.toBytes("CURRENT-abc:123")); + assertFalse(s.isEmpty()); + assertEquals("CURRENT", s.getType()); // Valid current token - s = HBaseLockManager.parseValidLockToken(Bytes.toBytes("CURRENT-abc:" + e)); - assertEquals("abc", s); + s = HBaseLockManager.parseLockToken(Bytes.toBytes("CURRENT-abc:" + e)); + assertEquals("abc", s.token); + assertEquals("CURRENT", s.getType()); + assertFalse(s.isExpired()); + assertTrue(s.isTaken()); // Current expired, first refresh valid - s = HBaseLockManager.parseValidLockToken(Bytes.toBytes("REFRESH-abc:" + e)); - assertEquals("abc", s); + s = HBaseLockManager.parseLockToken(Bytes.toBytes("REFRESH-abc:" + e)); + assertEquals("abc", s.token); + assertEquals("REFRESH", s.getType()); + assertFalse(s.isExpired()); + assertTrue(s.isTaken()); // Expired refresh - s = HBaseLockManager.parseValidLockToken(Bytes.toBytes("REFRESH-abc:200")); - assertNull(s); + s = HBaseLockManager.parseLockToken(Bytes.toBytes("REFRESH-abc:200")); + assertEquals("abc", s.token); + assertEquals("REFRESH", s.getType()); + assertTrue(s.isExpired()); + assertFalse(s.isTaken()); } } \ No newline at end of file