[RFS] Added coordinated doc migration to RFS Worker #708

Merged 3 commits on Jun 10, 2024
RFS/docs/DESIGN.md (7 changes: 4 additions & 3 deletions)
@@ -260,7 +260,7 @@ ID: metadata_status
FIELDS:
* status (string): IN_PROGRESS, COMPLETED, FAILED
* leaseExpiry (timestamp): When the current work lease expires
- * num_attempts (integer): Times the task has been attempted
+ * numAttempts (integer): Times the task has been attempted

INDEX MIGRATION STATUS RECORD
ID: index_status
@@ -287,9 +287,10 @@ FIELDS:
SHARD WORK ENTRY RECORD
ID: <name of the index to be migrated>_<shard number>
FIELDS:
- * index (string): The index name
- * shard (integer): The shard number
+ * indexName (string): The index name
+ * shardId (integer): The shard number
* status (string): NOT_STARTED, COMPLETED, FAILED
* leaseExpiry (timestamp): When the current work lease expires
* numAttempts (integer): Times the task has been attempted
```

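To make the new schema concrete, here is a hedged sketch of what one shard work entry could look like as a CMS document. The example index name, the zero-valued lease, and the Jackson-based construction are illustrative assumptions, not the actual RFS serialization code.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class ShardWorkEntrySketch {
    public static void main(String[] args) {
        // Record ID would be "<index name>_<shard number>", e.g. "logs_2024_01_0" (assumed example index).
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode entry = mapper.createObjectNode();
        entry.put("indexName", "logs_2024_01"); // the index name
        entry.put("shardId", 0);                // the shard number
        entry.put("status", "NOT_STARTED");     // NOT_STARTED, COMPLETED, or FAILED
        entry.put("leaseExpiry", 0L);           // timestamp; 0 stands in for "no lease yet"
        entry.put("numAttempts", 0);            // times the task has been attempted
        System.out.println(entry.toPrettyString());
    }
}
```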
RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java (11 changes: 6 additions & 5 deletions)
@@ -189,9 +189,9 @@ public static void main(String[] args) {
// Get the file mapping for the shard
ShardMetadata.Data shardMetadata;
if (sourceVersion == ClusterVersion.ES_6_8) {
- shardMetadata = new ShardMetadataFactory_ES_6_8().fromRepo(repo, repoDataProvider, snapshotName, indexMetadata.getName(), shardId);
+ shardMetadata = new ShardMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId);
} else {
- shardMetadata = new ShardMetadataFactory_ES_7_10().fromRepo(repo, repoDataProvider, snapshotName, indexMetadata.getName(), shardId);
+ shardMetadata = new ShardMetadataFactory_ES_7_10(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId);
}
System.out.println("Shard Metadata: " + shardMetadata.toString());
}
@@ -207,9 +207,9 @@ public static void main(String[] args) {
for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) {
ShardMetadata.Data shardMetadata;
if (sourceVersion == ClusterVersion.ES_6_8) {
- shardMetadata = new ShardMetadataFactory_ES_6_8().fromRepo(repo, repoDataProvider, snapshotName, indexMetadata.getName(), shardId);
+ shardMetadata = new ShardMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId);
} else {
- shardMetadata = new ShardMetadataFactory_ES_7_10().fromRepo(repo, repoDataProvider, snapshotName, indexMetadata.getName(), shardId);
+ shardMetadata = new ShardMetadataFactory_ES_7_10(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId);
}

// Unpack the shard
@@ -219,7 +219,8 @@ public static void main(String[] args) {
} else {
bufferSize = ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES;
}
- SnapshotShardUnpacker.unpack(repo, shardMetadata, Paths.get(luceneBasePathString), bufferSize);
+ SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repo, Paths.get(luceneBasePathString), bufferSize);
+ unpacker.unpack(shardMetadata);

// Now, read the documents back out
System.out.println("--- Reading docs in the shard ---");
RFS/src/main/java/com/rfs/ReindexFromSnapshot.java (30 changes: 17 additions & 13 deletions)
@@ -323,6 +323,14 @@ public static void main(String[] args) throws InterruptedException {
logger.info("==================================================================");
logger.info("Unpacking blob files to disk...");

+ int bufferSize;
+ if (sourceVersion == ClusterVersion.ES_6_8) {
+ bufferSize = ElasticsearchConstants_ES_6_8.BUFFER_SIZE_IN_BYTES;
+ } else {
+ bufferSize = ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES;
+ }
+ SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repo, luceneDirPath, bufferSize);

for (IndexMetadata.Data indexMetadata : indexMetadatas) {
logger.info("Processing index: " + indexMetadata.getName());
for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) {
@@ -331,20 +339,13 @@ public static void main(String[] args) throws InterruptedException {
// Get the shard metadata
ShardMetadata.Data shardMetadata;
if (sourceVersion == ClusterVersion.ES_6_8) {
- shardMetadata = new ShardMetadataFactory_ES_6_8().fromRepo(repo, repoDataProvider, snapshotName, indexMetadata.getName(), shardId);
+ shardMetadata = new ShardMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId);
} else {
- shardMetadata = new ShardMetadataFactory_ES_7_10().fromRepo(repo, repoDataProvider, snapshotName, indexMetadata.getName(), shardId);
+ shardMetadata = new ShardMetadataFactory_ES_7_10(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId);
}

// Unpack the shard
- int bufferSize;
- if (sourceVersion == ClusterVersion.ES_6_8) {
- bufferSize = ElasticsearchConstants_ES_6_8.BUFFER_SIZE_IN_BYTES;
- } else {
- bufferSize = ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES;
- }

- SnapshotShardUnpacker.unpack(repo, shardMetadata, luceneDirPath, bufferSize);
+ unpacker.unpack(shardMetadata);
}
}

@@ -356,15 +357,18 @@ public static void main(String[] args) throws InterruptedException {
logger.info("==================================================================");
logger.info("Reindexing the documents...");

+ LuceneDocumentsReader reader = new LuceneDocumentsReader(luceneDirPath);
+ DocumentReindexer reindexer = new DocumentReindexer(targetClient);

for (IndexMetadata.Data indexMetadata : indexMetadatas) {
for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) {
logger.info("=== Index Id: " + indexMetadata.getName() + ", Shard ID: " + shardId + " ===");

- Flux<Document> documents = new LuceneDocumentsReader().readDocuments(luceneDirPath, indexMetadata.getName(), shardId);
+ Flux<Document> documents = reader.readDocuments(indexMetadata.getName(), shardId);
String targetIndex = indexMetadata.getName() + indexSuffix;

final int finalShardId = shardId; // Define in local context for the lambda
- DocumentReindexer.reindex(targetIndex, documents, targetClient)
+ reindexer.reindex(targetIndex, documents)
.doOnError(error -> logger.error("Error during reindexing: " + error))
.doOnSuccess(done -> logger.info("Reindexing completed for index " + targetIndex + ", shard " + finalShardId))
// Wait for the shard reindexing to complete before proceeding; fine in this demo script, but
@@ -373,7 +377,7 @@ public static void main(String[] args) throws InterruptedException {
}
}
logger.info("Refreshing target cluster to reflect newly added documents");
- DocumentReindexer.refreshAllDocuments(targetConnection);
+ reindexer.refreshAllDocuments(targetConnection);
logger.info("Refresh complete");
}

RFS/src/main/java/com/rfs/RunRfsWorker.java (38 changes: 28 additions & 10 deletions)
@@ -21,23 +21,30 @@
import com.rfs.cms.OpenSearchCmsClient;
import com.rfs.common.ClusterVersion;
import com.rfs.common.ConnectionDetails;
+ import com.rfs.common.DocumentReindexer;
import com.rfs.common.GlobalMetadata;
import com.rfs.common.IndexMetadata;
import com.rfs.common.Logging;
+ import com.rfs.common.LuceneDocumentsReader;
import com.rfs.common.OpenSearchClient;
import com.rfs.common.S3Uri;
+ import com.rfs.common.ShardMetadata;
import com.rfs.common.S3Repo;
import com.rfs.common.SnapshotCreator;
import com.rfs.common.SourceRepo;
import com.rfs.common.S3SnapshotCreator;
import com.rfs.common.SnapshotRepo;
+ import com.rfs.common.SnapshotShardUnpacker;
import com.rfs.transformers.TransformFunctions;
import com.rfs.transformers.Transformer;
+ import com.rfs.version_es_7_10.ElasticsearchConstants_ES_7_10;
import com.rfs.version_es_7_10.GlobalMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10;
+ import com.rfs.version_es_7_10.ShardMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10;
import com.rfs.version_os_2_11.GlobalMetadataCreator_OS_2_11;
import com.rfs.version_os_2_11.IndexCreator_OS_2_11;
+ import com.rfs.worker.DocumentsRunner;
import com.rfs.worker.GlobalState;
import com.rfs.worker.IndexRunner;
import com.rfs.worker.MetadataRunner;
@@ -61,6 +68,9 @@ public static class Args {
@Parameter(names = {"--s3-region"}, description = "The AWS Region the S3 bucket is in, like: us-east-2", required = true)
public String s3Region;

@Parameter(names = {"--lucene-dir"}, description = "The absolute path to the directory where we'll put the Lucene docs", required = true)
public String luceneDirPath;

@Parameter(names = {"--source-host"}, description = "The source host and port (e.g. http://localhost:9200)", required = true)
public String sourceHost;

@@ -79,17 +89,17 @@ public static class Args {
@Parameter(names = {"--target-password"}, description = "Optional. The target password; if not provided, will assume no auth on target", required = false)
public String targetPass = null;

@Parameter(names = {"--index-whitelist"}, description = ("Optional. List of index names to migrate"
@Parameter(names = {"--index-allowlist"}, description = ("Optional. List of index names to migrate"
+ " (e.g. 'logs_2024_01, logs_2024_02'). Default: all indices"), required = false)
- public List<String> indexWhitelist = List.of();
+ public List<String> indexAllowlist = List.of();

@Parameter(names = {"--index-template-whitelist"}, description = ("Optional. List of index template names to migrate"
@Parameter(names = {"--index-template-allowlist"}, description = ("Optional. List of index template names to migrate"
+ " (e.g. 'posts_index_template1, posts_index_template2'). Default: empty list"), required = false)
- public List<String> indexTemplateWhitelist = List.of();
+ public List<String> indexTemplateAllowlist = List.of();

@Parameter(names = {"--component-template-whitelist"}, description = ("Optional. List of component template names to migrate"
@Parameter(names = {"--component-template-allowlist"}, description = ("Optional. List of component template names to migrate"
+ " (e.g. 'posts_template1, posts_template2'). Default: empty list"), required = false)
- public List<String> componentTemplateWhitelist = List.of();
+ public List<String> componentTemplateAllowlist = List.of();

//https://opensearch.org/docs/2.11/api-reference/cluster-api/cluster-awareness/
@Parameter(names = {"--min-replicas"}, description = ("Optional. The minimum number of replicas configured for migrated indices on the target."
@@ -113,14 +123,15 @@ public static void main(String[] args) throws Exception {
Path s3LocalDirPath = Paths.get(arguments.s3LocalDirPath);
String s3RepoUri = arguments.s3RepoUri;
String s3Region = arguments.s3Region;
+ Path luceneDirPath = Paths.get(arguments.luceneDirPath);
String sourceHost = arguments.sourceHost;
String sourceUser = arguments.sourceUser;
String sourcePass = arguments.sourcePass;
String targetHost = arguments.targetHost;
String targetUser = arguments.targetUser;
String targetPass = arguments.targetPass;
- List<String> indexTemplateWhitelist = arguments.indexTemplateWhitelist;
- List<String> componentTemplateWhitelist = arguments.componentTemplateWhitelist;
+ List<String> indexTemplateAllowlist = arguments.indexTemplateAllowlist;
+ List<String> componentTemplateAllowlist = arguments.componentTemplateAllowlist;
int awarenessDimensionality = arguments.minNumberOfReplicas + 1;
Level logLevel = arguments.logLevel;

@@ -143,7 +154,7 @@ public static void main(String[] args) throws Exception {
SourceRepo sourceRepo = S3Repo.create(s3LocalDirPath, new S3Uri(s3RepoUri), s3Region);
SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);
GlobalMetadata.Factory metadataFactory = new GlobalMetadataFactory_ES_7_10(repoDataProvider);
- GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11(targetClient, List.of(), componentTemplateWhitelist, indexTemplateWhitelist);
+ GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11(targetClient, List.of(), componentTemplateAllowlist, indexTemplateAllowlist);
Transformer transformer = TransformFunctions.getTransformer(ClusterVersion.ES_7_10, ClusterVersion.OS_2_11, awarenessDimensionality);
MetadataRunner metadataWorker = new MetadataRunner(globalState, cmsClient, snapshotName, metadataFactory, metadataCreator, transformer);
metadataWorker.run();
@@ -152,6 +163,13 @@ public static void main(String[] args) throws Exception {
IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient);
IndexRunner indexWorker = new IndexRunner(globalState, cmsClient, snapshotName, indexMetadataFactory, indexCreator, transformer);
indexWorker.run();

+ ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
Review thread on the line above:

Member: Can you look into how SimpleRestoreFromSnapshot_ES_7_10 can be updated to use this code instead of the ad-hoc version? Seems like a great opportunity to reuse this system

Member Author: 100%, agreed. Will try to work into one of the small cleanup tasks I have next.

Member: Tried to get this started, but seems like a refactor might be better to get this working: chelma/opensearch-migrations@main...peternied:documents-runner-test
+ SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(sourceRepo, luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);
+ LuceneDocumentsReader reader = new LuceneDocumentsReader(luceneDirPath);
+ DocumentReindexer reindexer = new DocumentReindexer(targetClient);
+ DocumentsRunner documentsWorker = new DocumentsRunner(globalState, cmsClient, snapshotName, indexMetadataFactory, shardMetadataFactory, unpacker, reader, reindexer);
+ documentsWorker.run();

} catch (Runner.PhaseFailed e) {
logPhaseFailureRecord(e.phase, e.nextStep, e.cmsEntry, e.e);
@@ -173,7 +191,7 @@ public static void logPhaseFailureRecord(GlobalState.Phase phase, WorkerStep nex
String currentStep = (nextStep != null) ? nextStep.getClass().getSimpleName() : "null";
errorBlob.put("currentStep", currentStep);

- String currentEntry = (cmsEntry.isPresent()) ? cmsEntry.toString() : "null";
+ String currentEntry = (cmsEntry.isPresent()) ? cmsEntry.get().toRepresentationString() : "null";
errorBlob.put("cmsEntry", currentEntry);


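Putting the new pieces together, the per-shard document migration that DocumentsRunner coordinates composes the calls already visible in the diffs above: resolve the shard metadata, unpack the shard's blob files to the Lucene directory, read the documents back out, and reindex them into the target cluster. The sketch below is illustrative only; the helper method, the blocking wait, and the Lucene `Document` import are assumptions, and the real DocumentsRunner additionally manages CMS work items, leases, and retries.

```java
import org.apache.lucene.document.Document;
import reactor.core.publisher.Flux;
import com.rfs.common.DocumentReindexer;
import com.rfs.common.LuceneDocumentsReader;
import com.rfs.common.ShardMetadata;
import com.rfs.common.SnapshotShardUnpacker;

public class PerShardFlowSketch {
    // Hypothetical helper composing the calls shown in this PR; not the actual worker code.
    static void migrateOneShard(ShardMetadata.Factory shardMetadataFactory,
                                SnapshotShardUnpacker unpacker,
                                LuceneDocumentsReader reader,
                                DocumentReindexer reindexer,
                                String snapshotName, String indexName, int shardId,
                                String targetIndex) {
        ShardMetadata.Data shardMetadata = shardMetadataFactory.fromRepo(snapshotName, indexName, shardId);
        unpacker.unpack(shardMetadata);                                       // blob files -> Lucene files on disk
        Flux<Document> documents = reader.readDocuments(indexName, shardId);  // read docs back out of Lucene
        reindexer.reindex(targetIndex, documents).block();                    // assumed blocking wait, as in the demo script
    }
}
```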
RFS/src/main/java/com/rfs/cms/CmsClient.java (41 changes: 41 additions & 0 deletions)
@@ -84,4 +84,45 @@
* Retrieves a set of Index Work Items from the CMS that appear ready to be worked on, up to the specified limit.
*/
public List<CmsEntry.IndexWorkItem> getAvailableIndexWorkItems(int maxItems);

/*
* Creates a new entry in the CMS for the Documents Migration's progress. Returns an Optional; if the document was
* created, it will be the created entry and empty otherwise.
*/
public Optional<CmsEntry.Documents> createDocumentsEntry();

/*
* Attempt to retrieve the Documents Migration entry from the CMS, if it exists. Returns an Optional; if the document
* exists, it will be the retrieved entry and empty otherwise.
*/
public Optional<CmsEntry.Documents> getDocumentsEntry();

/*
* Updates the Documents Migration entry in the CMS. Returns an Optional; if the document was updated,
* it will be the updated entry and empty otherwise.
*/
public Optional<CmsEntry.Documents> updateDocumentsEntry(CmsEntry.Documents newEntry, CmsEntry.Documents lastEntry);

/*
* Creates a new entry in the CMS for a Documents Work Item. Returns an Optional; if the document was
* created, it will be the created entry and empty otherwise.
*/
public Optional<CmsEntry.DocumentsWorkItem> createDocumentsWorkItem(String indexName, int shardId);

/*
* Updates the Documents Work Item in the CMS. Returns an Optional; if the document was updated,
* it will be the updated entry and empty otherwise.
*/
public Optional<CmsEntry.DocumentsWorkItem> updateDocumentsWorkItem(CmsEntry.DocumentsWorkItem newEntry, CmsEntry.DocumentsWorkItem lastEntry);

/*
* Forcefully updates the Documents Work Item in the CMS. This method should be used when you don't care about collisions
* and just want to overwrite the existing entry no matter what. Returns the updated entry.
*/
public CmsEntry.DocumentsWorkItem updateDocumentsWorkItemForceful(CmsEntry.DocumentsWorkItem newEntry);

/*
* Retrieves a Documents Work Item from the CMS that appears ready to be worked on.
*/
public Optional<CmsEntry.DocumentsWorkItem> getAvailableDocumentsWorkItem();
}
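A rough usage sketch of the new interface methods follows, assuming a CmsClient instance named cmsClient and a hypothetical index name; the actual acquisition, lease, and retry logic lives in the worker classes (e.g. DocumentsRunner), so this only shows the shape of the calls:

```java
import java.util.Optional;
import com.rfs.cms.CmsClient;
import com.rfs.cms.CmsEntry;

public class CmsDocumentsSketch {
    // Illustrative only: exercises the new CmsClient methods added in this PR.
    static void sketch(CmsClient cmsClient) {
        // Fetch the overall Documents Migration entry, creating it if it does not exist yet;
        // both calls return an empty Optional on a lost race, per the contracts above.
        Optional<CmsEntry.Documents> docsEntry = cmsClient.getDocumentsEntry()
                .or(cmsClient::createDocumentsEntry);

        // Register a work item for one shard of a hypothetical index.
        Optional<CmsEntry.DocumentsWorkItem> created = cmsClient.createDocumentsWorkItem("logs_2024_01", 0);

        // Ask the CMS for a work item that appears ready to be worked on, if any.
        Optional<CmsEntry.DocumentsWorkItem> available = cmsClient.getAvailableDocumentsWorkItem();
    }
}
```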