Skip to content

Commit

Permalink
Fix StringSet tests on portable runners (#31999)
Browse files Browse the repository at this point in the history
* Revert "Exclude StringSet tests from portable runners and Dataflow LegacyRunn…"

This reverts commit b129433.

* Fix MetricsTest for StringSet metrics on portable runners

* trigger tests

* Re-exclude tests
  • Loading branch information
Abacn authored Jul 31, 2024
1 parent 25e839b commit eec2068
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 35 deletions.
1 change: 0 additions & 1 deletion runners/flink/job-server/flink_job_server.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean checkpoi
excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesStringSetMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public MetricQueryResults queryMetrics(MetricsFilter filter) {
private static PortableMetrics convertMonitoringInfosToMetricResults(
JobApi.MetricResults jobMetrics) {
List<MetricsApi.MonitoringInfo> monitoringInfoList = new ArrayList<>();
// TODO(https://github.com/apache/beam/issues/32001) dedup Attempted and Committed metrics
monitoringInfoList.addAll(jobMetrics.getAttemptedList());
monitoringInfoList.addAll(jobMetrics.getCommittedList());
Iterable<MetricResult<Long>> countersFromJobMetrics =
Expand Down
1 change: 0 additions & 1 deletion runners/samza/job-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ def portableValidatesRunnerTask(String name, boolean docker) {
excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesStringSetMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
Expand Down
2 changes: 0 additions & 2 deletions runners/spark/job-server/spark_job_server.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean docker,
excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesStringSetMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderedDelivery'
excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
Expand Down Expand Up @@ -186,7 +185,6 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean docker,
excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesStringSetMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderedDelivery'
excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderInBundle'
excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.apache.beam.sdk.metrics.MetricResultsMatchers.metricsResult;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.verify;
Expand All @@ -46,6 +45,7 @@
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.hamcrest.Matcher;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.After;
Expand Down Expand Up @@ -418,15 +418,19 @@ public void testAttemptedStringSetMetrics() {
}
}

private static <T> Matcher<MetricResult<T>> metricsResultPatchStep(
final String name, final String step, final T value, final boolean isCommitted) {
return anyOf(
metricsResult(NAMESPACE, name, step, value, isCommitted),
// portable runner adds a suffix for metrics initiated outside anonymous pardo
metricsResult(NAMESPACE, name, step + "-ParMultiDo-Anonymous-", value, isCommitted));
}

private static void assertCounterMetrics(MetricQueryResults metrics, boolean isCommitted) {
System.out.println(metrics.getCounters());
assertThat(
metrics.getCounters(),
anyOf(
// Step names are different for portable and non-portable runners.
hasItem(metricsResult(NAMESPACE, "count", "MyStep1", 3L, isCommitted)),
hasItem(
metricsResult(
NAMESPACE, "count", "MyStep1-ParMultiDo-Anonymous-", 3L, isCommitted))));
hasItem(metricsResultPatchStep("count", "MyStep1", 3L, isCommitted)));

assertThat(
metrics.getCounters(),
Expand All @@ -446,27 +450,36 @@ private static void assertGaugeMetrics(MetricQueryResults metrics, boolean isCom
}

private static void assertStringSetMetrics(MetricQueryResults metrics, boolean isCommitted) {
// TODO(https://github.com/apache/beam/issues/32001) use containsInAnyOrder once portableMetrics
// duplicate metrics issue fixed
assertThat(
metrics.getStringSets(),
containsInAnyOrder(
metricsResult(
NAMESPACE,
hasItem(
metricsResultPatchStep(
"sources",
"MyStep1",
StringSetResult.create(ImmutableSet.of("gcs")),
isCommitted),
isCommitted)));
assertThat(
metrics.getStringSets(),
hasItem(
metricsResult(
NAMESPACE,
"sinks",
"MyStep2",
StringSetResult.create(ImmutableSet.of("kafka", "bq")),
isCommitted),
metricsResult(
NAMESPACE,
isCommitted)));
assertThat(
metrics.getStringSets(),
hasItem(
metricsResultPatchStep(
"sideinputs",
"MyStep1",
StringSetResult.create(ImmutableSet.of("bigtable", "spanner")),
isCommitted),
isCommitted)));
assertThat(
metrics.getStringSets(),
hasItem(
metricsResult(
NAMESPACE,
"sideinputs",
Expand All @@ -478,22 +491,9 @@ private static void assertStringSetMetrics(MetricQueryResults metrics, boolean i
private static void assertDistributionMetrics(MetricQueryResults metrics, boolean isCommitted) {
assertThat(
metrics.getDistributions(),
anyOf(
// Step names are different for portable and non-portable runners.
hasItem(
metricsResult(
NAMESPACE,
"input",
"MyStep1",
DistributionResult.create(26L, 3L, 5L, 13L),
isCommitted)),
hasItem(
metricsResult(
NAMESPACE,
"input",
"MyStep1-ParMultiDo-Anonymous-",
DistributionResult.create(26L, 3L, 5L, 13L),
isCommitted))));
hasItem(
metricsResultPatchStep(
"input", "MyStep1", DistributionResult.create(26L, 3L, 5L, 13L), isCommitted)));

assertThat(
metrics.getDistributions(),
Expand Down

0 comments on commit eec2068

Please sign in to comment.