Skip to content

Commit

Permalink
Rename com.rfs through the codebase to org.opensearch.migrations.bulk…
Browse files Browse the repository at this point in the history
…load.

CreateSnapshot & RfsMigrateDocuments both live under org.opensearch.migrations (not bulkload) to maintain parity with MetadataMigration, and because CreateSnapshot isn't really a bulkload concern.
I've also renamed the cms directory (also under ...bulkload) to workcoordination.
Some of the recent changes for SonarLint appeasement have been reverted as they created some compilation errors (visibility of classes across packages).  The old code was safer, if terser, anyway.
Further directory/package reorganizations would be beneficial, as would unifying structures with the replayer and capture proxy code, but those can come later.  For now, the main goal of this commit is to sanitize the package names into a namespace that the maintainers of this project control.

Signed-off-by: Greg Schohn <[email protected]>
  • Loading branch information
gregschohn committed Sep 22, 2024
1 parent d99beb6 commit 85a40b0
Show file tree
Hide file tree
Showing 192 changed files with 608 additions and 619 deletions.
2 changes: 1 addition & 1 deletion CreateSnapshot/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ dependencies {
}

application {
mainClassName = 'com.rfs.CreateSnapshot'
mainClassName = 'org.opensearch.migrations.CreateSnapshot'
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
package com.rfs;

package org.opensearch.migrations;

import org.opensearch.migrations.bulkload.common.FileSystemSnapshotCreator;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.S3SnapshotCreator;
import org.opensearch.migrations.bulkload.common.SnapshotCreator;
import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
import org.opensearch.migrations.bulkload.tracing.IRfsContexts.ICreateSnapshotContext;
import org.opensearch.migrations.bulkload.worker.SnapshotRunner;
import org.opensearch.migrations.snapshot.creation.tracing.RootSnapshotContext;
import org.opensearch.migrations.tracing.ActiveContextTracker;
import org.opensearch.migrations.tracing.ActiveContextTrackerByActivityType;
Expand All @@ -11,13 +18,6 @@
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.ParametersDelegate;
import com.rfs.common.FileSystemSnapshotCreator;
import com.rfs.common.OpenSearchClient;
import com.rfs.common.S3SnapshotCreator;
import com.rfs.common.SnapshotCreator;
import com.rfs.common.http.ConnectionContext;
import com.rfs.tracing.IRfsContexts.ICreateSnapshotContext;
import com.rfs.worker.SnapshotRunner;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand Down
2 changes: 1 addition & 1 deletion CreateSnapshot/src/test/resources/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ rootLogger.level = info
rootLogger.appenderRef.console.ref = Console

# Allow customization of owned package logs
logger.rfs.name = com.rfs
logger.rfs.name = org.opensearch.migrations.bulkload
logger.rfs.level = ${ownedPackagesLogLevel}
logger.migration.name = org.opensearch.migrations
logger.migration.level = ${ownedPackagesLogLevel}
2 changes: 1 addition & 1 deletion DocumentsFromSnapshotMigration/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ dependencies {
}

application {
mainClassName = 'com.rfs.RfsMigrateDocuments'
mainClassName = 'org.opensearch.migrations.RfsMigrateDocuments'
}

// Cleanup additional docker build directory
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,33 @@
package com.rfs;
package org.opensearch.migrations;

import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Clock;
import java.time.Duration;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

import org.opensearch.migrations.Version;
import org.opensearch.migrations.VersionConverter;
import org.opensearch.migrations.bulkload.common.DefaultSourceRepoAccessor;
import org.opensearch.migrations.bulkload.common.DocumentReindexer;
import org.opensearch.migrations.bulkload.common.FileSystemRepo;
import org.opensearch.migrations.bulkload.common.LuceneDocumentsReader;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.S3Repo;
import org.opensearch.migrations.bulkload.common.S3Uri;
import org.opensearch.migrations.bulkload.common.SnapshotShardUnpacker;
import org.opensearch.migrations.bulkload.common.SourceRepo;
import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
import org.opensearch.migrations.bulkload.models.IndexMetadata;
import org.opensearch.migrations.bulkload.models.ShardMetadata;
import org.opensearch.migrations.bulkload.workcoordination.CoordinateWorkHttpClient;
import org.opensearch.migrations.bulkload.workcoordination.IWorkCoordinator;
import org.opensearch.migrations.bulkload.workcoordination.LeaseExpireTrigger;
import org.opensearch.migrations.bulkload.workcoordination.OpenSearchWorkCoordinator;
import org.opensearch.migrations.bulkload.workcoordination.ScopedWorkCoordinator;
import org.opensearch.migrations.bulkload.worker.DocumentsRunner;
import org.opensearch.migrations.bulkload.worker.ShardWorkPreparer;
import org.opensearch.migrations.cluster.ClusterProviderRegistry;
import org.opensearch.migrations.cluster.ClusterSnapshotReader;
import org.opensearch.migrations.reindexer.tracing.RootDocumentMigrationContext;
import org.opensearch.migrations.tracing.ActiveContextTracker;
import org.opensearch.migrations.tracing.ActiveContextTrackerByActivityType;
Expand All @@ -25,28 +40,6 @@
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.ParametersDelegate;
import com.rfs.RfsMigrateDocuments.RunParameters;
import com.rfs.cms.CoordinateWorkHttpClient;
import com.rfs.cms.IWorkCoordinator;
import com.rfs.cms.LeaseExpireTrigger;
import com.rfs.cms.OpenSearchWorkCoordinator;
import com.rfs.cms.ScopedWorkCoordinator;
import com.rfs.common.DefaultSourceRepoAccessor;
import com.rfs.common.DocumentReindexer;
import com.rfs.common.FileSystemRepo;
import com.rfs.common.LuceneDocumentsReader;
import com.rfs.common.OpenSearchClient;
import com.rfs.common.S3Repo;
import com.rfs.common.S3Uri;
import com.rfs.common.SnapshotShardUnpacker;
import com.rfs.common.SourceRepo;
import com.rfs.common.http.ConnectionContext;
import com.rfs.models.IndexMetadata;
import com.rfs.models.ShardMetadata;
import com.rfs.worker.DocumentsRunner;
import com.rfs.worker.ShardWorkPreparer;
import lombok.Builder;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;

Expand Down Expand Up @@ -227,19 +220,19 @@ public static void main(String[] args) throws Exception {
sourceResourceProvider.getBufferSizeInBytes()
);

run(RunParameters.builder()
.leaseExpireTrigger(processManager)
.workCoordinator(workCoordinator)
.reindexer(reindexer)
.snapshotName(arguments.snapshotName)
.snapshotReader(sourceResourceProvider)
.snapshotUnpacker(unpackerFactory)
.documentReader(LuceneDocumentsReader.getFactory(sourceResourceProvider))
.indexAllowlist(arguments.indexAllowlist)
.maxInitialLeaseDuration(arguments.initialLeaseDuration)
.maxShardSizeBytes(arguments.maxShardSizeBytes)
.tracingContext(context)
.build());
run(
LuceneDocumentsReader.getFactory(sourceResourceProvider),
reindexer,
workCoordinator,
arguments.initialLeaseDuration,
processManager,
sourceResourceProvider.getIndexMetadata(),
arguments.snapshotName,
arguments.indexAllowlist,
sourceResourceProvider.getShardMetadata(),
unpackerFactory,
arguments.maxShardSizeBytes,
context);
} catch (Exception e) {
log.atError().setMessage("Unexpected error running RfsWorker").setCause(e).log();
throw e;
Expand All @@ -259,38 +252,40 @@ private static RootDocumentMigrationContext makeRootContext(Args arguments, Stri
return new RootDocumentMigrationContext(otelSdk, compositeContextTracker);
}

public static DocumentsRunner.CompletionStatus run(RunParameters params) throws Exception {
var scopedWorkCoordinator = new ScopedWorkCoordinator(params.workCoordinator, params.leaseExpireTrigger);
confirmShardPrepIsComplete(
params.snapshotReader.getIndexMetadata(),
params.snapshotName,
params.indexAllowlist,
public static DocumentsRunner.CompletionStatus run(Function<Path, LuceneDocumentsReader> readerFactory,
DocumentReindexer reindexer,
IWorkCoordinator workCoordinator,
Duration maxInitialLeaseDuration,
LeaseExpireTrigger leaseExpireTrigger,
IndexMetadata.Factory indexMetadataFactory,
String snapshotName,
List<String> indexAllowlist,
ShardMetadata.Factory shardMetadataFactory,
SnapshotShardUnpacker.Factory unpackerFactory,
long maxShardSizeBytes,
RootDocumentMigrationContext rootDocumentContext) throws Exception
{
var scopedWorkCoordinator = new ScopedWorkCoordinator(workCoordinator, leaseExpireTrigger);
confirmShardPrepIsComplete(indexMetadataFactory,
snapshotName,
indexAllowlist,
scopedWorkCoordinator,
params.tracingContext
rootDocumentContext
);
if (!params.workCoordinator.workItemsArePending(
params.tracingContext.getWorkCoordinationContext()::createItemsPendingContext
if (!workCoordinator.workItemsArePending(
rootDocumentContext.getWorkCoordinationContext()::createItemsPendingContext
)) {
throw new NoWorkLeftException("No work items are pending/all work items have been processed. Returning.");
}
BiFunction<String, Integer, ShardMetadata> shardFactory = (name, shard) -> {
var shardMetadataFactory = params.snapshotReader.getShardMetadata();
var shardMetadata = shardMetadataFactory.fromRepo(params.snapshotName, name, shard);
var runner = new DocumentsRunner(scopedWorkCoordinator, maxInitialLeaseDuration, (name, shard) -> {
var shardMetadata = shardMetadataFactory.fromRepo(snapshotName, name, shard);
log.info("Shard size: " + shardMetadata.getTotalSizeBytes());
if (shardMetadata.getTotalSizeBytes() > params.maxShardSizeBytes) {
throw new DocumentsRunner.ShardTooLargeException(shardMetadata.getTotalSizeBytes(), params.maxShardSizeBytes);
if (shardMetadata.getTotalSizeBytes() > maxShardSizeBytes) {
throw new DocumentsRunner.ShardTooLargeException(shardMetadata.getTotalSizeBytes(), maxShardSizeBytes);
}
return shardMetadata;
};
var runner = new DocumentsRunner(
scopedWorkCoordinator,
params.maxInitialLeaseDuration,
shardFactory,
params.snapshotUnpacker,
params.documentReader,
params.reindexer);
var migrationStatus = runner.migrateNextShard(params.tracingContext::createReindexContext);
return migrationStatus;
}, unpackerFactory, readerFactory, reindexer);
return runner.migrateNextShard(rootDocumentContext::createReindexContext);
}

private static void confirmShardPrepIsComplete(
Expand Down Expand Up @@ -333,29 +328,4 @@ private static void confirmShardPrepIsComplete(
}
}
}

@Builder
static class RunParameters {
@NonNull
final LeaseExpireTrigger leaseExpireTrigger;
@NonNull
final IWorkCoordinator workCoordinator;
@NonNull
final String snapshotName;
@NonNull
final ClusterSnapshotReader snapshotReader;
@NonNull
final SnapshotShardUnpacker.Factory snapshotUnpacker;
@NonNull
final Function<Path, LuceneDocumentsReader> documentReader;
@NonNull
final DocumentReindexer reindexer;
@NonNull
final List<String> indexAllowlist;
@NonNull
final Duration maxInitialLeaseDuration;
final long maxShardSizeBytes;
@NonNull
final RootDocumentMigrationContext tracingContext;
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.rfs;
package org.opensearch.migrations.bulkload;

import java.io.File;
import java.util.List;
Expand All @@ -10,17 +10,17 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;

import org.opensearch.migrations.bulkload.common.FileSystemRepo;
import org.opensearch.migrations.bulkload.common.FileSystemSnapshotCreator;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams;
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
import org.opensearch.migrations.bulkload.http.ClusterOperations;
import org.opensearch.migrations.bulkload.worker.DocumentsRunner;
import org.opensearch.migrations.bulkload.worker.SnapshotRunner;
import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext;
import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext;

import com.rfs.common.FileSystemRepo;
import com.rfs.common.FileSystemSnapshotCreator;
import com.rfs.common.OpenSearchClient;
import com.rfs.common.http.ConnectionContextTestParams;
import com.rfs.framework.SearchClusterContainer;
import com.rfs.http.ClusterOperations;
import com.rfs.worker.DocumentsRunner;
import com.rfs.worker.SnapshotRunner;
import lombok.SneakyThrows;

import static org.hamcrest.CoreMatchers.equalTo;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.rfs;
package org.opensearch.migrations.bulkload;

import java.nio.file.Files;
import java.util.ArrayList;
Expand All @@ -20,12 +20,13 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import org.opensearch.migrations.CreateSnapshot;
import org.opensearch.migrations.bulkload.common.FileSystemRepo;
import org.opensearch.migrations.bulkload.framework.PreloadedSearchClusterContainer;
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext;
import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext;

import com.rfs.common.FileSystemRepo;
import com.rfs.framework.PreloadedSearchClusterContainer;
import com.rfs.framework.SearchClusterContainer;
import lombok.Lombok;
import lombok.extern.slf4j.Slf4j;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.rfs;
package org.opensearch.migrations.bulkload;

import java.nio.file.Paths;
import java.util.List;
Expand All @@ -16,14 +16,14 @@
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import org.opensearch.migrations.bulkload.common.DocumentReindexer;
import org.opensearch.migrations.bulkload.common.LuceneDocumentsReader;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.OpenSearchClient.BulkResponse;
import org.opensearch.migrations.bulkload.common.RfsLuceneDocument;
import org.opensearch.migrations.bulkload.tracing.IRfsContexts;
import org.opensearch.migrations.reindexer.tracing.IDocumentMigrationContexts;

import com.rfs.common.DocumentReindexer;
import com.rfs.common.LuceneDocumentsReader;
import com.rfs.common.OpenSearchClient;
import com.rfs.common.OpenSearchClient.BulkResponse;
import com.rfs.common.RfsLuceneDocument;
import com.rfs.tracing.IRfsContexts;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.rfs;
package org.opensearch.migrations.bulkload;

import java.io.BufferedReader;
import java.io.File;
Expand All @@ -16,12 +16,13 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

import org.opensearch.migrations.CreateSnapshot;
import org.opensearch.migrations.bulkload.framework.PreloadedSearchClusterContainer;
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext;
import org.opensearch.migrations.testutils.ToxiProxyWrapper;
import org.opensearch.testcontainers.OpensearchContainer;

import com.rfs.framework.PreloadedSearchClusterContainer;
import com.rfs.framework.SearchClusterContainer;
import eu.rekawek.toxiproxy.model.ToxicDirection;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -190,7 +191,7 @@ private static ProcessBuilder setupProcess(
javaExecutable,
"-cp",
classpath,
"com.rfs.RfsMigrateDocuments"
"org.opensearch.migrations.RfsMigrateDocuments"
);
processBuilder.command().addAll(Arrays.asList(args));
processBuilder.redirectErrorStream(true);
Expand Down
Loading

0 comments on commit 85a40b0

Please sign in to comment.