Skip to content

Commit

Permalink
Add workflow init support (#2222)
Browse files Browse the repository at this point in the history
Add workflow Init support
  • Loading branch information
Quinn-With-Two-Ns authored Sep 17, 2024
1 parent 03f7182 commit 21d15ae
Show file tree
Hide file tree
Showing 15 changed files with 713 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@

import com.google.common.collect.ImmutableList;
import io.temporal.common.Experimental;
import io.temporal.internal.common.env.ReflectionUtils;
import java.lang.reflect.Constructor;
import java.util.*;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

/**
* Rules:
Expand Down Expand Up @@ -69,6 +73,7 @@ public int hashCode() {
private final List<POJOWorkflowMethodMetadata> queryMethods;
private final List<POJOWorkflowMethodMetadata> updateMethods;
private final List<POJOWorkflowMethodMetadata> updateValidatorMethods;
private final Constructor<?> workflowInit;

/**
* Create POJOWorkflowImplMetadata for a workflow implementation class. The object must implement
Expand Down Expand Up @@ -161,6 +166,17 @@ private POJOWorkflowImplMetadata(Class<?> implClass, boolean listener) {
this.queryMethods = ImmutableList.copyOf(queryMethods.values());
this.updateMethods = ImmutableList.copyOf(updateMethods.values());
this.updateValidatorMethods = ImmutableList.copyOf(updateValidatorMethods.values());
if (!listener) {
this.workflowInit =
ReflectionUtils.getConstructor(
implClass,
this.workflowMethods.stream()
.map(POJOWorkflowMethodMetadata::getWorkflowMethod)
.collect(Collectors.toList()))
.orElse(null);
} else {
this.workflowInit = null;
}
}

/** List of workflow interfaces an object implements. */
Expand Down Expand Up @@ -194,4 +210,9 @@ public List<POJOWorkflowMethodMetadata> getUpdateMethods() {
public List<POJOWorkflowMethodMetadata> getUpdateValidatorMethods() {
return updateValidatorMethods;
}

@Experimental
public @Nullable Constructor<?> getWorkflowInit() {
return workflowInit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,69 @@
package io.temporal.internal.common.env;

import com.google.common.base.Joiner;
import io.temporal.workflow.WorkflowInit;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public final class ReflectionUtils {
private ReflectionUtils() {}

public static Optional<Constructor<?>> getConstructor(
Class<?> clazz, List<Method> workflowMethod) {
// We iterate through all constructors to find the one annotated with @WorkflowInit
// and check if it has the same parameters as the workflow method.
// We check all declared constructors to find any constructors that are annotated with
// @WorkflowInit, but not public,
// to give a more informative error message.
Optional<Constructor<?>> workflowInit = Optional.empty();
Constructor<?> defaultConstructors = null;
for (Constructor<?> ctor : clazz.getDeclaredConstructors()) {
WorkflowInit wfInit = ctor.getAnnotation(WorkflowInit.class);
if (wfInit == null) {
if (ctor.getParameterCount() == 0 && Modifier.isPublic(ctor.getModifiers())) {
if (workflowInit.isPresent() || defaultConstructors != null) {
throw new IllegalArgumentException(
"Multiple constructors annotated with @WorkflowInit or a default constructor found: "
+ clazz.getName());
}
defaultConstructors = ctor;
continue;
}
continue;
}
if (workflowMethod.size() != 1) {
throw new IllegalArgumentException(
"Multiple interfaces implemented while using @WorkflowInit annotation. Only one is allowed: "
+ clazz.getName());
}
if (workflowInit.isPresent() || defaultConstructors != null) {
throw new IllegalArgumentException(
"Multiple constructors annotated with @WorkflowInit or a default constructor found: "
+ clazz.getName());
}
if (!Modifier.isPublic(ctor.getModifiers())) {
throw new IllegalArgumentException(
"Constructor with @WorkflowInit annotation must be public: " + clazz.getName());
}
if (!Arrays.equals(ctor.getParameterTypes(), workflowMethod.get(0).getParameterTypes())) {
throw new IllegalArgumentException(
"Constructor annotated with @WorkflowInit must have the same parameters as the workflow method: "
+ clazz.getName());
}
workflowInit = Optional.of(ctor);
}
if (!workflowInit.isPresent() && defaultConstructors == null) {
throw new IllegalArgumentException(
"No default constructor or constructor annotated with @WorkflowInit found: "
+ clazz.getName());
}
return workflowInit;
}

public static String getMethodNameForStackTraceCutoff(
Class<?> clazz, String methodName, Class<?>... parameterTypes) throws RuntimeException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@

final class DynamicSyncWorkflowDefinition implements SyncWorkflowDefinition {

private final Functions.Func<? extends DynamicWorkflow> factory;
private final Functions.Func1<EncodedValues, ? extends DynamicWorkflow> factory;
private final WorkerInterceptor[] workerInterceptors;
// don't pass it down to other classes, it's a "cached" instance for internal usage only
private final DataConverter dataConverterWithWorkflowContext;
private WorkflowInboundCallsInterceptor workflowInvoker;

public DynamicSyncWorkflowDefinition(
Functions.Func<? extends DynamicWorkflow> factory,
Functions.Func1<EncodedValues, ? extends DynamicWorkflow> factory,
WorkerInterceptor[] workerInterceptors,
DataConverter dataConverterWithWorkflowContext) {
this.factory = factory;
Expand All @@ -50,9 +50,9 @@ public DynamicSyncWorkflowDefinition(
}

@Override
public void initialize() {
public void initialize(Optional<Payloads> input) {
SyncWorkflowContext workflowContext = WorkflowInternal.getRootWorkflowContext();
workflowInvoker = new RootWorkflowInboundCallsInterceptor(workflowContext);
workflowInvoker = new RootWorkflowInboundCallsInterceptor(workflowContext, input);
for (WorkerInterceptor workerInterceptor : workerInterceptors) {
workflowInvoker = workerInterceptor.interceptWorkflow(workflowInvoker);
}
Expand All @@ -71,15 +71,18 @@ public Optional<Payloads> execute(Header header, Optional<Payloads> input) {

class RootWorkflowInboundCallsInterceptor extends BaseRootWorkflowInboundCallsInterceptor {
private DynamicWorkflow workflow;
private Optional<Payloads> input;

public RootWorkflowInboundCallsInterceptor(SyncWorkflowContext workflowContext) {
public RootWorkflowInboundCallsInterceptor(
SyncWorkflowContext workflowContext, Optional<Payloads> input) {
super(workflowContext);
this.input = input;
}

@Override
public void init(WorkflowOutboundCallsInterceptor outboundCalls) {
super.init(outboundCalls);
newInstance();
newInstance(input);
WorkflowInternal.registerListener(workflow);
}

Expand All @@ -89,11 +92,11 @@ public WorkflowOutput execute(WorkflowInput input) {
return new WorkflowOutput(result);
}

private void newInstance() {
private void newInstance(Optional<Payloads> input) {
if (workflow != null) {
throw new IllegalStateException("Already called");
}
workflow = factory.apply();
workflow = factory.apply(new EncodedValues(input, dataConverterWithWorkflowContext));
}
}
}
Loading

0 comments on commit 21d15ae

Please sign in to comment.