diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroDatumFactory.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroDatumFactory.java index 55c6e266e279..67125a6ad24d 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroDatumFactory.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/io/AvroDatumFactory.java @@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumWriter; +import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumReader; import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.avro.specific.SpecificDatumReader; @@ -33,6 +34,9 @@ import org.checkerframework.checker.nullness.qual.Nullable; /** Create {@link DatumReader} and {@link DatumWriter} for given schemas. */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) public abstract class AvroDatumFactory implements AvroSource.DatumReaderFactory, AvroSink.DatumWriterFactory { @@ -168,25 +172,16 @@ public ReflectDatumFactory(Class type) { @Override public DatumReader apply(Schema writer, Schema reader) { - // create the datum writer using the Class api. - // avro will load the proper class loader - ReflectDatumReader datumReader = new ReflectDatumReader<>(type); - datumReader.setExpected(reader); - datumReader.setSchema(writer); - // for backward compat, add logical type support by default - AvroUtils.addLogicalTypeConversions(datumReader.getData()); - return datumReader; + ReflectData data = new ReflectData(type.getClassLoader()); + AvroUtils.addLogicalTypeConversions(data); + return new ReflectDatumReader<>(writer, reader, data); } @Override public DatumWriter apply(Schema writer) { - // create the datum writer using the Class api. - // avro will load the proper class loader - ReflectDatumWriter datumWriter = new ReflectDatumWriter<>(type); - datumWriter.setSchema(writer); - // for backward compat, add logical type support by default - AvroUtils.addLogicalTypeConversions(datumWriter.getData()); - return datumWriter; + ReflectData data = new ReflectData(type.getClassLoader()); + AvroUtils.addLogicalTypeConversions(data); + return new ReflectDatumWriter<>(writer, data); } public static ReflectDatumFactory of(Class type) {