Shutdown executors created by RFS LeaseExpireTrigger and KafkaTrafficCaptureSource #766

Closed
@@ -101,33 +101,34 @@ public static void main(String[] args) throws Exception {
.parse(args);

var luceneDirPath = Paths.get(arguments.luceneDirPath);
var processManager = new LeaseExpireTrigger(workItemId->{
try (var processManager = new LeaseExpireTrigger(workItemId->{
log.error("terminating RunRfsWorker because its lease has expired for " + workItemId);
System.exit(PROCESS_TIMED_OUT);
}, Clock.systemUTC());
var workCoordinator = new OpenSearchWorkCoordinator(new ApacheHttpClient(new URI(arguments.targetHost)),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, UUID.randomUUID().toString());

TryHandlePhaseFailure.executeWithTryCatch(() -> {
log.info("Running RfsWorker");

OpenSearchClient targetClient =
new OpenSearchClient(arguments.targetHost, arguments.targetUser, arguments.targetPass, false);
DocumentReindexer reindexer = new DocumentReindexer(targetClient);

SourceRepo sourceRepo = S3Repo.create(Paths.get(arguments.s3LocalDirPath),
new S3Uri(arguments.s3RepoUri), arguments.s3Region);
SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);

IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,
luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);

run(LuceneDocumentsReader::new, reindexer, workCoordinator, processManager, indexMetadataFactory,
arguments.snapshotName, shardMetadataFactory, unpackerFactory, arguments.maxShardSizeBytes);
});
}, Clock.systemUTC())) {
var workCoordinator = new OpenSearchWorkCoordinator(new ApacheHttpClient(new URI(arguments.targetHost)),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, UUID.randomUUID().toString());

TryHandlePhaseFailure.executeWithTryCatch(() -> {
log.info("Running RfsWorker");

OpenSearchClient targetClient =
new OpenSearchClient(arguments.targetHost, arguments.targetUser, arguments.targetPass, false);
DocumentReindexer reindexer = new DocumentReindexer(targetClient);

SourceRepo sourceRepo = S3Repo.create(Paths.get(arguments.s3LocalDirPath),
new S3Uri(arguments.s3RepoUri), arguments.s3Region);
SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);

IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,
luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);

run(LuceneDocumentsReader::new, reindexer, workCoordinator, processManager, indexMetadataFactory,
arguments.snapshotName, shardMetadataFactory, unpackerFactory, arguments.maxShardSizeBytes);
});
}
}

public static DocumentsRunner.CompletionStatus run(Function<Path,LuceneDocumentsReader> readerFactory,
@@ -242,32 +242,32 @@ private DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(SourceRepo
}
return d;
};
var processManager = new LeaseExpireTrigger(workItemId->{
try (var processManager = new LeaseExpireTrigger(workItemId->{
log.atDebug().setMessage("Lease expired for " + workItemId + " making next document get throw").log();
shouldThrow.set(true);
});

DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,
tempDir, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);
})) {
DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,
tempDir, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);

SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);
IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);
IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);

return RfsMigrateDocuments.run(path -> new FilteredLuceneDocumentsReader(path, terminatingDocumentFilter),
new DocumentReindexer(new OpenSearchClient(targetAddress, null)),
new OpenSearchWorkCoordinator(
new ApacheHttpClient(new URI(targetAddress)),
return RfsMigrateDocuments.run(path -> new FilteredLuceneDocumentsReader(path, terminatingDocumentFilter),
new DocumentReindexer(new OpenSearchClient(targetAddress, null)),
new OpenSearchWorkCoordinator(
new ApacheHttpClient(new URI(targetAddress)),
// new ReactorHttpClient(new ConnectionDetails(osTargetContainer.getHttpHostAddress(),
// null, null)),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, UUID.randomUUID().toString()),
processManager,
indexMetadataFactory,
snapshotName,
shardMetadataFactory,
unpackerFactory,
16*1024*1024);
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, UUID.randomUUID().toString()),
processManager,
indexMetadataFactory,
snapshotName,
shardMetadataFactory,
unpackerFactory,
16 * 1024 * 1024);
}
} finally {
deleteTree(tempDir);
}
37 changes: 19 additions & 18 deletions RFS/src/main/java/com/rfs/RunRfsWorker.java
@@ -143,7 +143,10 @@ public static void main(String[] args) throws Exception {
final ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass);
final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass);

try {
try (var processManager = new LeaseExpireTrigger(workItemId -> {
log.error("terminating RunRfsWorker because its lease has expired for " + workItemId);
System.exit(PROCESS_TIMED_OUT);
}, Clock.systemUTC())) {
log.info("Running RfsWorker");
OpenSearchClient sourceClient = new OpenSearchClient(sourceConnection);
OpenSearchClient targetClient = new OpenSearchClient(targetConnection);
@@ -167,23 +170,21 @@ public static void main(String[] args) throws Exception {
var unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,
luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);
DocumentReindexer reindexer = new DocumentReindexer(targetClient);
var processManager = new LeaseExpireTrigger(workItemId->{
log.error("terminating RunRfsWorker because its lease has expired for "+workItemId);
System.exit(PROCESS_TIMED_OUT);
}, Clock.systemUTC());
var workCoordinator = new OpenSearchWorkCoordinator(new ApacheHttpClient(new URI(targetHost)),
5, UUID.randomUUID().toString());
var scopedWorkCoordinator = new ScopedWorkCoordinator(workCoordinator, processManager);
new ShardWorkPreparer().run(scopedWorkCoordinator, indexMetadataFactory, snapshotName);
new DocumentsRunner(scopedWorkCoordinator,
(name,shard) -> shardMetadataFactory.fromRepo(snapshotName,name,shard),
unpackerFactory,
path -> new LuceneDocumentsReader(path),
reindexer)
.migrateNextShard();
} catch (Exception e) {
log.error("Unexpected error running RfsWorker", e);
throw e;
try {
var workCoordinator = new OpenSearchWorkCoordinator(new ApacheHttpClient(new URI(targetHost)),
5, UUID.randomUUID().toString());
var scopedWorkCoordinator = new ScopedWorkCoordinator(workCoordinator, processManager);
new ShardWorkPreparer().run(scopedWorkCoordinator, indexMetadataFactory, snapshotName);
new DocumentsRunner(scopedWorkCoordinator,
(name, shard) -> shardMetadataFactory.fromRepo(snapshotName, name, shard),
unpackerFactory,
path -> new LuceneDocumentsReader(path),
reindexer)
.migrateNextShard();
} catch (Exception e) {
log.error("Unexpected error running RfsWorker", e);
throw e;
}
}
}
}
7 changes: 6 additions & 1 deletion RFS/src/main/java/com/rfs/cms/LeaseExpireTrigger.java
@@ -16,7 +16,7 @@
* time unless the work is marked as complete before that expiration. This class may, but does not need to,
* synchronize its clock with an external source of truth for better accuracy.
*/
public class LeaseExpireTrigger {
public class LeaseExpireTrigger implements AutoCloseable {
private final ScheduledExecutorService scheduledExecutorService;
final ConcurrentHashMap<String, Instant> workItemToLeaseMap;
final Consumer<String> onLeaseExpired;
@@ -47,4 +47,9 @@ public void registerExpiration(String workItemId, Instant killTime) {
public void markWorkAsCompleted(String workItemId) {
workItemToLeaseMap.remove(workItemId);
}

@Override
public void close() throws Exception {
scheduledExecutorService.shutdownNow();
}
}
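
With LeaseExpireTrigger now implementing AutoCloseable, callers can use try-with-resources to guarantee the internal ScheduledExecutorService is shut down, so the timer thread no longer keeps the process alive after the work finishes. A minimal usage sketch; the work-item id, lease length, and logging below are illustrative, not taken from this PR:

import java.time.Clock;

import com.rfs.cms.LeaseExpireTrigger;

public class LeaseExpireTriggerUsageSketch {
    public static void main(String[] args) throws Exception {
        try (var leaseTrigger = new LeaseExpireTrigger(
                workItemId -> System.err.println("lease expired for " + workItemId),
                Clock.systemUTC())) {
            // The callback above fires only if the work is not marked complete before the kill time.
            leaseTrigger.registerExpiration("shard-0", Clock.systemUTC().instant().plusSeconds(300));
            // ... migrate the shard ...
            leaseTrigger.markWorkAsCompleted("shard-0");
        } // close() runs here and calls scheduledExecutorService.shutdownNow()
    }
}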
@@ -240,5 +240,6 @@ public CommitResult commitTrafficStream(ITrafficStreamKey trafficStreamKey) {
@Override
public void close() throws IOException, InterruptedException, ExecutionException {
kafkaExecutor.submit(trackingKafkaConsumer::close).get();
kafkaExecutor.shutdownNow();
}
}
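
The ordering in this close() appears deliberate: closing the consumer is submitted to kafkaExecutor (the thread that owns the consumer) and awaited first, and only afterwards is the executor itself shut down; shutting it down first would prevent the close task from running on its owning thread. A generic sketch of the same owner-thread shutdown pattern, with illustrative names rather than the classes used in this repository:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OwnedResourceShutdownSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService ownerExecutor = Executors.newSingleThreadExecutor();
        // Stand-in for a resource that must only be touched from ownerExecutor's thread.
        AutoCloseable resource = () -> System.out.println("resource closed");
        // 1) Close the resource on its owning thread and wait for that task to finish...
        ownerExecutor.submit(() -> {
            try {
                resource.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }).get();
        // 2) ...then stop the executor's own thread so it cannot keep the process alive.
        ownerExecutor.shutdownNow();
    }
}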