From be81e7dffa9eb3e30c62a9f2d35edc2f2c6c4174 Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Tue, 13 Aug 2024 19:01:10 -0400 Subject: [PATCH 1/3] Add a node id, which is either an ECS task id, Kubernetes hostname, or random UUID (as per env variables). Put that worker/node id into all of the spans as a resource and continue to use it to thread the MDC environment for the work coordinator. Signed-off-by: Greg Schohn --- .../src/main/java/com/rfs/CreateSnapshot.java | 4 +- .../java/com/rfs/RfsMigrateDocuments.java | 17 ++++----- .../rfs/ParallelDocumentMigrationsTest.java | 17 ++++++--- .../java/com/rfs/ProcessLifecycleTest.java | 2 +- .../main/java/com/rfs/MetadataMigration.java | 5 ++- .../tracing/RootWorkCoordinationContext.java | 10 +++++ .../tracing/RootDocumentMigrationContext.java | 5 +-- .../rfs/integration/SnapshotStateTest.java | 10 ++--- .../tracing/DocumentMigrationTestContext.java | 19 ++-------- .../proxyserver/CaptureProxy.java | 4 +- .../migrations/replay/TrafficReplayer.java | 9 ++++- .../migrations/tracing/RootOtelContext.java | 38 ++++++++++++------- .../migrations/utils/ProcessHelpers.java | 32 ++++++++++++++++ .../IInstrumentationAttributesTest.java | 2 +- 14 files changed, 114 insertions(+), 60 deletions(-) create mode 100644 coreUtilities/src/main/java/org/opensearch/migrations/utils/ProcessHelpers.java diff --git a/CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java b/CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java index dba9818a5..85379c6f8 100644 --- a/CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java +++ b/CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java @@ -7,6 +7,7 @@ import org.opensearch.migrations.tracing.ActiveContextTrackerByActivityType; import org.opensearch.migrations.tracing.CompositeContextTracker; import org.opensearch.migrations.tracing.RootOtelContext; +import org.opensearch.migrations.utils.ProcessHelpers; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; @@ -83,7 +84,8 @@ public static void main(String[] args) throws Exception { } var rootContext = new RootSnapshotContext( - RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(arguments.otelCollectorEndpoint, "rfs"), + RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(arguments.otelCollectorEndpoint, + RootSnapshotContext.SCOPE_NAME, ProcessHelpers.getNodeInstanceName()), new CompositeContextTracker(new ActiveContextTracker(), new ActiveContextTrackerByActivityType()) ); diff --git a/DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java b/DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java index 26fccd406..6677a5790 100644 --- a/DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java +++ b/DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java @@ -6,7 +6,6 @@ import java.time.Clock; import java.time.Duration; import java.util.List; -import java.util.UUID; import java.util.function.Function; import org.opensearch.migrations.reindexer.tracing.RootDocumentMigrationContext; @@ -14,6 +13,7 @@ import org.opensearch.migrations.tracing.ActiveContextTrackerByActivityType; import org.opensearch.migrations.tracing.CompositeContextTracker; import org.opensearch.migrations.tracing.RootOtelContext; +import org.opensearch.migrations.utils.ProcessHelpers; import com.beust.jcommander.IStringConverter; import com.beust.jcommander.JCommander; @@ -39,7 +39,6 @@ import com.rfs.common.http.ConnectionContext; import com.rfs.models.IndexMetadata; import com.rfs.models.ShardMetadata; -import com.rfs.tracing.RootWorkCoordinationContext; import com.rfs.version_es_7_10.ElasticsearchConstants_ES_7_10; import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10; import com.rfs.version_es_7_10.ShardMetadataFactory_ES_7_10; @@ -171,7 +170,8 @@ public static void main(String[] args) throws Exception { validateArgs(arguments); - var rootDocumentContext = makeRootContext(arguments); + var workerId = ProcessHelpers.getNodeInstanceName(); + var rootDocumentContext = makeRootContext(arguments, workerId); var luceneDirPath = Paths.get(arguments.luceneDir); var snapshotLocalDirPath = arguments.snapshotLocalDir != null ? Paths.get(arguments.snapshotLocalDir) : null; @@ -180,7 +180,6 @@ public static void main(String[] args) throws Exception { System.exit(PROCESS_TIMED_OUT); }, Clock.systemUTC())) { ConnectionContext connectionContext = arguments.targetArgs.toConnectionContext(); - final var workerId = UUID.randomUUID().toString(); var workCoordinator = new OpenSearchWorkCoordinator( new CoordinateWorkHttpClient(connectionContext), TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, @@ -188,7 +187,7 @@ public static void main(String[] args) throws Exception { ); MDC.put(LOGGING_MDC_WORKER_ID, workerId); // I don't see a need to clean this up since we're in main TryHandlePhaseFailure.executeWithTryCatch(() -> { - log.info("Running RfsMigrateDocuments with workerId = " + workerId); + log.info("Running RfsMigrateDocuments"); OpenSearchClient targetClient = new OpenSearchClient(connectionContext); DocumentReindexer reindexer = new DocumentReindexer(targetClient, arguments.numDocsPerBulkRequest, @@ -233,17 +232,17 @@ public static void main(String[] args) throws Exception { } } - private static RootDocumentMigrationContext makeRootContext(Args arguments) { + private static RootDocumentMigrationContext makeRootContext(Args arguments, String workerId) { var compositeContextTracker = new CompositeContextTracker( new ActiveContextTracker(), new ActiveContextTrackerByActivityType() ); var otelSdk = RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop( arguments.otelCollectorEndpoint, - "docMigration" + RootDocumentMigrationContext.SCOPE_NAME, + workerId ); - var workContext = new RootWorkCoordinationContext(otelSdk, compositeContextTracker); - return new RootDocumentMigrationContext(otelSdk, compositeContextTracker, workContext); + return new RootDocumentMigrationContext(otelSdk, compositeContextTracker); } public static DocumentsRunner.CompletionStatus run( diff --git a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/ParallelDocumentMigrationsTest.java b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/ParallelDocumentMigrationsTest.java index 2857e6f51..07484aaf0 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/ParallelDocumentMigrationsTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/ParallelDocumentMigrationsTest.java @@ -32,7 +32,6 @@ import org.opensearch.migrations.metadata.tracing.MetadataMigrationTestContext; import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext; import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext; -import org.opensearch.migrations.workcoordination.tracing.WorkCoordinationTestContext; import org.opensearch.testcontainers.OpensearchContainer; import com.rfs.cms.CoordinateWorkHttpClient; @@ -78,7 +77,7 @@ public class ParallelDocumentMigrationsTest extends SourceTestBase { public static Stream makeDocumentMigrationArgs() { List sourceImageArgs = SOURCE_IMAGES.stream() - .map(name -> makeParamsForBase(name)) + .map(SourceTestBase::makeParamsForBase) .collect(Collectors.toList()); var targetImageNames = TARGET_IMAGES.stream() .map(SearchClusterContainer.Version::getImageName) @@ -114,8 +113,7 @@ public void testDocumentMigration( var executorService = Executors.newFixedThreadPool(numWorkers); final var testSnapshotContext = SnapshotTestContext.factory().noOtelTracking(); final var testMetadataMigrationContext = MetadataMigrationTestContext.factory().noOtelTracking(); - final var workCoordinationContext = WorkCoordinationTestContext.factory().withAllTracking(); - final var testDocMigrationContext = DocumentMigrationTestContext.factory(workCoordinationContext) + final var testDocMigrationContext = DocumentMigrationTestContext.factory() .withAllTracking(); try ( @@ -226,8 +224,17 @@ public void testDocumentMigration( } } +// @Test +// public void testDocumentMigrationForBigMonolithicShardWorks() throws Exception { +// testDocumentMigration(1, +// SearchClusterContainer.OS_V2_14_0.getImageName(), +// SearchClusterContainer.OS_V2_14_0, +// GENERATOR_BASE_IMAGE, +// new String[]{"tail", "-f", "/dev/null"}); +// } + private void verifyWorkMetrics(DocumentMigrationTestContext rootContext, int numWorkers, int numRuns) { - var workMetrics = rootContext.getWorkCoordinationContext().inMemoryInstrumentationBundle.getFinishedMetrics(); + var workMetrics = rootContext.inMemoryInstrumentationBundle.getFinishedMetrics(); var migrationMetrics = rootContext.inMemoryInstrumentationBundle.getFinishedMetrics(); verifyCoordinatorBehavior(workMetrics, numRuns); diff --git a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/ProcessLifecycleTest.java b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/ProcessLifecycleTest.java index 85978d172..3742587a9 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/ProcessLifecycleTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/ProcessLifecycleTest.java @@ -63,7 +63,7 @@ enum FailHow { // The Document Migration process will throw an exception immediately, which will cause an exit. "AT_STARTUP, 1", // This test is dependent upon the max lease duration that is passed to the command line. It's set - // to such a short value (1s), that no document migration will exit in that amount of time. For good + // to such a short value (1s) that no document migration will exit in that amount of time. For good // measure though, the toxiproxy also adds latency to the requests to make it impossible for the // migration to complete w/in that 1s. "WITH_DELAYS, 2" }) diff --git a/MetadataMigration/src/main/java/com/rfs/MetadataMigration.java b/MetadataMigration/src/main/java/com/rfs/MetadataMigration.java index 56df53393..1ccf6e341 100644 --- a/MetadataMigration/src/main/java/com/rfs/MetadataMigration.java +++ b/MetadataMigration/src/main/java/com/rfs/MetadataMigration.java @@ -9,6 +9,7 @@ import org.opensearch.migrations.tracing.ActiveContextTrackerByActivityType; import org.opensearch.migrations.tracing.CompositeContextTracker; import org.opensearch.migrations.tracing.RootOtelContext; +import org.opensearch.migrations.utils.ProcessHelpers; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; @@ -102,7 +103,9 @@ public static void main(String[] args) throws Exception { } var rootContext = new RootMetadataMigrationContext( - RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(arguments.otelCollectorEndpoint, "rfs"), + RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(arguments.otelCollectorEndpoint, + RootMetadataMigrationContext.SCOPE_NAME, + ProcessHelpers.getNodeInstanceName()), new CompositeContextTracker(new ActiveContextTracker(), new ActiveContextTrackerByActivityType()) ); diff --git a/RFS/src/main/java/com/rfs/tracing/RootWorkCoordinationContext.java b/RFS/src/main/java/com/rfs/tracing/RootWorkCoordinationContext.java index 47121ea34..4aa0d36ca 100644 --- a/RFS/src/main/java/com/rfs/tracing/RootWorkCoordinationContext.java +++ b/RFS/src/main/java/com/rfs/tracing/RootWorkCoordinationContext.java @@ -9,6 +9,8 @@ public class RootWorkCoordinationContext extends RootOtelContext { private static final String SCOPE_NAME = "workCoordination"; + public final RootOtelContext enclosingScope; + public final WorkCoordinationContexts.InitializeCoordinatorStateContext.MetricInstruments initializeMetrics; public final WorkCoordinationContexts.CreateUnassignedWorkItemContext.MetricInstruments createUnassignedWorkMetrics; public final WorkCoordinationContexts.Refresh.MetricInstruments refreshMetrics; @@ -18,7 +20,15 @@ public class RootWorkCoordinationContext extends RootOtelContext { public final WorkCoordinationContexts.AcquireNextWorkItemContext.MetricInstruments acquireNextWorkMetrics; public RootWorkCoordinationContext(OpenTelemetry sdk, IContextTracker contextTracker) { + this(sdk, contextTracker, null); + } + + public RootWorkCoordinationContext(OpenTelemetry sdk, + IContextTracker contextTracker, + RootOtelContext enclosingScope) + { super(SCOPE_NAME, contextTracker, sdk); + this.enclosingScope = enclosingScope; var meter = this.getMeterProvider().get(SCOPE_NAME); initializeMetrics = WorkCoordinationContexts.InitializeCoordinatorStateContext.makeMetrics(meter); diff --git a/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/RootDocumentMigrationContext.java b/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/RootDocumentMigrationContext.java index 6abe81ac9..0ffbbdd61 100644 --- a/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/RootDocumentMigrationContext.java +++ b/RFS/src/main/java/org/opensearch/migrations/reindexer/tracing/RootDocumentMigrationContext.java @@ -19,12 +19,11 @@ public class RootDocumentMigrationContext extends BaseRootRfsContext implements public RootDocumentMigrationContext( OpenTelemetry sdk, - IContextTracker contextTracker, - RootWorkCoordinationContext workCoordinationContext + IContextTracker contextTracker ) { super(SCOPE_NAME, sdk, contextTracker); var meter = this.getMeterProvider().get(SCOPE_NAME); - this.workCoordinationContext = workCoordinationContext; + workCoordinationContext = new RootWorkCoordinationContext(sdk, contextTracker, this); documentReindexInstruments = DocumentMigrationContexts.DocumentReindexContext.makeMetrics(meter); shardSetupMetrics = DocumentMigrationContexts.ShardSetupAttemptContext.makeMetrics(meter); addShardWorkItemMetrics = DocumentMigrationContexts.AddShardWorkItemContext.makeMetrics(meter); diff --git a/RFS/src/test/java/com/rfs/integration/SnapshotStateTest.java b/RFS/src/test/java/com/rfs/integration/SnapshotStateTest.java index 0a38d1a3f..b9bb94902 100644 --- a/RFS/src/test/java/com/rfs/integration/SnapshotStateTest.java +++ b/RFS/src/test/java/com/rfs/integration/SnapshotStateTest.java @@ -10,7 +10,6 @@ import org.junit.jupiter.api.io.TempDir; import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext; -import org.opensearch.migrations.workcoordination.tracing.WorkCoordinationTestContext; import com.rfs.common.OpenSearchClient; import com.rfs.framework.ClusterOperations; @@ -64,8 +63,7 @@ public void tearDown() throws Exception { @Test public void SingleSnapshot_SingleDocument() throws Exception { // Setup - final var workCoordinationContext = WorkCoordinationTestContext.factory().noOtelTracking(); - final var testContext = DocumentMigrationTestContext.factory(workCoordinationContext).noOtelTracking(); + final var testContext = DocumentMigrationTestContext.factory().noOtelTracking(); final var indexName = "my-index"; final var document1Id = "doc1"; final var document1Body = "{\"fo$o\":\"bar\"}"; @@ -103,8 +101,7 @@ public void SingleSnapshot_SingleDocument() throws Exception { @Disabled("https://opensearch.atlassian.net/browse/MIGRATIONS-1746") public void SingleSnapshot_SingleDocument_Then_DeletedDocument() throws Exception { // Setup - final var workCoordinationContext = WorkCoordinationTestContext.factory().noOtelTracking(); - final var testContext = DocumentMigrationTestContext.factory(workCoordinationContext).noOtelTracking(); + final var testContext = DocumentMigrationTestContext.factory().noOtelTracking(); final var indexName = "my-index-with-deleted-item"; final var document1Id = "doc1-going-to-be-deleted"; final var document1Body = "{\"foo\":\"bar\"}"; @@ -142,8 +139,7 @@ public void SingleSnapshot_SingleDocument_Then_DeletedDocument() throws Exceptio @Disabled("https://opensearch.atlassian.net/browse/MIGRATIONS-1747") public void SingleSnapshot_SingleDocument_Then_UpdateDocument() throws Exception { // Setup - final var workCoordinationContext = WorkCoordinationTestContext.factory().noOtelTracking(); - final var testContext = DocumentMigrationTestContext.factory(workCoordinationContext).noOtelTracking(); + final var testContext = DocumentMigrationTestContext.factory().noOtelTracking(); final var indexName = "my-index-with-updated-item"; final var document1Id = "doc1-going-to-be-updated"; final var document1BodyOrginal = "{\"foo\":\"bar\"}"; diff --git a/RFS/src/testFixtures/java/org/opensearch/migrations/reindexer/tracing/DocumentMigrationTestContext.java b/RFS/src/testFixtures/java/org/opensearch/migrations/reindexer/tracing/DocumentMigrationTestContext.java index 87216824e..3b0bfbaed 100644 --- a/RFS/src/testFixtures/java/org/opensearch/migrations/reindexer/tracing/DocumentMigrationTestContext.java +++ b/RFS/src/testFixtures/java/org/opensearch/migrations/reindexer/tracing/DocumentMigrationTestContext.java @@ -2,7 +2,6 @@ import org.opensearch.migrations.tracing.IContextTracker; import org.opensearch.migrations.tracing.InMemoryInstrumentationBundle; -import org.opensearch.migrations.workcoordination.tracing.WorkCoordinationTestContext; import com.rfs.framework.tracing.TrackingTestContextFactory; import com.rfs.tracing.IRfsContexts; @@ -11,26 +10,16 @@ public class DocumentMigrationTestContext extends RootDocumentMigrationContext { public final InMemoryInstrumentationBundle inMemoryInstrumentationBundle; - @Override - public WorkCoordinationTestContext getWorkCoordinationContext() { - return (WorkCoordinationTestContext) super.getWorkCoordinationContext(); - } - public DocumentMigrationTestContext( InMemoryInstrumentationBundle inMemoryInstrumentationBundle, - IContextTracker contextTracker, - WorkCoordinationTestContext workCoordinationContext + IContextTracker contextTracker ) { - super(inMemoryInstrumentationBundle.openTelemetrySdk, contextTracker, workCoordinationContext); + super(inMemoryInstrumentationBundle.openTelemetrySdk, contextTracker); this.inMemoryInstrumentationBundle = inMemoryInstrumentationBundle; } - public static TrackingTestContextFactory factory( - WorkCoordinationTestContext rootWorkCoordinationContext - ) { - return new TrackingTestContextFactory<>( - (a, b) -> new DocumentMigrationTestContext(a, b, rootWorkCoordinationContext) - ); + public static TrackingTestContextFactory factory() { + return new TrackingTestContextFactory<>(DocumentMigrationTestContext::new); } public IRfsContexts.IRequestContext createUnboundRequestContext() { diff --git a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxy.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxy.java index 145cad444..912882193 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxy.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxy.java @@ -40,6 +40,7 @@ import org.opensearch.migrations.trafficcapture.netty.HeaderValueFilteringCapturePredicate; import org.opensearch.migrations.trafficcapture.proxyserver.netty.BacksideConnectionPool; import org.opensearch.migrations.trafficcapture.proxyserver.netty.NettyScanningHttpProxy; +import org.opensearch.migrations.utils.ProcessHelpers; import org.opensearch.security.ssl.DefaultSecurityKeyStore; import org.opensearch.security.ssl.util.SSLConfigConstants; @@ -313,7 +314,8 @@ public static void main(String[] args) throws InterruptedException, IOException var backsideUri = convertStringToUri(params.backsideUriString); var rootContext = new RootCaptureContext( - RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(params.otelCollectorEndpoint, "capture"), + RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(params.otelCollectorEndpoint, "capture", + ProcessHelpers.getNodeInstanceName()), new CompositeContextTracker(new ActiveContextTracker(), new ActiveContextTrackerByActivityType()) ); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java index af0c09fd7..cafadfa7d 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java @@ -11,6 +11,7 @@ import java.time.Instant; import java.util.List; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -27,6 +28,7 @@ import org.opensearch.migrations.transform.RemovingAuthTransformerFactory; import org.opensearch.migrations.transform.SigV4AuthTransformerFactory; import org.opensearch.migrations.transform.StaticAuthTransformerFactory; +import org.opensearch.migrations.utils.ProcessHelpers; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; @@ -224,7 +226,8 @@ public static void main(String[] args) throws Exception { var activeContextLogger = LoggerFactory.getLogger(ALL_ACTIVE_CONTEXTS_MONITOR_LOGGER); var params = parseArgs(args); URI uri; - System.err.println("Starting Traffic Replayer"); + final var workerId = UUID.randomUUID().toString(); + System.err.println("Starting Traffic Replayer with id=" + workerId); System.err.println("Got args: " + String.join("; ", args)); try { uri = new URI(params.targetUriString); @@ -258,7 +261,9 @@ public static void main(String[] args) throws Exception { ); var contextTrackers = new CompositeContextTracker(globalContextTracker, perContextTracker); var topContext = new RootReplayerContext( - RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(params.otelCollectorEndpoint, "replay"), + RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(params.otelCollectorEndpoint, + "replay", + ProcessHelpers.getNodeInstanceName()), contextTrackers ); diff --git a/coreUtilities/src/main/java/org/opensearch/migrations/tracing/RootOtelContext.java b/coreUtilities/src/main/java/org/opensearch/migrations/tracing/RootOtelContext.java index ac05c44a5..77edc9a17 100644 --- a/coreUtilities/src/main/java/org/opensearch/migrations/tracing/RootOtelContext.java +++ b/coreUtilities/src/main/java/org/opensearch/migrations/tracing/RootOtelContext.java @@ -35,13 +35,9 @@ public class RootOtelContext implements IRootOtelContext { public static OpenTelemetry initializeOpenTelemetryForCollector( @NonNull String collectorEndpoint, - @NonNull String serviceName + @NonNull String serviceName, + String nodeName ) { - var serviceResource = Resource.getDefault() - .toBuilder() - .put(ResourceAttributes.SERVICE_NAME, serviceName) - .build(); - final var spanProcessor = BatchSpanProcessor.builder( OtlpGrpcSpanExporter.builder().setEndpoint(collectorEndpoint).setTimeout(2, TimeUnit.SECONDS).build() ).build(); @@ -56,10 +52,22 @@ public static OpenTelemetry initializeOpenTelemetryForCollector( var openTelemetrySdk = OpenTelemetrySdk.builder() .setTracerProvider( - SdkTracerProvider.builder().setResource(serviceResource).addSpanProcessor(spanProcessor).build() + SdkTracerProvider.builder() + .setResource(Resource.getDefault() + .toBuilder() + .put(ResourceAttributes.SERVICE_NAME, serviceName) + .put(ResourceAttributes.SERVICE_INSTANCE_ID, nodeName) + .build()) + .addSpanProcessor(spanProcessor) + .build() ) .setMeterProvider( - SdkMeterProvider.builder().setResource(serviceResource).registerMetricReader(metricReader).build() + SdkMeterProvider.builder() + .setResource(Resource.getDefault() + .toBuilder() + .put(ResourceAttributes.SERVICE_NAME, serviceName) + .build()) + .registerMetricReader(metricReader).build() ) .build(); @@ -81,10 +89,11 @@ public static OpenTelemetry initializeNoopOpenTelemetry() { */ public static OpenTelemetry initializeOpenTelemetryWithCollectorOrAsNoop( String collectorEndpoint, - String serviceName + String serviceName, + String instanceName ) { return Optional.ofNullable(collectorEndpoint) - .map(endpoint -> initializeOpenTelemetryForCollector(endpoint, serviceName)) + .map(endpoint -> initializeOpenTelemetryForCollector(endpoint, serviceName, instanceName)) .orElseGet(() -> { if (serviceName != null) { log.atWarn() @@ -114,12 +123,13 @@ public CommonMetricInstruments getMetrics() { return null; } - public RootOtelContext(String scopeName, IContextTracker contextTracker) { - this(scopeName, contextTracker, null); + public RootOtelContext(String scopeName, IContextTracker contextTracker, String instanceName) { + this(scopeName, contextTracker, + initializeOpenTelemetryWithCollectorOrAsNoop(null, null, instanceName)); } - public RootOtelContext(String scopeName, IContextTracker contextTracker, OpenTelemetry sdk) { - openTelemetryImpl = sdk != null ? sdk : initializeOpenTelemetryWithCollectorOrAsNoop(null, null); + public RootOtelContext(String scopeName, IContextTracker contextTracker, @NonNull OpenTelemetry sdk) { + openTelemetryImpl = sdk; this.scopeName = scopeName; this.contextTracker = contextTracker; } diff --git a/coreUtilities/src/main/java/org/opensearch/migrations/utils/ProcessHelpers.java b/coreUtilities/src/main/java/org/opensearch/migrations/utils/ProcessHelpers.java new file mode 100644 index 000000000..c5f7c89d0 --- /dev/null +++ b/coreUtilities/src/main/java/org/opensearch/migrations/utils/ProcessHelpers.java @@ -0,0 +1,32 @@ +package org.opensearch.migrations.utils; + +import java.util.UUID; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ProcessHelpers { + private static final String DEFAULT_NODE_ID = UUID.randomUUID().toString(); + + public static String getNodeInstanceName() { + String id = null; + try { + id = System.getenv("ECS_TASK_ID"); // for ECS deployments + if (id != null) { + return id; + } + id = System.getenv("HOSTNAME"); // for any kubernetes deployed pod + if (id != null) { + return id; + } + // add additional fallbacks here + id = DEFAULT_NODE_ID; + return id; + } finally { + if (id != null) { + String finalId = id; + log.atInfo().setMessage(() -> "getNodeInstanceName()=" + finalId).log(); + } + } + } +} diff --git a/coreUtilities/src/test/java/org/opensearch/migrations/tracing/IInstrumentationAttributesTest.java b/coreUtilities/src/test/java/org/opensearch/migrations/tracing/IInstrumentationAttributesTest.java index c094e7f98..2adccc4e6 100644 --- a/coreUtilities/src/test/java/org/opensearch/migrations/tracing/IInstrumentationAttributesTest.java +++ b/coreUtilities/src/test/java/org/opensearch/migrations/tracing/IInstrumentationAttributesTest.java @@ -62,7 +62,7 @@ public AttributesBuilder fillAttributesForSpansBelow(AttributesBuilder builder) @Test public void getPopulatedAttributesAreOverrideCorrectly() { - var rootCtx = new RootOtelContext("test", IContextTracker.DO_NOTHING_TRACKER); + var rootCtx = new RootOtelContext("test", IContextTracker.DO_NOTHING_TRACKER, "unit_test"); var aCtx = new AContext(rootCtx); var bCtx = new BContext(rootCtx, aCtx); From 8847c0349499cab26578f6434b862aa8821d54aa Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Tue, 10 Sep 2024 08:59:33 -0400 Subject: [PATCH 2/3] Add a log.info for the process id for RFS, Proxy, and the replayer. Signed-off-by: Greg Schohn --- .../src/main/java/com/rfs/RfsMigrateDocuments.java | 7 ++++--- .../trafficcapture/proxyserver/CaptureProxy.java | 2 +- .../org/opensearch/migrations/replay/TrafficReplayer.java | 7 ++++--- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java b/DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java index 0b6b2a517..08fd1cbda 100644 --- a/DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java +++ b/DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java @@ -167,6 +167,10 @@ public static void validateArgs(Args args) { } public static void main(String[] args) throws Exception { + System.err.println("Got args: " + String.join("; ", args)); + var workerId = ProcessHelpers.getNodeInstanceName(); + log.info("Starting RfsMigrateDocuments with workerId =" + workerId); + Args arguments = new Args(); JCommander jCommander = JCommander.newBuilder().addObject(arguments).build(); jCommander.parse(args); @@ -178,7 +182,6 @@ public static void main(String[] args) throws Exception { validateArgs(arguments); - var workerId = ProcessHelpers.getNodeInstanceName(); var rootDocumentContext = makeRootContext(arguments, workerId); var luceneDirPath = Paths.get(arguments.luceneDir); var snapshotLocalDirPath = arguments.snapshotLocalDir != null ? Paths.get(arguments.snapshotLocalDir) : null; @@ -195,8 +198,6 @@ public static void main(String[] args) throws Exception { ); MDC.put(LOGGING_MDC_WORKER_ID, workerId); // I don't see a need to clean this up since we're in main TryHandlePhaseFailure.executeWithTryCatch(() -> { - log.info("Running RfsMigrateDocuments"); - OpenSearchClient targetClient = new OpenSearchClient(connectionContext); DocumentReindexer reindexer = new DocumentReindexer(targetClient, arguments.numDocsPerBulkRequest, diff --git a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxy.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxy.java index 912882193..bbb350147 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxy.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxy.java @@ -307,8 +307,8 @@ protected static Map convertPairListToMap(List list) { } public static void main(String[] args) throws InterruptedException, IOException { - System.err.println("Starting Capture Proxy"); System.err.println("Got args: " + String.join("; ", args)); + log.info("Starting Capture Proxy on " + ProcessHelpers.getNodeInstanceName()); var params = parseArgs(args); var backsideUri = convertStringToUri(params.backsideUriString); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java index ca9f38ed6..7f89aae32 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java @@ -223,12 +223,13 @@ private static String getTransformerConfig(Parameters params) { } public static void main(String[] args) throws Exception { + System.err.println("Got args: " + String.join("; ", args)); + final var workerId = ProcessHelpers.getNodeInstanceName(); + log.info("Starting Traffic Replayer with id=" + workerId); + var activeContextLogger = LoggerFactory.getLogger(ALL_ACTIVE_CONTEXTS_MONITOR_LOGGER); var params = parseArgs(args); URI uri; - final var workerId = UUID.randomUUID().toString(); - System.err.println("Starting Traffic Replayer with id=" + workerId); - System.err.println("Got args: " + String.join("; ", args)); try { uri = new URI(params.targetUriString); } catch (Exception e) { From fd9b3064a49531487bc222853093b0fe6e3e3e0a Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Tue, 10 Sep 2024 09:25:25 -0400 Subject: [PATCH 3/3] Minor changes in instance logging thanks to PR feedback. Also removed the empty MetadataMigration file that was a result of the last merge (the file was moved) Signed-off-by: Greg Schohn --- .../rfs/ParallelDocumentMigrationsTest.java | 9 ------- .../main/java/com/rfs/MetadataMigration.java | 1 - .../migrations/replay/TrafficReplayer.java | 1 - .../migrations/tracing/RootOtelContext.java | 15 ++++++----- .../migrations/utils/ProcessHelpers.java | 27 +++++-------------- .../IInstrumentationAttributesTest.java | 2 +- 6 files changed, 17 insertions(+), 38 deletions(-) delete mode 100644 MetadataMigration/src/main/java/com/rfs/MetadataMigration.java diff --git a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/ParallelDocumentMigrationsTest.java b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/ParallelDocumentMigrationsTest.java index 084c61917..093f6360f 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/ParallelDocumentMigrationsTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/ParallelDocumentMigrationsTest.java @@ -194,15 +194,6 @@ public void testDocumentMigration( } } -// @Test -// public void testDocumentMigrationForBigMonolithicShardWorks() throws Exception { -// testDocumentMigration(1, -// SearchClusterContainer.OS_V2_14_0.getImageName(), -// SearchClusterContainer.OS_V2_14_0, -// GENERATOR_BASE_IMAGE, -// new String[]{"tail", "-f", "/dev/null"}); -// } - private void verifyWorkMetrics(DocumentMigrationTestContext rootContext, int numWorkers, int numRuns) { var workMetrics = rootContext.inMemoryInstrumentationBundle.getFinishedMetrics(); var migrationMetrics = rootContext.inMemoryInstrumentationBundle.getFinishedMetrics(); diff --git a/MetadataMigration/src/main/java/com/rfs/MetadataMigration.java b/MetadataMigration/src/main/java/com/rfs/MetadataMigration.java deleted file mode 100644 index 8b1378917..000000000 --- a/MetadataMigration/src/main/java/com/rfs/MetadataMigration.java +++ /dev/null @@ -1 +0,0 @@ - diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java index 7f89aae32..828da65dc 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java @@ -11,7 +11,6 @@ import java.time.Instant; import java.util.List; import java.util.Optional; -import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; diff --git a/coreUtilities/src/main/java/org/opensearch/migrations/tracing/RootOtelContext.java b/coreUtilities/src/main/java/org/opensearch/migrations/tracing/RootOtelContext.java index 77edc9a17..3e869cef3 100644 --- a/coreUtilities/src/main/java/org/opensearch/migrations/tracing/RootOtelContext.java +++ b/coreUtilities/src/main/java/org/opensearch/migrations/tracing/RootOtelContext.java @@ -36,7 +36,7 @@ public class RootOtelContext implements IRootOtelContext { public static OpenTelemetry initializeOpenTelemetryForCollector( @NonNull String collectorEndpoint, @NonNull String serviceName, - String nodeName + @NonNull String nodeName ) { final var spanProcessor = BatchSpanProcessor.builder( OtlpGrpcSpanExporter.builder().setEndpoint(collectorEndpoint).setTimeout(2, TimeUnit.SECONDS).build() @@ -89,8 +89,8 @@ public static OpenTelemetry initializeNoopOpenTelemetry() { */ public static OpenTelemetry initializeOpenTelemetryWithCollectorOrAsNoop( String collectorEndpoint, - String serviceName, - String instanceName + @NonNull String serviceName, + @NonNull String instanceName ) { return Optional.ofNullable(collectorEndpoint) .map(endpoint -> initializeOpenTelemetryForCollector(endpoint, serviceName, instanceName)) @@ -123,12 +123,15 @@ public CommonMetricInstruments getMetrics() { return null; } - public RootOtelContext(String scopeName, IContextTracker contextTracker, String instanceName) { + public RootOtelContext(@NonNull String scopeName, + IContextTracker contextTracker, + @NonNull String serviceName, + @NonNull String instanceName) { this(scopeName, contextTracker, - initializeOpenTelemetryWithCollectorOrAsNoop(null, null, instanceName)); + initializeOpenTelemetryWithCollectorOrAsNoop(null, serviceName, instanceName)); } - public RootOtelContext(String scopeName, IContextTracker contextTracker, @NonNull OpenTelemetry sdk) { + public RootOtelContext(String scopeName, IContextTracker contextTracker, @NonNull OpenTelemetry sdk) { openTelemetryImpl = sdk; this.scopeName = scopeName; this.contextTracker = contextTracker; diff --git a/coreUtilities/src/main/java/org/opensearch/migrations/utils/ProcessHelpers.java b/coreUtilities/src/main/java/org/opensearch/migrations/utils/ProcessHelpers.java index c5f7c89d0..d2d2dce02 100644 --- a/coreUtilities/src/main/java/org/opensearch/migrations/utils/ProcessHelpers.java +++ b/coreUtilities/src/main/java/org/opensearch/migrations/utils/ProcessHelpers.java @@ -1,32 +1,19 @@ package org.opensearch.migrations.utils; +import java.util.Optional; import java.util.UUID; import lombok.extern.slf4j.Slf4j; @Slf4j public class ProcessHelpers { - private static final String DEFAULT_NODE_ID = UUID.randomUUID().toString(); + private static final String DEFAULT_NODE_ID = "generated_" + UUID.randomUUID().toString(); public static String getNodeInstanceName() { - String id = null; - try { - id = System.getenv("ECS_TASK_ID"); // for ECS deployments - if (id != null) { - return id; - } - id = System.getenv("HOSTNAME"); // for any kubernetes deployed pod - if (id != null) { - return id; - } - // add additional fallbacks here - id = DEFAULT_NODE_ID; - return id; - } finally { - if (id != null) { - String finalId = id; - log.atInfo().setMessage(() -> "getNodeInstanceName()=" + finalId).log(); - } - } + var nodeId = Optional.of("ECS_TASK_ID").map(System::getenv) + .or(() -> Optional.of("HOSTNAME").map(System::getenv)) + .orElse(DEFAULT_NODE_ID); + log.atInfo().setMessage(() -> "getNodeInstanceName()=" + nodeId).log(); + return nodeId; } } diff --git a/coreUtilities/src/test/java/org/opensearch/migrations/tracing/IInstrumentationAttributesTest.java b/coreUtilities/src/test/java/org/opensearch/migrations/tracing/IInstrumentationAttributesTest.java index 2adccc4e6..06922dbac 100644 --- a/coreUtilities/src/test/java/org/opensearch/migrations/tracing/IInstrumentationAttributesTest.java +++ b/coreUtilities/src/test/java/org/opensearch/migrations/tracing/IInstrumentationAttributesTest.java @@ -62,7 +62,7 @@ public AttributesBuilder fillAttributesForSpansBelow(AttributesBuilder builder) @Test public void getPopulatedAttributesAreOverrideCorrectly() { - var rootCtx = new RootOtelContext("test", IContextTracker.DO_NOTHING_TRACKER, "unit_test"); + var rootCtx = new RootOtelContext("test", IContextTracker.DO_NOTHING_TRACKER, "unitTestSvc", "testNode"); var aCtx = new AContext(rootCtx); var bCtx = new BContext(rootCtx, aCtx);