diff --git a/scio-avro/src/main/scala/com/spotify/scio/avro/AvroDatumFactory.scala b/scio-avro/src/main/scala/com/spotify/scio/avro/AvroDatumFactory.scala
new file mode 100644
index 0000000000..ea893f29cd
--- /dev/null
+++ b/scio-avro/src/main/scala/com/spotify/scio/avro/AvroDatumFactory.scala
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2023 Spotify AB
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.spotify.scio.avro
+
+import org.apache.avro.Schema
+import org.apache.avro.io.{DatumReader, DatumWriter}
+import org.apache.avro.reflect.{ReflectData, ReflectDatumReader, ReflectDatumWriter}
+import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils
+
+/**
+ * [[AvroDatumFactory]] for avro specific records. Readers and writers are built on avro reflect
+ * (with logical type conversions registered) so that the underlying CharSequence type is String.
+ */
+private[scio] class SpecificRecordDatumFactory[T](recordType: Class[T])
+    extends AvroDatumFactory[T](recordType) {
+  override def apply(writer: Schema, reader: Schema): DatumReader[T] = {
+    val data = new ReflectData(recordType.getClassLoader)
+    AvroUtils.addLogicalTypeConversions(data)
+    new ReflectDatumReader[T](writer, reader, data)
+  }
+
+  override def apply(writer: Schema): DatumWriter[T] = {
+    val data = new ReflectData(recordType.getClassLoader)
+    AvroUtils.addLogicalTypeConversions(data)
+    new ReflectDatumWriter[T](writer, data)
+  }
+}
diff --git a/scio-avro/src/main/scala/com/spotify/scio/avro/AvroIO.scala b/scio-avro/src/main/scala/com/spotify/scio/avro/AvroIO.scala
index 2cbab6327d..08a678d680 100644
--- a/scio-avro/src/main/scala/com/spotify/scio/avro/AvroIO.scala
+++ b/scio-avro/src/main/scala/com/spotify/scio/avro/AvroIO.scala
@@ -183,6 +183,7 @@ final case class SpecificRecordIO[T <: SpecificRecord: ClassTag: Coder](path: St
     val t = BAvroIO
       .read(cls)
       .from(filePattern)
+      .withDatumReaderFactory(new SpecificRecordDatumFactory[T](cls))
     sc
       .applyTransform(t)
       .setCoder(coder)
@@ -194,7 +195,9 @@ final case class SpecificRecordIO[T <: SpecificRecord: ClassTag: Coder](path: St
    */
   override protected def write(data: SCollection[T], params: WriteP): Tap[T] = {
     val cls = ScioUtil.classOf[T]
-    val t = BAvroIO.write(cls)
+    val t = BAvroIO
+      .write(cls)
+      .withDatumWriterFactory(new SpecificRecordDatumFactory[T](cls))
 
     data.applyInternal(
       avroOut(
diff --git a/scio-core/src/main/scala/com/spotify/scio/coders/instances/AvroCoders.scala b/scio-core/src/main/scala/com/spotify/scio/coders/instances/AvroCoders.scala
index b52b7d0515..549404c22f 100644
--- a/scio-core/src/main/scala/com/spotify/scio/coders/instances/AvroCoders.scala
+++ b/scio-core/src/main/scala/com/spotify/scio/coders/instances/AvroCoders.scala
@@ -149,23 +149,18 @@ trait AvroCoders {
       throw new RuntimeException(msg)
     }
 
+    // same as SpecificRecordDatumFactory in scio-avro
     val factory = new AvroDatumFactory(clazz) {
       override def apply(writer: Schema, reader: Schema): DatumReader[T] = {
-        // create the datum writer using the schema api
-        // class API might be unsafe. See schemaForClass
-        val datumReader = new ReflectDatumReader[T](writer, reader, new ReflectData())
-        // for backward compat, add logical type support by default
-        AvroUtils.addLogicalTypeConversions(datumReader.getData)
-        datumReader
+        val data = new ReflectData(clazz.getClassLoader)
+        AvroUtils.addLogicalTypeConversions(data)
+        new ReflectDatumReader[T](writer, reader, data)
       }
 
       override def apply(writer: Schema): DatumWriter[T] = {
-        // create the datum writer using the schema api
-        // class API might be unsafe. See schemaForClass
-        val datumWriter = new ReflectDatumWriter[T](writer, new ReflectData())
-        // for backward compat, add logical type support by default
-        AvroUtils.addLogicalTypeConversions(datumWriter.getData)
-        datumWriter
+        val data = new ReflectData(clazz.getClassLoader)
+        AvroUtils.addLogicalTypeConversions(data)
+        new ReflectDatumWriter[T](writer, data)
       }
     }
 
diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroFileOperations.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroFileOperations.java
index dc174f6b11..42674829d1 100644
--- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroFileOperations.java
+++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroFileOperations.java
@@ -31,6 +31,7 @@
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
 import org.apache.beam.sdk.extensions.avro.io.AvroIO;
@@ -121,7 +122,16 @@ public GenericRecord formatRecord(ValueT element, Schema schema) {
                           }
                         })
                     .withCodec(codec.getCodec())
-            : AvroIO.sink(recordClass).withCodec(codec.getCodec());
+            : AvroIO.sink(recordClass)
+                .withCodec(codec.getCodec())
+                .withDatumWriterFactory(
+                    (writer) -> {
+                      // same as SpecificRecordDatumFactory in scio-avro
+                      ReflectData data = new ReflectData(recordClass.getClassLoader());
+                      org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils
+                          .addLogicalTypeConversions(data);
+                      return new ReflectDatumWriter<>(writer, data);
+                    });
 
     if (metadata != null) {
       return sink.withMetadata(metadata);
@@ -193,10 +203,19 @@ private static class AvroReader extends FileOperations.Reader {
     public void prepareRead(ReadableByteChannel channel) throws IOException {
       final Schema schema = schemaSupplier.get();
 
-      DatumReader datumReader =
-          recordClass == null
-              ? new GenericDatumReader<>(schema)
-              : new ReflectDatumReader<>(recordClass);
+      DatumReader datumReader;
+      if (recordClass == null) {
+        datumReader = new GenericDatumReader<>(schema);
+      } else {
+        // same as SpecificRecordDatumFactory in scio-avro
+        ReflectData data = new ReflectData(recordClass.getClassLoader());
+        org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils.addLogicalTypeConversions(data);
+        // ReflectDatumReader(ReflectData) leaves the expected schema unset, so DataFileStream
+        // would resolve records against the file's writer schema only. Use the record class
+        // schema as reader schema, as the previous ReflectDatumReader(Class) constructor did.
+        Schema readerSchema = data.getSchema(recordClass);
+        datumReader = new ReflectDatumReader<>(readerSchema, readerSchema, data);
+      }
 
       reader = new DataFileStream<>(Channels.newInputStream(channel), datumReader);
     }