[RFS] Collection of Minor Changes #717

Merged Jun 13, 2024 (6 commits)
Changes from 4 commits
4 changes: 2 additions & 2 deletions RFS/build-preloaded-source-image.gradle
@@ -23,8 +23,8 @@ def createNetworkTask = task createNetwork(type: Exec) {
}
}
task createInitialElasticsearchContainer(type: DockerCreateContainer) {
dependsOn createNetwork, buildDockerImage_emptyElasticsearchSource
targetImageId 'migrations/empty_elasticsearch_source:latest'
dependsOn createNetwork, buildDockerImage_emptyElasticsearchSource_7_17
targetImageId 'migrations/emptyElasticsearchSource_7_17:latest'
containerName = "elasticsearch-${uniqueId}"
hostConfig.network = myNetworkName
hostConfig.dns = ['elasticsearch']
7 changes: 5 additions & 2 deletions RFS/build.gradle
@@ -132,9 +132,12 @@ DockerServiceProps[] dockerServices = [
dockerImageName:"reindex_from_snapshot",
inputDir:"./docker",
taskDependencies:["copyDockerRuntimeJars"]]),
new DockerServiceProps([projectName:"emptyElasticsearchSource",
dockerImageName:"empty_elasticsearch_source",
new DockerServiceProps([projectName:"emptyElasticsearchSource_7_10",
dockerImageName:"empty_elasticsearch_source_7_10",
inputDir:"./docker/TestSource_ES_7_10"]),
new DockerServiceProps([projectName:"emptyElasticsearchSource_7_17",
dockerImageName:"empty_elasticsearch_source_7_17",
inputDir:"./docker/TestSource_ES_7_17"]),
new DockerServiceProps([projectName:"trafficGenerator",
dockerImageName:"osb_traffic_generator",
inputDir:"./docker/TrafficGenerator",
22 changes: 22 additions & 0 deletions RFS/docker/TestSource_ES_7_17/Dockerfile
@@ -0,0 +1,22 @@
FROM docker.elastic.co/elasticsearch/elasticsearch:7.17.21 AS base
Collaborator:
I know that dynamic dockerfiles aren't ideal, but generating these docker images with an array of different base images programmatically and en-masse could be an improvement for down the line.

Member Author:
Probably; we'll want something like that for testing purposes once we have a bunch of different source/target versions to support. I'm unclear on how generalizable the Dockerfiles will be across major versions though.


# Configure Elastic
ENV ELASTIC_SEARCH_CONFIG_FILE=/usr/share/elasticsearch/config/elasticsearch.yml
# Prevents ES from complaining about nodes count
RUN echo "discovery.type: single-node" >> $ELASTIC_SEARCH_CONFIG_FILE
ENV PATH=${PATH}:/usr/share/elasticsearch/jdk/bin/

# Install the S3 Repo Plugin
RUN echo y | /usr/share/elasticsearch/bin/elasticsearch-plugin install repository-s3

# Install the AWS CLI for testing purposes
RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && \
unzip awscliv2.zip && \
./aws/install

RUN mkdir /snapshots && chown elasticsearch /snapshots

# Install our custom entrypoint script
COPY ./container-start.sh /usr/share/elasticsearch/container-start.sh

CMD /usr/share/elasticsearch/container-start.sh
13 changes: 13 additions & 0 deletions RFS/docker/TestSource_ES_7_17/container-start.sh
@@ -0,0 +1,13 @@
#!/bin/bash

echo "Setting AWS Creds from ENV Variables"
bin/elasticsearch-keystore create
echo $AWS_ACCESS_KEY_ID | bin/elasticsearch-keystore add s3.client.default.access_key --stdin
echo $AWS_SECRET_ACCESS_KEY | bin/elasticsearch-keystore add s3.client.default.secret_key --stdin

if [ -n "$AWS_SESSION_TOKEN" ]; then
echo $AWS_SESSION_TOKEN | bin/elasticsearch-keystore add s3.client.default.session_token --stdin
fi

echo "Starting Elasticsearch"
/usr/local/bin/docker-entrypoint.sh eswrapper
9 changes: 7 additions & 2 deletions RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java
@@ -27,6 +27,7 @@
import com.rfs.common.SnapshotRepo;
import com.rfs.common.SnapshotShardUnpacker;
import com.rfs.common.ClusterVersion;
import com.rfs.common.DefaultSourceRepoAccessor;
Collaborator:
Is there a reason to keep this demo program around? Can it be deleted or moved to a testFixture?

Member Author:
This one, definitely not; been keeping it around "just in case" but now that you've pointed it out I can't think of a case where I'd want it. Will delete

import com.rfs.common.FileSystemRepo;
import com.rfs.version_es_7_10.*;

@@ -219,8 +220,12 @@
} else {
bufferSize = ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES;
}
SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repo, Paths.get(luceneBasePathString), bufferSize);
unpacker.unpack(shardMetadata);
DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(repo);

SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor, Paths.get(luceneBasePathString), bufferSize);
try (SnapshotShardUnpacker unpacker = unpackerFactory.create(shardMetadata)) {
unpacker.unpack();

}

// Now, read the documents back out
System.out.println("--- Reading docs in the shard ---");
7 changes: 5 additions & 2 deletions RFS/src/main/java/com/rfs/ReindexFromSnapshot.java
@@ -329,7 +329,8 @@
} else {
bufferSize = ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES;
}
SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repo, luceneDirPath, bufferSize);
DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(repo);
Collaborator:
Same question on this class. Can we delete it now?

Member Author:
We can once we integrate the new version of this stuff into the Migration Assistant, which currently has a dependency on this file.

SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,luceneDirPath, bufferSize);

for (IndexMetadata.Data indexMetadata : indexMetadatas) {
logger.info("Processing index: " + indexMetadata.getName());
@@ -345,7 +346,9 @@
}

// Unpack the shard
unpacker.unpack(shardMetadata);
try (SnapshotShardUnpacker unpacker = unpackerFactory.create(shardMetadata)) {
unpacker.unpack();

}
}
}

11 changes: 9 additions & 2 deletions RFS/src/main/java/com/rfs/RunRfsWorker.java
@@ -21,6 +21,7 @@
import com.rfs.cms.OpenSearchCmsClient;
import com.rfs.common.ClusterVersion;
import com.rfs.common.ConnectionDetails;
import com.rfs.common.DeletingSourceRepoAccessor;
import com.rfs.common.DocumentReindexer;
import com.rfs.common.GlobalMetadata;
import com.rfs.common.IndexMetadata;
@@ -100,6 +101,10 @@
@Parameter(names = {"--component-template-allowlist"}, description = ("Optional. List of component template names to migrate"
+ " (e.g. 'posts_template1, posts_template2'). Default: empty list"), required = false)
public List<String> componentTemplateAllowlist = List.of();

@Parameter(names = {"--max-shard-size-bytes"}, description = ("Optional. The maximum shard size, in bytes, to allow when"
+ " performing the document migration. Useful for preventing disk overflow. Default: 50 * 1024 * 1024 * 1024 (50 GB)"), required = false)
public long maxShardSizeBytes = 50 * 1024 * 1024 * 1024L;

//https://opensearch.org/docs/2.11/api-reference/cluster-api/cluster-awareness/
@Parameter(names = {"--min-replicas"}, description = ("Optional. The minimum number of replicas configured for migrated indices on the target."
@@ -132,6 +137,7 @@
String targetPass = arguments.targetPass;
List<String> indexTemplateAllowlist = arguments.indexTemplateAllowlist;
List<String> componentTemplateAllowlist = arguments.componentTemplateAllowlist;
long maxShardSizeBytes = arguments.maxShardSizeBytes;

Collaborator:
just curious, why do you copy the arg values as non-final locals?

Member Author:
I guess they should be finals, huh? Will start doing that. I make the intermediate copies to shorten the name when referring to them and also to make a conceptual split between the argument passed in and the thing we're using in the program. Makes more sense when we're doing validation or other processing first, but I just tend to apply the pattern generally.

Will make them final per your suggestion, though.

Collaborator (@gregschohn, Jun 12, 2024):
My question was more - why set up the aliases in the first place?
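For illustration, a minimal sketch of the alias pattern under discussion, with the locals made final as suggested; the validation step shown here is hypothetical and not something the current RunRfsWorker does:

```java
// Hypothetical sketch of the "final local alias" pattern discussed above.
// The alias shortens the name and gives a natural place for validation or
// other processing before the value is used elsewhere in the program.
final long maxShardSizeBytes = arguments.maxShardSizeBytes;
if (maxShardSizeBytes <= 0) {
    throw new IllegalArgumentException("--max-shard-size-bytes must be positive");
}
final int awarenessDimensionality = arguments.minNumberOfReplicas + 1;
```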

int awarenessDimensionality = arguments.minNumberOfReplicas + 1;
Level logLevel = arguments.logLevel;

@@ -165,10 +171,11 @@
indexWorker.run();

ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(sourceRepo, luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);
DeletingSourceRepoAccessor repoAccessor = new DeletingSourceRepoAccessor(sourceRepo);
Collaborator:
DeletingSourceRepoAccessor sounds really weird. Do you want to say EphemeralSourceRepoAccessor or SourceRepoCachingAccessor?

Member Author:
Happy to rename. EphemeralSourceRepoAccessor tickles my brain a bit better, so I'll go with that.

SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor, luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);

LuceneDocumentsReader reader = new LuceneDocumentsReader(luceneDirPath);
DocumentReindexer reindexer = new DocumentReindexer(targetClient);
DocumentsRunner documentsWorker = new DocumentsRunner(globalState, cmsClient, snapshotName, indexMetadataFactory, shardMetadataFactory, unpacker, reader, reindexer);
DocumentsRunner documentsWorker = new DocumentsRunner(globalState, cmsClient, snapshotName, maxShardSizeBytes, indexMetadataFactory, shardMetadataFactory, unpackerFactory, reader, reindexer);

documentsWorker.run();

} catch (Runner.PhaseFailed e) {
23 changes: 23 additions & 0 deletions RFS/src/main/java/com/rfs/common/DefaultSourceRepoAccessor.java
@@ -0,0 +1,23 @@
package com.rfs.common;

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

/*
* Provides "simple" access to the underlying files in the source repo without any special behavior
*/
public class DefaultSourceRepoAccessor extends SourceRepoAccessor {
public DefaultSourceRepoAccessor(SourceRepo repo) {
super(repo);
}

@Override
protected InputStream load(Path path) {
try {
return Files.newInputStream(path);
} catch (Exception e) {
throw new CouldNotLoadRepoFile("Could not load file: " + path, e);

}
}
}
46 changes: 46 additions & 0 deletions RFS/src/main/java/com/rfs/common/DeletingSourceRepoAccessor.java
@@ -0,0 +1,46 @@
package com.rfs.common;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;


/*
* Provides access to the underlying files in the source repo and deletes the files after the Stream is closed. This
* is useful/interesting in the case where the files are large/numerous and you can easily re-acquire them - such as
* if they are being loaded from S3.
*/
public class DeletingSourceRepoAccessor extends SourceRepoAccessor {
public DeletingSourceRepoAccessor(SourceRepo repo) {
super(repo);
}

@Override
protected InputStream load(Path path) {
try {
return new DeletingFileInputStream(path);
} catch (Exception e) {
throw new CouldNotLoadRepoFile("Could not load file: " + path, e);

}
}

public static class DeletingFileInputStream extends FileInputStream {
Collaborator:
This seems like it's really scary and not going to be maintainable. If a user needs to read from the file again from the beginning and just makes a new stream, they'll be surprised if they had closed the previous stream already. It's one thing to have a side effect to reverse another side effect that your class was responsible for (you open a file descriptor in line 23, so you close it at line 40). In this case, you're passing ownership of the underlying file (not just the descriptor) to the DeletingFileInputStream.

What happens if some files aren't being consumed by the readers - what if there was an exception before you opened all of the streams? What if there are future refactorings?

A better approach might be to make a scratch directory for each migration-shard run (that way you can manage any number of sessions simultaneously within one process). As you finish running each of those sessions, just blow away everything that you had downloaded.

Member Author (@chelma, Jun 12, 2024):
Probably easier to talk this one out, but - the way the SourceRepoAccessor works is that it wraps an underlying SourceRepo’s calls, which return the Path to a particular file within the repo and converts them to a Stream. In the case of an S3SourceRepo, when you get the Path to the file, it downloads it to disk if it’s not already there. So if you wrap an S3SourceRepo in an EphemeralSourceRepoAccessor, what happens is that you are returned an InputStream to a file which is downloaded if it doesn’t already exist and then that file is deleted when the Stream is closed.

It seems like any code that returns a stream would have the same problems you mentioned. I'm not opposed to doing it differently, but that would entail a wider refactoring and I was hoping to avoid that given we have more urgent things to be tackling right now (IMO).

Member Author:
After discussion, we decided to keep this around in case we need it in the future but use the Default (non-deleting) version in RFS for now. If the disk fills up from our s3 downloads, it should kill the process and naturally free up disk space by getting a new container.
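For readers skimming the thread, a rough sketch of the wrapping described above, assuming the accessor/repo split shown elsewhere in this diff; it is an illustration, not the actual SourceRepoAccessor:

```java
import java.io.InputStream;
import java.nio.file.Path;

// Sketch of the accessor-over-repo pattern: the accessor resolves a Path through
// the underlying SourceRepo and converts it into an InputStream. For an S3-backed
// repo, getBlobFilePath() is the point where the blob is downloaded to local disk
// if it is not already present.
abstract class SourceRepoAccessorSketch {
    private final SourceRepo repo;

    protected SourceRepoAccessorSketch(SourceRepo repo) {
        this.repo = repo;
    }

    public InputStream getBlobFile(String indexId, int shardId, String blobName) {
        Path localPath = repo.getBlobFilePath(indexId, shardId, blobName);
        return load(localPath);
    }

    // DefaultSourceRepoAccessor: a plain Files.newInputStream(path).
    // The deleting/ephemeral variant: a FileInputStream whose close() also deletes path.
    protected abstract InputStream load(Path path);
}
```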

private final Path filePath;

public DeletingFileInputStream(Path filePath) throws IOException {
super(filePath.toFile());
this.filePath = filePath;
}

@Override
public void close() throws IOException {
try {
super.close();

} finally {
Files.deleteIfExists(filePath);

}
}

}
}
11 changes: 5 additions & 6 deletions RFS/src/main/java/com/rfs/common/PartSliceStream.java
@@ -12,24 +12,23 @@
*/

public class PartSliceStream extends InputStream {
Collaborator:
from the javadoc - what's the purpose of this class? I don't understand what the 'special sauce' is.

Member Author:
The special sauce is that it provides a single stream-like object that seamlessly reads through multiple Snapshot blob files that have been split into multiple parts, per the Elasticsearch/OpenSearch convention of not having any individual file bigger than ~1 GB.

We don't have to do things this way, but it's what the ES/OS code did and I haven't had a reason to change it.

Collaborator:
Is it like the SequenceInputStream, or maybe like that with a bit more support through it?

Member Author:
Looks like "kinda"? Basically, you just have a base blobfile name (foo) and if it's larger than 1 GB it's split into multiple files (foo.part0, foo.part1, foo.part2). The PartSliceStream creates a stream that mimics as if they were all the same file you're reading from.
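Conceptually that is close to chaining the parts with a SequenceInputStream. A rough sketch of the mental model follows; the real PartSliceStream opens each slice lazily through the repo accessor and derives part names from the shard's file metadata, so this is an illustration only:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class PartConcatenationSketch {
    // Illustration only: eagerly opens foo.part0, foo.part1, ... and chains them so
    // the caller reads them as if they were one file. PartSliceStream has the same
    // effect but opens each slice on demand as the previous one is exhausted.
    static InputStream openAllParts(Path dir, String baseName, int numParts) throws IOException {
        List<InputStream> parts = new ArrayList<>(numParts);
        for (int i = 0; i < numParts; i++) {
            parts.add(Files.newInputStream(dir.resolve(baseName + ".part" + i)));
        }
        return new SequenceInputStream(Collections.enumeration(parts));
    }
}
```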

private final SourceRepo repo;
private final SourceRepoAccessor repoAccessor;
private final ShardMetadata.FileInfo fileMetadata;
private final String indexId;
private final int shardId;
private long slice = 0;
private InputStream currentStream;
private boolean initialized = false;

public PartSliceStream(SourceRepo repo, ShardMetadata.FileInfo fileMetadata, String indexId, int shardId) {
this.repo = repo;
public PartSliceStream(SourceRepoAccessor repoAccessor, ShardMetadata.FileInfo fileMetadata, String indexId, int shardId) {
this.repoAccessor = repoAccessor;
this.fileMetadata = fileMetadata;
this.indexId = indexId;
this.shardId = shardId;
}

protected InputStream openSlice(long slice) throws IOException {
Path filePath = repo.getBlobFilePath(indexId, shardId, fileMetadata.partName(slice));
return Files.newInputStream(filePath);
protected InputStream openSlice(long slice) {
return repoAccessor.getBlobFile(indexId, shardId, fileMetadata.partName(slice));
}

private InputStream nextStream() throws IOException {
2 changes: 1 addition & 1 deletion RFS/src/main/java/com/rfs/common/ShardMetadata.java
@@ -77,7 +77,7 @@ public static interface Data {
public long getStartTime();
public long getTime();
public int getNumberOfFiles();
public long getTotalSize();
public long getTotalSizeBytes();
public List<FileInfo> getFiles();
}

58 changes: 51 additions & 7 deletions RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java
@@ -1,6 +1,8 @@
package com.rfs.common;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -16,19 +18,31 @@
import org.apache.lucene.util.BytesRef;

@RequiredArgsConstructor
public class SnapshotShardUnpacker {
public class SnapshotShardUnpacker implements AutoCloseable {
private static final Logger logger = LogManager.getLogger(SnapshotShardUnpacker.class);
protected final SourceRepo repo;
protected final Path luceneFilesBasePath;
protected final int bufferSize;
private final SourceRepoAccessor repoAccessor;
private final Path luceneFilesBasePath;
private final ShardMetadata.Data shardMetadata;
private final int bufferSize;

public void unpack(ShardMetadata.Data shardMetadata) {
@RequiredArgsConstructor

Check warning on line 28 in RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/SnapshotShardUnpacker.java#L28

Added line #L28 was not covered by tests
public static class Factory {
private final SourceRepoAccessor repoAccessor;
private final Path luceneFilesBasePath;
private final int bufferSize;

public SnapshotShardUnpacker create(ShardMetadata.Data shardMetadata) {
return new SnapshotShardUnpacker(repoAccessor, luceneFilesBasePath, shardMetadata, bufferSize);

}
}

public void unpack() {
try {
// Some constants
NativeFSLockFactory lockFactory = NativeFSLockFactory.INSTANCE;

// Ensure the blob files are prepped, if they need to be
repo.prepBlobFiles(shardMetadata);
repoAccessor.prepBlobFiles(shardMetadata);

// Create the directory for the shard's lucene files
Path luceneIndexDir = Paths.get(luceneFilesBasePath + "/" + shardMetadata.getIndexName() + "/" + shardMetadata.getShardId());
@@ -43,7 +57,7 @@
final BytesRef hash = fileMetadata.getMetaHash();
indexOutput.writeBytes(hash.bytes, hash.offset, hash.length);
} else {
try (InputStream stream = new PartSliceStream(repo, fileMetadata, shardMetadata.getIndexId(), shardMetadata.getShardId())) {
try (InputStream stream = new PartSliceStream(repoAccessor, fileMetadata, shardMetadata.getIndexId(), shardMetadata.getShardId())) {
final byte[] buffer = new byte[Math.toIntExact(Math.min(bufferSize, fileMetadata.getLength()))];
int length;
while ((length = stream.read(buffer)) > 0) {
Expand All @@ -58,6 +72,36 @@
}
}

@Override
public void close() {
try {
Path luceneIndexDir = Paths.get(luceneFilesBasePath + "/" + shardMetadata.getIndexName() + "/" + shardMetadata.getShardId());

Collaborator:
It looks like you're deleting a session directory here. Why did you need to delete files above when tied to the stream?

Member Author:
Not sure I understand the question. Can you re-phrase?

Collaborator:
Is this redundant to what you are doing with the DeletingFileInputStream above? This place feels like where all of your cached files should be deleted - or are there other files that are created outside of this class?

Member Author:
Per discussion - there are two things that we need to clean up: the raw snapshot files we download from s3, and the files we convert those into which Lucene actually cares about. This is for cleaning up the latter.

Collaborator:
As per the jira in the log message added above, we should eventually get this down to no extra file buffers, which reduces the number of resources that we're managing and should make the code more efficient and easier to maintain.

if (Files.exists(luceneIndexDir)) {
deleteRecursively(luceneIndexDir);

}

} catch (Exception e) {
throw new CouldNotCleanUpShard("Could not clean up shard: Index " + shardMetadata.getIndexId() + ", Shard " + shardMetadata.getShardId(), e);
Collaborator:
Should/does this kill the process? Seems like it probably should since it would be safer to recycle the process, especially since you were at the end of working (if you have other pending work, maybe it should not take more on and flush out the current work items).

Member Author:
Makes sense. With the way things are currently implemented, I believe it will kill the process.

Collaborator:
Do you have a test to confirm that? I ask because making sure that processes die, especially through refactorings, can be pretty tricky to maintain.
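One way such a test could look, assuming JUnit 5 and Mockito are available to the RFS tests; it only pins down that a cleanup failure escapes close() as CouldNotCleanUpShard rather than being swallowed, while verifying that the exception actually terminates the worker process would need coverage of the calling code:

```java
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.nio.file.Paths;

import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import com.rfs.common.ShardMetadata;
import com.rfs.common.SnapshotShardUnpacker;
import com.rfs.common.SourceRepoAccessor;

class SnapshotShardUnpackerCleanupTest {
    @Test
    void closePropagatesCleanupFailure() {
        // Force the cleanup path inside close() to fail by having the shard metadata throw.
        ShardMetadata.Data shardMetadata = Mockito.mock(ShardMetadata.Data.class);
        Mockito.when(shardMetadata.getIndexName()).thenThrow(new RuntimeException("simulated failure"));
        Mockito.when(shardMetadata.getIndexId()).thenReturn("index-1");
        Mockito.when(shardMetadata.getShardId()).thenReturn(0);

        SourceRepoAccessor repoAccessor = Mockito.mock(SourceRepoAccessor.class);
        SnapshotShardUnpacker unpacker =
            new SnapshotShardUnpacker(repoAccessor, Paths.get("/tmp/lucene"), shardMetadata, 1024);

        // The failure should surface as CouldNotCleanUpShard instead of being swallowed.
        assertThrows(SnapshotShardUnpacker.CouldNotCleanUpShard.class, unpacker::close);
    }
}
```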

}
}

protected void deleteRecursively(Path path) throws IOException {
if (Files.isDirectory(path)) {
try (DirectoryStream<Path> entries = Files.newDirectoryStream(path)) {

for (Path entry : entries) {
deleteRecursively(entry);
}

}
}
Files.delete(path);
}

public static class CouldNotCleanUpShard extends RfsException {
public CouldNotCleanUpShard(String message, Exception e) {
super(message, e);
}

}

public static class CouldNotUnpackShard extends RfsException {
public CouldNotUnpackShard(String message, Exception e) {
super(message, e);