Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a node id to all spans #896

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.opensearch.migrations.tracing.ActiveContextTrackerByActivityType;
import org.opensearch.migrations.tracing.CompositeContextTracker;
import org.opensearch.migrations.tracing.RootOtelContext;
import org.opensearch.migrations.utils.ProcessHelpers;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
Expand Down Expand Up @@ -83,7 +84,8 @@
}

var rootContext = new RootSnapshotContext(
RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(arguments.otelCollectorEndpoint, "rfs"),
RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(arguments.otelCollectorEndpoint,
RootSnapshotContext.SCOPE_NAME, ProcessHelpers.getNodeInstanceName()),

Check warning on line 88 in CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java

View check run for this annotation

Codecov / codecov/patch

CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java#L87-L88

Added lines #L87 - L88 were not covered by tests
new CompositeContextTracker(new ActiveContextTracker(), new ActiveContextTrackerByActivityType())
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.time.Clock;
import java.time.Duration;
import java.util.List;
import java.util.UUID;
import java.util.function.Function;

import org.opensearch.migrations.Version;
Expand All @@ -17,6 +16,7 @@
import org.opensearch.migrations.tracing.ActiveContextTrackerByActivityType;
import org.opensearch.migrations.tracing.CompositeContextTracker;
import org.opensearch.migrations.tracing.RootOtelContext;
import org.opensearch.migrations.utils.ProcessHelpers;

import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.JCommander;
Expand All @@ -41,7 +41,6 @@
import com.rfs.common.http.ConnectionContext;
import com.rfs.models.IndexMetadata;
import com.rfs.models.ShardMetadata;
import com.rfs.tracing.RootWorkCoordinationContext;
import com.rfs.worker.DocumentsRunner;
import com.rfs.worker.ShardWorkPreparer;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -168,6 +167,10 @@
}

