Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
keith-turner committed Sep 7, 2024
1 parent 41d9e3a commit 433b14b
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.apache.accumulo.core.util.threads.ThreadPools.watchNonCriticalScheduledTask;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
import java.time.Duration;
Expand Down Expand Up @@ -76,6 +77,8 @@
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider;
import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
import org.apache.accumulo.core.lock.ServiceLock.LockWatcher;
Expand Down Expand Up @@ -104,6 +107,7 @@
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.Retry;
import org.apache.accumulo.core.util.Retry.RetryFactory;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.server.AbstractServer;
Expand Down Expand Up @@ -520,7 +524,7 @@ public void enqueueManagerMessage(ManagerMessage m) {
private static final AutoCloseable NOOP_CLOSEABLE = () -> {};

AutoCloseable acquireRecoveryMemory(TabletMetadata tabletMetadata) {
if (tabletMetadata.getExtent().isMeta() || tabletMetadata.getLogs().isEmpty()) {
if (tabletMetadata.getExtent().isMeta() || tabletMetadata.getLogs().isEmpty() || !needsRecovery(tabletMetadata)) {
return NOOP_CLOSEABLE;
} else {
recoveryLock.lock();
Expand Down Expand Up @@ -1080,6 +1084,37 @@ public void minorCompactionStarted(CommitSession tablet, long lastUpdateSequence
logger.minorCompactionStarted(tablet, lastUpdateSequence, newDataFileLocation, durability);
}

public boolean needsRecovery(TabletMetadata tabletMetadata){
var logEntries = tabletMetadata.getLogs();

if(logEntries.isEmpty()) {
return false;
}

try {
// TODO cache finished marker
List<Path> recoveryDirs = new ArrayList<>();
for (LogEntry entry : logEntries) {
Path recovery = null;
Path finished = RecoveryPath.getRecoveryPath(new Path(entry.getPath()));
finished = SortedLogState.getFinishedMarkerPath(finished);
TabletServer.log.debug("Looking for " + finished);
if (getVolumeManager().exists(finished)) {
recovery = finished.getParent();
}
if (recovery == null) {
throw new IOException(
"Unable to find recovery files for extent " + tabletMetadata.getExtent() + " logEntry: " + entry);
}
recoveryDirs.add(recovery);
}

return logger.needsRecovery(getContext(), tabletMetadata.getExtent(), recoveryDirs, resourceManager.getFileLenCache(), new BasicCacheProvider(resourceManager.getIndexCache(), resourceManager.getDataCache()));
}catch (IOException e){
throw new UncheckedIOException(e);
}
}

public void recover(VolumeManager fs, KeyExtent extent, List<LogEntry> logEntries,
Set<String> tabletFiles, MutationReceiver mutationReceiver) throws IOException {
List<Path> recoveryDirs = new ArrayList<>();
Expand All @@ -1097,7 +1132,10 @@ public void recover(VolumeManager fs, KeyExtent extent, List<LogEntry> logEntrie
}
recoveryDirs.add(recovery);
}
logger.recover(getContext(), extent, recoveryDirs, tabletFiles, mutationReceiver);
Timer timer = Timer.startNew();
logger.recover(getContext(), extent, recoveryDirs, tabletFiles, mutationReceiver, resourceManager.getFileLenCache(), new BasicCacheProvider(resourceManager.getIndexCache(), resourceManager.getDataCache()));
//logger.recover(getContext(), extent, recoveryDirs, tabletFiles, mutationReceiver, null, null);
log.info("Recovery time {}ms {} ihc:{} irc:{} dhc:{} drc:{}", timer.elapsed(TimeUnit.MILLISECONDS), extent, resourceManager.getIndexCache().getStats().hitCount(), resourceManager.getIndexCache().getStats().requestCount(), resourceManager.getDataCache().getStats().hitCount(), resourceManager.getDataCache().getStats().requestCount());
}

public int createLogId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,21 @@
import java.util.Map.Entry;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;

import com.github.benmanes.caffeine.cache.Cache;
import org.apache.accumulo.core.crypto.CryptoEnvironmentImpl;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
import org.apache.accumulo.core.iterators.IteratorAdapter;
import org.apache.accumulo.core.metadata.UnreferencedTabletFile;
import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
import org.apache.accumulo.core.spi.crypto.CryptoService;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.log.SortedLogState;
Expand All @@ -65,11 +69,16 @@ public class RecoveryLogsIterator
private final Iterator<Entry<Key,Value>> iter;
private final CryptoEnvironment env = new CryptoEnvironmentImpl(CryptoEnvironment.Scope.RECOVERY);

public RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start,
LogFileKey end, boolean checkFirstKey) throws IOException {
this(context, recoveryLogDirs, start, end, checkFirstKey, null, null);
}

/**
* Scans the files in each recoveryLogDir over the range [start,end].
*/
public RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start,
LogFileKey end, boolean checkFirstKey) throws IOException {
LogFileKey end, boolean checkFirstKey, Cache<String, Long> fileLenCache, CacheProvider cacheProvider) throws IOException {

List<Iterator<Entry<Key,Value>>> iterators = new ArrayList<>(recoveryLogDirs.size());
fileIters = new ArrayList<>();
Expand All @@ -81,21 +90,28 @@ public RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, L

for (Path logDir : recoveryLogDirs) {
LOG.debug("Opening recovery log dir {}", logDir.getName());
Timer timer = Timer.startNew();
SortedSet<UnreferencedTabletFile> logFiles = getFiles(vm, logDir);
var gft = timer.elapsed(TimeUnit.MILLISECONDS);
var fs = vm.getFileSystemByPath(logDir);

// only check the first key once to prevent extra iterator creation and seeking
if (checkFirstKey && !logFiles.isEmpty()) {
validateFirstKey(context, cryptoService, fs, logFiles, logDir);
validateFirstKey(context, cryptoService, fs, logFiles, logDir, fileLenCache, cacheProvider);
}

long oft = 0;

for (UnreferencedTabletFile log : logFiles) {
FileSKVIterator fileIter = FileOperations.getInstance().newReaderBuilder()
.forFile(log, fs, fs.getConf(), cryptoService)
.withTableConfiguration(context.getConfiguration()).seekToBeginning().build();
timer.restart();
FileSKVIterator fileIter = openLogFile(context, log, cryptoService, fs, fileLenCache, cacheProvider);

if (range != null) {
fileIter.seek(range, Collections.emptySet(), false);
}

oft += timer.elapsed(TimeUnit.MILLISECONDS);

Iterator<Entry<Key,Value>> scanIter = new IteratorAdapter(fileIter);

if (scanIter.hasNext()) {
Expand All @@ -109,6 +125,8 @@ public RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, L
fileIter.close();
}
}

LOG.debug("Recovery iter setup times {} gft:{} oft:{}", logDir.getName(), gft, oft);
}
iter = Iterators.mergeSorted(iterators, Entry.comparingByKey());
}
Expand Down Expand Up @@ -171,14 +189,28 @@ private SortedSet<UnreferencedTabletFile> getFiles(VolumeManager fs, Path direct
return logFiles;
}

FileSKVIterator openLogFile(ServerContext context, UnreferencedTabletFile logFile, CryptoService cs, FileSystem fs, Cache<String, Long> fileLenCache, CacheProvider cacheProvider) throws IOException {
var builder = FileOperations.getInstance().newReaderBuilder()
.forFile(logFile, fs, fs.getConf(), cs)
.withTableConfiguration(context.getConfiguration());

if(fileLenCache != null) {
builder = builder.withFileLenCache(fileLenCache);
}

if(cacheProvider != null) {
builder = builder.withCacheProvider(cacheProvider);
}

return builder.seekToBeginning().build();
}

/**
* Check that the first entry in the WAL is OPEN. Only need to do this once.
*/
private void validateFirstKey(ServerContext context, CryptoService cs, FileSystem fs,
SortedSet<UnreferencedTabletFile> logFiles, Path fullLogPath) throws IOException {
try (FileSKVIterator fileIter = FileOperations.getInstance().newReaderBuilder()
.forFile(logFiles.first(), fs, fs.getConf(), cs)
.withTableConfiguration(context.getConfiguration()).seekToBeginning().build()) {
SortedSet<UnreferencedTabletFile> logFiles, Path fullLogPath, Cache<String, Long> fileLenCache, CacheProvider cacheProvider) throws IOException {
try (FileSKVIterator fileIter = openLogFile(context, logFiles.first(), cs, fs, fileLenCache, cacheProvider)) {
Iterator<Entry<Key,Value>> iterator = new IteratorAdapter(fileIter);

if (iterator.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@
import java.util.Map.Entry;
import java.util.Set;

import com.github.benmanes.caffeine.cache.Cache;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.tserver.logger.LogEvents;
Expand All @@ -65,8 +67,21 @@ public class SortedLogRecovery {

private final ServerContext context;

private final CacheProvider cacheProvider;

private final Cache<String,Long> fileLenCache;


public SortedLogRecovery(ServerContext context) {
this.context = context;
this.cacheProvider = null;
this.fileLenCache = null;
}

public SortedLogRecovery(ServerContext context, Cache<String,Long> fileLenCache, CacheProvider cacheProvider) {
this.context = context;
this.cacheProvider = cacheProvider;
this.fileLenCache = fileLenCache;
}

static LogFileKey maxKey(LogEvents event) {
Expand Down Expand Up @@ -104,7 +119,7 @@ private int findMaxTabletId(KeyExtent extent, List<Path> recoveryLogDirs) throws
int tabletId = -1;

try (var rli = new RecoveryLogsIterator(context, recoveryLogDirs, minKey(DEFINE_TABLET),
maxKey(DEFINE_TABLET), true)) {
maxKey(DEFINE_TABLET), true, fileLenCache, cacheProvider)) {

KeyExtent alternative = extent;
if (extent.isRootTablet()) {
Expand Down Expand Up @@ -207,7 +222,7 @@ private long findRecoverySeq(List<Path> recoveryLogs, Set<String> tabletFiles, i
long recoverySeq = 0;

try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context, recoveryLogs,
minKey(COMPACTION_START, tabletId), maxKey(COMPACTION_START, tabletId), false)) {
minKey(COMPACTION_START, tabletId), maxKey(COMPACTION_START, tabletId), false, fileLenCache, cacheProvider)) {

DeduplicatingIterator ddi = new DeduplicatingIterator(rli);

Expand Down Expand Up @@ -262,7 +277,7 @@ private void playbackMutations(List<Path> recoveryLogs, MutationReceiver mr, int

LogFileKey end = maxKey(MUTATION, tabletId);

try (var rli = new RecoveryLogsIterator(context, recoveryLogs, start, end, false)) {
try (var rli = new RecoveryLogsIterator(context, recoveryLogs, start, end, false, fileLenCache, cacheProvider)) {
while (rli.hasNext()) {
Entry<LogFileKey,LogFileValue> entry = rli.next();
LogFileKey logFileKey = entry.getKey();
Expand All @@ -287,6 +302,12 @@ Collection<String> asNames(List<Path> recoveryLogs) {
return Collections2.transform(recoveryLogs, Path::getName);
}

public boolean needsRecovery(KeyExtent extent, List<Path> recoveryDirs) throws IOException {
Entry<Integer,List<Path>> maxEntry = findLogsThatDefineTablet(extent, recoveryDirs);
int tabletId = maxEntry.getKey();
return tabletId != -1;
}

public void recover(KeyExtent extent, List<Path> recoveryDirs, Set<String> tabletFiles,
MutationReceiver mr) throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import com.github.benmanes.caffeine.cache.Cache;
import org.apache.accumulo.core.client.Durability;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider;
import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.util.Halt;
import org.apache.accumulo.core.util.Retry;
Expand Down Expand Up @@ -510,10 +513,19 @@ public long minorCompactionStarted(final CommitSession commitSession, final long
return seq;
}

public boolean needsRecovery(ServerContext context, KeyExtent extent, List<Path> recoveryDirs, Cache<String,Long> fileLenCache, CacheProvider cacheProvider) throws IOException {
try {
SortedLogRecovery recovery = new SortedLogRecovery(context, fileLenCache, cacheProvider);
return recovery.needsRecovery(extent, recoveryDirs);
} catch (Exception e) {
throw new IOException(e);
}
}

public void recover(ServerContext context, KeyExtent extent, List<Path> recoveryDirs,
Set<String> tabletFiles, MutationReceiver mr) throws IOException {
Set<String> tabletFiles, MutationReceiver mr, Cache<String,Long> fileLenCache, CacheProvider cacheProvider) throws IOException {
try {
SortedLogRecovery recovery = new SortedLogRecovery(context);
SortedLogRecovery recovery = new SortedLogRecovery(context, fileLenCache, cacheProvider);
recovery.recover(extent, recoveryDirs, tabletFiles, mr);
} catch (Exception e) {
throw new IOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,24 @@
import java.util.Set;
import java.util.TreeMap;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
import org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCache;
import org.apache.accumulo.core.file.blockfile.impl.BasicCacheProvider;
import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
import org.apache.accumulo.core.file.rfile.bcfile.Compression;
import org.apache.accumulo.core.file.rfile.bcfile.CompressionAlgorithm;
import org.apache.accumulo.core.file.rfile.bcfile.Utils;
import org.apache.accumulo.core.file.streams.SeekableDataInputStream;
import org.apache.accumulo.core.spi.cache.CacheType;
import org.apache.accumulo.core.spi.crypto.CryptoServiceFactory;
import org.apache.accumulo.core.spi.crypto.GenericCryptoServiceFactory;
import org.apache.accumulo.core.util.Pair;
Expand Down Expand Up @@ -162,6 +169,9 @@ private List<Mutation> recover(Map<String,KeyValue[]> logs, KeyExtent extent) th
return recover(logs, new HashSet<>(), extent, bufferSize);
}

private CacheProvider cacheProvider = new BasicCacheProvider(new TinyLfuBlockCache(BlockCacheConfiguration.forTabletServer(DefaultConfiguration.getInstance()), CacheType.INDEX), new TinyLfuBlockCache(BlockCacheConfiguration.forTabletServer(DefaultConfiguration.getInstance()), CacheType.DATA));
private Cache<String, Long> fileLenCache = Caffeine.newBuilder().build();

private List<Mutation> recover(Map<String,KeyValue[]> logs, Set<String> files, KeyExtent extent,
int bufferSize) throws IOException {

Expand Down Expand Up @@ -198,9 +208,10 @@ private List<Mutation> recover(Map<String,KeyValue[]> logs, Set<String> files, K
dirs.add(new Path(destPath));
}
// Recover
SortedLogRecovery recovery = new SortedLogRecovery(context);
SortedLogRecovery recovery = new SortedLogRecovery(context, fileLenCache, cacheProvider);
CaptureMutations capture = new CaptureMutations();
recovery.recover(extent, dirs, files, capture);
recovery.recover(extent, dirs, files, capture);
verify(context);
return capture.result;
}
Expand Down
Loading

0 comments on commit 433b14b

Please sign in to comment.