diff --git a/DocumentsFromSnapshotMigration/build.gradle b/DocumentsFromSnapshotMigration/build.gradle
index e6b72770f..d8528d190 100644
--- a/DocumentsFromSnapshotMigration/build.gradle
+++ b/DocumentsFromSnapshotMigration/build.gradle
@@ -18,10 +18,19 @@ dependencies {
     implementation project(":commonDependencyVersionConstraints")
 
     implementation project(":RFS")
+    implementation group: 'org.apache.logging.log4j', name: 'log4j-api'
+    implementation group: 'org.apache.logging.log4j', name: 'log4j-core'
     implementation group: 'com.beust', name: 'jcommander'
+    implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
+    implementation group: 'com.fasterxml.jackson.core', name: 'jackson-annotations'
+    implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core'
+    implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-smile'
+    implementation group: 'io.projectreactor.netty', name: 'reactor-netty-core'
+    implementation group: 'io.projectreactor.netty', name: 'reactor-netty-http'
     implementation group: 'org.slf4j', name: 'slf4j-api'
     implementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j2-impl'
+
     testImplementation testFixtures(project(":RFS"))
     testImplementation project(":CreateSnapshot")
     testImplementation project(":MetadataMigration")
@@ -32,7 +41,6 @@ dependencies {
     testImplementation group: 'org.testcontainers', name: 'testcontainers'
 
     testImplementation platform('io.projectreactor:reactor-bom:2023.0.5')
-    testImplementation group: 'io.projectreactor.netty', name: 'reactor-netty-core'
 }
 
 application {
diff --git a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/FullTest.java b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/FullTest.java
index 2cf9411db..5ff8c4648 100644
--- a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/FullTest.java
+++ b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/FullTest.java
@@ -1,9 +1,13 @@
 package com.rfs;
 
+import com.rfs.cms.AbstractedHttpClient;
 import com.rfs.cms.ApacheHttpClient;
 import com.rfs.cms.OpenSearchWorkCoordinator;
 import com.rfs.cms.ProcessManager;
+import com.rfs.cms.ReactorHttpClient;
 import com.rfs.common.ClusterVersion;
+import com.rfs.common.ConnectionDetails;
+import com.rfs.common.DefaultSourceRepoAccessor;
 import com.rfs.common.DocumentReindexer;
 import com.rfs.common.FileSystemRepo;
 import com.rfs.common.FileSystemSnapshotCreator;
@@ -11,12 +15,17 @@
 import com.rfs.common.IndexMetadata;
 import com.rfs.common.LuceneDocumentsReader;
 import com.rfs.common.OpenSearchClient;
+import com.rfs.common.ShardMetadata;
 import com.rfs.common.SnapshotRepo;
+import com.rfs.common.SnapshotShardUnpacker;
+import com.rfs.common.SourceRepo;
 import com.rfs.framework.ElasticsearchContainer;
 import com.rfs.transformers.TransformFunctions;
 import com.rfs.transformers.Transformer;
+import com.rfs.version_es_7_10.ElasticsearchConstants_ES_7_10;
 import com.rfs.version_es_7_10.GlobalMetadataFactory_ES_7_10;
 import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10;
+import com.rfs.version_es_7_10.ShardMetadataFactory_ES_7_10;
 import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10;
 import com.rfs.version_os_2_11.GlobalMetadataCreator_OS_2_11;
 import com.rfs.version_os_2_11.IndexCreator_OS_2_11;
@@ -31,18 +40,14 @@
 import org.opensearch.testcontainers.OpensearchContainer;
 import reactor.core.publisher.Flux;
 
-import javax.print.Doc;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.time.Clock;
-import java.util.AbstractQueue;
 import java.util.Comparator;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 import java.util.function.UnaryOperator;
 
@@ -55,18 +60,16 @@ public class FullTest {
             "preloaded-ES_7_10"));
     final static OpensearchContainer osTargetContainer =
             new OpensearchContainer<>("opensearchproject/opensearch:2.13.0");
-    private static Supplier esHttpClientSupplier;
 
     @BeforeAll
     static void setupSourceContainer() {
         esSourceContainer.start();
         osTargetContainer.start();
-        esHttpClientSupplier = () -> new ApacheHttpClient(URI.create(esSourceContainer.getUrl()));
     }
 
     @Test
     public void test() throws Exception {
-        final var SNAPSHOT_NAME = "testSnapshot";
+        final var SNAPSHOT_NAME = "test_snapshot";
         CreateSnapshot.run(
                 c -> new FileSystemSnapshotCreator(SNAPSHOT_NAME, c, ElasticsearchContainer.CLUSTER_SNAPSHOT_DIR),
                 new OpenSearchClient(esSourceContainer.getUrl(), null));
@@ -74,17 +77,18 @@ public void test() throws Exception {
         try {
             esSourceContainer.copySnapshotData(tempDir.toString());
 
-            var targetClient = new OpenSearchClient(osTargetContainer.getHost(), null);
-            migrateMetadata(tempDir, targetClient, SNAPSHOT_NAME);
-
-            migrateDocumentsWithOneWorker(SNAPSHOT_NAME);
+            var targetClient = new OpenSearchClient(osTargetContainer.getHttpHostAddress(), null);
+            var sourceRepo = new FileSystemRepo(tempDir);
+            migrateMetadata(sourceRepo, targetClient, SNAPSHOT_NAME);
+
+            migrateDocumentsWithOneWorker(sourceRepo, SNAPSHOT_NAME);
         } finally {
             deleteTree(tempDir);
         }
     }
 
-    private static void migrateMetadata(Path tempDir, OpenSearchClient targetClient, String snapshotName) {
-        SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(new FileSystemRepo(tempDir));
+    private static void migrateMetadata(SourceRepo sourceRepo, OpenSearchClient targetClient, String snapshotName) {
+        SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);
         GlobalMetadata.Factory metadataFactory = new GlobalMetadataFactory_ES_7_10(repoDataProvider);
         GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11(targetClient, List.of(),
                 List.of(), List.of());
@@ -113,7 +117,7 @@ public Flux readDocuments(String indexName, int shard) {
     }
     static class LeasePastError extends Error { }
 
-    private void migrateDocumentsWithOneWorker(String snapshotName) throws Exception {
+    private void migrateDocumentsWithOneWorker(SourceRepo sourceRepo, String snapshotName) throws Exception {
         var tempDir = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_lucene");
         try {
             var shouldThrow = new AtomicBoolean();
@@ -128,12 +132,26 @@ private void migrateDocumentsWithOneWorker(String snapshotName) throws Exception
                 shouldThrow.set(true);
             });
 
+            DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
+            SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,
+                    tempDir, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);
+
+            SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);
+            IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
+            ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
+
             RfsMigrateDocuments.run(path -> new FilteredLuceneDocumentsReader(path, terminatingDocumentFilter),
-                    new DocumentReindexer(new OpenSearchClient(osTargetContainer.getHost(), null)),
-                    new OpenSearchWorkCoordinator(esHttpClientSupplier.get(),
+                    new DocumentReindexer(new OpenSearchClient(osTargetContainer.getHttpHostAddress(), null)),
+                    new OpenSearchWorkCoordinator(
+                            new ApacheHttpClient(new URI(osTargetContainer.getHttpHostAddress())),
+//                            new ReactorHttpClient(new ConnectionDetails(osTargetContainer.getHttpHostAddress(),
+//                                    null, null)),
                             TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, UUID.randomUUID().toString()),
                     processManager,
-                    1,snapshotName,1,1,
+                    indexMetadataFactory,
+                    snapshotName,
+                    shardMetadataFactory,
+                    unpackerFactory,
                     16*1024*1024);
         } finally {
             deleteTree(tempDir);
diff --git a/DocumentsFromSnapshotMigration/src/test/resources/log4j2.properties b/DocumentsFromSnapshotMigration/src/test/resources/log4j2.properties
new file mode 100644
index 000000000..d496070a2
--- /dev/null
+++ b/DocumentsFromSnapshotMigration/src/test/resources/log4j2.properties
@@ -0,0 +1,10 @@
+status = ERROR
+
+appender.console.type = Console
+appender.console.name = Console
+appender.console.target = SYSTEM_OUT
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss.SSS} %threadName %-5p %c{1}:%L - %m%n
+
+rootLogger.level = trace
+rootLogger.appenderRef.console.ref = Console
diff --git a/RFS/src/main/java/com/rfs/RunRfsWorker.java b/RFS/src/main/java/com/rfs/RunRfsWorker.java
index 344e5a04a..2af4129cb 100644
--- a/RFS/src/main/java/com/rfs/RunRfsWorker.java
+++ b/RFS/src/main/java/com/rfs/RunRfsWorker.java
@@ -168,7 +168,6 @@ public static void main(String[] args) throws Exception {
             DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
             var unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor, luceneDirPath,
                     ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);
-            LuceneDocumentsReader reader = new LuceneDocumentsReader(luceneDirPath);
            DocumentReindexer reindexer = new DocumentReindexer(targetClient);
            var processManager = new ProcessManager(workItemId->{
                log.error("terminating RunRfsWorker because its lease has expired for "+workItemId);
@@ -178,8 +177,12 @@ public static void main(String[] args) throws Exception {
                    5, UUID.randomUUID().toString());
            var scopedWorkCoordinator = new ScopedWorkCoordinatorHelper(workCoordinator, processManager);
            new ShardWorkPreparer().run(scopedWorkCoordinator, indexMetadataFactory, snapshotName);
-            new DocumentsRunner(scopedWorkCoordinator, snapshotName,
-                    shardMetadataFactory, unpackerFactory, reader, reindexer).migrateNextShard();
+            new DocumentsRunner(scopedWorkCoordinator,
+                    (name,shard) -> shardMetadataFactory.fromRepo(snapshotName,name,shard),
+                    unpackerFactory,
+                    path -> new LuceneDocumentsReader(luceneDirPath),
+                    reindexer)
+                    .migrateNextShard();
        } catch (Exception e) {
            log.error("Unexpected error running RfsWorker", e);
            throw e;
diff --git a/RFS/src/main/java/com/rfs/cms/ApacheHttpClient.java b/RFS/src/main/java/com/rfs/cms/ApacheHttpClient.java
index 858beed37..35950dbf6 100644
--- a/RFS/src/main/java/com/rfs/cms/ApacheHttpClient.java
+++ b/RFS/src/main/java/com/rfs/cms/ApacheHttpClient.java
@@ -1,6 +1,7 @@
 package com.rfs.cms;
 
 import lombok.Getter;
+import lombok.Lombok;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hc.client5.http.classic.methods.HttpDelete;
 import org.apache.hc.client5.http.classic.methods.HttpGet;
@@ -20,6 +21,7 @@
 import java.util.AbstractMap;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Stream;
 
 @Slf4j
@@ -62,7 +64,14 @@ public AbstractHttpResponse makeRequest(String method, String path,
         }
         return client.execute(request, fr -> new AbstractHttpResponse() {
             @Getter
-            byte[] payloadBytes = fr.getEntity().getContent().readAllBytes();
+            final byte[] payloadBytes = Optional.ofNullable(fr.getEntity())
+                    .map(e-> {
+                        try {
+                            return e.getContent().readAllBytes();
+                        } catch (IOException ex) {
+                            throw Lombok.sneakyThrow(ex);
+                        }
+                    }).orElse(null);
 
             @Override
             public String getStatusText() {
diff --git a/RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java b/RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java
index 89b31a04a..ba9ff3498 100644
--- a/RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java
+++ b/RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java
@@ -5,6 +5,7 @@
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
@@ -30,6 +31,7 @@ public class OpenSearchWorkCoordinator implements IWorkCoordinator {
     public static final String OLD_EXPIRATION_THRESHOLD_TEMPLATE = "OLD_EXPIRATION_THRESHOLD";
     public static final String VERSION_CONFLICTS_FIELD_NAME = "version_conflicts";
     public static final String COMPLETED_AT_FIELD_NAME = "completedAt";
+    public static final String HEAD_METHOD = "HEAD";
 
     private final long tolerableClientServerClockDifferenceSeconds;
     private final AbstractedHttpClient httpClient;
@@ -57,6 +59,13 @@ public void close() throws Exception {
     }
 
     public void setup() throws IOException {
+        var indexCheckResponse = httpClient.makeJsonRequest(HEAD_METHOD, INDEX_NAME, null, null);
+        if (indexCheckResponse.getStatusCode() == 200) {
+            log.info("Not creating " + INDEX_NAME + " because it already exists");
+            return;
+        }
+        log.atInfo().setMessage("Creating " + INDEX_NAME + " because its HEAD check returned " +
+                indexCheckResponse.getStatusCode()).log();
         var body = "{\n" +
                 "  \"settings\": {\n" +
                 "   \"index\": {" +
@@ -88,7 +97,7 @@
         if ((response.getStatusCode() / 100) != 2) {
             throw new IOException("Could not setup " + INDEX_NAME + ". " +
                     "Got error code " + response.getStatusCode() + " and response: " +
-                    Arrays.toString(response.getPayloadBytes()));
+                    new String(response.getPayloadBytes(), StandardCharsets.UTF_8));
         }
     }
 
@@ -336,8 +345,19 @@ UpdateResult assignOneWorkItem(long expirationWindowSeconds) throws IOException
     private WorkItemAndDuration getAssignedWorkItem() throws IOException {
         final var queryWorkersAssignedItemsTemplate = "{\n" +
                 "  \"query\": {\n" +
-                "    \"term\": { \"" + LEASE_HOLDER_ID_FIELD_NAME + "\": \"" + WORKER_ID_TEMPLATE + "\"}\n" +
-                "  }\n" +
+                "  \"bool\": {" +
+                "    \"must\": [" +
+                "      {" +
+                "        \"term\": { \"" + LEASE_HOLDER_ID_FIELD_NAME + "\": \"" + WORKER_ID_TEMPLATE + "\"}\n" +
+                "      }" +
+                "    ]," +
+                "    \"must_not\": [" +
+                "      {" +
+                "        \"exists\": { \"field\": \"" + COMPLETED_AT_FIELD_NAME + "\"}\n" +
+                "      }" +
+                "    ]" +
+                "  }" +
+                "  }" +
                 "}";
         final var body = queryWorkersAssignedItemsTemplate.replace(WORKER_ID_TEMPLATE, workerId);
         var response = httpClient.makeJsonRequest(POST_METHOD, INDEX_NAME + "/_search",
@@ -348,9 +368,10 @@ private WorkItemAndDuration getAssignedWorkItem() throws IOException {
             log.warn("Couldn't find the top level 'hits' field, returning null");
             return null;
         }
-        if (resultHitsUpper.path("total").path("value").longValue() != 1) {
-            log.warn("The query didn't return one item for the worker node, so returning null");
-            return null;
+        final var numDocs = resultHitsUpper.path("total").path("value").longValue();
+        if (numDocs != 1) {
+            throw new IllegalStateException("The query for the assigned work document returned " + numDocs +
+                    " instead of one item");
         }
         var resultHitInner = resultHitsUpper.path("hits").path(0);
         var expiration = resultHitInner.path("_source").path(EXPIRATION_FIELD_NAME).longValue();
diff --git a/RFS/src/main/java/com/rfs/cms/ReactorHttpClient.java b/RFS/src/main/java/com/rfs/cms/ReactorHttpClient.java
new file mode 100644
index 000000000..a1eea5474
--- /dev/null
+++ b/RFS/src/main/java/com/rfs/cms/ReactorHttpClient.java
@@ -0,0 +1,97 @@
+package com.rfs.cms;
+
+import com.rfs.common.ConnectionDetails;
+import com.rfs.common.RestClient;
+import io.netty.handler.codec.http.HttpMethod;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Mono;
+import reactor.netty.ByteBufMono;
+import reactor.netty.http.client.HttpClient;
+import reactor.netty.http.client.HttpClientResponse;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Slf4j
+public class ReactorHttpClient implements AbstractedHttpClient {
+
+    private HttpClient client;
+
+    @Getter
+    @AllArgsConstructor
+    public static class Response implements AbstractedHttpClient.AbstractHttpResponse {
+        List<Map.Entry<String, String>> headersList;
+        String statusText;
+        int statusCode;
+        byte[] payloadBytes;
+
+        @Override
+        public Stream<Map.Entry<String, String>> getHeaders() {
+            return headersList.stream();
+        }
+    }
+
+    public ReactorHttpClient(ConnectionDetails connectionDetails) {
+        this.client = HttpClient.create()
+                .baseUrl(connectionDetails.url)
+                .headers(h -> {
+                    h.add("Content-Type", "application/json");
+                    h.add("User-Agent", "RfsWorker-1.0");
+                    if (connectionDetails.authType == ConnectionDetails.AuthType.BASIC) {
+                        String credentials = connectionDetails.username + ":" + connectionDetails.password;
+                        String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes());
+                        h.add("Authorization", "Basic " + encodedCredentials);
+                    }
+                });
+    }
+
+    @Override
+    public AbstractHttpResponse makeRequest(String method, String path, Map headers, String payload)
+            throws IOException {
+        var requestSender = client.request(HttpMethod.valueOf(method))
+                .uri("/"+path);
+        BiFunction<HttpClientResponse, ByteBufMono, Mono<Response>> responseWrapperFunction =
+                (response, bytes) -> {
+                    try {
+                        log.info("Received response with status: " + response.status());
+                        log.info("Response headers: " + response.responseHeaders().entries());
+
+                        return bytes.asByteArray()
+                                .map(b -> {
+                                    try {
+                                        log.info("Making wrapped response with status: " + response.status());
+
+                                        return new Response(new ArrayList<>(response.responseHeaders().entries()),
+                                                response.status().reasonPhrase(),
+                                                response.status().code(),
+                                                b);
+                                    } catch (Exception e) {
+                                        log.atError().setCause(e).setMessage("Caught exception").log();
+                                        throw e;
+                                    }
+                                })
+                                .or(Mono.fromSupplier(()->
+                                        new Response(new ArrayList<>(response.responseHeaders().entries()),
+                                                response.status().reasonPhrase(), response.status().code(), null)));
+                    } catch (Exception e) {
+                        log.atError().setCause(e).setMessage("Caught exception").log();
+                        throw e;
+                    }
+                };
+        var monoResponse = payload != null ?
+                requestSender.send(ByteBufMono.fromString(Mono.just(payload))).responseSingle(responseWrapperFunction) :
+                requestSender.responseSingle(responseWrapperFunction);
+        return monoResponse.block();
+    }
+
+    @Override
+    public void close() throws Exception {}
+}
diff --git a/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java b/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java
index 5f5de88e7..d2f04faf3 100644
--- a/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java
+++ b/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java
@@ -79,7 +79,7 @@ public Flux readDocuments(String indexName, int shard) {
     }
 
     @SneakyThrows
-    protected IndexReader openIndexReader(Path indexDirectoryPath) {
+    protected IndexReader openIndexReader(Path indexDirectoryPath) throws IOException {
         return DirectoryReader.open(FSDirectory.open(indexDirectoryPath));
     }
 
diff --git a/RFS/src/main/java/com/rfs/worker/ShardWorkPreparer.java b/RFS/src/main/java/com/rfs/worker/ShardWorkPreparer.java
index 270f8a4f3..1b27950cb 100644
--- a/RFS/src/main/java/com/rfs/worker/ShardWorkPreparer.java
+++ b/RFS/src/main/java/com/rfs/worker/ShardWorkPreparer.java
@@ -24,6 +24,10 @@ public class ShardWorkPreparer {
 
     public void run(ScopedWorkCoordinatorHelper scopedWorkCoordinator, IndexMetadata.Factory metadataFactory,
                     String snapshotName) throws IOException {
+
+        // ensure that there IS an index to house the shared state that we're going to be manipulating
+        scopedWorkCoordinator.workCoordinator.setup();
+
         scopedWorkCoordinator.ensurePhaseCompletion(
                 wc -> {
                     try {
diff --git a/RFS/src/test/java/com/rfs/cms/WorkCoordinatorTest.java b/RFS/src/test/java/com/rfs/cms/WorkCoordinatorTest.java
index 9ba17b152..d4c61691d 100644
--- a/RFS/src/test/java/com/rfs/cms/WorkCoordinatorTest.java
+++ b/RFS/src/test/java/com/rfs/cms/WorkCoordinatorTest.java
@@ -14,7 +14,9 @@
 import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
 /**
@@ -36,7 +38,8 @@ public class WorkCoordinatorTest {
     final static OpensearchContainer container =
-            new OpensearchContainer<>("opensearchproject/opensearch:2.11.0");
+            new OpensearchContainer<>("opensearchproject/opensearch:1.3.0");
+    public static final String DUMMY_FINISHED_DOC_ID = "dummy_finished_doc";
 
     private static Supplier httpClientSupplier;
 
@@ -131,22 +134,16 @@ public void testAcquireLeaseForQuery() throws Exception {
         for (int runs=0; runs<2; ++runs) {
             final var seenWorkerItems = new ConcurrentHashMap();
-            var allThreads = new ArrayList();
+            var allFutures = new ArrayList<CompletableFuture<Void>>();
             final var expiration = Duration.ofSeconds(5);
             for (int i = 0; i < NUM_DOCS; ++i) {
                 int finalI = i;
                 int finalRuns = runs;
-                var t = new Thread(() -> getWorkItemAndVerity(finalRuns + "-" + finalI, seenWorkerItems, expiration));
-                allThreads.add(t);
-                t.start();
+                var cf = CompletableFuture.supplyAsync(() ->
+                        getWorkItemAndVerity(finalRuns + "-" + finalI, seenWorkerItems, expiration));
+                allFutures.add(cf);
             }
-            allThreads.forEach(t -> {
-                try {
-                    t.join();
-                } catch (InterruptedException e) {
-                    throw Lombok.sneakyThrow(e);
-                }
-            });
+            CompletableFuture.allOf(allFutures.toArray(CompletableFuture[]::new)).join();
             Assertions.assertEquals(NUM_DOCS, seenWorkerItems.size());
 
             try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(),
@@ -160,17 +157,23 @@ public void testAcquireLeaseForQuery() throws Exception {
         }
     }
 
+    static AtomicInteger nonce = new AtomicInteger();
     @SneakyThrows
-    private static void getWorkItemAndVerity(String workerSuffix, ConcurrentHashMap seenWorkerItems,
+    private static Void getWorkItemAndVerity(String workerSuffix, ConcurrentHashMap seenWorkerItems,
                                              Duration expirationWindow) {
         try (var workCoordinator = new OpenSearchWorkCoordinator(httpClientSupplier.get(),
                 3600, "firstPass_"+ workerSuffix)) {
+            var doneId = DUMMY_FINISHED_DOC_ID + "_" + nonce.incrementAndGet();
+            workCoordinator.createOrUpdateLeaseForDocument(doneId, 1);
+            workCoordinator.completeWorkItem(doneId);
+
             var nextWorkItem = workCoordinator.acquireNextWorkItem(expirationWindow);
             log.error("Next work item picked=" + nextWorkItem);
             Assertions.assertNotNull(nextWorkItem);
             Assertions.assertNotNull(nextWorkItem.workItemId);
             Assertions.assertTrue(nextWorkItem.leaseExpirationTime.isAfter(Instant.now()));
             seenWorkerItems.put(nextWorkItem.workItemId, nextWorkItem.workItemId);
+            return null;
         }
     }
 }