Skip to content

Commit

Permalink
Added Elasticsearch 6.8 support to Document Migrations (#921)
Browse files Browse the repository at this point in the history
* Added a speculative 5.6 docker-compose to RFS

Signed-off-by: Chris Helma <[email protected]>

* Checkpoint: adding version-specification to doc migration

Signed-off-by: Chris Helma <[email protected]>

* Created a 5.6 snapshot w/ updates & deletes, put in RFS test-resources

Signed-off-by: Chris Helma <[email protected]>

* Added 2 ES 6.8 snapshots to facilitate RFS testing

Signed-off-by: Chris Helma <[email protected]>

* Added a native ES 6.8 snapshot compatible w/ Lucene 8.X

Signed-off-by: Chris Helma <[email protected]>

* Unit test updates

Signed-off-by: Chris Helma <[email protected]>

* Removed an unnecessary local snapshot

Signed-off-by: Chris Helma <[email protected]>

* Ran spotlessApply

Signed-off-by: Chris Helma <[email protected]>

* Updates per PR comments

Signed-off-by: Chris Helma <[email protected]>

* Added ES 6.8 to RFS EndToEnd tests; refactored those tests

Signed-off-by: Chris Helma <[email protected]>

* More updates per PR comments

Signed-off-by: Chris Helma <[email protected]>

---------

Signed-off-by: Chris Helma <[email protected]>
  • Loading branch information
chelma authored Aug 29, 2024
1 parent b04657f commit b832306
Show file tree
Hide file tree
Showing 76 changed files with 996 additions and 540 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ TrafficCapture/**/out/
opensearch-cluster-cdk/
test/opensearch-cluster-cdk/
TrafficCapture/dockerSolution/src/main/docker/migrationConsole/staging

# Directory used to temporarily store snapshots during testing
DocumentsFromSnapshotMigration/docker/snapshots/
8 changes: 8 additions & 0 deletions DocumentsFromSnapshotMigration/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ DockerServiceProps[] dockerServices = [
dockerImageName:"reindex_from_snapshot",
inputDir:"./docker",
taskDependencies:["copyDockerRuntimeJars"]]),
new DockerServiceProps([projectName:"emptyElasticsearchSource_5_6",
dockerImageName:"empty_elasticsearch_source_5_6",
inputDir:"./docker/TestSource_ES_5_6"]),
new DockerServiceProps([projectName:"emptyElasticsearchSource_6_8",
dockerImageName:"empty_elasticsearch_source_6_8",
inputDir:"./docker/TestSource_ES_6_8"]),
Expand All @@ -112,6 +115,7 @@ for (dockerService in dockerServices) {
dependsOn dep
}
inputDir = project.file(dockerService.inputDir)
// platform.set("linux/amd64")
buildArgs = dockerService.buildArgs
images.add("migrations/${dockerService.dockerImageName}:${hash}")
images.add("migrations/${dockerService.dockerImageName}:${version}")
Expand All @@ -123,6 +127,10 @@ dockerCompose {
useComposeFiles = ['docker/docker-compose-es710.yml']
projectName = 'rfs-compose'

es56 {
useComposeFiles = ['docker/docker-compose-es56.yml']
}

es68 {
useComposeFiles = ['docker/docker-compose-es68.yml']
}
Expand Down
20 changes: 20 additions & 0 deletions DocumentsFromSnapshotMigration/docker/TestSource_ES_5_6/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
FROM docker.elastic.co/elasticsearch/elasticsearch:5.6.16 AS base

# Configure Elastic
ENV ELASTIC_SEARCH_CONFIG_FILE=/usr/share/elasticsearch/config/elasticsearch.yml
RUN echo "discovery.type: single-node" >> $ELASTIC_SEARCH_CONFIG_FILE
RUN echo "xpack.security.enabled: false" >> $ELASTIC_SEARCH_CONFIG_FILE
RUN echo "bootstrap.system_call_filter: false" >> $ELASTIC_SEARCH_CONFIG_FILE
ENV PATH=${PATH}:/usr/share/elasticsearch/jdk/bin/

# Make our snapshot directory
USER root
RUN mkdir /snapshots && chown elasticsearch /snapshots
USER elasticsearch

# We do not install the S3 Repo plugin here, because it is not compatible with modern
# IAM Roles. Specifically, it does not support the AWS_SESSION_TOKEN environment variable.
# We will instead take snapshots into a mounted local volume.

# Additionally, we will rely on the base image's default entrypoint command to start the
# Elasticsearch service.
47 changes: 47 additions & 0 deletions DocumentsFromSnapshotMigration/docker/docker-compose-es56.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
version: '3.7'
services:

elasticsearchsource:
image: 'migrations/empty_elasticsearch_source_5_6:latest'
platform: linux/amd64
networks:
- migrations
environment:
- path.repo=/snapshots
- AWS_ACCESS_KEY_ID=${access_key}
- AWS_SECRET_ACCESS_KEY=${secret_key}
- AWS_SESSION_TOKEN=${session_token}
ports:
- '19200:9200'
volumes:
- ./snapshots:/snapshots

reindex-from-snapshot:
image: 'migrations/reindex_from_snapshot:latest'
depends_on:
elasticsearchsource:
condition: service_started
opensearchtarget:
condition: service_started
networks:
- migrations
environment:
- AWS_ACCESS_KEY_ID=${access_key}
- AWS_SECRET_ACCESS_KEY=${secret_key}
- AWS_SESSION_TOKEN=${session_token}
volumes:
- ./snapshots:/snapshots

opensearchtarget:
image: 'opensearchproject/opensearch:2.11.1'
environment:
- discovery.type=single-node
- plugins.security.disabled=true
networks:
- migrations
ports:
- "29200:9200"

networks:
migrations:
driver: bridge
10 changes: 2 additions & 8 deletions DocumentsFromSnapshotMigration/docker/docker-compose-es68.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@ services:
networks:
- migrations
environment:
- discovery.type=single-node
- path.repo=/snapshots
- AWS_ACCESS_KEY_ID=${access_key}
- AWS_SECRET_ACCESS_KEY=${secret_key}
- AWS_SESSION_TOKEN=${session_token}
ports:
- '19200:9200'
volumes:
- snapshotStorage:/snapshots
- ./snapshots:/snapshots

# Sample command to kick off RFS here: https://github.com/opensearch-project/opensearch-migrations/blob/main/RFS/README.md#using-docker
reindex-from-snapshot:
image: 'migrations/reindex_from_snapshot:latest'
depends_on:
Expand All @@ -31,7 +29,7 @@ services:
- AWS_SECRET_ACCESS_KEY=${secret_key}
- AWS_SESSION_TOKEN=${session_token}
volumes:
- snapshotStorage:/snapshots
- ./snapshots:/snapshots

opensearchtarget:
image: 'opensearchproject/opensearch:2.11.1'
Expand All @@ -46,7 +44,3 @@ services:
networks:
migrations:
driver: bridge

volumes:
snapshotStorage:
driver: local
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.rfs.cms.LeaseExpireTrigger;
import com.rfs.cms.OpenSearchWorkCoordinator;
import com.rfs.cms.ScopedWorkCoordinator;
import com.rfs.common.ClusterVersion;
import com.rfs.common.DefaultSourceRepoAccessor;
import com.rfs.common.DocumentReindexer;
import com.rfs.common.FileSystemRepo;
Expand All @@ -35,15 +36,13 @@
import com.rfs.common.SnapshotRepo;
import com.rfs.common.SnapshotShardUnpacker;
import com.rfs.common.SourceRepo;
import com.rfs.common.SourceResourceProvider;
import com.rfs.common.SourceResourceProviderFactory;
import com.rfs.common.TryHandlePhaseFailure;
import com.rfs.common.http.ConnectionContext;
import com.rfs.models.IndexMetadata;
import com.rfs.models.ShardMetadata;
import com.rfs.tracing.RootWorkCoordinationContext;
import com.rfs.version_es_7_10.ElasticsearchConstants_ES_7_10;
import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.ShardMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10;
import com.rfs.worker.DocumentsRunner;
import com.rfs.worker.ShardWorkPreparer;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -132,6 +131,11 @@ public static class Args {
description = "Optional. The maximum number of connections to simultaneously " +
"used to communicate to the target, default 10")
int maxConnections = 10;

@Parameter(names = { "--source-version" }, description = ("Optional. Version of the source cluster. Possible "
+ "values include: ES_6_8, ES_7_10, ES_7_17. Default: ES_7_10"), required = false,
converter = ClusterVersion.ArgsConverter.class)
public ClusterVersion sourceVersion = ClusterVersion.ES_7_10;
}

