Skip to content

Commit

Permalink
Remove avro dependency from runners/core-construction-java
Browse files Browse the repository at this point in the history
  • Loading branch information
kennknowles committed Feb 8, 2024
1 parent ea68a45 commit 95d6ec0
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 40 deletions.
2 changes: 0 additions & 2 deletions runners/core-construction-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ dependencies {
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":model:job-management", configuration: "shadow")
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":sdks:java:extensions:avro")
implementation project(path: ":sdks:java:transform-service:launcher")
implementation library.java.vendored_grpc_1_60_1
implementation library.java.vendored_guava_32_1_2_jre
Expand All @@ -63,7 +62,6 @@ dependencies {
implementation library.java.joda_time
implementation library.java.slf4j_api
implementation library.java.jackson_annotations
implementation library.java.avro
compileOnly library.java.error_prone_annotations
// Avro 1.8 leaks an older version of parameter that conflicts in runtime with the dependencies
// of some runners so we need to fix it to a more recent but still compatible version.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableBiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.dataflow.qual.Deterministic;

/** Converts to and from Beam Runner API representations of {@link Coder Coders}. */
@SuppressWarnings({
Expand Down Expand Up @@ -58,29 +60,40 @@ private static class DefaultTranslationContext implements TranslationContext {}
// TODO: standardize such things
public static final String JAVA_SERIALIZED_CODER_URN = "beam:coders:javasdk:0.1";

@VisibleForTesting
static final BiMap<Class<? extends Coder>, String> KNOWN_CODER_URNS = loadCoderURNs();
private static @MonotonicNonNull BiMap<Class<? extends Coder>, String> knownCoderUrns;

@VisibleForTesting
static final Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> KNOWN_TRANSLATORS =
loadTranslators();
private static @MonotonicNonNull Map<Class<? extends Coder>, CoderTranslator<? extends Coder>>
knownTranslators;

private static BiMap<Class<? extends Coder>, String> loadCoderURNs() {
ImmutableBiMap.Builder<Class<? extends Coder>, String> coderUrns = ImmutableBiMap.builder();
for (CoderTranslatorRegistrar registrar : ServiceLoader.load(CoderTranslatorRegistrar.class)) {
coderUrns.putAll(registrar.getCoderURNs());
@VisibleForTesting
@Deterministic
static BiMap<Class<? extends Coder>, String> getKnownCoderUrns() {
if (knownCoderUrns == null) {
ImmutableBiMap.Builder<Class<? extends Coder>, String> coderUrns = ImmutableBiMap.builder();
for (CoderTranslatorRegistrar registrar :
ServiceLoader.load(CoderTranslatorRegistrar.class)) {
coderUrns.putAll(registrar.getCoderURNs());
}
knownCoderUrns = coderUrns.build();
}
return coderUrns.build();

return knownCoderUrns;
}

private static Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> loadTranslators() {
ImmutableMap.Builder<Class<? extends Coder>, CoderTranslator<? extends Coder>> translators =
ImmutableMap.builder();
for (CoderTranslatorRegistrar coderTranslatorRegistrar :
ServiceLoader.load(CoderTranslatorRegistrar.class)) {
translators.putAll(coderTranslatorRegistrar.getCoderTranslators());
@VisibleForTesting
@Deterministic
static Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> getKnownTranslators() {
if (knownTranslators == null) {
ImmutableMap.Builder<Class<? extends Coder>, CoderTranslator<? extends Coder>> translators =
ImmutableMap.builder();
for (CoderTranslatorRegistrar coderTranslatorRegistrar :
ServiceLoader.load(CoderTranslatorRegistrar.class)) {
translators.putAll(coderTranslatorRegistrar.getCoderTranslators());
}
knownTranslators = translators.build();
}
return translators.build();

return knownTranslators;
}

public static RunnerApi.MessageWithComponents toProto(Coder<?> coder) throws IOException {
Expand All @@ -94,7 +107,7 @@ public static RunnerApi.MessageWithComponents toProto(Coder<?> coder) throws IOE

public static RunnerApi.Coder toProto(Coder<?> coder, SdkComponents components)
throws IOException {
if (KNOWN_CODER_URNS.containsKey(coder.getClass())) {
if (getKnownCoderUrns().containsKey(coder.getClass())) {
return toKnownCoder(coder, components);
}

Expand All @@ -116,13 +129,13 @@ private static RunnerApi.Coder toUnknownCoderWrapper(UnknownCoderWrapper coder)

private static RunnerApi.Coder toKnownCoder(Coder<?> coder, SdkComponents components)
throws IOException {
CoderTranslator translator = KNOWN_TRANSLATORS.get(coder.getClass());
CoderTranslator translator = getKnownTranslators().get(coder.getClass());
List<String> componentIds = registerComponents(coder, translator, components);
return RunnerApi.Coder.newBuilder()
.addAllComponentCoderIds(componentIds)
.setSpec(
FunctionSpec.newBuilder()
.setUrn(KNOWN_CODER_URNS.get(coder.getClass()))
.setUrn(getKnownCoderUrns().get(coder.getClass()))
.setPayload(ByteString.copyFrom(translator.getPayload(coder))))
.build();
}
Expand Down Expand Up @@ -173,8 +186,8 @@ private static Coder<?> fromKnownCoder(
components.getComponents().getCodersOrThrow(componentId), components, context);
coderComponents.add(innerCoder);
}
Class<? extends Coder> coderType = KNOWN_CODER_URNS.inverse().get(coderUrn);
CoderTranslator<?> translator = KNOWN_TRANSLATORS.get(coderType);
Class<? extends Coder> coderType = getKnownCoderUrns().inverse().get(coderUrn);
CoderTranslator<?> translator = getKnownTranslators().get(coderType);
if (translator != null) {
return translator.fromComponents(
coderComponents, coder.getSpec().getPayload().toByteArray(), context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ private static List<RunnerApi.ArtifactInformation> resolveArtifacts(
boolean isJavaSDKCompatible(RunnerApi.Components components, String coderId) {
RunnerApi.Coder coder = components.getCodersOrThrow(coderId);
if (!CoderTranslation.JAVA_SERIALIZED_CODER_URN.equals(coder.getSpec().getUrn())
&& !CoderTranslation.KNOWN_CODER_URNS.containsValue(coder.getSpec().getUrn())) {
&& !CoderTranslation.getKnownCoderUrns().containsValue(coder.getSpec().getUrn())) {
return false;
}
for (String componentId : coder.getComponentCoderIdsList()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
import org.apache.avro.SchemaBuilder;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.runners.core.construction.CoderTranslation.TranslationContext;
Expand All @@ -47,7 +46,6 @@
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
Expand Down Expand Up @@ -136,7 +134,7 @@ public void validateCoderTranslators() {
equalTo(new ModelCoderRegistrar().getCoderTranslators().keySet()));
assertThat(
"All Model Coders should be registered",
CoderTranslation.KNOWN_TRANSLATORS.keySet(),
CoderTranslation.getKnownTranslators().keySet(),
hasItems(new ModelCoderRegistrar().getCoderTranslators().keySet().toArray(new Class[0])));
}
}
Expand All @@ -152,14 +150,12 @@ public static Iterable<Coder<?>> data() {
StringUtf8Coder.of(),
SerializableCoder.of(Record.class),
new RecordCoder(),
KvCoder.of(
new RecordCoder(),
AvroCoder.of(SchemaBuilder.record("record").fields().endRecord())))
KvCoder.of(new RecordCoder(), StringUtf8Coder.of()))
.add(
StringUtf8Coder.of(),
SerializableCoder.of(Record.class),
new RecordCoder(),
KvCoder.of(new RecordCoder(), AvroCoder.of(Record.class)))
KvCoder.of(new RecordCoder(), StringUtf8Coder.of()))
.add(UnknownCoderWrapper.of("dummy_urn", new byte[] {}))
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public void visitValue(PValue value, Node producer) {

private void addCoders(Coder<?> coder) {
coders.add(coder);
if (CoderTranslation.KNOWN_CODER_URNS.containsKey(coder.getClass())) {
if (CoderTranslation.getKnownCoderUrns().containsKey(coder.getClass())) {
for (Coder<?> component : ((StructuredCoder<?>) coder).getComponents()) {
addCoders(component);
}
Expand Down
3 changes: 0 additions & 3 deletions runners/direct-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ applyJavaNature(
include(project(path: it, configuration: "shadow"))
}
}
relocate "org.apache.beam.runners.core", getJavaRelocatedPath("runners.core")
relocate "org.apache.beam.runners.fnexecution", getJavaRelocatedPath("runners.fnexecution")
relocate "org.apache.beam.runners.local", getJavaRelocatedPath("runners.local")
},
)

Expand Down
5 changes: 3 additions & 2 deletions sdks/java/extensions/avro/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ dependencies {
// Exclude Avro dependencies from "core" since Avro support moved to this extension
exclude group: "org.apache.avro", module: "avro"
}
implementation(project(path: ":runners:core-construction-java"))
implementation library.java.error_prone_annotations
implementation library.java.avro
implementation library.java.joda_time
Expand Down Expand Up @@ -101,7 +102,7 @@ avroVersions.each { k, v ->

// only use compileClasspath on purpose to not include generated test files
// will recompile with the propper generated sources
compileClasspath += configurations."avroVersion$k"
compileClasspath += sourceSets.main.output + configurations."avroVersion$k"
runtimeClasspath += configurations."avroVersion$k"
}
}
Expand Down Expand Up @@ -157,4 +158,4 @@ static def createTaskNames(Map<String, String> prefixMap, String suffix) {
return prefixMap.keySet().stream()
.map { version -> "avroVersion${version}${suffix}" }
.collect(Collectors.toList())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction;
package org.apache.beam.sdk.extensions.avro;

import com.google.auto.service.AutoService;
import java.util.Map;
import org.apache.beam.runners.core.construction.CoderTranslator;
import org.apache.beam.runners.core.construction.CoderTranslatorRegistrar;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction;
package org.apache.beam.sdk.extensions.avro;

import java.util.Collections;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.beam.runners.core.construction.CoderTranslation.TranslationContext;
import org.apache.beam.runners.core.construction.CoderTranslator;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Defines {@link org.apache.beam.sdk.schemas.Schema} and other classes for representing schema'd
* data in a {@link org.apache.beam.sdk.Pipeline} using Apache Avro.
*/
@DefaultAnnotation(NonNull.class)
package org.apache.beam.sdk.extensions.avro;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import org.checkerframework.checker.nullness.qual.NonNull;
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.avro;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

import org.apache.avro.SchemaBuilder;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.CoderTranslation.TranslationContext;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link AvroCoder} translation. */
@SuppressWarnings({
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
})
@RunWith(JUnit4.class)
public class AvroCoderTranslationTest {
/** Tests round-trip coder encodings for both known and unknown {@link Coder coders}. */
@Test
public void toAndFromProto() throws Exception {
Coder<?> coder = AvroCoder.of(SchemaBuilder.record("record").fields().endRecord());
SdkComponents sdkComponents = SdkComponents.create();
sdkComponents.registerEnvironment(Environments.createDockerEnvironment("java"));
RunnerApi.Coder coderProto = CoderTranslation.toProto(coder, sdkComponents);

Components encodedComponents = sdkComponents.toComponents();
Coder<?> decodedCoder =
CoderTranslation.fromProto(
coderProto,
RehydratedComponents.forComponents(encodedComponents),
TranslationContext.DEFAULT);
assertThat(decodedCoder, equalTo(coder));
}
}

0 comments on commit 95d6ec0

Please sign in to comment.