diff --git a/ratis-proto/src/main/proto/Grpc.proto b/ratis-proto/src/main/proto/Grpc.proto index 61a8347561..02374233f7 100644 --- a/ratis-proto/src/main/proto/Grpc.proto +++ b/ratis-proto/src/main/proto/Grpc.proto @@ -44,7 +44,7 @@ service RaftServerProtocolService { returns(stream ratis.common.AppendEntriesReplyProto) {} rpc installSnapshot(stream ratis.common.InstallSnapshotRequestProto) - returns(ratis.common.InstallSnapshotReplyProto) {} + returns(stream ratis.common.InstallSnapshotReplyProto) {} } service AdminProtocolService { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java index a9288efc38..4f71e152b0 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java @@ -25,6 +25,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; import java.nio.file.Path; import java.util.Optional; @@ -65,7 +66,7 @@ public FileChunkReader(FileInfo info, RaftStorageDirectory directory) throws IOE public FileChunkProto readFileChunk(int chunkMaxSize) throws IOException { final long remaining = info.getFileSize() - offset; final int chunkLength = remaining < chunkMaxSize ? (int) remaining : chunkMaxSize; - final ByteString data = ByteString.readFrom(in, chunkLength); + final ByteString data = readChunk(in, chunkLength); final FileChunkProto proto = FileChunkProto.newBuilder() .setFilename(relativePath.toString()) @@ -80,6 +81,32 @@ public FileChunkProto readFileChunk(int chunkMaxSize) throws IOException { return proto; } + /** + * Blocks until a chunk of the given size can be made from the stream, or EOF is reached. Calls + * read() repeatedly in case the given stream implementation doesn't completely fill the given + * buffer in one read() call. + * + * @return A chunk of the desired size, or else a chunk as large as was available when end of + * stream was reached. Returns null if the given stream had no more data in it. + */ + private static ByteString readChunk(InputStream in, final int chunkSize) throws IOException { + final byte[] buf = new byte[chunkSize]; + int bytesRead = 0; + while (bytesRead < chunkSize) { + final int count = in.read(buf, bytesRead, chunkSize - bytesRead); + if (count == -1) { + break; + } + bytesRead += count; + } + + if (bytesRead == 0) { + return null; + } + + // Always make a copy since InputStream could steal a reference to buf. + return ByteString.copyFrom(buf, 0, bytesRead); + } @Override public void close() throws IOException { in.close(); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java index cc91d8fe77..9f8f1eb845 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java @@ -21,7 +21,6 @@ import java.io.FileOutputStream; import java.io.IOException; import java.nio.channels.FileChannel; -import java.util.UUID; import org.apache.ratis.io.CorruptedFileException; import org.apache.ratis.io.MD5Hash; @@ -61,22 +60,25 @@ public void installSnapshot(StateMachine stateMachine, final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex(); final RaftStorageDirectory dir = storage.getStorageDir(); - // create a unique temporary directory - final File tmpDir = new File(dir.getTmpDir(), UUID.randomUUID().toString()); + // create a unique temporary directory based on the request id + final File tmpDir = new File(dir.getTmpDir(), snapshotChunkRequest.getRequestId()); FileUtils.createDirectories(tmpDir); tmpDir.deleteOnExit(); - LOG.info("Installing snapshot:{}, to tmp dir:{}", request, tmpDir); + LOG.info("Installing snapshot:{}, to tmp dir:{}", snapshotChunkRequest.getRequestId(), tmpDir); // TODO: Make sure that subsequent requests for the same installSnapshot are coming in order, // and are not lost when whole request cycle is done. Check requestId and requestIndex here for (FileChunkProto chunk : snapshotChunkRequest.getFileChunksList()) { + LOG.info("Installing chunk :{} with offset{}, to tmp dir:{} for file {}", + chunk.getChunkIndex(), chunk.getOffset(), tmpDir, chunk.getFilename()); SnapshotInfo pi = stateMachine.getLatestSnapshot(); if (pi != null && pi.getTermIndex().getIndex() >= lastIncludedIndex) { throw new IOException("There exists snapshot file " + pi.getFiles() + " in " + selfId - + " with endIndex >= lastIncludedIndex " + lastIncludedIndex); + + " with endIndex (" + pi.getTermIndex().getIndex() + + ") >= lastIncludedIndex (" + lastIncludedIndex + ")"); } String fileName = chunk.getFilename(); // this is relative to the root dir @@ -130,10 +132,10 @@ public void installSnapshot(StateMachine stateMachine, } if (snapshotChunkRequest.getDone()) { - LOG.info("Install snapshot is done, renaming tnp dir:{} to:{}", - tmpDir, dir.getStateMachineDir()); - dir.getStateMachineDir().delete(); - tmpDir.renameTo(dir.getStateMachineDir()); + LOG.info("Install snapshot is done, moving files from dir:{} to:{}", + tmpDir, dir.getStateMachineDir()); + FileUtils.moveDirectory(tmpDir.toPath(), dir.getStateMachineDir().toPath()); + FileUtils.deleteFully(tmpDir); } } }