[Flink] spotless
jto committed Oct 17, 2024
1 parent a7b9023 commit ae0f3b4
Showing 12 changed files with 150 additions and 175 deletions.
@@ -17,6 +17,11 @@
*/
package org.apache.beam.runners.flink;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
@@ -58,14 +63,9 @@
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FlinkStreamingAggregationsTranslators {
public static class ConcatenateAsIterable<T> extends Combine.CombineFn<T, Iterable<T>, Iterable<T>> {
public static class ConcatenateAsIterable<T>
extends Combine.CombineFn<T, Iterable<T>, Iterable<T>> {
@Override
public Iterable<T> createAccumulator() {
return new ArrayList<>();
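(Aside: the hunk above cuts ConcatenateAsIterable off after createAccumulator(). For reference, a concatenate-style CombineFn needs four methods; in the sketch below only createAccumulator() mirrors the commit — the other bodies are assumed, not copied from it, and a List<T> accumulator stands in for the class's Iterable<T>.)

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.Combine;

public class ConcatenateSketch<T> extends Combine.CombineFn<T, List<T>, Iterable<T>> {
  @Override
  public List<T> createAccumulator() {
    return new ArrayList<>(); // as in the hunk above
  }

  @Override
  public List<T> addInput(List<T> accum, T input) {
    accum.add(input); // append one element to the running buffer
    return accum;
  }

  @Override
  public List<T> mergeAccumulators(Iterable<List<T>> accums) {
    List<T> merged = new ArrayList<>();
    for (List<T> a : accums) {
      merged.addAll(a); // concatenation, not re-combination
    }
    return merged;
  }

  @Override
  public Iterable<T> extractOutput(List<T> accum) {
    return accum; // the concatenated elements, as an Iterable
  }
}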
@@ -214,8 +214,7 @@ WindowDoFnOperator<K, InputAccumT, OutputAccumT> getWindowedAggregateDoFnOperato
WindowedValue.getFullCoder(workItemCoder, windowingStrategy.getWindowFn().windowCoder());

// Key selector
WorkItemKeySelector<K, InputAccumT> workItemKeySelector =
new WorkItemKeySelector<>(keyCoder);
WorkItemKeySelector<K, InputAccumT> workItemKeySelector = new WorkItemKeySelector<>(keyCoder);

return new WindowDoFnOperator<>(
reduceFn,
@@ -257,29 +256,30 @@ WindowDoFnOperator<K, InputAccumT, OutputAccumT> getWindowedAggregateDoFnOperato
}

private static class FlattenIterable<K, InputT>
implements FlatMapFunction<WindowedValue<KV<K, Iterable<Iterable<InputT>>>>, WindowedValue<KV<K, Iterable<InputT>>>> {
implements FlatMapFunction<
WindowedValue<KV<K, Iterable<Iterable<InputT>>>>,
WindowedValue<KV<K, Iterable<InputT>>>> {
@Override
public void flatMap(
WindowedValue<KV<K, Iterable<Iterable<InputT>>>> w,
Collector<WindowedValue<KV<K, Iterable<InputT>>>> collector) throws Exception {
WindowedValue<KV<K, Iterable<InputT>>> flattened = w.withValue(
KV.of(
w.getValue().getKey(),
Iterables.concat(w.getValue().getValue())));
Collector<WindowedValue<KV<K, Iterable<InputT>>>> collector)
throws Exception {
WindowedValue<KV<K, Iterable<InputT>>> flattened =
w.withValue(KV.of(w.getValue().getKey(), Iterables.concat(w.getValue().getValue())));
collector.collect(flattened);
}
}

public static <K, InputT, AccumT, OutputT>
SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> getBatchCombinePerKeyOperator(
FlinkStreamingTranslationContext context,
PCollection<KV<K, InputT>> input,
Map<Integer, PCollectionView<?>> sideInputTagMapping,
List<PCollectionView<?>> sideInputs,
Coder<WindowedValue<KV<K, AccumT>>> windowedAccumCoder,
CombineFnBase.GlobalCombineFn<InputT, AccumT, ?> combineFn,
WindowDoFnOperator<K, AccumT, OutputT> finalDoFnOperator,
TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo){
SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> getBatchCombinePerKeyOperator(
FlinkStreamingTranslationContext context,
PCollection<KV<K, InputT>> input,
Map<Integer, PCollectionView<?>> sideInputTagMapping,
List<PCollectionView<?>> sideInputs,
Coder<WindowedValue<KV<K, AccumT>>> windowedAccumCoder,
CombineFnBase.GlobalCombineFn<InputT, AccumT, ?> combineFn,
WindowDoFnOperator<K, AccumT, OutputT> finalDoFnOperator,
TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo) {

String fullName = FlinkStreamingTransformTranslators.getCurrentTransformName(context);
DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
@@ -314,50 +314,55 @@ SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> getBatchCombinePerKeyO
if (sideInputs.isEmpty()) {
return inputDataStream
.transform(partialName, partialTypeInfo, partialDoFnOperator)
.uid(partialName).name(partialName)
.uid(partialName)
.name(partialName)
.keyBy(accumKeySelector)
.transform(fullName, outputTypeInfo, finalDoFnOperator)
.uid(fullName).name(fullName);
.uid(fullName)
.name(fullName);
} else {

Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformSideInputs =
FlinkStreamingTransformTranslators.transformSideInputs(sideInputs, context);

TwoInputTransformation<
WindowedValue<KV<K, InputT>>, RawUnionValue, WindowedValue<KV<K, AccumT>>> rawPartialFlinkTransform =
new TwoInputTransformation<>(
inputDataStream.getTransformation(),
transformSideInputs.f1.broadcast().getTransformation(),
partialName,
partialDoFnOperator,
partialTypeInfo,
inputDataStream.getParallelism());
WindowedValue<KV<K, InputT>>, RawUnionValue, WindowedValue<KV<K, AccumT>>>
rawPartialFlinkTransform =
new TwoInputTransformation<>(
inputDataStream.getTransformation(),
transformSideInputs.f1.broadcast().getTransformation(),
partialName,
partialDoFnOperator,
partialTypeInfo,
inputDataStream.getParallelism());

SingleOutputStreamOperator<WindowedValue<KV<K, AccumT>>> partialyCombinedStream =
new SingleOutputStreamOperator<WindowedValue<KV<K, AccumT>>>(
inputDataStream.getExecutionEnvironment(),
rawPartialFlinkTransform) {}; // we have to cheat around the ctor being protected

inputDataStream.getExecutionEnvironment().addOperator(rawPartialFlinkTransform);

return buildTwoInputStream(
partialyCombinedStream.keyBy(accumKeySelector),
transformSideInputs.f1,
fullName,
finalDoFnOperator,
outputTypeInfo);
}
}

/**
* Creates a two-steps GBK operation. Elements are first aggregated locally to save on serialized size since in batch
* it's very likely that all the elements will be within the same window and pane.
* The only difference with batchCombinePerKey is the nature of the SystemReduceFn used. It uses SystemReduceFn.buffering()
* instead of SystemReduceFn.combining() so that new element can simply be appended without accessing the existing state.
* Creates a two-step GBK operation. Elements are first aggregated locally to save on serialized
* size, since in batch it is very likely that all the elements will fall within the same window
* and pane. The only difference from batchCombinePerKey is the nature of the SystemReduceFn used:
* it uses SystemReduceFn.buffering() instead of SystemReduceFn.combining(), so that new elements
* can simply be appended without accessing the existing state.
*/
public static <K, InputT> SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<InputT>>>> batchGroupByKey(
FlinkStreamingTranslationContext context,
PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> transform) {
public static <K, InputT>
SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<InputT>>>> batchGroupByKey(
FlinkStreamingTranslationContext context,
PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> transform) {

Map<Integer, PCollectionView<?>> sideInputTagMapping = new HashMap<>();
List<PCollectionView<?>> sideInputs = Collections.emptyList();
@@ -372,58 +377,64 @@ public static <K, InputT> SingleOutputStreamOperator<WindowedValue<KV<K, Iterabl
context.getTypeInfo(context.getOutput(transform));

Coder<Iterable<InputT>> accumulatorCoder = IterableCoder.of(inputKvCoder.getValueCoder());
KvCoder<K, Iterable<InputT>> accumKvCoder = KvCoder.of(inputKvCoder.getKeyCoder(), accumulatorCoder);
KvCoder<K, Iterable<InputT>> accumKvCoder =
KvCoder.of(inputKvCoder.getKeyCoder(), accumulatorCoder);

Coder<WindowedValue<KV<K, Iterable<InputT>>>> windowedAccumCoder =
WindowedValue.getFullCoder(
accumKvCoder, input.getWindowingStrategy().getWindowFn().windowCoder());

Coder<WindowedValue<KV<K, Iterable<Iterable<InputT>>>>> outputCoder =
WindowedValue.getFullCoder(
KvCoder.of(inputKvCoder.getKeyCoder(), IterableCoder.of(accumulatorCoder)) , input.getWindowingStrategy().getWindowFn().windowCoder());
KvCoder.of(inputKvCoder.getKeyCoder(), IterableCoder.of(accumulatorCoder)),
input.getWindowingStrategy().getWindowFn().windowCoder());

TypeInformation<WindowedValue<KV<K, Iterable<Iterable<InputT>>>>> accumulatedTypeInfo =
new CoderTypeInformation<>(
WindowedValue.getFullCoder(
KvCoder.of(inputKvCoder.getKeyCoder(), IterableCoder.of(IterableCoder.of(inputKvCoder.getValueCoder()))), input.getWindowingStrategy().getWindowFn().windowCoder()),
WindowedValue.getFullCoder(
KvCoder.of(
inputKvCoder.getKeyCoder(),
IterableCoder.of(IterableCoder.of(inputKvCoder.getValueCoder()))),
input.getWindowingStrategy().getWindowFn().windowCoder()),
serializablePipelineOptions);

// final aggregation
WindowDoFnOperator<K, Iterable<InputT>, Iterable<Iterable<InputT>>> finalDoFnOperator =
getWindowedAccumulateDoFnOperator(
context,
transform,
accumKvCoder,
outputCoder,
sideInputTagMapping,
sideInputs);

return
getBatchCombinePerKeyOperator(
context,
input,
sideInputTagMapping,
sideInputs,
windowedAccumCoder,
new ConcatenateAsIterable<>(),
finalDoFnOperator,
accumulatedTypeInfo
)
.flatMap(new FlattenIterable<>(), outputTypeInfo)
.name("concatenate");
getWindowedAccumulateDoFnOperator(
context, transform, accumKvCoder, outputCoder, sideInputTagMapping, sideInputs);

return getBatchCombinePerKeyOperator(
context,
input,
sideInputTagMapping,
sideInputs,
windowedAccumCoder,
new ConcatenateAsIterable<>(),
finalDoFnOperator,
accumulatedTypeInfo)
.flatMap(new FlattenIterable<>(), outputTypeInfo)
.name("concatenate");
}

private static <InputT, K> WindowDoFnOperator<K, Iterable<InputT>, Iterable<Iterable<InputT>>> getWindowedAccumulateDoFnOperator(
FlinkStreamingTranslationContext context,
PTransform<PCollection<KV<K,InputT>>, PCollection<KV<K, Iterable<InputT>>>> transform,
KvCoder<K, Iterable<InputT>> accumKvCoder,
Coder<WindowedValue<KV<K, Iterable<Iterable<InputT>>>>> outputCoder,
Map<Integer, PCollectionView<?>> sideInputTagMapping,
List<PCollectionView<?>> sideInputs) {
private static <InputT, K>
WindowDoFnOperator<K, Iterable<InputT>, Iterable<Iterable<InputT>>>
getWindowedAccumulateDoFnOperator(
FlinkStreamingTranslationContext context,
PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>>
transform,
KvCoder<K, Iterable<InputT>> accumKvCoder,
Coder<WindowedValue<KV<K, Iterable<Iterable<InputT>>>>> outputCoder,
Map<Integer, PCollectionView<?>> sideInputTagMapping,
List<PCollectionView<?>> sideInputs) {

// Combining fn
SystemReduceFn<K, Iterable<InputT>, Iterable<Iterable<InputT>>, Iterable<Iterable<InputT>>, BoundedWindow> reduceFn =
SystemReduceFn.buffering(accumKvCoder.getValueCoder());
// Combining fn
SystemReduceFn<
K,
Iterable<InputT>,
Iterable<Iterable<InputT>>,
Iterable<Iterable<InputT>>,
BoundedWindow>
reduceFn = SystemReduceFn.buffering(accumKvCoder.getValueCoder());

return getWindowedAggregateDoFnOperator(
context, transform, accumKvCoder, outputCoder, reduceFn, sideInputTagMapping, sideInputs);
@@ -482,8 +493,7 @@ SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> batchCombinePerKey(
windowedAccumCoder,
combineFn,
finalDoFnOperator,
outputTypeInfo
);
outputTypeInfo);
}

@SuppressWarnings({
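To make the two-step shape described in the javadoc above concrete, here is a minimal, dependency-free sketch of the idea: buffer values per key before the (simulated) shuffle, then concatenate the partial buffers afterwards. Every name below is illustrative; none of this is the runner's code.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TwoStepGbkSketch {

  // Step 1 (pre-shuffle): buffer values per key on each shard, mirroring
  // SystemReduceFn.buffering() — elements are appended, never re-combined.
  static Map<String, List<Integer>> partial(List<Map.Entry<String, Integer>> shard) {
    Map<String, List<Integer>> acc = new HashMap<>();
    for (Map.Entry<String, Integer> e : shard) {
      acc.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
    }
    return acc;
  }

  // Step 2 (post-shuffle): concatenate the partial buffers per key, which is
  // what ConcatenateAsIterable plus FlattenIterable accomplish in the diff.
  static Map<String, List<Integer>> merge(List<Map<String, List<Integer>>> partials) {
    Map<String, List<Integer>> out = new HashMap<>();
    for (Map<String, List<Integer>> p : partials) {
      p.forEach((k, v) -> out.computeIfAbsent(k, x -> new ArrayList<>()).addAll(v));
    }
    return out;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> shard1 =
        List.of(Map.entry("a", 1), Map.entry("a", 2), Map.entry("b", 3));
    List<Map.Entry<String, Integer>> shard2 = List.of(Map.entry("a", 4));
    // Only one small buffer per key crosses the shuffle, not every element.
    System.out.println(merge(List.of(partial(shard1), partial(shard2))));
    // typically prints {a=[1, 2, 4], b=[3]} (HashMap iteration order may vary)
  }
}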
@@ -27,7 +27,6 @@
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import org.apache.beam.runners.flink.adapter.FlinkKey;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
@@ -434,8 +434,7 @@ private <K, V> SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<V>>>> add
new WorkItemKeySelector<>(inputElementCoder.getKeyCoder());

KeyedStream<WindowedValue<KV<K, V>>, FlinkKey> keyedWorkItemStream =
inputDataStream.keyBy(
new KvToFlinkKeyKeySelector(inputElementCoder.getKeyCoder()));
inputDataStream.keyBy(new KvToFlinkKeyKeySelector(inputElementCoder.getKeyCoder()));

SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn =
SystemReduceFn.buffering(inputElementCoder.getValueCoder());
@@ -829,8 +828,7 @@ private <InputT, OutputT> void translateExecutableStage(
}
if (stateful) {
keyCoder = ((KvCoder) valueCoder).getKeyCoder();
keySelector =
new KvToFlinkKeyKeySelector(keyCoder);
keySelector = new KvToFlinkKeyKeySelector(keyCoder);
} else {
// For an SDF, we know that the input element should be
// KV<KV<element, KV<restriction, watermarkState>>, size>. We are going to use the element
Expand All @@ -844,8 +842,7 @@ private <InputT, OutputT> void translateExecutableStage(
valueCoder.getClass().getSimpleName()));
}
keyCoder = ((KvCoder) ((KvCoder) valueCoder).getKeyCoder()).getKeyCoder();
keySelector =
new SdfFlinkKeyKeySelector(keyCoder);
keySelector = new SdfFlinkKeyKeySelector(keyCoder);
}
inputDataStream = inputDataStream.keyBy(keySelector);
}
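The hunks above all follow one pattern: build a Flink KeySelector from the Beam key Coder and key the stream by the encoded key, so that key equality follows the coder's byte encoding rather than the key type's own equals()/hashCode(). A standalone, hedged sketch of that shape follows — FlinkKey and the real Kv/Sdf selectors live in the runner and differ in detail; the Base64 string key here is a stand-in for illustration only.

import java.util.Base64;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.flink.api.java.functions.KeySelector;

class EncodedKeySelector<K, V> implements KeySelector<WindowedValue<KV<K, V>>, String> {
  private final Coder<K> keyCoder;

  EncodedKeySelector(Coder<K> keyCoder) {
    this.keyCoder = keyCoder;
  }

  @Override
  public String getKey(WindowedValue<KV<K, V>> in) throws Exception {
    // Encoding with the Beam coder gives identical bytes for equal keys,
    // which is what Flink needs for deterministic state partitioning.
    byte[] bytes = CoderUtils.encodeToByteArray(keyCoder, in.getValue().getKey());
    return Base64.getEncoder().encodeToString(bytes);
  }
}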
@@ -105,7 +105,6 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.configuration.Configuration;
@@ -591,8 +590,7 @@ static <InputT, OutputT> void translateParDo(
// Based on the fact that the signature is stateful, DoFnSignatures ensures
// that it is also keyed
keyCoder = ((KvCoder) input.getCoder()).getKeyCoder();
keySelector =
new KvToFlinkKeyKeySelector<>(keyCoder);
keySelector = new KvToFlinkKeyKeySelector<>(keyCoder);
final PTransform<?, PCollection<InputT>> producer = context.getProducer(input);
final String previousUrn =
producer != null
@@ -609,8 +607,7 @@
} else if (doFn instanceof SplittableParDoViaKeyedWorkItems.ProcessFn) {
// we know that it is keyed on byte[]
keyCoder = ByteArrayCoder.of();
keySelector =
new WorkItemKeySelector<>(keyCoder);
keySelector = new WorkItemKeySelector<>(keyCoder);
stateful = true;
}

@@ -962,10 +959,7 @@ public void translateNode(
SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<InputT>>>> outDataStream;
// Pre-aggregate before shuffle similar to group combine
if (!context.isStreaming()) {
outDataStream =
FlinkStreamingAggregationsTranslators.batchGroupByKey(
context,
transform);
outDataStream = FlinkStreamingAggregationsTranslators.batchGroupByKey(context, transform);
} else {
// No pre-aggregation in Streaming mode.
KvToFlinkKeyKeySelector<K, InputT> keySelector =
@@ -1046,8 +1040,7 @@ public void translateNode(
List<PCollectionView<?>> sideInputs = ((Combine.PerKey) transform).getSideInputs();

KeyedStream<WindowedValue<KV<K, InputT>>, FlinkKey> keyedStream =
inputDataStream.keyBy(
new KvToFlinkKeyKeySelector<>(keyCoder));
inputDataStream.keyBy(new KvToFlinkKeyKeySelector<>(keyCoder));

if (sideInputs.isEmpty()) {
SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream;
@@ -1147,8 +1140,7 @@ public void translateNode(
.name("ToKeyedWorkItem");

KeyedStream<WindowedValue<KeyedWorkItem<K, InputT>>, FlinkKey> keyedWorkItemStream =
workItemStream.keyBy(
new WorkItemKeySelector<>(inputKvCoder.getKeyCoder()));
workItemStream.keyBy(new WorkItemKeySelector<>(inputKvCoder.getKeyCoder()));

context.setOutputDataStream(context.getOutput(transform), keyedWorkItemStream);
}
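One last illustration: the translateParDo hunks rely on the invariant that a stateful DoFn consumes KV input, so the key coder can always be pulled out of the input's KvCoder before building the key selector. A hedged standalone helper for that extraction (names assumed, not the runner's):

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

final class KeyCoderSketch {
  // Returns the key coder of a KV-coded collection, as the translator does
  // before constructing its KvToFlinkKeyKeySelector.
  static <K, V> Coder<K> keyCoderOf(PCollection<KV<K, V>> input) {
    Coder<KV<K, V>> coder = input.getCoder();
    if (!(coder instanceof KvCoder)) {
      throw new IllegalStateException("Stateful ParDo requires a KvCoder-coded input");
    }
    return ((KvCoder<K, V>) coder).getKeyCoder();
  }
}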