From 06e4509e0bc8a828f8529d8dfbf2e1ab2f060c54 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sat, 10 Feb 2024 20:39:15 -0500 Subject: [PATCH 1/7] fixed the display issue for impersonateServiceAccount when using Beam Java SDK --- .../beam/runners/dataflow/DataflowRunner.java | 4 +++ .../worker/WorkerPipelineOptionsFactory.java | 6 ++++ .../extensions/gcp/options/GcpOptions.java | 1 - .../org/apache/beam/fn/harness/FnHarness.java | 30 +++++++++++++++++++ 4 files changed, 40 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 5d2b7d190227..98c81c0a5f01 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -270,6 +270,10 @@ public static DataflowRunner fromOptions(PipelineOptions options) { missing.add("appName"); } + LOG.info("ImpersonateServiceAccount is {}", dataflowOptions.getImpersonateServiceAccount()); + + dataflowOptions.setImpersonateServiceAccount(null); + if (Strings.isNullOrEmpty(dataflowOptions.getRegion()) && isServiceEndpoint(dataflowOptions.getDataflowEndpoint())) { missing.add("region"); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java index a3ec8933c331..1439681bc84f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java @@ -80,6 +80,12 @@ public static T createFromSystemPropert options.setWorkerPool(System.getProperty("worker_pool")); } + // Remove impersonate information from workers + // More details: https://cloud.google.com/dataflow/docs/reference/pipeline-options#security_and_networking + if (options.getImpersonateServiceAccount()!=null) { + options.setImpersonateServiceAccount(null); + } + return options; } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java index d4cff72c43f3..3c65f0fa748c 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java @@ -220,7 +220,6 @@ public List create(PipelineOptions options) { + " either a single service account as the impersonator, or a" + " comma-separated list of service accounts to create an" + " impersonation delegation chain.") - @JsonIgnore @Nullable String getImpersonateServiceAccount(); diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java index 6d13b3704e16..417de65d6f29 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -17,6 +17,9 @@ */ package org.apache.beam.fn.harness; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -107,6 +110,28 @@ private static Endpoints.ApiServiceDescriptor getApiServiceDescriptor(String des return apiServiceDescriptorBuilder.build(); } + + public static String removeNestedKey(String jsonString, String keyToRemove) throws Exception { + ObjectMapper mapper = new ObjectMapper(); + JsonNode rootNode = mapper.readTree(jsonString); + + removeKeyRecursively(rootNode, keyToRemove); + + return mapper.writeValueAsString(rootNode); + } + + private static void removeKeyRecursively(JsonNode node, String keyToRemove) { + if (node.isObject()) { + node.fields().forEachRemaining(field -> { + if (field.getKey().equals(keyToRemove)) { + ((com.fasterxml.jackson.databind.node.ObjectNode) node).remove(keyToRemove); + } else { + removeKeyRecursively(field.getValue(), keyToRemove); + } + }); + } + } + public static void main(String[] args) throws Exception { main(System::getenv); } @@ -144,6 +169,11 @@ public static void main(Function environmentVarGetter) throws Ex } System.out.format("Pipeline options %s%n", pipelineOptionsJson); + + System.out.format("Pipeline options %s%n", pipelineOptionsJson); + pipelineOptionsJson = removeNestedKey(pipelineOptionsJson, "impersonateServiceAccount"); + System.out.format("New Pipeline options %s%n", pipelineOptionsJson); + PipelineOptions options = PipelineOptionsTranslation.fromJson(pipelineOptionsJson); Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor = From 8a4d7ece326149d52e3f222fcbd9ce22877d5bda Mon Sep 17 00:00:00 2001 From: liferoad Date: Sat, 10 Feb 2024 20:39:15 -0500 Subject: [PATCH 2/7] fixed the display issue for impersonateServiceAccount when using Beam Java SDK --- .../beam/runners/dataflow/DataflowRunner.java | 4 +++ .../worker/WorkerPipelineOptionsFactory.java | 6 ++++ .../extensions/gcp/options/GcpOptions.java | 1 - .../org/apache/beam/fn/harness/FnHarness.java | 30 +++++++++++++++++++ 4 files changed, 40 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 5d2b7d190227..98c81c0a5f01 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -270,6 +270,10 @@ public static DataflowRunner fromOptions(PipelineOptions options) { missing.add("appName"); } + LOG.info("ImpersonateServiceAccount is {}", dataflowOptions.getImpersonateServiceAccount()); + + dataflowOptions.setImpersonateServiceAccount(null); + if (Strings.isNullOrEmpty(dataflowOptions.getRegion()) && isServiceEndpoint(dataflowOptions.getDataflowEndpoint())) { missing.add("region"); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java index a3ec8933c331..1439681bc84f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java @@ -80,6 +80,12 @@ public static T createFromSystemPropert options.setWorkerPool(System.getProperty("worker_pool")); } + // Remove impersonate information from workers + // More details: https://cloud.google.com/dataflow/docs/reference/pipeline-options#security_and_networking + if (options.getImpersonateServiceAccount()!=null) { + options.setImpersonateServiceAccount(null); + } + return options; } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java index d4cff72c43f3..3c65f0fa748c 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java @@ -220,7 +220,6 @@ public List create(PipelineOptions options) { + " either a single service account as the impersonator, or a" + " comma-separated list of service accounts to create an" + " impersonation delegation chain.") - @JsonIgnore @Nullable String getImpersonateServiceAccount(); diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java index 6d13b3704e16..6bca89b0d0ca 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -17,6 +17,9 @@ */ package org.apache.beam.fn.harness; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -107,6 +110,30 @@ private static Endpoints.ApiServiceDescriptor getApiServiceDescriptor(String des return apiServiceDescriptorBuilder.build(); } + + public static String removeNestedKey(String jsonString, String keyToRemove) throws Exception { + ObjectMapper mapper = new ObjectMapper(); + JsonNode rootNode = mapper.readTree(jsonString); + + removeKeyRecursively(rootNode, keyToRemove); + + return mapper.writeValueAsString(rootNode); + } + + private static void removeKeyRecursively(JsonNode node, String keyToRemove) { + if (node.isObject()) { + Iterator> iterator = node.fields(); + while (iterator.hasNext()) { + Map.Entry field = iterator.next(); + if (field.getKey().equals(keyToRemove)) { + iterator.remove(); // Safe removal using Iterator + } else { + removeKeyRecursively(field.getValue(), keyToRemove); + } + } + } + } + public static void main(String[] args) throws Exception { main(System::getenv); } @@ -144,6 +171,9 @@ public static void main(Function environmentVarGetter) throws Ex } System.out.format("Pipeline options %s%n", pipelineOptionsJson); + pipelineOptionsJson = removeNestedKey(pipelineOptionsJson, "impersonateServiceAccount"); + System.out.format("New Pipeline options %s%n", pipelineOptionsJson); + PipelineOptions options = PipelineOptionsTranslation.fromJson(pipelineOptionsJson); Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor = From 069647d17bc5fd9afbbc3b692c24ff9e8e5547f9 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sat, 10 Feb 2024 21:33:12 -0500 Subject: [PATCH 3/7] fixed the display issue for impersonateServiceAccount when using Beam Java SDK --- .../dataflow/worker/WorkerPipelineOptionsFactory.java | 5 +++-- .../src/main/java/org/apache/beam/fn/harness/FnHarness.java | 6 ++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java index 1439681bc84f..f0fd344159bf 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java @@ -81,8 +81,9 @@ public static T createFromSystemPropert } // Remove impersonate information from workers - // More details: https://cloud.google.com/dataflow/docs/reference/pipeline-options#security_and_networking - if (options.getImpersonateServiceAccount()!=null) { + // More details: + // https://cloud.google.com/dataflow/docs/reference/pipeline-options#security_and_networking + if (options.getImpersonateServiceAccount() != null) { options.setImpersonateServiceAccount(null); } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java index 0821ced1ecac..03c39748b2b0 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -19,19 +19,18 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; - import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collections; import java.util.EnumMap; +import java.util.Iterator; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.function.Function; -import java.util.Iterator; -import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.fn.harness.control.BeamFnControlClient; import org.apache.beam.fn.harness.control.ExecutionStateSampler; @@ -112,7 +111,6 @@ private static Endpoints.ApiServiceDescriptor getApiServiceDescriptor(String des return apiServiceDescriptorBuilder.build(); } - public static String removeNestedKey(String jsonString, String keyToRemove) throws Exception { ObjectMapper mapper = new ObjectMapper(); JsonNode rootNode = mapper.readTree(jsonString); From 5fc234ea4a9a1afeb20be291e70de18b87770611 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sat, 10 Feb 2024 21:33:12 -0500 Subject: [PATCH 4/7] fixed the display issue for impersonateServiceAccount when using Beam Java SDK --- .../dataflow/worker/WorkerPipelineOptionsFactory.java | 8 ++++++-- .../main/java/org/apache/beam/fn/harness/FnHarness.java | 6 ++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java index 1439681bc84f..c9df5d96eb6c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java @@ -81,8 +81,12 @@ public static T createFromSystemPropert } // Remove impersonate information from workers - // More details: https://cloud.google.com/dataflow/docs/reference/pipeline-options#security_and_networking - if (options.getImpersonateServiceAccount()!=null) { + // More details: + // https://cloud.google.com/dataflow/docs/reference/pipeline-options#security_and_networking + if (options.getImpersonateServiceAccount() != null) { + LOG.info( + "Remove the impersonateServiceAccount pipeline option ({}) when starting the Worker harness.", + options.getImpersonateServiceAccount()); options.setImpersonateServiceAccount(null); } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java index 0821ced1ecac..03c39748b2b0 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -19,19 +19,18 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; - import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collections; import java.util.EnumMap; +import java.util.Iterator; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.function.Function; -import java.util.Iterator; -import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.fn.harness.control.BeamFnControlClient; import org.apache.beam.fn.harness.control.ExecutionStateSampler; @@ -112,7 +111,6 @@ private static Endpoints.ApiServiceDescriptor getApiServiceDescriptor(String des return apiServiceDescriptorBuilder.build(); } - public static String removeNestedKey(String jsonString, String keyToRemove) throws Exception { ObjectMapper mapper = new ObjectMapper(); JsonNode rootNode = mapper.readTree(jsonString); From 5ae496cae311865d8737eec131eae9c2abad36b9 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sun, 11 Feb 2024 15:22:01 -0500 Subject: [PATCH 5/7] Fixed the build --- .../beam/runners/dataflow/worker/StreamingDataflowWorker.java | 1 + sdks/java/harness/build.gradle | 1 + 2 files changed, 2 insertions(+) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index bca14923cfc9..f103892cfedc 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -488,6 +488,7 @@ public static void main(String[] args) throws Exception { "%s cannot be main() class with beam_fn_api enabled", StreamingDataflowWorker.class.getSimpleName()); + LOG.info("Creating StreamingDataflowWorker from options: {}", options); StreamingDataflowWorker worker = StreamingDataflowWorker.fromOptions(options); // Use the MetricsLogger container which is used by BigQueryIO to periodically log process-wide diff --git a/sdks/java/harness/build.gradle b/sdks/java/harness/build.gradle index 3c50f3c8edf2..e1f3d660bcab 100644 --- a/sdks/java/harness/build.gradle +++ b/sdks/java/harness/build.gradle @@ -31,6 +31,7 @@ dependencies { provided project(path: ":sdks:java:core", configuration: "shadow") provided project(path: ":sdks:java:transform-service:launcher") provided library.java.avro + provided library.java.jackson_databind provided library.java.joda_time provided library.java.slf4j_api provided library.java.vendored_grpc_1_60_1 From b416bbd875b7048cf4f7ea22580259aff6bfaed9 Mon Sep 17 00:00:00 2001 From: liferoad Date: Tue, 13 Feb 2024 09:42:26 -0500 Subject: [PATCH 6/7] Changed the info to debug --- .../beam/runners/dataflow/worker/StreamingDataflowWorker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index f103892cfedc..a933b9b55142 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -488,7 +488,7 @@ public static void main(String[] args) throws Exception { "%s cannot be main() class with beam_fn_api enabled", StreamingDataflowWorker.class.getSimpleName()); - LOG.info("Creating StreamingDataflowWorker from options: {}", options); + LOG.debug("Creating StreamingDataflowWorker from options: {}", options); StreamingDataflowWorker worker = StreamingDataflowWorker.fromOptions(options); // Use the MetricsLogger container which is used by BigQueryIO to periodically log process-wide From 4a2c5d074517e5557c1106c9761043626aeeecdf Mon Sep 17 00:00:00 2001 From: liferoad Date: Tue, 13 Feb 2024 11:41:33 -0500 Subject: [PATCH 7/7] Added the comment to improve how to handle impersonateServiceAccount --- .../src/main/java/org/apache/beam/fn/harness/FnHarness.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java index 03c39748b2b0..e22dd6d5b2ad 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -171,6 +171,7 @@ public static void main(Function environmentVarGetter) throws Ex } System.out.format("Pipeline options %s%n", pipelineOptionsJson); + // TODO: https://github.com/apache/beam/issues/30301 pipelineOptionsJson = removeNestedKey(pipelineOptionsJson, "impersonateServiceAccount"); PipelineOptions options = PipelineOptionsTranslation.fromJson(pipelineOptionsJson);