Skip to content

Commit

Permalink
Fix JavaPreCommit on Java11 (#32912)
Browse files Browse the repository at this point in the history
* Disable affected Samza runner test in Java9+

* Specify text log_kind for platorm independent logging
  • Loading branch information
Abacn authored Oct 23, 2024
1 parent 7ac462e commit 172a029
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.slf4j.Logger;
Expand Down Expand Up @@ -157,6 +159,16 @@ abstract static class Builder {

abstract Builder setArguments(List<String> arguments);

Builder addArguments(List<String> arguments) {
Optional<List<String>> original = getArguments();
if (!original.isPresent()) {
return this.setArguments(arguments);
}
List<String> newArguments =
Stream.concat(original.get().stream(), arguments.stream()).collect(Collectors.toList());
return this.setArguments(newArguments);
}

abstract Optional<List<String>> getArguments();

abstract PrismExecutor autoBuild();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void executeWithStreamRedirectThenStop() throws IOException {
sleep(3000L);
executor.stop();
String output = outputStream.toString(StandardCharsets.UTF_8.name());
assertThat(output).contains("INFO Serving JobManagement endpoint=localhost:8073");
assertThat(output).contains("level=INFO msg=\"Serving JobManagement\" endpoint=localhost:8073");
}

@Test
Expand All @@ -71,29 +71,32 @@ public void executeWithFileOutputThenStop() throws IOException {
executor.stop();
try (Stream<String> stream = Files.lines(log.toPath(), StandardCharsets.UTF_8)) {
String output = stream.collect(Collectors.joining("\n"));
assertThat(output).contains("INFO Serving JobManagement endpoint=localhost:8073");
assertThat(output)
.contains("level=INFO msg=\"Serving JobManagement\" endpoint=localhost:8073");
}
}

@Test
public void executeWithCustomArgumentsThenStop() throws IOException {
PrismExecutor executor =
underTest()
.setArguments(Collections.singletonList("-" + JOB_PORT_FLAG_NAME + "=5555"))
.addArguments(Collections.singletonList("-" + JOB_PORT_FLAG_NAME + "=5555"))
.build();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
executor.execute(outputStream);
sleep(3000L);
executor.stop();
String output = outputStream.toString(StandardCharsets.UTF_8.name());
assertThat(output).contains("INFO Serving JobManagement endpoint=localhost:5555");
assertThat(output).contains("level=INFO msg=\"Serving JobManagement\" endpoint=localhost:5555");
}

@Test
public void executeWithPortFinderThenStop() throws IOException {}

private PrismExecutor.Builder underTest() {
return PrismExecutor.builder().setCommand(getLocalPrismBuildOrIgnoreTest());
return PrismExecutor.builder()
.setCommand(getLocalPrismBuildOrIgnoreTest())
.setArguments(Collections.singletonList("--log_kind=text")); // disable color control chars
}

private void sleep(long millis) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -59,6 +60,8 @@
public class TestSamzaRunnerWithTransformMetrics {
@Test
public void testSamzaRunnerWithDefaultMetrics() {
// TODO(https://github.com/apache/beam/issues/32208)
assumeTrue(System.getProperty("java.version").startsWith("1."));
SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
InMemoryMetricsReporter inMemoryMetricsReporter = new InMemoryMetricsReporter();
options.setMetricsReporters(ImmutableList.of(inMemoryMetricsReporter));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.samza.runtime;

import static org.junit.Assume.assumeTrue;

import java.io.Serializable;
import java.util.Arrays;
import org.apache.beam.sdk.coders.KvCoder;
Expand All @@ -35,11 +37,19 @@
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

/** Tests for GroupByKeyOp. */
public class GroupByKeyOpTest implements Serializable {

@BeforeClass
public static void beforeClass() {
// TODO(https://github.com/apache/beam/issues/32208)
assumeTrue(System.getProperty("java.version").startsWith("1."));
}

@Rule
public final transient TestPipeline pipeline =
TestPipeline.fromOptions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -75,6 +76,7 @@
import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory;
import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStore;
import org.apache.samza.system.SystemStreamPartition;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

Expand All @@ -91,6 +93,12 @@ public class SamzaStoreStateInternalsTest implements Serializable {
TestPipeline.fromOptions(
PipelineOptionsFactory.fromArgs("--runner=TestSamzaRunner").create());

@BeforeClass
public static void beforeClass() {
// TODO(https://github.com/apache/beam/issues/32208)
assumeTrue(System.getProperty("java.version").startsWith("1."));
}

@Test
public void testMapStateIterator() {
final String stateId = "foo";
Expand Down

0 comments on commit 172a029

Please sign in to comment.