From fb07197a9dadd0d86476041ae3387f95fb07e8a3 Mon Sep 17 00:00:00 2001 From: Chris Helma Date: Tue, 11 Jun 2024 12:22:23 -0500 Subject: [PATCH 1/6] Swapped RFS Docker Compose to use ES 7.17 Signed-off-by: Chris Helma --- RFS/build-preloaded-source-image.gradle | 4 ++-- RFS/build.gradle | 7 ++++-- RFS/docker/TestSource_ES_7_17/Dockerfile | 22 +++++++++++++++++++ .../TestSource_ES_7_17/container-start.sh | 13 +++++++++++ 4 files changed, 42 insertions(+), 4 deletions(-) create mode 100644 RFS/docker/TestSource_ES_7_17/Dockerfile create mode 100755 RFS/docker/TestSource_ES_7_17/container-start.sh diff --git a/RFS/build-preloaded-source-image.gradle b/RFS/build-preloaded-source-image.gradle index c1dca9b17..b0b25b3fa 100644 --- a/RFS/build-preloaded-source-image.gradle +++ b/RFS/build-preloaded-source-image.gradle @@ -23,8 +23,8 @@ def createNetworkTask = task createNetwork(type: Exec) { } } task createInitialElasticsearchContainer(type: DockerCreateContainer) { - dependsOn createNetwork, buildDockerImage_emptyElasticsearchSource - targetImageId 'migrations/empty_elasticsearch_source:latest' + dependsOn createNetwork, buildDockerImage_emptyElasticsearchSource_7_17 + targetImageId 'migrations/emptyElasticsearchSource_7_17:latest' containerName = "elasticsearch-${uniqueId}" hostConfig.network = myNetworkName hostConfig.dns = ['elasticsearch'] diff --git a/RFS/build.gradle b/RFS/build.gradle index de25b72c2..f784e738e 100644 --- a/RFS/build.gradle +++ b/RFS/build.gradle @@ -132,9 +132,12 @@ DockerServiceProps[] dockerServices = [ dockerImageName:"reindex_from_snapshot", inputDir:"./docker", taskDependencies:["copyDockerRuntimeJars"]]), - new DockerServiceProps([projectName:"emptyElasticsearchSource", - dockerImageName:"empty_elasticsearch_source", + new DockerServiceProps([projectName:"emptyElasticsearchSource_7_10", + dockerImageName:"empty_elasticsearch_source_7_10", inputDir:"./docker/TestSource_ES_7_10"]), + new DockerServiceProps([projectName:"emptyElasticsearchSource_7_17", + dockerImageName:"empty_elasticsearch_source_7_17", + inputDir:"./docker/TestSource_ES_7_17"]), new DockerServiceProps([projectName:"trafficGenerator", dockerImageName:"osb_traffic_generator", inputDir:"./docker/TrafficGenerator", diff --git a/RFS/docker/TestSource_ES_7_17/Dockerfile b/RFS/docker/TestSource_ES_7_17/Dockerfile new file mode 100644 index 000000000..49a1abfca --- /dev/null +++ b/RFS/docker/TestSource_ES_7_17/Dockerfile @@ -0,0 +1,22 @@ +FROM docker.elastic.co/elasticsearch/elasticsearch:7.17.21 AS base + +# Configure Elastic +ENV ELASTIC_SEARCH_CONFIG_FILE=/usr/share/elasticsearch/config/elasticsearch.yml +# Prevents ES from complaining about nodes count +RUN echo "discovery.type: single-node" >> $ELASTIC_SEARCH_CONFIG_FILE +ENV PATH=${PATH}:/usr/share/elasticsearch/jdk/bin/ + +# Install the S3 Repo Plugin +RUN echo y | /usr/share/elasticsearch/bin/elasticsearch-plugin install repository-s3 + +# Install the AWS CLI for testing purposes +RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && \ + unzip awscliv2.zip && \ + ./aws/install + +RUN mkdir /snapshots && chown elasticsearch /snapshots + +# Install our custom entrypoint script +COPY ./container-start.sh /usr/share/elasticsearch/container-start.sh + +CMD /usr/share/elasticsearch/container-start.sh diff --git a/RFS/docker/TestSource_ES_7_17/container-start.sh b/RFS/docker/TestSource_ES_7_17/container-start.sh new file mode 100755 index 000000000..b2507c11c --- /dev/null +++ 
b/RFS/docker/TestSource_ES_7_17/container-start.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +echo "Setting AWS Creds from ENV Variables" +bin/elasticsearch-keystore create +echo $AWS_ACCESS_KEY_ID | bin/elasticsearch-keystore add s3.client.default.access_key --stdin +echo $AWS_SECRET_ACCESS_KEY | bin/elasticsearch-keystore add s3.client.default.secret_key --stdin + +if [ -n "$AWS_SESSION_TOKEN" ]; then + echo $AWS_SESSION_TOKEN | bin/elasticsearch-keystore add s3.client.default.session_token --stdin +fi + +echo "Starting Elasticsearch" +/usr/local/bin/docker-entrypoint.sh eswrapper \ No newline at end of file From b179824283411dad285c1c862553f8acfa60b74d Mon Sep 17 00:00:00 2001 From: Chris Helma Date: Tue, 11 Jun 2024 16:33:29 -0500 Subject: [PATCH 2/6] Updated RFS to delete files when finished, reject large shards Signed-off-by: Chris Helma --- .../java/com/rfs/DemoPrintOutSnapshot.java | 4 +- .../java/com/rfs/ReindexFromSnapshot.java | 3 +- RFS/src/main/java/com/rfs/RunRfsWorker.java | 11 +++- .../rfs/common/DefaultSourceRepoAccessor.java | 29 +++++++++++ .../common/DeletingSourceRepoAccessor.java | 52 +++++++++++++++++++ .../java/com/rfs/common/PartSliceStream.java | 11 ++-- .../java/com/rfs/common/ShardMetadata.java | 2 +- .../com/rfs/common/SnapshotShardUnpacker.java | 37 +++++++++++-- .../com/rfs/common/SourceRepoAccessor.java | 47 +++++++++++++++++ .../ShardMetadataData_ES_6_8.java | 2 +- .../ShardMetadataData_ES_7_10.java | 2 +- .../java/com/rfs/worker/DocumentsRunner.java | 8 +-- .../java/com/rfs/worker/DocumentsStep.java | 26 ++++++++-- .../SimpleRestoreFromSnapshot_ES_7_10.java | 4 +- .../com/rfs/worker/DocumentsRunnerTest.java | 3 +- .../com/rfs/worker/DocumentsStepTest.java | 32 +++++++++++- 16 files changed, 247 insertions(+), 26 deletions(-) create mode 100644 RFS/src/main/java/com/rfs/common/DefaultSourceRepoAccessor.java create mode 100644 RFS/src/main/java/com/rfs/common/DeletingSourceRepoAccessor.java create mode 100644 RFS/src/main/java/com/rfs/common/SourceRepoAccessor.java diff --git a/RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java b/RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java index 6f8b4a3c7..888381548 100644 --- a/RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java +++ b/RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java @@ -27,6 +27,7 @@ import com.rfs.common.SnapshotRepo; import com.rfs.common.SnapshotShardUnpacker; import com.rfs.common.ClusterVersion; +import com.rfs.common.DefaultSourceRepoAccessor; import com.rfs.common.FileSystemRepo; import com.rfs.version_es_7_10.*; @@ -219,7 +220,8 @@ public static void main(String[] args) { } else { bufferSize = ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES; } - SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repo, Paths.get(luceneBasePathString), bufferSize); + DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(repo); + SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repoAccessor, Paths.get(luceneBasePathString), bufferSize); unpacker.unpack(shardMetadata); // Now, read the documents back out diff --git a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java index 31ec61cac..67d6cdc4d 100644 --- a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java +++ b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java @@ -329,7 +329,8 @@ public static void main(String[] args) throws InterruptedException { } else { bufferSize = ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES; } - SnapshotShardUnpacker unpacker = new 
SnapshotShardUnpacker(repo, luceneDirPath, bufferSize); + DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(repo); + SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repoAccessor, luceneDirPath, bufferSize); for (IndexMetadata.Data indexMetadata : indexMetadatas) { logger.info("Processing index: " + indexMetadata.getName()); diff --git a/RFS/src/main/java/com/rfs/RunRfsWorker.java b/RFS/src/main/java/com/rfs/RunRfsWorker.java index f5734b594..7a6457807 100644 --- a/RFS/src/main/java/com/rfs/RunRfsWorker.java +++ b/RFS/src/main/java/com/rfs/RunRfsWorker.java @@ -21,6 +21,7 @@ import com.rfs.cms.OpenSearchCmsClient; import com.rfs.common.ClusterVersion; import com.rfs.common.ConnectionDetails; +import com.rfs.common.DeletingSourceRepoAccessor; import com.rfs.common.DocumentReindexer; import com.rfs.common.GlobalMetadata; import com.rfs.common.IndexMetadata; @@ -100,6 +101,10 @@ public static class Args { @Parameter(names = {"--component-template-allowlist"}, description = ("Optional. List of component template names to migrate" + " (e.g. 'posts_template1, posts_template2'). Default: empty list"), required = false) public List componentTemplateAllowlist = List.of(); + + @Parameter(names = {"--max-shard-size-bytes"}, description = ("Optional. The maximum shard size, in bytes, to allow when" + + " performing the document migration. Useful for preventing disk overflow. Default: 50 * 1024 * 1024 * 1024 (50 GB)"), required = false) + public long maxShardSizeBytes = 50 * 1024 * 1024 * 1024L; //https://opensearch.org/docs/2.11/api-reference/cluster-api/cluster-awareness/ @Parameter(names = {"--min-replicas"}, description = ("Optional. The minimum number of replicas configured for migrated indices on the target." @@ -132,6 +137,7 @@ public static void main(String[] args) throws Exception { String targetPass = arguments.targetPass; List indexTemplateAllowlist = arguments.indexTemplateAllowlist; List componentTemplateAllowlist = arguments.componentTemplateAllowlist; + long maxShardSizeBytes = arguments.maxShardSizeBytes; int awarenessDimensionality = arguments.minNumberOfReplicas + 1; Level logLevel = arguments.logLevel; @@ -165,10 +171,11 @@ public static void main(String[] args) throws Exception { indexWorker.run(); ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider); - SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(sourceRepo, luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES); + DeletingSourceRepoAccessor repoAccessor = new DeletingSourceRepoAccessor(sourceRepo); + SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repoAccessor, luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES); LuceneDocumentsReader reader = new LuceneDocumentsReader(luceneDirPath); DocumentReindexer reindexer = new DocumentReindexer(targetClient); - DocumentsRunner documentsWorker = new DocumentsRunner(globalState, cmsClient, snapshotName, indexMetadataFactory, shardMetadataFactory, unpacker, reader, reindexer); + DocumentsRunner documentsWorker = new DocumentsRunner(globalState, cmsClient, snapshotName, maxShardSizeBytes, indexMetadataFactory, shardMetadataFactory, unpacker, reader, reindexer); documentsWorker.run(); } catch (Runner.PhaseFailed e) { diff --git a/RFS/src/main/java/com/rfs/common/DefaultSourceRepoAccessor.java b/RFS/src/main/java/com/rfs/common/DefaultSourceRepoAccessor.java new file mode 100644 index 000000000..b9c06d2b1 --- /dev/null +++ 
b/RFS/src/main/java/com/rfs/common/DefaultSourceRepoAccessor.java @@ -0,0 +1,29 @@ +package com.rfs.common; + +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; + +/* + * Provides "simple" access to the underlying files in the source repo without any special behavior + */ +public class DefaultSourceRepoAccessor extends SourceRepoAccessor { + public DefaultSourceRepoAccessor(SourceRepo repo) { + super(repo); + } + + @Override + protected InputStream load(Path path) { + try { + return Files.newInputStream(path); + } catch (Exception e) { + throw new CouldNotLoadRepoFile("Could not load file: " + path, e); + } + } + + public static class CouldNotLoadRepoFile extends RuntimeException { + public CouldNotLoadRepoFile(String message, Throwable cause) { + super(message, cause); + } + } +} diff --git a/RFS/src/main/java/com/rfs/common/DeletingSourceRepoAccessor.java b/RFS/src/main/java/com/rfs/common/DeletingSourceRepoAccessor.java new file mode 100644 index 000000000..f6b37cabf --- /dev/null +++ b/RFS/src/main/java/com/rfs/common/DeletingSourceRepoAccessor.java @@ -0,0 +1,52 @@ +package com.rfs.common; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; + + +/* + * Provides access to the underlying files in the source repo, but deletes the files after they are read. This + * is useful/interesting in the case where the files are large/numerous and you can easily re-acquire them - such as + * if they are being loaded from S3. + */ +public class DeletingSourceRepoAccessor extends SourceRepoAccessor { + public DeletingSourceRepoAccessor(SourceRepo repo) { + super(repo); + } + + @Override + protected InputStream load(Path path) { + try { + return new DeletingFileInputStream(path); + } catch (Exception e) { + throw new CouldNotLoadRepoFile("Could not load file: " + path, e); + } + } + + public static class DeletingFileInputStream extends FileInputStream { + private final Path filePath; + + public DeletingFileInputStream(Path filePath) throws IOException { + super(filePath.toFile()); + this.filePath = filePath; + } + + @Override + public void close() throws IOException { + try { + super.close(); + } finally { + Files.deleteIfExists(filePath); + } + } + } + + public static class CouldNotLoadRepoFile extends RuntimeException { + public CouldNotLoadRepoFile(String message, Throwable cause) { + super(message, cause); + } + } +} diff --git a/RFS/src/main/java/com/rfs/common/PartSliceStream.java b/RFS/src/main/java/com/rfs/common/PartSliceStream.java index a58110af4..6dbc3a827 100644 --- a/RFS/src/main/java/com/rfs/common/PartSliceStream.java +++ b/RFS/src/main/java/com/rfs/common/PartSliceStream.java @@ -12,7 +12,7 @@ */ public class PartSliceStream extends InputStream { - private final SourceRepo repo; + private final SourceRepoAccessor repoAccessor; private final ShardMetadata.FileInfo fileMetadata; private final String indexId; private final int shardId; @@ -20,16 +20,15 @@ public class PartSliceStream extends InputStream { private InputStream currentStream; private boolean initialized = false; - public PartSliceStream(SourceRepo repo, ShardMetadata.FileInfo fileMetadata, String indexId, int shardId) { - this.repo = repo; + public PartSliceStream(SourceRepoAccessor repoAccessor, ShardMetadata.FileInfo fileMetadata, String indexId, int shardId) { + this.repoAccessor = repoAccessor; this.fileMetadata = fileMetadata; this.indexId = indexId; this.shardId = shardId; } - protected 
InputStream openSlice(long slice) throws IOException { - Path filePath = repo.getBlobFilePath(indexId, shardId, fileMetadata.partName(slice)); - return Files.newInputStream(filePath); + protected InputStream openSlice(long slice) { + return repoAccessor.getBlobFile(indexId, shardId, fileMetadata.partName(slice)); } private InputStream nextStream() throws IOException { diff --git a/RFS/src/main/java/com/rfs/common/ShardMetadata.java b/RFS/src/main/java/com/rfs/common/ShardMetadata.java index dc4ef4f7e..f4dcac6c1 100644 --- a/RFS/src/main/java/com/rfs/common/ShardMetadata.java +++ b/RFS/src/main/java/com/rfs/common/ShardMetadata.java @@ -77,7 +77,7 @@ public static interface Data { public long getStartTime(); public long getTime(); public int getNumberOfFiles(); - public long getTotalSize(); + public long getTotalSizeBytes(); public List getFiles(); } diff --git a/RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java b/RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java index 5c5a56522..de3ab976c 100644 --- a/RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java +++ b/RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java @@ -1,6 +1,8 @@ package com.rfs.common; +import java.io.IOException; import java.io.InputStream; +import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -18,7 +20,7 @@ @RequiredArgsConstructor public class SnapshotShardUnpacker { private static final Logger logger = LogManager.getLogger(SnapshotShardUnpacker.class); - protected final SourceRepo repo; + protected final SourceRepoAccessor repoAccessor; protected final Path luceneFilesBasePath; protected final int bufferSize; @@ -28,7 +30,7 @@ public void unpack(ShardMetadata.Data shardMetadata) { NativeFSLockFactory lockFactory = NativeFSLockFactory.INSTANCE; // Ensure the blob files are prepped, if they need to be - repo.prepBlobFiles(shardMetadata); + repoAccessor.prepBlobFiles(shardMetadata); // Create the directory for the shard's lucene files Path luceneIndexDir = Paths.get(luceneFilesBasePath + "/" + shardMetadata.getIndexName() + "/" + shardMetadata.getShardId()); @@ -43,7 +45,7 @@ public void unpack(ShardMetadata.Data shardMetadata) { final BytesRef hash = fileMetadata.getMetaHash(); indexOutput.writeBytes(hash.bytes, hash.offset, hash.length); } else { - try (InputStream stream = new PartSliceStream(repo, fileMetadata, shardMetadata.getIndexId(), shardMetadata.getShardId())) { + try (InputStream stream = new PartSliceStream(repoAccessor, fileMetadata, shardMetadata.getIndexId(), shardMetadata.getShardId())) { final byte[] buffer = new byte[Math.toIntExact(Math.min(bufferSize, fileMetadata.getLength()))]; int length; while ((length = stream.read(buffer)) > 0) { @@ -58,6 +60,35 @@ public void unpack(ShardMetadata.Data shardMetadata) { } } + public void cleanUp(ShardMetadata.Data shardMetadata) { + try { + Path luceneIndexDir = Paths.get(luceneFilesBasePath + "/" + shardMetadata.getIndexName() + "/" + shardMetadata.getShardId()); + if (Files.exists(luceneIndexDir)) { + deleteRecursively(luceneIndexDir); + } + + } catch (Exception e) { + throw new CouldNotCleanUpShard("Could not clean up shard: Index " + shardMetadata.getIndexId() + ", Shard " + shardMetadata.getShardId(), e); + } + } + + protected void deleteRecursively(Path path) throws IOException { + if (Files.isDirectory(path)) { + try (DirectoryStream entries = Files.newDirectoryStream(path)) { + for (Path entry : entries) { + deleteRecursively(entry); + } + } + } + 
Files.delete(path); + } + + public static class CouldNotCleanUpShard extends RfsException { + public CouldNotCleanUpShard(String message, Exception e) { + super(message, e); + } + } + public static class CouldNotUnpackShard extends RfsException { public CouldNotUnpackShard(String message, Exception e) { super(message, e); diff --git a/RFS/src/main/java/com/rfs/common/SourceRepoAccessor.java b/RFS/src/main/java/com/rfs/common/SourceRepoAccessor.java new file mode 100644 index 000000000..9c9534ac5 --- /dev/null +++ b/RFS/src/main/java/com/rfs/common/SourceRepoAccessor.java @@ -0,0 +1,47 @@ +package com.rfs.common; + +import java.io.InputStream; +import java.nio.file.Path; + +public abstract class SourceRepoAccessor { + private final SourceRepo repo; + + public SourceRepoAccessor(SourceRepo repo) { + this.repo = repo; + } + + public Path getRepoRootDir() { + return repo.getRepoRootDir(); + } + + public InputStream getSnapshotRepoDataFile(){ + return load(repo.getSnapshotRepoDataFilePath()); + }; + + public InputStream getGlobalMetadataFile(String snapshotId) { + return load(repo.getGlobalMetadataFilePath(snapshotId)); + } + + public InputStream getSnapshotMetadataFile(String snapshotId) { + return load(repo.getSnapshotMetadataFilePath(snapshotId)); + } + + public InputStream getIndexMetadataFile(String indexId, String indexFileId){ + return load(repo.getIndexMetadataFilePath(indexId, indexFileId)); + } + + public InputStream getShardDir(String indexId, int shardId){ + return load(repo.getShardDirPath(indexId, shardId)); + } + public InputStream getShardMetadataFile(String snapshotId, String indexId, int shardId){ + return load(repo.getShardMetadataFilePath(snapshotId, indexId, shardId)); + } + public InputStream getBlobFile(String indexId, int shardId, String blobName){ + return load(repo.getBlobFilePath(indexId, shardId, blobName)); + } + public void prepBlobFiles(ShardMetadata.Data shardMetadata){ + repo.prepBlobFiles(shardMetadata); + } + + protected abstract InputStream load(Path path); +} diff --git a/RFS/src/main/java/com/rfs/version_es_6_8/ShardMetadataData_ES_6_8.java b/RFS/src/main/java/com/rfs/version_es_6_8/ShardMetadataData_ES_6_8.java index 88db38052..b528e95f9 100644 --- a/RFS/src/main/java/com/rfs/version_es_6_8/ShardMetadataData_ES_6_8.java +++ b/RFS/src/main/java/com/rfs/version_es_6_8/ShardMetadataData_ES_6_8.java @@ -101,7 +101,7 @@ public int getNumberOfFiles() { } @Override - public long getTotalSize() { + public long getTotalSizeBytes() { return totalSize; } diff --git a/RFS/src/main/java/com/rfs/version_es_7_10/ShardMetadataData_ES_7_10.java b/RFS/src/main/java/com/rfs/version_es_7_10/ShardMetadataData_ES_7_10.java index 77f7cf4ea..40e5a7710 100644 --- a/RFS/src/main/java/com/rfs/version_es_7_10/ShardMetadataData_ES_7_10.java +++ b/RFS/src/main/java/com/rfs/version_es_7_10/ShardMetadataData_ES_7_10.java @@ -100,7 +100,7 @@ public int getNumberOfFiles() { } @Override - public long getTotalSize() { + public long getTotalSizeBytes() { return totalSize; } diff --git a/RFS/src/main/java/com/rfs/worker/DocumentsRunner.java b/RFS/src/main/java/com/rfs/worker/DocumentsRunner.java index e76ea739d..7ee50e128 100644 --- a/RFS/src/main/java/com/rfs/worker/DocumentsRunner.java +++ b/RFS/src/main/java/com/rfs/worker/DocumentsRunner.java @@ -18,10 +18,10 @@ public class DocumentsRunner implements Runner { private final DocumentsStep.SharedMembers members; - public DocumentsRunner(GlobalState globalState, CmsClient cmsClient, String snapshotName, IndexMetadata.Factory metadataFactory, 
- ShardMetadata.Factory shardMetadataFactory, SnapshotShardUnpacker unpacker, LuceneDocumentsReader reader, - DocumentReindexer reindexer) { - this.members = new DocumentsStep.SharedMembers(globalState, cmsClient, snapshotName, metadataFactory, shardMetadataFactory, unpacker, reader, reindexer); + public DocumentsRunner(GlobalState globalState, CmsClient cmsClient, String snapshotName, long maxShardSizeBytes, + IndexMetadata.Factory metadataFactory, ShardMetadata.Factory shardMetadataFactory, SnapshotShardUnpacker unpacker, + LuceneDocumentsReader reader, DocumentReindexer reindexer) { + this.members = new DocumentsStep.SharedMembers(globalState, cmsClient, snapshotName, maxShardSizeBytes, metadataFactory, shardMetadataFactory, unpacker, reader, reindexer); } @Override diff --git a/RFS/src/main/java/com/rfs/worker/DocumentsStep.java b/RFS/src/main/java/com/rfs/worker/DocumentsStep.java index 21d666acb..a7e6c43ea 100644 --- a/RFS/src/main/java/com/rfs/worker/DocumentsStep.java +++ b/RFS/src/main/java/com/rfs/worker/DocumentsStep.java @@ -27,6 +27,7 @@ public static class SharedMembers { protected final GlobalState globalState; protected final CmsClient cmsClient; protected final String snapshotName; + protected final long maxShardSizeBytes; protected final IndexMetadata.Factory metadataFactory; protected final ShardMetadata.Factory shardMetadataFactory; protected final SnapshotShardUnpacker unpacker; @@ -364,17 +365,24 @@ public void run() { )); logger.info("Work item set"); + ShardMetadata.Data shardMetadata = null; try { logger.info("Migrating docs: Index " + workItem.indexName + ", Shard " + workItem.shardId); - ShardMetadata.Data shardMetadata = members.shardMetadataFactory.fromRepo(members.snapshotName, workItem.indexName, workItem.shardId); + shardMetadata = members.shardMetadataFactory.fromRepo(members.snapshotName, workItem.indexName, workItem.shardId); + + logger.info("Shard size: " + shardMetadata.getTotalSizeBytes()); + if (shardMetadata.getTotalSizeBytes() > members.maxShardSizeBytes) { + throw new ShardTooLarge(shardMetadata.getTotalSizeBytes(), members.maxShardSizeBytes); + } + members.unpacker.unpack(shardMetadata); Flux documents = members.reader.readDocuments(shardMetadata.getIndexName(), shardMetadata.getShardId()); - final int finalShardId = shardMetadata.getShardId(); // Define in local context for the lambda + final ShardMetadata.Data finalShardMetadata = shardMetadata; // Define in local context for the lambda members.reindexer.reindex(shardMetadata.getIndexName(), documents) .doOnError(error -> logger.error("Error during reindexing: " + error)) - .doOnSuccess(done -> logger.info("Reindexing completed for Index " + shardMetadata.getIndexName() + ", Shard " + finalShardId)) + .doOnSuccess(done -> logger.info("Reindexing completed for Index " + finalShardMetadata.getIndexName() + ", Shard " + finalShardMetadata.getShardId())) // Wait for the reindexing to complete before proceeding .block(); logger.info("Docs migrated"); @@ -408,6 +416,12 @@ public void run() { value -> logger.info("Documents Work Item (Index: " + workItem.indexName + ", Shard: " + workItem.shardId + ") attempt count was incremented"), () ->logger.info("Unable to increment attempt count of Documents Work Item (Index: " + workItem.indexName + ", Shard: " + workItem.shardId + ")") ); + } finally { + if (shardMetadata != null) { + logger.info("Cleaning up the unpacked shard..."); + members.unpacker.cleanUp(shardMetadata); + logger.info("Shard cleaned up"); + } } logger.info("Clearing the worker's current 
work item..."); @@ -506,6 +520,12 @@ public WorkerStep nextStep() { } } + public static class ShardTooLarge extends RfsException { + public ShardTooLarge(long shardSizeBytes, long maxShardSize) { + super("The shard size of " + shardSizeBytes + " bytes exceeds the maximum shard size of " + maxShardSize + " bytes"); + } + } + public static class DocumentsMigrationFailed extends RfsException { public DocumentsMigrationFailed(String message) { super("The Documents Migration has failed. Reason: " + message); diff --git a/RFS/src/test/java/com/rfs/framework/SimpleRestoreFromSnapshot_ES_7_10.java b/RFS/src/test/java/com/rfs/framework/SimpleRestoreFromSnapshot_ES_7_10.java index 37c98b35d..33ad619cb 100644 --- a/RFS/src/test/java/com/rfs/framework/SimpleRestoreFromSnapshot_ES_7_10.java +++ b/RFS/src/test/java/com/rfs/framework/SimpleRestoreFromSnapshot_ES_7_10.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.IOUtils; +import com.rfs.common.DefaultSourceRepoAccessor; import com.rfs.common.DocumentReindexer; import com.rfs.common.FileSystemRepo; import com.rfs.common.IndexMetadata; @@ -45,7 +46,8 @@ public List extractSnapshotIndexData(final String localPath, for (final IndexMetadata.Data index : indices) { for (int shardId = 0; shardId < index.getNumberOfShards(); shardId++) { var shardMetadata = new ShardMetadataFactory_ES_7_10(snapShotProvider).fromRepo(snapshotName, index.getName(), shardId); - SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repo, unpackedShardDataDir, Integer.MAX_VALUE); + DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(repo); + SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repoAccessor, unpackedShardDataDir, Integer.MAX_VALUE); unpacker.unpack(shardMetadata); } } diff --git a/RFS/src/test/java/com/rfs/worker/DocumentsRunnerTest.java b/RFS/src/test/java/com/rfs/worker/DocumentsRunnerTest.java index 3729bc23e..c1674374a 100644 --- a/RFS/src/test/java/com/rfs/worker/DocumentsRunnerTest.java +++ b/RFS/src/test/java/com/rfs/worker/DocumentsRunnerTest.java @@ -25,6 +25,7 @@ void run_encountersAnException_asExpected() { GlobalState globalState = Mockito.mock(GlobalState.class); CmsClient cmsClient = Mockito.mock(CmsClient.class); String snapshotName = "testSnapshot"; + long maxShardSizeBytes = 50 * 1024 * 1024 * 1024L; IndexMetadata.Factory metadataFactory = Mockito.mock(IndexMetadata.Factory.class); ShardMetadata.Factory shardMetadataFactory = Mockito.mock(ShardMetadata.Factory.class); @@ -37,7 +38,7 @@ void run_encountersAnException_asExpected() { when(globalState.getPhase()).thenReturn(GlobalState.Phase.DOCUMENTS_IN_PROGRESS); // Run the test - DocumentsRunner testRunner = new DocumentsRunner(globalState, cmsClient, snapshotName, metadataFactory, shardMetadataFactory, unpacker, reader, reindexer); + DocumentsRunner testRunner = new DocumentsRunner(globalState, cmsClient, snapshotName, maxShardSizeBytes, metadataFactory, shardMetadataFactory, unpacker, reader, reindexer); final var e = assertThrows(DocumentsRunner.DocumentsMigrationPhaseFailed.class, () -> testRunner.run()); // Verify the results diff --git a/RFS/src/test/java/com/rfs/worker/DocumentsStepTest.java b/RFS/src/test/java/com/rfs/worker/DocumentsStepTest.java index 5ffa034fb..1f42da08d 100644 --- a/RFS/src/test/java/com/rfs/worker/DocumentsStepTest.java +++ b/RFS/src/test/java/com/rfs/worker/DocumentsStepTest.java @@ -51,13 +51,14 @@ void setUp() { GlobalState globalState = Mockito.mock(GlobalState.class); CmsClient cmsClient = 
Mockito.mock(CmsClient.class); String snapshotName = "test"; + long maxShardSizeBytes = 50 * 1024 * 1024 * 1024L; IndexMetadata.Factory metadataFactory = Mockito.mock(IndexMetadata.Factory.class); ShardMetadata.Factory shardMetadataFactory = Mockito.mock(ShardMetadata.Factory.class); SnapshotShardUnpacker unpacker = Mockito.mock(SnapshotShardUnpacker.class); LuceneDocumentsReader reader = Mockito.mock(LuceneDocumentsReader.class); DocumentReindexer reindexer = Mockito.mock(DocumentReindexer.class); - testMembers = new SharedMembers(globalState, cmsClient, snapshotName, metadataFactory, shardMetadataFactory, unpacker, reader, reindexer); + testMembers = new SharedMembers(globalState, cmsClient, snapshotName, maxShardSizeBytes, metadataFactory, shardMetadataFactory, unpacker, reader, reindexer); } @Test @@ -524,6 +525,7 @@ void MigrateDocuments_workToDo_AsExpected(CmsEntry.DocumentsWorkItem workItem, C // Check the results Mockito.verify(testMembers.reindexer, times(1)).reindex(workItem.indexName, documents); + Mockito.verify(testMembers.unpacker, times(1)).cleanUp(shardMetadata); Mockito.verify(testMembers.cmsClient, times(1)).updateDocumentsWorkItemForceful(updatedItem); assertEquals(DocumentsStep.GetDocumentsToMigrate.class, nextStep.getClass()); } @@ -557,10 +559,38 @@ void MigrateDocuments_failedItem_AsExpected() { // Check the results Mockito.verify(testMembers.reindexer, times(1)).reindex(workItem.indexName, documents); + Mockito.verify(testMembers.unpacker, times(1)).cleanUp(shardMetadata); Mockito.verify(testMembers.cmsClient, times(1)).updateDocumentsWorkItem(updatedItem, workItem); assertEquals(DocumentsStep.GetDocumentsToMigrate.class, nextStep.getClass()); } + @Test + void MigrateDocuments_largeShard_AsExpected() { + // Set up the test + CmsEntry.DocumentsWorkItem workItem = new CmsEntry.DocumentsWorkItem( + "index1", 0, CmsEntry.DocumentsWorkItemStatus.NOT_STARTED, "42", 1 + ); + testMembers.cmsWorkEntry = Optional.of(workItem); + + CmsEntry.DocumentsWorkItem updatedItem = new CmsEntry.DocumentsWorkItem( + "index1", 0, CmsEntry.DocumentsWorkItemStatus.NOT_STARTED, "42", 2 + ); + + ShardMetadata.Data shardMetadata = Mockito.mock(ShardMetadata.Data.class); + Mockito.when(shardMetadata.getTotalSizeBytes()).thenReturn(testMembers.maxShardSizeBytes + 1); + Mockito.when(testMembers.shardMetadataFactory.fromRepo(testMembers.snapshotName, workItem.indexName, workItem.shardId)).thenReturn(shardMetadata); + + // Run the test + DocumentsStep.MigrateDocuments testStep = new DocumentsStep.MigrateDocuments(testMembers); + testStep.run(); + WorkerStep nextStep = testStep.nextStep(); + + // Check the results + Mockito.verify(testMembers.cmsClient, times(1)).updateDocumentsWorkItem(updatedItem, workItem); + Mockito.verify(testMembers.unpacker, times(1)).cleanUp(shardMetadata); + assertEquals(DocumentsStep.GetDocumentsToMigrate.class, nextStep.getClass()); + } + @Test void MigrateDocuments_exceededAttempts_AsExpected() { // Set up the test From 9692d21771cc2bcd79ebc595be6079897ad45c32 Mon Sep 17 00:00:00 2001 From: Chris Helma Date: Wed, 12 Jun 2024 11:06:20 -0500 Subject: [PATCH 3/6] Updates per PR comments Signed-off-by: Chris Helma --- .../java/com/rfs/DemoPrintOutSnapshot.java | 7 ++++-- .../java/com/rfs/ReindexFromSnapshot.java | 6 +++-- RFS/src/main/java/com/rfs/RunRfsWorker.java | 4 +-- .../rfs/common/DefaultSourceRepoAccessor.java | 6 ----- .../common/DeletingSourceRepoAccessor.java | 8 +----- .../com/rfs/common/SnapshotShardUnpacker.java | 25 ++++++++++++++----- 
.../com/rfs/common/SourceRepoAccessor.java | 9 +++++++ .../java/com/rfs/worker/DocumentsRunner.java | 4 +-- .../java/com/rfs/worker/DocumentsStep.java | 12 +++------ .../SimpleRestoreFromSnapshot_ES_7_10.java | 4 +-- .../com/rfs/worker/DocumentsRunnerTest.java | 4 +-- .../com/rfs/worker/DocumentsStepTest.java | 18 +++++++++---- 12 files changed, 63 insertions(+), 44 deletions(-) diff --git a/RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java b/RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java index 888381548..6c1af1208 100644 --- a/RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java +++ b/RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java @@ -221,8 +221,11 @@ public static void main(String[] args) { bufferSize = ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES; } DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(repo); - SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repoAccessor, Paths.get(luceneBasePathString), bufferSize); - unpacker.unpack(shardMetadata); + + SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor, Paths.get(luceneBasePathString), bufferSize); + try (SnapshotShardUnpacker unpacker = unpackerFactory.create(shardMetadata)) { + unpacker.unpack(); + } // Now, read the documents back out System.out.println("--- Reading docs in the shard ---"); diff --git a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java index 67d6cdc4d..f8efcc61b 100644 --- a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java +++ b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java @@ -330,7 +330,7 @@ public static void main(String[] args) throws InterruptedException { bufferSize = ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES; } DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(repo); - SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repoAccessor, luceneDirPath, bufferSize); + SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,luceneDirPath, bufferSize); for (IndexMetadata.Data indexMetadata : indexMetadatas) { logger.info("Processing index: " + indexMetadata.getName()); @@ -346,7 +346,9 @@ public static void main(String[] args) throws InterruptedException { } // Unpack the shard - unpacker.unpack(shardMetadata); + try (SnapshotShardUnpacker unpacker = unpackerFactory.create(shardMetadata)) { + unpacker.unpack(); + } } } diff --git a/RFS/src/main/java/com/rfs/RunRfsWorker.java b/RFS/src/main/java/com/rfs/RunRfsWorker.java index 7a6457807..4fb4afed1 100644 --- a/RFS/src/main/java/com/rfs/RunRfsWorker.java +++ b/RFS/src/main/java/com/rfs/RunRfsWorker.java @@ -172,10 +172,10 @@ public static void main(String[] args) throws Exception { ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider); DeletingSourceRepoAccessor repoAccessor = new DeletingSourceRepoAccessor(sourceRepo); - SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repoAccessor, luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES); + SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor, luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES); LuceneDocumentsReader reader = new LuceneDocumentsReader(luceneDirPath); DocumentReindexer reindexer = new DocumentReindexer(targetClient); - DocumentsRunner documentsWorker = new DocumentsRunner(globalState, cmsClient, snapshotName, maxShardSizeBytes, indexMetadataFactory, 
shardMetadataFactory, unpacker, reader, reindexer); + DocumentsRunner documentsWorker = new DocumentsRunner(globalState, cmsClient, snapshotName, maxShardSizeBytes, indexMetadataFactory, shardMetadataFactory, unpackerFactory, reader, reindexer); documentsWorker.run(); } catch (Runner.PhaseFailed e) { diff --git a/RFS/src/main/java/com/rfs/common/DefaultSourceRepoAccessor.java b/RFS/src/main/java/com/rfs/common/DefaultSourceRepoAccessor.java index b9c06d2b1..1bd729f48 100644 --- a/RFS/src/main/java/com/rfs/common/DefaultSourceRepoAccessor.java +++ b/RFS/src/main/java/com/rfs/common/DefaultSourceRepoAccessor.java @@ -20,10 +20,4 @@ protected InputStream load(Path path) { throw new CouldNotLoadRepoFile("Could not load file: " + path, e); } } - - public static class CouldNotLoadRepoFile extends RuntimeException { - public CouldNotLoadRepoFile(String message, Throwable cause) { - super(message, cause); - } - } } diff --git a/RFS/src/main/java/com/rfs/common/DeletingSourceRepoAccessor.java b/RFS/src/main/java/com/rfs/common/DeletingSourceRepoAccessor.java index f6b37cabf..d4073e680 100644 --- a/RFS/src/main/java/com/rfs/common/DeletingSourceRepoAccessor.java +++ b/RFS/src/main/java/com/rfs/common/DeletingSourceRepoAccessor.java @@ -8,7 +8,7 @@ /* - * Provides access to the underlying files in the source repo, but deletes the files after they are read. This + * Provides access to the underlying files in the source repo and deletes the files after the Stream is closed. This * is useful/interesting in the case where the files are large/numerous and you can easily re-acquire them - such as * if they are being loaded from S3. */ @@ -43,10 +43,4 @@ public void close() throws IOException { } } } - - public static class CouldNotLoadRepoFile extends RuntimeException { - public CouldNotLoadRepoFile(String message, Throwable cause) { - super(message, cause); - } - } } diff --git a/RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java b/RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java index de3ab976c..c680b473b 100644 --- a/RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java +++ b/RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java @@ -18,13 +18,25 @@ import org.apache.lucene.util.BytesRef; @RequiredArgsConstructor -public class SnapshotShardUnpacker { +public class SnapshotShardUnpacker implements AutoCloseable { private static final Logger logger = LogManager.getLogger(SnapshotShardUnpacker.class); - protected final SourceRepoAccessor repoAccessor; - protected final Path luceneFilesBasePath; - protected final int bufferSize; + private final SourceRepoAccessor repoAccessor; + private final Path luceneFilesBasePath; + private final ShardMetadata.Data shardMetadata; + private final int bufferSize; - public void unpack(ShardMetadata.Data shardMetadata) { + @RequiredArgsConstructor + public static class Factory { + private final SourceRepoAccessor repoAccessor; + private final Path luceneFilesBasePath; + private final int bufferSize; + + public SnapshotShardUnpacker create(ShardMetadata.Data shardMetadata) { + return new SnapshotShardUnpacker(repoAccessor, luceneFilesBasePath, shardMetadata, bufferSize); + } + } + + public void unpack() { try { // Some constants NativeFSLockFactory lockFactory = NativeFSLockFactory.INSTANCE; @@ -60,7 +72,8 @@ public void unpack(ShardMetadata.Data shardMetadata) { } } - public void cleanUp(ShardMetadata.Data shardMetadata) { + @Override + public void close() { try { Path luceneIndexDir = Paths.get(luceneFilesBasePath + "/" + 
shardMetadata.getIndexName() + "/" + shardMetadata.getShardId()); if (Files.exists(luceneIndexDir)) { diff --git a/RFS/src/main/java/com/rfs/common/SourceRepoAccessor.java b/RFS/src/main/java/com/rfs/common/SourceRepoAccessor.java index 9c9534ac5..54159c365 100644 --- a/RFS/src/main/java/com/rfs/common/SourceRepoAccessor.java +++ b/RFS/src/main/java/com/rfs/common/SourceRepoAccessor.java @@ -33,15 +33,24 @@ public InputStream getIndexMetadataFile(String indexId, String indexFileId){ public InputStream getShardDir(String indexId, int shardId){ return load(repo.getShardDirPath(indexId, shardId)); } + public InputStream getShardMetadataFile(String snapshotId, String indexId, int shardId){ return load(repo.getShardMetadataFilePath(snapshotId, indexId, shardId)); } + public InputStream getBlobFile(String indexId, int shardId, String blobName){ return load(repo.getBlobFilePath(indexId, shardId, blobName)); } + public void prepBlobFiles(ShardMetadata.Data shardMetadata){ repo.prepBlobFiles(shardMetadata); } protected abstract InputStream load(Path path); + + public static class CouldNotLoadRepoFile extends RuntimeException { + public CouldNotLoadRepoFile(String message, Throwable cause) { + super(message, cause); + } + } } diff --git a/RFS/src/main/java/com/rfs/worker/DocumentsRunner.java b/RFS/src/main/java/com/rfs/worker/DocumentsRunner.java index 7ee50e128..e8c9ee6f9 100644 --- a/RFS/src/main/java/com/rfs/worker/DocumentsRunner.java +++ b/RFS/src/main/java/com/rfs/worker/DocumentsRunner.java @@ -19,9 +19,9 @@ public class DocumentsRunner implements Runner { private final DocumentsStep.SharedMembers members; public DocumentsRunner(GlobalState globalState, CmsClient cmsClient, String snapshotName, long maxShardSizeBytes, - IndexMetadata.Factory metadataFactory, ShardMetadata.Factory shardMetadataFactory, SnapshotShardUnpacker unpacker, + IndexMetadata.Factory metadataFactory, ShardMetadata.Factory shardMetadataFactory, SnapshotShardUnpacker.Factory unpackerFactory, LuceneDocumentsReader reader, DocumentReindexer reindexer) { - this.members = new DocumentsStep.SharedMembers(globalState, cmsClient, snapshotName, maxShardSizeBytes, metadataFactory, shardMetadataFactory, unpacker, reader, reindexer); + this.members = new DocumentsStep.SharedMembers(globalState, cmsClient, snapshotName, maxShardSizeBytes, metadataFactory, shardMetadataFactory, unpackerFactory, reader, reindexer); } @Override diff --git a/RFS/src/main/java/com/rfs/worker/DocumentsStep.java b/RFS/src/main/java/com/rfs/worker/DocumentsStep.java index a7e6c43ea..4d14d2969 100644 --- a/RFS/src/main/java/com/rfs/worker/DocumentsStep.java +++ b/RFS/src/main/java/com/rfs/worker/DocumentsStep.java @@ -30,7 +30,7 @@ public static class SharedMembers { protected final long maxShardSizeBytes; protected final IndexMetadata.Factory metadataFactory; protected final ShardMetadata.Factory shardMetadataFactory; - protected final SnapshotShardUnpacker unpacker; + protected final SnapshotShardUnpacker.Factory unpackerFactory; protected final LuceneDocumentsReader reader; protected final DocumentReindexer reindexer; protected Optional cmsEntry = Optional.empty(); @@ -375,7 +375,9 @@ public void run() { throw new ShardTooLarge(shardMetadata.getTotalSizeBytes(), members.maxShardSizeBytes); } - members.unpacker.unpack(shardMetadata); + try (SnapshotShardUnpacker unpacker = members.unpackerFactory.create(shardMetadata)) { + unpacker.unpack(); + } Flux documents = members.reader.readDocuments(shardMetadata.getIndexName(), shardMetadata.getShardId()); @@ 
-416,12 +418,6 @@ public void run() { value -> logger.info("Documents Work Item (Index: " + workItem.indexName + ", Shard: " + workItem.shardId + ") attempt count was incremented"), () ->logger.info("Unable to increment attempt count of Documents Work Item (Index: " + workItem.indexName + ", Shard: " + workItem.shardId + ")") ); - } finally { - if (shardMetadata != null) { - logger.info("Cleaning up the unpacked shard..."); - members.unpacker.cleanUp(shardMetadata); - logger.info("Shard cleaned up"); - } } logger.info("Clearing the worker's current work item..."); diff --git a/RFS/src/test/java/com/rfs/framework/SimpleRestoreFromSnapshot_ES_7_10.java b/RFS/src/test/java/com/rfs/framework/SimpleRestoreFromSnapshot_ES_7_10.java index 33ad619cb..dedc3d8ae 100644 --- a/RFS/src/test/java/com/rfs/framework/SimpleRestoreFromSnapshot_ES_7_10.java +++ b/RFS/src/test/java/com/rfs/framework/SimpleRestoreFromSnapshot_ES_7_10.java @@ -47,8 +47,8 @@ public List extractSnapshotIndexData(final String localPath, for (int shardId = 0; shardId < index.getNumberOfShards(); shardId++) { var shardMetadata = new ShardMetadataFactory_ES_7_10(snapShotProvider).fromRepo(snapshotName, index.getName(), shardId); DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(repo); - SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repoAccessor, unpackedShardDataDir, Integer.MAX_VALUE); - unpacker.unpack(shardMetadata); + SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repoAccessor, unpackedShardDataDir, shardMetadata, Integer.MAX_VALUE); + unpacker.unpack(); } } return indices; diff --git a/RFS/src/test/java/com/rfs/worker/DocumentsRunnerTest.java b/RFS/src/test/java/com/rfs/worker/DocumentsRunnerTest.java index c1674374a..1f50b176e 100644 --- a/RFS/src/test/java/com/rfs/worker/DocumentsRunnerTest.java +++ b/RFS/src/test/java/com/rfs/worker/DocumentsRunnerTest.java @@ -29,7 +29,7 @@ void run_encountersAnException_asExpected() { IndexMetadata.Factory metadataFactory = Mockito.mock(IndexMetadata.Factory.class); ShardMetadata.Factory shardMetadataFactory = Mockito.mock(ShardMetadata.Factory.class); - SnapshotShardUnpacker unpacker = Mockito.mock(SnapshotShardUnpacker.class); + SnapshotShardUnpacker.Factory unpackerFactory = Mockito.mock(SnapshotShardUnpacker.Factory.class); LuceneDocumentsReader reader = Mockito.mock(LuceneDocumentsReader.class); DocumentReindexer reindexer = Mockito.mock(DocumentReindexer.class); RfsException testException = new RfsException("Unit test"); @@ -38,7 +38,7 @@ void run_encountersAnException_asExpected() { when(globalState.getPhase()).thenReturn(GlobalState.Phase.DOCUMENTS_IN_PROGRESS); // Run the test - DocumentsRunner testRunner = new DocumentsRunner(globalState, cmsClient, snapshotName, maxShardSizeBytes, metadataFactory, shardMetadataFactory, unpacker, reader, reindexer); + DocumentsRunner testRunner = new DocumentsRunner(globalState, cmsClient, snapshotName, maxShardSizeBytes, metadataFactory, shardMetadataFactory, unpackerFactory, reader, reindexer); final var e = assertThrows(DocumentsRunner.DocumentsMigrationPhaseFailed.class, () -> testRunner.run()); // Verify the results diff --git a/RFS/src/test/java/com/rfs/worker/DocumentsStepTest.java b/RFS/src/test/java/com/rfs/worker/DocumentsStepTest.java index 1f42da08d..c86bca9ac 100644 --- a/RFS/src/test/java/com/rfs/worker/DocumentsStepTest.java +++ b/RFS/src/test/java/com/rfs/worker/DocumentsStepTest.java @@ -45,6 +45,7 @@ @ExtendWith(MockitoExtension.class) public class DocumentsStepTest { private 
SharedMembers testMembers; + private SnapshotShardUnpacker unpacker; @BeforeEach void setUp() { @@ -55,10 +56,14 @@ void setUp() { IndexMetadata.Factory metadataFactory = Mockito.mock(IndexMetadata.Factory.class); ShardMetadata.Factory shardMetadataFactory = Mockito.mock(ShardMetadata.Factory.class); - SnapshotShardUnpacker unpacker = Mockito.mock(SnapshotShardUnpacker.class); + + unpacker = Mockito.mock(SnapshotShardUnpacker.class); + SnapshotShardUnpacker.Factory unpackerFactory = Mockito.mock(SnapshotShardUnpacker.Factory.class); + lenient().when(unpackerFactory.create(any())).thenReturn(unpacker); + LuceneDocumentsReader reader = Mockito.mock(LuceneDocumentsReader.class); DocumentReindexer reindexer = Mockito.mock(DocumentReindexer.class); - testMembers = new SharedMembers(globalState, cmsClient, snapshotName, maxShardSizeBytes, metadataFactory, shardMetadataFactory, unpacker, reader, reindexer); + testMembers = new SharedMembers(globalState, cmsClient, snapshotName, maxShardSizeBytes, metadataFactory, shardMetadataFactory, unpackerFactory, reader, reindexer); } @Test @@ -525,7 +530,8 @@ void MigrateDocuments_workToDo_AsExpected(CmsEntry.DocumentsWorkItem workItem, C // Check the results Mockito.verify(testMembers.reindexer, times(1)).reindex(workItem.indexName, documents); - Mockito.verify(testMembers.unpacker, times(1)).cleanUp(shardMetadata); + Mockito.verify(testMembers.unpackerFactory, times(1)).create(shardMetadata); + Mockito.verify(unpacker, times(1)).close(); Mockito.verify(testMembers.cmsClient, times(1)).updateDocumentsWorkItemForceful(updatedItem); assertEquals(DocumentsStep.GetDocumentsToMigrate.class, nextStep.getClass()); } @@ -559,7 +565,8 @@ void MigrateDocuments_failedItem_AsExpected() { // Check the results Mockito.verify(testMembers.reindexer, times(1)).reindex(workItem.indexName, documents); - Mockito.verify(testMembers.unpacker, times(1)).cleanUp(shardMetadata); + Mockito.verify(testMembers.unpackerFactory, times(1)).create(shardMetadata); + Mockito.verify(unpacker, times(1)).close(); Mockito.verify(testMembers.cmsClient, times(1)).updateDocumentsWorkItem(updatedItem, workItem); assertEquals(DocumentsStep.GetDocumentsToMigrate.class, nextStep.getClass()); } @@ -587,7 +594,8 @@ void MigrateDocuments_largeShard_AsExpected() { // Check the results Mockito.verify(testMembers.cmsClient, times(1)).updateDocumentsWorkItem(updatedItem, workItem); - Mockito.verify(testMembers.unpacker, times(1)).cleanUp(shardMetadata); + Mockito.verify(testMembers.unpackerFactory, times(0)).create(shardMetadata); + Mockito.verify(unpacker, times(0)).close(); assertEquals(DocumentsStep.GetDocumentsToMigrate.class, nextStep.getClass()); } From bd17b10306a60549489f3bd987ea0dda494af2e8 Mon Sep 17 00:00:00 2001 From: Chris Helma Date: Wed, 12 Jun 2024 11:23:16 -0500 Subject: [PATCH 4/6] Small RFS bug fix Signed-off-by: Chris Helma --- .../java/com/rfs/worker/DocumentsStep.java | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/RFS/src/main/java/com/rfs/worker/DocumentsStep.java b/RFS/src/main/java/com/rfs/worker/DocumentsStep.java index 4d14d2969..4eddb407e 100644 --- a/RFS/src/main/java/com/rfs/worker/DocumentsStep.java +++ b/RFS/src/main/java/com/rfs/worker/DocumentsStep.java @@ -377,17 +377,16 @@ public void run() { try (SnapshotShardUnpacker unpacker = members.unpackerFactory.create(shardMetadata)) { unpacker.unpack(); - } - - Flux documents = members.reader.readDocuments(shardMetadata.getIndexName(), shardMetadata.getShardId()); - final 
ShardMetadata.Data finalShardMetadata = shardMetadata; // Define in local context for the lambda - members.reindexer.reindex(shardMetadata.getIndexName(), documents) - .doOnError(error -> logger.error("Error during reindexing: " + error)) - .doOnSuccess(done -> logger.info("Reindexing completed for Index " + finalShardMetadata.getIndexName() + ", Shard " + finalShardMetadata.getShardId())) - // Wait for the reindexing to complete before proceeding - .block(); - logger.info("Docs migrated"); + Flux documents = members.reader.readDocuments(shardMetadata.getIndexName(), shardMetadata.getShardId()); + final ShardMetadata.Data finalShardMetadata = shardMetadata; // Define in local context for the lambda + members.reindexer.reindex(shardMetadata.getIndexName(), documents) + .doOnError(error -> logger.error("Error during reindexing: " + error)) + .doOnSuccess(done -> logger.info("Reindexing completed for Index " + finalShardMetadata.getIndexName() + ", Shard " + finalShardMetadata.getShardId())) + // Wait for the reindexing to complete before proceeding + .block(); + logger.info("Docs migrated"); + } logger.info("Updating the Documents Work Item to indicate it has been completed..."); CmsEntry.DocumentsWorkItem updatedEntry = new CmsEntry.DocumentsWorkItem( @@ -396,7 +395,7 @@ public void run() { CmsEntry.DocumentsWorkItemStatus.COMPLETED, workItem.leaseExpiry, workItem.numAttempts - ); + ); members.cmsWorkEntry = Optional.of(members.cmsClient.updateDocumentsWorkItemForceful(updatedEntry)); logger.info("Documents Work Item updated"); From 4c25b5311d84c141cce07ae4050d9b7933edba2f Mon Sep 17 00:00:00 2001 From: Chris Helma Date: Wed, 12 Jun 2024 13:08:38 -0500 Subject: [PATCH 5/6] More updates per PR comments Signed-off-by: Chris Helma --- RFS/build.gradle | 5 - .../java/com/rfs/DemoPrintOutSnapshot.java | 275 ------------------ RFS/src/main/java/com/rfs/RunRfsWorker.java | 70 ++--- ....java => EphemeralSourceRepoAccessor.java} | 4 +- .../java/com/rfs/worker/DocumentsRunner.java | 19 +- 5 files changed, 52 insertions(+), 321 deletions(-) delete mode 100644 RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java rename RFS/src/main/java/com/rfs/common/{DeletingSourceRepoAccessor.java => EphemeralSourceRepoAccessor.java} (90%) diff --git a/RFS/build.gradle b/RFS/build.gradle index f784e738e..e727e6270 100644 --- a/RFS/build.gradle +++ b/RFS/build.gradle @@ -108,11 +108,6 @@ jacocoTestReport { } } -task demoPrintOutSnapshot (type: JavaExec) { - classpath = sourceSets.main.runtimeClasspath - mainClass = 'com.rfs.DemoPrintOutSnapshot' -} - task copyDockerRuntimeJars (type: Sync) { duplicatesStrategy = DuplicatesStrategy.EXCLUDE description = 'Copy runtime JARs and app jar to docker build directory' diff --git a/RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java b/RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java deleted file mode 100644 index 6c1af1208..000000000 --- a/RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java +++ /dev/null @@ -1,275 +0,0 @@ -package com.rfs; - -import java.nio.file.Paths; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import com.beust.jcommander.JCommander; -import com.beust.jcommander.Parameter; - -import org.apache.lucene.document.Document; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.IndexableField; -import org.apache.lucene.store.FSDirectory; -import org.apache.lucene.util.BytesRef; - -import 
com.rfs.common.Uid;
-import com.rfs.version_es_6_8.*;
-import com.rfs.common.GlobalMetadata;
-import com.rfs.common.IndexMetadata;
-import com.rfs.common.SourceRepo;
-import com.rfs.common.ShardMetadata;
-import com.rfs.common.SnapshotMetadata;
-import com.rfs.common.SnapshotRepo;
-import com.rfs.common.SnapshotShardUnpacker;
-import com.rfs.common.ClusterVersion;
-import com.rfs.common.DefaultSourceRepoAccessor;
-import com.rfs.common.FileSystemRepo;
-import com.rfs.version_es_7_10.*;
-
-public class DemoPrintOutSnapshot {
-
-    public static class Args {
-        @Parameter(names = {"-n", "--snapshot-name"}, description = "The name of the snapshot to read", required = true)
-        public String snapshotName;
-
-        @Parameter(names = {"-d", "--snapshot-dir"}, description = "The absolute path to the snapshot directory", required = true)
-        public String snapshotDirPath;
-
-        @Parameter(names = {"-l", "--lucene-dir"}, description = "The absolute path to the directory where we'll put the Lucene docs", required = true)
-        public String luceneBasePathString;
-
-        @Parameter(names = {"-v", "--source-version"}, description = "Source version", required = true, converter = ClusterVersion.ArgsConverter.class)
-        public ClusterVersion sourceVersion;
-    }
-
-    public static void main(String[] args) {
-        Args arguments = new Args();
-        JCommander.newBuilder()
-            .addObject(arguments)
-            .build()
-            .parse(args);
-
-        String snapshotName = arguments.snapshotName;
-        String snapshotDirPath = arguments.snapshotDirPath;
-        String luceneBasePathString = arguments.luceneBasePathString;
-        ClusterVersion sourceVersion = arguments.sourceVersion;
-
-        if (!((sourceVersion == ClusterVersion.ES_6_8) || (sourceVersion == ClusterVersion.ES_7_10))) {
-            throw new IllegalArgumentException("Unsupported source version: " + sourceVersion);
-        }
-
-        SourceRepo repo = new FileSystemRepo(Path.of(snapshotDirPath));
-
-        try {
-            // ==========================================================================================================
-            // Read the Repo data file
-            // ==========================================================================================================
-            System.out.println("==================================================================");
-            System.out.println("Attempting to read Repo data file...");
-
-            SnapshotRepo.Provider repoDataProvider;
-            if (sourceVersion == ClusterVersion.ES_6_8) {
-                repoDataProvider = new SnapshotRepoProvider_ES_6_8(repo);
-            } else {
-                repoDataProvider = new SnapshotRepoProvider_ES_7_10(repo);
-            }
-
-            System.out.println("--- Snapshots ---");
-            repoDataProvider.getSnapshots().forEach(snapshot -> System.out.println(snapshot.getName() + " - " + snapshot.getId()));
-
-            for (SnapshotRepo.Snapshot snapshot : repoDataProvider.getSnapshots()) {
-                System.out.println("--- Indices in " + snapshot.getName() + " ---");
-                for (SnapshotRepo.Index index : repoDataProvider.getIndicesInSnapshot(snapshot.getName())) {
-                    System.out.println(index.getName() + " - " + index.getId());
-                }
-            }
-            System.out.println("Repo data read successfully");
-
-            // ==========================================================================================================
-            // Read the Snapshot details
-            // ==========================================================================================================
-            System.out.println("==================================================================");
-            System.out.println("Attempting to read Snapshot details...");
-            String snapshotIdString = repoDataProvider.getSnapshotId(snapshotName);
-
-            if (snapshotIdString == null) {
-                System.out.println("Snapshot not found");
-                return;
-            }
-
-            SnapshotMetadata.Data snapshotMetadata;
-            if (sourceVersion == ClusterVersion.ES_6_8) {
-                snapshotMetadata = new SnapshotMetadataFactory_ES_6_8().fromRepo(repo, repoDataProvider, snapshotName);
-            } else {
-                snapshotMetadata = new SnapshotMetadataFactory_ES_7_10().fromRepo(repo, repoDataProvider, snapshotName);
-            }
-
-            System.out.println("Snapshot Metadata State: " + snapshotMetadata.getState());
-            System.out.println("Snapshot Metadata State Reason: " + snapshotMetadata.getReason());
-            System.out.println("Snapshot Metadata Version: " + snapshotMetadata.getVersionId());
-            System.out.println("Snapshot Metadata Indices: " + snapshotMetadata.getIndices());
-            System.out.println("Snapshot Metadata Shards Total: " + snapshotMetadata.getTotalShards());
-            System.out.println("Snapshot Metadata Shards Successful: " + snapshotMetadata.getSuccessfulShards());
-
-            // ==========================================================================================================
-            // Read the Global Metadata
-            // ==========================================================================================================
-            System.out.println("==================================================================");
-            System.out.println("Attempting to read Global Metadata details...");
-
-            GlobalMetadata.Data globalMetadata;
-            if (sourceVersion == ClusterVersion.ES_6_8) {
-                globalMetadata = new GlobalMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName);
-            } else {
-                globalMetadata = new GlobalMetadataFactory_ES_7_10(repoDataProvider).fromRepo(snapshotName);
-            }
-
-            if (sourceVersion == ClusterVersion.ES_6_8) {
-                GlobalMetadataData_ES_6_8 globalMetadataES68 = (GlobalMetadataData_ES_6_8) globalMetadata;
-
-                List<String> templateKeys = new ArrayList<>();
-                globalMetadataES68.getTemplates().fieldNames().forEachRemaining(templateKeys::add);
-                System.out.println("Templates Keys: " + templateKeys);
-            } else if (sourceVersion == ClusterVersion.ES_7_10) {
-                GlobalMetadataData_ES_7_10 globalMetadataES710 = (GlobalMetadataData_ES_7_10) globalMetadata;
-
-                List<String> indexTemplateKeys = new ArrayList<>();
-                globalMetadataES710.getIndexTemplates().fieldNames().forEachRemaining(indexTemplateKeys::add);
-                System.out.println("Index Templates Keys: " + indexTemplateKeys);
-
-                List<String> componentTemplateKeys = new ArrayList<>();
-                globalMetadataES710.getComponentTemplates().fieldNames().forEachRemaining(componentTemplateKeys::add);
-                System.out.println("Component Templates Keys: " + componentTemplateKeys);
-            }
-
-            // ==========================================================================================================
-            // Read all the Index Metadata
-            // ==========================================================================================================
-            System.out.println("==================================================================");
-            System.out.println("Attempting to read Index Metadata...");
-
-            Map<String, IndexMetadata.Data> indexMetadatas = new HashMap<>();
-            if (sourceVersion == ClusterVersion.ES_6_8) {
-                for (SnapshotRepo.Index index : repoDataProvider.getIndicesInSnapshot(snapshotName)) {
-                    IndexMetadata.Data indexMetadata = new IndexMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName, index.getName());
-                    indexMetadatas.put(index.getName(), indexMetadata);
-                }
-            } else {
-                for (SnapshotRepo.Index index : repoDataProvider.getIndicesInSnapshot(snapshotName)) {
-                    IndexMetadata.Data indexMetadata = new IndexMetadataFactory_ES_7_10(repoDataProvider).fromRepo(snapshotName, index.getName());
-                    indexMetadatas.put(index.getName(), indexMetadata);
-                }
-            }
-
-            for (IndexMetadata.Data indexMetadata : indexMetadatas.values()) {
-                System.out.println("Reading Index Metadata for index: " + indexMetadata.getName());
-                System.out.println("Index Id: " + indexMetadata.getId());
-                System.out.println("Index Number of Shards: " + indexMetadata.getNumberOfShards());
-                System.out.println("Index Settings: " + indexMetadata.getSettings().toString());
-                System.out.println("Index Mappings: " + indexMetadata.getMappings().toString());
-                System.out.println("Index Aliases: " + indexMetadata.getAliases().toString());
-            }
-
-            System.out.println("Index Metadata read successfully");
-
-            // ==========================================================================================================
-            // Read the Index Shard Metadata for the Snapshot
-            // ==========================================================================================================
-            System.out.println("==================================================================");
-            System.out.println("Attempting to read Index Shard Metadata...");
-            for (IndexMetadata.Data indexMetadata : indexMetadatas.values()) {
-                System.out.println("Reading Index Shard Metadata for index: " + indexMetadata.getName());
-                for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) {
-                    System.out.println("=== Shard ID: " + shardId + " ===");
-
-                    // Get the file mapping for the shard
-                    ShardMetadata.Data shardMetadata;
-                    if (sourceVersion == ClusterVersion.ES_6_8) {
-                        shardMetadata = new ShardMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId);
-                    } else {
-                        shardMetadata = new ShardMetadataFactory_ES_7_10(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId);
-                    }
-                    System.out.println("Shard Metadata: " + shardMetadata.toString());
-                }
-            }
-
-            // ==========================================================================================================
-            // Unpack the blob files
-            // ==========================================================================================================
-            System.out.println("==================================================================");
-            System.out.println("Unpacking blob files to disk...");
-
-            for (IndexMetadata.Data indexMetadata : indexMetadatas.values()){
-                for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) {
-                    ShardMetadata.Data shardMetadata;
-                    if (sourceVersion == ClusterVersion.ES_6_8) {
-                        shardMetadata = new ShardMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId);
-                    } else {
-                        shardMetadata = new ShardMetadataFactory_ES_7_10(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId);
-                    }
-
-                    // Unpack the shard
-                    int bufferSize;
-                    if (sourceVersion == ClusterVersion.ES_6_8) {
-                        bufferSize = ElasticsearchConstants_ES_6_8.BUFFER_SIZE_IN_BYTES;
-                    } else {
-                        bufferSize = ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES;
-                    }
-                    DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(repo);
-
-                    SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor, Paths.get(luceneBasePathString), bufferSize);
-                    try (SnapshotShardUnpacker unpacker = unpackerFactory.create(shardMetadata)) {
-                        unpacker.unpack();
-                    }
-
-                    // Now, read the documents back out
-                    System.out.println("--- Reading docs in the shard ---");
-                    Path luceneIndexDir = Paths.get(luceneBasePathString + "/" + shardMetadata.getIndexName() + "/" + shardMetadata.getShardId());
-                    readDocumentsFromLuceneIndex(luceneIndexDir);
-                }
-
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    private static void readDocumentsFromLuceneIndex(Path indexDirectoryPath) throws Exception {
-        // Opening the directory that contains the Lucene index
-        try (FSDirectory directory = FSDirectory.open(indexDirectoryPath);
-                IndexReader reader = DirectoryReader.open(directory)) {
-
-            // Iterating over all documents in the index
-            for (int i = 0; i < reader.maxDoc(); i++) {
-                System.out.println("Reading Document");
-                Document document = reader.document(i);
-
-                BytesRef source_bytes = document.getBinaryValue("_source");
-                if (source_bytes == null || source_bytes.bytes.length == 0) { // Skip deleted documents
-                    String id = Uid.decodeId(reader.document(i).getBinaryValue("_id").bytes);
-                    System.out.println("Document " + id + " is deleted");
-                    continue;
-                }
-
-                // Iterate over all fields in the document
-                List<IndexableField> fields = document.getFields();
-                for (IndexableField field : fields) {
-                    if ("_source".equals(field.name())){
-                        String source_string = source_bytes.utf8ToString();
-                        System.out.println("Field name: " + field.name() + ", Field value: " + source_string);
-                    } else if ("_id".equals(field.name())){
-                        String uid = Uid.decodeId(document.getBinaryValue(field.name()).bytes);
-                        System.out.println("Field name: " + field.name() + ", Field value: " + uid);
-                    } else {
-                        System.out.println("Field name: " + field.name());
-                    }
-                }
-            }
-        }
-    }
-}
\ No newline at end of file
diff --git a/RFS/src/main/java/com/rfs/RunRfsWorker.java b/RFS/src/main/java/com/rfs/RunRfsWorker.java
index 4fb4afed1..57df88ed8 100644
--- a/RFS/src/main/java/com/rfs/RunRfsWorker.java
+++ b/RFS/src/main/java/com/rfs/RunRfsWorker.java
@@ -21,7 +21,7 @@ import com.rfs.cms.OpenSearchCmsClient;
 import com.rfs.common.ClusterVersion;
 import com.rfs.common.ConnectionDetails;
-import com.rfs.common.DeletingSourceRepoAccessor;
+import com.rfs.common.EphemeralSourceRepoAccessor;
 import com.rfs.common.DocumentReindexer;
 import com.rfs.common.GlobalMetadata;
 import com.rfs.common.IndexMetadata;
@@ -124,57 +124,57 @@ public static void main(String[] args) throws Exception {
             .build()
             .parse(args);
 
-        String snapshotName = arguments.snapshotName;
-        Path s3LocalDirPath = Paths.get(arguments.s3LocalDirPath);
-        String s3RepoUri = arguments.s3RepoUri;
-        String s3Region = arguments.s3Region;
-        Path luceneDirPath = Paths.get(arguments.luceneDirPath);
-        String sourceHost = arguments.sourceHost;
-        String sourceUser = arguments.sourceUser;
-        String sourcePass = arguments.sourcePass;
-        String targetHost = arguments.targetHost;
-        String targetUser = arguments.targetUser;
-        String targetPass = arguments.targetPass;
-        List<String> indexTemplateAllowlist = arguments.indexTemplateAllowlist;
-        List<String> componentTemplateAllowlist = arguments.componentTemplateAllowlist;
-        long maxShardSizeBytes = arguments.maxShardSizeBytes;
-        int awarenessDimensionality = arguments.minNumberOfReplicas + 1;
-        Level logLevel = arguments.logLevel;
+        final String snapshotName = arguments.snapshotName;
+        final Path s3LocalDirPath = Paths.get(arguments.s3LocalDirPath);
+        final String s3RepoUri = arguments.s3RepoUri;
+        final String s3Region = arguments.s3Region;
+        final Path luceneDirPath = Paths.get(arguments.luceneDirPath);
+        final String sourceHost = arguments.sourceHost;
+        final String sourceUser = arguments.sourceUser;
+        final String sourcePass = arguments.sourcePass;
+        final String targetHost = arguments.targetHost;
+        final String targetUser = arguments.targetUser;
+        final String targetPass = arguments.targetPass;
+        final List<String> indexTemplateAllowlist = arguments.indexTemplateAllowlist;
+        final List<String> componentTemplateAllowlist = arguments.componentTemplateAllowlist;
+        final long maxShardSizeBytes = arguments.maxShardSizeBytes;
+        final int awarenessDimensionality = arguments.minNumberOfReplicas + 1;
+        final Level logLevel = arguments.logLevel;
 
         Logging.setLevel(logLevel);
 
-        ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass);
-        ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass);
+        final ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass);
+        final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass);
 
         try {
             logger.info("Running RfsWorker");
             GlobalState globalState = GlobalState.getInstance();
             OpenSearchClient sourceClient = new OpenSearchClient(sourceConnection);
             OpenSearchClient targetClient = new OpenSearchClient(targetConnection);
-            CmsClient cmsClient = new OpenSearchCmsClient(targetClient);
+            final CmsClient cmsClient = new OpenSearchCmsClient(targetClient);
 
-            SnapshotCreator snapshotCreator = new S3SnapshotCreator(snapshotName, sourceClient, s3RepoUri, s3Region);
-            SnapshotRunner snapshotWorker = new SnapshotRunner(globalState, cmsClient, snapshotCreator);
+            final SnapshotCreator snapshotCreator = new S3SnapshotCreator(snapshotName, sourceClient, s3RepoUri, s3Region);
+            final SnapshotRunner snapshotWorker = new SnapshotRunner(globalState, cmsClient, snapshotCreator);
             snapshotWorker.run();
 
-            SourceRepo sourceRepo = S3Repo.create(s3LocalDirPath, new S3Uri(s3RepoUri), s3Region);
-            SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);
-            GlobalMetadata.Factory metadataFactory = new GlobalMetadataFactory_ES_7_10(repoDataProvider);
-            GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11(targetClient, List.of(), componentTemplateAllowlist, indexTemplateAllowlist);
-            Transformer transformer = TransformFunctions.getTransformer(ClusterVersion.ES_7_10, ClusterVersion.OS_2_11, awarenessDimensionality);
+            final SourceRepo sourceRepo = S3Repo.create(s3LocalDirPath, new S3Uri(s3RepoUri), s3Region);
+            final SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);
+            final GlobalMetadata.Factory metadataFactory = new GlobalMetadataFactory_ES_7_10(repoDataProvider);
+            final GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11(targetClient, List.of(), componentTemplateAllowlist, indexTemplateAllowlist);
+            final Transformer transformer = TransformFunctions.getTransformer(ClusterVersion.ES_7_10, ClusterVersion.OS_2_11, awarenessDimensionality);
             MetadataRunner metadataWorker = new MetadataRunner(globalState, cmsClient, snapshotName, metadataFactory, metadataCreator, transformer);
             metadataWorker.run();
 
-            IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
-            IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient);
-            IndexRunner indexWorker = new IndexRunner(globalState, cmsClient, snapshotName, indexMetadataFactory, indexCreator, transformer);
+            final IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
+            final IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient);
+            final IndexRunner indexWorker = new IndexRunner(globalState, cmsClient, snapshotName, indexMetadataFactory, indexCreator, transformer);
             indexWorker.run();
 
-            ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
-            DeletingSourceRepoAccessor repoAccessor = new DeletingSourceRepoAccessor(sourceRepo);
-            SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor, luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);
-            LuceneDocumentsReader reader = new LuceneDocumentsReader(luceneDirPath);
-            DocumentReindexer reindexer = new DocumentReindexer(targetClient);
+            final ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
+            final EphemeralSourceRepoAccessor repoAccessor = new EphemeralSourceRepoAccessor(sourceRepo);
+            final SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor, luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);
+            final LuceneDocumentsReader reader = new LuceneDocumentsReader(luceneDirPath);
+            final DocumentReindexer reindexer = new DocumentReindexer(targetClient);
             DocumentsRunner documentsWorker = new DocumentsRunner(globalState, cmsClient, snapshotName, maxShardSizeBytes, indexMetadataFactory, shardMetadataFactory, unpackerFactory, reader, reindexer);
             documentsWorker.run();
 
diff --git a/RFS/src/main/java/com/rfs/common/DeletingSourceRepoAccessor.java b/RFS/src/main/java/com/rfs/common/EphemeralSourceRepoAccessor.java
similarity index 90%
rename from RFS/src/main/java/com/rfs/common/DeletingSourceRepoAccessor.java
rename to RFS/src/main/java/com/rfs/common/EphemeralSourceRepoAccessor.java
index d4073e680..e5e4a9b11 100644
--- a/RFS/src/main/java/com/rfs/common/DeletingSourceRepoAccessor.java
+++ b/RFS/src/main/java/com/rfs/common/EphemeralSourceRepoAccessor.java
@@ -12,8 +12,8 @@
  * is useful/interesting in the case where the files are large/numerous and you can easily re-acquire them - such as
  * if they are being loaded from S3.
  */
-public class DeletingSourceRepoAccessor extends SourceRepoAccessor {
-    public DeletingSourceRepoAccessor(SourceRepo repo) {
+public class EphemeralSourceRepoAccessor extends SourceRepoAccessor {
+    public EphemeralSourceRepoAccessor(SourceRepo repo) {
         super(repo);
     }
 
diff --git a/RFS/src/main/java/com/rfs/worker/DocumentsRunner.java b/RFS/src/main/java/com/rfs/worker/DocumentsRunner.java
index e8c9ee6f9..758236e55 100644
--- a/RFS/src/main/java/com/rfs/worker/DocumentsRunner.java
+++ b/RFS/src/main/java/com/rfs/worker/DocumentsRunner.java
@@ -18,10 +18,21 @@ public class DocumentsRunner implements Runner {
 
     private final DocumentsStep.SharedMembers members;
 
-    public DocumentsRunner(GlobalState globalState, CmsClient cmsClient, String snapshotName, long maxShardSizeBytes,
-            IndexMetadata.Factory metadataFactory, ShardMetadata.Factory shardMetadataFactory, SnapshotShardUnpacker.Factory unpackerFactory,
-            LuceneDocumentsReader reader, DocumentReindexer reindexer) {
-        this.members = new DocumentsStep.SharedMembers(globalState, cmsClient, snapshotName, maxShardSizeBytes, metadataFactory, shardMetadataFactory, unpackerFactory, reader, reindexer);
+    public DocumentsRunner(
+        GlobalState globalState, CmsClient cmsClient, String snapshotName, long maxShardSizeBytes,
+        IndexMetadata.Factory metadataFactory, ShardMetadata.Factory shardMetadataFactory,
+        SnapshotShardUnpacker.Factory unpackerFactory, LuceneDocumentsReader reader, DocumentReindexer reindexer) {
+        this.members = new DocumentsStep.SharedMembers(
+            globalState,
+            cmsClient,
+            snapshotName,
+            maxShardSizeBytes,
+            metadataFactory,
+            shardMetadataFactory,
+            unpackerFactory,
+            reader,
+            reindexer
+        );
     }
 
     @Override

From 30826c796dd02a6c0d468645846633103b90dde1 Mon Sep 17 00:00:00 2001
From: Chris Helma
Date: Wed, 12 Jun 2024 17:24:43 -0500
Subject: [PATCH 6/6] More updates per PR discussion

Signed-off-by: Chris Helma
---
 RFS/src/main/java/com/rfs/RunRfsWorker.java   |  4 ++--
 .../rfs/common/DefaultSourceRepoAccessor.java |  2 ++
 .../common/EphemeralSourceRepoAccessor.java   | 14 ++++++++---
 .../java/com/rfs/common/PartSliceStream.java  |  2 --
 .../com/rfs/common/SnapshotShardUnpacker.java | 23 +++++++++----------
 .../com/rfs/common/SourceRepoAccessor.java    |  1 +
 6 files changed, 27 insertions(+), 19 deletions(-)

diff --git a/RFS/src/main/java/com/rfs/RunRfsWorker.java b/RFS/src/main/java/com/rfs/RunRfsWorker.java
index 57df88ed8..245c712a6 100644
--- a/RFS/src/main/java/com/rfs/RunRfsWorker.java
+++ b/RFS/src/main/java/com/rfs/RunRfsWorker.java
@@ -21,7 +21,7 @@ import com.rfs.cms.OpenSearchCmsClient;
 import com.rfs.common.ClusterVersion;
 import com.rfs.common.ConnectionDetails;
-import com.rfs.common.EphemeralSourceRepoAccessor;
+import com.rfs.common.DefaultSourceRepoAccessor;
 import com.rfs.common.DocumentReindexer;
 import com.rfs.common.GlobalMetadata;
 import com.rfs.common.IndexMetadata;
@@ -171,7 +171,7 @@ public static void main(String[] args) throws Exception {
             indexWorker.run();
 
             final ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
-            final EphemeralSourceRepoAccessor repoAccessor = new EphemeralSourceRepoAccessor(sourceRepo);
+            final DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
             final SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor, luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);
             final LuceneDocumentsReader reader = new LuceneDocumentsReader(luceneDirPath);
             final DocumentReindexer reindexer = new DocumentReindexer(targetClient);
diff --git a/RFS/src/main/java/com/rfs/common/DefaultSourceRepoAccessor.java b/RFS/src/main/java/com/rfs/common/DefaultSourceRepoAccessor.java
index 1bd729f48..385a79ac5 100644
--- a/RFS/src/main/java/com/rfs/common/DefaultSourceRepoAccessor.java
+++ b/RFS/src/main/java/com/rfs/common/DefaultSourceRepoAccessor.java
@@ -6,6 +6,8 @@
 /*
  * Provides "simple" access to the underlying files in the source repo without any special behavior
+ *
+ * TODO: find a better approach to this (see https://opensearch.atlassian.net/browse/MIGRATIONS-1786)
  */
 public class DefaultSourceRepoAccessor extends SourceRepoAccessor {
     public DefaultSourceRepoAccessor(SourceRepo repo) {
diff --git a/RFS/src/main/java/com/rfs/common/EphemeralSourceRepoAccessor.java b/RFS/src/main/java/com/rfs/common/EphemeralSourceRepoAccessor.java
index e5e4a9b11..43e9d23e0 100644
--- a/RFS/src/main/java/com/rfs/common/EphemeralSourceRepoAccessor.java
+++ b/RFS/src/main/java/com/rfs/common/EphemeralSourceRepoAccessor.java
@@ -6,13 +6,19 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 /*
  * Provides access to the underlying files in the source repo and deletes the files after the Stream is closed. This
  * is useful/interesting in the case where the files are large/numerous and you can easily re-acquire them - such as
  * if they are being loaded from S3.
+ *
+ * TODO: find a better approach to this (see https://opensearch.atlassian.net/browse/MIGRATIONS-1786)
  */
 public class EphemeralSourceRepoAccessor extends SourceRepoAccessor {
+    private static final Logger logger = LogManager.getLogger(EphemeralSourceRepoAccessor.class);
     public EphemeralSourceRepoAccessor(SourceRepo repo) {
         super(repo);
     }
@@ -20,16 +26,16 @@ public EphemeralSourceRepoAccessor(SourceRepo repo) {
     @Override
     protected InputStream load(Path path) {
         try {
-            return new DeletingFileInputStream(path);
+            return new EphemeralFileInputStream(path);
         } catch (Exception e) {
             throw new CouldNotLoadRepoFile("Could not load file: " + path, e);
         }
     }
 
-    public static class DeletingFileInputStream extends FileInputStream {
+    public static class EphemeralFileInputStream extends FileInputStream {
         private final Path filePath;
 
-        public DeletingFileInputStream(Path filePath) throws IOException {
+        public EphemeralFileInputStream(Path filePath) throws IOException {
             super(filePath.toFile());
             this.filePath = filePath;
         }
@@ -39,6 +45,8 @@ public void close() throws IOException {
             try {
                 super.close();
             } finally {
+                logger.info("Deleting local file: " + filePath.toString());
+                logger.warn("See: https://opensearch.atlassian.net/browse/MIGRATIONS-1786");
                 Files.deleteIfExists(filePath);
             }
         }
diff --git a/RFS/src/main/java/com/rfs/common/PartSliceStream.java b/RFS/src/main/java/com/rfs/common/PartSliceStream.java
index 6dbc3a827..230e5b249 100644
--- a/RFS/src/main/java/com/rfs/common/PartSliceStream.java
+++ b/RFS/src/main/java/com/rfs/common/PartSliceStream.java
@@ -2,8 +2,6 @@
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
 
 /**
  * Taken from Elasticsearch 6.8, combining the SlicedInputStream and PartSliceStream classes with our special sauce
diff --git a/RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java b/RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java
index c680b473b..073080e44 100644
--- a/RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java
+++ b/RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java
@@ -51,21 +51,20 @@ public void unpack() {
             for (ShardMetadata.FileInfo fileMetadata : shardMetadata.getFiles()) {
                 logger.info("Unpacking - Blob Name: " + fileMetadata.getName() + ", Lucene Name: " + fileMetadata.getPhysicalName());
-                IndexOutput indexOutput = primaryDirectory.createOutput(fileMetadata.getPhysicalName(), IOContext.DEFAULT);
-
-                if (fileMetadata.getName().startsWith("v__")) {
-                    final BytesRef hash = fileMetadata.getMetaHash();
-                    indexOutput.writeBytes(hash.bytes, hash.offset, hash.length);
-                } else {
-                    try (InputStream stream = new PartSliceStream(repoAccessor, fileMetadata, shardMetadata.getIndexId(), shardMetadata.getShardId())) {
-                        final byte[] buffer = new byte[Math.toIntExact(Math.min(bufferSize, fileMetadata.getLength()))];
-                        int length;
-                        while ((length = stream.read(buffer)) > 0) {
-                            indexOutput.writeBytes(buffer, 0, length);
+                try (IndexOutput indexOutput = primaryDirectory.createOutput(fileMetadata.getPhysicalName(), IOContext.DEFAULT);){
+                    if (fileMetadata.getName().startsWith("v__")) {
+                        final BytesRef hash = fileMetadata.getMetaHash();
+                        indexOutput.writeBytes(hash.bytes, hash.offset, hash.length);
+                    } else {
+                        try (InputStream stream = new PartSliceStream(repoAccessor, fileMetadata, shardMetadata.getIndexId(), shardMetadata.getShardId())) {
+                            final byte[] buffer = new byte[Math.toIntExact(Math.min(bufferSize, fileMetadata.getLength()))];
+                            int length;
+                            while ((length = stream.read(buffer)) > 0) {
+                                indexOutput.writeBytes(buffer, 0, length);
+                            }
                         }
                     }
                 }
-                indexOutput.close();
             }
         } catch (Exception e) {
             throw new CouldNotUnpackShard("Could not unpack shard: Index " + shardMetadata.getIndexId() + ", Shard " + shardMetadata.getShardId(), e);
diff --git a/RFS/src/main/java/com/rfs/common/SourceRepoAccessor.java b/RFS/src/main/java/com/rfs/common/SourceRepoAccessor.java
index 54159c365..c3a7d1e77 100644
--- a/RFS/src/main/java/com/rfs/common/SourceRepoAccessor.java
+++ b/RFS/src/main/java/com/rfs/common/SourceRepoAccessor.java
@@ -3,6 +3,7 @@
 import java.io.InputStream;
 import java.nio.file.Path;
 
+// TODO: find a better approach to this (see https://opensearch.atlassian.net/browse/MIGRATIONS-1786)
 public abstract class SourceRepoAccessor {
     private final SourceRepo repo;
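
For readers skimming the patches above: the key behavior EphemeralSourceRepoAccessor.EphemeralFileInputStream introduces is a stream that deletes its backing file once it is closed, so snapshot blobs pulled down from S3 do not linger on local disk after a shard has been unpacked. The following sketch exercises that idea in isolation. It is not part of the RFS codebase; the class names (EphemeralStreamDemo, DeleteOnCloseInputStream) and the temporary "shard-blob" file are illustrative stand-ins, and the sketch assumes only the standard JDK.

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class EphemeralStreamDemo {
    // Minimal stand-in for the delete-on-close stream in the patch:
    // reads a local file and removes it when the stream is closed.
    static class DeleteOnCloseInputStream extends FileInputStream {
        private final Path filePath;

        DeleteOnCloseInputStream(Path filePath) throws IOException {
            super(filePath.toFile());
            this.filePath = filePath;
        }

        @Override
        public void close() throws IOException {
            try {
                super.close();
            } finally {
                // The finally block mirrors the patch: the file is deleted even if close() throws.
                Files.deleteIfExists(filePath);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical local copy of a repo blob, created only for this demo.
        Path blob = Files.createTempFile("shard-blob", ".dat");
        Files.write(blob, new byte[] {1, 2, 3});

        try (InputStream in = new DeleteOnCloseInputStream(blob)) {
            System.out.println("First byte: " + in.read());
        }

        // Once try-with-resources closes the stream, the local file is gone.
        System.out.println("Still on disk? " + Files.exists(blob)); // prints false
    }
}

The try/finally placement is the design point worth noting: deletion happens on the close path rather than after a successful read, which is what makes the accessor "ephemeral" regardless of whether the consumer finished the stream normally.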