
Commit 65d6cef
Merge from main
Signed-off-by: Peter Nied <[email protected]>
peternied committed Jul 10, 2024
2 parents a750ff9 + 07bc3ef commit 65d6cef
Showing 78 changed files with 1,305 additions and 813 deletions.
1 change: 1 addition & 0 deletions RFS/src/main/java/com/rfs/common/ByteArrayIndexInput.java
@@ -32,6 +32,7 @@ public ByteArrayIndexInput(String resourceDesc, byte[] bytes, int offset, int le

     @Override
     public void close() throws IOException {
+        // Empty in the original implementation, and seems to work
     }

@Override
4 changes: 2 additions & 2 deletions RFS/src/main/java/com/rfs/common/ConnectionDetails.java
@@ -11,12 +11,12 @@
  * Stores the connection details (assuming basic auth) for an Elasticsearch/OpenSearch cluster
  */
 public class ConnectionDetails {
-    public static enum AuthType {
+    public enum AuthType {
         BASIC,
         NONE
     }
 
-    public static enum Protocol {
+    public enum Protocol {
         HTTP,
         HTTPS
     }
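
A note on this hunk: enum types nested inside a class are implicitly static, so the static modifier is redundant and static analysis tools flag it. A minimal sketch of the equivalence, using a hypothetical class that is not from this repo:

public class EnumNestingExample {
    // Implicitly static: a nested enum never captures an enclosing instance
    public enum Protocol {
        HTTP,
        HTTPS
    }

    public static void main(String[] args) {
        // Usage is identical with or without the redundant "static" modifier
        Protocol protocol = Protocol.HTTPS;
        System.out.println(protocol);
    }
}
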
8 changes: 4 additions & 4 deletions RFS/src/main/java/com/rfs/common/DocumentReindexer.java
@@ -22,7 +22,7 @@ public Mono<Void> reindex(String indexName, Flux<Document> documentStream) {
         return documentStream
             .map(this::convertDocumentToBulkSection) // Convert each Document to part of a bulk operation
             .buffer(MAX_BATCH_SIZE) // Collect until you hit the batch size
-            .doOnNext(bulk -> logger.info(bulk.size() + " documents in current bulk request"))
+            .doOnNext(bulk -> logger.info("{} documents in current bulk request", bulk.size()))
             .map(this::convertToBulkRequestBody) // Assemble the bulk request body from the parts
             .flatMap(bulkJson -> client.sendBulkRequest(indexName, bulkJson) // Send the request
                 .doOnSuccess(unused -> logger.debug("Batch succeeded"))
@@ -50,9 +50,9 @@ private String convertToBulkRequestBody(List<String> bulkSections) {
         return builder.toString();
     }
 
-    public void refreshAllDocuments(ConnectionDetails targetConnection) throws Exception {
+    public void refreshAllDocuments(ConnectionDetails targetConnection) {
         // Send the request
-        OpenSearchClient client = new OpenSearchClient(targetConnection);
-        client.refresh();
+        OpenSearchClient refreshClient = new OpenSearchClient(targetConnection);
+        refreshClient.refresh();
     }
 }
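
The logging change in this file (and in several files below) replaces string concatenation with parameterized messages. With Log4j 2, the {} placeholders are substituted only after the level check passes, so a suppressed log statement never pays for building the message string. A small self-contained sketch of the pattern; the class name and values are illustrative:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class BulkLoggingSketch {
    private static final Logger logger = LogManager.getLogger(BulkLoggingSketch.class);

    public static void main(String[] args) {
        int batchSize = 1000;
        // Concatenation would build the message string even if INFO were disabled:
        //   logger.info(batchSize + " documents in current bulk request");
        // The parameterized form defers formatting until the level check passes:
        logger.info("{} documents in current bulk request", batchSize);
    }
}
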
@@ -45,7 +45,7 @@ public void close() throws IOException {
         try {
             super.close();
         } finally {
-            logger.info("Deleting local file: " + filePath.toString());
+            logger.info("Deleting local file: {}", filePath);
             logger.warn("See: https://opensearch.atlassian.net/browse/MIGRATIONS-1786");
             Files.deleteIfExists(filePath);
         }
18 changes: 6 additions & 12 deletions RFS/src/main/java/com/rfs/common/FileSystemRepo.java
@@ -57,40 +57,34 @@ public Path getSnapshotRepoDataFilePath() {

     @Override
     public Path getGlobalMetadataFilePath(String snapshotId) {
-        String filePath = getRepoRootDir().toString() + "/meta-" + snapshotId + ".dat";
-        return Path.of(filePath);
+        return getRepoRootDir().resolve("meta-" + snapshotId + ".dat");
     }
 
     @Override
     public Path getSnapshotMetadataFilePath(String snapshotId) {
-        String filePath = getRepoRootDir().toString() + "/snap-" + snapshotId + ".dat";
-        return Path.of(filePath);
+        return getRepoRootDir().resolve("snap-" + snapshotId + ".dat");
     }
 
     @Override
     public Path getIndexMetadataFilePath(String indexId, String indexFileId) {
-        String filePath = getRepoRootDir().toString() + "/indices/" + indexId + "/meta-" + indexFileId + ".dat";
-        return Path.of(filePath);
+        return getRepoRootDir().resolve("indices").resolve(indexId).resolve("meta-" + indexFileId + ".dat");
     }
 
     @Override
     public Path getShardDirPath(String indexId, int shardId) {
-        String shardDirPath = getRepoRootDir().toString() + "/indices/" + indexId + "/" + shardId;
+        String shardDirPath = getRepoRootDir().resolve("indices").resolve(indexId).resolve(String.valueOf(shardId)).toString();
         return Path.of(shardDirPath);
     }
 
     @Override
     public Path getShardMetadataFilePath(String snapshotId, String indexId, int shardId) {
-        Path shardDirPath = getShardDirPath(indexId, shardId);
-        Path filePath = shardDirPath.resolve("snap-" + snapshotId + ".dat");
-        return filePath;
+        return getShardDirPath(indexId, shardId).resolve("snap-" + snapshotId + ".dat");
     }
 
     @Override
     public Path getBlobFilePath(String indexId, int shardId, String blobName) {
         Path shardDirPath = getShardDirPath(indexId, shardId);
-        Path filePath = shardDirPath.resolve(blobName);
-        return filePath;
+        return shardDirPath.resolve(blobName);
     }
 
     @Override
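
These rewrites replace manual string concatenation with Path.resolve, which appends segments using the platform's separator and avoids round-tripping through String. A standalone sketch of the difference; the paths are illustrative:

import java.nio.file.Path;

public class ResolveSketch {
    public static void main(String[] args) {
        Path repoRoot = Path.of("/tmp/snapshot-repo");

        // Concatenation hard-codes "/" and re-parses the result:
        Path concatenated = Path.of(repoRoot.toString() + "/indices/idx1/meta-file1.dat");

        // resolve() stays in the Path domain:
        Path resolved = repoRoot.resolve("indices").resolve("idx1").resolve("meta-file1.dat");

        System.out.println(concatenated.equals(resolved)); // true on POSIX systems
    }
}

Note that getShardDirPath above still converts the resolved Path back to a String and re-wraps it with Path.of; returning the resolve chain directly would make that round trip unnecessary.
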
1 change: 1 addition & 0 deletions RFS/src/main/java/com/rfs/common/FilterScheme.java
@@ -5,6 +5,7 @@
 import java.util.function.Predicate;
 
 public class FilterScheme {
+    private FilterScheme() {}
 
     public static Predicate<SnapshotRepo.Index> filterIndicesByAllowList(List<String> indexAllowlist, BiConsumer<String, Boolean> indexNameAcceptanceObserver) {
         return index -> {
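
The private constructor added here (and in Logging, SnapshotRepo, and Uid below) is the conventional way to mark a static-only utility class as non-instantiable. A minimal sketch with a hypothetical class:

public final class TextUtils {
    // Prevents instantiation; the static members remain fully usable
    private TextUtils() {}

    public static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(TextUtils.isBlank("  ")); // prints: true
        // new TextUtils(); // would not compile outside this class
    }
}
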
2 changes: 2 additions & 0 deletions RFS/src/main/java/com/rfs/common/Logging.java
@@ -8,6 +8,8 @@
 import org.apache.logging.log4j.Level;
 
 public class Logging {
+    private Logging() {}
+
     public static void setLevel(Level level) {
         LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
         Configuration config = ctx.getConfiguration();
4 changes: 2 additions & 2 deletions RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java
@@ -51,7 +51,7 @@ protected IndexReader openIndexReader(Path indexDirectoryPath) throws IOExceptio
     protected Document getDocument(IndexReader reader, int docId) {
         try {
             Document document = reader.document(docId);
-            BytesRef source_bytes = document.getBinaryValue("_source");
+            BytesRef sourceBytes = document.getBinaryValue("_source");
             String id;
             try {
                 id = Uid.decodeId(document.getBinaryValue("_id").bytes);
@@ -62,7 +62,7 @@ protected Document getDocument(IndexReader reader, int docId) {
                 log.error(errorMessage.toString());
                 return null; // Skip documents with missing id
             }
-            if (source_bytes == null || source_bytes.bytes.length == 0) {
+            if (sourceBytes == null || sourceBytes.bytes.length == 0) {
                 log.warn("Document " + id + " is deleted or doesn't have the _source field enabled");
                 return null; // Skip these too
             }
4 changes: 0 additions & 4 deletions RFS/src/main/java/com/rfs/common/OpenSearchClient.java
@@ -2,8 +2,6 @@

 import java.net.HttpURLConnection;
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Optional;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -13,9 +11,7 @@
 import org.apache.logging.log4j.Logger;
 
 import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
 import reactor.core.publisher.Mono;
2 changes: 1 addition & 1 deletion RFS/src/main/java/com/rfs/common/PartSliceStream.java
@@ -33,7 +33,7 @@ protected InputStream openSlice(long slice) {
     }
 
     private InputStream nextStream() throws IOException {
-        assert initialized == false || currentStream != null;
+        assert !initialized || currentStream != null;
         initialized = true;
 
         if (currentStream != null) {
6 changes: 3 additions & 3 deletions RFS/src/main/java/com/rfs/common/S3Repo.java
@@ -79,11 +79,11 @@ private void ensureFileExistsLocally(S3Uri s3Uri, Path localPath) {
         ensureS3LocalDirectoryExists(localPath.getParent());
 
         if (doesFileExistLocally(localPath)) {
-            logger.debug("File already exists locally: " + localPath);
+            logger.debug("File already exists locally: {}", localPath);
             return;
         }
 
-        logger.info("Downloading file from S3: " + s3Uri.uri + " to " + localPath);
+        logger.info("Downloading file from S3: {} to {}", s3Uri.uri, localPath);
         GetObjectRequest getObjectRequest = GetObjectRequest.builder()
             .bucket(s3Uri.bucketName)
             .key(s3Uri.key)
@@ -190,7 +190,7 @@ public void prepBlobFiles(ShardMetadata shardMetadata) {

         String blobFilesS3Prefix = s3RepoUri.key + "indices/" + shardMetadata.getIndexId() + "/" + shardMetadata.getShardId() + "/";
 
-        logger.info("Downloading blob files from S3: s3://" + s3RepoUri.bucketName + "/" + blobFilesS3Prefix + " to " + shardDirPath);
+        logger.info("Downloading blob files from S3: s3://{}/{} to {}", s3RepoUri.bucketName, blobFilesS3Prefix, shardDirPath);
         DirectoryDownload directoryDownload = transferManager.downloadDirectory(
             DownloadDirectoryRequest.builder()
                 .destination(shardDirPath)
8 changes: 4 additions & 4 deletions RFS/src/main/java/com/rfs/common/SnapshotCreator.java
@@ -57,9 +57,9 @@ public void createSnapshot() {
         // Create the snapshot; idempotent operation
         try {
             client.createSnapshot(getRepoName(), snapshotName, body);
-            logger.info("Snapshot " + snapshotName + " creation initiated");
+            logger.info("Snapshot {} creation initiated", snapshotName);
         } catch (Exception e) {
-            logger.error("Snapshot " + snapshotName + " creation failed", e);
+            logger.error("Snapshot {} creation failed", snapshotName, e);
             throw new SnapshotCreationFailed(snapshotName);
         }
     }
@@ -74,7 +74,7 @@ public boolean isSnapshotFinished() {
         }
 
         if (response.isEmpty()) {
-            logger.error("Snapshot " + snapshotName + " does not exist");
+            logger.error("Snapshot {} does not exist", snapshotName);
             throw new SnapshotDoesNotExist(snapshotName);
         }
 
@@ -88,7 +88,7 @@ public boolean isSnapshotFinished() {
         } else if (state.equals("IN_PROGRESS")) {
             return false;
         } else {
-            logger.error("Snapshot " + snapshotName + " has failed with state " + state);
+            logger.error("Snapshot {} has failed with state {}", snapshotName, state);
             throw new SnapshotCreationFailed(snapshotName);
         }
     }
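
One detail in the createSnapshot hunk is worth calling out: Log4j 2 treats a trailing Throwable argument specially, so a call with one {} placeholder and two trailing arguments substitutes the first and logs the second's stack trace. A small sketch; the class name and values are illustrative:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ThrowableLoggingSketch {
    private static final Logger logger = LogManager.getLogger(ThrowableLoggingSketch.class);

    public static void main(String[] args) {
        String snapshotName = "example-snapshot";
        try {
            throw new IllegalStateException("simulated failure");
        } catch (Exception e) {
            // snapshotName fills the {}; e is recognized as the Throwable
            // and printed with its full stack trace.
            logger.error("Snapshot {} creation failed", snapshotName, e);
        }
    }
}
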
1 change: 1 addition & 0 deletions RFS/src/main/java/com/rfs/common/SnapshotRepo.java
@@ -3,6 +3,7 @@
 import java.util.List;
 
 public class SnapshotRepo {
+    private SnapshotRepo() {}
 
     /**
      * Defines the behavior required to surface a snapshot repo's metadata
30 changes: 15 additions & 15 deletions RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java
@@ -48,25 +48,25 @@ public Path unpack() {
             // Create the directory for the shard's lucene files
             Path luceneIndexDir = Paths.get(luceneFilesBasePath + "/" + shardMetadata.getIndexName() + "/" + shardMetadata.getShardId());
             Files.createDirectories(luceneIndexDir);
-            final FSDirectory primaryDirectory = FSDirectory.open(luceneIndexDir, lockFactory);
-
-            for (ShardFileInfo fileMetadata : shardMetadata.getFiles()) {
-                logger.info("Unpacking - Blob Name: " + fileMetadata.getName() + ", Lucene Name: " + fileMetadata.getPhysicalName());
-                try (IndexOutput indexOutput = primaryDirectory.createOutput(fileMetadata.getPhysicalName(), IOContext.DEFAULT);){
-                    if (fileMetadata.getName().startsWith("v__")) {
-                        final BytesRef hash = fileMetadata.getMetaHash();
-                        indexOutput.writeBytes(hash.bytes, hash.offset, hash.length);
-                    } else {
-                        try (InputStream stream = new PartSliceStream(repoAccessor, fileMetadata, shardMetadata.getIndexId(), shardMetadata.getShardId())) {
-                            final byte[] buffer = new byte[Math.toIntExact(Math.min(bufferSize, fileMetadata.getLength()))];
-                            int length;
-                            while ((length = stream.read(buffer)) > 0) {
-                                indexOutput.writeBytes(buffer, 0, length);
-                            }
-                        }
-                    }
-                }
-            }
+            try (FSDirectory primaryDirectory = FSDirectory.open(luceneIndexDir, lockFactory)) {
+                for (ShardFileInfo fileMetadata : shardMetadata.getFiles()) {
+                    logger.info("Unpacking - Blob Name: {}, Lucene Name: {}", fileMetadata.getName(), fileMetadata.getPhysicalName());
+                    try (IndexOutput indexOutput = primaryDirectory.createOutput(fileMetadata.getPhysicalName(), IOContext.DEFAULT);){
+                        if (fileMetadata.getName().startsWith("v__")) {
+                            final BytesRef hash = fileMetadata.getMetaHash();
+                            indexOutput.writeBytes(hash.bytes, hash.offset, hash.length);
+                        } else {
+                            try (InputStream stream = new PartSliceStream(repoAccessor, fileMetadata, shardMetadata.getIndexId(), shardMetadata.getShardId())) {
+                                final byte[] buffer = new byte[Math.toIntExact(Math.min(bufferSize, fileMetadata.getLength()))];
+                                int length;
+                                while ((length = stream.read(buffer)) > 0) {
+                                    indexOutput.writeBytes(buffer, 0, length);
+                                }
+                            }
+                        }
+                    }
+                }
+            }
             return luceneIndexDir;
         } catch (Exception e) {
             throw new CouldNotUnpackShard("Could not unpack shard: Index " + shardMetadata.getIndexId() + ", Shard " + shardMetadata.getShardId(), e);
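
The restructuring above moves the FSDirectory into a try-with-resources block, so the directory handle is closed even if an exception escapes the copy loop. A reduced sketch of the nesting, assuming Lucene on the classpath; the file name and payload are illustrative:

import java.nio.file.Files;
import java.nio.file.Path;

import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;

public class UnpackSketch {
    public static void main(String[] args) throws Exception {
        Path luceneIndexDir = Path.of("/tmp/lucene-example");
        Files.createDirectories(luceneIndexDir);
        // Resources declared together are closed in reverse order,
        // whether the body completes normally or throws.
        try (FSDirectory directory = FSDirectory.open(luceneIndexDir);
             IndexOutput out = directory.createOutput("example.dat", IOContext.DEFAULT)) {
            byte[] payload = {1, 2, 3};
            out.writeBytes(payload, 0, payload.length);
        }
    }
}
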
2 changes: 0 additions & 2 deletions RFS/src/main/java/com/rfs/common/TryHandlePhaseFailure.java
@@ -1,8 +1,6 @@
 package com.rfs.common;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 @Slf4j
 public class TryHandlePhaseFailure {
4 changes: 3 additions & 1 deletion RFS/src/main/java/com/rfs/common/Uid.java
@@ -10,6 +10,8 @@
  * See: https://github.com/elastic/elasticsearch/blob/6.8/server/src/main/java/org/elasticsearch/index/mapper/Uid.java#L32
  */
 public class Uid {
+    private Uid() {}
+
     public static final int UTF8 = 0xff;
     public static final int NUMERIC = 0xfe;
     public static final int BASE64_ESCAPE = 0xfd;
@@ -41,7 +43,7 @@ private static String decodeBase64Id(byte[] idBytes, int offset, int length) {
         assert Byte.toUnsignedInt(idBytes[offset]) <= BASE64_ESCAPE;
         if (Byte.toUnsignedInt(idBytes[offset]) == BASE64_ESCAPE) {
             idBytes = Arrays.copyOfRange(idBytes, offset + 1, offset + length);
-        } else if ((idBytes.length == length && offset == 0) == false) { // no need to copy if it's not a slice
+        } else if (!(idBytes.length == length && offset == 0)) { // no need to copy if it's not a slice
             idBytes = Arrays.copyOfRange(idBytes, offset, offset + length);
         }
         return Base64.getUrlEncoder().withoutPadding().encodeToString(idBytes);
@@ -105,6 +105,9 @@ services:
             - MIGRATION_KAFKA_BROKER_ENDPOINTS=kafka:9092
             - AWS_PROFILE=default
         # command: ./runTestBenchmarks.sh
+        # Uncomment below to enable the Migration Console API, in addition to enabling it in the Dockerfile
+        # ports:
+        #   - "8000:8000"
 
 
 volumes:


@@ -33,21 +33,25 @@ RUN chmod ug+x /root/kafka-tools/kafkaExport.sh
 COPY loadServicesFromParameterStore.sh /root/
 RUN chmod ug+x /root/loadServicesFromParameterStore.sh
 
+RUN apt-get install -y pkg-config libhdf5-dev
+
 COPY lib /root/lib
 WORKDIR /root/lib/console_link
 RUN pipenv install --system --deploy --ignore-pipfile
 WORKDIR /root/lib/integ_test
 RUN pipenv install --system --deploy --ignore-pipfile
 
 
+# Experimental console API, not started by default
 COPY console_api /root/console_api
 WORKDIR /root/console_api
 RUN pipenv install --system --deploy --ignore-pipfile
 
 WORKDIR /root
-#CMD pipenv run python manage.py runserver_plus 0.0.0.0:8000
 
+# Ensure bash completion is installed
+RUN apt-get install -y bash-completion
 
-CMD /root/loadServicesFromParameterStore.sh && tail -f /dev/null
+CMD /root/loadServicesFromParameterStore.sh && tail -f /dev/null
+
+# Experimental console API; uncomment to use, in addition to uncommenting the port mapping in docker-compose.yml
+# CMD /root/loadServicesFromParameterStore.sh && python3 /root/console_api/manage.py runserver_plus 0.0.0.0:8000 --cert-file cert.crt
@@ -10,6 +10,7 @@ django-extensions = "==3.2.3"
 djangorestframework = "==3.15.2"
 werkzeug = "==3.0.3"
 sqlparse = "==0.5.0"
+pyopenssl = "*"
 
 [dev-packages]
 