Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed the display issue for impersonateServiceAccount with Java #30283

Merged
merged 13 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ public static void main(String[] args) throws Exception {
"%s cannot be main() class with beam_fn_api enabled",
StreamingDataflowWorker.class.getSimpleName());

LOG.debug("Creating StreamingDataflowWorker from options: {}", options);
StreamingDataflowWorker worker = StreamingDataflowWorker.fromOptions(options);

// Use the MetricsLogger container which is used by BigQueryIO to periodically log process-wide
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ public static <T extends DataflowWorkerHarnessOptions> T createFromSystemPropert
options.setWorkerPool(System.getProperty("worker_pool"));
}

// Remove impersonate information from workers
// More details:
// https://cloud.google.com/dataflow/docs/reference/pipeline-options#security_and_networking
if (options.getImpersonateServiceAccount() != null) {
LOG.info(
"Remove the impersonateServiceAccount pipeline option ({}) when starting the Worker harness.",
options.getImpersonateServiceAccount());
options.setImpersonateServiceAccount(null);
}

return options;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ public List<String> create(PipelineOptions options) {
+ " either a single service account as the impersonator, or a"
+ " comma-separated list of service accounts to create an"
+ " impersonation delegation chain.")
@JsonIgnore
@Nullable
String getImpersonateServiceAccount();

Expand Down
1 change: 1 addition & 0 deletions sdks/java/harness/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies {
provided project(path: ":sdks:java:core", configuration: "shadow")
provided project(path: ":sdks:java:transform-service:launcher")
provided library.java.avro
provided library.java.jackson_databind
provided library.java.joda_time
provided library.java.slf4j_api
provided library.java.vendored_grpc_1_60_1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@
*/
package org.apache.beam.fn.harness;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -107,6 +111,29 @@ private static Endpoints.ApiServiceDescriptor getApiServiceDescriptor(String des
return apiServiceDescriptorBuilder.build();
}

public static String removeNestedKey(String jsonString, String keyToRemove) throws Exception {
liferoad marked this conversation as resolved.
Show resolved Hide resolved
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(jsonString);

removeKeyRecursively(rootNode, keyToRemove);

return mapper.writeValueAsString(rootNode);
}

private static void removeKeyRecursively(JsonNode node, String keyToRemove) {
if (node.isObject()) {
Iterator<Map.Entry<String, JsonNode>> iterator = node.fields();
while (iterator.hasNext()) {
Map.Entry<String, JsonNode> field = iterator.next();
if (field.getKey().equals(keyToRemove)) {
iterator.remove(); // Safe removal using Iterator
} else {
removeKeyRecursively(field.getValue(), keyToRemove);
}
}
}
}

public static void main(String[] args) throws Exception {
main(System::getenv);
}
Expand Down Expand Up @@ -144,6 +171,8 @@ public static void main(Function<String, String> environmentVarGetter) throws Ex
}

System.out.format("Pipeline options %s%n", pipelineOptionsJson);
pipelineOptionsJson = removeNestedKey(pipelineOptionsJson, "impersonateServiceAccount");
liferoad marked this conversation as resolved.
Show resolved Hide resolved

PipelineOptions options = PipelineOptionsTranslation.fromJson(pipelineOptionsJson);

Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor =
Expand Down
Loading