Skip to content

Commit

Permalink
Propagate trace information into user job environment
Browse files Browse the repository at this point in the history
Exported as GENIE_B3_TRACE_ID_HIGH, GENIE_B3_TRACE_ID_LOW, GENIE_B3_PARENT_SPAN_ID and GENIE_B3_SAMPLED environment variables
Can be used by downstream user job clients to continue propagating the trace (e.g. to Spark or Presto).
  • Loading branch information
tgianos committed Apr 12, 2021
1 parent 5c7c1a3 commit 02d8047
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package com.netflix.genie.agent.execution.process.impl;

import brave.Span;
import brave.Tracer;
import com.netflix.genie.agent.cli.logging.ConsoleLog;
import com.netflix.genie.agent.execution.exceptions.JobLaunchException;
import com.netflix.genie.agent.execution.process.JobProcessManager;
Expand All @@ -25,6 +27,8 @@
import com.netflix.genie.agent.utils.PathUtils;
import com.netflix.genie.common.dto.JobStatusMessages;
import com.netflix.genie.common.external.dtos.v4.JobStatus;
import com.netflix.genie.common.internal.tracing.brave.BraveTracePropagator;
import com.netflix.genie.common.internal.tracing.brave.BraveTracingComponents;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.scheduling.TaskScheduler;
Expand Down Expand Up @@ -60,15 +64,20 @@ public class JobProcessManagerImpl implements JobProcessManager {
private final AtomicReference<ScheduledFuture> timeoutKillThread = new AtomicReference<>();
private final AtomicReference<File> initFailedFileRef = new AtomicReference<>();
private final TaskScheduler taskScheduler;
private final Tracer tracer;
private final BraveTracePropagator tracePropagator;
private boolean isInteractiveMode;

/**
* Constructor.
*
* @param taskScheduler The {@link TaskScheduler} instance to use to run scheduled asynchronous tasks
* @param taskScheduler The {@link TaskScheduler} instance to use to run scheduled asynchronous tasks
* @param tracingComponents The {@link BraveTracingComponents} instance to use for propagating trace information
*/
public JobProcessManagerImpl(final TaskScheduler taskScheduler) {
public JobProcessManagerImpl(final TaskScheduler taskScheduler, final BraveTracingComponents tracingComponents) {
this.taskScheduler = taskScheduler;
this.tracer = tracingComponents.getTracer();
this.tracePropagator = tracingComponents.getTracePropagator();
}

/**
Expand All @@ -80,7 +89,8 @@ public void launchProcess(
final File jobScript,
final boolean interactive,
@Nullable final Integer timeout,
final boolean launchInJobDirectory) throws JobLaunchException {
final boolean launchInJobDirectory
) throws JobLaunchException {
if (!this.launched.compareAndSet(false, true)) {
throw new IllegalStateException("Job already launched");
}
Expand Down Expand Up @@ -127,6 +137,11 @@ public void launchProcess(
processBuilder.redirectOutput(PathUtils.jobStdOutPath(jobDirectory).toFile());
}

final Span currentSpan = this.tracer.currentSpan();
if (currentSpan != null) {
processBuilder.environment().putAll(this.tracePropagator.injectForJob(currentSpan.context()));
}

if (this.killed.get()) {
log.info("Job aborted, skipping launch");
return;
Expand Down Expand Up @@ -183,23 +198,6 @@ public void kill(final KillService.KillSource source) {
log.error("Failed to kill job process");
}

/**
 * Request termination of the given process via {@link Process#destroy()} and then poll until it exits or
 * the grace period of {@code KILL_WAIT_SECS} seconds has elapsed, whichever comes first.
 * <p>
 * The method returns normally in both cases; callers must check {@link Process#isAlive()} afterwards to
 * find out whether the process actually terminated.
 *
 * @param process The process to terminate
 * @throws Exception If waiting on the process fails (e.g. the waiting thread is interrupted)
 */
private void gracefullyKill(final Process process) throws Exception {
// Deadline is computed before the destroy request is issued
final Instant graceKillEnd = Instant.now().plusSeconds(KILL_WAIT_SECS);
process.destroy();
// Re-check liveness and the deadline every KILL_CHECK_INTERVAL_MS milliseconds
while (process.isAlive() && Instant.now().isBefore(graceKillEnd)) {
process.waitFor(KILL_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
}
}

/**
 * Request forcible termination of the given process via {@link Process#destroyForcibly()} and then poll
 * until it exits or {@code KILL_WAIT_SECS} seconds have elapsed, whichever comes first.
 * <p>
 * The method returns normally in both cases; callers must check {@link Process#isAlive()} afterwards to
 * find out whether the process actually terminated.
 *
 * @param process The process to terminate
 * @throws Exception If waiting on the process fails (e.g. the waiting thread is interrupted)
 */
private void forcefullyKill(final Process process) throws Exception {
// Deadline is computed before the destroy request is issued
final Instant forceKillEnd = Instant.now().plusSeconds(KILL_WAIT_SECS);
// In Java 8, this is exactly destroy(). However, this behavior may change in future Java versions.
process.destroyForcibly();
// Re-check liveness and the deadline every KILL_CHECK_INTERVAL_MS milliseconds
while (process.isAlive() && Instant.now().isBefore(forceKillEnd)) {
process.waitFor(KILL_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
}
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -284,6 +282,23 @@ public JobProcessResult waitFor() throws InterruptedException {
return new JobProcessResult.Builder(JobStatus.FAILED, statusMessage, exitCode).build();
}

/**
 * Ask the process to shut down with {@link Process#destroy()} and wait for it to exit, giving up once
 * {@code KILL_WAIT_SECS} seconds have passed since the request was made.
 * <p>
 * Returning from this method does not imply the process is gone; the caller is expected to inspect
 * {@link Process#isAlive()} to decide what to do next.
 *
 * @param process The process to terminate
 * @throws Exception If waiting on the process fails (e.g. the waiting thread is interrupted)
 */
private void gracefullyKill(final Process process) throws Exception {
    final Instant deadline = Instant.now().plusSeconds(KILL_WAIT_SECS);
    process.destroy();
    for (;;) {
        // Stop as soon as the process has exited or the grace period is over
        if (!process.isAlive() || !Instant.now().isBefore(deadline)) {
            return;
        }
        // Block for at most one check interval before looking again
        process.waitFor(KILL_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }
}

/**
 * Forcibly terminate the process with {@link Process#destroyForcibly()} and wait for it to exit, giving up
 * once {@code KILL_WAIT_SECS} seconds have passed since the request was made.
 * <p>
 * Returning from this method does not imply the process is gone; the caller is expected to inspect
 * {@link Process#isAlive()} to decide what to do next.
 *
 * @param process The process to terminate
 * @throws Exception If waiting on the process fails (e.g. the waiting thread is interrupted)
 */
private void forcefullyKill(final Process process) throws Exception {
    final Instant deadline = Instant.now().plusSeconds(KILL_WAIT_SECS);
    // Original author's note: in Java 8 this is exactly destroy(); the behavior may change in future Java.
    process.destroyForcibly();
    for (;;) {
        // Stop as soon as the process has exited or the wait period is over
        if (!process.isAlive() || !Instant.now().isBefore(deadline)) {
            return;
        }
        // Block for at most one check interval before looking again
        process.waitFor(KILL_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }
}

/* TODO: HACK, Process does not expose PID in Java 8 API */
private long getPid(final Process process) {
long pid = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.netflix.genie.agent.execution.process.JobProcessManager;
import com.netflix.genie.agent.execution.process.impl.JobProcessManagerImpl;
import com.netflix.genie.common.internal.tracing.brave.BraveTracingComponents;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
Expand All @@ -38,15 +39,17 @@ public class ProcessAutoConfiguration {
/**
* Provide a lazy {@link JobProcessManager} bean if one hasn't already been defined.
*
* @param taskScheduler The {@link TaskScheduler} instance to use
* @param taskScheduler The {@link TaskScheduler} instance to use
* @param tracingComponents The {@link BraveTracingComponents} instance to use
* @return A {@link JobProcessManagerImpl} instance
*/
@Bean
@Lazy
@ConditionalOnMissingBean(JobProcessManager.class)
public JobProcessManagerImpl jobProcessManager(
@Qualifier("sharedAgentTaskScheduler") final TaskScheduler taskScheduler
@Qualifier("sharedAgentTaskScheduler") final TaskScheduler taskScheduler,
final BraveTracingComponents tracingComponents
) {
return new JobProcessManagerImpl(taskScheduler);
return new JobProcessManagerImpl(taskScheduler, tracingComponents);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,20 @@
*/
package com.netflix.genie.agent.execution.process.impl

import brave.Span
import brave.Tracer
import brave.propagation.TraceContext
import com.netflix.genie.agent.execution.exceptions.JobLaunchException
import com.netflix.genie.agent.execution.process.JobProcessManager
import com.netflix.genie.agent.execution.process.JobProcessResult
import com.netflix.genie.agent.execution.services.KillService
import com.netflix.genie.agent.utils.PathUtils
import com.netflix.genie.common.dto.JobStatusMessages
import com.netflix.genie.common.external.dtos.v4.JobStatus
import com.netflix.genie.common.internal.tracing.brave.BraveTagAdapter
import com.netflix.genie.common.internal.tracing.brave.BraveTracePropagator
import com.netflix.genie.common.internal.tracing.brave.BraveTracingCleanup
import com.netflix.genie.common.internal.tracing.brave.BraveTracingComponents
import org.springframework.core.io.ClassPathResource
import org.springframework.scheduling.TaskScheduler
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler
Expand All @@ -49,14 +56,36 @@ class JobProcessManagerImplSpec extends Specification {
File stdErr
TaskScheduler scheduler
JobProcessManager manager
Tracer tracer
BraveTracePropagator tracePropagator
Span span
TraceContext traceContext

def setup() {
this.stdOut = PathUtils.jobStdOutPath(temporaryFolder.toFile()).toFile()
this.stdErr = PathUtils.jobStdErrPath(temporaryFolder.toFile()).toFile()
Files.createDirectories(this.stdOut.getParentFile().toPath())
Files.createDirectories(this.stdErr.getParentFile().toPath())
this.scheduler = Mock(TaskScheduler)
this.manager = new JobProcessManagerImpl(this.scheduler)
this.tracer = Mock(Tracer)
this.tracePropagator = Mock(BraveTracePropagator)
UUID uuid = UUID.randomUUID()
this.traceContext = TraceContext.newBuilder()
.traceId(uuid.getLeastSignificantBits())
.traceIdHigh(uuid.getMostSignificantBits())
.spanId(UUID.randomUUID().getLeastSignificantBits())
.sampled(true)
.build()
this.span = Mock(Span)
this.manager = new JobProcessManagerImpl(
this.scheduler,
new BraveTracingComponents(
this.tracer,
this.tracePropagator,
Mock(BraveTracingCleanup),
Mock(BraveTagAdapter)
)
)
}

def cleanup() {
Expand Down Expand Up @@ -84,6 +113,9 @@ class JobProcessManagerImplSpec extends Specification {

then:
noExceptionThrown()
1 * this.tracer.currentSpan() >> this.span
1 * this.span.context() >> this.traceContext
1 * this.tracePropagator.injectForJob(this.traceContext) >> new HashMap<>()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)

when:
Expand Down Expand Up @@ -125,6 +157,9 @@ class JobProcessManagerImplSpec extends Specification {

then:
noExceptionThrown()
1 * this.tracer.currentSpan() >> this.span
1 * this.span.context() >> this.traceContext
1 * this.tracePropagator.injectForJob(this.traceContext) >> ["HI": "bye"]
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)

when:
Expand Down Expand Up @@ -157,6 +192,9 @@ class JobProcessManagerImplSpec extends Specification {

then:
noExceptionThrown()
1 * this.tracer.currentSpan() >> null
0 * this.span.context()
0 * this.tracePropagator.injectForJob(this.traceContext)
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)

when:
Expand Down Expand Up @@ -192,6 +230,9 @@ class JobProcessManagerImplSpec extends Specification {

then:
noExceptionThrown()
1 * this.tracer.currentSpan() >> this.span
1 * this.span.context() >> this.traceContext
1 * this.tracePropagator.injectForJob(this.traceContext) >> new HashMap<>()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)

when:
Expand All @@ -218,6 +259,7 @@ class JobProcessManagerImplSpec extends Specification {

then:
thrown(JobLaunchException)
0 * this.tracer.currentSpan()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)
}

Expand Down Expand Up @@ -248,6 +290,7 @@ class JobProcessManagerImplSpec extends Specification {

then:
thrown(JobLaunchException)
0 * this.tracer.currentSpan()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)
}

Expand All @@ -263,6 +306,7 @@ class JobProcessManagerImplSpec extends Specification {

then:
thrown(JobLaunchException)
0 * this.tracer.currentSpan()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)
}

Expand All @@ -280,6 +324,7 @@ class JobProcessManagerImplSpec extends Specification {

then:
thrown(JobLaunchException)
0 * this.tracer.currentSpan()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)
}

Expand All @@ -298,6 +343,7 @@ class JobProcessManagerImplSpec extends Specification {

then:
thrown(JobLaunchException)
0 * this.tracer.currentSpan()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)
}

Expand All @@ -319,6 +365,9 @@ class JobProcessManagerImplSpec extends Specification {

then:
noExceptionThrown()
1 * this.tracer.currentSpan() >> this.span
1 * this.span.context() >> this.traceContext
1 * this.tracePropagator.injectForJob(this.traceContext) >> new HashMap<>()
1 * this.scheduler.schedule(_ as Runnable, _ as Instant) >> future

when:
Expand Down Expand Up @@ -356,6 +405,9 @@ class JobProcessManagerImplSpec extends Specification {
then:
noExceptionThrown()
1 * this.tracer.currentSpan() >> this.span
1 * this.span.context() >> this.traceContext
1 * this.tracePropagator.injectForJob(this.traceContext) >> new HashMap<>()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)
when:
Expand Down Expand Up @@ -401,6 +453,9 @@ class JobProcessManagerImplSpec extends Specification {
then:
noExceptionThrown()
1 * this.tracer.currentSpan() >> this.span
1 * this.span.context() >> this.traceContext
1 * this.tracePropagator.injectForJob(this.traceContext) >> new HashMap<>()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)
when:
Expand All @@ -420,9 +475,9 @@ class JobProcessManagerImplSpec extends Specification {
result.getExitCode() == 143
where:
interactive | expectedStatusMessage
true | JobStatusMessages.JOB_KILLED_BY_USER
false | JobStatusMessages.JOB_KILLED_BY_SYSTEM
interactive | expectedStatusMessage
true | JobStatusMessages.JOB_KILLED_BY_USER
false | JobStatusMessages.JOB_KILLED_BY_SYSTEM
}
def "Kill completed process"() {
Expand All @@ -442,6 +497,9 @@ class JobProcessManagerImplSpec extends Specification {
then:
noExceptionThrown()
1 * this.tracer.currentSpan() >> this.span
1 * this.span.context() >> this.traceContext
1 * this.tracePropagator.injectForJob(this.traceContext) >> new HashMap<>()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)
// Wait until the process actually completes by checking the existence of the file
Expand Down Expand Up @@ -492,6 +550,9 @@ class JobProcessManagerImplSpec extends Specification {
then:
noExceptionThrown()
1 * this.tracer.currentSpan() >> this.span
1 * this.span.context() >> this.traceContext
1 * this.tracePropagator.injectForJob(this.traceContext) >> new HashMap<>()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)
when:
Expand Down Expand Up @@ -523,6 +584,9 @@ class JobProcessManagerImplSpec extends Specification {
then:
noExceptionThrown()
1 * this.tracer.currentSpan() >> this.span
1 * this.span.context() >> this.traceContext
1 * this.tracePropagator.injectForJob(this.traceContext) >> new HashMap<>()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)
when:
Expand All @@ -536,6 +600,7 @@ class JobProcessManagerImplSpec extends Specification {
then:
thrown(IllegalStateException)
0 * this.tracer.currentSpan()
0 * this.scheduler.schedule(_ as Runnable, _ as Instant)
}
Expand All @@ -558,7 +623,15 @@ class JobProcessManagerImplSpec extends Specification {
threadPoolScheduler.setThreadNamePrefix("job-process-manager-impl-spec-")
threadPoolScheduler.setWaitForTasksToCompleteOnShutdown(false)
threadPoolScheduler.initialize()
def realManager = new JobProcessManagerImpl(threadPoolScheduler)
def realManager = new JobProcessManagerImpl(
threadPoolScheduler,
new BraveTracingComponents(
this.tracer,
this.tracePropagator,
Mock(BraveTracingCleanup),
Mock(BraveTagAdapter)
)
)
when:
realManager.launchProcess(
Expand All @@ -571,6 +644,9 @@ class JobProcessManagerImplSpec extends Specification {
then:
noExceptionThrown()
1 * this.tracer.currentSpan() >> this.span
1 * this.span.context() >> this.traceContext
1 * this.tracePropagator.injectForJob(this.traceContext) >> new HashMap<>()
when:
def result = realManager.waitFor()
Expand Down Expand Up @@ -614,6 +690,9 @@ class JobProcessManagerImplSpec extends Specification {
then:
noExceptionThrown()
1 * this.tracer.currentSpan() >> this.span
1 * this.span.context() >> this.traceContext
1 * this.tracePropagator.injectForJob(this.traceContext) >> new HashMap<>()
1 * this.scheduler.schedule(_ as Runnable, _ as Instant) >> future
// Wait until the process actually starts by checking the existence of the runfile
Expand Down
Loading

0 comments on commit 02d8047

Please sign in to comment.