Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use vendor guava 32.1.2-jre #27895

Merged
merged 2 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,8 @@ private String getFilePath(String filePath) {
public void testDebuggingWordCount() throws Exception {
File inputFile = tmpFolder.newFile();
File outputFile = tmpFolder.newFile();
Files.write(
"stomach secret Flourish message Flourish here Flourish",
inputFile,
StandardCharsets.UTF_8);
Files.asCharSink(inputFile, StandardCharsets.UTF_8)
.write("stomach secret Flourish message Flourish here Flourish");
WordCountOptions options = TestPipeline.testingPipelineOptions().as(WordCountOptions.class);
options.setInputFile(getFilePath(inputFile.getAbsolutePath()));
options.setOutput(getFilePath(outputFile.getAbsolutePath()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;

Expand Down Expand Up @@ -120,7 +121,8 @@ public RunnerApi.PTransform translate(
entry
.getValue()
.toBuilder()
.setCoderId(coderRenameMap.getOrDefault(coderId, coderId))
.setCoderId(
Preconditions.checkNotNull(coderRenameMap.getOrDefault(coderId, coderId)))
.build());
}
}
Expand All @@ -141,12 +143,14 @@ public RunnerApi.PTransform translate(
for (Map.Entry<String, String> inputEntry : proto.getInputsMap().entrySet()) {
transformBuilder.putInputs(
inputEntry.getKey(),
pColRenameMap.getOrDefault(inputEntry.getValue(), inputEntry.getValue()));
Preconditions.checkNotNull(
pColRenameMap.getOrDefault(inputEntry.getValue(), inputEntry.getValue())));
}
for (Map.Entry<String, String> outputEntry : proto.getOutputsMap().entrySet()) {
transformBuilder.putOutputs(
outputEntry.getKey(),
pColRenameMap.getOrDefault(outputEntry.getValue(), outputEntry.getValue()));
Preconditions.checkNotNull(
pColRenameMap.getOrDefault(outputEntry.getValue(), outputEntry.getValue())));
}
mergingComponentsBuilder.putTransforms(entry.getKey(), transformBuilder.build());
}
Expand All @@ -161,7 +165,8 @@ public RunnerApi.PTransform translate(
for (Map.Entry<String, String> outputEntry : expandedTransform.getOutputsMap().entrySet()) {
rootTransformBuilder.putOutputs(
outputEntry.getKey(),
pColRenameMap.getOrDefault(outputEntry.getValue(), outputEntry.getValue()));
Preconditions.checkNotNull(
pColRenameMap.getOrDefault(outputEntry.getValue(), outputEntry.getValue())));
}
components.mergeFrom(mergingComponentsBuilder.build(), null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.direct;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -113,9 +115,9 @@ private class TeardownRemovedFnListener implements RemovalListener<Thread, DoFn<
@Override
public void onRemoval(RemovalNotification<Thread, DoFn<?, ?>> notification) {
try {
DoFnInvokers.invokerFor(notification.getValue()).invokeTeardown();
DoFnInvokers.invokerFor(checkNotNull(notification.getValue())).invokeTeardown();
} catch (Exception e) {
thrownOnTeardown.put(notification.getKey(), e);
thrownOnTeardown.put(checkNotNull(notification.getKey()), e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ private static void prepareEnvironment() throws Exception {
flinkCluster.getClusterPort(),
RestOptions.PORT.key(),
flinkCluster.getRestPort());

Files.write(file.toPath(), config.getBytes(Charsets.UTF_8));

// Create a new environment with the location of the Flink config for CliFrontend
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void teardown() {

private File makeFileWithContents(String name, String contents) throws Exception {
File tmpFile = tmpFolder.newFile(name);
Files.write(contents, tmpFile, StandardCharsets.UTF_8);
Files.asCharSink(tmpFile, StandardCharsets.UTF_8).write(contents);
assertTrue(tmpFile.setLastModified(0)); // required for determinism with directories
return tmpFile;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
@RunWith(JUnit4.class)
@SuppressWarnings({
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
"DoNotMock", // TODO: Use NetworkBuilder to create a real instance
})
public class IntrinsicMapTaskExecutorFactoryTest {
private static final String STAGE = "test";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ private String getArtifact(RunnerApi.ArtifactInformation artifact) {
return all.toStringUtf8();
}

@SuppressWarnings("InlineMeInliner") // inline `Strings.repeat()` - Java 11+ API only
@Test
public void testRetrieveArtifacts() throws IOException, InterruptedException {
Map<String, String> artifacts =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ private String getArtifact(RunnerApi.ArtifactInformation artifact) {
return all.toStringUtf8();
}

@SuppressWarnings("InlineMeInliner") // inline `Strings.repeat()` - Java 11+ API only
@Test
public void testStageArtifacts() throws InterruptedException, ExecutionException {
List<String> contentsList =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;
import java.util.function.Supplier;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
Expand Down Expand Up @@ -288,7 +289,7 @@ public void close() throws IOException {
}

@Override
protected WindowedValue<T> computeNext() {
protected @CheckForNull WindowedValue<T> computeNext() {
try {
if (started ? reader.advance() : start()) {
return timestampedValueInGlobalWindow(reader.getCurrent(), reader.getCurrentTimestamp());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Deque;
import java.util.Map;
import java.util.function.Supplier;
import javax.annotation.CheckForNull;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.DoFnRunnerFactory.DoFnRunnerWithTeardown;
Expand Down Expand Up @@ -169,7 +170,7 @@ private DoFnPartitionIt(Iterator<WindowedValue<InT>> partitionIt) {
}

@Override
protected OutT computeNext() {
protected @CheckForNull OutT computeNext() {
try {
while (true) {
if (!buffer.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.CheckForNull;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
Expand Down Expand Up @@ -141,7 +142,7 @@ private class UnboundedInOutIterator<K>
}

@Override
protected Tuple2<TupleTag<?>, WindowedValue<?>> computeNext() {
protected @CheckForNull Tuple2<TupleTag<?>, WindowedValue<?>> computeNext() {
try {
// Process each element from the (input) iterator, which produces, zero, one or more
// output elements (of type V) in the output iterator. Note that the output
Expand Down Expand Up @@ -304,7 +305,7 @@ private class BoundedInOutIterator<K, InputT, OutputT>
}

@Override
protected Tuple2<TupleTag<?>, WindowedValue<?>> computeNext() {
protected @CheckForNull Tuple2<TupleTag<?>, WindowedValue<?>> computeNext() {

if (outputProducerTask == null) {
outputProducerTask = startOutputProducerTask();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,8 @@ private MatchResult matchOne(String baseDir, String spec) {
StreamSupport.stream(files.spliterator(), false)
.filter(
Predicates.and(
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Files.isFile(),
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Files
.isFile(),
input -> matcher.matches(input.toPath()))
::apply)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.beam.sdk.values.ValueWithRecordId.StripIdsDoFn;
import org.apache.beam.sdk.values.ValueWithRecordId.ValueWithRecordIdCoder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalListener;
Expand Down Expand Up @@ -500,7 +501,7 @@ public void setUp() throws Exception {
removalNotification -> {
if (removalNotification.wasEvicted()) {
try {
removalNotification.getValue().close();
Preconditions.checkNotNull(removalNotification.getValue()).close();
} catch (IOException e) {
LOG.warn("Failed to close UnboundedReader.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ProtocolMessageEnum;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down Expand Up @@ -106,7 +107,8 @@ public static ResourceHints fromOptions(PipelineOptions options) {
} else {
urn = nameOrUrn;
}
ResourceHint value = parsers.getOrDefault(urn, s -> new StringHint(s)).apply(stringValue);
ResourceHint value =
Preconditions.checkNotNull(parsers.getOrDefault(urn, StringHint::new)).apply(stringValue);
result = result.withHint(urn, value);
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;

Expand Down Expand Up @@ -357,7 +358,7 @@ public void encode(PaneInfo value, final OutputStream outStream)
@Override
public PaneInfo decode(final InputStream inStream) throws CoderException, IOException {
byte keyAndTag = (byte) inStream.read();
PaneInfo base = BYTE_TO_PANE_INFO.get((byte) (keyAndTag & 0x0F));
PaneInfo base = Preconditions.checkNotNull(BYTE_TO_PANE_INFO.get((byte) (keyAndTag & 0x0F)));
long index, onTimeIndex;
switch (Encoding.fromTag(keyAndTag)) {
case FIRST:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private void testCreate(Path path) throws Exception {
public void testReadWithExistingFile() throws Exception {
String expected = "my test string";
File existingFile = temporaryFolder.newFile();
Files.write(expected, existingFile, StandardCharsets.UTF_8);
Files.asCharSink(existingFile, StandardCharsets.UTF_8).write(expected);
String data;
try (Reader reader =
Channels.newReader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public StateChangingSerializingCoder() {
changedState = 10;
}

@SuppressWarnings("InlineMeInliner") // inline `Strings.repeat()` - Java 11+ API only
@Override
public void encode(String value, OutputStream outStream) throws CoderException, IOException {
changedState += 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testPreconditionChecksumIsEmpty() throws IOException {
@Test
public void testMatcherThatVerifiesSingleFile() throws IOException {
File tmpFile = tmpFolder.newFile("result-000-of-001");
Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8);
Files.asCharSink(tmpFile, StandardCharsets.UTF_8).write("Test for file checksum verifier.");

assertThat(
new NumberedShardedFile(tmpFile.getPath()),
Expand All @@ -73,9 +73,9 @@ public void testMatcherThatVerifiesMultipleFiles() throws IOException {
File tmpFile1 = tmpFolder.newFile("result-000-of-002");
File tmpFile2 = tmpFolder.newFile("result-001-of-002");
File tmpFile3 = tmpFolder.newFile("tmp");
Files.write("To be or not to be, ", tmpFile1, StandardCharsets.UTF_8);
Files.write("it is not a question.", tmpFile2, StandardCharsets.UTF_8);
Files.write("tmp", tmpFile3, StandardCharsets.UTF_8);
Files.asCharSink(tmpFile1, StandardCharsets.UTF_8).write("To be or not to be, ");
Files.asCharSink(tmpFile2, StandardCharsets.UTF_8).write("it is not a question.");
Files.asCharSink(tmpFile3, StandardCharsets.UTF_8).write("tmp");

assertThat(
new NumberedShardedFile(tmpFolder.getRoot().toPath().resolve("result-*").toString()),
Expand All @@ -87,7 +87,7 @@ public void testMatcherThatVerifiesFileWithEmptyContent() throws IOException {
// TODO: Java core test failing on windows, https://github.com/apache/beam/issues/20483
assumeFalse(SystemUtils.IS_OS_WINDOWS);
File emptyFile = tmpFolder.newFile("result-000-of-001");
Files.write("", emptyFile, StandardCharsets.UTF_8);
Files.asCharSink(emptyFile, StandardCharsets.UTF_8).write("");

assertThat(
new NumberedShardedFile(tmpFolder.getRoot().toPath().resolve("*").toString()),
Expand All @@ -101,8 +101,8 @@ public void testMatcherThatUsesCustomizedTemplate() throws Exception {
assumeFalse(SystemUtils.IS_OS_WINDOWS);
File tmpFile1 = tmpFolder.newFile("result0-total2");
File tmpFile2 = tmpFolder.newFile("result1-total2");
Files.write("To be or not to be, ", tmpFile1, StandardCharsets.UTF_8);
Files.write("it is not a question.", tmpFile2, StandardCharsets.UTF_8);
Files.asCharSink(tmpFile1, StandardCharsets.UTF_8).write("To be or not to be, ");
Files.asCharSink(tmpFile2, StandardCharsets.UTF_8).write("it is not a question.");

Pattern customizedTemplate =
Pattern.compile("(?x) result (?<shardnum>\\d+) - total (?<numshards>\\d+)");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,7 @@ public void processElement(ProcessContext c, BoundedWindow window) {
pipeline.run().waitUntilFinish();
}

@SuppressWarnings("InlineMeInliner") // inline `Strings.repeat()` - Java 11+ API only
@Test
@Category({
ValidatesRunner.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ public void testReadMultipleShards() throws Exception {
File tmpFile1 = tmpFolder.newFile("result-000-of-002");
File tmpFile2 = tmpFolder.newFile("result-001-of-002");
File tmpFile3 = tmpFolder.newFile("tmp");
Files.write(contents1, tmpFile1, StandardCharsets.UTF_8);
Files.write(contents2, tmpFile2, StandardCharsets.UTF_8);
Files.write(contents3, tmpFile3, StandardCharsets.UTF_8);
Files.asCharSink(tmpFile1, StandardCharsets.UTF_8).write(contents1);
Files.asCharSink(tmpFile2, StandardCharsets.UTF_8).write(contents2);
Files.asCharSink(tmpFile3, StandardCharsets.UTF_8).write(contents3);

filePattern =
LocalResources.fromFile(tmpFolder.getRoot(), true)
Expand All @@ -106,8 +106,8 @@ public void testReadMultipleShardsWithoutShardNumber() throws Exception {

File tmpFile1 = tmpFolder.newFile("result");
File tmpFile2 = tmpFolder.newFile("tmp");
Files.write(contents1, tmpFile1, StandardCharsets.UTF_8);
Files.write(contents2, tmpFile2, StandardCharsets.UTF_8);
Files.asCharSink(tmpFile1, StandardCharsets.UTF_8).write(contents1);
Files.asCharSink(tmpFile2, StandardCharsets.UTF_8).write(contents2);

FilePatternMatchingShardedFile shardedFile = new FilePatternMatchingShardedFile(filePattern);

Expand All @@ -117,7 +117,7 @@ public void testReadMultipleShardsWithoutShardNumber() throws Exception {
@Test
public void testReadEmpty() throws Exception {
File emptyFile = tmpFolder.newFile("result-000-of-001");
Files.write("", emptyFile, StandardCharsets.UTF_8);
Files.asCharSink(emptyFile, StandardCharsets.UTF_8).write("");
FilePatternMatchingShardedFile shardedFile = new FilePatternMatchingShardedFile(filePattern);

assertThat(shardedFile.readFilesWithRetries(), empty());
Expand All @@ -126,7 +126,7 @@ public void testReadEmpty() throws Exception {
@Test
public void testReadWithRetriesFailsSinceFilesystemError() throws Exception {
File tmpFile = tmpFolder.newFile();
Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8);
Files.asCharSink(tmpFile, StandardCharsets.UTF_8).write("Test for file checksum verifier.");
FilePatternMatchingShardedFile shardedFile =
spy(new FilePatternMatchingShardedFile(filePattern));
doThrow(IOException.class).when(shardedFile).readLines(anyCollection());
Expand Down
Loading
Loading