public static void main(String[] args) throws Exception {
System.err.println("Got args: " + String.join("; ", args));
peternied marked this conversation as resolved.
Show resolved Hide resolved
var workerId = ProcessHelpers.getNodeInstanceName();
log.info("Starting RfsMigrateDocuments with workerId =" + workerId);

Check warning on line 172 in DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java

View check run for this annotation

Codecov / codecov/patch

DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java#L170-L172

Added lines #L170 - L172 were not covered by tests

Args arguments = new Args();
JCommander jCommander = JCommander.newBuilder().addObject(arguments).build();
jCommander.parse(args);
Expand All @@ -179,7 +182,7 @@

validateArgs(arguments);

var rootDocumentContext = makeRootContext(arguments);
var rootDocumentContext = makeRootContext(arguments, workerId);

Check warning on line 185 in DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java

View check run for this annotation

Codecov / codecov/patch

DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java#L185

Added line #L185 was not covered by tests
var luceneDirPath = Paths.get(arguments.luceneDir);
var snapshotLocalDirPath = arguments.snapshotLocalDir != null ? Paths.get(arguments.snapshotLocalDir) : null;

Expand All @@ -188,16 +191,13 @@
System.exit(PROCESS_TIMED_OUT);
}, Clock.systemUTC())) {
ConnectionContext connectionContext = arguments.targetArgs.toConnectionContext();
final var workerId = UUID.randomUUID().toString();
var workCoordinator = new OpenSearchWorkCoordinator(
new CoordinateWorkHttpClient(connectionContext),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS,
workerId
);
MDC.put(LOGGING_MDC_WORKER_ID, workerId); // I don't see a need to clean this up since we're in main
TryHandlePhaseFailure.executeWithTryCatch(() -> {
log.info("Running RfsMigrateDocuments with workerId = " + workerId);

OpenSearchClient targetClient = new OpenSearchClient(connectionContext);
DocumentReindexer reindexer = new DocumentReindexer(targetClient,
arguments.numDocsPerBulkRequest,
Expand Down Expand Up @@ -243,17 +243,17 @@
}
}

private static RootDocumentMigrationContext makeRootContext(Args arguments) {
private static RootDocumentMigrationContext makeRootContext(Args arguments, String workerId) {
var compositeContextTracker = new CompositeContextTracker(
new ActiveContextTracker(),
new ActiveContextTrackerByActivityType()
);
var otelSdk = RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(
arguments.otelCollectorEndpoint,
"docMigration"
RootDocumentMigrationContext.SCOPE_NAME,
workerId
);
var workContext = new RootWorkCoordinationContext(otelSdk, compositeContextTracker);
return new RootDocumentMigrationContext(otelSdk, compositeContextTracker, workContext);
return new RootDocumentMigrationContext(otelSdk, compositeContextTracker);

Check warning on line 256 in DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java

View check run for this annotation

Codecov / codecov/patch

DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java#L256

Added line #L256 was not covered by tests
}

public static DocumentsRunner.CompletionStatus run(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.opensearch.migrations.metadata.tracing.MetadataMigrationTestContext;
import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext;
import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext;
import org.opensearch.migrations.workcoordination.tracing.WorkCoordinationTestContext;

import com.rfs.common.FileSystemRepo;
import com.rfs.common.FileSystemSnapshotCreator;
Expand All @@ -37,8 +36,7 @@ public class EndToEndTest extends SourceTestBase {
public void migrateFrom_ES_v6_8(final SearchClusterContainer.ContainerVersion targetVersion) throws Exception {
final var snapshotContext = SnapshotTestContext.factory().noOtelTracking();
final var metadataContext = MetadataMigrationTestContext.factory().noOtelTracking();
final var workCoordinationContext = WorkCoordinationTestContext.factory().noOtelTracking();
final var testDocMigrationContext = DocumentMigrationTestContext.factory(workCoordinationContext).noOtelTracking();
final var testDocMigrationContext = DocumentMigrationTestContext.factory().noOtelTracking();

try (
final var sourceCluster = new SearchClusterContainer(SearchClusterContainer.ES_V6_8_23);
Expand Down Expand Up @@ -163,8 +161,7 @@ private void migrateFrom_ES_v7_X(
) {
final var snapshotContext = SnapshotTestContext.factory().noOtelTracking();
final var metadataContext = MetadataMigrationTestContext.factory().noOtelTracking();
final var workCoordinationContext = WorkCoordinationTestContext.factory().noOtelTracking();
final var testDocMigrationContext = DocumentMigrationTestContext.factory(workCoordinationContext).noOtelTracking();
final var testDocMigrationContext = DocumentMigrationTestContext.factory().noOtelTracking();

try {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.opensearch.migrations.metadata.tracing.MetadataMigrationTestContext;
import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext;
import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext;
import org.opensearch.migrations.workcoordination.tracing.WorkCoordinationTestContext;

import com.rfs.common.FileSystemRepo;
import com.rfs.common.FileSystemSnapshotCreator;
Expand All @@ -45,7 +44,7 @@ public class ParallelDocumentMigrationsTest extends SourceTestBase {

public static Stream<Arguments> makeDocumentMigrationArgs() {
List<Object[]> sourceImageArgs = SOURCE_IMAGES.stream()
.map(name -> makeParamsForBase(name))
.map(SourceTestBase::makeParamsForBase)
.collect(Collectors.toList());
var targetImageNames = TARGET_IMAGES.stream()
.collect(Collectors.toList());
Expand Down Expand Up @@ -82,8 +81,7 @@ public void testDocumentMigration(
var executorService = Executors.newFixedThreadPool(numWorkers);
final var testSnapshotContext = SnapshotTestContext.factory().noOtelTracking();
final var testMetadataMigrationContext = MetadataMigrationTestContext.factory().noOtelTracking();
final var workCoordinationContext = WorkCoordinationTestContext.factory().withAllTracking();
final var testDocMigrationContext = DocumentMigrationTestContext.factory(workCoordinationContext)
final var testDocMigrationContext = DocumentMigrationTestContext.factory()
.withAllTracking();

try (
Expand Down Expand Up @@ -197,7 +195,7 @@ public void testDocumentMigration(
}

private void verifyWorkMetrics(DocumentMigrationTestContext rootContext, int numWorkers, int numRuns) {
var workMetrics = rootContext.getWorkCoordinationContext().inMemoryInstrumentationBundle.getFinishedMetrics();
var workMetrics = rootContext.inMemoryInstrumentationBundle.getFinishedMetrics();
var migrationMetrics = rootContext.inMemoryInstrumentationBundle.getFinishedMetrics();

verifyCoordinatorBehavior(workMetrics, numRuns);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ enum FailHow {
// The Document Migration process will throw an exception immediately, which will cause an exit.
"AT_STARTUP, 1",
// This test is dependent upon the max lease duration that is passed to the command line. It's set
// to such a short value (1s), that no document migration will exit in that amount of time. For good
// to such a short value (1s) that no document migration will exit in that amount of time. For good
// measure though, the toxiproxy also adds latency to the requests to make it impossible for the
// migration to complete w/in that 1s.
"WITH_DELAYS, 2" })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.opensearch.migrations.tracing.ActiveContextTrackerByActivityType;
import org.opensearch.migrations.tracing.CompositeContextTracker;
import org.opensearch.migrations.tracing.RootOtelContext;
import org.opensearch.migrations.utils.ProcessHelpers;

import com.beust.jcommander.JCommander;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -26,7 +27,8 @@
}

var context = new RootMetadataMigrationContext(
RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(arguments.otelCollectorEndpoint, "metadata"),
RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(arguments.otelCollectorEndpoint, "metadata",
ProcessHelpers.getNodeInstanceName()),

Check warning on line 31 in MetadataMigration/src/main/java/org/opensearch/migrations/MetadataMigration.java

View check run for this annotation

Codecov / codecov/patch

MetadataMigration/src/main/java/org/opensearch/migrations/MetadataMigration.java#L30-L31

Added lines #L30 - L31 were not covered by tests
new CompositeContextTracker(new ActiveContextTracker(), new ActiveContextTrackerByActivityType())
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
public class RootWorkCoordinationContext extends RootOtelContext {
private static final String SCOPE_NAME = "workCoordination";

public final RootOtelContext enclosingScope;

public final WorkCoordinationContexts.InitializeCoordinatorStateContext.MetricInstruments initializeMetrics;
public final WorkCoordinationContexts.CreateUnassignedWorkItemContext.MetricInstruments createUnassignedWorkMetrics;
public final WorkCoordinationContexts.Refresh.MetricInstruments refreshMetrics;
Expand All @@ -18,7 +20,15 @@ public class RootWorkCoordinationContext extends RootOtelContext {
public final WorkCoordinationContexts.AcquireNextWorkItemContext.MetricInstruments acquireNextWorkMetrics;

public RootWorkCoordinationContext(OpenTelemetry sdk, IContextTracker contextTracker) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets remove this overload to prevent null from leaking out

this(sdk, contextTracker, null);
}

public RootWorkCoordinationContext(OpenTelemetry sdk,
IContextTracker contextTracker,
RootOtelContext enclosingScope)
{
super(SCOPE_NAME, contextTracker, sdk);
this.enclosingScope = enclosingScope;
var meter = this.getMeterProvider().get(SCOPE_NAME);

initializeMetrics = WorkCoordinationContexts.InitializeCoordinatorStateContext.makeMetrics(meter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ public class RootDocumentMigrationContext extends BaseRootRfsContext implements

public RootDocumentMigrationContext(
OpenTelemetry sdk,
IContextTracker contextTracker,
RootWorkCoordinationContext workCoordinationContext
IContextTracker contextTracker
) {
super(SCOPE_NAME, sdk, contextTracker);
var meter = this.getMeterProvider().get(SCOPE_NAME);
this.workCoordinationContext = workCoordinationContext;
workCoordinationContext = new RootWorkCoordinationContext(sdk, contextTracker, this);
documentReindexInstruments = DocumentMigrationContexts.DocumentReindexContext.makeMetrics(meter);
shardSetupMetrics = DocumentMigrationContexts.ShardSetupAttemptContext.makeMetrics(meter);
addShardWorkItemMetrics = DocumentMigrationContexts.AddShardWorkItemContext.makeMetrics(meter);
Expand Down
10 changes: 3 additions & 7 deletions RFS/src/test/java/com/rfs/integration/SnapshotStateTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.junit.jupiter.api.io.TempDir;

import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext;
import org.opensearch.migrations.workcoordination.tracing.WorkCoordinationTestContext;

import com.rfs.common.DocumentReindexer.BulkDocSection;
import com.rfs.common.OpenSearchClient;
Expand Down Expand Up @@ -67,8 +66,7 @@ public void tearDown() throws Exception {
@Test
public void SingleSnapshot_SingleDocument() throws Exception {
// Setup
final var workCoordinationContext = WorkCoordinationTestContext.factory().noOtelTracking();
final var testContext = DocumentMigrationTestContext.factory(workCoordinationContext).noOtelTracking();
final var testContext = DocumentMigrationTestContext.factory().noOtelTracking();
final var indexName = "my-index";
final var document1Id = "doc1";
final var document1Body = " \n {\n\"fo$o\":\"bar\"\n}\t \n"; // Verify that we trim and remove newlines
Expand Down Expand Up @@ -106,8 +104,7 @@ public void SingleSnapshot_SingleDocument() throws Exception {
@Test
public void SingleSnapshot_SingleDocument_Then_DeletedDocument() throws Exception {
// Setup
final var workCoordinationContext = WorkCoordinationTestContext.factory().noOtelTracking();
final var testContext = DocumentMigrationTestContext.factory(workCoordinationContext).noOtelTracking();
final var testContext = DocumentMigrationTestContext.factory().noOtelTracking();
final var indexName = "my-index-with-deleted-item";
final var document1Id = "doc1-going-to-be-deleted";
final var document1Body = "{\"foo\":\"bar\"}";
Expand Down Expand Up @@ -140,8 +137,7 @@ public void SingleSnapshot_SingleDocument_Then_DeletedDocument() throws Exceptio
@Test
public void SingleSnapshot_SingleDocument_Then_UpdateDocument() throws Exception {
// Setup
final var workCoordinationContext = WorkCoordinationTestContext.factory().noOtelTracking();
final var testContext = DocumentMigrationTestContext.factory(workCoordinationContext).noOtelTracking();
final var testContext = DocumentMigrationTestContext.factory().noOtelTracking();
final var indexName = "my-index-with-updated-item";
final var document1Id = "doc1-going-to-be-updated";
final var document1BodyOriginal = "{\"foo\":\"bar\"}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import org.opensearch.migrations.tracing.IContextTracker;
import org.opensearch.migrations.tracing.InMemoryInstrumentationBundle;
import org.opensearch.migrations.workcoordination.tracing.WorkCoordinationTestContext;

import com.rfs.framework.tracing.TrackingTestContextFactory;
import com.rfs.tracing.IRfsContexts;
Expand All @@ -11,26 +10,16 @@
public class DocumentMigrationTestContext extends RootDocumentMigrationContext {
public final InMemoryInstrumentationBundle inMemoryInstrumentationBundle;

@Override
public WorkCoordinationTestContext getWorkCoordinationContext() {
return (WorkCoordinationTestContext) super.getWorkCoordinationContext();
}

public DocumentMigrationTestContext(
InMemoryInstrumentationBundle inMemoryInstrumentationBundle,
IContextTracker contextTracker,
WorkCoordinationTestContext workCoordinationContext
IContextTracker contextTracker
) {
super(inMemoryInstrumentationBundle.openTelemetrySdk, contextTracker, workCoordinationContext);
super(inMemoryInstrumentationBundle.openTelemetrySdk, contextTracker);
this.inMemoryInstrumentationBundle = inMemoryInstrumentationBundle;
}

public static TrackingTestContextFactory<DocumentMigrationTestContext> factory(
WorkCoordinationTestContext rootWorkCoordinationContext
) {
return new TrackingTestContextFactory<>(
(a, b) -> new DocumentMigrationTestContext(a, b, rootWorkCoordinationContext)
);
public static TrackingTestContextFactory<DocumentMigrationTestContext> factory() {
return new TrackingTestContextFactory<>(DocumentMigrationTestContext::new);
}

public IRfsContexts.IRequestContext createUnboundRequestContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.migrations.trafficcapture.netty.HeaderValueFilteringCapturePredicate;
import org.opensearch.migrations.trafficcapture.proxyserver.netty.BacksideConnectionPool;
import org.opensearch.migrations.trafficcapture.proxyserver.netty.NettyScanningHttpProxy;
import org.opensearch.migrations.utils.ProcessHelpers;
import org.opensearch.security.ssl.DefaultSecurityKeyStore;
import org.opensearch.security.ssl.util.SSLConfigConstants;

Expand Down Expand Up @@ -306,14 +307,15 @@ protected static Map<String, String> convertPairListToMap(List<String> list) {
}

public static void main(String[] args) throws InterruptedException, IOException {
System.err.println("Starting Capture Proxy");
System.err.println("Got args: " + String.join("; ", args));
log.info("Starting Capture Proxy on " + ProcessHelpers.getNodeInstanceName());

var params = parseArgs(args);
var backsideUri = convertStringToUri(params.backsideUriString);

var rootContext = new RootCaptureContext(
RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(params.otelCollectorEndpoint, "capture"),
RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(params.otelCollectorEndpoint, "capture",
ProcessHelpers.getNodeInstanceName()),
new CompositeContextTracker(new ActiveContextTracker(), new ActiveContextTrackerByActivityType())
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.migrations.transform.RemovingAuthTransformerFactory;
import org.opensearch.migrations.transform.SigV4AuthTransformerFactory;
import org.opensearch.migrations.transform.StaticAuthTransformerFactory;
import org.opensearch.migrations.utils.ProcessHelpers;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
Expand Down Expand Up @@ -221,11 +222,13 @@
}

public static void main(String[] args) throws Exception {
System.err.println("Got args: " + String.join("; ", args));
final var workerId = ProcessHelpers.getNodeInstanceName();
log.info("Starting Traffic Replayer with id=" + workerId);

Check warning on line 227 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java#L225-L227

Added lines #L225 - L227 were not covered by tests

var activeContextLogger = LoggerFactory.getLogger(ALL_ACTIVE_CONTEXTS_MONITOR_LOGGER);
var params = parseArgs(args);
URI uri;
System.err.println("Starting Traffic Replayer");
System.err.println("Got args: " + String.join("; ", args));
try {
uri = new URI(params.targetUriString);
} catch (Exception e) {
Expand Down Expand Up @@ -258,7 +261,9 @@
);
var contextTrackers = new CompositeContextTracker(globalContextTracker, perContextTracker);
var topContext = new RootReplayerContext(
RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(params.otelCollectorEndpoint, "replay"),
RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(params.otelCollectorEndpoint,

Check warning on line 264 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java#L264

Added line #L264 was not covered by tests
"replay",
ProcessHelpers.getNodeInstanceName()),

Check warning on line 266 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java#L266

Added line #L266 was not covered by tests
contextTrackers
);

Expand Down
Loading