diff --git a/RFS/docs/DESIGN.md b/RFS/docs/DESIGN.md index d3d31ccb3..7a5e8775f 100644 --- a/RFS/docs/DESIGN.md +++ b/RFS/docs/DESIGN.md @@ -260,7 +260,7 @@ ID: metadata_status FIELDS: * status (string): IN_PROGRESS, COMPLETED, FAILED * leaseExpiry (timestamp): When the current work lease expires - * num_attempts (integer): Times the task has been attempted + * numAttempts (integer): Times the task has been attempted INDEX MIGRATION STATUS RECORD ID: index_status @@ -287,9 +287,10 @@ FIELDS: SHARD WORK ENTRY RECORD ID: _ FIELDS: - * index (string): The index name - * shard (integer): The shard number + * indexName (string): The index name + * shardId (integer): The shard number * status (string): NOT_STARTED, COMPLETED, FAILED + * leaseExpiry (timestamp): When the current work lease expires * numAttempts (integer): Times the task has been attempted ``` diff --git a/RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java b/RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java index f7dc877ba..6f8b4a3c7 100644 --- a/RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java +++ b/RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java @@ -189,9 +189,9 @@ public static void main(String[] args) { // Get the file mapping for the shard ShardMetadata.Data shardMetadata; if (sourceVersion == ClusterVersion.ES_6_8) { - shardMetadata = new ShardMetadataFactory_ES_6_8().fromRepo(repo, repoDataProvider, snapshotName, indexMetadata.getName(), shardId); + shardMetadata = new ShardMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId); } else { - shardMetadata = new ShardMetadataFactory_ES_7_10().fromRepo(repo, repoDataProvider, snapshotName, indexMetadata.getName(), shardId); + shardMetadata = new ShardMetadataFactory_ES_7_10(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId); } System.out.println("Shard Metadata: " + shardMetadata.toString()); } @@ -207,9 +207,9 @@ public static void main(String[] args) { for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) { ShardMetadata.Data shardMetadata; if (sourceVersion == ClusterVersion.ES_6_8) { - shardMetadata = new ShardMetadataFactory_ES_6_8().fromRepo(repo, repoDataProvider, snapshotName, indexMetadata.getName(), shardId); + shardMetadata = new ShardMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId); } else { - shardMetadata = new ShardMetadataFactory_ES_7_10().fromRepo(repo, repoDataProvider, snapshotName, indexMetadata.getName(), shardId); + shardMetadata = new ShardMetadataFactory_ES_7_10(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId); } // Unpack the shard @@ -219,7 +219,8 @@ public static void main(String[] args) { } else { bufferSize = ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES; } - SnapshotShardUnpacker.unpack(repo, shardMetadata, Paths.get(luceneBasePathString), bufferSize); + SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repo, Paths.get(luceneBasePathString), bufferSize); + unpacker.unpack(shardMetadata); // Now, read the documents back out System.out.println("--- Reading docs in the shard ---"); diff --git a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java index c9f6ddb3a..31ec61cac 100644 --- a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java +++ b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java @@ -323,6 +323,14 @@ public static void main(String[] args) throws InterruptedException { 
logger.info("=================================================================="); logger.info("Unpacking blob files to disk..."); + int bufferSize; + if (sourceVersion == ClusterVersion.ES_6_8) { + bufferSize = ElasticsearchConstants_ES_6_8.BUFFER_SIZE_IN_BYTES; + } else { + bufferSize = ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES; + } + SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repo, luceneDirPath, bufferSize); + for (IndexMetadata.Data indexMetadata : indexMetadatas) { logger.info("Processing index: " + indexMetadata.getName()); for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) { @@ -331,20 +339,13 @@ public static void main(String[] args) throws InterruptedException { // Get the shard metadata ShardMetadata.Data shardMetadata; if (sourceVersion == ClusterVersion.ES_6_8) { - shardMetadata = new ShardMetadataFactory_ES_6_8().fromRepo(repo, repoDataProvider, snapshotName, indexMetadata.getName(), shardId); + shardMetadata = new ShardMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId); } else { - shardMetadata = new ShardMetadataFactory_ES_7_10().fromRepo(repo, repoDataProvider, snapshotName, indexMetadata.getName(), shardId); + shardMetadata = new ShardMetadataFactory_ES_7_10(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId); } // Unpack the shard - int bufferSize; - if (sourceVersion == ClusterVersion.ES_6_8) { - bufferSize = ElasticsearchConstants_ES_6_8.BUFFER_SIZE_IN_BYTES; - } else { - bufferSize = ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES; - } - - SnapshotShardUnpacker.unpack(repo, shardMetadata, luceneDirPath, bufferSize); + unpacker.unpack(shardMetadata); } } @@ -356,15 +357,18 @@ public static void main(String[] args) throws InterruptedException { logger.info("=================================================================="); logger.info("Reindexing the documents..."); + LuceneDocumentsReader reader = new LuceneDocumentsReader(luceneDirPath); + DocumentReindexer reindexer = new DocumentReindexer(targetClient); + for (IndexMetadata.Data indexMetadata : indexMetadatas) { for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) { logger.info("=== Index Id: " + indexMetadata.getName() + ", Shard ID: " + shardId + " ==="); - Flux documents = new LuceneDocumentsReader().readDocuments(luceneDirPath, indexMetadata.getName(), shardId); + Flux documents = reader.readDocuments(indexMetadata.getName(), shardId); String targetIndex = indexMetadata.getName() + indexSuffix; final int finalShardId = shardId; // Define in local context for the lambda - DocumentReindexer.reindex(targetIndex, documents, targetClient) + reindexer.reindex(targetIndex, documents) .doOnError(error -> logger.error("Error during reindexing: " + error)) .doOnSuccess(done -> logger.info("Reindexing completed for index " + targetIndex + ", shard " + finalShardId)) // Wait for the shard reindexing to complete before proceeding; fine in this demo script, but @@ -373,7 +377,7 @@ public static void main(String[] args) throws InterruptedException { } } logger.info("Refreshing target cluster to reflect newly added documents"); - DocumentReindexer.refreshAllDocuments(targetConnection); + reindexer.refreshAllDocuments(targetConnection); logger.info("Refresh complete"); } diff --git a/RFS/src/main/java/com/rfs/RunRfsWorker.java b/RFS/src/main/java/com/rfs/RunRfsWorker.java index 11b6b1580..abcb7ac12 100644 --- a/RFS/src/main/java/com/rfs/RunRfsWorker.java +++ 
b/RFS/src/main/java/com/rfs/RunRfsWorker.java @@ -21,23 +21,30 @@ import com.rfs.cms.OpenSearchCmsClient; import com.rfs.common.ClusterVersion; import com.rfs.common.ConnectionDetails; +import com.rfs.common.DocumentReindexer; import com.rfs.common.GlobalMetadata; import com.rfs.common.IndexMetadata; import com.rfs.common.Logging; +import com.rfs.common.LuceneDocumentsReader; import com.rfs.common.OpenSearchClient; import com.rfs.common.S3Uri; +import com.rfs.common.ShardMetadata; import com.rfs.common.S3Repo; import com.rfs.common.SnapshotCreator; import com.rfs.common.SourceRepo; import com.rfs.common.S3SnapshotCreator; import com.rfs.common.SnapshotRepo; +import com.rfs.common.SnapshotShardUnpacker; import com.rfs.transformers.TransformFunctions; import com.rfs.transformers.Transformer; +import com.rfs.version_es_7_10.ElasticsearchConstants_ES_7_10; import com.rfs.version_es_7_10.GlobalMetadataFactory_ES_7_10; import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10; +import com.rfs.version_es_7_10.ShardMetadataFactory_ES_7_10; import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10; import com.rfs.version_os_2_11.GlobalMetadataCreator_OS_2_11; import com.rfs.version_os_2_11.IndexCreator_OS_2_11; +import com.rfs.worker.DocumentsRunner; import com.rfs.worker.GlobalState; import com.rfs.worker.IndexRunner; import com.rfs.worker.MetadataRunner; @@ -61,6 +68,9 @@ public static class Args { @Parameter(names = {"--s3-region"}, description = "The AWS Region the S3 bucket is in, like: us-east-2", required = true) public String s3Region; + @Parameter(names = {"--lucene-dir"}, description = "The absolute path to the directory where we'll put the Lucene docs", required = true) + public String luceneDirPath; + @Parameter(names = {"--source-host"}, description = "The source host and port (e.g. http://localhost:9200)", required = true) public String sourceHost; @@ -79,17 +89,17 @@ public static class Args { @Parameter(names = {"--target-password"}, description = "Optional. The target password; if not provided, will assume no auth on target", required = false) public String targetPass = null; - @Parameter(names = {"--index-whitelist"}, description = ("Optional. List of index names to migrate" + @Parameter(names = {"--index-allowlist"}, description = ("Optional. List of index names to migrate" + " (e.g. 'logs_2024_01, logs_2024_02'). Default: all indices"), required = false) - public List indexWhitelist = List.of(); + public List indexAllowlist = List.of(); - @Parameter(names = {"--index-template-whitelist"}, description = ("Optional. List of index template names to migrate" + @Parameter(names = {"--index-template-allowlist"}, description = ("Optional. List of index template names to migrate" + " (e.g. 'posts_index_template1, posts_index_template2'). Default: empty list"), required = false) - public List indexTemplateWhitelist = List.of(); + public List indexTemplateAllowlist = List.of(); - @Parameter(names = {"--component-template-whitelist"}, description = ("Optional. List of component template names to migrate" + @Parameter(names = {"--component-template-allowlist"}, description = ("Optional. List of component template names to migrate" + " (e.g. 'posts_template1, posts_template2'). Default: empty list"), required = false) - public List componentTemplateWhitelist = List.of(); + public List componentTemplateAllowlist = List.of(); //https://opensearch.org/docs/2.11/api-reference/cluster-api/cluster-awareness/ @Parameter(names = {"--min-replicas"}, description = ("Optional. 
The minimum number of replicas configured for migrated indices on the target." @@ -113,14 +123,15 @@ public static void main(String[] args) throws Exception { Path s3LocalDirPath = Paths.get(arguments.s3LocalDirPath); String s3RepoUri = arguments.s3RepoUri; String s3Region = arguments.s3Region; + Path luceneDirPath = Paths.get(arguments.luceneDirPath); String sourceHost = arguments.sourceHost; String sourceUser = arguments.sourceUser; String sourcePass = arguments.sourcePass; String targetHost = arguments.targetHost; String targetUser = arguments.targetUser; String targetPass = arguments.targetPass; - List indexTemplateWhitelist = arguments.indexTemplateWhitelist; - List componentTemplateWhitelist = arguments.componentTemplateWhitelist; + List indexTemplateAllowlist = arguments.indexTemplateAllowlist; + List componentTemplateAllowlist = arguments.componentTemplateAllowlist; int awarenessDimensionality = arguments.minNumberOfReplicas + 1; Level logLevel = arguments.logLevel; @@ -143,7 +154,7 @@ public static void main(String[] args) throws Exception { SourceRepo sourceRepo = S3Repo.create(s3LocalDirPath, new S3Uri(s3RepoUri), s3Region); SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo); GlobalMetadata.Factory metadataFactory = new GlobalMetadataFactory_ES_7_10(repoDataProvider); - GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11(targetClient, List.of(), componentTemplateWhitelist, indexTemplateWhitelist); + GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11(targetClient, List.of(), componentTemplateAllowlist, indexTemplateAllowlist); Transformer transformer = TransformFunctions.getTransformer(ClusterVersion.ES_7_10, ClusterVersion.OS_2_11, awarenessDimensionality); MetadataRunner metadataWorker = new MetadataRunner(globalState, cmsClient, snapshotName, metadataFactory, metadataCreator, transformer); metadataWorker.run(); @@ -152,6 +163,13 @@ public static void main(String[] args) throws Exception { IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient); IndexRunner indexWorker = new IndexRunner(globalState, cmsClient, snapshotName, indexMetadataFactory, indexCreator, transformer); indexWorker.run(); + + ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider); + SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(sourceRepo, luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES); + LuceneDocumentsReader reader = new LuceneDocumentsReader(luceneDirPath); + DocumentReindexer reindexer = new DocumentReindexer(targetClient); + DocumentsRunner documentsWorker = new DocumentsRunner(globalState, cmsClient, snapshotName, indexMetadataFactory, shardMetadataFactory, unpacker, reader, reindexer); + documentsWorker.run(); } catch (Runner.PhaseFailed e) { logPhaseFailureRecord(e.phase, e.nextStep, e.cmsEntry, e.e); @@ -173,7 +191,7 @@ public static void logPhaseFailureRecord(GlobalState.Phase phase, WorkerStep nex String currentStep = (nextStep != null) ? nextStep.getClass().getSimpleName() : "null"; errorBlob.put("currentStep", currentStep); - String currentEntry = (cmsEntry.isPresent()) ? cmsEntry.toString() : "null"; + String currentEntry = (cmsEntry.isPresent()) ? 
cmsEntry.get().toRepresentationString() : "null";
         errorBlob.put("cmsEntry", currentEntry);

diff --git a/RFS/src/main/java/com/rfs/cms/CmsClient.java b/RFS/src/main/java/com/rfs/cms/CmsClient.java
index 8acb7e8e1..142d572f2 100644
--- a/RFS/src/main/java/com/rfs/cms/CmsClient.java
+++ b/RFS/src/main/java/com/rfs/cms/CmsClient.java
@@ -84,4 +84,45 @@ public interface CmsClient {
      * Retrieves a set of Index Work Items from the CMS that appear ready to be worked on, up to the specified limit.
      */
     public List getAvailableIndexWorkItems(int maxItems);
+
+    /*
+     * Creates a new entry in the CMS for the Documents Migration's progress. Returns an Optional; if the document was
+     * created, it will be the created entry and empty otherwise.
+     */
+    public Optional createDocumentsEntry();
+
+    /*
+     * Attempt to retrieve the Documents Migration entry from the CMS, if it exists. Returns an Optional; if the document
+     * exists, it will be the retrieved entry and empty otherwise.
+     */
+    public Optional getDocumentsEntry();
+
+    /*
+     * Updates the Documents Migration entry in the CMS. Returns an Optional; if the document was updated,
+     * it will be the updated entry and empty otherwise.
+     */
+    public Optional updateDocumentsEntry(CmsEntry.Documents newEntry, CmsEntry.Documents lastEntry);
+
+    /*
+     * Creates a new entry in the CMS for a Documents Work Item. Returns an Optional; if the document was
+     * created, it will be the created entry and empty otherwise.
+     */
+    public Optional createDocumentsWorkItem(String indexName, int shardId);
+
+    /*
+     * Updates the Documents Work Item in the CMS. Returns an Optional; if the document was updated,
+     * it will be the updated entry and empty otherwise.
+     */
+    public Optional updateDocumentsWorkItem(CmsEntry.DocumentsWorkItem newEntry, CmsEntry.DocumentsWorkItem lastEntry);
+
+    /*
+     * Forcefully updates the Documents Work Item in the CMS. This method should be used when you don't care about collisions
+     * and just want to overwrite the existing entry no matter what. Returns the updated entry.
+     */
+    public CmsEntry.DocumentsWorkItem updateDocumentsWorkItemForceful(CmsEntry.DocumentsWorkItem newEntry);
+
+    /*
+     * Retrieves a Documents Work Item from the CMS that appears ready to be worked on.
+     */
+    public Optional getAvailableDocumentsWorkItem();
 }
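The `Optional`-returning update methods above implement optimistic concurrency: an empty result means another worker changed the entry between the read and the write. A minimal caller-side sketch of that contract (hypothetical helper, not part of this diff; the `getLeaseExpiry` static and the field-by-field copy are assumptions based on the `CmsEntry` types shown below):

```java
// Hypothetical sketch of how a worker consumes the compare-and-swap contract above.
// Assumes java.time.Instant is imported; names and lease handling are illustrative.
CmsEntry.DocumentsWorkItem acquireWorkItem(CmsClient cmsClient) {
    while (true) {
        Optional<CmsEntry.DocumentsWorkItem> candidate = cmsClient.getAvailableDocumentsWorkItem();
        if (candidate.isEmpty()) {
            return null; // No work left to claim
        }
        CmsEntry.DocumentsWorkItem lastEntry = candidate.get();
        // Build the entry we want to write: bump the attempt count and extend the lease
        CmsEntry.DocumentsWorkItem newEntry = new CmsEntry.DocumentsWorkItem(
            lastEntry.indexName,
            lastEntry.shardId,
            lastEntry.status,
            CmsEntry.DocumentsWorkItem.getLeaseExpiry(Instant.now().toEpochMilli(), lastEntry.numAttempts + 1),
            lastEntry.numAttempts + 1
        );
        // An empty Optional means we lost the race to another worker; loop and try again
        Optional<CmsEntry.DocumentsWorkItem> updated = cmsClient.updateDocumentsWorkItem(newEntry, lastEntry);
        if (updated.isPresent()) {
            return updated.get();
        }
    }
}
```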
diff --git a/RFS/src/main/java/com/rfs/cms/CmsEntry.java b/RFS/src/main/java/com/rfs/cms/CmsEntry.java
index 1eec3e84f..7debaddbf 100644
--- a/RFS/src/main/java/com/rfs/cms/CmsEntry.java
+++ b/RFS/src/main/java/com/rfs/cms/CmsEntry.java
@@ -3,19 +3,23 @@
 
 import com.rfs.common.RfsException;
 
+import lombok.RequiredArgsConstructor;
+
 public class CmsEntry {
     public static enum EntryType {
         SNAPSHOT,
         METADATA,
         INDEX,
         INDEX_WORK_ITEM,
+        DOCUMENTS,
+        DOCUMENTS_WORK_ITEM
     }
 
     public abstract static class Base {
         protected Base() {}
 
-        @Override
-        public abstract String toString();
+        // Implementations of this method should provide a string version of the object that fully represents its contents
+        public abstract String toRepresentationString();
 
         @Override
         public boolean equals(Object obj) {
@@ -26,12 +30,12 @@ public boolean equals(Object obj) {
                 return false;
             }
             Base other = (Base) obj;
-            return this.toString().equals(other.toString());
+            return this.toRepresentationString().equals(other.toRepresentationString());
         }
 
         @Override
         public int hashCode() {
-            return this.toString().hashCode();
+            return this.toRepresentationString().hashCode();
         }
     }
 
@@ -47,9 +51,9 @@ protected Leasable() {}
 
         public static int getLeaseDurationMs(int numAttempts) {
             if (numAttempts > MAX_ATTEMPTS) {
-                throw new CouldNotFindNextLeaseDuration("numAttempts=" + numAttempts + " is greater than MAX_ATTEMPTS=" + MAX_ATTEMPTS);
+                throw new CouldNotGenerateNextLeaseDuration("numAttempts=" + numAttempts + " is greater than MAX_ATTEMPTS=" + MAX_ATTEMPTS);
             } else if (numAttempts < 1) {
-                throw new CouldNotFindNextLeaseDuration("numAttempts=" + numAttempts + " is less than 1");
+                throw new CouldNotGenerateNextLeaseDuration("numAttempts=" + numAttempts + " is less than 1");
             }
             return LEASE_MS * numAttempts; // Arbitrarily chosen algorithm
         }
@@ -71,19 +75,14 @@ public static enum SnapshotStatus {
     /*
      * Used to track the progress of taking a snapshot of the source cluster
      */
+    @RequiredArgsConstructor
     public static class Snapshot extends Base {
         public final EntryType type = EntryType.SNAPSHOT;
         public final String name;
         public final SnapshotStatus status;
 
-        public Snapshot(String name, SnapshotStatus status) {
-            super();
-            this.name = name;
-            this.status = status;
-        }
-
         @Override
-        public String toString() {
+        public String toRepresentationString() {
             return "Snapshot("
                 + "type='" + type.toString() + ","
                 + "name='" + name + ","
@@ -101,21 +100,15 @@ public static enum MetadataStatus {
     /*
      * Used to track the progress of migrating all the templates from the source cluster
      */
+    @RequiredArgsConstructor
     public static class Metadata extends Leasable {
         public final EntryType type = EntryType.METADATA;
         public final MetadataStatus status;
         public final String leaseExpiry;
         public final Integer numAttempts;
 
-        public Metadata(MetadataStatus status, String leaseExpiry, int numAttempts) {
-            super();
-            this.status = status;
-            this.leaseExpiry = leaseExpiry;
-            this.numAttempts = numAttempts;
-        }
-
         @Override
-        public String toString() {
+        public String toRepresentationString() {
             return "Metadata("
                 + "type='" + type.toString() + ","
                 + "status=" + status.toString() + ","
@@ -135,21 +128,15 @@ public static enum IndexStatus {
     }
 
     /*
     * Used to track the progress of migrating all the indices from the source cluster
     */
+    @RequiredArgsConstructor
    public static class Index extends Leasable {
        public final EntryType type = EntryType.INDEX;
        public final IndexStatus status;
        public final String leaseExpiry;
        public final Integer numAttempts;
 
-        public Index(IndexStatus status, String leaseExpiry, int numAttempts) {
-            super();
-            this.status = status;
-            this.leaseExpiry = leaseExpiry;
-            this.numAttempts = numAttempts;
-        }
-
         @Override
-        public String toString() {
+        public String toRepresentationString() {
             return "Index("
                 + "type='" + type.toString() + ","
                 + "status=" + status.toString() + ","
@@ -168,6 +155,7 @@ public static enum IndexWorkItemStatus {
     /*
      * Used to track the migration of a particular index from the source cluster
      */
+    @RequiredArgsConstructor
     public static class IndexWorkItem extends Base {
         public final EntryType type = EntryType.INDEX_WORK_ITEM;
         public static final int ATTEMPTS_SOFT_LIMIT = 3; // will make at least this many attempts; arbitrarily chosen
@@ -177,16 +165,8 @@ public static class IndexWorkItem extends Base {
         public final Integer numAttempts;
         public final Integer numShards;
 
-        public IndexWorkItem(String name, IndexWorkItemStatus status, int numAttempts, int numShards) {
-            super();
-            this.name = name;
-            this.status = status;
-            this.numAttempts = numAttempts;
-            this.numShards = numShards;
-        }
-
         @Override
-        public String toString() {
+        public String toRepresentationString() {
             return "IndexWorkItem("
                 + "type='" + type.toString() + ","
                 + "name=" + name.toString() + ","
@@ -197,8 +177,68 @@ public String toString() {
         }
     }
 
-    public static class CouldNotFindNextLeaseDuration extends RfsException {
-        public CouldNotFindNextLeaseDuration(String message) {
+    public static enum DocumentsStatus {
+        SETUP,
+        IN_PROGRESS,
+        COMPLETED,
+        FAILED,
+    }
+
+    /*
+     * Used to track the progress of migrating all the documents from the source cluster
+     */
+    @RequiredArgsConstructor
+    public static class Documents extends Leasable {
+        public final EntryType type = EntryType.DOCUMENTS;
+        public final DocumentsStatus status;
+        public final String leaseExpiry;
+        public final Integer numAttempts;
+
+        @Override
+        public String toRepresentationString() {
+            return "Documents("
+                + "type='" + type.toString() + ","
+                + "status=" + status.toString() + ","
+                + "leaseExpiry=" + leaseExpiry + ","
+                + "numAttempts=" + numAttempts.toString()
+                + ")";
+        }
+    }
+
+    public static enum DocumentsWorkItemStatus {
+        NOT_STARTED,
+        COMPLETED,
+        FAILED,
+    }
+
+    /*
+     * Used to track the migration of the documents in a particular index shard from the source cluster
+     */
+    @RequiredArgsConstructor
+    public static class DocumentsWorkItem extends Leasable {
+        public final EntryType type = EntryType.DOCUMENTS_WORK_ITEM;
+
+        public final String indexName;
+        public final Integer shardId;
+        public final DocumentsWorkItemStatus status;
+        public final String leaseExpiry;
+        public final Integer numAttempts;
+
+        @Override
+        public String toRepresentationString() {
+            return "DocumentsWorkItem("
+                + "type='" + type.toString() + ","
+                + "indexName=" + indexName.toString() + ","
+                + "shardId=" + shardId.toString() + ","
+                + "status=" + status.toString() + ","
+                + "leaseExpiry=" + leaseExpiry.toString() + ","
+                + "numAttempts=" + numAttempts.toString()
+                + ")";
+        }
+    }
+
+    public static class CouldNotGenerateNextLeaseDuration extends RfsException {
+        public CouldNotGenerateNextLeaseDuration(String message) {
             super("Could not generate next lease duration. Reason: " + message);
         }
     }
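Since `getLeaseDurationMs` scales linearly with the attempt count, the backoff is easiest to see with concrete numbers. A small sketch (the actual `LEASE_MS` and `MAX_ATTEMPTS` values live on `Leasable` and are not shown in this diff; the figures below assume a 10-minute `LEASE_MS`):

```java
// Assuming LEASE_MS = 600_000 (10 minutes); the real constant is defined on Leasable.
int first = CmsEntry.Leasable.getLeaseDurationMs(1); // 600_000 ms   (10 min lease)
int third = CmsEntry.Leasable.getLeaseDurationMs(3); // 1_800_000 ms (30 min lease)
// Outside the range [1, MAX_ATTEMPTS] the method refuses to produce a lease:
// getLeaseDurationMs(0) and getLeaseDurationMs(MAX_ATTEMPTS + 1)
// both throw CouldNotGenerateNextLeaseDuration.
```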
diff --git a/RFS/src/main/java/com/rfs/cms/OpenSearchCmsClient.java b/RFS/src/main/java/com/rfs/cms/OpenSearchCmsClient.java
index 31efa9e73..57697ccd8 100644
--- a/RFS/src/main/java/com/rfs/cms/OpenSearchCmsClient.java
+++ b/RFS/src/main/java/com/rfs/cms/OpenSearchCmsClient.java
@@ -18,12 +18,18 @@ public class OpenSearchCmsClient implements CmsClient {
     public static final String CMS_SNAPSHOT_DOC_ID = "snapshot_status";
     public static final String CMS_METADATA_DOC_ID = "metadata_status";
     public static final String CMS_INDEX_DOC_ID = "index_status";
+    public static final String CMS_DOCUMENTS_DOC_ID = "documents_status";
 
     public static String getIndexWorkItemDocId(String name) {
         // iwi => index work item
         return "iwi_" + name;
     }
 
+    public static String getDocumentsWorkItemDocId(String indexName, int shardId) {
+        // dwi => documents work item
+        return "dwi_" + indexName + "_" + shardId;
+    }
+
     private final OpenSearchClient client;
 
     public OpenSearchCmsClient(OpenSearchClient client) {
@@ -139,11 +145,11 @@ public Optional createIndexWorkItem(String name, int num
     public Optional updateIndexWorkItem(CmsEntry.IndexWorkItem newEntry, CmsEntry.IndexWorkItem lastEntry) {
         // Pull the existing entry to ensure that it hasn't changed since we originally retrieved it
         ObjectNode currentEntryRaw = client.getDocument(CMS_INDEX_NAME, getIndexWorkItemDocId(lastEntry.name))
-            .orElseThrow(() -> new RfsException("Failed to update index work item: " + lastEntry.name + " does not exist"));
+            .orElseThrow(() -> new RfsException("Failed to update index work item: " + getIndexWorkItemDocId(lastEntry.name) + " does not exist"));
 
         OpenSearchCmsEntry.IndexWorkItem currentEntry = OpenSearchCmsEntry.IndexWorkItem.fromJson((ObjectNode) currentEntryRaw.get("_source"));
         if (!currentEntry.equals(new OpenSearchCmsEntry.IndexWorkItem(lastEntry))) {
-            logger.info("Failed to update index work item: " + lastEntry.name + " has changed we first retrieved it");
+            logger.info("Failed to update index work item: " + getIndexWorkItemDocId(lastEntry.name) + " has changed since we first retrieved it");
             return Optional.empty();
         }
 
@@ -200,4 +206,112 @@ public List getAvailableIndexWorkItems(int maxItems) {
 
         return workItems;
     }
+
+    @Override
+    public Optional createDocumentsEntry() {
+        OpenSearchCmsEntry.Documents entry = OpenSearchCmsEntry.Documents.getInitial();
+        Optional createdEntry = client.createDocument(CMS_INDEX_NAME, CMS_DOCUMENTS_DOC_ID, entry.toJson());
+        return createdEntry.map(OpenSearchCmsEntry.Documents::fromJson);
+
+    }
+
+    @Override
+    public Optional getDocumentsEntry() {
+        Optional document = client.getDocument(CMS_INDEX_NAME, CMS_DOCUMENTS_DOC_ID);
+        return document.map(doc -> (ObjectNode) doc.get("_source"))
+            .map(OpenSearchCmsEntry.Documents::fromJson);
+    }
+
+    @Override
+    public Optional updateDocumentsEntry(CmsEntry.Documents newEntry, CmsEntry.Documents lastEntry) {
+        // Pull the existing entry to ensure that it hasn't changed since we originally retrieved it
+        ObjectNode currentEntryRaw = client.getDocument(CMS_INDEX_NAME, CMS_DOCUMENTS_DOC_ID)
+            .orElseThrow(() -> new RfsException("Failed to update documents entry: " + CMS_DOCUMENTS_DOC_ID + " does not exist"));
+
+        OpenSearchCmsEntry.Documents currentEntry = OpenSearchCmsEntry.Documents.fromJson((ObjectNode) currentEntryRaw.get("_source"));
+        if (!currentEntry.equals(new OpenSearchCmsEntry.Documents(lastEntry))) {
+            logger.info("Failed to update documents entry: " + CMS_DOCUMENTS_DOC_ID + " has changed since we first retrieved it");
+            return Optional.empty();
+        }
+
+        // Now attempt the update
+        ObjectNode newEntryJson = new OpenSearchCmsEntry.Documents(newEntry).toJson();
+        Optional updatedEntry = client.updateDocument(CMS_INDEX_NAME, CMS_DOCUMENTS_DOC_ID, newEntryJson, currentEntryRaw);
+        return updatedEntry.map(OpenSearchCmsEntry.Documents::fromJson);
+    }
+
+    @Override
+    public Optional createDocumentsWorkItem(String indexName, int shardId) {
+        OpenSearchCmsEntry.DocumentsWorkItem entry = OpenSearchCmsEntry.DocumentsWorkItem.getInitial(indexName, shardId);
+        Optional createdEntry = client.createDocument(CMS_INDEX_NAME, getDocumentsWorkItemDocId(indexName, shardId), entry.toJson());
+        return createdEntry.map(OpenSearchCmsEntry.DocumentsWorkItem::fromJson);
+    }
+
+    @Override
+    public Optional updateDocumentsWorkItem(CmsEntry.DocumentsWorkItem newEntry, CmsEntry.DocumentsWorkItem lastEntry) {
+        String docId = getDocumentsWorkItemDocId(lastEntry.indexName, lastEntry.shardId);
+
+        // Pull the existing entry to ensure that it hasn't changed since we originally retrieved it
+        ObjectNode currentEntryRaw = client.getDocument(CMS_INDEX_NAME, docId)
+            .orElseThrow(() -> new RfsException("Failed to update documents work item: " + docId + " does not exist"));
+
+        OpenSearchCmsEntry.DocumentsWorkItem currentEntry = OpenSearchCmsEntry.DocumentsWorkItem.fromJson((ObjectNode) currentEntryRaw.get("_source"));
+        if (!currentEntry.equals(new OpenSearchCmsEntry.DocumentsWorkItem(lastEntry))) {
+            logger.info("Failed to update documents work item: " + docId + " has changed since we first retrieved it");
+            return Optional.empty();
+        }
+
+        // Now attempt the update
+        ObjectNode newEntryJson = new OpenSearchCmsEntry.DocumentsWorkItem(newEntry).toJson();
+        Optional updatedEntry = client.updateDocument(CMS_INDEX_NAME, docId, newEntryJson, currentEntryRaw);
+        return updatedEntry.map(OpenSearchCmsEntry.DocumentsWorkItem::fromJson);
+    }
+
+    @Override
+    public CmsEntry.DocumentsWorkItem updateDocumentsWorkItemForceful(CmsEntry.DocumentsWorkItem newEntry) {
+        // Now attempt the update
+        ObjectNode newEntryJson = new OpenSearchCmsEntry.DocumentsWorkItem(newEntry).toJson();
+        ObjectNode updatedEntry = client.updateDocumentForceful(CMS_INDEX_NAME, getDocumentsWorkItemDocId(newEntry.indexName, newEntry.shardId), newEntryJson);
+        return OpenSearchCmsEntry.DocumentsWorkItem.fromJson(updatedEntry);
+    }
+
+    @Override
+    public Optional getAvailableDocumentsWorkItem() {
+        // Ensure we have a relatively fresh view of the index
+        client.refresh();
+
+        // Pull the docs
+        String queryBody = "{\n" +
+            "  \"query\": {\n" +
+            "    \"function_score\": {\n" +
+            "      \"query\": {\n" +
+            "        \"bool\": {\n" +
+            "          \"must\": [\n" +
+            "            {\n" +
+            "              \"match\": {\n" +
+            "                \"type\": \"DOCUMENTS_WORK_ITEM\"\n" +
+            "              }\n" +
+            "            },\n" +
+            "            {\n" +
+            "              \"match\": {\n" +
+            "                \"status\": \"NOT_STARTED\"\n" +
+            "              }\n" +
+            "            }\n" +
+            "          ]\n" +
+            "        }\n" +
+            "      },\n" +
+            "      \"random_score\": {}\n" + // Try to avoid the workers fighting for the same work items
+            "    }\n" +
+            "  },\n" +
+            "  \"size\": 1\n" + // At most one result
+            "}";
+
+        List hits = client.searchDocuments(CMS_INDEX_NAME, queryBody);
+        List workItems = hits.stream()
+            .map(hit -> (ObjectNode) hit.get("_source"))
+            .map(OpenSearchCmsEntry.DocumentsWorkItem::fromJson)
+            .collect(Collectors.toList());
+
+        return workItems.isEmpty() ? Optional.empty() : Optional.of(workItems.get(0));
+    }
 }
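For readability, the string-concatenated query body assembled in `getAvailableDocumentsWorkItem` above is equivalent to the following JSON; `random_score` shuffles the candidates so concurrent workers tend not to claim the same item:

```json
{
  "query": {
    "function_score": {
      "query": {
        "bool": {
          "must": [
            { "match": { "type": "DOCUMENTS_WORK_ITEM" } },
            { "match": { "status": "NOT_STARTED" } }
          ]
        }
      },
      "random_score": {}
    }
  },
  "size": 1
}
```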
diff --git a/RFS/src/main/java/com/rfs/cms/OpenSearchCmsEntry.java b/RFS/src/main/java/com/rfs/cms/OpenSearchCmsEntry.java
index b339eec49..2be150867 100644
--- a/RFS/src/main/java/com/rfs/cms/OpenSearchCmsEntry.java
+++ b/RFS/src/main/java/com/rfs/cms/OpenSearchCmsEntry.java
@@ -46,7 +46,7 @@ public ObjectNode toJson() {
         }
 
         @Override
-        public String toString() {
+        public String toRepresentationString() {
             return this.toJson().toString();
         }
     }
@@ -97,7 +97,7 @@ public ObjectNode toJson() {
         }
 
         @Override
-        public String toString() {
+        public String toRepresentationString() {
             return this.toJson().toString();
         }
     }
@@ -110,7 +110,7 @@ public static class Index extends CmsEntry.Index {
 
         public static Index getInitial() {
             return new Index(
-                CmsEntry.IndexStatus.IN_PROGRESS,
+                CmsEntry.IndexStatus.SETUP,
                 // TODO: We should ideally be setting the lease using the server's clock, but it's unclear on the best way
                 // to do this. For now, we'll just use the client's clock.
                 CmsEntry.Index.getLeaseExpiry(Instant.now().toEpochMilli(), 1),
@@ -148,7 +148,7 @@ public ObjectNode toJson() {
         }
 
         @Override
-        public String toString() {
+        public String toRepresentationString() {
             return this.toJson().toString();
         }
     }
@@ -178,7 +178,7 @@ public static IndexWorkItem fromJson(ObjectNode node) {
                 node.get(FIELD_NUM_SHARDS).asInt()
             );
         } catch (Exception e) {
-            throw new CantParseCmsEntryFromJson(Index.class, node.toString(), e);
+            throw new CantParseCmsEntryFromJson(IndexWorkItem.class, node.toString(), e);
         }
     }
 
@@ -201,11 +201,133 @@ public ObjectNode toJson() {
         }
 
         @Override
-        public String toString() {
+        public String toRepresentationString() {
             return this.toJson().toString();
         }
     }
 
+    public static class Documents extends CmsEntry.Documents {
+        public static final String FIELD_TYPE = "type";
+        public static final String FIELD_STATUS = "status";
+        public static final String FIELD_LEASE_EXPIRY = "leaseExpiry";
+        public static final String FIELD_NUM_ATTEMPTS = "numAttempts";
+
+        public static Documents getInitial() {
+            return new Documents(
+                CmsEntry.DocumentsStatus.SETUP,
+                // TODO: We should ideally be setting the lease using the server's clock, but it's unclear on the best way
+                // to do this. For now, we'll just use the client's clock.
+                CmsEntry.Documents.getLeaseExpiry(Instant.now().toEpochMilli(), 1),
+                1
+            );
+        }
+
+        public static Documents fromJson(ObjectNode node) {
+            try {
+                return new Documents(
+                    CmsEntry.DocumentsStatus.valueOf(node.get(FIELD_STATUS).asText()),
+                    node.get(FIELD_LEASE_EXPIRY).asText(),
+                    node.get(FIELD_NUM_ATTEMPTS).asInt()
+                );
+            } catch (Exception e) {
+                throw new CantParseCmsEntryFromJson(Documents.class, node.toString(), e);
+            }
+        }
+
+        public Documents(CmsEntry.DocumentsStatus status, String leaseExpiry, Integer numAttempts) {
+            super(status, leaseExpiry, numAttempts);
+        }
+
+        public Documents(CmsEntry.Documents entry) {
+            this(entry.status, entry.leaseExpiry, entry.numAttempts);
+        }
+
+        public ObjectNode toJson() {
+            ObjectNode node = objectMapper.createObjectNode();
+            node.put(FIELD_TYPE, type.toString());
+            node.put(FIELD_STATUS, status.toString());
+            node.put(FIELD_LEASE_EXPIRY, leaseExpiry);
+            node.put(FIELD_NUM_ATTEMPTS, numAttempts);
+            return node;
+        }
+
+        @Override
+        public String toRepresentationString() {
+            return this.toJson().toString();
+        }
+    }
+
+    public static class DocumentsWorkItem extends CmsEntry.DocumentsWorkItem {
+        public static final String FIELD_TYPE = "type";
+        public static final String FIELD_INDEX_NAME = "indexName";
+        public static final String FIELD_SHARD_ID = "shardId";
+        public static final String FIELD_STATUS = "status";
+        public static final String FIELD_LEASE_EXPIRY = "leaseExpiry";
+        public static final String FIELD_NUM_ATTEMPTS = "numAttempts";
+
+        public static DocumentsWorkItem getInitial(String indexName, int shardId) {
+            return new DocumentsWorkItem(
+                indexName,
+                shardId,
+                CmsEntry.DocumentsWorkItemStatus.NOT_STARTED,
+                // TODO: We should ideally be setting the lease using the server's clock, but it's unclear on the best way
+                // to do this. For now, we'll just use the client's clock.
+                CmsEntry.Documents.getLeaseExpiry(Instant.now().toEpochMilli(), 1),
+                1
+            );
+        }
+
+        public static DocumentsWorkItem fromJson(ObjectNode node) {
+            try {
+                return new DocumentsWorkItem(
+                    node.get(FIELD_INDEX_NAME).asText(),
+                    node.get(FIELD_SHARD_ID).asInt(),
+                    CmsEntry.DocumentsWorkItemStatus.valueOf(node.get(FIELD_STATUS).asText()),
+                    node.get(FIELD_LEASE_EXPIRY).asText(),
+                    node.get(FIELD_NUM_ATTEMPTS).asInt()
+                );
+            } catch (Exception e) {
+                throw new CantParseCmsEntryFromJson(DocumentsWorkItem.class, node.toString(), e);
+            }
+        }
+
+        public DocumentsWorkItem(String indexName, int shardId, CmsEntry.DocumentsWorkItemStatus status, String leaseExpiry, int numAttempts) {
+            super(indexName, shardId, status, leaseExpiry, numAttempts);
+        }
+
+        public DocumentsWorkItem(CmsEntry.DocumentsWorkItem entry) {
+            this(entry.indexName, entry.shardId, entry.status, entry.leaseExpiry, entry.numAttempts);
+        }
+
+        public ObjectNode toJson() {
+            ObjectNode node = objectMapper.createObjectNode();
+            node.put(FIELD_TYPE, type.toString());
+            node.put(FIELD_INDEX_NAME, indexName);
+            node.put(FIELD_SHARD_ID, shardId);
+            node.put(FIELD_STATUS, status.toString());
+            node.put(FIELD_LEASE_EXPIRY, leaseExpiry);
+            node.put(FIELD_NUM_ATTEMPTS, numAttempts);
+            return node;
+        }
+
+        @Override
+        public String toRepresentationString() {
+            return this.toJson().toString();
+        }
+    }
+
 
     public static class CantParseCmsEntryFromJson extends RfsException {
         public CantParseCmsEntryFromJson(Class entryClass, String json, Exception e) {
             super("Failed to parse CMS entry of type " + entryClass.getName() + " from JSON: " + json, e);
diff --git a/RFS/src/main/java/com/rfs/common/DocumentReindexer.java b/RFS/src/main/java/com/rfs/common/DocumentReindexer.java
index 9b678596b..fa93538cf 100644
--- a/RFS/src/main/java/com/rfs/common/DocumentReindexer.java
+++ b/RFS/src/main/java/com/rfs/common/DocumentReindexer.java
@@ -3,6 +3,7 @@
 import java.time.Duration;
 import java.util.List;
 
+import lombok.RequiredArgsConstructor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.document.Document;
@@ -10,18 +11,19 @@
 import reactor.core.publisher.Mono;
 import reactor.util.retry.Retry;
 
-
+@RequiredArgsConstructor
 public class DocumentReindexer {
     private static final Logger logger = LogManager.getLogger(DocumentReindexer.class);
     private static final int MAX_BATCH_SIZE = 1000; // Arbitrarily chosen
+    protected final OpenSearchClient client;
 
-    public static Mono reindex(String indexName, Flux documentStream, OpenSearchClient client) throws Exception {
+    public Mono reindex(String indexName, Flux documentStream) {
         return documentStream
-            .map(DocumentReindexer::convertDocumentToBulkSection) // Convert each Document to part of a bulk operation
+            .map(this::convertDocumentToBulkSection) // Convert each Document to part of a bulk operation
             .buffer(MAX_BATCH_SIZE) // Collect until you hit the batch size
             .doOnNext(bulk -> logger.info(bulk.size() + " documents in current bulk request"))
-            .map(DocumentReindexer::convertToBulkRequestBody) // Assemble the bulk request body from the parts
+            .map(this::convertToBulkRequestBody) // Assemble the bulk request body from the parts
             .flatMap(bulkJson -> client.sendBulkRequest(indexName, bulkJson) // Send the request
                 .doOnSuccess(unused -> logger.debug("Batch succeeded"))
                 .doOnError(error -> logger.error("Batch failed", error))
@@ -32,7 +34,7 @@ public static Mono reindex(String indexName, Flux documentStream
             .then();
     }
 
-    private static
String convertDocumentToBulkSection(Document document) { + private String convertDocumentToBulkSection(Document document) { String id = Uid.decodeId(document.getBinaryValue("_id").bytes); String source = document.getBinaryValue("_source").utf8ToString(); String action = "{\"index\": {\"_id\": \"" + id + "\"}}"; @@ -40,7 +42,7 @@ private static String convertDocumentToBulkSection(Document document) { return action + "\n" + source; } - private static String convertToBulkRequestBody(List bulkSections) { + private String convertToBulkRequestBody(List bulkSections) { StringBuilder builder = new StringBuilder(); for (String section : bulkSections) { builder.append(section).append("\n"); @@ -48,7 +50,7 @@ private static String convertToBulkRequestBody(List bulkSections) { return builder.toString(); } - public static void refreshAllDocuments(ConnectionDetails targetConnection) throws Exception { + public void refreshAllDocuments(ConnectionDetails targetConnection) throws Exception { // Send the request OpenSearchClient client = new OpenSearchClient(targetConnection); client.refresh(); diff --git a/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java b/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java index 2743ae5f8..ba34be8c6 100644 --- a/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java +++ b/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java @@ -12,13 +12,15 @@ import org.apache.lucene.util.BytesRef; import lombok.Lombok; +import lombok.RequiredArgsConstructor; import reactor.core.publisher.Flux; - +@RequiredArgsConstructor public class LuceneDocumentsReader { private static final Logger logger = LogManager.getLogger(LuceneDocumentsReader.class); + protected final Path luceneFilesBasePath; - public Flux readDocuments(Path luceneFilesBasePath, String indexName, int shardId) { + public Flux readDocuments(String indexName, int shardId) { Path indexDirectoryPath = luceneFilesBasePath.resolve(indexName).resolve(String.valueOf(shardId)); return Flux.using( diff --git a/RFS/src/main/java/com/rfs/common/ShardMetadata.java b/RFS/src/main/java/com/rfs/common/ShardMetadata.java index f8019a800..dc4ef4f7e 100644 --- a/RFS/src/main/java/com/rfs/common/ShardMetadata.java +++ b/RFS/src/main/java/com/rfs/common/ShardMetadata.java @@ -19,8 +19,8 @@ public class ShardMetadata { * Defines the behavior required to read a snapshot's shard metadata as JSON and convert it into a Data object */ public static interface Factory { - private JsonNode getJsonNode(SourceRepo repo, SnapshotRepo.Provider repoDataProvider, String snapshotId, String indexId, int shardId, SmileFactory smileFactory) throws Exception { - Path filePath = repo.getShardMetadataFilePath(snapshotId, indexId, shardId); + private JsonNode getJsonNode(String snapshotId, String indexId, int shardId, SmileFactory smileFactory) { + Path filePath = getRepoDataProvider().getRepo().getShardMetadataFilePath(snapshotId, indexId, shardId); try (InputStream fis = new FileInputStream(filePath.toFile())) { // Don't fully understand what the value of this code is, but it progresses the stream so we need to do it @@ -34,18 +34,33 @@ private JsonNode getJsonNode(SourceRepo repo, SnapshotRepo.Provider repoDataProv ObjectMapper smileMapper = new ObjectMapper(smileFactory); return smileMapper.readTree(bis); + } catch (Exception e) { + throw new CouldNotParseShardMetadata("Could not parse shard metadata for Snapshot " + snapshotId + ", Index " + indexId + ", Shard " + shardId, e); } } - default ShardMetadata.Data fromRepo(SourceRepo repo, 
SnapshotRepo.Provider repoDataProvider, String snapshotName, String indexName, int shardId) throws Exception { + default ShardMetadata.Data fromRepo(String snapshotName, String indexName, int shardId) { SmileFactory smileFactory = getSmileFactory(); - String snapshotId = repoDataProvider.getSnapshotId(snapshotName); - String indexId = repoDataProvider.getIndexId(indexName); - JsonNode root = getJsonNode(repo, repoDataProvider, snapshotId, indexId, shardId, smileFactory); + String snapshotId = getRepoDataProvider().getSnapshotId(snapshotName); + String indexId = getRepoDataProvider().getIndexId(indexName); + JsonNode root = getJsonNode(snapshotId, indexId, shardId, smileFactory); return fromJsonNode(root, indexId, indexName, shardId); } - public ShardMetadata.Data fromJsonNode(JsonNode root, String indexId, String indexName, int shardId) throws Exception; + + // Version-specific implementation + public ShardMetadata.Data fromJsonNode(JsonNode root, String indexId, String indexName, int shardId); + + // Version-specific implementation public SmileFactory getSmileFactory(); + + // Get the underlying SnapshotRepo Provider + public SnapshotRepo.Provider getRepoDataProvider(); + } + + public static class CouldNotParseShardMetadata extends RfsException { + public CouldNotParseShardMetadata(String message, Throwable cause) { + super(message, cause); + } } /** diff --git a/RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java b/RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java index b4641c785..5c5a56522 100644 --- a/RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java +++ b/RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java @@ -5,6 +5,8 @@ import java.nio.file.Path; import java.nio.file.Paths; + +import lombok.RequiredArgsConstructor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.store.FSDirectory; @@ -13,40 +15,53 @@ import org.apache.lucene.store.NativeFSLockFactory; import org.apache.lucene.util.BytesRef; - +@RequiredArgsConstructor public class SnapshotShardUnpacker { private static final Logger logger = LogManager.getLogger(SnapshotShardUnpacker.class); + protected final SourceRepo repo; + protected final Path luceneFilesBasePath; + protected final int bufferSize; + + public void unpack(ShardMetadata.Data shardMetadata) { + try { + // Some constants + NativeFSLockFactory lockFactory = NativeFSLockFactory.INSTANCE; + + // Ensure the blob files are prepped, if they need to be + repo.prepBlobFiles(shardMetadata); - public static void unpack(SourceRepo repo, ShardMetadata.Data shardMetadata, Path luceneFilesBasePath, int bufferSize) throws Exception { - // Some constants - NativeFSLockFactory lockFactory = NativeFSLockFactory.INSTANCE; - - // Ensure the blob files are prepped, if they need to be - repo.prepBlobFiles(shardMetadata); - - // Create the directory for the shard's lucene files - Path luceneIndexDir = Paths.get(luceneFilesBasePath + "/" + shardMetadata.getIndexName() + "/" + shardMetadata.getShardId()); - Files.createDirectories(luceneIndexDir); - final FSDirectory primaryDirectory = FSDirectory.open(luceneIndexDir, lockFactory); - - for (ShardMetadata.FileInfo fileMetadata : shardMetadata.getFiles()) { - logger.info("Unpacking - Blob Name: " + fileMetadata.getName() + ", Lucene Name: " + fileMetadata.getPhysicalName()); - IndexOutput indexOutput = primaryDirectory.createOutput(fileMetadata.getPhysicalName(), IOContext.DEFAULT); - - if (fileMetadata.getName().startsWith("v__")) { - final 
BytesRef hash = fileMetadata.getMetaHash(); - indexOutput.writeBytes(hash.bytes, hash.offset, hash.length); - } else { - try (InputStream stream = new PartSliceStream(repo, fileMetadata, shardMetadata.getIndexId(), shardMetadata.getShardId())) { - final byte[] buffer = new byte[Math.toIntExact(Math.min(bufferSize, fileMetadata.getLength()))]; - int length; - while ((length = stream.read(buffer)) > 0) { - indexOutput.writeBytes(buffer, 0, length); + // Create the directory for the shard's lucene files + Path luceneIndexDir = Paths.get(luceneFilesBasePath + "/" + shardMetadata.getIndexName() + "/" + shardMetadata.getShardId()); + Files.createDirectories(luceneIndexDir); + final FSDirectory primaryDirectory = FSDirectory.open(luceneIndexDir, lockFactory); + + for (ShardMetadata.FileInfo fileMetadata : shardMetadata.getFiles()) { + logger.info("Unpacking - Blob Name: " + fileMetadata.getName() + ", Lucene Name: " + fileMetadata.getPhysicalName()); + IndexOutput indexOutput = primaryDirectory.createOutput(fileMetadata.getPhysicalName(), IOContext.DEFAULT); + + if (fileMetadata.getName().startsWith("v__")) { + final BytesRef hash = fileMetadata.getMetaHash(); + indexOutput.writeBytes(hash.bytes, hash.offset, hash.length); + } else { + try (InputStream stream = new PartSliceStream(repo, fileMetadata, shardMetadata.getIndexId(), shardMetadata.getShardId())) { + final byte[] buffer = new byte[Math.toIntExact(Math.min(bufferSize, fileMetadata.getLength()))]; + int length; + while ((length = stream.read(buffer)) > 0) { + indexOutput.writeBytes(buffer, 0, length); + } } } + indexOutput.close(); } - indexOutput.close(); - } + } catch (Exception e) { + throw new CouldNotUnpackShard("Could not unpack shard: Index " + shardMetadata.getIndexId() + ", Shard " + shardMetadata.getShardId(), e); + } + } + + public static class CouldNotUnpackShard extends RfsException { + public CouldNotUnpackShard(String message, Exception e) { + super(message, e); + } } } diff --git a/RFS/src/main/java/com/rfs/version_es_6_8/ShardMetadataFactory_ES_6_8.java b/RFS/src/main/java/com/rfs/version_es_6_8/ShardMetadataFactory_ES_6_8.java index 3286c24b5..30584ba79 100644 --- a/RFS/src/main/java/com/rfs/version_es_6_8/ShardMetadataFactory_ES_6_8.java +++ b/RFS/src/main/java/com/rfs/version_es_6_8/ShardMetadataFactory_ES_6_8.java @@ -5,35 +5,50 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.dataformat.smile.SmileFactory; +import lombok.RequiredArgsConstructor; + import com.rfs.common.ShardMetadata; +import com.rfs.common.SnapshotRepo; +@RequiredArgsConstructor public class ShardMetadataFactory_ES_6_8 implements ShardMetadata.Factory { + protected final SnapshotRepo.Provider repoDataProvider; @Override - public ShardMetadata.Data fromJsonNode(JsonNode root, String indexId, String indexName, int shardId) throws Exception { + public ShardMetadata.Data fromJsonNode(JsonNode root, String indexId, String indexName, int shardId) { ObjectMapper objectMapper = new ObjectMapper(); SimpleModule module = new SimpleModule(); module.addDeserializer(ShardMetadataData_ES_6_8.FileInfoRaw.class, new ShardMetadataData_ES_6_8.FileInfoRawDeserializer()); objectMapper.registerModule(module); - ObjectNode objectNodeRoot = (ObjectNode) root; - ShardMetadataData_ES_6_8.DataRaw shardMetadataRaw = objectMapper.treeToValue(objectNodeRoot, ShardMetadataData_ES_6_8.DataRaw.class); - return new ShardMetadataData_ES_6_8( - shardMetadataRaw.name, - indexName, - 
indexId, - shardId, - shardMetadataRaw.indexVersion, - shardMetadataRaw.startTime, - shardMetadataRaw.time, - shardMetadataRaw.numberOfFiles, - shardMetadataRaw.totalSize, - shardMetadataRaw.files - ); + try { + ObjectNode objectNodeRoot = (ObjectNode) root; + ShardMetadataData_ES_6_8.DataRaw shardMetadataRaw = objectMapper.treeToValue(objectNodeRoot, ShardMetadataData_ES_6_8.DataRaw.class); + return new ShardMetadataData_ES_6_8( + shardMetadataRaw.name, + indexName, + indexId, + shardId, + shardMetadataRaw.indexVersion, + shardMetadataRaw.startTime, + shardMetadataRaw.time, + shardMetadataRaw.numberOfFiles, + shardMetadataRaw.totalSize, + shardMetadataRaw.files + ); + } catch (Exception e) { + throw new ShardMetadata.CouldNotParseShardMetadata("Could not parse shard metadata for Index " + indexId + ", Shard " + shardId, e); + } + } @Override public SmileFactory getSmileFactory() { return ElasticsearchConstants_ES_6_8.SMILE_FACTORY; } + + @Override + public SnapshotRepo.Provider getRepoDataProvider() { + return repoDataProvider; + } } diff --git a/RFS/src/main/java/com/rfs/version_es_7_10/ShardMetadataFactory_ES_7_10.java b/RFS/src/main/java/com/rfs/version_es_7_10/ShardMetadataFactory_ES_7_10.java index b44797a4f..49f5c6e45 100644 --- a/RFS/src/main/java/com/rfs/version_es_7_10/ShardMetadataFactory_ES_7_10.java +++ b/RFS/src/main/java/com/rfs/version_es_7_10/ShardMetadataFactory_ES_7_10.java @@ -5,35 +5,50 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.dataformat.smile.SmileFactory; + +import lombok.RequiredArgsConstructor; + import com.rfs.common.ShardMetadata; +import com.rfs.common.SnapshotRepo; +@RequiredArgsConstructor public class ShardMetadataFactory_ES_7_10 implements ShardMetadata.Factory { + protected final SnapshotRepo.Provider repoDataProvider; @Override - public ShardMetadata.Data fromJsonNode(JsonNode root, String indexId, String indexName, int shardId) throws Exception { + public ShardMetadata.Data fromJsonNode(JsonNode root, String indexId, String indexName, int shardId) { ObjectMapper objectMapper = new ObjectMapper(); SimpleModule module = new SimpleModule(); module.addDeserializer(ShardMetadataData_ES_7_10.FileInfoRaw.class, new ShardMetadataData_ES_7_10.FileInfoRawDeserializer()); objectMapper.registerModule(module); - ObjectNode objectNodeRoot = (ObjectNode) root; - ShardMetadataData_ES_7_10.DataRaw shardMetadataRaw = objectMapper.treeToValue(objectNodeRoot, ShardMetadataData_ES_7_10.DataRaw.class); - return new ShardMetadataData_ES_7_10( - shardMetadataRaw.name, - indexName, - indexId, - shardId, - shardMetadataRaw.indexVersion, - shardMetadataRaw.startTime, - shardMetadataRaw.time, - shardMetadataRaw.numberOfFiles, - shardMetadataRaw.totalSize, - shardMetadataRaw.files - ); + try { + ObjectNode objectNodeRoot = (ObjectNode) root; + ShardMetadataData_ES_7_10.DataRaw shardMetadataRaw = objectMapper.treeToValue(objectNodeRoot, ShardMetadataData_ES_7_10.DataRaw.class); + return new ShardMetadataData_ES_7_10( + shardMetadataRaw.name, + indexName, + indexId, + shardId, + shardMetadataRaw.indexVersion, + shardMetadataRaw.startTime, + shardMetadataRaw.time, + shardMetadataRaw.numberOfFiles, + shardMetadataRaw.totalSize, + shardMetadataRaw.files + ); + } catch (Exception e) { + throw new ShardMetadata.CouldNotParseShardMetadata("Could not parse shard metadata for Index " + indexId + ", Shard " + shardId, e); + } } @Override public SmileFactory getSmileFactory() { return 
ElasticsearchConstants_ES_7_10.SMILE_FACTORY; } + + @Override + public SnapshotRepo.Provider getRepoDataProvider() { + return repoDataProvider; + } } diff --git a/RFS/src/main/java/com/rfs/version_os_2_11/GlobalMetadataCreator_OS_2_11.java b/RFS/src/main/java/com/rfs/version_os_2_11/GlobalMetadataCreator_OS_2_11.java index 552b00f35..960573bd3 100644 --- a/RFS/src/main/java/com/rfs/version_os_2_11/GlobalMetadataCreator_OS_2_11.java +++ b/RFS/src/main/java/com/rfs/version_os_2_11/GlobalMetadataCreator_OS_2_11.java @@ -13,27 +13,27 @@ public class GlobalMetadataCreator_OS_2_11 { private static final Logger logger = LogManager.getLogger(GlobalMetadataCreator_OS_2_11.class); private final OpenSearchClient client; - private final List legacyTemplateWhitelist; - private final List componentTemplateWhitelist; - private final List indexTemplateWhitelist; + private final List legacyTemplateAllowlist; + private final List componentTemplateAllowlist; + private final List indexTemplateAllowlist; - public GlobalMetadataCreator_OS_2_11(OpenSearchClient client, List legacyTemplateWhitelist, List componentTemplateWhitelist, List indexTemplateWhitelist) { + public GlobalMetadataCreator_OS_2_11(OpenSearchClient client, List legacyTemplateAllowlist, List componentTemplateAllowlist, List indexTemplateAllowlist) { this.client = client; - this.legacyTemplateWhitelist = legacyTemplateWhitelist; - this.componentTemplateWhitelist = componentTemplateWhitelist; - this.indexTemplateWhitelist = indexTemplateWhitelist; + this.legacyTemplateAllowlist = legacyTemplateAllowlist; + this.componentTemplateAllowlist = componentTemplateAllowlist; + this.indexTemplateAllowlist = indexTemplateAllowlist; } public void create(ObjectNode root) { logger.info("Setting Global Metadata"); GlobalMetadataData_OS_2_11 globalMetadata = new GlobalMetadataData_OS_2_11(root); - createLegacyTemplates(globalMetadata, client, legacyTemplateWhitelist); - createComponentTemplates(globalMetadata, client, componentTemplateWhitelist); - createIndexTemplates(globalMetadata, client, indexTemplateWhitelist); + createLegacyTemplates(globalMetadata, client, legacyTemplateAllowlist); + createComponentTemplates(globalMetadata, client, componentTemplateAllowlist); + createIndexTemplates(globalMetadata, client, indexTemplateAllowlist); } - protected void createLegacyTemplates(GlobalMetadataData_OS_2_11 globalMetadata, OpenSearchClient client, List templateWhitelist) { + protected void createLegacyTemplates(GlobalMetadataData_OS_2_11 globalMetadata, OpenSearchClient client, List templateAllowlist) { logger.info("Setting Legacy Templates..."); ObjectNode templates = globalMetadata.getTemplates(); @@ -42,11 +42,11 @@ protected void createLegacyTemplates(GlobalMetadataData_OS_2_11 globalMetadata, return; } - if (templateWhitelist != null && templateWhitelist.size() == 0) { - logger.info("No Legacy Templates in specified whitelist"); + if (templateAllowlist != null && templateAllowlist.size() == 0) { + logger.info("No Legacy Templates in specified allowlist"); return; - } else if (templateWhitelist != null) { - for (String templateName : templateWhitelist) { + } else if (templateAllowlist != null) { + for (String templateName : templateAllowlist) { if (!templates.has(templateName) || templates.get(templateName) == null) { logger.warn("Legacy Template not found: " + templateName); continue; @@ -70,7 +70,7 @@ protected void createLegacyTemplates(GlobalMetadataData_OS_2_11 globalMetadata, } } - protected void createComponentTemplates(GlobalMetadataData_OS_2_11 
globalMetadata, OpenSearchClient client, List templateWhitelist) { + protected void createComponentTemplates(GlobalMetadataData_OS_2_11 globalMetadata, OpenSearchClient client, List templateAllowlist) { logger.info("Setting Component Templates..."); ObjectNode templates = globalMetadata.getComponentTemplates(); @@ -79,11 +79,11 @@ protected void createComponentTemplates(GlobalMetadataData_OS_2_11 globalMetadat return; } - if (templateWhitelist != null && templateWhitelist.size() == 0) { - logger.info("No Component Templates in specified whitelist"); + if (templateAllowlist != null && templateAllowlist.size() == 0) { + logger.info("No Component Templates in specified allowlist"); return; - } else if (templateWhitelist != null) { - for (String templateName : templateWhitelist) { + } else if (templateAllowlist != null) { + for (String templateName : templateAllowlist) { if (!templates.has(templateName) || templates.get(templateName) == null) { logger.warn("Component Template not found: " + templateName); continue; @@ -107,7 +107,7 @@ protected void createComponentTemplates(GlobalMetadataData_OS_2_11 globalMetadat } } - protected void createIndexTemplates(GlobalMetadataData_OS_2_11 globalMetadata, OpenSearchClient client, List templateWhitelist) { + protected void createIndexTemplates(GlobalMetadataData_OS_2_11 globalMetadata, OpenSearchClient client, List templateAllowlist) { logger.info("Setting Index Templates..."); ObjectNode templates = globalMetadata.getIndexTemplates(); @@ -116,11 +116,11 @@ protected void createIndexTemplates(GlobalMetadataData_OS_2_11 globalMetadata, O return; } - if (templateWhitelist != null && templateWhitelist.size() == 0) { - logger.info("No Index Templates in specified whitelist"); + if (templateAllowlist != null && templateAllowlist.size() == 0) { + logger.info("No Index Templates in specified allowlist"); return; - } else if (templateWhitelist != null) { - for (String templateName : templateWhitelist) { + } else if (templateAllowlist != null) { + for (String templateName : templateAllowlist) { if (!templates.has(templateName) || templates.get(templateName) == null) { logger.warn("Index Template not found: " + templateName); continue; diff --git a/RFS/src/main/java/com/rfs/worker/DocumentsRunner.java b/RFS/src/main/java/com/rfs/worker/DocumentsRunner.java new file mode 100644 index 000000000..e76ea739d --- /dev/null +++ b/RFS/src/main/java/com/rfs/worker/DocumentsRunner.java @@ -0,0 +1,63 @@ +package com.rfs.worker; + +import java.util.Optional; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.rfs.cms.CmsClient; +import com.rfs.cms.CmsEntry; +import com.rfs.common.DocumentReindexer; +import com.rfs.common.IndexMetadata; +import com.rfs.common.LuceneDocumentsReader; +import com.rfs.common.ShardMetadata; +import com.rfs.common.SnapshotShardUnpacker; + +public class DocumentsRunner implements Runner { + private static final Logger logger = LogManager.getLogger(DocumentsRunner.class); + + private final DocumentsStep.SharedMembers members; + + public DocumentsRunner(GlobalState globalState, CmsClient cmsClient, String snapshotName, IndexMetadata.Factory metadataFactory, + ShardMetadata.Factory shardMetadataFactory, SnapshotShardUnpacker unpacker, LuceneDocumentsReader reader, + DocumentReindexer reindexer) { + this.members = new DocumentsStep.SharedMembers(globalState, cmsClient, snapshotName, metadataFactory, shardMetadataFactory, unpacker, reader, reindexer); + } + + @Override + public void runInternal() { + 
diff --git a/RFS/src/main/java/com/rfs/worker/DocumentsRunner.java b/RFS/src/main/java/com/rfs/worker/DocumentsRunner.java
new file mode 100644
index 000000000..e76ea739d
--- /dev/null
+++ b/RFS/src/main/java/com/rfs/worker/DocumentsRunner.java
@@ -0,0 +1,63 @@
+package com.rfs.worker;
+
+import java.util.Optional;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.rfs.cms.CmsClient;
+import com.rfs.cms.CmsEntry;
+import com.rfs.common.DocumentReindexer;
+import com.rfs.common.IndexMetadata;
+import com.rfs.common.LuceneDocumentsReader;
+import com.rfs.common.ShardMetadata;
+import com.rfs.common.SnapshotShardUnpacker;
+
+public class DocumentsRunner implements Runner {
+    private static final Logger logger = LogManager.getLogger(DocumentsRunner.class);
+
+    private final DocumentsStep.SharedMembers members;
+
+    public DocumentsRunner(GlobalState globalState, CmsClient cmsClient, String snapshotName, IndexMetadata.Factory metadataFactory,
+            ShardMetadata.Factory shardMetadataFactory, SnapshotShardUnpacker unpacker, LuceneDocumentsReader reader,
+            DocumentReindexer reindexer) {
+        this.members = new DocumentsStep.SharedMembers(globalState, cmsClient, snapshotName, metadataFactory, shardMetadataFactory, unpacker, reader, reindexer);
+    }
+
+    @Override
+    public void runInternal() {
+        WorkerStep nextStep = null;
+        try {
+            nextStep = new DocumentsStep.EnterPhase(members);
+
+            while (nextStep != null) {
+                nextStep.run();
+                nextStep = nextStep.nextStep();
+            }
+        } catch (Exception e) {
+            throw new DocumentsMigrationPhaseFailed(
+                members.globalState.getPhase(),
+                nextStep,
+                members.cmsEntry.map(bar -> (CmsEntry.Base) bar),
+                e
+            );
+        }
+    }
+
+    @Override
+    public String getPhaseName() {
+        return "Documents Migration";
+    }
+
+    @Override
+    public Logger getLogger() {
+        return logger;
+    }
+
+    public static class DocumentsMigrationPhaseFailed extends Runner.PhaseFailed {
+        public DocumentsMigrationPhaseFailed(GlobalState.Phase phase, WorkerStep nextStep, Optional cmsEntry, Exception e) {
+            super("Documents Migration Phase failed", phase, nextStep, cmsEntry, e);
+        }
+    }
+
+}
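`DocumentsRunner.runInternal` above is the whole orchestration: each `WorkerStep` performs its side effects in `run()` and names its successor in `nextStep()`, and the runner just follows the chain until a step returns null. A stripped-down, self-contained sketch of that driver pattern (the `Step` interface here is an illustrative stand-in, not the PR's `WorkerStep`):

```java
// Illustrative reduction of the Runner/WorkerStep pattern; these names are
// stand-ins, not the actual com.rfs.worker types.
interface Step {
    void run();       // perform this phase's side effects
    Step nextStep();  // name the successor, or null when the phase is done
}

public class StepLoopSketch {
    public static void main(String[] args) {
        Step start = new Step() {
            public void run() { System.out.println("doing work"); }
            public Step nextStep() { return null; } // terminal step
        };
        // The driver loop: run each step, then follow the chain until it ends.
        for (Step step = start; step != null; step = step.nextStep()) {
            step.run();
        }
    }
}
```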
diff --git a/RFS/src/main/java/com/rfs/worker/DocumentsStep.java b/RFS/src/main/java/com/rfs/worker/DocumentsStep.java
new file mode 100644
index 000000000..21d666acb
--- /dev/null
+++ b/RFS/src/main/java/com/rfs/worker/DocumentsStep.java
@@ -0,0 +1,535 @@
+package com.rfs.worker;
+
+import java.time.Instant;
+import java.util.Optional;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.document.Document;
+
+import com.rfs.cms.CmsClient;
+import com.rfs.cms.CmsEntry;
+import com.rfs.cms.OpenSearchCmsClient;
+import com.rfs.common.DocumentReindexer;
+import com.rfs.common.IndexMetadata;
+import com.rfs.common.LuceneDocumentsReader;
+import com.rfs.common.RfsException;
+import com.rfs.common.ShardMetadata;
+import com.rfs.common.SnapshotRepo;
+import com.rfs.common.SnapshotShardUnpacker;
+
+import lombok.RequiredArgsConstructor;
+import reactor.core.publisher.Flux;
+
+public class DocumentsStep {
+    @RequiredArgsConstructor
+    public static class SharedMembers {
+        protected final GlobalState globalState;
+        protected final CmsClient cmsClient;
+        protected final String snapshotName;
+        protected final IndexMetadata.Factory metadataFactory;
+        protected final ShardMetadata.Factory shardMetadataFactory;
+        protected final SnapshotShardUnpacker unpacker;
+        protected final LuceneDocumentsReader reader;
+        protected final DocumentReindexer reindexer;
+        protected Optional cmsEntry = Optional.empty();
+        protected Optional cmsWorkEntry = Optional.empty();
+
+        // Convenient ways to check if the CMS entries are present before retrieving them. In some places, it's fine/expected
+        // for the CMS entry to be missing, but in others, it's a problem.
+        public CmsEntry.Documents getCmsEntryNotMissing() {
+            return cmsEntry.orElseThrow(
+                () -> new MissingDocumentsEntry()
+            );
+        }
+        public CmsEntry.DocumentsWorkItem getCmsWorkEntryNotMissing() {
+            return cmsWorkEntry.orElseThrow(
+                () -> new MissingDocumentsEntry()
+            );
+        }
+    }
+
+    public static abstract class Base implements WorkerStep {
+        protected final Logger logger = LogManager.getLogger(getClass());
+        protected final SharedMembers members;
+
+        public Base(SharedMembers members) {
+            this.members = members;
+        }
+    }
+
+    /*
+     * Updates the Worker's phase to indicate we're doing work on a Documents Migration
+     */
+    public static class EnterPhase extends Base {
+        public EnterPhase(SharedMembers members) {
+            super(members);
+        }
+
+        @Override
+        public void run() {
+            logger.info("Documents Migration not yet completed, entering Documents Phase...");
+            members.globalState.updatePhase(GlobalState.Phase.DOCUMENTS_IN_PROGRESS);
+        }
+
+        @Override
+        public WorkerStep nextStep() {
+            return new GetEntry(members);
+        }
+    }
+
+    /*
+     * Gets the current Documents Migration entry from the CMS, if it exists
+     */
+    public static class GetEntry extends Base {
+
+        public GetEntry(SharedMembers members) {
+            super(members);
+        }
+
+        @Override
+        public void run() {
+            logger.info("Pulling the Documents Migration entry from the CMS, if it exists...");
+            members.cmsEntry = members.cmsClient.getDocumentsEntry();
+        }
+
+        @Override
+        public WorkerStep nextStep() {
+            if (members.cmsEntry.isEmpty()) {
+                return new CreateEntry(members);
+            }
+
+            CmsEntry.Documents currentEntry = members.cmsEntry.get();
+            switch (currentEntry.status) {
+                case SETUP:
+                    // TODO: This uses the client-side clock to evaluate the lease expiration, when we should
+                    // ideally be using the server-side clock. Consider this a temporary solution until we find
+                    // out how to use the server-side clock.
+                    long leaseExpiryMillis = Long.parseLong(currentEntry.leaseExpiry);
+                    Instant leaseExpiryInstant = Instant.ofEpochMilli(leaseExpiryMillis);
+                    boolean leaseExpired = leaseExpiryInstant.isBefore(Instant.now());
+
+                    // Don't try to acquire the lease if we're already at the max number of attempts
+                    if (currentEntry.numAttempts >= CmsEntry.Documents.MAX_ATTEMPTS && leaseExpired) {
+                        return new ExitPhaseFailed(members, new MaxAttemptsExceeded());
+                    }
+
+                    if (leaseExpired) {
+                        return new AcquireLease(members, currentEntry);
+                    }
+
+                    logger.info("Documents Migration entry found, but there's already a valid work lease on it");
+                    return new RandomWait(members);
+
+                case IN_PROGRESS:
+                    return new GetDocumentsToMigrate(members);
+                case COMPLETED:
+                    return new ExitPhaseSuccess(members);
+                case FAILED:
+                    return new ExitPhaseFailed(members, new FoundFailedDocumentsMigration());
+                default:
+                    throw new IllegalStateException("Unexpected documents migration status: " + currentEntry.status);
+            }
+        }
+    }
+
+    public static class CreateEntry extends Base {
+
+        public CreateEntry(SharedMembers members) {
+            super(members);
+        }
+
+        @Override
+        public void run() {
+            logger.info("Documents Migration CMS Entry not found, attempting to create it...");
+            members.cmsEntry = members.cmsClient.createDocumentsEntry();
+            logger.info("Documents Migration CMS Entry created");
+        }
+
+        @Override
+        public WorkerStep nextStep() {
+            // Set up the documents work entries if we successfully created the CMS entry; otherwise, circle back to the beginning
+            if (members.cmsEntry.isPresent()) {
+                return new SetupDocumentsWorkEntries(members);
+            } else {
+                return new GetEntry(members);
+            }
+        }
+    }
+
+    public static class AcquireLease extends Base {
+        protected final CmsEntry.Base entry;
+        protected Optional leasedEntry = Optional.empty();
+
+        public AcquireLease(SharedMembers members, CmsEntry.Base entry) {
+            super(members);
+            this.entry = entry;
+        }
+
+        protected long getNowMs() {
+            return Instant.now().toEpochMilli();
+        }
+
+        @Override
+        public void run() {
+
+            if (entry instanceof CmsEntry.Documents) {
+                CmsEntry.Documents currentCmsEntry = (CmsEntry.Documents) entry;
+                logger.info("Attempting to acquire lease on Documents Migration entry...");
+                CmsEntry.Documents updatedEntry = new CmsEntry.Documents(
+                    currentCmsEntry.status,
+                    // Set the next CMS entry based on the current one
+                    // TODO: Should be using the server-side clock here
+                    CmsEntry.Documents.getLeaseExpiry(getNowMs(), currentCmsEntry.numAttempts + 1),
+                    currentCmsEntry.numAttempts + 1
+                );
+                members.cmsEntry = members.cmsClient.updateDocumentsEntry(updatedEntry, currentCmsEntry);
+                leasedEntry = members.cmsEntry.map(bar -> (CmsEntry.Base) bar);
+            } else if (entry instanceof CmsEntry.DocumentsWorkItem) {
+                CmsEntry.DocumentsWorkItem currentCmsEntry = (CmsEntry.DocumentsWorkItem) entry;
+                logger.info("Attempting to acquire lease on Documents Work Item entry...");
+                CmsEntry.DocumentsWorkItem updatedEntry = new CmsEntry.DocumentsWorkItem(
+                    currentCmsEntry.indexName,
+                    currentCmsEntry.shardId,
+                    currentCmsEntry.status,
+                    // Set the next CMS entry based on the current one
+                    // TODO: Should be using the server-side clock here
+                    CmsEntry.DocumentsWorkItem.getLeaseExpiry(getNowMs(), currentCmsEntry.numAttempts + 1),
+                    currentCmsEntry.numAttempts + 1
+                );
+                members.cmsWorkEntry = members.cmsClient.updateDocumentsWorkItem(updatedEntry, currentCmsEntry);
+                leasedEntry = members.cmsWorkEntry.map(bar -> (CmsEntry.Base) bar);
+            } else {
+                throw new IllegalStateException("Unexpected CMS entry type: " + entry.getClass().getName());
+            }
+
+            if (leasedEntry.isPresent()) {
+                logger.info("Lease acquired");
+            } else {
+                logger.info("Failed to acquire lease");
+            }
+        }
+
+        @Override
+        public WorkerStep nextStep() {
+            // Do work if we acquired the lease; otherwise, circle back to the beginning after a backoff
+            if (leasedEntry.isPresent()) {
+                if (leasedEntry.get() instanceof CmsEntry.Documents) {
+                    return new SetupDocumentsWorkEntries(members);
+                } else if (entry instanceof CmsEntry.DocumentsWorkItem) {
+                    return new MigrateDocuments(members);
+                } else {
+                    throw new IllegalStateException("Unexpected CMS entry type: " + entry.getClass().getName());
+                }
+            } else {
+                return new RandomWait(members);
+            }
+        }
+    }
+
+    public static class SetupDocumentsWorkEntries extends Base {
+
+        public SetupDocumentsWorkEntries(SharedMembers members) {
+            super(members);
+        }
+
+        @Override
+        public void run() {
+            CmsEntry.Documents leasedCmsEntry = members.getCmsEntryNotMissing();
+
+            logger.info("Setting the worker's current work item to be creating the documents work entries...");
+            members.globalState.updateWorkItem(new OpenSearchWorkItem(OpenSearchCmsClient.CMS_INDEX_NAME, OpenSearchCmsClient.CMS_DOCUMENTS_DOC_ID));
+            logger.info("Work item set");
+
+            logger.info("Setting up the Documents Work Items...");
+            SnapshotRepo.Provider repoDataProvider = members.metadataFactory.getRepoDataProvider();
+            for (SnapshotRepo.Index index : repoDataProvider.getIndicesInSnapshot(members.snapshotName)) {
+                IndexMetadata.Data indexMetadata = members.metadataFactory.fromRepo(members.snapshotName, index.getName());
+                logger.info("Index " + indexMetadata.getName() + " has " + indexMetadata.getNumberOfShards() + " shards");
+                for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) {
+                    logger.info("Creating Documents Work Item for index: " + indexMetadata.getName() + ", shard: " + shardId);
+                    members.cmsClient.createDocumentsWorkItem(indexMetadata.getName(), shardId);
+                }
+            }
+            logger.info("Finished setting up the Documents Work Items.");
+
+            logger.info("Updating the Documents Migration entry to indicate setup has been completed...");
+            CmsEntry.Documents updatedEntry = new CmsEntry.Documents(
+                CmsEntry.DocumentsStatus.IN_PROGRESS,
+                leasedCmsEntry.leaseExpiry,
+                leasedCmsEntry.numAttempts
+            );
+
+            members.cmsEntry = members.cmsClient.updateDocumentsEntry(updatedEntry, leasedCmsEntry);
+            logger.info("Documents Migration entry updated");
+
+            logger.info("Clearing the worker's current work item...");
+            members.globalState.updateWorkItem(null);
+            logger.info("Work item cleared");
+        }
+
+        @Override
+        public WorkerStep nextStep() {
+            if (members.cmsEntry.isEmpty()) {
+                // In this scenario, we've done all the work, but failed to update the CMS entry so that we know we've
+                // done the work. We circle back around to try again, which is made more reasonable by the fact we
+                // don't re-create documents work entries that already exist in the CMS. If we didn't circle back
+                // around, there would be a chance that the CMS entry would never be marked as completed.
+                //
+                // The CMS entry's retry limit still applies in this case, so there's a limiting factor here.
+ logger.warn("Completed creating the documents work entries but failed to update the Documents Migration entry; retrying..."); + return new GetEntry(members); + } + + return new GetDocumentsToMigrate(members); + } + } + + public static class GetDocumentsToMigrate extends Base { + + public GetDocumentsToMigrate(SharedMembers members) { + super(members); + } + + @Override + public void run() { + logger.info("Seeing if there are any docs left to migration according to the CMS..."); + members.cmsWorkEntry = members.cmsClient.getAvailableDocumentsWorkItem(); + members.cmsWorkEntry.ifPresentOrElse( + (item) -> logger.info("Found some docs to migrate"), + () -> logger.info("No docs found to migrate") + ); + } + + @Override + public WorkerStep nextStep() { + // No work left to do + if (members.cmsWorkEntry.isEmpty()) { + return new ExitPhaseSuccess(members); + } + return new MigrateDocuments(members); + } + } + + public static class MigrateDocuments extends Base { + + public MigrateDocuments(SharedMembers members) { + super(members); + } + + @Override + public void run() { + CmsEntry.DocumentsWorkItem workItem = members.getCmsWorkEntryNotMissing(); + + /* + * Try to migrate the documents. We should have a unique lease on the entry that guarantees that we're the only + * one working on it. However, we apply some care to ensure that even if that's not the case, something fairly + * reasonable happens. + * + * If we succeed, we forcefully mark it as completed. When we do so, we don't care if someone else has changed + * the record in the meantime; *we* completed it successfully and that's what matters. Because this is the only + * forceful operation on the entry, the other operations are safe to be non-forceful. + * + * If it's already exceeded the number of attempts, we attempt to mark it as failed. If someone else + * has updated the entry in the meantime, we just move on to the next work item. This is safe because + * it means someone else has either marked it as completed or failed, and either is fine. + * + * If we fail to migrate it, we attempt to increment the attempt count. It's fine if the increment + * fails because . + */ + if (workItem.numAttempts > CmsEntry.DocumentsWorkItem.MAX_ATTEMPTS) { + logger.warn("Documents Work Item (Index: " + workItem.indexName + ", Shard: " + workItem.shardId + ") has exceeded the maximum number of attempts; marking it as failed..."); + CmsEntry.DocumentsWorkItem updatedEntry = new CmsEntry.DocumentsWorkItem( + workItem.indexName, + workItem.shardId, + CmsEntry.DocumentsWorkItemStatus.FAILED, + workItem.leaseExpiry, + workItem.numAttempts + ); + + // We use optimistic locking here in the unlikely event someone else is working on this task despite the + // leasing system and managed to complete the task; in that case we want this update to bounce. 
+                members.cmsWorkEntry = members.cmsClient.updateDocumentsWorkItem(updatedEntry, workItem);
+                members.cmsWorkEntry.ifPresentOrElse(
+                    value -> logger.info("Documents Work Item (Index: " + workItem.indexName + ", Shard: " + workItem.shardId + ") marked as failed"),
+                    () -> logger.warn("Unable to mark Documents Work Item (Index: " + workItem.indexName + ", Shard: " + workItem.shardId + ") as failed")
+                );
+                return;
+            }
+
+            logger.info("Setting the worker's current work item to be migrating the docs...");
+            members.globalState.updateWorkItem(new OpenSearchWorkItem(
+                OpenSearchCmsClient.CMS_INDEX_NAME,
+                OpenSearchCmsClient.getDocumentsWorkItemDocId(workItem.indexName, workItem.shardId)
+            ));
+            logger.info("Work item set");
+
+            try {
+                logger.info("Migrating docs: Index " + workItem.indexName + ", Shard " + workItem.shardId);
+                ShardMetadata.Data shardMetadata = members.shardMetadataFactory.fromRepo(members.snapshotName, workItem.indexName, workItem.shardId);
+                members.unpacker.unpack(shardMetadata);
+
+                Flux documents = members.reader.readDocuments(shardMetadata.getIndexName(), shardMetadata.getShardId());
+
+                final int finalShardId = shardMetadata.getShardId(); // Define in local context for the lambda
+                members.reindexer.reindex(shardMetadata.getIndexName(), documents)
+                    .doOnError(error -> logger.error("Error during reindexing: " + error))
+                    .doOnSuccess(done -> logger.info("Reindexing completed for Index " + shardMetadata.getIndexName() + ", Shard " + finalShardId))
+                    // Wait for the reindexing to complete before proceeding
+                    .block();
+                logger.info("Docs migrated");
+
+                logger.info("Updating the Documents Work Item to indicate it has been completed...");
+                CmsEntry.DocumentsWorkItem updatedEntry = new CmsEntry.DocumentsWorkItem(
+                    workItem.indexName,
+                    workItem.shardId,
+                    CmsEntry.DocumentsWorkItemStatus.COMPLETED,
+                    workItem.leaseExpiry,
+                    workItem.numAttempts
+                );
+
+                members.cmsWorkEntry = Optional.of(members.cmsClient.updateDocumentsWorkItemForceful(updatedEntry));
+                logger.info("Documents Work Item updated");
+            } catch (Exception e) {
+                logger.info("Failed to migrate documents: Index " + workItem.indexName + ", Shard " + workItem.shardId, e);
+                logger.info("Updating the Documents Work Item with incremented attempt count...");
+                CmsEntry.DocumentsWorkItem updatedEntry = new CmsEntry.DocumentsWorkItem(
+                    workItem.indexName,
+                    workItem.shardId,
+                    workItem.status,
+                    workItem.leaseExpiry,
+                    workItem.numAttempts + 1
+                );
+
+                // We use optimistic locking here in the unlikely event someone else is working on this task despite the
+                // leasing system and managed to complete the task; in that case we want this update to bounce.
+                members.cmsWorkEntry = members.cmsClient.updateDocumentsWorkItem(updatedEntry, workItem);
+                members.cmsWorkEntry.ifPresentOrElse(
+                    value -> logger.info("Documents Work Item (Index: " + workItem.indexName + ", Shard: " + workItem.shardId + ") attempt count was incremented"),
+                    () -> logger.info("Unable to increment attempt count of Documents Work Item (Index: " + workItem.indexName + ", Shard: " + workItem.shardId + ")")
+                );
+            }
+
+            logger.info("Clearing the worker's current work item...");
+            members.globalState.updateWorkItem(null);
+            logger.info("Work item cleared");
+        }
+
+        @Override
+        public WorkerStep nextStep() {
+            return new GetDocumentsToMigrate(members);
+        }
+    }
+
+    public static class RandomWait extends Base {
+        private final static int WAIT_TIME_MS = 5 * 1000; // arbitrarily chosen
+
+        public RandomWait(SharedMembers members) {
+            super(members);
+        }
+
+        protected void waitABit() {
+            try {
+                Thread.sleep(WAIT_TIME_MS);
+            } catch (InterruptedException e) {
+                logger.error("Interrupted while performing a wait", e);
+                throw new DocumentsMigrationFailed("Interrupted");
+            }
+        }
+
+        @Override
+        public void run() {
+            logger.info("Backing off for " + WAIT_TIME_MS + " milliseconds before checking the Documents Migration entry again...");
+            waitABit();
+        }
+
+        @Override
+        public WorkerStep nextStep() {
+            return new GetEntry(members);
+        }
+    }
+
+    public static class ExitPhaseSuccess extends Base {
+        public ExitPhaseSuccess(SharedMembers members) {
+            super(members);
+        }
+
+        @Override
+        public void run() {
+            logger.info("Marking the Documents Migration as completed...");
+            CmsEntry.Documents lastCmsEntry = members.getCmsEntryNotMissing();
+            CmsEntry.Documents updatedEntry = new CmsEntry.Documents(
+                CmsEntry.DocumentsStatus.COMPLETED,
+                lastCmsEntry.leaseExpiry,
+                lastCmsEntry.numAttempts
+            );
+            members.cmsClient.updateDocumentsEntry(updatedEntry, lastCmsEntry);
+            logger.info("Documents Migration marked as completed");
+
+            logger.info("Documents Migration completed, exiting Documents Phase...");
+            members.globalState.updatePhase(GlobalState.Phase.DOCUMENTS_COMPLETED);
+        }
+
+        @Override
+        public WorkerStep nextStep() {
+            return null;
+        }
+    }
+
+    public static class ExitPhaseFailed extends Base {
+        protected final DocumentsMigrationFailed e;
+
+        public ExitPhaseFailed(SharedMembers members, DocumentsMigrationFailed e) {
+            super(members);
+            this.e = e;
+        }
+
+        @Override
+        public void run() {
+            // We either failed the Documents Migration or found it had already been failed; either way this
+            // should not be missing
+            CmsEntry.Documents lastCmsEntry = members.getCmsEntryNotMissing();
+
+            logger.error("Documents Migration failed");
+            CmsEntry.Documents updatedEntry = new CmsEntry.Documents(
+                CmsEntry.DocumentsStatus.FAILED,
+                lastCmsEntry.leaseExpiry,
+                lastCmsEntry.numAttempts
+            );
+            members.cmsClient.updateDocumentsEntry(updatedEntry, lastCmsEntry);
+            members.globalState.updatePhase(GlobalState.Phase.DOCUMENTS_FAILED);
+        }
+
+        @Override
+        public WorkerStep nextStep() {
+            throw e;
+        }
+    }
+
+    public static class DocumentsMigrationFailed extends RfsException {
+        public DocumentsMigrationFailed(String message) {
+            super("The Documents Migration has failed. Reason: " + message);
+        }
+    }
+
+    public static class MissingDocumentsEntry extends RfsException {
+        public MissingDocumentsEntry() {
+            super("The Documents Migration CMS entry we expected to be stored in local memory was null."
+                + " This should never happen."
+            );
+        }
+    }
+
+    public static class FoundFailedDocumentsMigration extends DocumentsMigrationFailed {
+        public FoundFailedDocumentsMigration() {
+            super("We checked the status in the CMS and found it had failed. Aborting.");
+        }
+    }
+
+    public static class MaxAttemptsExceeded extends DocumentsMigrationFailed {
+        public MaxAttemptsExceeded() {
+            super("We reached the limit of " + CmsEntry.Documents.MAX_ATTEMPTS + " attempts to complete the Documents Migration");
+        }
+    }
+
+}
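A detail that's easy to miss in `DocumentsStep.GetEntry` above: `leaseExpiry` travels as a stringified epoch-millis value and is compared against the worker's own clock (the TODO acknowledges the client-side-clock caveat). A self-contained sketch of just that evaluation, with illustrative names:

```java
import java.time.Instant;

public class LeaseCheckSketch {
    // Illustrative stand-in: the lease expiry is stored as a stringified
    // epoch-millis value, parsed and compared exactly as GetEntry does it.
    static boolean isLeaseExpired(String leaseExpiry, Instant now) {
        long leaseExpiryMillis = Long.parseLong(leaseExpiry);
        return Instant.ofEpochMilli(leaseExpiryMillis).isBefore(now);
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        String expired = String.valueOf(now.minusSeconds(60).toEpochMilli());
        String live = String.valueOf(now.plusSeconds(60).toEpochMilli());
        System.out.println(isLeaseExpired(expired, now)); // true: the lease can be contested
        System.out.println(isLeaseExpired(live, now));    // false: back off and retry later
    }
}
```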
diff --git a/RFS/src/main/java/com/rfs/worker/GlobalState.java b/RFS/src/main/java/com/rfs/worker/GlobalState.java
index 813448792..679af0b6c 100644
--- a/RFS/src/main/java/com/rfs/worker/GlobalState.java
+++ b/RFS/src/main/java/com/rfs/worker/GlobalState.java
@@ -18,7 +18,10 @@ public enum Phase {
         METADATA_FAILED,
         INDEX_IN_PROGRESS,
         INDEX_COMPLETED,
-        INDEX_FAILED
+        INDEX_FAILED,
+        DOCUMENTS_IN_PROGRESS,
+        DOCUMENTS_COMPLETED,
+        DOCUMENTS_FAILED
     }
 
     private AtomicReference phase = new AtomicReference<>(Phase.UNSET);
diff --git a/RFS/src/main/java/com/rfs/worker/IndexStep.java b/RFS/src/main/java/com/rfs/worker/IndexStep.java
index 1af30a3de..f534455bb 100644
--- a/RFS/src/main/java/com/rfs/worker/IndexStep.java
+++ b/RFS/src/main/java/com/rfs/worker/IndexStep.java
@@ -17,8 +17,11 @@
 import com.rfs.transformers.Transformer;
 import com.rfs.version_os_2_11.IndexCreator_OS_2_11;
 
+import lombok.RequiredArgsConstructor;
+
 public class IndexStep {
 
+    @RequiredArgsConstructor
     public static class SharedMembers {
         protected final GlobalState globalState;
         protected final CmsClient cmsClient;
@@ -26,18 +29,7 @@ public static class SharedMembers {
         protected final IndexMetadata.Factory metadataFactory;
         protected final IndexCreator_OS_2_11 indexCreator;
         protected final Transformer transformer;
-        protected Optional cmsEntry;
-
-        public SharedMembers(GlobalState globalState, CmsClient cmsClient, String snapshotName, IndexMetadata.Factory metadataFactory,
-                IndexCreator_OS_2_11 indexCreator, Transformer transformer) {
-            this.globalState = globalState;
-            this.cmsClient = cmsClient;
-            this.snapshotName = snapshotName;
-            this.metadataFactory = metadataFactory;
-            this.indexCreator = indexCreator;
-            this.transformer = transformer;
-            this.cmsEntry = Optional.empty();
-        }
+        protected Optional cmsEntry = Optional.empty();
 
         // A convient way to check if the CMS entry is present before retrieving it. In some places, it's fine/expected
         // for the CMS entry to be missing, but in others, it's a problem.
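The hunk above swaps a hand-written constructor for Lombok's `@RequiredArgsConstructor`. For readers who don't use Lombok, the two forms below should be equivalent; this is a sketch assuming Lombok is on the annotation-processing path, and the names are illustrative rather than the PR's actual fields:

```java
import java.util.Optional;

import lombok.RequiredArgsConstructor;

public class RequiredArgsConstructorSketch {
    // Lombok generates a constructor with one parameter per final field, in
    // declaration order; non-final fields like cmsEntry are skipped and keep
    // their inline initializers.
    @RequiredArgsConstructor
    static class WithLombok {
        protected final String snapshotName;
        protected final int shardCount;
        protected Optional<String> cmsEntry = Optional.empty(); // untouched by Lombok
    }

    // The moral equivalent Lombok emits, i.e. the boilerplate this PR deletes by hand:
    static class HandWritten {
        protected final String snapshotName;
        protected final int shardCount;
        protected Optional<String> cmsEntry = Optional.empty();

        HandWritten(String snapshotName, int shardCount) {
            this.snapshotName = snapshotName;
            this.shardCount = shardCount;
        }
    }

    public static void main(String[] args) {
        // Both construct identically; the refactor is behavior-preserving.
        WithLombok a = new WithLombok("snap", 3);
        HandWritten b = new HandWritten("snap", 3);
        System.out.println(a.snapshotName + " " + b.shardCount + " " + a.cmsEntry);
    }
}
```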
@@ -127,7 +119,7 @@ public WorkerStep nextStep() {
             case FAILED:
                 return new ExitPhaseFailed(members, new FoundFailedIndexMigration());
             default:
-                throw new IllegalStateException("Unexpected metadata migration status: " + currentEntry.status);
+                throw new IllegalStateException("Unexpected index migration status: " + currentEntry.status);
         }
     }
 }
@@ -171,10 +163,10 @@ public void run() {
         // We only get here if we know we want to acquire the lock, so we know the CMS entry should not be null
         CmsEntry.Index lastCmsEntry = members.getCmsEntryNotMissing();
 
-        logger.info("Current Metadata Migration work lease appears to have expired; attempting to acquire it...");
+        logger.info("Current Index Migration work lease appears to have expired; attempting to acquire it...");
 
         CmsEntry.Index updatedEntry = new CmsEntry.Index(
-            CmsEntry.IndexStatus.IN_PROGRESS,
+            lastCmsEntry.status,
             // Set the next CMS entry based on the current one
             // TODO: Should be using the server-side clock here
             CmsEntry.Index.getLeaseExpiry(getNowMs(), lastCmsEntry.numAttempts + 1),
@@ -216,8 +208,8 @@ public void run() {
         logger.info("Work item set");
 
         logger.info("Setting up the Index Work Items...");
-        SnapshotRepo.Provider repoDatProvider = members.metadataFactory.getRepoDataProvider();
-        for (SnapshotRepo.Index index : repoDatProvider.getIndicesInSnapshot(members.snapshotName)) {
+        SnapshotRepo.Provider repoDataProvider = members.metadataFactory.getRepoDataProvider();
+        for (SnapshotRepo.Index index : repoDataProvider.getIndicesInSnapshot(members.snapshotName)) {
             IndexMetadata.Data indexMetadata = members.metadataFactory.fromRepo(members.snapshotName, index.getName());
             logger.info("Creating Index Work Item for index: " + indexMetadata.getName());
             members.cmsClient.createIndexWorkItem(indexMetadata.getName(), indexMetadata.getNumberOfShards());
@@ -226,7 +218,7 @@ public void run() {
         logger.info("Updating the Index Migration entry to indicate setup has been completed...");
         CmsEntry.Index updatedEntry = new CmsEntry.Index(
-            CmsEntry.IndexStatus.COMPLETED,
+            CmsEntry.IndexStatus.IN_PROGRESS,
             lastCmsEntry.leaseExpiry,
             lastCmsEntry.numAttempts
         );
@@ -310,7 +302,7 @@ public void run() {
          * fails because we guarantee that we'll attempt the work at least N times, not exactly N times.
          */
         if (workItem.numAttempts > CmsEntry.IndexWorkItem.ATTEMPTS_SOFT_LIMIT) {
-            logger.info("Index Work Item " + workItem.name + " has exceeded the maximum number of attempts; marking it as failed...");
+            logger.warn("Index Work Item " + workItem.name + " has exceeded the maximum number of attempts; marking it as failed...");
             CmsEntry.IndexWorkItem updatedEntry = new CmsEntry.IndexWorkItem(
                 workItem.name,
                 CmsEntry.IndexWorkItemStatus.FAILED,
@@ -405,6 +397,16 @@ public ExitPhaseSuccess(SharedMembers members) {
 
     @Override
     public void run() {
+        logger.info("Marking the Index Migration as completed...");
+        CmsEntry.Index lastCmsEntry = members.getCmsEntryNotMissing();
+        CmsEntry.Index updatedEntry = new CmsEntry.Index(
+            CmsEntry.IndexStatus.COMPLETED,
+            lastCmsEntry.leaseExpiry,
+            lastCmsEntry.numAttempts
+        );
+        members.cmsClient.updateIndexEntry(updatedEntry, lastCmsEntry);
+        logger.info("Index Migration marked as completed");
+
         logger.info("Index Migration completed, exiting Index Phase...");
         members.globalState.updatePhase(GlobalState.Phase.INDEX_COMPLETED);
     }
@@ -425,11 +427,11 @@ public ExitPhaseFailed(SharedMembers members, IndexMigrationFailed e) {
 
     @Override
     public void run() {
-        // We either failed the Metadata Migration or found it had already been failed; either way this
+        // We either failed the Index Migration or found it had already been failed; either way this
         // should not be missing
         CmsEntry.Index lastCmsEntry = members.getCmsEntryNotMissing();
 
-        logger.error("Metadata Migration failed");
+        logger.error("Index Migration failed");
         CmsEntry.Index updatedEntry = new CmsEntry.Index(
             CmsEntry.IndexStatus.FAILED,
             lastCmsEntry.leaseExpiry,
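Throughout IndexStep and DocumentsStep, non-forceful updates take the shape `update(updatedEntry, lastEntry)` and return an Optional that is empty when the write loses a race. A toy in-memory sketch of that compare-and-swap contract follows; the real OpenSearch-backed CMS client presumably enforces the precondition server-side, which this stand-in does not model:

```java
import java.util.Optional;

public class CasUpdateSketch {
    private String stored = "v1";

    // Succeeds only if the stored entry still equals what the caller last read,
    // mirroring the update(updatedEntry, lastEntry) shape used by the worker steps.
    public synchronized Optional<String> update(String updated, String expected) {
        if (!stored.equals(expected)) {
            return Optional.empty(); // someone else changed it: the update "bounces"
        }
        stored = updated;
        return Optional.of(updated);
    }

    public static void main(String[] args) {
        CasUpdateSketch cms = new CasUpdateSketch();
        System.out.println(cms.update("v2", "v1")); // Optional[v2]: we won the race
        System.out.println(cms.update("v3", "v1")); // Optional.empty: stale read, retry
    }
}
```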
diff --git a/RFS/src/main/java/com/rfs/worker/MetadataStep.java b/RFS/src/main/java/com/rfs/worker/MetadataStep.java
index fec23f94b..60ed29cac 100644
--- a/RFS/src/main/java/com/rfs/worker/MetadataStep.java
+++ b/RFS/src/main/java/com/rfs/worker/MetadataStep.java
@@ -175,7 +175,7 @@ public void run() {
         logger.info("Current Metadata Migration work lease appears to have expired; attempting to acquire it...");
 
         CmsEntry.Metadata updatedEntry = new CmsEntry.Metadata(
-            CmsEntry.MetadataStatus.IN_PROGRESS,
+            lastCmsEntry.status,
             // Set the next CMS entry based on the current one
             // TODO: Should be using the server-side clock here
             CmsEntry.Metadata.getLeaseExpiry(getNowMs(), lastCmsEntry.numAttempts + 1),
diff --git a/RFS/src/test/java/com/rfs/cms/CmsEntryTest.java b/RFS/src/test/java/com/rfs/cms/CmsEntryTest.java
index e25960c1b..a3e511b34 100644
--- a/RFS/src/test/java/com/rfs/cms/CmsEntryTest.java
+++ b/RFS/src/test/java/com/rfs/cms/CmsEntryTest.java
@@ -41,7 +41,7 @@ static Stream provide_Metadata_getLeaseExpiry_UnhappyPath_args() {
     @MethodSource("provide_Metadata_getLeaseExpiry_UnhappyPath_args")
     void Metadata_getLeaseExpiry_UnhappyPath(int numAttempts) {
         // Run the test
-        assertThrows(CmsEntry.CouldNotFindNextLeaseDuration.class, () -> {
+        assertThrows(CmsEntry.CouldNotGenerateNextLeaseDuration.class, () -> {
             CmsEntry.Metadata.getLeaseExpiry(0, numAttempts);
         });
     }
diff --git a/RFS/src/test/java/com/rfs/common/LuceneDocumentsReaderTest.java b/RFS/src/test/java/com/rfs/common/LuceneDocumentsReaderTest.java
index e28daff81..2c071083b 100644
--- a/RFS/src/test/java/com/rfs/common/LuceneDocumentsReaderTest.java
+++ b/RFS/src/test/java/com/rfs/common/LuceneDocumentsReaderTest.java
@@ -18,6 +18,10 @@ import java.nio.file.Path;
 
 class TestLuceneDocumentsReader extends LuceneDocumentsReader {
+    public TestLuceneDocumentsReader(Path luceneFilesBasePath) {
+        super(luceneFilesBasePath);
+    }
+
     // Helper method to correctly encode the Document IDs for test
     public static byte[] encodeUtf8Id(String id) {
         byte[] idBytes = id.getBytes(StandardCharsets.UTF_8);
@@ -65,7 +69,7 @@ public class LuceneDocumentsReaderTest {
     @Test
     void ReadDocuments_AsExpected() {
         // Use the TestLuceneDocumentsReader to get the mocked documents
-        Flux documents = new TestLuceneDocumentsReader().readDocuments(Paths.get("/fake/path"), "testIndex", 1);
+        Flux documents = new TestLuceneDocumentsReader(Paths.get("/fake/path")).readDocuments("testIndex", 1);
 
         // Verify that the results are as expected
         StepVerifier.create(documents)
diff --git a/RFS/src/test/java/com/rfs/framework/SimpleRestoreFromSnapshot_ES_7_10.java b/RFS/src/test/java/com/rfs/framework/SimpleRestoreFromSnapshot_ES_7_10.java
index df26fad1c..37c98b35d 100644
--- a/RFS/src/test/java/com/rfs/framework/SimpleRestoreFromSnapshot_ES_7_10.java
+++ b/RFS/src/test/java/com/rfs/framework/SimpleRestoreFromSnapshot_ES_7_10.java
@@ -44,8 +44,9 @@ public List extractSnapshotIndexData(final String localPath,
 
         for (final IndexMetadata.Data index : indices) {
             for (int shardId = 0; shardId < index.getNumberOfShards(); shardId++) {
-                var shardMetadata = new ShardMetadataFactory_ES_7_10().fromRepo(repo, snapShotProvider, snapshotName, index.getName(), shardId);
-                SnapshotShardUnpacker.unpack(repo, shardMetadata, unpackedShardDataDir, Integer.MAX_VALUE);
+                var shardMetadata = new ShardMetadataFactory_ES_7_10(snapShotProvider).fromRepo(snapshotName, index.getName(), shardId);
+                SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repo, unpackedShardDataDir, Integer.MAX_VALUE);
+                unpacker.unpack(shardMetadata);
             }
         }
         return indices;
@@ -54,10 +55,10 @@ public void updateTargetCluster(final List indices, final Path unpackedShardDataDir, final OpenSearchClient client) throws Exception {
         for (final IndexMetadata.Data index : indices) {
             for (int shardId = 0; shardId < index.getNumberOfShards(); shardId++) {
-                final var documents = new LuceneDocumentsReader().readDocuments(unpackedShardDataDir, index.getName(), shardId);
+                final var documents = new LuceneDocumentsReader(unpackedShardDataDir).readDocuments(index.getName(), shardId);
                 final var finalShardId = shardId;
-                DocumentReindexer.reindex(index.getName(), documents, client)
+                new DocumentReindexer(client).reindex(index.getName(), documents)
                     .doOnError(error -> logger.error("Error during reindexing: " + error))
                     .doOnSuccess(done -> logger.info("Reindexing completed for index " + index.getName() + ", shard " + finalShardId))
                     .block();
diff --git a/RFS/src/test/java/com/rfs/worker/DocumentsRunnerTest.java b/RFS/src/test/java/com/rfs/worker/DocumentsRunnerTest.java
new file mode 100644
index 000000000..cc5c5fd06
--- /dev/null
+++ b/RFS/src/test/java/com/rfs/worker/DocumentsRunnerTest.java
@@ -0,0 +1,49 @@
+package com.rfs.worker;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.*;
+
+import java.util.Optional;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import com.rfs.cms.CmsClient;
+import com.rfs.common.DocumentReindexer;
+import com.rfs.common.IndexMetadata;
+import com.rfs.common.LuceneDocumentsReader;
+import com.rfs.common.RfsException;
+import com.rfs.common.ShardMetadata;
+import com.rfs.common.SnapshotShardUnpacker;
+
+public class DocumentsRunnerTest {
+
+    @Test
+    void run_encountersAnException_asExpected() {
+        // Setup
+        GlobalState globalState = Mockito.mock(GlobalState.class);
+        CmsClient cmsClient = Mockito.mock(CmsClient.class);
+        String snapshotName = "testSnapshot";
+
+        IndexMetadata.Factory metadataFactory = Mockito.mock(IndexMetadata.Factory.class);
+        ShardMetadata.Factory shardMetadataFactory = Mockito.mock(ShardMetadata.Factory.class);
+        SnapshotShardUnpacker unpacker = Mockito.mock(SnapshotShardUnpacker.class);
+        LuceneDocumentsReader reader = Mockito.mock(LuceneDocumentsReader.class);
+        DocumentReindexer reindexer = Mockito.mock(DocumentReindexer.class);
+        RfsException testException = new RfsException("Unit test");
+
+        doThrow(testException).when(cmsClient).getDocumentsEntry();
+        when(globalState.getPhase()).thenReturn(GlobalState.Phase.DOCUMENTS_IN_PROGRESS);
+
+        // Run the test
+        DocumentsRunner testRunner = new DocumentsRunner(globalState, cmsClient, snapshotName, metadataFactory, shardMetadataFactory, unpacker, reader, reindexer);
+        final var e = assertThrows(DocumentsRunner.DocumentsMigrationPhaseFailed.class, () -> testRunner.run());
+
+        // Verify the results
+        assertEquals(GlobalState.Phase.DOCUMENTS_IN_PROGRESS, e.phase);
+        assertEquals(DocumentsStep.GetEntry.class, e.nextStep.getClass());
+        assertEquals(Optional.empty(), e.cmsEntry);
+        assertEquals(testException, e.e);
+    }
+}
diff --git a/RFS/src/test/java/com/rfs/worker/DocumentsStepTest.java b/RFS/src/test/java/com/rfs/worker/DocumentsStepTest.java
new file mode 100644
index 000000000..5ffa034fb
--- /dev/null
+++ b/RFS/src/test/java/com/rfs/worker/DocumentsStepTest.java
@@ -0,0 +1,671 @@
+package com.rfs.worker;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.apache.lucene.document.Document;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import com.rfs.cms.CmsClient;
+import com.rfs.cms.CmsEntry;
+import com.rfs.cms.OpenSearchCmsClient;
+import com.rfs.common.DocumentReindexer;
+import com.rfs.common.IndexMetadata;
+import com.rfs.common.LuceneDocumentsReader;
+import com.rfs.common.ShardMetadata;
+import com.rfs.common.SnapshotRepo;
+import com.rfs.common.SnapshotShardUnpacker;
+import com.rfs.worker.DocumentsStep.SharedMembers;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import com.rfs.worker.DocumentsStep.MaxAttemptsExceeded;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.*;
+
+
+@ExtendWith(MockitoExtension.class)
+public class DocumentsStepTest {
+    private SharedMembers testMembers;
+
+    @BeforeEach
+    void setUp() {
+        GlobalState globalState = Mockito.mock(GlobalState.class);
+        CmsClient cmsClient = Mockito.mock(CmsClient.class);
+        String snapshotName = "test";
+
+        IndexMetadata.Factory metadataFactory = Mockito.mock(IndexMetadata.Factory.class);
+        ShardMetadata.Factory shardMetadataFactory = Mockito.mock(ShardMetadata.Factory.class);
+        SnapshotShardUnpacker unpacker = Mockito.mock(SnapshotShardUnpacker.class);
+        LuceneDocumentsReader reader = Mockito.mock(LuceneDocumentsReader.class);
+        DocumentReindexer reindexer = Mockito.mock(DocumentReindexer.class);
+        testMembers = new SharedMembers(globalState, cmsClient, snapshotName, metadataFactory, shardMetadataFactory, unpacker, reader, reindexer);
+    }
+
+    @Test
+    void EnterPhase_AsExpected() {
+        // Run the test
+        DocumentsStep.EnterPhase testStep = new DocumentsStep.EnterPhase(testMembers);
+        testStep.run();
+        WorkerStep nextStep = testStep.nextStep();
+
+        // Check the results
+        Mockito.verify(testMembers.globalState, times(1)).updatePhase(GlobalState.Phase.DOCUMENTS_IN_PROGRESS);
+        assertEquals(DocumentsStep.GetEntry.class, nextStep.getClass());
+    }
+
+    static Stream provideGetEntryArgs() {
+        return Stream.of(
+            // There is no CMS entry, so we need to create one
+            Arguments.of(
+                Optional.empty(),
+                DocumentsStep.CreateEntry.class
+            ),
+
+            // The CMS entry has an expired lease and is under the retry limit, so we try to acquire the lease
+            Arguments.of(
+                Optional.of(new CmsEntry.Documents(
+                    CmsEntry.DocumentsStatus.SETUP,
+                    String.valueOf(Instant.now().minus(Duration.ofDays(1)).toEpochMilli()),
+                    CmsEntry.Documents.MAX_ATTEMPTS - 1
+                )),
+                DocumentsStep.AcquireLease.class
+            ),
+
+            // The CMS entry has an expired lease and is at the retry limit, so we exit as failed
+            Arguments.of(
+                Optional.of(new CmsEntry.Documents(
+                    CmsEntry.DocumentsStatus.SETUP,
+                    String.valueOf(Instant.now().minus(Duration.ofDays(1)).toEpochMilli()),
+                    CmsEntry.Documents.MAX_ATTEMPTS
+                )),
+                DocumentsStep.ExitPhaseFailed.class
+            ),
+
+            // The CMS entry has an expired lease and is over the retry limit, so we exit as failed
+            Arguments.of(
+                Optional.of(new CmsEntry.Documents(
+                    CmsEntry.DocumentsStatus.SETUP,
+                    String.valueOf(Instant.now().minus(Duration.ofDays(1)).toEpochMilli()),
+                    CmsEntry.Documents.MAX_ATTEMPTS + 1
+                )),
+                DocumentsStep.ExitPhaseFailed.class
+            ),
+
+            // The CMS entry has a valid lease and is under the retry limit, so we back off a bit
+            Arguments.of(
+                Optional.of(new CmsEntry.Documents(
+                    CmsEntry.DocumentsStatus.SETUP,
+                    String.valueOf(Instant.now().plus(Duration.ofDays(1)).toEpochMilli()),
+                    CmsEntry.Documents.MAX_ATTEMPTS - 1
+                )),
+                DocumentsStep.RandomWait.class
+            ),
+
+            // The CMS entry has a valid lease and is at the retry limit, so we back off a bit
+            Arguments.of(
+                Optional.of(new CmsEntry.Documents(
+                    CmsEntry.DocumentsStatus.SETUP,
+                    String.valueOf(Instant.now().plus(Duration.ofDays(1)).toEpochMilli()),
+                    CmsEntry.Documents.MAX_ATTEMPTS
+                )),
+                DocumentsStep.RandomWait.class
+            ),
+
+            // The CMS entry is marked as in progress, so we try to do some work
+            Arguments.of(
+                Optional.of(new CmsEntry.Documents(
+                    CmsEntry.DocumentsStatus.IN_PROGRESS,
+                    String.valueOf(Instant.now().plus(Duration.ofDays(1)).toEpochMilli()),
+                    CmsEntry.Documents.MAX_ATTEMPTS - 1
+                )),
+                DocumentsStep.GetDocumentsToMigrate.class
+            ),
+
+            // The CMS entry is marked as completed, so we exit as success
+            Arguments.of(
+                Optional.of(new CmsEntry.Documents(
+                    CmsEntry.DocumentsStatus.COMPLETED,
+                    String.valueOf(Instant.now().plus(Duration.ofDays(1)).toEpochMilli()),
+                    CmsEntry.Documents.MAX_ATTEMPTS - 1
+                )),
+                DocumentsStep.ExitPhaseSuccess.class
+            ),
+
+            // The CMS entry is marked as failed, so we exit as failed
+            Arguments.of(
+                Optional.of(new CmsEntry.Documents(
+                    CmsEntry.DocumentsStatus.FAILED,
+                    String.valueOf(Instant.now().plus(Duration.ofDays(1)).toEpochMilli()),
+                    CmsEntry.Documents.MAX_ATTEMPTS - 1
+                )),
+                DocumentsStep.ExitPhaseFailed.class
+            )
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideGetEntryArgs")
+    void GetEntry_AsExpected(Optional index, Class nextStepClass) {
+        // Set up the test
+        Mockito.when(testMembers.cmsClient.getDocumentsEntry()).thenReturn(index);
+
+        // Run the test
+        DocumentsStep.GetEntry testStep = new DocumentsStep.GetEntry(testMembers);
+        testStep.run();
+        WorkerStep nextStep = testStep.nextStep();
+
+        // Check the results
+        Mockito.verify(testMembers.cmsClient, times(1)).getDocumentsEntry();
+        assertEquals(nextStepClass, nextStep.getClass());
+    }
+
+    static Stream provideCreateEntryArgs() {
+        return Stream.of(
+            // We were able to create the CMS entry ourselves, so we have the work lease
+            Arguments.of(
+                Optional.of(new CmsEntry.Documents(
+                    CmsEntry.DocumentsStatus.IN_PROGRESS,
+                    String.valueOf(Instant.now().plus(Duration.ofDays(1)).toEpochMilli()),
+                    1
+                )),
+                DocumentsStep.SetupDocumentsWorkEntries.class
+            ),
+
+            // We were unable to create the CMS entry ourselves, so we do not have the work lease
+            Arguments.of(Optional.empty(), DocumentsStep.GetEntry.class)
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideCreateEntryArgs")
+    void CreateEntry_AsExpected(Optional createdEntry, Class nextStepClass) {
+        // Set up the test
+        Mockito.when(testMembers.cmsClient.createDocumentsEntry()).thenReturn(createdEntry);
+
+        // Run the test
+        DocumentsStep.CreateEntry testStep = new DocumentsStep.CreateEntry(testMembers);
+        testStep.run();
+        WorkerStep nextStep = testStep.nextStep();
+
+        // Check the results
+        Mockito.verify(testMembers.cmsClient, times(1)).createDocumentsEntry();
+        assertEquals(nextStepClass, nextStep.getClass());
+    }
+
+    public static class TestAcquireLease extends DocumentsStep.AcquireLease {
+        public static final int MILLI_SINCE_EPOCH = 42; // Arbitrarily chosen, but predictable
+
+        public TestAcquireLease(SharedMembers members, CmsEntry.Base entry) {
+            super(members, entry);
+        }
+
+        @Override
+        protected long getNowMs() {
+            return MILLI_SINCE_EPOCH;
+        }
+    }
+
+    static Stream provideAcquireLeaseSetupArgs() {
+        return Stream.of(
+            // We were able to acquire the lease, and it's on the Document setup step
+            Arguments.of(
+                Optional.of(new CmsEntry.Documents(
+                    CmsEntry.DocumentsStatus.SETUP,
+                    String.valueOf(0L),
+                    1
+                )),
+                Optional.of(new CmsEntry.Documents(
+                    CmsEntry.DocumentsStatus.SETUP,
+                    CmsEntry.Documents.getLeaseExpiry(TestAcquireLease.MILLI_SINCE_EPOCH, 2),
+                    2
+                )),
+                Optional.of(new CmsEntry.Documents(
+                    CmsEntry.DocumentsStatus.SETUP,
+                    CmsEntry.Documents.getLeaseExpiry(TestAcquireLease.MILLI_SINCE_EPOCH, 2),
+                    2
+                )),
+                DocumentsStep.SetupDocumentsWorkEntries.class
+            ),
+
+            // We were unable to acquire the lease
+            Arguments.of(
+                Optional.of(new CmsEntry.Documents(
+                    CmsEntry.DocumentsStatus.SETUP,
+                    String.valueOf(0L),
+                    1
+                )),
+                Optional.of(new CmsEntry.Documents(
+                    CmsEntry.DocumentsStatus.SETUP,
+                    CmsEntry.Documents.getLeaseExpiry(TestAcquireLease.MILLI_SINCE_EPOCH, 2),
+                    2
+                )),
+                Optional.empty(),
+                DocumentsStep.RandomWait.class
+            )
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideAcquireLeaseSetupArgs")
+    void AcquireLease_Setup_AsExpected(
+        Optional existingEntry, // The entry we started with, before trying to acquire the lease
+        Optional updatedEntry, // The entry we try to update to
+        Optional responseEntry, // The response from the CMS client
+        Class nextStepClass) {
+        // Set up the test
+        testMembers.cmsEntry = existingEntry;
+
+        Mockito.when(testMembers.cmsClient.updateDocumentsEntry(
+            any(CmsEntry.Documents.class), eq(existingEntry.get())
+        )).thenReturn(responseEntry);
+
+        // Run the test
+        DocumentsStep.AcquireLease testStep = new TestAcquireLease(testMembers, existingEntry.get());
+        testStep.run();
+        WorkerStep nextStep = testStep.nextStep();
+
+        // Check the results
+        Mockito.verify(testMembers.cmsClient, times(1)).updateDocumentsEntry(
+            updatedEntry.get(), existingEntry.get()
+        );
+        assertEquals(nextStepClass, nextStep.getClass());
+    }
+
+    static Stream provideAcquireLeaseWorkArgs() {
+        return Stream.of(
+            // We were able to acquire the lease, and it's on a Document Work Item setup
+            Arguments.of(
+                Optional.of(new CmsEntry.DocumentsWorkItem(
+                    "index-name",
+                    1,
+                    CmsEntry.DocumentsWorkItemStatus.NOT_STARTED,
+                    String.valueOf(0L),
+                    1
+                )),
+                Optional.of(new CmsEntry.DocumentsWorkItem(
+                    "index-name",
+                    1,
+                    CmsEntry.DocumentsWorkItemStatus.NOT_STARTED,
+                    CmsEntry.DocumentsWorkItem.getLeaseExpiry(TestAcquireLease.MILLI_SINCE_EPOCH, 2),
+                    2
+                )),
+                Optional.of(new CmsEntry.DocumentsWorkItem(
+                    "index-name",
+                    1,
+                    CmsEntry.DocumentsWorkItemStatus.NOT_STARTED,
+                    CmsEntry.DocumentsWorkItem.getLeaseExpiry(TestAcquireLease.MILLI_SINCE_EPOCH, 2),
+                    2
+                )),
+                DocumentsStep.MigrateDocuments.class
+            ),
+
+            // We were unable to acquire the lease
+            Arguments.of(
+                Optional.of(new CmsEntry.DocumentsWorkItem(
+                    "index-name",
+                    1,
+                    CmsEntry.DocumentsWorkItemStatus.NOT_STARTED,
+                    String.valueOf(0L),
+                    1
+                )),
+                Optional.of(new CmsEntry.DocumentsWorkItem(
+                    "index-name",
+                    1,
+                    CmsEntry.DocumentsWorkItemStatus.NOT_STARTED,
+                    CmsEntry.DocumentsWorkItem.getLeaseExpiry(TestAcquireLease.MILLI_SINCE_EPOCH, 2),
+                    2
+                )),
+                Optional.empty(),
+                DocumentsStep.RandomWait.class
+            )
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideAcquireLeaseWorkArgs")
+    void AcquireLease_Work_AsExpected(
+        Optional existingEntry, // The entry we started with, before trying to acquire the lease
+        Optional updatedEntry, // The entry we try to update to
+        Optional responseEntry, // The response from the CMS client
+        Class nextStepClass) {
+        // Set up the test
+        testMembers.cmsWorkEntry = existingEntry;
+
+        Mockito.when(testMembers.cmsClient.updateDocumentsWorkItem(
+            any(CmsEntry.DocumentsWorkItem.class), eq(existingEntry.get())
+        )).thenReturn(responseEntry);
+
+        // Run the test
+        DocumentsStep.AcquireLease testStep = new TestAcquireLease(testMembers, existingEntry.get());
+        testStep.run();
+        WorkerStep nextStep = testStep.nextStep();
+
+        // Check the results
+        Mockito.verify(testMembers.cmsClient, times(1)).updateDocumentsWorkItem(
+            updatedEntry.get(), existingEntry.get()
+        );
+        assertEquals(nextStepClass, nextStep.getClass());
+    }
+
+    static Stream provideSetupDocumentsWorkEntriesArgs() {
+        return Stream.of(
+            // We were able to mark the setup as completed
+            Arguments.of(
+                Optional.of(new CmsEntry.Documents(
+                    CmsEntry.DocumentsStatus.IN_PROGRESS,
+                    String.valueOf(42),
+                    1
+                )),
+                DocumentsStep.GetDocumentsToMigrate.class
+            ),
+
+            // We were unable to mark the setup as completed
+            Arguments.of(Optional.empty(), DocumentsStep.GetEntry.class)
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideSetupDocumentsWorkEntriesArgs")
+    void SetupDocumentsWorkEntries_AsExpected(Optional returnedEntry, Class nextStepClass) {
+        // Set up the test
+        SnapshotRepo.Provider repoDataProvider = Mockito.mock(SnapshotRepo.Provider.class);
+        Mockito.when(testMembers.metadataFactory.getRepoDataProvider()).thenReturn(repoDataProvider);
+
+        SnapshotRepo.Index index1 = Mockito.mock(SnapshotRepo.Index.class);
+        Mockito.when(index1.getName()).thenReturn("index1");
+        SnapshotRepo.Index index2 = Mockito.mock(SnapshotRepo.Index.class);
+        Mockito.when(index2.getName()).thenReturn("index2");
+        Mockito.when(repoDataProvider.getIndicesInSnapshot(testMembers.snapshotName)).thenReturn(
+            Stream.of(index1, index2).collect(Collectors.toList())
+        );
+
+        IndexMetadata.Data indexMetadata1 = Mockito.mock(IndexMetadata.Data.class);
+        Mockito.when(indexMetadata1.getName()).thenReturn("index1");
+        Mockito.when(indexMetadata1.getNumberOfShards()).thenReturn(1);
+        Mockito.when(testMembers.metadataFactory.fromRepo(testMembers.snapshotName, "index1")).thenReturn(indexMetadata1);
+
+        IndexMetadata.Data indexMetadata2 = Mockito.mock(IndexMetadata.Data.class);
+        Mockito.when(indexMetadata2.getName()).thenReturn("index2");
+        Mockito.when(indexMetadata2.getNumberOfShards()).thenReturn(2);
+        Mockito.when(testMembers.metadataFactory.fromRepo(testMembers.snapshotName, "index2")).thenReturn(indexMetadata2);
+
+        var existingEntry = Optional.of(new CmsEntry.Documents(
+            CmsEntry.DocumentsStatus.SETUP,
+            String.valueOf(42),
+            1
+        ));
+        testMembers.cmsEntry = existingEntry;
+
+        CmsEntry.Documents updatedEntry = new CmsEntry.Documents(
+            CmsEntry.DocumentsStatus.IN_PROGRESS,
+            existingEntry.get().leaseExpiry,
+            existingEntry.get().numAttempts
+        );
+        Mockito.when(testMembers.cmsClient.updateDocumentsEntry(
+            updatedEntry, existingEntry.get()
+        )).thenReturn(returnedEntry);
+
+        // Run the test
+        DocumentsStep.SetupDocumentsWorkEntries testStep = new DocumentsStep.SetupDocumentsWorkEntries(testMembers);
+        testStep.run();
+        WorkerStep nextStep = testStep.nextStep();
+
+        // Check the results
+        Mockito.verify(testMembers.globalState, times(1)).updateWorkItem(
+            argThat(argument -> {
+                if (!(argument instanceof OpenSearchWorkItem)) {
+                    return false;
+                }
+                OpenSearchWorkItem workItem = (OpenSearchWorkItem) argument;
+                return workItem.indexName.equals(OpenSearchCmsClient.CMS_INDEX_NAME) &&
+                    workItem.documentId.equals(OpenSearchCmsClient.CMS_DOCUMENTS_DOC_ID);
+            })
+        );
+        Mockito.verify(testMembers.cmsClient, times(1)).createDocumentsWorkItem(
+            "index1", 0
+        );
+        Mockito.verify(testMembers.cmsClient, times(1)).createDocumentsWorkItem(
+            "index2", 0
+        );
+        Mockito.verify(testMembers.cmsClient, times(1)).createDocumentsWorkItem(
+            "index2", 1
+        );
+
+        Mockito.verify(testMembers.cmsClient, times(1)).updateDocumentsEntry(
+            updatedEntry, existingEntry.get()
+        );
+        Mockito.verify(testMembers.globalState, times(1)).updateWorkItem(
+            null
+        );
+
+        assertEquals(nextStepClass, nextStep.getClass());
+    }
+
+    static Stream provideGetDocumentsToMigrateArgs() {
+        return Stream.of(
+            // There's still work to do
+            Arguments.of(
+                Optional.of(new CmsEntry.DocumentsWorkItem("index1", 0, CmsEntry.DocumentsWorkItemStatus.NOT_STARTED, "42", 1)),
+                DocumentsStep.MigrateDocuments.class
+            ),
+
+            // There's no more work to do
+            Arguments.of(Optional.empty(), DocumentsStep.ExitPhaseSuccess.class)
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideGetDocumentsToMigrateArgs")
+    void GetDocumentsToMigrate_AsExpected(Optional workItem, Class nextStepClass) {
+        // Set up the test
+        Mockito.when(testMembers.cmsClient.getAvailableDocumentsWorkItem()).thenReturn(workItem);
+
+        // Run the test
+        DocumentsStep.GetDocumentsToMigrate testStep = new DocumentsStep.GetDocumentsToMigrate(testMembers);
+        testStep.run();
+        WorkerStep nextStep = testStep.nextStep();
+
+        // Check the results
+        assertEquals(nextStepClass, nextStep.getClass());
+    }
+
+    static Stream provideMigrateDocumentsArgs() {
+        return Stream.of(
+            // We have a work item to migrate, and we complete it successfully
+            Arguments.of(
+                new CmsEntry.DocumentsWorkItem("index1", 0, CmsEntry.DocumentsWorkItemStatus.NOT_STARTED, "42", 1),
+                new CmsEntry.DocumentsWorkItem("index1", 0, CmsEntry.DocumentsWorkItemStatus.COMPLETED, "42", 1),
+                new CmsEntry.DocumentsWorkItem("index1", 0, CmsEntry.DocumentsWorkItemStatus.COMPLETED, "42", 1)
+            )
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideMigrateDocumentsArgs")
+    void MigrateDocuments_workToDo_AsExpected(CmsEntry.DocumentsWorkItem workItem, CmsEntry.DocumentsWorkItem updatedItem, CmsEntry.DocumentsWorkItem returnedItem) {
+        // Set up the test
+        testMembers.cmsWorkEntry = Optional.of(workItem);
+
+        ShardMetadata.Data shardMetadata = Mockito.mock(ShardMetadata.Data.class);
+        Mockito.when(shardMetadata.getIndexName()).thenReturn(workItem.indexName);
+        Mockito.when(shardMetadata.getShardId()).thenReturn(workItem.shardId);
+        Mockito.when(testMembers.shardMetadataFactory.fromRepo(testMembers.snapshotName, workItem.indexName, workItem.shardId)).thenReturn(shardMetadata);
+
+        Flux documents = Mockito.mock(Flux.class);
+        Mockito.when(testMembers.reader.readDocuments(shardMetadata.getIndexName(), shardMetadata.getShardId())).thenReturn(documents);
+
+        Mockito.when(testMembers.reindexer.reindex(workItem.indexName, documents)).thenReturn(Mono.empty());
+
+        Mockito.when(testMembers.cmsClient.updateDocumentsWorkItemForceful(updatedItem)).thenReturn(returnedItem);
+
+        // Run the test
+        DocumentsStep.MigrateDocuments testStep = new DocumentsStep.MigrateDocuments(testMembers);
+        testStep.run();
+        WorkerStep nextStep = testStep.nextStep();
+
+        // Check the results
+        Mockito.verify(testMembers.reindexer, times(1)).reindex(workItem.indexName, documents);
+        Mockito.verify(testMembers.cmsClient, times(1)).updateDocumentsWorkItemForceful(updatedItem);
+        assertEquals(DocumentsStep.GetDocumentsToMigrate.class, nextStep.getClass());
+    }
+
+    @Test
+    void MigrateDocuments_failedItem_AsExpected() {
+        // Set up the test
+        CmsEntry.DocumentsWorkItem workItem = new CmsEntry.DocumentsWorkItem(
+            "index1", 0, CmsEntry.DocumentsWorkItemStatus.NOT_STARTED, "42", 1
+        );
+        testMembers.cmsWorkEntry = Optional.of(workItem);
+
+        CmsEntry.DocumentsWorkItem updatedItem = new CmsEntry.DocumentsWorkItem(
+            "index1", 0, CmsEntry.DocumentsWorkItemStatus.NOT_STARTED, "42", 2
+        );
+
+        ShardMetadata.Data shardMetadata = Mockito.mock(ShardMetadata.Data.class);
+        Mockito.when(shardMetadata.getIndexName()).thenReturn(workItem.indexName);
+        Mockito.when(shardMetadata.getShardId()).thenReturn(workItem.shardId);
+        Mockito.when(testMembers.shardMetadataFactory.fromRepo(testMembers.snapshotName, workItem.indexName, workItem.shardId)).thenReturn(shardMetadata);
+
+        Flux documents = Mockito.mock(Flux.class);
+        Mockito.when(testMembers.reader.readDocuments(shardMetadata.getIndexName(), shardMetadata.getShardId())).thenReturn(documents);
+
+        Mockito.doThrow(new RuntimeException("Test exception")).when(testMembers.reindexer).reindex(workItem.indexName, documents);
+
+        // Run the test
+        DocumentsStep.MigrateDocuments testStep = new DocumentsStep.MigrateDocuments(testMembers);
+        testStep.run();
+        WorkerStep nextStep = testStep.nextStep();
+
+        // Check the results
+        Mockito.verify(testMembers.reindexer, times(1)).reindex(workItem.indexName, documents);
+        Mockito.verify(testMembers.cmsClient, times(1)).updateDocumentsWorkItem(updatedItem, workItem);
+        assertEquals(DocumentsStep.GetDocumentsToMigrate.class, nextStep.getClass());
+    }
+
+    @Test
+    void MigrateDocuments_exceededAttempts_AsExpected() {
+        // Set up the test
+        CmsEntry.DocumentsWorkItem workItem = new CmsEntry.DocumentsWorkItem(
+            "index1", 0, CmsEntry.DocumentsWorkItemStatus.NOT_STARTED, "42", CmsEntry.DocumentsWorkItem.MAX_ATTEMPTS + 1
+        );
+        testMembers.cmsWorkEntry = Optional.of(workItem);
+
+        CmsEntry.DocumentsWorkItem updatedItem = new CmsEntry.DocumentsWorkItem(
+            "index1", 0, CmsEntry.DocumentsWorkItemStatus.FAILED, "42", CmsEntry.DocumentsWorkItem.MAX_ATTEMPTS + 1
+        );
+
+        // Run the test
+        DocumentsStep.MigrateDocuments testStep = new DocumentsStep.MigrateDocuments(testMembers);
+        testStep.run();
+        WorkerStep nextStep = testStep.nextStep();
+
+        // Check the results
+        Mockito.verify(testMembers.cmsClient, times(1)).updateDocumentsWorkItem(updatedItem, workItem);
+        assertEquals(DocumentsStep.GetDocumentsToMigrate.class, nextStep.getClass());
+    }
+
+    public static class TestRandomWait extends DocumentsStep.RandomWait {
+        public TestRandomWait(SharedMembers members) {
+            super(members);
+        }
+
+        @Override
+        protected void waitABit() {
+            // do nothing
+        }
+    }
+
+    @Test
+    void RandomWait_AsExpected() {
+        // Run the test
+        DocumentsStep.RandomWait testStep = new TestRandomWait(testMembers);
+        testStep.run();
+        WorkerStep nextStep = testStep.nextStep();
+
+        // Check the results
+        assertEquals(DocumentsStep.GetEntry.class, nextStep.getClass());
+    }
+
+    @Test
+    void ExitPhaseSuccess_AsExpected() {
+        // Set up the test
+        var existingEntry = Optional.of(new CmsEntry.Documents(
+            CmsEntry.DocumentsStatus.IN_PROGRESS,
+            String.valueOf(42),
+            1
+        ));
+        testMembers.cmsEntry = existingEntry;
+
+        // Run the test
+        DocumentsStep.ExitPhaseSuccess testStep = new DocumentsStep.ExitPhaseSuccess(testMembers);
+        testStep.run();
+        WorkerStep nextStep = testStep.nextStep();
+
+        // Check the results
+        var expectedEntry = new CmsEntry.Documents(
+            CmsEntry.DocumentsStatus.COMPLETED,
+            existingEntry.get().leaseExpiry,
+            existingEntry.get().numAttempts
+        );
+        Mockito.verify(testMembers.cmsClient, times(1)).updateDocumentsEntry(
+            expectedEntry, existingEntry.get()
+        );
+
+        Mockito.verify(testMembers.globalState, times(1)).updatePhase(
+            GlobalState.Phase.DOCUMENTS_COMPLETED
+        );
+        assertEquals(null, nextStep);
+    }
+
+    @Test
+    void ExitPhaseFailed_AsExpected() {
+        // Set up the test
+        MaxAttemptsExceeded e = new MaxAttemptsExceeded();
+
+        var existingEntry = Optional.of(new CmsEntry.Documents(
+            CmsEntry.DocumentsStatus.SETUP,
+            String.valueOf(42),
+            1
+        ));
+        testMembers.cmsEntry = existingEntry;
+
+        // Run the test
+        DocumentsStep.ExitPhaseFailed testStep = new DocumentsStep.ExitPhaseFailed(testMembers, e);
+        testStep.run();
+        assertThrows(MaxAttemptsExceeded.class, () -> {
+            testStep.nextStep();
+        });
+
+        // Check the results
+        var expectedEntry = new CmsEntry.Documents(
+            CmsEntry.DocumentsStatus.FAILED,
+            existingEntry.get().leaseExpiry,
+            existingEntry.get().numAttempts
+        );
+        Mockito.verify(testMembers.cmsClient, times(1)).updateDocumentsEntry(
+            expectedEntry, existingEntry.get()
+        );
+        Mockito.verify(testMembers.globalState, times(1)).updatePhase(
+            GlobalState.Phase.DOCUMENTS_FAILED
+        );
+    }
+}
diff --git a/RFS/src/test/java/com/rfs/worker/IndexStepTest.java b/RFS/src/test/java/com/rfs/worker/IndexStepTest.java
index ba201ff14..40f13cb05 100644
--- a/RFS/src/test/java/com/rfs/worker/IndexStepTest.java
+++ b/RFS/src/test/java/com/rfs/worker/IndexStepTest.java
@@ -219,9 +219,9 @@ protected long getNowMs() {
 
     static Stream provideAcquireLeaseArgs() {
         return Stream.of(
             // We were able to acquire the lease
-            Arguments.of(
+            Arguments.of(
                 Optional.of(new CmsEntry.Index(
-                    CmsEntry.IndexStatus.IN_PROGRESS,
+                    CmsEntry.IndexStatus.SETUP,
                     String.valueOf(Instant.now().plus(Duration.ofDays(1)).toEpochMilli()),
                     1
                 )),
@@ -238,7 +238,7 @@ static Stream provideAcquireLeaseArgs() {
     void AcquireLease_AsExpected(Optional updatedEntry, Class nextStepClass) {
         // Set up the test
         var existingEntry = Optional.of(new CmsEntry.Index(
-            CmsEntry.IndexStatus.IN_PROGRESS,
+            CmsEntry.IndexStatus.SETUP,
             CmsEntry.Index.getLeaseExpiry(0L, CmsEntry.Index.MAX_ATTEMPTS - 1),
             CmsEntry.Index.MAX_ATTEMPTS - 1
         ));
@@ -255,7 +255,7 @@ void AcquireLease_AsExpected(Optional updatedEntry, Class nex
 
         // Check the results
         var expectedEntry = new CmsEntry.Index(
-            CmsEntry.IndexStatus.IN_PROGRESS,
+            CmsEntry.IndexStatus.SETUP,
             CmsEntry.Index.getLeaseExpiry(TestAcquireLease.milliSinceEpoch, CmsEntry.Index.MAX_ATTEMPTS),
             CmsEntry.Index.MAX_ATTEMPTS
         );
@@ -267,17 +267,17 @@ void AcquireLease_AsExpected(Optional updatedEntry, Class nex
 
     static Stream provideSetupIndexWorkEntriesArgs() {
         return Stream.of(
-            // We were able to acquire the lease
+            // We were able to mark the setup as completed
             Arguments.of(
                 Optional.of(new CmsEntry.Index(
-                    CmsEntry.IndexStatus.COMPLETED,
+                    CmsEntry.IndexStatus.IN_PROGRESS,
                     String.valueOf(42),
                     1
                 )),
                 IndexStep.GetIndicesToMigrate.class
             ),
 
-            // We were unable to acquire the lease
+            // We were unable to mark the setup as completed
             Arguments.of(Optional.empty(), IndexStep.GetEntry.class)
         );
     }
@@ -287,7 +287,7 @@ static Stream provideSetupIndexWorkEntriesArgs() {
     void SetupIndexWorkEntries_AsExpected(Optional updatedEntry, Class nextStepClass) {
         // Set up the test
         var existingEntry = Optional.of(new CmsEntry.Index(
-            CmsEntry.IndexStatus.IN_PROGRESS,
+            CmsEntry.IndexStatus.SETUP,
             String.valueOf(42),
             1
         ));
@@ -315,7 +315,7 @@ void SetupIndexWorkEntries_AsExpected(Optional updatedEntry, Cla
         Mockito.when(testMembers.metadataFactory.fromRepo(testMembers.snapshotName, "index2")).thenReturn(indexMetadata2);
 
         CmsEntry.Index expectedEntry = new CmsEntry.Index(
-            CmsEntry.IndexStatus.COMPLETED,
+            CmsEntry.IndexStatus.IN_PROGRESS,
             existingEntry.get().leaseExpiry,
             existingEntry.get().numAttempts
         );
@@ -486,12 +486,29 @@ void RandomWait_AsExpected() {
 
     @Test
     void ExitPhaseSuccess_AsExpected() {
+        // Set up the test
+        var existingEntry = Optional.of(new CmsEntry.Index(
+            CmsEntry.IndexStatus.IN_PROGRESS,
+            String.valueOf(42),
+            1
+        ));
+        testMembers.cmsEntry = existingEntry;
+
         // Run the test
         IndexStep.ExitPhaseSuccess testStep = new IndexStep.ExitPhaseSuccess(testMembers);
         testStep.run();
         WorkerStep nextStep = testStep.nextStep();
 
         // Check the results
+        var expectedEntry = new CmsEntry.Index(
+            CmsEntry.IndexStatus.COMPLETED,
+            existingEntry.get().leaseExpiry,
+            existingEntry.get().numAttempts
+        );
+        Mockito.verify(testMembers.cmsClient, times(1)).updateIndexEntry(
+            expectedEntry, existingEntry.get()
+        );
+
         Mockito.verify(testMembers.globalState, times(1)).updatePhase(
             GlobalState.Phase.INDEX_COMPLETED
         );