public static class NoWorkLeftException extends Exception {
Expand Down Expand Up @@ -212,20 +216,22 @@ public static void main(String[] args) throws Exception {
} else {
sourceRepo = new FileSystemRepo(snapshotLocalDirPath);
}
SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);

IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);

SourceResourceProvider sourceResourceProvider = SourceResourceProviderFactory.getProvider(arguments.sourceVersion);

SnapshotRepo.Provider repoDataProvider = sourceResourceProvider.getSnapshotRepoProvider(sourceRepo);
IndexMetadata.Factory indexMetadataFactory = sourceResourceProvider.getIndexMetadataFactory(repoDataProvider);
ShardMetadata.Factory shardMetadataFactory = sourceResourceProvider.getShardMetadataFactory(repoDataProvider);
SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(
repoAccessor,
luceneDirPath,
ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES
sourceResourceProvider.getBufferSizeInBytes()
);

run(
LuceneDocumentsReader.getFactory(ElasticsearchConstants_ES_7_10.SOFT_DELETES_POSSIBLE,
ElasticsearchConstants_ES_7_10.SOFT_DELETES_FIELD),
LuceneDocumentsReader.getFactory(sourceResourceProvider.getSoftDeletesPossible(),
sourceResourceProvider.getSoftDeletesFieldData()),
reindexer,
workCoordinator,
arguments.initialLeaseDuration,
Expand Down
Loading

0 comments on commit b832306

Please sign in to comment.