
Commit

Merge branch 'master' of https://github.com/ahmedabu98/beam into iceberg_managed_catalogs
ahmedabu98 committed Aug 7, 2024
2 parents 0483ffa + 07e692b commit c959546
Showing 169 changed files with 3,726 additions and 1,243 deletions.
@@ -48,7 +48,7 @@ env:
INFLUXDB_USER: ${{ secrets.INFLUXDB_USER }}
INFLUXDB_USER_PASSWORD: ${{ secrets.INFLUXDB_USER_PASSWORD }}
GCLOUD_ZONE: us-central1-a
CLUSTER_NAME: beam-loadtests-python-cogbk-flink-batch-${{ github.run_id }}
CLUSTER_NAME: beam-loadtests-py-cogbk-flink-batch-${{ github.run_id }}
GCS_BUCKET: gs://beam-flink-cluster
FLINK_DOWNLOAD_URL: https://archive.apache.org/dist/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz
HADOOP_DOWNLOAD_URL: https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
@@ -48,7 +48,7 @@ env:
INFLUXDB_USER: ${{ secrets.INFLUXDB_USER }}
INFLUXDB_USER_PASSWORD: ${{ secrets.INFLUXDB_USER_PASSWORD }}
GCLOUD_ZONE: us-central1-a
CLUSTER_NAME: beam-loadtests-python-pardo-flink-batch-${{ github.run_id }}
CLUSTER_NAME: beam-loadtests-py-pardo-flink-batch-${{ github.run_id }}
GCS_BUCKET: gs://beam-flink-cluster
FLINK_DOWNLOAD_URL: https://archive.apache.org/dist/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz
HADOOP_DOWNLOAD_URL: https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
@@ -48,7 +48,7 @@ env:
INFLUXDB_USER: ${{ secrets.INFLUXDB_USER }}
INFLUXDB_USER_PASSWORD: ${{ secrets.INFLUXDB_USER_PASSWORD }}
GCLOUD_ZONE: us-central1-a
CLUSTER_NAME: beam-loadtests-python-pardo-flink-stream-${{ github.run_id }}
CLUSTER_NAME: beam-loadtests-py-pardo-flink-stream-${{ github.run_id }}
GCS_BUCKET: gs://beam-flink-cluster
FLINK_DOWNLOAD_URL: https://archive.apache.org/dist/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz
HADOOP_DOWNLOAD_URL: https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
23 changes: 4 additions & 19 deletions CHANGES.md
@@ -68,6 +68,8 @@
## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Go SDK Minimum Go Version updated to 1.21 ([#32092](https://github.com/apache/beam/pull/32092)).
* Updated Go protobuf package to new version (Go) ([#21515](https://github.com/apache/beam/issues/21515)).

## Breaking Changes

@@ -88,15 +90,10 @@

* ([#X](https://github.com/apache/beam/issues/X)).

# [2.58.0] - Unreleased
# [2.58.0] - 2024-08-06

## Highlights

* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).

## I/Os

* Support for [Solace](https://solace.com/) source (`SolaceIO.Read`) added (Java) ([#31440](https://github.com/apache/beam/issues/31440)).

## New Features / Improvements
@@ -110,25 +107,13 @@

## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
* [IcebergIO] IcebergCatalogConfig was changed to support specifying catalog properties in a key-store fashion ([#31726](https://github.com/apache/beam/pull/31726))
* [SpannerIO] Added validation that query and table cannot be specified at the same time for SpannerIO.read(). Previously withQuery overrides withTable, if set ([#24956](https://github.com/apache/beam/issues/24956)).
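A minimal sketch of the configuration that the new SpannerIO validation rejects. The instance, database, and table names are placeholders invented for illustration; only `SpannerIO.read()`, `withQuery`, and `withTable` come from the changelog entry above:

```java
import com.google.cloud.spanner.Struct;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.values.PCollection;

public class SpannerReadValidationExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Placeholder instance/database/table names, for illustration only.
    // Before 2.58.0 the query silently took precedence over the table;
    // from 2.58.0 supplying both is rejected by validation.
    PCollection<Struct> rows =
        p.apply(
            SpannerIO.read()
                .withInstanceId("my-instance")
                .withDatabaseId("my-database")
                .withTable("users")
                .withQuery("SELECT id, name FROM users"));
  }
}
```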

## Deprecations

* X behavior is deprecated and will be removed in X versions ([#X](https://github.com/apache/beam/issues/X)).

## Bugfixes

* [BigQueryIO] Fixed a bug in batch Storage Write API that frequently exhausted concurrent connections quota ([#31710](https://github.com/apache/beam/pull/31710))
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).

## Known Issues

* ([#X](https://github.com/apache/beam/issues/X)).
* Fixed a logging issue where Python worker dependency installation logs sometimes were not emitted in a timely manner ([#31977](https://github.com/apache/beam/pull/31977))

# [2.57.0] - 2024-06-26

@@ -17,6 +17,7 @@
*/
package org.apache.beam.examples.complete.kafkatopubsub.transforms;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import org.apache.beam.examples.complete.kafkatopubsub.avro.AvroDataClass;
@@ -37,7 +38,6 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.kafka.common.serialization.StringDeserializer;

@@ -120,7 +120,8 @@ public PDone expand(PCollection<String> input) {
MapElements.into(TypeDescriptor.of(PubsubMessage.class))
.via(
(String json) ->
new PubsubMessage(json.getBytes(Charsets.UTF_8), ImmutableMap.of())))
new PubsubMessage(
json.getBytes(StandardCharsets.UTF_8), ImmutableMap.of())))
.apply(
"writePubsubMessagesToPubSub", PubsubIO.writeMessages().to(options.getOutputTopic()));
}
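The same substitution recurs throughout this commit: the vendored Guava `Charsets` constants are replaced by the JDK's `java.nio.charset.StandardCharsets`. A minimal standalone sketch of the pattern, with a hypothetical class name and string value:

```java
import java.nio.charset.StandardCharsets;

public class CharsetExample {
  public static void main(String[] args) {
    // Previously written as json.getBytes(Charsets.UTF_8) with the vendored
    // Guava constant; StandardCharsets.UTF_8 is the equivalent JDK constant.
    String json = "{\"greeting\":\"hello\"}";
    byte[] payload = json.getBytes(StandardCharsets.UTF_8);
    System.out.println(payload.length + " bytes");
  }
}
```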
@@ -19,7 +19,6 @@

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -50,12 +49,16 @@ public static StringSetData empty() {
* Combines this {@link StringSetData} with other, both original StringSetData are left intact.
*/
public StringSetData combine(StringSetData other) {
// do not merge other on this as this StringSetData might hold an immutable set like in case
// of EmptyStringSetData
Set<String> combined = new HashSet<>();
combined.addAll(this.stringSet());
combined.addAll(other.stringSet());
return StringSetData.create(combined);
if (this.stringSet().isEmpty()) {
return other;
} else if (other.stringSet().isEmpty()) {
return this;
} else {
ImmutableSet.Builder<String> combined = ImmutableSet.builder();
combined.addAll(this.stringSet());
combined.addAll(other.stringSet());
return StringSetData.create(combined.build());
}
}
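For reference, a standalone sketch of the combine pattern the new implementation uses: short-circuit when either side is empty, otherwise build one combined set without mutating either input. The helper class below is hypothetical and uses plain JDK collections rather than Beam's vendored Guava `ImmutableSet`:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

final class StringSets {
  private StringSets() {}

  // Combine two sets without copying when one of them is empty; neither
  // input is modified, mirroring the contract of StringSetData.combine.
  static Set<String> combine(Set<String> left, Set<String> right) {
    if (left.isEmpty()) {
      return right;
    } else if (right.isEmpty()) {
      return left;
    }
    Set<String> combined = new HashSet<>(left);
    combined.addAll(right);
    return Collections.unmodifiableSet(combined);
  }
}
```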

/**
@@ -17,8 +17,8 @@
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming.io;

import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,7 +60,8 @@ public void run(SourceContext<WindowedValue<byte[]>> ctx) {
while (running && (messageCount == 0 || count < subtaskCount)) {
synchronized (ctx.getCheckpointLock()) {
ctx.collect(
WindowedValue.valueInGlobalWindow(String.valueOf(count).getBytes(Charsets.UTF_8)));
WindowedValue.valueInGlobalWindow(
String.valueOf(count).getBytes(StandardCharsets.UTF_8)));
count++;
}

@@ -25,7 +25,7 @@

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
import java.nio.charset.StandardCharsets;
import org.junit.Test;

/** Tests for {@link FlinkJobServerDriver}. */
@@ -104,7 +104,7 @@ public void testJobServerDriver() throws Exception {
boolean success = false;
while (!success) {
newErr.flush();
String output = baos.toString(Charsets.UTF_8.name());
String output = baos.toString(StandardCharsets.UTF_8.name());
if (output.contains("JobService started on localhost:")
&& output.contains("ArtifactStagingService started on localhost:")
&& output.contains("ExpansionService started on localhost:")) {
@@ -114,7 +114,8 @@ public void testJobServerDriver() throws Exception {
}
}
assertThat(driver.getJobServerUrl(), is(not(nullValue())));
assertThat(baos.toString(Charsets.UTF_8.name()), containsString(driver.getJobServerUrl()));
assertThat(
baos.toString(StandardCharsets.UTF_8.name()), containsString(driver.getJobServerUrl()));
assertThat(driverThread.isAlive(), is(true));
} catch (Throwable t) {
// restore to print exception
@@ -149,7 +150,7 @@ public void testJobServerDriverWithoutExpansionService() throws Exception {
boolean success = false;
while (!success) {
newErr.flush();
String output = baos.toString(Charsets.UTF_8.name());
String output = baos.toString(StandardCharsets.UTF_8.name());
if (output.contains("JobService started on localhost:")
&& output.contains("ArtifactStagingService started on localhost:")) {
success = true;
@@ -161,7 +162,8 @@ public void testJobServerDriverWithoutExpansionService() throws Exception {
}
}
assertThat(driver.getJobServerUrl(), is(not(nullValue())));
assertThat(baos.toString(Charsets.UTF_8.name()), containsString(driver.getJobServerUrl()));
assertThat(
baos.toString(StandardCharsets.UTF_8.name()), containsString(driver.getJobServerUrl()));
assertThat(driverThread.isAlive(), is(true));
} catch (Throwable t) {
// restore to print exception
@@ -38,6 +38,7 @@
import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -56,7 +57,6 @@
import org.apache.beam.sdk.util.construction.PTransformMatchers;
import org.apache.beam.sdk.util.construction.PTransformTranslation;
import org.apache.beam.sdk.util.construction.resources.PipelineResources;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.RemoteEnvironment;
@@ -373,7 +373,7 @@ public void processElement(ProcessContext ctx) {
}
replacementStdErr.flush();
assertThat(
new String(byteArrayOutputStream.toByteArray(), Charsets.UTF_8),
new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8),
containsString(
"UnboundedSources present which rely on checkpointing, but checkpointing is disabled."));
}
@@ -20,6 +20,7 @@
import java.io.File;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.security.Permission;
import java.util.Collection;
@@ -30,7 +31,6 @@
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.construction.resources.PipelineResources;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
@@ -200,7 +200,7 @@ private static void prepareEnvironment() throws Exception {
RestOptions.PORT.key(),
flinkCluster.getRestPort());

Files.write(file.toPath(), config.getBytes(Charsets.UTF_8));
Files.write(file.toPath(), config.getBytes(StandardCharsets.UTF_8));

// Create a new environment with the location of the Flink config for CliFrontend
ImmutableMap<String, String> newEnv =
@@ -103,7 +103,6 @@
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
@@ -818,7 +817,7 @@ private void testEnsureDeferredStateCleanupTimerFiring(boolean withCheckpointing
BagState<ByteString> state = // State from the SDK Harness is stored as ByteStrings
operator.keyedStateInternals.state(
stateNamespace, StateTags.bag(stateId, ByteStringCoder.of()));
state.add(ByteString.copyFrom("userstate".getBytes(Charsets.UTF_8)));
state.add(ByteString.copyFrom("userstate".getBytes(StandardCharsets.UTF_8)));
assertThat(testHarness.numKeyedStateEntries(), is(1));

// user timer that fires after the end of the window and after state cleanup
@@ -966,7 +965,7 @@ public void testEnsureStateCleanupOnFinalWatermark() throws Exception {
BagState<ByteString> state = // State from the SDK Harness is stored as ByteStrings
operator.keyedStateInternals.state(
stateNamespace, StateTags.bag(stateId, ByteStringCoder.of()));
state.add(ByteString.copyFrom("userstate".getBytes(Charsets.UTF_8)));
state.add(ByteString.copyFrom("userstate".getBytes(StandardCharsets.UTF_8)));
// No timers have been set for cleanup
assertThat(testHarness.numEventTimeTimers(), is(0));
// State has been created
@@ -988,8 +987,8 @@ public void testCacheTokenHandling() throws Exception {
new ExecutableStageDoFnOperator.BagUserStateFactory<>(
test, stateBackend, NoopLock.get(), null);

ByteString key1 = ByteString.copyFrom("key1", Charsets.UTF_8);
ByteString key2 = ByteString.copyFrom("key2", Charsets.UTF_8);
ByteString key1 = ByteString.copyFrom("key1", StandardCharsets.UTF_8);
ByteString key2 = ByteString.copyFrom("key2", StandardCharsets.UTF_8);

Map<String, Map<String, ProcessBundleDescriptors.BagUserStateSpec>> userStateMapMock =
Mockito.mock(Map.class);
@@ -22,12 +22,12 @@
import static org.hamcrest.core.Is.is;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
import org.junit.Test;

/** Tests for {@link FlinkKeyUtils}. */
@@ -66,7 +66,7 @@ public void testCoderContext() throws Exception {
@Test
@SuppressWarnings("ByteBufferBackingArray")
public void testFromEncodedKey() {
ByteString input = ByteString.copyFrom("hello world".getBytes(Charsets.UTF_8));
ByteString input = ByteString.copyFrom("hello world".getBytes(StandardCharsets.UTF_8));
ByteBuffer encodedKey = FlinkKeyUtils.fromEncodedKey(input);
assertThat(encodedKey.array(), is(input.toByteArray()));
}
@@ -42,6 +42,7 @@
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.Step;
import com.google.api.services.dataflow.model.WorkerPool;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
@@ -110,7 +111,6 @@
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.commons.codec.EncoderException;
import org.apache.commons.codec.net.PercentCodec;
@@ -618,7 +618,7 @@ static class StepTranslator implements StepTranslationContext {
// For compatibility with URL encoding implementations that represent space as +,
// always encode + as %2b even though we don't encode space as +.
private final PercentCodec percentCodec =
new PercentCodec("+".getBytes(Charsets.US_ASCII), false);
new PercentCodec("+".getBytes(StandardCharsets.US_ASCII), false);

private StepTranslator(Translator translator, Step step) {
this.translator = translator;
@@ -764,7 +764,8 @@ private void addResourceHints(ResourceHints hints) {
try {
urlEncodedHints.put(
entry.getKey(),
new String(percentCodec.encode(entry.getValue().toBytes()), Charsets.US_ASCII));
new String(
percentCodec.encode(entry.getValue().toBytes()), StandardCharsets.US_ASCII));
} catch (EncoderException e) {
// Should never happen.
throw new RuntimeException("Invalid value for resource hint: " + entry.getKey(), e);
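For reference, a small standalone sketch of the `PercentCodec` configuration used above, with a hypothetical class name and input value. It is meant to show the intent of the comment in the diff: `+` is always percent-encoded (e.g. to `%2B`), while space is not rewritten as `+`:

```java
import java.nio.charset.StandardCharsets;
import org.apache.commons.codec.EncoderException;
import org.apache.commons.codec.net.PercentCodec;

public class ResourceHintEncodingExample {
  public static void main(String[] args) throws EncoderException {
    // Same configuration as the translator: '+' is always percent-encoded,
    // and spaces are not represented as '+'.
    PercentCodec percentCodec =
        new PercentCodec("+".getBytes(StandardCharsets.US_ASCII), false);

    // Hypothetical hint-like value containing a '+'.
    byte[] encoded = percentCodec.encode("a+b".getBytes(StandardCharsets.US_ASCII));
    System.out.println(new String(encoded, StandardCharsets.US_ASCII));
  }
}
```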
