[RFS] Collection of Minor Changes #717
Conversation
Signed-off-by: Chris Helma <[email protected]>
Signed-off-by: Chris Helma <[email protected]>
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #717 +/- ##
============================================
+ Coverage 63.24% 64.00% +0.76%
- Complexity 1578 1585 +7
============================================
Files 220 222 +2
Lines 9156 9083 -73
Branches 793 771 -22
============================================
+ Hits 5791 5814 +23
+ Misses 2956 2858 -98
- Partials 409 411 +2
Flags with carried forward coverage won't be shown.
RFS/src/main/java/com/rfs/common/DeletingSourceRepoAccessor.java
    }
}

public static class CouldNotLoadRepoFile extends RuntimeException {
Move this out into an Exceptions package, so we don't need class-specific versions of the same named exception between this class and DefaultSourceRepoAccessor.
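For reference, a minimal sketch of what the shared exception could look like; the package name is an assumption, not something from this PR:

```java
// Hypothetical location for a shared exception; the actual package name is an assumption.
package com.rfs.common.exceptions;

public class CouldNotLoadRepoFile extends RuntimeException {
    public CouldNotLoadRepoFile(String message, Throwable cause) {
        super(message, cause);
    }
}
```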
Good catch, done.
public DocumentsRunner(GlobalState globalState, CmsClient cmsClient, String snapshotName, long maxShardSizeBytes,
                       ShardMetadata.Factory shardMetadataFactory, SnapshotShardUnpacker unpacker, LuceneDocumentsReader reader,
                       DocumentReindexer reindexer) {
    this.members = new DocumentsStep.SharedMembers(globalState, cmsClient, snapshotName, metadataFactory, shardMetadataFactory, unpacker, reader, reindexer);
This is nigh impossible to read. Let's move to using @RequiredArgsConstructor as a quick stopgap, but I think we need better data modeling so we don't have to pass every configuration option around; clustering by functionality or having a settings object that can be peeked into would handle this.
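As a rough illustration of the Lombok suggestion (assuming Lombok is on the classpath; the field list mirrors the constructor in this diff, and the collaborator types are the ones from this PR):

```java
import lombok.RequiredArgsConstructor;

// Sketch only: @RequiredArgsConstructor generates a constructor taking every final field, in declaration order.
@RequiredArgsConstructor
public class DocumentsRunner {
    private final GlobalState globalState;
    private final CmsClient cmsClient;
    private final String snapshotName;
    private final long maxShardSizeBytes;
    private final ShardMetadata.Factory shardMetadataFactory;
    private final SnapshotShardUnpacker unpacker;
    private final LuceneDocumentsReader reader;
    private final DocumentReindexer reindexer;
}
```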
I don't find it hard to read, but feel free to update it how you'd like.
In general, I go back and forth on this one in other areas of Java code. Doing the 'right' thing with builders is cumbersome and it only amounts to a runtime check.
Please try to put these on separate lines and order them to tell the best story possible. This is basically dependency injection and everything is required, right?
@gregschohn Yeah, this is just dependency injection. I can put them on new lines if that makes it a bit clearer, but the bigger issue is that there's something that feels off with how the SharedMembers object is created in an intermediate scope between the main() and the Step classes that need it. It feels like the SharedMembers definition might not actually belong in the Step class?
Signed-off-by: Chris Helma <[email protected]>
Signed-off-by: Chris Helma <[email protected]>
I'm a bit confused at what the deletion model looks like. Please help to clarify it so that I can understand and approve this.
@@ -0,0 +1,22 @@
FROM docker.elastic.co/elasticsearch/elasticsearch:7.17.21 AS base
I know that dynamic Dockerfiles aren't ideal, but generating these Docker images programmatically and en masse, from an array of different base images, could be an improvement down the line.
Probably; we'll want something like that for testing purposes once we have a bunch of different source/target versions to support. I'm unclear on how generalizable the Dockerfiles will be across major versions though.
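To make the idea concrete, a hedged sketch of stamping out Dockerfiles from a template; the second image tag and the output paths here are illustrative assumptions:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Sketch: write one Dockerfile per base image into generated/<tag>/Dockerfile.
public class DockerfileGenerator {
    public static void main(String[] args) throws IOException {
        List<String> baseImages = List.of(
            "docker.elastic.co/elasticsearch/elasticsearch:7.17.21",
            "docker.elastic.co/elasticsearch/elasticsearch:7.10.2"); // hypothetical second version
        for (String base : baseImages) {
            String tag = base.substring(base.lastIndexOf(':') + 1);
            Path dir = Files.createDirectories(Path.of("generated", tag));
            Files.writeString(dir.resolve("Dockerfile"),
                "FROM " + base + " AS base\n"
                + "# ...build steps shared across versions would be templated in here...\n");
        }
    }
}
```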
@@ -27,6 +27,7 @@
import com.rfs.common.SnapshotRepo;
import com.rfs.common.SnapshotShardUnpacker;
import com.rfs.common.ClusterVersion;
import com.rfs.common.DefaultSourceRepoAccessor;
Is there a reason to keep this demo program around? Can it be deleted or moved to a testFixture?
This one, definitely not; I've been keeping it around "just in case", but now that you've pointed it out I can't think of a case where I'd want it. Will delete.
@@ -329,7 +329,8 @@ public static void main(String[] args) throws InterruptedException {
} else {
    bufferSize = ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES;
}
SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(repo, luceneDirPath, bufferSize);
DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(repo);
Same question on this class. Can we delete it now?
We can once we integrate the new version of this stuff into the Migration Assistant, which currently has a dependency on this file.
@@ -132,6 +137,7 @@ public static void main(String[] args) throws Exception {
String targetPass = arguments.targetPass;
List<String> indexTemplateAllowlist = arguments.indexTemplateAllowlist;
List<String> componentTemplateAllowlist = arguments.componentTemplateAllowlist;
long maxShardSizeBytes = arguments.maxShardSizeBytes;
Just curious: why do you copy the arg values as non-final locals?
I guess they should be finals, huh? Will start doing that. I make the intermediate copies to shorten the name when referring to them and also to make a conceptual split between the argument passed in and the thing we're using in the program. Makes more sense when we're doing validation or other processing first, but I just tend to apply the pattern generally.
Will make them final per your suggestion, though.
My question was more: why set up the aliases in the first place?
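For illustration, the aliasing pattern under discussion, with final locals; the Args holder here is a hypothetical stand-in for the real arguments class:

```java
// Sketch of the final-local aliasing pattern: shorten names and separate "what was passed in"
// from "what the program uses". Args is a hypothetical stand-in.
public class ArgsAliasExample {
    static class Args {
        String targetPass = "secret";
        long maxShardSizeBytes = 80L * 1024 * 1024 * 1024;
    }

    public static void main(String[] unused) {
        final Args arguments = new Args();
        final String targetPass = arguments.targetPass;
        final long maxShardSizeBytes = arguments.maxShardSizeBytes;
        System.out.println(targetPass.length() + " chars; max " + maxShardSizeBytes + " bytes");
    }
}
```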
@@ -165,10 +171,11 @@ public static void main(String[] args) throws Exception {
indexWorker.run();

ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker(sourceRepo, luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);
DeletingSourceRepoAccessor repoAccessor = new DeletingSourceRepoAccessor(sourceRepo);
DeletingSourceRepoAccessor sounds really weird. Do you want to say EphemeralSourceRepoAccessor or SourceRepoCachingAccessor?
Happy to rename. EphemeralSourceRepoAccessor tickles my brain a bit better, so I'll go with that.
    }
}

public static class DeletingFileInputStream extends FileInputStream {
This seems like it's really scary and not going to be maintainable. If a user needs to read from the file again from the beginning and just makes a new stream, they'll be surprised if they had already closed the previous stream. It's one thing to have a side effect that reverses another side effect your class was responsible for (you open a file descriptor in line 23, so you close it at line 40). In this case, you're passing ownership of the underlying file (not just the descriptor) to the DeletingFileInputStream.
What happens if some files aren't being consumed by the readers - what if there was an exception before you opened all of the streams? What if there are future refactorings?
A better approach might be to make a scratch directory for each migration-shard run (that way you can manage any number of sessions simultaneously within one process). As you finish running each of those sessions, just blow away everything that you had downloaded.
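A minimal sketch of that per-session scratch directory idea: each shard migration gets its own temp directory, and everything under it is removed when the session ends, regardless of which streams were actually opened. Names here are illustrative:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Sketch: one scratch directory per migration-shard session, blown away on close().
public class ShardScratchDir implements AutoCloseable {
    private final Path root;

    public ShardScratchDir(String indexName, int shardId) throws IOException {
        this.root = Files.createTempDirectory("rfs-" + indexName + "-" + shardId + "-");
    }

    public Path getRoot() {
        return root;
    }

    @Override
    public void close() throws IOException {
        // Delete children before parents by walking the tree in reverse order.
        try (Stream<Path> paths = Files.walk(root)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }
}
```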
Probably easier to talk this one out, but the way the SourceRepoAccessor works is that it wraps an underlying SourceRepo’s calls, which return the Path to a particular file within the repo, and converts them to a Stream. In the case of an S3SourceRepo, when you get the Path to the file, it downloads it to disk if it’s not already there. So if you wrap an S3SourceRepo in an EphemeralSourceRepoAccessor, what happens is that you are returned an InputStream to a file which is downloaded if it doesn’t already exist, and then that file is deleted when the Stream is closed.
It seems like any code that returns a stream would have the same problems you mentioned. I'm not opposed to doing it differently, but that would entail a wider refactoring and I was hoping to avoid that given we have more urgent things to be tackling right now (IMO).
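To pin down the behavior being discussed, a minimal sketch of a delete-on-close stream wrapper; this is illustrative, not the PR's exact code:

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch: an InputStream over a local file that removes the file when the stream is closed.
public class DeletingFileInputStream extends FileInputStream {
    private final Path path;

    public DeletingFileInputStream(Path path) throws IOException {
        super(path.toFile());
        this.path = path;
    }

    @Override
    public void close() throws IOException {
        try {
            super.close();              // release the file descriptor first
        } finally {
            Files.deleteIfExists(path); // then remove the on-disk copy
        }
    }
}
```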
After discussion, we decided to keep this around in case we need it in the future but use the Default (non-deleting) version in RFS for now. If the disk fills up from our s3 downloads, it should kill the process and naturally free up disk space by getting a new container.
@@ -12,24 +12,23 @@
 */

public class PartSliceStream extends InputStream {
from the javadoc - what's the purpose of this class? I don't understand what the 'special sauce' is.
The special sauce is that it provides a single stream-like object that seamlessly reads through multiple snapshot blob files that have been split into multiple parts, per the Elasticsearch/OpenSearch convention of not having any individual file bigger than ~1 GB.
We don't have to do things this way, but it's what the ES/OS code did and I haven't had a reason to change it.
Is it like the SequenceInputStream, or maybe like that with a bit more support through it?
Looks like "kinda"? Basically, you just have a base blob file name (foo), and if it's larger than 1 GB it's split into multiple files (foo.part0, foo.part1, foo.part2). The PartSliceStream creates a stream that behaves as if they were all a single file you're reading from.
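For comparison, the SequenceInputStream version of that idea might look like the following; the helper name and the eager opening of every part are assumptions (the real PartSliceStream reads parts through the snapshot repo):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch: stitch foo.part0, foo.part1, ... into one logical stream.
public final class BlobParts {
    public static InputStream openConcatenated(Path dir, String baseName, int numParts) throws IOException {
        List<InputStream> parts = new ArrayList<>();
        for (int i = 0; i < numParts; i++) {
            parts.add(Files.newInputStream(dir.resolve(baseName + ".part" + i)));
        }
        return new SequenceInputStream(Collections.enumeration(parts));
    }
}
```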
@Override
public void close() {
    try {
        Path luceneIndexDir = Paths.get(luceneFilesBasePath + "/" + shardMetadata.getIndexName() + "/" + shardMetadata.getShardId());
It looks like you're deleting a session directory here. Why did you need to delete files above when tied to the stream?
Not sure I understand the question. Can you re-phrase?
Is this redundant to what you are doing with the DeletingFileInputStream above? This place feels like where all of your cached files should be deleted - or are there other files that are created outside of this class?
Per discussion, there are two things that we need to clean up: the raw snapshot files we download from S3, and the files we convert those into, which Lucene actually cares about. This is for cleaning up the latter.
As per the jira in the log message added above, we should eventually get this down to no extra file buffers, which reduces the number of resources that we're managing and should make the code more efficient and easier to maintain.
        }

    } catch (Exception e) {
        throw new CouldNotCleanUpShard("Could not clean up shard: Index " + shardMetadata.getIndexId() + ", Shard " + shardMetadata.getShardId(), e);
Should/does this kill the process? Seems like it probably should, since it would be safer to recycle the process, especially since you were at the end of the work (if you have other pending work items, maybe it should not take on more and should flush out the current ones).
Makes sense. With the way things are currently implemented, I believe it will kill the process.
Do you have a test to confirm that? I ask because making sure that processes die, especially through refactorings, can be pretty tricky to maintain.
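A hedged sketch of what such a test could look like, assuming JUnit 5; the fixture helper is hypothetical, and note this only verifies that the exception propagates out of close(), which is a proxy for the process dying (that also depends on how main() handles it):

```java
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.junit.jupiter.api.Test;

class UnpackerCleanupTest {
    // Hypothetical fixture: build an unpacker whose shard directory cannot be deleted
    // (e.g., permissions stripped), so cleanup is guaranteed to fail.
    private SnapshotShardUnpacker makeUnpackerWithUndeletableDir() {
        throw new UnsupportedOperationException("fixture sketch only");
    }

    @Test
    void closeThrowsWhenShardCleanupFails() {
        // SnapshotShardUnpacker and CouldNotCleanUpShard are the PR's own types, imported from its code.
        SnapshotShardUnpacker unpacker = makeUnpackerWithUndeletableDir();
        assertThrows(CouldNotCleanUpShard.class, unpacker::close);
    }
}
```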
Signed-off-by: Chris Helma <[email protected]>
Responded; trying to figure out where all of the ephemeral files are written and who has responsibility for deleting them.
Signed-off-by: Chris Helma <[email protected]>
Description
Issues Resolved
Testing
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.