Skip to content

Commit

Permalink
Add a node id, which is either an ECS task id, Kubernetes hostname, o…
Browse files Browse the repository at this point in the history
…r random UUID (as per env variables).

Put that worker/node id into all of the spans as a resource and continue to use it to thread the MDC environment for the work coordinator.

Signed-off-by: Greg Schohn <[email protected]>
  • Loading branch information
gregschohn committed Aug 13, 2024
1 parent 73d991a commit 6a2d28e
Show file tree
Hide file tree
Showing 14 changed files with 115 additions and 55 deletions.
4 changes: 3 additions & 1 deletion CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.opensearch.migrations.tracing.ActiveContextTrackerByActivityType;
import org.opensearch.migrations.tracing.CompositeContextTracker;
import org.opensearch.migrations.tracing.RootOtelContext;
import org.opensearch.migrations.utils.ProcessHelpers;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
Expand Down Expand Up @@ -83,7 +84,8 @@ public static void main(String[] args) throws Exception {
}

var rootContext = new RootSnapshotContext(
RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(arguments.otelCollectorEndpoint, "rfs"),
RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(arguments.otelCollectorEndpoint,
RootSnapshotContext.SCOPE_NAME, ProcessHelpers.getNodeInstanceName()),
new CompositeContextTracker(new ActiveContextTracker(), new ActiveContextTrackerByActivityType())
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.migrations.tracing.ActiveContextTrackerByActivityType;
import org.opensearch.migrations.tracing.CompositeContextTracker;
import org.opensearch.migrations.tracing.RootOtelContext;
import org.opensearch.migrations.utils.ProcessHelpers;

import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.JCommander;
Expand Down Expand Up @@ -171,7 +172,8 @@ public static void main(String[] args) throws Exception {

validateArgs(arguments);

var rootDocumentContext = makeRootContext(arguments);
var workerId = ProcessHelpers.getNodeInstanceName();
var rootDocumentContext = makeRootContext(arguments, workerId);
var luceneDirPath = Paths.get(arguments.luceneDir);
var snapshotLocalDirPath = arguments.snapshotLocalDir != null ? Paths.get(arguments.snapshotLocalDir) : null;

Expand All @@ -180,15 +182,14 @@ public static void main(String[] args) throws Exception {
System.exit(PROCESS_TIMED_OUT);
}, Clock.systemUTC())) {
ConnectionContext connectionContext = arguments.targetArgs.toConnectionContext();
final var workerId = UUID.randomUUID().toString();
var workCoordinator = new OpenSearchWorkCoordinator(
new CoordinateWorkHttpClient(connectionContext),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS,
workerId
);
MDC.put(LOGGING_MDC_WORKER_ID, workerId); // I don't see a need to clean this up since we're in main
TryHandlePhaseFailure.executeWithTryCatch(() -> {
log.info("Running RfsMigrateDocuments with workerId = " + workerId);
log.info("Running RfsMigrateDocuments");

OpenSearchClient targetClient = new OpenSearchClient(connectionContext);
DocumentReindexer reindexer = new DocumentReindexer(targetClient, arguments.numDocsPerBulkRequest,
Expand Down Expand Up @@ -233,17 +234,17 @@ public static void main(String[] args) throws Exception {
}
}

private static RootDocumentMigrationContext makeRootContext(Args arguments) {
private static RootDocumentMigrationContext makeRootContext(Args arguments, String workerId) {
var compositeContextTracker = new CompositeContextTracker(
new ActiveContextTracker(),
new ActiveContextTrackerByActivityType()
);
var otelSdk = RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(
arguments.otelCollectorEndpoint,
"docMigration"
RootDocumentMigrationContext.SCOPE_NAME,
workerId
);
var workContext = new RootWorkCoordinationContext(otelSdk, compositeContextTracker);
return new RootDocumentMigrationContext(otelSdk, compositeContextTracker, workContext);
return new RootDocumentMigrationContext(otelSdk, compositeContextTracker);
}

public static DocumentsRunner.CompletionStatus run(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand Down Expand Up @@ -78,7 +79,7 @@ public class ParallelDocumentMigrationsTest extends SourceTestBase {

public static Stream<Arguments> makeDocumentMigrationArgs() {
List<Object[]> sourceImageArgs = SOURCE_IMAGES.stream()
.map(name -> makeParamsForBase(name))
.map(SourceTestBase::makeParamsForBase)
.collect(Collectors.toList());
var targetImageNames = TARGET_IMAGES.stream()
.map(SearchClusterContainer.Version::getImageName)
Expand Down Expand Up @@ -114,8 +115,7 @@ public void testDocumentMigration(
var executorService = Executors.newFixedThreadPool(numWorkers);
final var testSnapshotContext = SnapshotTestContext.factory().noOtelTracking();
final var testMetadataMigrationContext = MetadataMigrationTestContext.factory().noOtelTracking();
final var workCoordinationContext = WorkCoordinationTestContext.factory().withAllTracking();
final var testDocMigrationContext = DocumentMigrationTestContext.factory(workCoordinationContext)
final var testDocMigrationContext = DocumentMigrationTestContext.factory()
.withAllTracking();

try (
Expand Down Expand Up @@ -226,8 +226,17 @@ public void testDocumentMigration(
}
}

// @Test
// public void testDocumentMigrationForBigMonolithicShardWorks() throws Exception {
// testDocumentMigration(1,
// SearchClusterContainer.OS_V2_14_0.getImageName(),
// SearchClusterContainer.OS_V2_14_0,
// GENERATOR_BASE_IMAGE,
// new String[]{"tail", "-f", "/dev/null"});
// }

private void verifyWorkMetrics(DocumentMigrationTestContext rootContext, int numWorkers, int numRuns) {
var workMetrics = rootContext.getWorkCoordinationContext().inMemoryInstrumentationBundle.getFinishedMetrics();
var workMetrics = rootContext.inMemoryInstrumentationBundle.getFinishedMetrics();
var migrationMetrics = rootContext.inMemoryInstrumentationBundle.getFinishedMetrics();

verifyCoordinatorBehavior(workMetrics, numRuns);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ enum FailHow {
// The Document Migration process will throw an exception immediately, which will cause an exit.
"AT_STARTUP, 1",
// This test is dependent upon the max lease duration that is passed to the command line. It's set
// to such a short value (1s), that no document migration will exit in that amount of time. For good
// to such a short value (1s) that no document migration will exit in that amount of time. For good
// measure though, the toxiproxy also adds latency to the requests to make it impossible for the
// migration to complete w/in that 1s.
"WITH_DELAYS, 2" })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.migrations.tracing.ActiveContextTrackerByActivityType;
import org.opensearch.migrations.tracing.CompositeContextTracker;
import org.opensearch.migrations.tracing.RootOtelContext;
import org.opensearch.migrations.utils.ProcessHelpers;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
Expand Down Expand Up @@ -102,7 +103,9 @@ public static void main(String[] args) throws Exception {
}

var rootContext = new RootMetadataMigrationContext(
RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(arguments.otelCollectorEndpoint, "rfs"),
RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(arguments.otelCollectorEndpoint,
RootMetadataMigrationContext.SCOPE_NAME,
ProcessHelpers.getNodeInstanceName()),
new CompositeContextTracker(new ActiveContextTracker(), new ActiveContextTrackerByActivityType())
);

Expand Down
10 changes: 10 additions & 0 deletions RFS/src/main/java/com/rfs/tracing/RootWorkCoordinationContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
public class RootWorkCoordinationContext extends RootOtelContext {
private static final String SCOPE_NAME = "workCoordination";

public final RootOtelContext enclosingScope;

public final WorkCoordinationContexts.InitializeCoordinatorStateContext.MetricInstruments initializeMetrics;
public final WorkCoordinationContexts.CreateUnassignedWorkItemContext.MetricInstruments createUnassignedWorkMetrics;
public final WorkCoordinationContexts.Refresh.MetricInstruments refreshMetrics;
Expand All @@ -18,7 +20,15 @@ public class RootWorkCoordinationContext extends RootOtelContext {
public final WorkCoordinationContexts.AcquireNextWorkItemContext.MetricInstruments acquireNextWorkMetrics;

public RootWorkCoordinationContext(OpenTelemetry sdk, IContextTracker contextTracker) {
this(sdk, contextTracker, null);
}

public RootWorkCoordinationContext(OpenTelemetry sdk,
IContextTracker contextTracker,
RootOtelContext enclosingScope)
{
super(SCOPE_NAME, contextTracker, sdk);
this.enclosingScope = enclosingScope;
var meter = this.getMeterProvider().get(SCOPE_NAME);

initializeMetrics = WorkCoordinationContexts.InitializeCoordinatorStateContext.makeMetrics(meter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ public class RootDocumentMigrationContext extends BaseRootRfsContext implements

public RootDocumentMigrationContext(
OpenTelemetry sdk,
IContextTracker contextTracker,
RootWorkCoordinationContext workCoordinationContext
IContextTracker contextTracker
) {
super(SCOPE_NAME, sdk, contextTracker);
var meter = this.getMeterProvider().get(SCOPE_NAME);
this.workCoordinationContext = workCoordinationContext;
workCoordinationContext = new RootWorkCoordinationContext(sdk, contextTracker, this);
documentReindexInstruments = DocumentMigrationContexts.DocumentReindexContext.makeMetrics(meter);
shardSetupMetrics = DocumentMigrationContexts.ShardSetupAttemptContext.makeMetrics(meter);
addShardWorkItemMetrics = DocumentMigrationContexts.AddShardWorkItemContext.makeMetrics(meter);
Expand Down
9 changes: 3 additions & 6 deletions RFS/src/test/java/com/rfs/integration/SnapshotStateTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ public void tearDown() throws Exception {
@Test
public void SingleSnapshot_SingleDocument() throws Exception {
// Setup
final var workCoordinationContext = WorkCoordinationTestContext.factory().noOtelTracking();
final var testContext = DocumentMigrationTestContext.factory(workCoordinationContext).noOtelTracking();
final var testContext = DocumentMigrationTestContext.factory().noOtelTracking();
final var indexName = "my-index";
final var document1Id = "doc1";
final var document1Body = "{\"fo$o\":\"bar\"}";
Expand Down Expand Up @@ -103,8 +102,7 @@ public void SingleSnapshot_SingleDocument() throws Exception {
@Disabled("https://opensearch.atlassian.net/browse/MIGRATIONS-1746")
public void SingleSnapshot_SingleDocument_Then_DeletedDocument() throws Exception {
// Setup
final var workCoordinationContext = WorkCoordinationTestContext.factory().noOtelTracking();
final var testContext = DocumentMigrationTestContext.factory(workCoordinationContext).noOtelTracking();
final var testContext = DocumentMigrationTestContext.factory().noOtelTracking();
final var indexName = "my-index-with-deleted-item";
final var document1Id = "doc1-going-to-be-deleted";
final var document1Body = "{\"foo\":\"bar\"}";
Expand Down Expand Up @@ -142,8 +140,7 @@ public void SingleSnapshot_SingleDocument_Then_DeletedDocument() throws Exceptio
@Disabled("https://opensearch.atlassian.net/browse/MIGRATIONS-1747")
public void SingleSnapshot_SingleDocument_Then_UpdateDocument() throws Exception {
// Setup
final var workCoordinationContext = WorkCoordinationTestContext.factory().noOtelTracking();
final var testContext = DocumentMigrationTestContext.factory(workCoordinationContext).noOtelTracking();
final var testContext = DocumentMigrationTestContext.factory().noOtelTracking();
final var indexName = "my-index-with-updated-item";
final var document1Id = "doc1-going-to-be-updated";
final var document1BodyOrginal = "{\"foo\":\"bar\"}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,16 @@
public class DocumentMigrationTestContext extends RootDocumentMigrationContext {
public final InMemoryInstrumentationBundle inMemoryInstrumentationBundle;

@Override
public WorkCoordinationTestContext getWorkCoordinationContext() {
return (WorkCoordinationTestContext) super.getWorkCoordinationContext();
}

public DocumentMigrationTestContext(
InMemoryInstrumentationBundle inMemoryInstrumentationBundle,
IContextTracker contextTracker,
WorkCoordinationTestContext workCoordinationContext
IContextTracker contextTracker
) {
super(inMemoryInstrumentationBundle.openTelemetrySdk, contextTracker, workCoordinationContext);
super(inMemoryInstrumentationBundle.openTelemetrySdk, contextTracker);
this.inMemoryInstrumentationBundle = inMemoryInstrumentationBundle;
}

public static TrackingTestContextFactory<DocumentMigrationTestContext> factory(
WorkCoordinationTestContext rootWorkCoordinationContext
) {
return new TrackingTestContextFactory<>(
(a, b) -> new DocumentMigrationTestContext(a, b, rootWorkCoordinationContext)
);
public static TrackingTestContextFactory<DocumentMigrationTestContext> factory() {
return new TrackingTestContextFactory<>(DocumentMigrationTestContext::new);
}

public IRfsContexts.IRequestContext createUnboundRequestContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.migrations.trafficcapture.netty.HeaderValueFilteringCapturePredicate;
import org.opensearch.migrations.trafficcapture.proxyserver.netty.BacksideConnectionPool;
import org.opensearch.migrations.trafficcapture.proxyserver.netty.NettyScanningHttpProxy;
import org.opensearch.migrations.utils.ProcessHelpers;
import org.opensearch.security.ssl.DefaultSecurityKeyStore;
import org.opensearch.security.ssl.util.SSLConfigConstants;

Expand Down Expand Up @@ -313,7 +314,8 @@ public static void main(String[] args) throws InterruptedException, IOException
var backsideUri = convertStringToUri(params.backsideUriString);

var rootContext = new RootCaptureContext(
RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(params.otelCollectorEndpoint, "capture"),
RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(params.otelCollectorEndpoint, "capture",
ProcessHelpers.getNodeInstanceName()),
new CompositeContextTracker(new ActiveContextTracker(), new ActiveContextTrackerByActivityType())
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

Expand All @@ -27,6 +28,7 @@
import org.opensearch.migrations.transform.RemovingAuthTransformerFactory;
import org.opensearch.migrations.transform.SigV4AuthTransformerFactory;
import org.opensearch.migrations.transform.StaticAuthTransformerFactory;
import org.opensearch.migrations.utils.ProcessHelpers;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
Expand Down Expand Up @@ -224,7 +226,8 @@ public static void main(String[] args) throws Exception {
var activeContextLogger = LoggerFactory.getLogger(ALL_ACTIVE_CONTEXTS_MONITOR_LOGGER);
var params = parseArgs(args);
URI uri;
System.err.println("Starting Traffic Replayer");
final var workerId = UUID.randomUUID().toString();
System.err.println("Starting Traffic Replayer with id=" + workerId);
System.err.println("Got args: " + String.join("; ", args));
try {
uri = new URI(params.targetUriString);
Expand Down Expand Up @@ -258,7 +261,9 @@ public static void main(String[] args) throws Exception {
);
var contextTrackers = new CompositeContextTracker(globalContextTracker, perContextTracker);
var topContext = new RootReplayerContext(
RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(params.otelCollectorEndpoint, "replay"),
RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(params.otelCollectorEndpoint,
"replay",
ProcessHelpers.getNodeInstanceName()),
contextTrackers
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,9 @@ public class RootOtelContext implements IRootOtelContext {

public static OpenTelemetry initializeOpenTelemetryForCollector(
@NonNull String collectorEndpoint,
@NonNull String serviceName
@NonNull String serviceName,
String nodeName
) {
var serviceResource = Resource.getDefault()
.toBuilder()
.put(ResourceAttributes.SERVICE_NAME, serviceName)
.build();

final var spanProcessor = BatchSpanProcessor.builder(
OtlpGrpcSpanExporter.builder().setEndpoint(collectorEndpoint).setTimeout(2, TimeUnit.SECONDS).build()
).build();
Expand All @@ -56,10 +52,22 @@ public static OpenTelemetry initializeOpenTelemetryForCollector(

var openTelemetrySdk = OpenTelemetrySdk.builder()
.setTracerProvider(
SdkTracerProvider.builder().setResource(serviceResource).addSpanProcessor(spanProcessor).build()
SdkTracerProvider.builder()
.setResource(Resource.getDefault()
.toBuilder()
.put(ResourceAttributes.SERVICE_NAME, serviceName)
.put(ResourceAttributes.SERVICE_INSTANCE_ID, nodeName)
.build())
.addSpanProcessor(spanProcessor)
.build()
)
.setMeterProvider(
SdkMeterProvider.builder().setResource(serviceResource).registerMetricReader(metricReader).build()
SdkMeterProvider.builder()
.setResource(Resource.getDefault()
.toBuilder()
.put(ResourceAttributes.SERVICE_NAME, serviceName)
.build())
.registerMetricReader(metricReader).build()
)
.build();

Expand All @@ -81,10 +89,11 @@ public static OpenTelemetry initializeNoopOpenTelemetry() {
*/
public static OpenTelemetry initializeOpenTelemetryWithCollectorOrAsNoop(
String collectorEndpoint,
String serviceName
String serviceName,
String instanceName
) {
return Optional.ofNullable(collectorEndpoint)
.map(endpoint -> initializeOpenTelemetryForCollector(endpoint, serviceName))
.map(endpoint -> initializeOpenTelemetryForCollector(endpoint, serviceName, instanceName))
.orElseGet(() -> {
if (serviceName != null) {
log.atWarn()
Expand Down Expand Up @@ -114,12 +123,13 @@ public CommonMetricInstruments getMetrics() {
return null;
}

public RootOtelContext(String scopeName, IContextTracker contextTracker) {
this(scopeName, contextTracker, null);
public RootOtelContext(String scopeName, IContextTracker contextTracker, String instanceName) {
this(scopeName, contextTracker,
initializeOpenTelemetryWithCollectorOrAsNoop(null, null, instanceName));
}

public RootOtelContext(String scopeName, IContextTracker contextTracker, OpenTelemetry sdk) {
openTelemetryImpl = sdk != null ? sdk : initializeOpenTelemetryWithCollectorOrAsNoop(null, null);
public RootOtelContext(String scopeName, IContextTracker contextTracker, @NonNull OpenTelemetry sdk) {
openTelemetryImpl = sdk;
this.scopeName = scopeName;
this.contextTracker = contextTracker;
}
Expand Down
Loading

0 comments on commit 6a2d28e

Please sign in to comment.