[RFS] Added coordinated doc migration to RFS Worker (#708)
* RFS coordinated doc migration running; need tests and cleanup

Signed-off-by: Chris Helma <[email protected]>

* Added RFS doc migration unit tests + a couple bug fixes in RFS

Signed-off-by: Chris Helma <[email protected]>

* Updates per PR comments

Signed-off-by: Chris Helma <[email protected]>

---------

Signed-off-by: Chris Helma <[email protected]>
chelma authored Jun 10, 2024
1 parent 2c60787 commit c1c4383
Showing 26 changed files with 1,966 additions and 216 deletions.
7 changes: 4 additions & 3 deletions RFS/docs/DESIGN.md
@@ -260,7 +260,7 @@ ID: metadata_status
FIELDS:
* status (string): IN_PROGRESS, COMPLETED, FAILED
* leaseExpiry (timestamp): When the current work lease expires
* num_attempts (integer): Times the task has been attempted
* numAttempts (integer): Times the task has been attempted
INDEX MIGRATION STATUS RECORD
ID: index_status
@@ -287,9 +287,10 @@ FIELDS:
SHARD WORK ENTRY RECORD
ID: <name of the index to be migrated>_<shard number>
FIELDS:
* index (string): The index name
* shard (integer): The shard number
* indexName (string): The index name
* shardId (integer): The shard number
* status (string): NOT_STARTED, COMPLETED, FAILED
* leaseExpiry (timestamp): When the current work lease expires
* numAttempts (integer): Times the task has been attempted
```

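As a rough illustration of the shard work entry schema above, here is a minimal sketch of one such record built as a flat JSON document. The field names come from the schema; the concrete values, the epoch-millisecond lease encoding, and the exact document layout RFS uses are assumptions for the example.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class ShardWorkEntrySketch {
    public static void main(String[] args) {
        // Field names per the SHARD WORK ENTRY RECORD schema above; values are made up.
        ObjectNode entry = new ObjectMapper().createObjectNode();
        entry.put("indexName", "logs_2024_01");    // the index name
        entry.put("shardId", 0);                   // the shard number
        entry.put("status", "NOT_STARTED");        // NOT_STARTED, COMPLETED, FAILED
        entry.put("leaseExpiry", 1718064000000L);  // lease expiry (epoch millis assumed)
        entry.put("numAttempts", 1);               // times the task has been attempted
        // Per the schema, the record ID would be "<index>_<shard>", e.g. "logs_2024_01_0".
        System.out.println(entry.toPrettyString());
    }
}
```
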
11 changes: 6 additions & 5 deletions RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java
@@ -189,9 +189,9 @@ public static void main(String[] args) {
// Get the file mapping for the shard
ShardMetadata.Data shardMetadata;
if (sourceVersion == ClusterVersion.ES_6_8) {
shardMetadata = new ShardMetadataFactory_ES_6_8().fromRepo(repo, repoDataProvider, snapshotName, indexMetadata.getName(), shardId);
shardMetadata = new ShardMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId);
} else {
shardMetadata = new ShardMetadataFactory_ES_7_10().fromRepo(repo, repoDataProvider, snapshotName, indexMetadata.getName(), shardId);
shardMetadata = new ShardMetadataFactory_ES_7_10(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId);
}
System.out.println("Shard Metadata: " + shardMetadata.toString());
}
@@ -207,9 +207,9 @@ public static void main(String[] args) {
for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) {
ShardMetadata.Data shardMetadata;
if (sourceVersion == ClusterVersion.ES_6_8) {
shardMetadata = new ShardMetadataFactory_ES_6_8().fromRepo(repo, repoDataProvider, snapshotName, indexMetadata.getName(), shardId);
shardMetadata = new ShardMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId);
} else {
shardMetadata = new ShardMetadataFactory_ES_7_10().fromRepo(repo, repoDataProvider, snapshotName, indexMetadata.getName(), shardId);
shardMetadata = new ShardMetadataFactory_ES_7_10(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId);
}

// Unpack the shard
@@ -219,7 +219,8 @@ public static void main(String[] args) {
} else {
bufferSize = ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES;
}
SnapshotShardUnpacker.unpack(repo, shardMetadata, Paths.get(luceneBasePathString), bufferSize);
SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repo, Paths.get(luceneBasePathString), bufferSize);
unpacker.unpack(shardMetadata);

// Now, read the documents back out
System.out.println("--- Reading docs in the shard ---");
30 changes: 17 additions & 13 deletions RFS/src/main/java/com/rfs/ReindexFromSnapshot.java
@@ -323,6 +323,14 @@ public static void main(String[] args) throws InterruptedException {
logger.info("==================================================================");
logger.info("Unpacking blob files to disk...");

int bufferSize;
if (sourceVersion == ClusterVersion.ES_6_8) {
bufferSize = ElasticsearchConstants_ES_6_8.BUFFER_SIZE_IN_BYTES;
} else {
bufferSize = ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES;
}
SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repo, luceneDirPath, bufferSize);

for (IndexMetadata.Data indexMetadata : indexMetadatas) {
logger.info("Processing index: " + indexMetadata.getName());
for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) {
@@ -331,20 +339,13 @@ public static void main(String[] args) throws InterruptedException {
// Get the shard metadata
ShardMetadata.Data shardMetadata;
if (sourceVersion == ClusterVersion.ES_6_8) {
shardMetadata = new ShardMetadataFactory_ES_6_8().fromRepo(repo, repoDataProvider, snapshotName, indexMetadata.getName(), shardId);
shardMetadata = new ShardMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId);
} else {
shardMetadata = new ShardMetadataFactory_ES_7_10().fromRepo(repo, repoDataProvider, snapshotName, indexMetadata.getName(), shardId);
shardMetadata = new ShardMetadataFactory_ES_7_10(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId);
}

// Unpack the shard
int bufferSize;
if (sourceVersion == ClusterVersion.ES_6_8) {
bufferSize = ElasticsearchConstants_ES_6_8.BUFFER_SIZE_IN_BYTES;
} else {
bufferSize = ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES;
}

SnapshotShardUnpacker.unpack(repo, shardMetadata, luceneDirPath, bufferSize);
unpacker.unpack(shardMetadata);
}
}

@@ -356,15 +357,18 @@ public static void main(String[] args) throws InterruptedException {
logger.info("==================================================================");
logger.info("Reindexing the documents...");

LuceneDocumentsReader reader = new LuceneDocumentsReader(luceneDirPath);
DocumentReindexer reindexer = new DocumentReindexer(targetClient);

for (IndexMetadata.Data indexMetadata : indexMetadatas) {
for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) {
logger.info("=== Index Id: " + indexMetadata.getName() + ", Shard ID: " + shardId + " ===");

Flux<Document> documents = new LuceneDocumentsReader().readDocuments(luceneDirPath, indexMetadata.getName(), shardId);
Flux<Document> documents = reader.readDocuments(indexMetadata.getName(), shardId);
String targetIndex = indexMetadata.getName() + indexSuffix;

final int finalShardId = shardId; // Define in local context for the lambda
DocumentReindexer.reindex(targetIndex, documents, targetClient)
reindexer.reindex(targetIndex, documents)
.doOnError(error -> logger.error("Error during reindexing: " + error))
.doOnSuccess(done -> logger.info("Reindexing completed for index " + targetIndex + ", shard " + finalShardId))
// Wait for the shard reindexing to complete before proceeding; fine in this demo script, but
Expand All @@ -373,7 +377,7 @@ public static void main(String[] args) throws InterruptedException {
}
}
logger.info("Refreshing target cluster to reflect newly added documents");
DocumentReindexer.refreshAllDocuments(targetConnection);
reindexer.refreshAllDocuments(targetConnection);
logger.info("Refresh complete");
}

38 changes: 28 additions & 10 deletions RFS/src/main/java/com/rfs/RunRfsWorker.java
@@ -21,23 +21,30 @@
import com.rfs.cms.OpenSearchCmsClient;
import com.rfs.common.ClusterVersion;
import com.rfs.common.ConnectionDetails;
import com.rfs.common.DocumentReindexer;
import com.rfs.common.GlobalMetadata;
import com.rfs.common.IndexMetadata;
import com.rfs.common.Logging;
import com.rfs.common.LuceneDocumentsReader;
import com.rfs.common.OpenSearchClient;
import com.rfs.common.S3Uri;
import com.rfs.common.ShardMetadata;
import com.rfs.common.S3Repo;
import com.rfs.common.SnapshotCreator;
import com.rfs.common.SourceRepo;
import com.rfs.common.S3SnapshotCreator;
import com.rfs.common.SnapshotRepo;
import com.rfs.common.SnapshotShardUnpacker;
import com.rfs.transformers.TransformFunctions;
import com.rfs.transformers.Transformer;
import com.rfs.version_es_7_10.ElasticsearchConstants_ES_7_10;
import com.rfs.version_es_7_10.GlobalMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.ShardMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10;
import com.rfs.version_os_2_11.GlobalMetadataCreator_OS_2_11;
import com.rfs.version_os_2_11.IndexCreator_OS_2_11;
import com.rfs.worker.DocumentsRunner;
import com.rfs.worker.GlobalState;
import com.rfs.worker.IndexRunner;
import com.rfs.worker.MetadataRunner;
@@ -61,6 +68,9 @@ public static class Args {
@Parameter(names = {"--s3-region"}, description = "The AWS Region the S3 bucket is in, like: us-east-2", required = true)
public String s3Region;

@Parameter(names = {"--lucene-dir"}, description = "The absolute path to the directory where we'll put the Lucene docs", required = true)
public String luceneDirPath;

@Parameter(names = {"--source-host"}, description = "The source host and port (e.g. http://localhost:9200)", required = true)
public String sourceHost;

@@ -79,17 +89,17 @@ public static class Args {
@Parameter(names = {"--target-password"}, description = "Optional. The target password; if not provided, will assume no auth on target", required = false)
public String targetPass = null;

@Parameter(names = {"--index-whitelist"}, description = ("Optional. List of index names to migrate"
@Parameter(names = {"--index-allowlist"}, description = ("Optional. List of index names to migrate"
+ " (e.g. 'logs_2024_01, logs_2024_02'). Default: all indices"), required = false)
public List<String> indexWhitelist = List.of();
public List<String> indexAllowlist = List.of();

@Parameter(names = {"--index-template-whitelist"}, description = ("Optional. List of index template names to migrate"
@Parameter(names = {"--index-template-allowlist"}, description = ("Optional. List of index template names to migrate"
+ " (e.g. 'posts_index_template1, posts_index_template2'). Default: empty list"), required = false)
public List<String> indexTemplateWhitelist = List.of();
public List<String> indexTemplateAllowlist = List.of();

@Parameter(names = {"--component-template-whitelist"}, description = ("Optional. List of component template names to migrate"
@Parameter(names = {"--component-template-allowlist"}, description = ("Optional. List of component template names to migrate"
+ " (e.g. 'posts_template1, posts_template2'). Default: empty list"), required = false)
public List<String> componentTemplateWhitelist = List.of();
public List<String> componentTemplateAllowlist = List.of();

//https://opensearch.org/docs/2.11/api-reference/cluster-api/cluster-awareness/
@Parameter(names = {"--min-replicas"}, description = ("Optional. The minimum number of replicas configured for migrated indices on the target."
@@ -113,14 +123,15 @@ public static void main(String[] args) throws Exception {
Path s3LocalDirPath = Paths.get(arguments.s3LocalDirPath);
String s3RepoUri = arguments.s3RepoUri;
String s3Region = arguments.s3Region;
Path luceneDirPath = Paths.get(arguments.luceneDirPath);
String sourceHost = arguments.sourceHost;
String sourceUser = arguments.sourceUser;
String sourcePass = arguments.sourcePass;
String targetHost = arguments.targetHost;
String targetUser = arguments.targetUser;
String targetPass = arguments.targetPass;
List<String> indexTemplateWhitelist = arguments.indexTemplateWhitelist;
List<String> componentTemplateWhitelist = arguments.componentTemplateWhitelist;
List<String> indexTemplateAllowlist = arguments.indexTemplateAllowlist;
List<String> componentTemplateAllowlist = arguments.componentTemplateAllowlist;
int awarenessDimensionality = arguments.minNumberOfReplicas + 1;
Level logLevel = arguments.logLevel;

@@ -143,7 +154,7 @@ public static void main(String[] args) throws Exception {
SourceRepo sourceRepo = S3Repo.create(s3LocalDirPath, new S3Uri(s3RepoUri), s3Region);
SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);
GlobalMetadata.Factory metadataFactory = new GlobalMetadataFactory_ES_7_10(repoDataProvider);
GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11(targetClient, List.of(), componentTemplateWhitelist, indexTemplateWhitelist);
GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11(targetClient, List.of(), componentTemplateAllowlist, indexTemplateAllowlist);
Transformer transformer = TransformFunctions.getTransformer(ClusterVersion.ES_7_10, ClusterVersion.OS_2_11, awarenessDimensionality);
MetadataRunner metadataWorker = new MetadataRunner(globalState, cmsClient, snapshotName, metadataFactory, metadataCreator, transformer);
metadataWorker.run();
@@ -152,6 +163,13 @@ public static void main(String[] args) throws Exception {
IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient);
IndexRunner indexWorker = new IndexRunner(globalState, cmsClient, snapshotName, indexMetadataFactory, indexCreator, transformer);
indexWorker.run();

ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(sourceRepo, luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);
LuceneDocumentsReader reader = new LuceneDocumentsReader(luceneDirPath);
DocumentReindexer reindexer = new DocumentReindexer(targetClient);
DocumentsRunner documentsWorker = new DocumentsRunner(globalState, cmsClient, snapshotName, indexMetadataFactory, shardMetadataFactory, unpacker, reader, reindexer);
documentsWorker.run();

} catch (Runner.PhaseFailed e) {
logPhaseFailureRecord(e.phase, e.nextStep, e.cmsEntry, e.e);
@@ -173,7 +191,7 @@ public static void logPhaseFailureRecord(GlobalState.Phase phase, WorkerStep nex
String currentStep = (nextStep != null) ? nextStep.getClass().getSimpleName() : "null";
errorBlob.put("currentStep", currentStep);

String currentEntry = (cmsEntry.isPresent()) ? cmsEntry.toString() : "null";
String currentEntry = (cmsEntry.isPresent()) ? cmsEntry.get().toRepresentationString() : "null";
errorBlob.put("cmsEntry", currentEntry);


41 changes: 41 additions & 0 deletions RFS/src/main/java/com/rfs/cms/CmsClient.java
@@ -84,4 +84,45 @@ public interface CmsClient {
* Retrieves a set of Index Work Items from the CMS that appear ready to be worked on, up to the specified limit.
*/
public List<CmsEntry.IndexWorkItem> getAvailableIndexWorkItems(int maxItems);

/*
* Creates a new entry in the CMS for the Documents Migration's progress. Returns an Optional; if the document was
* created, it will be the created entry and empty otherwise.
*/
public Optional<CmsEntry.Documents> createDocumentsEntry();

/*
* Attempt to retrieve the Documents Migration entry from the CMS, if it exists. Returns an Optional; if the document
* exists, it will be the retrieved entry and empty otherwise.
*/
public Optional<CmsEntry.Documents> getDocumentsEntry();

/*
* Updates the Documents Migration entry in the CMS. Returns an Optional; if the document was updated,
* it will be the updated entry and empty otherwise.
*/
public Optional<CmsEntry.Documents> updateDocumentsEntry(CmsEntry.Documents newEntry, CmsEntry.Documents lastEntry);

/*
* Creates a new entry in the CMS for a Documents Work Item. Returns an Optional; if the document was
* created, it will be the created entry and empty otherwise.
*/
public Optional<CmsEntry.DocumentsWorkItem> createDocumentsWorkItem(String indexName, int shardId);

/*
* Updates the Documents Work Item in the CMS. Returns an Optional; if the document was updated,
* it will be the updated entry and empty otherwise.
*/
public Optional<CmsEntry.DocumentsWorkItem> updateDocumentsWorkItem(CmsEntry.DocumentsWorkItem newEntry, CmsEntry.DocumentsWorkItem lastEntry);

/*
* Forcefully updates the Documents Work Item in the CMS. This method should be used when you don't care about collisions
* and just want to overwrite the existing entry no matter what. Returns the updated entry.
*/
public CmsEntry.DocumentsWorkItem updateDocumentsWorkItemForceful(CmsEntry.DocumentsWorkItem newEntry);

/*
* Retrieves a Documents Work Item from the CMS that appears ready to be worked on.
*/
public Optional<CmsEntry.DocumentsWorkItem> getAvailableDocumentsWorkItem();
}
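
For a sense of how the documents-migration methods added above might be exercised, here is a hypothetical usage sketch. Only the method names and signatures visible in the interface are taken from the repository; the index name, shard number, and wrapper class are illustrative, and in the actual worker the claim/update loop is driven by DocumentsRunner (wired up in RunRfsWorker above).

```java
import java.util.Optional;

// Assumes this sits alongside CmsClient and CmsEntry (e.g. in com.rfs.cms).
public class CmsClientDocumentsSketch {
    public static void registerAndClaim(CmsClient cmsClient) {
        // Register one shard-level work item; an empty Optional signals that another
        // worker created (or collided on) the entry first.
        Optional<CmsEntry.DocumentsWorkItem> created =
                cmsClient.createDocumentsWorkItem("logs_2024_01", 0); // illustrative index/shard

        // Ask the CMS for any work item that appears ready to be worked on.
        Optional<CmsEntry.DocumentsWorkItem> available = cmsClient.getAvailableDocumentsWorkItem();
        available.ifPresent(item -> System.out.println("Claimed documents work item: " + item));
    }
}
```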