Skip to content

Commit

Permalink
Add getMetricsScope interceptor (temporalio#2224)
Browse files Browse the repository at this point in the history
Add getMetricsScope interceptor
  • Loading branch information
Quinn-With-Two-Ns authored Sep 23, 2024
1 parent 97322ec commit 0e58687
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package io.temporal.common.interceptors;

import com.uber.m3.tally.Scope;
import io.temporal.activity.ActivityOptions;
import io.temporal.activity.LocalActivityOptions;
import io.temporal.api.common.v1.WorkflowExecution;
Expand Down Expand Up @@ -621,6 +622,9 @@ <R> R mutableSideEffect(

void upsertMemo(Map<String, Object> memo);

/** Intercepts call to get the metric scope in a workflow. */
Scope getMetricsScope();

/**
* Intercepts creation of a workflow child thread.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package io.temporal.common.interceptors;

import com.uber.m3.tally.Scope;
import io.temporal.common.SearchAttributeUpdate;
import io.temporal.workflow.Functions.Func;
import io.temporal.workflow.Promise;
Expand Down Expand Up @@ -167,6 +168,11 @@ public void upsertMemo(Map<String, Object> memo) {
next.upsertMemo(memo);
}

@Override
public Scope getMetricsScope() {
return next.getMetricsScope();
}

@Override
public Object newChildThread(Runnable runnable, boolean detached, String name) {
return next.newChildThread(runnable, detached, name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,7 @@ public CancelWorkflowOutput cancelWorkflow(CancelWorkflowInput input) {
return new CancelWorkflowOutput(result);
}

@Override
public Scope getMetricsScope() {
return replayContext.getMetricsScope();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ public static Optional<UpdateInfo> getCurrentUpdateInfo() {
}

public static Scope getMetricsScope() {
return getRootWorkflowContext().getMetricsScope();
return getWorkflowOutboundInterceptor().getMetricsScope();
}

private static boolean isLoggingEnabledInReplay() {
Expand Down
69 changes: 69 additions & 0 deletions temporal-sdk/src/test/java/io/temporal/workflow/MetricsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import io.temporal.workflow.shared.TestWorkflows.ReceiveSignalObjectWorkflow;
import io.temporal.workflow.shared.TestWorkflows.TestWorkflowReturnString;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
Expand All @@ -77,6 +78,7 @@ public class MetricsTest {

private static final long REPORTING_FLUSH_TIME = 600;
private static final String TASK_QUEUE = "metrics_test";
private static final String TEST_TAG = "test_tag";
private TestWorkflowEnvironment testEnvironment;
private TestStatsReporter reporter;

Expand Down Expand Up @@ -243,6 +245,44 @@ public void testWorkflowMetrics() throws InterruptedException {
reporter.assertCounter(TEMPORAL_REQUEST, workflowTaskCompletionTags, 4);
}

@Test
public void testWorkflowMetricsInterceptor() throws InterruptedException {
setUp(
WorkerFactoryOptions.getDefaultInstance().toBuilder()
.setWorkerInterceptors(new WorkerInterceptor())
.build());

Worker worker = testEnvironment.newWorker(TASK_QUEUE);
worker.registerWorkflowImplementationTypes(
TestCustomMetricsInWorkflow.class, TestMetricsInChildWorkflow.class);
worker.registerActivitiesImplementations(new TestActivityImpl());
testEnvironment.start();

WorkflowClient workflowClient = testEnvironment.getWorkflowClient();
WorkflowOptions options =
WorkflowOptions.newBuilder()
.setWorkflowRunTimeout(Duration.ofSeconds(1000))
.setTaskQueue(TASK_QUEUE)
.build();
NoArgsWorkflow workflow = workflowClient.newWorkflowStub(NoArgsWorkflow.class, options);
workflow.execute();

Thread.sleep(REPORTING_FLUSH_TIME);

Map<String, String> workflowTags = new LinkedHashMap<>(TAGS_TASK_QUEUE);
// Assert the interceptor added the extra tag
workflowTags.put(TEST_TAG, NAMESPACE);

workflowTags.put(MetricsTag.WORKFLOW_TYPE, "NoArgsWorkflow");
reporter.assertCounter("test_started", workflowTags, 1);
reporter.assertCounter("test_done", workflowTags, 1);

workflowTags.put(MetricsTag.WORKFLOW_TYPE, "TestChildWorkflow");
reporter.assertCounter("test_child_started", workflowTags, 1);
reporter.assertCounter("test_child_done", workflowTags, 1);
reporter.assertTimerMinDuration("test_timer", workflowTags, Duration.ofSeconds(3));
}

@Test
public void testCorruptedSignalMetrics() throws InterruptedException {
setUp(
Expand Down Expand Up @@ -600,4 +640,33 @@ public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
overrideArgs.apply(args)));
}
}

private static class WorkerInterceptor extends WorkerInterceptorBase {
@Override
public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInterceptor next) {
return new WorkflowInboundCallsInterceptorBase(next) {
@Override
public void init(WorkflowOutboundCallsInterceptor outboundCalls) {
next.init(new OutboundCallsInterceptor(outboundCalls));
}
};
}
}

private static class OutboundCallsInterceptor extends WorkflowOutboundCallsInterceptorBase {
WorkflowOutboundCallsInterceptor next;

public OutboundCallsInterceptor(WorkflowOutboundCallsInterceptor next) {
super(next);
this.next = next;
}

@Override
public Scope getMetricsScope() {
return next.getMetricsScope()
.tagged(
Collections.singletonMap(
TEST_TAG, String.valueOf(Workflow.getInfo().getNamespace())));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,11 @@ public void upsertMemo(Map<String, Object> memo) {
throw new UnsupportedOperationException("not implemented");
}

@Override
public Scope getMetricsScope() {
throw new UnsupportedOperationException("not implemented");
}

@Override
public Object newChildThread(Runnable runnable, boolean detached, String name) {
throw new UnsupportedOperationException("not implemented");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import com.uber.m3.tally.Scope;
import io.temporal.activity.ActivityExecutionContext;
import io.temporal.client.ActivityCompletionException;
import io.temporal.common.SearchAttributeUpdate;
Expand Down Expand Up @@ -398,6 +399,14 @@ public void upsertMemo(Map<String, Object> memo) {
next.upsertMemo(memo);
}

@Override
public Scope getMetricsScope() {
if (!WorkflowUnsafe.isReplaying()) {
trace.add("getMetricsScope");
}
return next.getMetricsScope();
}

@Override
public Object newChildThread(Runnable runnable, boolean detached, String name) {
if (!WorkflowUnsafe.isReplaying()) {
Expand Down

0 comments on commit 0e58687

Please sign in to comment.