A lot of bugfixes to get the FullTest moving further along
Signed-off-by: Greg Schohn <[email protected]>
gregschohn committed Jun 19, 2024
1 parent 8703c20 commit 0275dd5
Showing 10 changed files with 215 additions and 42 deletions.
10 changes: 9 additions & 1 deletion DocumentsFromSnapshotMigration/build.gradle
@@ -18,10 +18,19 @@ dependencies {
implementation project(":commonDependencyVersionConstraints")

implementation project(":RFS")
implementation group: 'org.apache.logging.log4j', name: 'log4j-api'
implementation group: 'org.apache.logging.log4j', name: 'log4j-core'
implementation group: 'com.beust', name: 'jcommander'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-annotations'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core'
implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-smile'
implementation group: 'io.projectreactor.netty', name: 'reactor-netty-core'
implementation group: 'io.projectreactor.netty', name: 'reactor-netty-http'
implementation group: 'org.slf4j', name: 'slf4j-api'
implementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j2-impl'


testImplementation testFixtures(project(":RFS"))
testImplementation project(":CreateSnapshot")
testImplementation project(":MetadataMigration")
@@ -32,7 +41,6 @@ dependencies {
testImplementation group: 'org.testcontainers', name: 'testcontainers'

testImplementation platform('io.projectreactor:reactor-bom:2023.0.5')
testImplementation group: 'io.projectreactor.netty', name: 'reactor-netty-core'
}

application {
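The added logging stack routes SLF4J calls through the log4j-slf4j2-impl bridge to Log4j2. A minimal sketch of what that wiring enables (class name illustrative, not from this repo):

```java
// Minimal sketch: with slf4j-api on the compile classpath and
// log4j-slf4j2-impl + log4j-core at runtime, SLF4J calls are rendered by Log4j2.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Slf4jBridgeCheck {
    private static final Logger log = LoggerFactory.getLogger(Slf4jBridgeCheck.class);

    public static void main(String[] args) {
        // Formatted by whatever log4j2.properties is on the classpath.
        log.info("SLF4J -> Log4j2 bridge is active");
    }
}
```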
52 changes: 35 additions & 17 deletions DocumentsFromSnapshotMigration/src/test/java/com/rfs/FullTest.java
@@ -1,22 +1,31 @@
package com.rfs;

import com.rfs.cms.AbstractedHttpClient;
import com.rfs.cms.ApacheHttpClient;
import com.rfs.cms.OpenSearchWorkCoordinator;
import com.rfs.cms.ProcessManager;
import com.rfs.cms.ReactorHttpClient;
import com.rfs.common.ClusterVersion;
import com.rfs.common.ConnectionDetails;
import com.rfs.common.DefaultSourceRepoAccessor;
import com.rfs.common.DocumentReindexer;
import com.rfs.common.FileSystemRepo;
import com.rfs.common.FileSystemSnapshotCreator;
import com.rfs.common.GlobalMetadata;
import com.rfs.common.IndexMetadata;
import com.rfs.common.LuceneDocumentsReader;
import com.rfs.common.OpenSearchClient;
import com.rfs.common.ShardMetadata;
import com.rfs.common.SnapshotRepo;
import com.rfs.common.SnapshotShardUnpacker;
import com.rfs.common.SourceRepo;
import com.rfs.framework.ElasticsearchContainer;
import com.rfs.transformers.TransformFunctions;
import com.rfs.transformers.Transformer;
import com.rfs.version_es_7_10.ElasticsearchConstants_ES_7_10;
import com.rfs.version_es_7_10.GlobalMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.ShardMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10;
import com.rfs.version_os_2_11.GlobalMetadataCreator_OS_2_11;
import com.rfs.version_os_2_11.IndexCreator_OS_2_11;
@@ -31,18 +40,14 @@
import org.opensearch.testcontainers.OpensearchContainer;
import reactor.core.publisher.Flux;

import javax.print.Doc;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Clock;
import java.util.AbstractQueue;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

@@ -55,36 +60,35 @@ public class FullTest {
"preloaded-ES_7_10"));
final static OpensearchContainer<?> osTargetContainer =
new OpensearchContainer<>("opensearchproject/opensearch:2.13.0");
private static Supplier<ApacheHttpClient> esHttpClientSupplier;

@BeforeAll
static void setupSourceContainer() {
esSourceContainer.start();
osTargetContainer.start();
esHttpClientSupplier = () -> new ApacheHttpClient(URI.create(esSourceContainer.getUrl()));
}

@Test
public void test() throws Exception {
final var SNAPSHOT_NAME = "testSnapshot";
final var SNAPSHOT_NAME = "test_snapshot";
CreateSnapshot.run(
c -> new FileSystemSnapshotCreator(SNAPSHOT_NAME, c, ElasticsearchContainer.CLUSTER_SNAPSHOT_DIR),
new OpenSearchClient(esSourceContainer.getUrl(), null));
var tempDir = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_snapshot");
try {
esSourceContainer.copySnapshotData(tempDir.toString());

var targetClient = new OpenSearchClient(osTargetContainer.getHost(), null);
migrateMetadata(tempDir, targetClient, SNAPSHOT_NAME);

migrateDocumentsWithOneWorker(SNAPSHOT_NAME);
var targetClient = new OpenSearchClient(osTargetContainer.getHttpHostAddress(), null);
var sourceRepo = new FileSystemRepo(tempDir);
migrateMetadata(sourceRepo, targetClient, SNAPSHOT_NAME);

migrateDocumentsWithOneWorker(sourceRepo, SNAPSHOT_NAME);
} finally {
deleteTree(tempDir);
}
}

private static void migrateMetadata(Path tempDir, OpenSearchClient targetClient, String snapshotName) {
SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(new FileSystemRepo(tempDir));
private static void migrateMetadata(SourceRepo sourceRepo, OpenSearchClient targetClient, String snapshotName) {
SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);
GlobalMetadata.Factory metadataFactory = new GlobalMetadataFactory_ES_7_10(repoDataProvider);
GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11(targetClient,
List.of(), List.of(), List.of());
@@ -113,7 +117,7 @@ public Flux<Document> readDocuments(String indexName, int shard) {

static class LeasePastError extends Error { }

private void migrateDocumentsWithOneWorker(String snapshotName) throws Exception {
private void migrateDocumentsWithOneWorker(SourceRepo sourceRepo, String snapshotName) throws Exception {
var tempDir = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_lucene");
try {
var shouldThrow = new AtomicBoolean();
@@ -128,12 +132,26 @@ private void migrateDocumentsWithOneWorker(String snapshotName) throws Exception
shouldThrow.set(true);
});

DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,
tempDir, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);

SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);
IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);

RfsMigrateDocuments.run(path -> new FilteredLuceneDocumentsReader(path, terminatingDocumentFilter),
new DocumentReindexer(new OpenSearchClient(osTargetContainer.getHost(), null)),
new OpenSearchWorkCoordinator(esHttpClientSupplier.get(),
new DocumentReindexer(new OpenSearchClient(osTargetContainer.getHttpHostAddress(), null)),
new OpenSearchWorkCoordinator(
new ApacheHttpClient(new URI(osTargetContainer.getHttpHostAddress())),
// new ReactorHttpClient(new ConnectionDetails(osTargetContainer.getHttpHostAddress(),
// null, null)),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, UUID.randomUUID().toString()),
processManager,
1,snapshotName,1,1,
indexMetadataFactory,
snapshotName,
shardMetadataFactory,
unpackerFactory,
16*1024*1024);
} finally {
deleteTree(tempDir);
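For orientation, a hedged sketch of the coordinator wiring above, which now targets the destination cluster instead of the source (the removed esHttpClientSupplier). The constructor shape is taken from this diff; the URL and skew value are placeholders:

```java
// Hedged sketch (argument meanings assumed from the call sites in this diff):
// an HTTP client bound to the target cluster, a tolerable clock-skew window
// in seconds, and a unique worker id.
var workCoordinator = new OpenSearchWorkCoordinator(
        new ApacheHttpClient(URI.create("http://localhost:9200")), // placeholder URL
        5,                                                         // placeholder skew, seconds
        UUID.randomUUID().toString());
workCoordinator.setup(); // throws IOException; idempotent per the HEAD check added below
```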
@@ -0,0 +1,10 @@
status = ERROR

appender.console.type = Console
appender.console.name = Console
appender.console.target = SYSTEM_OUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss.SSS} %threadName %-5p %c{1}:%L - %m%n

rootLogger.level = trace
rootLogger.appenderRef.console.ref = Console
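With rootLogger.level = trace, every logger in the test JVM now reaches stdout through the pattern above (time, thread, level, simple class name, line number, message). An illustrative rendered line, with made-up values:

```
14:02:31.114 main  INFO  OpenSearchWorkCoordinator:64 - Not creating the index because it already exists
```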
9 changes: 6 additions & 3 deletions RFS/src/main/java/com/rfs/RunRfsWorker.java
@@ -168,7 +168,6 @@ public static void main(String[] args) throws Exception {
DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
var unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,
luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);
LuceneDocumentsReader reader = new LuceneDocumentsReader(luceneDirPath);
DocumentReindexer reindexer = new DocumentReindexer(targetClient);
var processManager = new ProcessManager(workItemId->{
log.error("terminating RunRfsWorker because its lease has expired for "+workItemId);
@@ -178,8 +177,12 @@ public static void main(String[] args) throws Exception {
5, UUID.randomUUID().toString());
var scopedWorkCoordinator = new ScopedWorkCoordinatorHelper(workCoordinator, processManager);
new ShardWorkPreparer().run(scopedWorkCoordinator, indexMetadataFactory, snapshotName);
new DocumentsRunner(scopedWorkCoordinator, snapshotName,
shardMetadataFactory, unpackerFactory, reader, reindexer).migrateNextShard();
new DocumentsRunner(scopedWorkCoordinator,
(name,shard) -> shardMetadataFactory.fromRepo(snapshotName,name,shard),
unpackerFactory,
path -> new LuceneDocumentsReader(luceneDirPath),
reindexer)
.migrateNextShard();
} catch (Exception e) {
log.error("Unexpected error running RfsWorker", e);
throw e;
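The DocumentsRunner call above now injects behavior rather than concrete factories: the first lambda binds snapshotName into an (index, shard) metadata lookup, and the reader factory is the seam FullTest uses to interpose its FilteredLuceneDocumentsReader. A hedged sketch of that seam, with the functional shape assumed from the lambda at the call site:

```java
// Hedged sketch (Function<Path, LuceneDocumentsReader> is assumed from the
// lambda shape above; the RFS types are the ones imported in these files).
import java.nio.file.Path;
import java.util.function.Function;

// Production wiring: read straight from the unpacked Lucene directory.
Function<Path, LuceneDocumentsReader> prodReader = path -> new LuceneDocumentsReader(path);

// Test wiring (FullTest): wrap the reader so the stream can be cut short.
Function<Path, LuceneDocumentsReader> testReader =
        path -> new FilteredLuceneDocumentsReader(path, terminatingDocumentFilter);
```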
11 changes: 10 additions & 1 deletion RFS/src/main/java/com/rfs/cms/ApacheHttpClient.java
@@ -1,6 +1,7 @@
package com.rfs.cms;

import lombok.Getter;
import lombok.Lombok;
import lombok.extern.slf4j.Slf4j;
import org.apache.hc.client5.http.classic.methods.HttpDelete;
import org.apache.hc.client5.http.classic.methods.HttpGet;
@@ -20,6 +21,7 @@
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;

@Slf4j
@@ -62,7 +64,14 @@ public AbstractHttpResponse makeRequest(String method, String path,
}
return client.execute(request, fr -> new AbstractHttpResponse() {
@Getter
byte[] payloadBytes = fr.getEntity().getContent().readAllBytes();
final byte[] payloadBytes = Optional.ofNullable(fr.getEntity())
.map(e -> {
try {
return e.getContent().readAllBytes();
} catch (IOException ex) {
throw Lombok.sneakyThrow(ex);
}
}).orElse(null);

@Override
public String getStatusText() {
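The new payload handling guards against responses with no entity (e.g. HEAD requests) and uses Lombok's sneakyThrow to rethrow the checked IOException from inside the lambda. A minimal self-contained sketch of that idiom:

```java
// Minimal sketch of the Lombok.sneakyThrow idiom used above: the checked
// IOException is rethrown without appearing in the method's throws clause.
import java.io.IOException;
import java.io.InputStream;
import lombok.Lombok;

class SneakyReadExample {
    static byte[] readAll(InputStream in) { // note: no "throws IOException"
        try {
            return in.readAllBytes();
        } catch (IOException ex) {
            throw Lombok.sneakyThrow(ex); // rethrows the original checked exception
        }
    }
}
```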
33 changes: 27 additions & 6 deletions RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java
@@ -5,6 +5,7 @@
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
@@ -30,6 +31,7 @@ public class OpenSearchWorkCoordinator implements IWorkCoordinator {
public static final String OLD_EXPIRATION_THRESHOLD_TEMPLATE = "OLD_EXPIRATION_THRESHOLD";
public static final String VERSION_CONFLICTS_FIELD_NAME = "version_conflicts";
public static final String COMPLETED_AT_FIELD_NAME = "completedAt";
public static final String HEAD_METHOD = "HEAD";

private final long tolerableClientServerClockDifferenceSeconds;
private final AbstractedHttpClient httpClient;
@@ -57,6 +59,13 @@ public void close() throws Exception {
}

public void setup() throws IOException {
var indexCheckResponse = httpClient.makeJsonRequest(HEAD_METHOD, INDEX_NAME, null, null);
if (indexCheckResponse.getStatusCode() == 200) {
log.info("Not creating " + INDEX_NAME + " because it already exists");
return;
}
log.atInfo().setMessage("Creating " + INDEX_NAME + " because its HEAD check returned " +
indexCheckResponse.getStatusCode()).log();
var body = "{\n" +
" \"settings\": {\n" +
" \"index\": {" +
@@ -88,7 +97,7 @@ public void setup() throws IOException {
if ((response.getStatusCode() / 100) != 2) {
throw new IOException("Could not setup " + INDEX_NAME + ". " +
"Got error code " + response.getStatusCode() + " and response: " +
Arrays.toString(response.getPayloadBytes()));
new String(response.getPayloadBytes(), StandardCharsets.UTF_8));
}
}

@@ -336,8 +345,19 @@ UpdateResult assignOneWorkItem(long expirationWindowSeconds) throws IOException
private WorkItemAndDuration getAssignedWorkItem() throws IOException {
final var queryWorkersAssignedItemsTemplate = "{\n" +
" \"query\": {\n" +
" \"term\": { \"" + LEASE_HOLDER_ID_FIELD_NAME + "\": \"" + WORKER_ID_TEMPLATE + "\"}\n" +
" }\n" +
" \"bool\": {" +
" \"must\": [" +
" {" +
" \"term\": { \"" + LEASE_HOLDER_ID_FIELD_NAME + "\": \"" + WORKER_ID_TEMPLATE + "\"}\n" +
" }" +
" ]," +
" \"must_not\": [" +
" {" +
" \"exists\": { \"field\": \"" + COMPLETED_AT_FIELD_NAME + "\"}\n" +
" }" +
" ]" +
" }" +
" }" +
"}";
final var body = queryWorkersAssignedItemsTemplate.replace(WORKER_ID_TEMPLATE, workerId);
var response = httpClient.makeJsonRequest(POST_METHOD, INDEX_NAME + "/_search",
@@ -348,9 +368,10 @@ private WorkItemAndDuration getAssignedWorkItem() throws IOException {
log.warn("Couldn't find the top level 'hits' field, returning null");
return null;
}
if (resultHitsUpper.path("total").path("value").longValue() != 1) {
log.warn("The query didn't return one item for the worker node, so returning null");
return null;
final var numDocs = resultHitsUpper.path("total").path("value").longValue();
if (numDocs != 1) {
throw new IllegalStateException("The query for the assigned work document returned " + numDocs +
" instead of one item");
}
var resultHitInner = resultHitsUpper.path("hits").path(0);
var expiration = resultHitInner.path("_source").path(EXPIRATION_FIELD_NAME).longValue();
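The reworked query stops a worker from re-adopting items that are already finished: the match on the lease holder is now paired with a must_not on the completedAt field. After WORKER_ID substitution, the template renders to roughly this payload (completedAt comes from this class's constants; the lease-holder field's literal value isn't shown in this excerpt, so it stays a placeholder):

```json
{
  "query": {
    "bool": {
      "must":     [ { "term":   { "<LEASE_HOLDER_ID_FIELD_NAME>": "<workerId>" } } ],
      "must_not": [ { "exists": { "field": "completedAt" } } ]
    }
  }
}
```

Relatedly, a hit count other than one is now an IllegalStateException rather than a warn-and-return-null, which surfaces an inconsistent coordination index instead of masking it.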