diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java index abd9bc46bd46..5ccfe39b92af 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java @@ -39,20 +39,20 @@ "nullness", // TODO(https://github.com/apache/beam/issues/20497) "rawtypes" }) -public class AutoValueSchema extends GetterBasedSchemaProvider { +public class AutoValueSchema extends GetterBasedSchemaProviderV2 { /** {@link FieldValueTypeSupplier} that's based on AutoValue getters. */ @VisibleForTesting public static class AbstractGetterTypeSupplier implements FieldValueTypeSupplier { public static final AbstractGetterTypeSupplier INSTANCE = new AbstractGetterTypeSupplier(); @Override - public List get(Class clazz) { + public List get(TypeDescriptor typeDescriptor) { // If the generated class is passed in, we want to look at the base class to find the getters. - Class targetClass = AutoValueUtils.getBaseAutoValueClass(clazz); + TypeDescriptor targetTypeDescriptor = AutoValueUtils.getBaseAutoValueClass(typeDescriptor); List methods = - ReflectUtils.getMethods(targetClass).stream() + ReflectUtils.getMethods(targetTypeDescriptor.getRawType()).stream() .filter(ReflectUtils::isGetter) // All AutoValue getters are marked abstract. .filter(m -> Modifier.isAbstract(m.getModifiers())) @@ -89,9 +89,10 @@ private static void validateFieldNumbers(List types) } @Override - public List fieldValueGetters(Class targetClass, Schema schema) { + public List fieldValueGetters( + TypeDescriptor targetTypeDescriptor, Schema schema) { return JavaBeanUtils.getGetters( - targetClass, + targetTypeDescriptor, schema, AbstractGetterTypeSupplier.INSTANCE, new DefaultTypeConversionsFactory()); @@ -99,17 +100,19 @@ public List fieldValueGetters(Class targetClass, Schema sch @Override public List fieldValueTypeInformations( - Class targetClass, Schema schema) { - return JavaBeanUtils.getFieldTypes(targetClass, schema, AbstractGetterTypeSupplier.INSTANCE); + TypeDescriptor targetTypeDescriptor, Schema schema) { + return JavaBeanUtils.getFieldTypes( + targetTypeDescriptor, schema, AbstractGetterTypeSupplier.INSTANCE); } @Override - public SchemaUserTypeCreator schemaTypeCreator(Class targetClass, Schema schema) { + public SchemaUserTypeCreator schemaTypeCreator( + TypeDescriptor targetTypeDescriptor, Schema schema) { // If a static method is marked with @SchemaCreate, use that. - Method annotated = ReflectUtils.getAnnotatedCreateMethod(targetClass); + Method annotated = ReflectUtils.getAnnotatedCreateMethod(targetTypeDescriptor.getRawType()); if (annotated != null) { return JavaBeanUtils.getStaticCreator( - targetClass, + targetTypeDescriptor, annotated, schema, AbstractGetterTypeSupplier.INSTANCE, @@ -119,7 +122,8 @@ public SchemaUserTypeCreator schemaTypeCreator(Class targetClass, Schema sche // Try to find a generated builder class. If one exists, use that to generate a // SchemaTypeCreator for creating AutoValue objects. SchemaUserTypeCreator creatorFactory = - AutoValueUtils.getBuilderCreator(targetClass, schema, AbstractGetterTypeSupplier.INSTANCE); + AutoValueUtils.getBuilderCreator( + targetTypeDescriptor.getRawType(), schema, AbstractGetterTypeSupplier.INSTANCE); if (creatorFactory != null) { return creatorFactory; } @@ -128,9 +132,10 @@ public SchemaUserTypeCreator schemaTypeCreator(Class targetClass, Schema sche // class. Use that for creating AutoValue objects. creatorFactory = AutoValueUtils.getConstructorCreator( - targetClass, schema, AbstractGetterTypeSupplier.INSTANCE); + targetTypeDescriptor, schema, AbstractGetterTypeSupplier.INSTANCE); if (creatorFactory == null) { - throw new RuntimeException("Could not find a way to create AutoValue class " + targetClass); + throw new RuntimeException( + "Could not find a way to create AutoValue class " + targetTypeDescriptor); } return creatorFactory; @@ -139,6 +144,6 @@ public SchemaUserTypeCreator schemaTypeCreator(Class targetClass, Schema sche @Override public @Nullable Schema schemaFor(TypeDescriptor typeDescriptor) { return JavaBeanUtils.schemaFromJavaBeanClass( - typeDescriptor.getRawType(), AbstractGetterTypeSupplier.INSTANCE); + typeDescriptor, AbstractGetterTypeSupplier.INSTANCE); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/CachingFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/CachingFactory.java index 2c140bd1dfef..8725833bc1da 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/CachingFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/CachingFactory.java @@ -19,6 +19,7 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import org.apache.beam.sdk.values.TypeDescriptor; import org.checkerframework.checker.nullness.qual.Nullable; /** @@ -36,7 +37,7 @@ "rawtypes" }) public class CachingFactory implements Factory { - private transient @Nullable ConcurrentHashMap cache = null; + private transient @Nullable ConcurrentHashMap, CreatedT> cache = null; private final Factory innerFactory; @@ -45,16 +46,16 @@ public CachingFactory(Factory innerFactory) { } @Override - public CreatedT create(Class clazz, Schema schema) { + public CreatedT create(TypeDescriptor typeDescriptor, Schema schema) { if (cache == null) { cache = new ConcurrentHashMap<>(); } - CreatedT cached = cache.get(clazz); + CreatedT cached = cache.get(typeDescriptor); if (cached != null) { return cached; } - cached = innerFactory.create(clazz, schema); - cache.put(clazz, cached); + cached = innerFactory.create(typeDescriptor, schema); + cache.put(typeDescriptor, cached); return cached; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Factory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Factory.java index f9da36b97c77..f302f20cfb64 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Factory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Factory.java @@ -19,9 +19,10 @@ import java.io.Serializable; import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.values.TypeDescriptor; /** A Factory interface for schema-related objects for a specific Java type. */ @Internal public interface Factory extends Serializable { - T create(Class clazz, Schema schema); + T create(TypeDescriptor typeDescriptor, Schema schema); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java index 53c098599c36..b839a19a8177 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.RowWithGetters; +import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Function; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Collections2; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; @@ -47,23 +48,28 @@ "rawtypes" }) class FromRowUsingCreator implements SerializableFunction, Function { - private final Class clazz; + private final TypeDescriptor typeDescriptor; private final GetterBasedSchemaProvider schemaProvider; private final Factory schemaTypeCreatorFactory; @SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED") private transient @MonotonicNonNull Function[] fieldConverters; - public FromRowUsingCreator(Class clazz, GetterBasedSchemaProvider schemaProvider) { - this(clazz, schemaProvider, new CachingFactory<>(schemaProvider::schemaTypeCreator), null); + public FromRowUsingCreator( + TypeDescriptor typeDescriptor, GetterBasedSchemaProvider schemaProvider) { + this( + typeDescriptor, + schemaProvider, + new CachingFactory<>(schemaProvider::schemaTypeCreator), + null); } private FromRowUsingCreator( - Class clazz, + TypeDescriptor typeDescriptor, GetterBasedSchemaProvider schemaProvider, Factory schemaTypeCreatorFactory, @Nullable Function[] fieldConverters) { - this.clazz = clazz; + this.typeDescriptor = typeDescriptor; this.schemaProvider = schemaProvider; this.schemaTypeCreatorFactory = schemaTypeCreatorFactory; this.fieldConverters = fieldConverters; @@ -77,7 +83,7 @@ public T apply(Row row) { } if (row instanceof RowWithGetters) { Object target = ((RowWithGetters) row).getGetterTarget(); - if (target.getClass().equals(clazz)) { + if (target.getClass().equals(typeDescriptor.getRawType())) { // Efficient path: simply extract the underlying object instead of creating a new one. return (T) target; } @@ -91,7 +97,8 @@ public T apply(Row row) { for (int i = 0; i < row.getFieldCount(); ++i) { params[i] = fieldConverters[i].apply(row.getValue(i)); } - SchemaUserTypeCreator creator = schemaTypeCreatorFactory.create(clazz, row.getSchema()); + SchemaUserTypeCreator creator = + schemaTypeCreatorFactory.create(typeDescriptor, row.getSchema()); return (T) creator.create(params); } @@ -99,13 +106,15 @@ private synchronized void initFieldConverters(Schema schema) { if (fieldConverters == null) { CachingFactory> typeFactory = new CachingFactory<>(schemaProvider::fieldValueTypeInformations); - fieldConverters = fieldConverters(clazz, schema, typeFactory); + fieldConverters = fieldConverters(typeDescriptor, schema, typeFactory); } } private Function[] fieldConverters( - Class clazz, Schema schema, Factory> typeFactory) { - List typeInfos = typeFactory.create(clazz, schema); + TypeDescriptor typeDescriptor, + Schema schema, + Factory> typeFactory) { + List typeInfos = typeFactory.create(typeDescriptor, schema); checkState( typeInfos.size() == schema.getFieldCount(), "Did not have a matching number of type informations and fields."); @@ -133,10 +142,9 @@ private Function fieldConverter( if (!needsConversion(type)) { return FieldConverter.IDENTITY; } else if (TypeName.ROW.equals(type.getTypeName())) { - Function[] converters = - fieldConverters(typeInfo.getRawType(), type.getRowSchema(), typeFactory); + Function[] converters = fieldConverters(typeInfo.getType(), type.getRowSchema(), typeFactory); return new FromRowUsingCreator( - typeInfo.getRawType(), schemaProvider, schemaTypeCreatorFactory, converters); + typeInfo.getType(), schemaProvider, schemaTypeCreatorFactory, converters); } else if (TypeName.ARRAY.equals(type.getTypeName())) { return new ConvertCollection( fieldConverter(type.getCollectionElementType(), typeInfo.getElementType(), typeFactory)); @@ -271,11 +279,11 @@ public boolean equals(@Nullable Object o) { return false; } FromRowUsingCreator that = (FromRowUsingCreator) o; - return clazz.equals(that.clazz) && schemaProvider.equals(that.schemaProvider); + return typeDescriptor.equals(that.typeDescriptor) && schemaProvider.equals(that.schemaProvider); } @Override public int hashCode() { - return Objects.hash(clazz, schemaProvider); + return Objects.hash(typeDescriptor, schemaProvider); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java index 2b697bebd815..ce5be71933b8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java @@ -41,22 +41,77 @@ /** * A {@link SchemaProvider} base class that vends schemas and rows based on {@link * FieldValueGetter}s. + * + * @deprecated new implementations should extend the {@link GetterBasedSchemaProviderV2} class' + * methods which receive {@link TypeDescriptor}s instead of ordinary {@link Class}es as + * arguments, which permits to support generic type signatures during schema inference */ @SuppressWarnings({ "nullness", // TODO(https://github.com/apache/beam/issues/20497) "rawtypes" }) +@Deprecated public abstract class GetterBasedSchemaProvider implements SchemaProvider { - /** Implementing class should override to return FieldValueGetters. */ + + /** + * Implementing class should override to return FieldValueGetters. + * + * @deprecated new implementations should override {@link #fieldValueGetters(TypeDescriptor, + * Schema)} and make this method throw an {@link UnsupportedOperationException} + */ + @Deprecated public abstract List fieldValueGetters(Class targetClass, Schema schema); - /** Implementing class should override to return a list of type-informations. */ + /** + * Delegates to the {@link #fieldValueGetters(Class, Schema)} for backwards compatibility, + * override it if you want to use the richer type signature contained in the {@link + * TypeDescriptor} not subject to the type erasure. + */ + public List fieldValueGetters( + TypeDescriptor targetTypeDescriptor, Schema schema) { + return fieldValueGetters(targetTypeDescriptor.getRawType(), schema); + } + + /** + * Implementing class should override to return a list of type-informations. + * + * @deprecated new implementations should override {@link + * #fieldValueTypeInformations(TypeDescriptor, Schema)} and make this method throw an {@link + * UnsupportedOperationException} + */ + @Deprecated public abstract List fieldValueTypeInformations( Class targetClass, Schema schema); - /** Implementing class should override to return a constructor. */ + /** + * Delegates to the {@link #fieldValueTypeInformations(Class, Schema)} for backwards + * compatibility, override it if you want to use the richer type signature contained in the {@link + * TypeDescriptor} not subject to the type erasure. + */ + public List fieldValueTypeInformations( + TypeDescriptor targetTypeDescriptor, Schema schema) { + return fieldValueTypeInformations(targetTypeDescriptor.getRawType(), schema); + } + + /** + * Implementing class should override to return a constructor. + * + * @deprecated new implementations should override {@link #schemaTypeCreator(TypeDescriptor, + * Schema)} and make this method throw an {@link UnsupportedOperationException} + */ + @Deprecated public abstract SchemaUserTypeCreator schemaTypeCreator(Class targetClass, Schema schema); + /** + * Delegates to the {@link #schemaTypeCreator(Class, Schema)} for backwards compatibility, + * override it if you want to use the richer type signature contained in the {@link + * TypeDescriptor} not subject to the type erasure. + */ + public SchemaUserTypeCreator schemaTypeCreator( + TypeDescriptor targetTypeDescriptor, Schema schema) { + return schemaTypeCreator(targetTypeDescriptor.getRawType(), schema); + } + private class ToRowWithValueGetters implements SerializableFunction { private final Schema schema; private final Factory> getterFactory; @@ -113,8 +168,7 @@ public SerializableFunction toRowFunction(TypeDescriptor typeDesc @Override @SuppressWarnings("unchecked") public SerializableFunction fromRowFunction(TypeDescriptor typeDescriptor) { - Class clazz = (Class) typeDescriptor.getType(); - return new FromRowUsingCreator<>(clazz, this); + return new FromRowUsingCreator<>(typeDescriptor, this); } @Override @@ -141,8 +195,8 @@ static Factory> of(Factory> getter } @Override - public List create(Class clazz, Schema schema) { - List getters = gettersFactory.create(clazz, schema); + public List create(TypeDescriptor typeDescriptor, Schema schema) { + List getters = gettersFactory.create(typeDescriptor, schema); List rowGetters = new ArrayList<>(getters.size()); for (int i = 0; i < getters.size(); i++) { rowGetters.add(rowValueGetter(getters.get(i), schema.getField(i).getType())); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProviderV2.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProviderV2.java new file mode 100644 index 000000000000..de31f9947c36 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProviderV2.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas; + +import java.util.List; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** + * A newer version of {@link GetterBasedSchemaProvider}, which works with {@link TypeDescriptor}s, + * and which by default delegates the old, {@link Class} based methods, to the new ones. + */ +@SuppressWarnings("rawtypes") +public abstract class GetterBasedSchemaProviderV2 extends GetterBasedSchemaProvider { + @Override + public List fieldValueGetters(Class targetClass, Schema schema) { + return fieldValueGetters(TypeDescriptor.of(targetClass), schema); + } + + @Override + public abstract List fieldValueGetters( + TypeDescriptor targetTypeDescriptor, Schema schema); + + @Override + public List fieldValueTypeInformations( + Class targetClass, Schema schema) { + return fieldValueTypeInformations(TypeDescriptor.of(targetClass), schema); + } + + @Override + public abstract List fieldValueTypeInformations( + TypeDescriptor targetTypeDescriptor, Schema schema); + + @Override + public SchemaUserTypeCreator schemaTypeCreator(Class targetClass, Schema schema) { + return schemaTypeCreator(TypeDescriptor.of(targetClass), schema); + } + + @Override + public abstract SchemaUserTypeCreator schemaTypeCreator( + TypeDescriptor targetTypeDescriptor, Schema schema); +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java index 7024e8be86cf..a9cf01c52057 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java @@ -53,16 +53,16 @@ "nullness", // TODO(https://github.com/apache/beam/issues/20497) "rawtypes" }) -public class JavaBeanSchema extends GetterBasedSchemaProvider { +public class JavaBeanSchema extends GetterBasedSchemaProviderV2 { /** {@link FieldValueTypeSupplier} that's based on getter methods. */ @VisibleForTesting public static class GetterTypeSupplier implements FieldValueTypeSupplier { public static final GetterTypeSupplier INSTANCE = new GetterTypeSupplier(); @Override - public List get(Class clazz) { + public List get(TypeDescriptor typeDescriptor) { List methods = - ReflectUtils.getMethods(clazz).stream() + ReflectUtils.getMethods(typeDescriptor.getRawType()).stream() .filter(ReflectUtils::isGetter) .filter(m -> !m.isAnnotationPresent(SchemaIgnore.class)) .collect(Collectors.toList()); @@ -110,8 +110,8 @@ public static class SetterTypeSupplier implements FieldValueTypeSupplier { private static final SetterTypeSupplier INSTANCE = new SetterTypeSupplier(); @Override - public List get(Class clazz) { - return ReflectUtils.getMethods(clazz).stream() + public List get(TypeDescriptor typeDescriptor) { + return ReflectUtils.getMethods(typeDescriptor.getRawType()).stream() .filter(ReflectUtils::isSetter) .filter(m -> !m.isAnnotationPresent(SchemaIgnore.class)) .map(FieldValueTypeInformation::forSetter) @@ -120,19 +120,22 @@ public List get(Class clazz) { if (t.getMethod().getAnnotation(SchemaFieldNumber.class) != null) { throw new RuntimeException( String.format( - "@SchemaFieldNumber can only be used on getters in Java Beans. Found on setter '%s'", + "@SchemaFieldNumber can only be used on getters in Java Beans. Found on" + + " setter '%s'", t.getMethod().getName())); } if (t.getMethod().getAnnotation(SchemaFieldName.class) != null) { throw new RuntimeException( String.format( - "@SchemaFieldName can only be used on getters in Java Beans. Found on setter '%s'", + "@SchemaFieldName can only be used on getters in Java Beans. Found on" + + " setter '%s'", t.getMethod().getName())); } if (t.getMethod().getAnnotation(SchemaCaseFormat.class) != null) { throw new RuntimeException( String.format( - "@SchemaCaseFormat can only be used on getters in Java Beans. Found on setter '%s'", + "@SchemaCaseFormat can only be used on getters in Java Beans. Found on" + + " setter '%s'", t.getMethod().getName())); } return t; @@ -154,40 +157,44 @@ public boolean equals(@Nullable Object obj) { @Override public Schema schemaFor(TypeDescriptor typeDescriptor) { Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass( - typeDescriptor.getRawType(), GetterTypeSupplier.INSTANCE); + JavaBeanUtils.schemaFromJavaBeanClass(typeDescriptor, GetterTypeSupplier.INSTANCE); // If there are no creator methods, then validate that we have setters for every field. // Otherwise, we will have no way of creating instances of the class. if (ReflectUtils.getAnnotatedCreateMethod(typeDescriptor.getRawType()) == null && ReflectUtils.getAnnotatedConstructor(typeDescriptor.getRawType()) == null) { JavaBeanUtils.validateJavaBean( - GetterTypeSupplier.INSTANCE.get(typeDescriptor.getRawType(), schema), - SetterTypeSupplier.INSTANCE.get(typeDescriptor.getRawType(), schema), + GetterTypeSupplier.INSTANCE.get(typeDescriptor, schema), + SetterTypeSupplier.INSTANCE.get(typeDescriptor, schema), schema); } return schema; } @Override - public List fieldValueGetters(Class targetClass, Schema schema) { + public List fieldValueGetters( + TypeDescriptor targetTypeDescriptor, Schema schema) { return JavaBeanUtils.getGetters( - targetClass, schema, GetterTypeSupplier.INSTANCE, new DefaultTypeConversionsFactory()); + targetTypeDescriptor, + schema, + GetterTypeSupplier.INSTANCE, + new DefaultTypeConversionsFactory()); } @Override public List fieldValueTypeInformations( - Class targetClass, Schema schema) { - return JavaBeanUtils.getFieldTypes(targetClass, schema, GetterTypeSupplier.INSTANCE); + TypeDescriptor targetTypeDescriptor, Schema schema) { + return JavaBeanUtils.getFieldTypes(targetTypeDescriptor, schema, GetterTypeSupplier.INSTANCE); } @Override - public SchemaUserTypeCreator schemaTypeCreator(Class targetClass, Schema schema) { + public SchemaUserTypeCreator schemaTypeCreator( + TypeDescriptor targetTypeDescriptor, Schema schema) { // If a static method is marked with @SchemaCreate, use that. - Method annotated = ReflectUtils.getAnnotatedCreateMethod(targetClass); + Method annotated = ReflectUtils.getAnnotatedCreateMethod(targetTypeDescriptor.getRawType()); if (annotated != null) { return JavaBeanUtils.getStaticCreator( - targetClass, + targetTypeDescriptor, annotated, schema, GetterTypeSupplier.INSTANCE, @@ -195,10 +202,11 @@ public SchemaUserTypeCreator schemaTypeCreator(Class targetClass, Schema sche } // If a Constructor was tagged with @SchemaCreate, invoke that constructor. - Constructor constructor = ReflectUtils.getAnnotatedConstructor(targetClass); + Constructor constructor = + ReflectUtils.getAnnotatedConstructor(targetTypeDescriptor.getRawType()); if (constructor != null) { return JavaBeanUtils.getConstructorCreator( - targetClass, + targetTypeDescriptor, constructor, schema, GetterTypeSupplier.INSTANCE, @@ -208,15 +216,18 @@ public SchemaUserTypeCreator schemaTypeCreator(Class targetClass, Schema sche // Else try to make a setter-based creator Factory setterBasedFactory = new SetterBasedCreatorFactory(new JavaBeanSetterFactory()); - return setterBasedFactory.create(targetClass, schema); + return setterBasedFactory.create(targetTypeDescriptor, schema); } /** A factory for creating {@link FieldValueSetter} objects for a JavaBean object. */ private static class JavaBeanSetterFactory implements Factory> { @Override - public List create(Class targetClass, Schema schema) { + public List create(TypeDescriptor targetTypeDescriptor, Schema schema) { return JavaBeanUtils.getSetters( - targetClass, schema, SetterTypeSupplier.INSTANCE, new DefaultTypeConversionsFactory()); + targetTypeDescriptor, + schema, + SetterTypeSupplier.INSTANCE, + new DefaultTypeConversionsFactory()); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java index 16b96f1c7ae1..21f07c47b47f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java @@ -50,16 +50,16 @@ * on the schema. */ @SuppressWarnings({"nullness", "rawtypes"}) -public class JavaFieldSchema extends GetterBasedSchemaProvider { +public class JavaFieldSchema extends GetterBasedSchemaProviderV2 { /** {@link FieldValueTypeSupplier} that's based on public fields. */ @VisibleForTesting public static class JavaFieldTypeSupplier implements FieldValueTypeSupplier { public static final JavaFieldTypeSupplier INSTANCE = new JavaFieldTypeSupplier(); @Override - public List get(Class clazz) { + public List get(TypeDescriptor typeDescriptor) { List fields = - ReflectUtils.getFields(clazz).stream() + ReflectUtils.getFields(typeDescriptor.getRawType()).stream() .filter(m -> !m.isAnnotationPresent(SchemaIgnore.class)) .collect(Collectors.toList()); List types = Lists.newArrayListWithCapacity(fields.size()); @@ -71,8 +71,8 @@ public List get(Class clazz) { // If there are no creators registered, then make sure none of the schema fields are final, // as we (currently) have no way of creating classes in this case. - if (ReflectUtils.getAnnotatedCreateMethod(clazz) == null - && ReflectUtils.getAnnotatedConstructor(clazz) == null) { + if (ReflectUtils.getAnnotatedCreateMethod(typeDescriptor.getRawType()) == null + && ReflectUtils.getAnnotatedConstructor(typeDescriptor.getRawType()) == null) { Optional finalField = types.stream() .map(FieldValueTypeInformation::getField) @@ -81,7 +81,7 @@ public List get(Class clazz) { if (finalField.isPresent()) { throw new IllegalArgumentException( "Class " - + clazz + + typeDescriptor + " has final fields and no " + "registered creator. Cannot use as schema, as we don't know how to create this " + "object automatically"); @@ -111,29 +111,33 @@ private static void validateFieldNumbers(List types) @Override public Schema schemaFor(TypeDescriptor typeDescriptor) { - return POJOUtils.schemaFromPojoClass( - typeDescriptor.getRawType(), JavaFieldTypeSupplier.INSTANCE); + return POJOUtils.schemaFromPojoClass(typeDescriptor, JavaFieldTypeSupplier.INSTANCE); } @Override - public List fieldValueGetters(Class targetClass, Schema schema) { + public List fieldValueGetters( + TypeDescriptor targetTypeDescriptor, Schema schema) { return POJOUtils.getGetters( - targetClass, schema, JavaFieldTypeSupplier.INSTANCE, new DefaultTypeConversionsFactory()); + targetTypeDescriptor, + schema, + JavaFieldTypeSupplier.INSTANCE, + new DefaultTypeConversionsFactory()); } @Override public List fieldValueTypeInformations( - Class targetClass, Schema schema) { - return POJOUtils.getFieldTypes(targetClass, schema, JavaFieldTypeSupplier.INSTANCE); + TypeDescriptor targetTypeDescriptor, Schema schema) { + return POJOUtils.getFieldTypes(targetTypeDescriptor, schema, JavaFieldTypeSupplier.INSTANCE); } @Override - public SchemaUserTypeCreator schemaTypeCreator(Class targetClass, Schema schema) { + public SchemaUserTypeCreator schemaTypeCreator( + TypeDescriptor targetTypeDescriptor, Schema schema) { // If a static method is marked with @SchemaCreate, use that. - Method annotated = ReflectUtils.getAnnotatedCreateMethod(targetClass); + Method annotated = ReflectUtils.getAnnotatedCreateMethod(targetTypeDescriptor.getRawType()); if (annotated != null) { return POJOUtils.getStaticCreator( - targetClass, + targetTypeDescriptor, annotated, schema, JavaFieldTypeSupplier.INSTANCE, @@ -141,10 +145,11 @@ public SchemaUserTypeCreator schemaTypeCreator(Class targetClass, Schema sche } // If a Constructor was tagged with @SchemaCreate, invoke that constructor. - Constructor constructor = ReflectUtils.getAnnotatedConstructor(targetClass); + Constructor constructor = + ReflectUtils.getAnnotatedConstructor(targetTypeDescriptor.getRawType()); if (constructor != null) { return POJOUtils.getConstructorCreator( - targetClass, + targetTypeDescriptor, constructor, schema, JavaFieldTypeSupplier.INSTANCE, @@ -152,6 +157,9 @@ public SchemaUserTypeCreator schemaTypeCreator(Class targetClass, Schema sche } return POJOUtils.getSetFieldCreator( - targetClass, schema, JavaFieldTypeSupplier.INSTANCE, new DefaultTypeConversionsFactory()); + targetTypeDescriptor, + schema, + JavaFieldTypeSupplier.INSTANCE, + new DefaultTypeConversionsFactory()); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SetterBasedCreatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SetterBasedCreatorFactory.java index 7663651ae7c9..e7ded3c52af5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SetterBasedCreatorFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SetterBasedCreatorFactory.java @@ -19,6 +19,7 @@ import java.lang.reflect.InvocationTargetException; import java.util.List; +import org.apache.beam.sdk.values.TypeDescriptor; /** * A {@link Factory} that uses a default constructor and a list of setters to construct a {@link @@ -35,14 +36,14 @@ public SetterBasedCreatorFactory(Factory> setterFactory) } @Override - public SchemaUserTypeCreator create(Class clazz, Schema schema) { - List setters = setterFactory.create(clazz, schema); + public SchemaUserTypeCreator create(TypeDescriptor typeDescriptor, Schema schema) { + List setters = setterFactory.create(typeDescriptor, schema); return new SchemaUserTypeCreator() { @Override public Object create(Object... params) { Object object; try { - object = clazz.getDeclaredConstructor().newInstance(); + object = typeDescriptor.getRawType().getDeclaredConstructor().newInstance(); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/DefaultSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/DefaultSchema.java index ddebbeb2bffe..6f3e598f5314 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/DefaultSchema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/annotations/DefaultSchema.java @@ -101,7 +101,7 @@ public ProviderAndDescriptor( try { return new ProviderAndDescriptor( providerClass.getDeclaredConstructor().newInstance(), - TypeDescriptor.of(clazz)); + typeDescriptor.getSupertype((Class) clazz)); } catch (NoSuchMethodException | InstantiationException | IllegalAccessException diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaRowUdf.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaRowUdf.java index 2ec0a9a60cd6..54e2a595fa71 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaRowUdf.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaRowUdf.java @@ -299,7 +299,7 @@ private static FunctionAndType createFunctionFromName(String name, String path) private static class EmptyFieldValueTypeSupplier implements org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier { @Override - public List get(Class clazz) { + public List get(TypeDescriptor typeDescriptor) { return Collections.emptyList(); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java index dcbbf70888d3..d7fddd8abfed 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java @@ -71,18 +71,18 @@ "rawtypes" }) public class AutoValueUtils { - public static Class getBaseAutoValueClass(Class clazz) { + public static TypeDescriptor getBaseAutoValueClass(TypeDescriptor typeDescriptor) { // AutoValue extensions may be nested - while (clazz != null && clazz.getName().contains("AutoValue_")) { - clazz = clazz.getSuperclass(); + while (typeDescriptor != null && typeDescriptor.getRawType().getName().contains("AutoValue_")) { + typeDescriptor = TypeDescriptor.of(typeDescriptor.getRawType().getSuperclass()); } - return clazz; + return typeDescriptor; } - private static Class getAutoValueGenerated(Class clazz) { - String generatedClassName = getAutoValueGeneratedName(clazz.getName()); + private static TypeDescriptor getAutoValueGenerated(TypeDescriptor typeDescriptor) { + String generatedClassName = getAutoValueGeneratedName(typeDescriptor.getRawType().getName()); try { - return Class.forName(generatedClassName); + return TypeDescriptor.of(Class.forName(generatedClassName)); } catch (ClassNotFoundException e) { throw new IllegalStateException("AutoValue generated class not found: " + generatedClassName); } @@ -121,11 +121,14 @@ private static String getAutoValueGeneratedName(String baseClass) { * Try to find an accessible constructor for creating an AutoValue class. Otherwise return null. */ public static @Nullable SchemaUserTypeCreator getConstructorCreator( - Class clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) { - Class generatedClass = getAutoValueGenerated(clazz); - List schemaTypes = fieldValueTypeSupplier.get(clazz, schema); + TypeDescriptor typeDescriptor, + Schema schema, + FieldValueTypeSupplier fieldValueTypeSupplier) { + TypeDescriptor generatedTypeDescriptor = getAutoValueGenerated(typeDescriptor); + List schemaTypes = + fieldValueTypeSupplier.get(typeDescriptor, schema); Optional> constructor = - Arrays.stream(generatedClass.getDeclaredConstructors()) + Arrays.stream(generatedTypeDescriptor.getRawType().getDeclaredConstructors()) .filter(c -> !Modifier.isPrivate(c.getModifiers())) .filter(c -> matchConstructor(c, schemaTypes)) .findAny(); @@ -133,7 +136,7 @@ private static String getAutoValueGeneratedName(String baseClass) { .map( c -> JavaBeanUtils.getConstructorCreator( - generatedClass, + generatedTypeDescriptor, c, schema, fieldValueTypeSupplier, @@ -201,7 +204,8 @@ private static boolean matchConstructor( List setterMethods = Lists.newArrayList(); // The builder methods to call in order. - List schemaTypes = fieldValueTypeSupplier.get(clazz, schema); + List schemaTypes = + fieldValueTypeSupplier.get(TypeDescriptor.of(clazz), schema); for (FieldValueTypeInformation type : schemaTypes) { String autoValueFieldName = ReflectUtils.stripGetterPrefix(type.getMethod().getName()); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java index d93456b21949..693997f64aa0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.beam.sdk.schemas.FieldValueTypeInformation; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.TypeDescriptor; /** * A naming policy for schema fields. This maps a name from the class (field name or getter name) to @@ -28,7 +29,7 @@ */ public interface FieldValueTypeSupplier extends Serializable { /** Return all the FieldValueTypeInformations. */ - List get(Class clazz); + List get(TypeDescriptor typeDescriptor); /** * Return all the FieldValueTypeInformations. @@ -36,7 +37,7 @@ public interface FieldValueTypeSupplier extends Serializable { *

If the schema parameter is not null, then the returned list must be in the same order as * fields in the schema. */ - default List get(Class clazz, Schema schema) { - return StaticSchemaInference.sortBySchema(get(clazz), schema); + default List get(TypeDescriptor typeDescriptor, Schema schema) { + return StaticSchemaInference.sortBySchema(get(typeDescriptor), schema); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java index 6573f25c66e2..911f79f6eeed 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java @@ -51,8 +51,9 @@ import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.InjectPackageStrategy; import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.StaticFactoryMethodInstruction; import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.TypeConversionsFactory; -import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema; +import org.apache.beam.sdk.schemas.utils.ReflectUtils.TypeDescriptorWithSchema; import org.apache.beam.sdk.util.common.ReflectHelpers; +import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; /** A set of utilities to generate getter and setter classes for JavaBean objects. */ @@ -63,8 +64,8 @@ public class JavaBeanUtils { /** Create a {@link Schema} for a Java Bean class. */ public static Schema schemaFromJavaBeanClass( - Class clazz, FieldValueTypeSupplier fieldValueTypeSupplier) { - return StaticSchemaInference.schemaFromClass(clazz, fieldValueTypeSupplier); + TypeDescriptor typeDescriptor, FieldValueTypeSupplier fieldValueTypeSupplier) { + return StaticSchemaInference.schemaFromClass(typeDescriptor, fieldValueTypeSupplier); } private static final String CONSTRUCTOR_HELP_STRING = @@ -111,18 +112,21 @@ public static void validateJavaBean( // Static ByteBuddy instance used by all helpers. private static final ByteBuddy BYTE_BUDDY = new ByteBuddy(); - private static final Map> CACHED_FIELD_TYPES = - Maps.newConcurrentMap(); + private static final Map, List> + CACHED_FIELD_TYPES = Maps.newConcurrentMap(); public static List getFieldTypes( - Class clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) { + TypeDescriptor typeDescriptor, + Schema schema, + FieldValueTypeSupplier fieldValueTypeSupplier) { return CACHED_FIELD_TYPES.computeIfAbsent( - ClassWithSchema.create(clazz, schema), c -> fieldValueTypeSupplier.get(clazz, schema)); + TypeDescriptorWithSchema.create(typeDescriptor, schema), + c -> fieldValueTypeSupplier.get(typeDescriptor, schema)); } // The list of getters for a class is cached, so we only create the classes the first time // getSetters is called. - private static final Map> CACHED_GETTERS = + private static final Map, List> CACHED_GETTERS = Maps.newConcurrentMap(); /** @@ -131,14 +135,15 @@ public static List getFieldTypes( *

The returned list is ordered by the order of fields in the schema. */ public static List getGetters( - Class clazz, + TypeDescriptor typeDescriptor, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier, TypeConversionsFactory typeConversionsFactory) { return CACHED_GETTERS.computeIfAbsent( - ClassWithSchema.create(clazz, schema), + TypeDescriptorWithSchema.create(typeDescriptor, schema), c -> { - List types = fieldValueTypeSupplier.get(clazz, schema); + List types = + fieldValueTypeSupplier.get(typeDescriptor, schema); return types.stream() .map(t -> createGetter(t, typeConversionsFactory)) .collect(Collectors.toList()); @@ -186,7 +191,7 @@ private static DynamicType.Builder implementGetterMethods( // The list of setters for a class is cached, so we only create the classes the first time // getSetters is called. - private static final Map> CACHED_SETTERS = + private static final Map, List> CACHED_SETTERS = Maps.newConcurrentMap(); /** @@ -195,14 +200,15 @@ private static DynamicType.Builder implementGetterMethods( *

The returned list is ordered by the order of fields in the schema. */ public static List getSetters( - Class clazz, + TypeDescriptor typeDescriptor, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier, TypeConversionsFactory typeConversionsFactory) { return CACHED_SETTERS.computeIfAbsent( - ClassWithSchema.create(clazz, schema), + TypeDescriptorWithSchema.create(typeDescriptor, schema), c -> { - List types = fieldValueTypeSupplier.get(clazz, schema); + List types = + fieldValueTypeSupplier.get(typeDescriptor, schema); return types.stream() .map(t -> createSetter(t, typeConversionsFactory)) .collect(Collectors.toList()); @@ -250,21 +256,22 @@ private static DynamicType.Builder implementSetterMethods( // The list of constructors for a class is cached, so we only create the classes the first time // getConstructor is called. - public static final Map CACHED_CREATORS = + public static final Map, SchemaUserTypeCreator> CACHED_CREATORS = Maps.newConcurrentMap(); public static SchemaUserTypeCreator getConstructorCreator( - Class clazz, + TypeDescriptor typeDescriptor, Constructor constructor, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier, TypeConversionsFactory typeConversionsFactory) { return CACHED_CREATORS.computeIfAbsent( - ClassWithSchema.create(clazz, schema), + TypeDescriptorWithSchema.create(typeDescriptor, schema), c -> { - List types = fieldValueTypeSupplier.get(clazz, schema); + List types = + fieldValueTypeSupplier.get(typeDescriptor, schema); return createConstructorCreator( - clazz, constructor, schema, types, typeConversionsFactory); + typeDescriptor.getRawType(), constructor, schema, types, typeConversionsFactory); }); } @@ -302,16 +309,18 @@ public static SchemaUserTypeCreator createConstructorCreator( } public static SchemaUserTypeCreator getStaticCreator( - Class clazz, + TypeDescriptor typeDescriptor, Method creator, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier, TypeConversionsFactory typeConversionsFactory) { return CACHED_CREATORS.computeIfAbsent( - ClassWithSchema.create(clazz, schema), + TypeDescriptorWithSchema.create(typeDescriptor, schema), c -> { - List types = fieldValueTypeSupplier.get(clazz, schema); - return createStaticCreator(clazz, creator, schema, types, typeConversionsFactory); + List types = + fieldValueTypeSupplier.get(typeDescriptor, schema); + return createStaticCreator( + typeDescriptor.getRawType(), creator, schema, types, typeConversionsFactory); }); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java index 93875a20707f..571b9c690900 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java @@ -59,7 +59,7 @@ import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.StaticFactoryMethodInstruction; import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.TypeConversion; import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.TypeConversionsFactory; -import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema; +import org.apache.beam.sdk.schemas.utils.ReflectUtils.TypeDescriptorWithSchema; import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; @@ -71,45 +71,53 @@ "rawtypes" // TODO(https://github.com/apache/beam/issues/20447) }) public class POJOUtils { + public static Schema schemaFromPojoClass( - Class clazz, FieldValueTypeSupplier fieldValueTypeSupplier) { - return StaticSchemaInference.schemaFromClass(clazz, fieldValueTypeSupplier); + TypeDescriptor typeDescriptor, FieldValueTypeSupplier fieldValueTypeSupplier) { + return StaticSchemaInference.schemaFromClass(typeDescriptor, fieldValueTypeSupplier); } // Static ByteBuddy instance used by all helpers. private static final ByteBuddy BYTE_BUDDY = new ByteBuddy(); - private static final Map> CACHED_FIELD_TYPES = - Maps.newConcurrentMap(); + private static final Map, List> + CACHED_FIELD_TYPES = Maps.newConcurrentMap(); public static List getFieldTypes( - Class clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) { + TypeDescriptor typeDescriptor, + Schema schema, + FieldValueTypeSupplier fieldValueTypeSupplier) { return CACHED_FIELD_TYPES.computeIfAbsent( - ClassWithSchema.create(clazz, schema), c -> fieldValueTypeSupplier.get(clazz, schema)); + TypeDescriptorWithSchema.create(typeDescriptor, schema), + c -> fieldValueTypeSupplier.get(typeDescriptor, schema)); } // The list of getters for a class is cached, so we only create the classes the first time // getSetters is called. - private static final Map> CACHED_GETTERS = + private static final Map> CACHED_GETTERS = Maps.newConcurrentMap(); public static List getGetters( - Class clazz, + TypeDescriptor typeDescriptor, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier, TypeConversionsFactory typeConversionsFactory) { // Return the getters ordered by their position in the schema. return CACHED_GETTERS.computeIfAbsent( - ClassWithSchema.create(clazz, schema), + TypeDescriptorWithSchema.create(typeDescriptor, schema), c -> { - List types = fieldValueTypeSupplier.get(clazz, schema); + List types = + fieldValueTypeSupplier.get(typeDescriptor, schema); List getters = types.stream() .map(t -> createGetter(t, typeConversionsFactory)) .collect(Collectors.toList()); if (getters.size() != schema.getFieldCount()) { throw new RuntimeException( - "Was not able to generate getters for schema: " + schema + " class: " + clazz); + "Was not able to generate getters for schema: " + + schema + + " class: " + + typeDescriptor); } return getters; }); @@ -117,19 +125,21 @@ public static List getGetters( // The list of constructors for a class is cached, so we only create the classes the first time // getConstructor is called. - public static final Map CACHED_CREATORS = + public static final Map CACHED_CREATORS = Maps.newConcurrentMap(); public static SchemaUserTypeCreator getSetFieldCreator( - Class clazz, + TypeDescriptor typeDescriptor, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier, TypeConversionsFactory typeConversionsFactory) { return CACHED_CREATORS.computeIfAbsent( - ClassWithSchema.create(clazz, schema), + TypeDescriptorWithSchema.create(typeDescriptor, schema), c -> { - List types = fieldValueTypeSupplier.get(clazz, schema); - return createSetFieldCreator(clazz, schema, types, typeConversionsFactory); + List types = + fieldValueTypeSupplier.get(typeDescriptor, schema); + return createSetFieldCreator( + typeDescriptor.getRawType(), schema, types, typeConversionsFactory); }); } @@ -171,17 +181,18 @@ private static SchemaUserTypeCreator createSetFieldCreator( } public static SchemaUserTypeCreator getConstructorCreator( - Class clazz, + TypeDescriptor typeDescriptor, Constructor constructor, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier, TypeConversionsFactory typeConversionsFactory) { return CACHED_CREATORS.computeIfAbsent( - ClassWithSchema.create(clazz, schema), + TypeDescriptorWithSchema.create(typeDescriptor, schema), c -> { - List types = fieldValueTypeSupplier.get(clazz, schema); + List types = + fieldValueTypeSupplier.get(typeDescriptor, schema); return createConstructorCreator( - clazz, constructor, schema, types, typeConversionsFactory); + typeDescriptor.getRawType(), constructor, schema, types, typeConversionsFactory); }); } @@ -220,16 +231,18 @@ public static SchemaUserTypeCreator createConstructorCreator( } public static SchemaUserTypeCreator getStaticCreator( - Class clazz, + TypeDescriptor typeDescriptor, Method creator, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier, TypeConversionsFactory typeConversionsFactory) { return CACHED_CREATORS.computeIfAbsent( - ClassWithSchema.create(clazz, schema), + TypeDescriptorWithSchema.create(typeDescriptor, schema), c -> { - List types = fieldValueTypeSupplier.get(clazz, schema); - return createStaticCreator(clazz, creator, schema, types, typeConversionsFactory); + List types = + fieldValueTypeSupplier.get(typeDescriptor, schema); + return createStaticCreator( + typeDescriptor.getRawType(), creator, schema, types, typeConversionsFactory); }); } @@ -324,19 +337,20 @@ private static DynamicType.Builder implementGetterMethods( // The list of setters for a class is cached, so we only create the classes the first time // getSetters is called. - private static final Map> CACHED_SETTERS = + private static final Map> CACHED_SETTERS = Maps.newConcurrentMap(); public static List getSetters( - Class clazz, + TypeDescriptor typeDescriptor, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier, TypeConversionsFactory typeConversionsFactory) { // Return the setters, ordered by their position in the schema. return CACHED_SETTERS.computeIfAbsent( - ClassWithSchema.create(clazz, schema), + TypeDescriptorWithSchema.create(typeDescriptor, schema), c -> { - List types = fieldValueTypeSupplier.get(clazz, schema); + List types = + fieldValueTypeSupplier.get(typeDescriptor, schema); return types.stream() .map(t -> createSetter(t, typeConversionsFactory)) .collect(Collectors.toList()); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java index f3888a5ed443..4349a04c28ad 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java @@ -63,6 +63,19 @@ public static ClassWithSchema create(Class clazz, Schema schema) { } } + /** Represents a type descriptor and a schema. */ + @AutoValue + public abstract static class TypeDescriptorWithSchema { + public abstract TypeDescriptor getTypeDescriptor(); + + public abstract Schema getSchema(); + + public static TypeDescriptorWithSchema create( + TypeDescriptor typeDescriptor, Schema schema) { + return new AutoValue_ReflectUtils_TypeDescriptorWithSchema<>(typeDescriptor, schema); + } + } + private static final Map, List> DECLARED_METHODS = Maps.newConcurrentMap(); private static final Map, Method> ANNOTATED_CONSTRUCTORS = Maps.newConcurrentMap(); private static final Map, List> DECLARED_FIELDS = Maps.newConcurrentMap(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java index 72d79adb8288..196ee6f86593 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java @@ -85,25 +85,26 @@ enum MethodType { * public getter methods, or special annotations on the class. */ public static Schema schemaFromClass( - Class clazz, FieldValueTypeSupplier fieldValueTypeSupplier) { - return schemaFromClass(clazz, fieldValueTypeSupplier, new HashMap()); + TypeDescriptor typeDescriptor, FieldValueTypeSupplier fieldValueTypeSupplier) { + return schemaFromClass(typeDescriptor, fieldValueTypeSupplier, new HashMap<>()); } private static Schema schemaFromClass( - Class clazz, + TypeDescriptor typeDescriptor, FieldValueTypeSupplier fieldValueTypeSupplier, - Map alreadyVisitedSchemas) { - if (alreadyVisitedSchemas.containsKey(clazz)) { - Schema existingSchema = alreadyVisitedSchemas.get(clazz); + Map, Schema> alreadyVisitedSchemas) { + if (alreadyVisitedSchemas.containsKey(typeDescriptor)) { + Schema existingSchema = alreadyVisitedSchemas.get(typeDescriptor); if (existingSchema == null) { throw new IllegalArgumentException( - "Cannot infer schema with a circular reference. Class: " + clazz.getTypeName()); + "Cannot infer schema with a circular reference. Class: " + + typeDescriptor.getRawType().getTypeName()); } return existingSchema; } - alreadyVisitedSchemas.put(clazz, null); + alreadyVisitedSchemas.put(typeDescriptor, null); Schema.Builder builder = Schema.builder(); - for (FieldValueTypeInformation type : fieldValueTypeSupplier.get(clazz)) { + for (FieldValueTypeInformation type : fieldValueTypeSupplier.get(typeDescriptor)) { Schema.FieldType fieldType = fieldFromType(type.getType(), fieldValueTypeSupplier, alreadyVisitedSchemas); Schema.Field f = @@ -116,21 +117,21 @@ private static Schema schemaFromClass( builder.addFields(f); } Schema generatedSchema = builder.build(); - alreadyVisitedSchemas.replace(clazz, generatedSchema); + alreadyVisitedSchemas.replace(typeDescriptor, generatedSchema); return generatedSchema; } /** Map a Java field type to a Beam Schema FieldType. */ public static Schema.FieldType fieldFromType( TypeDescriptor type, FieldValueTypeSupplier fieldValueTypeSupplier) { - return fieldFromType(type, fieldValueTypeSupplier, new HashMap()); + return fieldFromType(type, fieldValueTypeSupplier, new HashMap<>()); } // TODO(https://github.com/apache/beam/issues/21567): support type inference for logical types private static Schema.FieldType fieldFromType( TypeDescriptor type, FieldValueTypeSupplier fieldValueTypeSupplier, - Map alreadyVisitedSchemas) { + Map, Schema> alreadyVisitedSchemas) { FieldType primitiveType = PRIMITIVE_TYPES.get(type.getRawType()); if (primitiveType != null) { return primitiveType; @@ -198,8 +199,7 @@ private static Schema.FieldType fieldFromType( throw new RuntimeException("Cannot infer schema from unparameterized collection."); } } else { - return FieldType.row( - schemaFromClass(type.getRawType(), fieldValueTypeSupplier, alreadyVisitedSchemas)); + return FieldType.row(schemaFromClass(type, fieldValueTypeSupplier, alreadyVisitedSchemas)); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java index cb4d83550577..9731507fb0f6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java @@ -51,7 +51,7 @@ public class RowWithGetters extends Row { Schema schema, Factory> getterFactory, Object getterTarget) { super(schema); this.getterTarget = getterTarget; - this.getters = getterFactory.create(getterTarget.getClass(), schema); + this.getters = getterFactory.create(TypeDescriptor.of(getterTarget.getClass()), schema); } @Override diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java index 296d4e9953ce..021e39b84849 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java @@ -52,6 +52,7 @@ import org.apache.beam.sdk.schemas.utils.TestJavaBeans.PrimitiveArrayBean; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.PrimitiveMapBean; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBean; +import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.DateTime; import org.junit.Test; @@ -63,7 +64,8 @@ public class JavaBeanUtilsTest { @Test public void testNullable() { Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass(NullableBean.class, GetterTypeSupplier.INSTANCE); + JavaBeanUtils.schemaFromJavaBeanClass( + new TypeDescriptor() {}, GetterTypeSupplier.INSTANCE); assertTrue(schema.getField("str").getType().getNullable()); assertFalse(schema.getField("anInt").getType().getNullable()); } @@ -71,14 +73,16 @@ public void testNullable() { @Test public void testSimpleBean() { Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass(SimpleBean.class, GetterTypeSupplier.INSTANCE); + JavaBeanUtils.schemaFromJavaBeanClass( + new TypeDescriptor() {}, GetterTypeSupplier.INSTANCE); SchemaTestUtils.assertSchemaEquivalent(SIMPLE_BEAN_SCHEMA, schema); } @Test public void testNestedBean() { Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass(NestedBean.class, GetterTypeSupplier.INSTANCE); + JavaBeanUtils.schemaFromJavaBeanClass( + new TypeDescriptor() {}, GetterTypeSupplier.INSTANCE); SchemaTestUtils.assertSchemaEquivalent(NESTED_BEAN_SCHEMA, schema); } @@ -86,14 +90,15 @@ public void testNestedBean() { public void testPrimitiveArray() { Schema schema = JavaBeanUtils.schemaFromJavaBeanClass( - PrimitiveArrayBean.class, GetterTypeSupplier.INSTANCE); + new TypeDescriptor() {}, GetterTypeSupplier.INSTANCE); SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_ARRAY_BEAN_SCHEMA, schema); } @Test public void testNestedArray() { Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass(NestedArrayBean.class, GetterTypeSupplier.INSTANCE); + JavaBeanUtils.schemaFromJavaBeanClass( + new TypeDescriptor() {}, GetterTypeSupplier.INSTANCE); SchemaTestUtils.assertSchemaEquivalent(NESTED_ARRAY_BEAN_SCHEMA, schema); } @@ -101,21 +106,23 @@ public void testNestedArray() { public void testNestedCollection() { Schema schema = JavaBeanUtils.schemaFromJavaBeanClass( - NestedCollectionBean.class, GetterTypeSupplier.INSTANCE); + new TypeDescriptor() {}, GetterTypeSupplier.INSTANCE); SchemaTestUtils.assertSchemaEquivalent(NESTED_COLLECTION_BEAN_SCHEMA, schema); } @Test public void testPrimitiveMap() { Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass(PrimitiveMapBean.class, GetterTypeSupplier.INSTANCE); + JavaBeanUtils.schemaFromJavaBeanClass( + new TypeDescriptor() {}, GetterTypeSupplier.INSTANCE); SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_MAP_BEAN_SCHEMA, schema); } @Test public void testNestedMap() { Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass(NestedMapBean.class, GetterTypeSupplier.INSTANCE); + JavaBeanUtils.schemaFromJavaBeanClass( + new TypeDescriptor() {}, GetterTypeSupplier.INSTANCE); SchemaTestUtils.assertSchemaEquivalent(NESTED_MAP_BEAN_SCHEMA, schema); } @@ -137,7 +144,7 @@ public void testGeneratedSimpleGetters() { List getters = JavaBeanUtils.getGetters( - SimpleBean.class, + new TypeDescriptor() {}, SIMPLE_BEAN_SCHEMA, new JavaBeanSchema.GetterTypeSupplier(), new DefaultTypeConversionsFactory()); @@ -169,7 +176,7 @@ public void testGeneratedSimpleSetters() { SimpleBean simpleBean = new SimpleBean(); List setters = JavaBeanUtils.getSetters( - SimpleBean.class, + new TypeDescriptor() {}, SIMPLE_BEAN_SCHEMA, new SetterTypeSupplier(), new DefaultTypeConversionsFactory()); @@ -215,7 +222,7 @@ public void testGeneratedSimpleBoxedGetters() { List getters = JavaBeanUtils.getGetters( - BeanWithBoxedFields.class, + new TypeDescriptor() {}, BEAN_WITH_BOXED_FIELDS_SCHEMA, new JavaBeanSchema.GetterTypeSupplier(), new DefaultTypeConversionsFactory()); @@ -231,7 +238,7 @@ public void testGeneratedSimpleBoxedSetters() { BeanWithBoxedFields bean = new BeanWithBoxedFields(); List setters = JavaBeanUtils.getSetters( - BeanWithBoxedFields.class, + new TypeDescriptor() {}, BEAN_WITH_BOXED_FIELDS_SCHEMA, new SetterTypeSupplier(), new DefaultTypeConversionsFactory()); @@ -254,7 +261,7 @@ public void testGeneratedByteBufferSetters() { BeanWithByteArray bean = new BeanWithByteArray(); List setters = JavaBeanUtils.getSetters( - BeanWithByteArray.class, + new TypeDescriptor() {}, BEAN_WITH_BYTE_ARRAY_SCHEMA, new SetterTypeSupplier(), new DefaultTypeConversionsFactory()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java index cd1cdfeb40d2..723353ed8d15 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java @@ -51,6 +51,7 @@ import org.apache.beam.sdk.schemas.utils.TestPOJOs.PrimitiveArrayPOJO; import org.apache.beam.sdk.schemas.utils.TestPOJOs.PrimitiveMapPOJO; import org.apache.beam.sdk.schemas.utils.TestPOJOs.SimplePOJO; +import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.DateTime; import org.joda.time.Instant; import org.junit.Test; @@ -69,20 +70,25 @@ public class POJOUtilsTest { @Test public void testNullables() { Schema schema = - POJOUtils.schemaFromPojoClass(POJOWithNullables.class, JavaFieldTypeSupplier.INSTANCE); + POJOUtils.schemaFromPojoClass( + new TypeDescriptor() {}, JavaFieldTypeSupplier.INSTANCE); assertTrue(schema.getField("str").getType().getNullable()); assertFalse(schema.getField("anInt").getType().getNullable()); } @Test public void testSimplePOJO() { - Schema schema = POJOUtils.schemaFromPojoClass(SimplePOJO.class, JavaFieldTypeSupplier.INSTANCE); + Schema schema = + POJOUtils.schemaFromPojoClass( + new TypeDescriptor() {}, JavaFieldTypeSupplier.INSTANCE); assertEquals(SIMPLE_POJO_SCHEMA, schema); } @Test public void testNestedPOJO() { - Schema schema = POJOUtils.schemaFromPojoClass(NestedPOJO.class, JavaFieldTypeSupplier.INSTANCE); + Schema schema = + POJOUtils.schemaFromPojoClass( + new TypeDescriptor() {}, JavaFieldTypeSupplier.INSTANCE); SchemaTestUtils.assertSchemaEquivalent(NESTED_POJO_SCHEMA, schema); } @@ -90,42 +96,48 @@ public void testNestedPOJO() { public void testNestedPOJOWithSimplePOJO() { Schema schema = POJOUtils.schemaFromPojoClass( - TestPOJOs.NestedPOJOWithSimplePOJO.class, JavaFieldTypeSupplier.INSTANCE); + new TypeDescriptor() {}, + JavaFieldTypeSupplier.INSTANCE); SchemaTestUtils.assertSchemaEquivalent(NESTED_POJO_WITH_SIMPLE_POJO_SCHEMA, schema); } @Test public void testPrimitiveArray() { Schema schema = - POJOUtils.schemaFromPojoClass(PrimitiveArrayPOJO.class, JavaFieldTypeSupplier.INSTANCE); + POJOUtils.schemaFromPojoClass( + new TypeDescriptor() {}, JavaFieldTypeSupplier.INSTANCE); SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_ARRAY_POJO_SCHEMA, schema); } @Test public void testNestedArray() { Schema schema = - POJOUtils.schemaFromPojoClass(NestedArrayPOJO.class, JavaFieldTypeSupplier.INSTANCE); + POJOUtils.schemaFromPojoClass( + new TypeDescriptor() {}, JavaFieldTypeSupplier.INSTANCE); SchemaTestUtils.assertSchemaEquivalent(NESTED_ARRAY_POJO_SCHEMA, schema); } @Test public void testNestedCollection() { Schema schema = - POJOUtils.schemaFromPojoClass(NestedCollectionPOJO.class, JavaFieldTypeSupplier.INSTANCE); + POJOUtils.schemaFromPojoClass( + new TypeDescriptor() {}, JavaFieldTypeSupplier.INSTANCE); SchemaTestUtils.assertSchemaEquivalent(NESTED_COLLECTION_POJO_SCHEMA, schema); } @Test public void testPrimitiveMap() { Schema schema = - POJOUtils.schemaFromPojoClass(PrimitiveMapPOJO.class, JavaFieldTypeSupplier.INSTANCE); + POJOUtils.schemaFromPojoClass( + new TypeDescriptor() {}, JavaFieldTypeSupplier.INSTANCE); SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_MAP_POJO_SCHEMA, schema); } @Test public void testNestedMap() { Schema schema = - POJOUtils.schemaFromPojoClass(NestedMapPOJO.class, JavaFieldTypeSupplier.INSTANCE); + POJOUtils.schemaFromPojoClass( + new TypeDescriptor() {}, JavaFieldTypeSupplier.INSTANCE); SchemaTestUtils.assertSchemaEquivalent(NESTED_MAP_POJO_SCHEMA, schema); } @@ -148,7 +160,7 @@ public void testGeneratedSimpleGetters() { List getters = POJOUtils.getGetters( - SimplePOJO.class, + new TypeDescriptor() {}, SIMPLE_POJO_SCHEMA, JavaFieldTypeSupplier.INSTANCE, new DefaultTypeConversionsFactory()); @@ -174,7 +186,7 @@ public void testGeneratedSimpleSetters() { SimplePOJO simplePojo = new SimplePOJO(); List setters = POJOUtils.getSetters( - SimplePOJO.class, + new TypeDescriptor() {}, SIMPLE_POJO_SCHEMA, JavaFieldTypeSupplier.INSTANCE, new DefaultTypeConversionsFactory()); @@ -213,7 +225,7 @@ public void testGeneratedSimpleBoxedGetters() { List getters = POJOUtils.getGetters( - POJOWithBoxedFields.class, + new TypeDescriptor() {}, POJO_WITH_BOXED_FIELDS_SCHEMA, JavaFieldTypeSupplier.INSTANCE, new DefaultTypeConversionsFactory()); @@ -229,7 +241,7 @@ public void testGeneratedSimpleBoxedSetters() { POJOWithBoxedFields pojo = new POJOWithBoxedFields(); List setters = POJOUtils.getSetters( - POJOWithBoxedFields.class, + new TypeDescriptor() {}, POJO_WITH_BOXED_FIELDS_SCHEMA, JavaFieldTypeSupplier.INSTANCE, new DefaultTypeConversionsFactory()); @@ -252,7 +264,7 @@ public void testGeneratedByteBufferSetters() { POJOWithByteArray pojo = new POJOWithByteArray(); List setters = POJOUtils.getSetters( - POJOWithByteArray.class, + new TypeDescriptor() {}, POJO_WITH_BYTE_ARRAY_SCHEMA, JavaFieldTypeSupplier.INSTANCE, new DefaultTypeConversionsFactory()); diff --git a/sdks/java/extensions/arrow/src/main/java/org/apache/beam/sdk/extensions/arrow/ArrowConversion.java b/sdks/java/extensions/arrow/src/main/java/org/apache/beam/sdk/extensions/arrow/ArrowConversion.java index 88e6fefcf9d3..4b6538157fd0 100644 --- a/sdks/java/extensions/arrow/src/main/java/org/apache/beam/sdk/extensions/arrow/ArrowConversion.java +++ b/sdks/java/extensions/arrow/src/main/java/org/apache/beam/sdk/extensions/arrow/ArrowConversion.java @@ -46,6 +46,7 @@ import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -291,7 +292,7 @@ private FieldVectorListValueGetterFactory(List fieldVectors) { } @Override - public List create(Class clazz, Schema schema) { + public List create(TypeDescriptor typeDescriptor, Schema schema) { return this.fieldVectors.stream() .map( (fieldVector) -> { diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/AvroRecordSchema.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/AvroRecordSchema.java index e3bf24621cd5..e75647a2ccfa 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/AvroRecordSchema.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/AvroRecordSchema.java @@ -21,7 +21,7 @@ import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; import org.apache.beam.sdk.schemas.FieldValueGetter; import org.apache.beam.sdk.schemas.FieldValueTypeInformation; -import org.apache.beam.sdk.schemas.GetterBasedSchemaProvider; +import org.apache.beam.sdk.schemas.GetterBasedSchemaProviderV2; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaProvider; import org.apache.beam.sdk.schemas.SchemaUserTypeCreator; @@ -37,25 +37,27 @@ @SuppressWarnings({ "rawtypes" // TODO(https://github.com/apache/beam/issues/20447) }) -public class AvroRecordSchema extends GetterBasedSchemaProvider { +public class AvroRecordSchema extends GetterBasedSchemaProviderV2 { @Override public Schema schemaFor(TypeDescriptor typeDescriptor) { return AvroUtils.toBeamSchema(typeDescriptor.getRawType()); } @Override - public List fieldValueGetters(Class targetClass, Schema schema) { - return AvroUtils.getGetters(targetClass, schema); + public List fieldValueGetters( + TypeDescriptor targetTypeDescriptor, Schema schema) { + return AvroUtils.getGetters(targetTypeDescriptor, schema); } @Override public List fieldValueTypeInformations( - Class targetClass, Schema schema) { - return AvroUtils.getFieldTypes(targetClass, schema); + TypeDescriptor targetTypeDescriptor, Schema schema) { + return AvroUtils.getFieldTypes(targetTypeDescriptor, schema); } @Override - public SchemaUserTypeCreator schemaTypeCreator(Class targetClass, Schema schema) { - return AvroUtils.getCreator(targetClass, schema); + public SchemaUserTypeCreator schemaTypeCreator( + TypeDescriptor targetTypeDescriptor, Schema schema) { + return AvroUtils.getCreator(targetTypeDescriptor, schema); } } diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java index 7622132c7e27..1b1c45969307 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java @@ -808,14 +808,14 @@ public static SchemaCoder schemaCoder(AvroCoder avroCoder) { private static final class AvroSpecificRecordFieldValueTypeSupplier implements FieldValueTypeSupplier { @Override - public List get(Class clazz) { + public List get(TypeDescriptor typeDescriptor) { throw new RuntimeException("Unexpected call."); } @Override - public List get(Class clazz, Schema schema) { + public List get(TypeDescriptor typeDescriptor, Schema schema) { Map mapping = getMapping(schema); - List methods = ReflectUtils.getMethods(clazz); + List methods = ReflectUtils.getMethods(typeDescriptor.getRawType()); List types = Lists.newArrayList(); for (int i = 0; i < methods.size(); ++i) { Method method = methods.get(i); @@ -864,8 +864,9 @@ private Map getMapping(Schema schema) { private static final class AvroPojoFieldValueTypeSupplier implements FieldValueTypeSupplier { @Override - public List get(Class clazz) { - List classFields = ReflectUtils.getFields(clazz); + public List get(TypeDescriptor typeDescriptor) { + List classFields = + ReflectUtils.getFields(typeDescriptor.getRawType()); Map types = Maps.newHashMap(); for (int i = 0; i < classFields.size(); ++i) { java.lang.reflect.Field f = classFields.get(i); @@ -883,36 +884,46 @@ public List get(Class clazz) { } /** Get field types for an AVRO-generated SpecificRecord or a POJO. */ - public static List getFieldTypes(Class clazz, Schema schema) { - if (TypeDescriptor.of(clazz).isSubtypeOf(TypeDescriptor.of(SpecificRecord.class))) { + public static List getFieldTypes( + TypeDescriptor typeDescriptor, Schema schema) { + if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(SpecificRecord.class))) { return JavaBeanUtils.getFieldTypes( - clazz, schema, new AvroSpecificRecordFieldValueTypeSupplier()); + typeDescriptor, schema, new AvroSpecificRecordFieldValueTypeSupplier()); } else { - return POJOUtils.getFieldTypes(clazz, schema, new AvroPojoFieldValueTypeSupplier()); + return POJOUtils.getFieldTypes(typeDescriptor, schema, new AvroPojoFieldValueTypeSupplier()); } } /** Get generated getters for an AVRO-generated SpecificRecord or a POJO. */ - public static List getGetters(Class clazz, Schema schema) { - if (TypeDescriptor.of(clazz).isSubtypeOf(TypeDescriptor.of(SpecificRecord.class))) { + public static List getGetters( + TypeDescriptor typeDescriptor, Schema schema) { + if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(SpecificRecord.class))) { return JavaBeanUtils.getGetters( - clazz, + typeDescriptor, schema, new AvroSpecificRecordFieldValueTypeSupplier(), new AvroTypeConversionFactory()); } else { return POJOUtils.getGetters( - clazz, schema, new AvroPojoFieldValueTypeSupplier(), new AvroTypeConversionFactory()); + typeDescriptor, + schema, + new AvroPojoFieldValueTypeSupplier(), + new AvroTypeConversionFactory()); } } /** Get an object creator for an AVRO-generated SpecificRecord. */ - public static SchemaUserTypeCreator getCreator(Class clazz, Schema schema) { - if (TypeDescriptor.of(clazz).isSubtypeOf(TypeDescriptor.of(SpecificRecord.class))) { - return AvroByteBuddyUtils.getCreator((Class) clazz, schema); + public static SchemaUserTypeCreator getCreator( + TypeDescriptor typeDescriptor, Schema schema) { + if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(SpecificRecord.class))) { + return AvroByteBuddyUtils.getCreator( + (Class) typeDescriptor.getRawType(), schema); } else { return POJOUtils.getSetFieldCreator( - clazz, schema, new AvroPojoFieldValueTypeSupplier(), new AvroTypeConversionFactory()); + typeDescriptor, + schema, + new AvroPojoFieldValueTypeSupplier(), + new AvroTypeConversionFactory()); } } diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteBuddyUtils.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteBuddyUtils.java index bb2e267bae23..d159e9de44a8 100644 --- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteBuddyUtils.java +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteBuddyUtils.java @@ -476,7 +476,8 @@ public static List getGetters( return CACHED_GETTERS.computeIfAbsent( ClassWithSchema.create(clazz, schema), c -> { - List types = fieldValueTypeSupplier.get(clazz, schema); + List types = + fieldValueTypeSupplier.get(TypeDescriptor.of(clazz), schema); return types.stream() .map( t -> @@ -965,7 +966,7 @@ private static FieldValueGetter createGetter( // Create a map of case enum value to getter. This must be sorted, so store in a TreeMap. TreeMap> oneOfGetters = Maps.newTreeMap(); Map oneOfFieldTypes = - fieldValueTypeSupplier.get(clazz, oneOfType.getOneOfSchema()).stream() + fieldValueTypeSupplier.get(TypeDescriptor.of(clazz), oneOfType.getOneOfSchema()).stream() .collect(Collectors.toMap(FieldValueTypeInformation::getName, f -> f)); for (Field oneOfField : oneOfType.getOneOfSchema().getFields()) { int protoFieldIndex = getFieldNumber(oneOfField); diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchema.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchema.java index 1b3d42e35536..faf3ad407af5 100644 --- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchema.java +++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoMessageSchema.java @@ -28,7 +28,7 @@ import org.apache.beam.sdk.extensions.protobuf.ProtoByteBuddyUtils.ProtoTypeConversionsFactory; import org.apache.beam.sdk.schemas.FieldValueGetter; import org.apache.beam.sdk.schemas.FieldValueTypeInformation; -import org.apache.beam.sdk.schemas.GetterBasedSchemaProvider; +import org.apache.beam.sdk.schemas.GetterBasedSchemaProviderV2; import org.apache.beam.sdk.schemas.RowMessages; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; @@ -49,17 +49,17 @@ "nullness", // TODO(https://github.com/apache/beam/issues/20497) "rawtypes" // TODO(https://github.com/apache/beam/issues/20447) }) -public class ProtoMessageSchema extends GetterBasedSchemaProvider { +public class ProtoMessageSchema extends GetterBasedSchemaProviderV2 { private static final class ProtoClassFieldValueTypeSupplier implements FieldValueTypeSupplier { @Override - public List get(Class clazz) { + public List get(TypeDescriptor typeDescriptor) { throw new RuntimeException("Unexpected call."); } @Override - public List get(Class clazz, Schema schema) { - Multimap methods = ReflectUtils.getMethodsMap(clazz); + public List get(TypeDescriptor typeDescriptor, Schema schema) { + Multimap methods = ReflectUtils.getMethodsMap(typeDescriptor.getRawType()); List types = Lists.newArrayListWithCapacity(schema.getFieldCount()); for (int i = 0; i < schema.getFieldCount(); ++i) { @@ -96,9 +96,10 @@ public List get(Class clazz, Schema schema) { } @Override - public List fieldValueGetters(Class targetClass, Schema schema) { + public List fieldValueGetters( + TypeDescriptor targetTypeDescriptor, Schema schema) { return ProtoByteBuddyUtils.getGetters( - targetClass, + targetTypeDescriptor.getRawType(), schema, new ProtoClassFieldValueTypeSupplier(), new ProtoTypeConversionsFactory()); @@ -106,17 +107,19 @@ public List fieldValueGetters(Class targetClass, Schema sch @Override public List fieldValueTypeInformations( - Class targetClass, Schema schema) { - return JavaBeanUtils.getFieldTypes(targetClass, schema, new ProtoClassFieldValueTypeSupplier()); + TypeDescriptor targetTypeDescriptor, Schema schema) { + return JavaBeanUtils.getFieldTypes( + targetTypeDescriptor, schema, new ProtoClassFieldValueTypeSupplier()); } @Override - public SchemaUserTypeCreator schemaTypeCreator(Class targetClass, Schema schema) { + public SchemaUserTypeCreator schemaTypeCreator( + TypeDescriptor targetTypeDescriptor, Schema schema) { SchemaUserTypeCreator creator = ProtoByteBuddyUtils.getBuilderCreator( - targetClass, schema, new ProtoClassFieldValueTypeSupplier()); + targetTypeDescriptor.getRawType(), schema, new ProtoClassFieldValueTypeSupplier()); if (creator == null) { - throw new RuntimeException("Cannot create creator for " + targetClass); + throw new RuntimeException("Cannot create creator for " + targetTypeDescriptor); } return creator; } diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsSchemaProvider.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsSchemaProvider.java index 65812d72df1d..acdfcfc1ad09 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsSchemaProvider.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsSchemaProvider.java @@ -38,7 +38,7 @@ import org.apache.beam.sdk.schemas.Factory; import org.apache.beam.sdk.schemas.FieldValueGetter; import org.apache.beam.sdk.schemas.FieldValueTypeInformation; -import org.apache.beam.sdk.schemas.GetterBasedSchemaProvider; +import org.apache.beam.sdk.schemas.GetterBasedSchemaProviderV2; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaUserTypeCreator; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -60,7 +60,7 @@ * software.amazon.awssdk.services.dynamodb.model.AttributeValue DynamoDB AttributeValue} ({@link * org.apache.beam.sdk.io.aws2.dynamodb.AttributeValueCoder coder}). */ -public class AwsSchemaProvider extends GetterBasedSchemaProvider { +public class AwsSchemaProvider extends GetterBasedSchemaProviderV2 { /** Byte-code generated {@link SdkBuilder} factories. */ @SuppressWarnings("rawtypes") // Crashes checker otherwise private static final Map FACTORIES = Maps.newConcurrentMap(); @@ -75,9 +75,11 @@ public class AwsSchemaProvider extends GetterBasedSchemaProvider { @SuppressWarnings("rawtypes") @Override - public List fieldValueGetters(Class clazz, Schema schema) { + public List fieldValueGetters( + TypeDescriptor targetTypeDescriptor, Schema schema) { ConverterFactory fromAws = ConverterFactory.fromAws(); - Map> sdkFields = sdkFieldsByName((Class) clazz); + Map> sdkFields = + sdkFieldsByName((Class) targetTypeDescriptor.getRawType()); List getters = new ArrayList<>(schema.getFieldCount()); for (String field : schema.getFieldNames()) { SdkField sdkField = checkStateNotNull(sdkFields.get(field), "Unknown field"); @@ -91,7 +93,7 @@ public List fieldValueGetters(Class clazz, Schema schema) { @Override public SerializableFunction fromRowFunction(TypeDescriptor type) { checkState(SdkPojo.class.isAssignableFrom(type.getRawType()), "Unsupported type %s", type); - return FromRowFactory.create(type.getRawType()); + return FromRowFactory.create(type); } private static class FromRowWithBuilder @@ -114,7 +116,7 @@ public T apply(Row row) { } } SdkBuilder builder = sdkBuilder(cls); - List setters = factory.create(cls, row.getSchema()); + List setters = factory.create(TypeDescriptor.of(cls), row.getSchema()); for (SdkBuilderSetter set : setters) { if (!row.getSchema().hasField(set.name())) { continue; @@ -150,14 +152,19 @@ private static class FromRowFactory implements Factory(new SettersFactory()); @SuppressWarnings("nullness") // schema nullable for this factory - static SerializableFunction create(Class clazz) { - checkState(SdkPojo.class.isAssignableFrom(clazz), "Unsupported clazz %s", clazz); - return (SerializableFunction) new FromRowFactory().cachingFactory.create(clazz, null); + static SerializableFunction create(TypeDescriptor typeDescriptor) { + checkState( + SdkPojo.class.isAssignableFrom(typeDescriptor.getRawType()), + "Unsupported clazz %s", + typeDescriptor); + return (SerializableFunction) + new FromRowFactory().cachingFactory.create(typeDescriptor, null); } @Override - public SerializableFunction create(Class clazz, Schema ignored) { - return new FromRowWithBuilder<>((Class) clazz, settersFactory); + public SerializableFunction create(TypeDescriptor typeDescriptor, Schema ignored) { + return new FromRowWithBuilder<>( + (Class) typeDescriptor.getRawType(), settersFactory); } private class SettersFactory implements Factory> { @@ -168,8 +175,9 @@ private SettersFactory() { } @Override - public List create(Class clazz, Schema schema) { - Map> fields = sdkFieldsByName((Class) clazz); + public List create(TypeDescriptor typeDescriptor, Schema schema) { + Map> fields = + sdkFieldsByName((Class) typeDescriptor.getRawType()); checkForUnknownFields(schema, fields); List setters = new ArrayList<>(schema.getFieldCount()); @@ -192,12 +200,14 @@ private void checkForUnknownFields(Schema schema, Map> field } @Override - public List fieldValueTypeInformations(Class cls, Schema schema) { + public List fieldValueTypeInformations( + TypeDescriptor targetTypeDescriptor, Schema schema) { throw new UnsupportedOperationException("FieldValueTypeInformation not available"); } @Override - public SchemaUserTypeCreator schemaTypeCreator(Class cls, Schema schema) { + public SchemaUserTypeCreator schemaTypeCreator( + TypeDescriptor targetTypeDescriptor, Schema schema) { throw new UnsupportedOperationException("SchemaUserTypeCreator not available"); } diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsTypes.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsTypes.java index f5647c040526..a0fc0c8e91cd 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsTypes.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsTypes.java @@ -38,6 +38,7 @@ import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Ascii; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; @@ -210,7 +211,8 @@ private static class ToAws extends ConverterFactory { @Override @SuppressWarnings("nullness") // schema nullable for this factory protected SerializableFunction pojoTypeConverter(SdkField field) { - return fromRowFactory.create(targetClassOf(field.constructor().get()), null); + return fromRowFactory.create( + TypeDescriptor.of(targetClassOf(field.constructor().get())), null); } } diff --git a/sdks/java/io/thrift/src/main/java/org/apache/beam/sdk/io/thrift/ThriftSchema.java b/sdks/java/io/thrift/src/main/java/org/apache/beam/sdk/io/thrift/ThriftSchema.java index 5ee78590f679..5f4e195f227f 100644 --- a/sdks/java/io/thrift/src/main/java/org/apache/beam/sdk/io/thrift/ThriftSchema.java +++ b/sdks/java/io/thrift/src/main/java/org/apache/beam/sdk/io/thrift/ThriftSchema.java @@ -35,7 +35,7 @@ import java.util.stream.StreamSupport; import org.apache.beam.sdk.schemas.FieldValueGetter; import org.apache.beam.sdk.schemas.FieldValueTypeInformation; -import org.apache.beam.sdk.schemas.GetterBasedSchemaProvider; +import org.apache.beam.sdk.schemas.GetterBasedSchemaProviderV2; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.SchemaProvider; @@ -105,7 +105,7 @@ * not. On decoding, we set all non-{@code null} beam row values to the corresponding thrift fields, * leaving the rest unset. */ -public final class ThriftSchema extends GetterBasedSchemaProvider { +public final class ThriftSchema extends GetterBasedSchemaProviderV2 { private static final ThriftSchema defaultProvider = new ThriftSchema(Collections.emptyMap()); private final Map typedefs; @@ -203,17 +203,19 @@ private Schema.Field beamField(FieldMetaData fieldDescriptor) { @SuppressWarnings("rawtypes") @Override public @NonNull List fieldValueGetters( - @NonNull Class targetClass, @NonNull Schema schema) { - return schemaFieldDescriptors(targetClass, schema).keySet().stream() + @NonNull TypeDescriptor targetTypeDescriptor, @NonNull Schema schema) { + return schemaFieldDescriptors(targetTypeDescriptor.getRawType(), schema).keySet().stream() .map(FieldExtractor::new) .collect(Collectors.toList()); } @Override public @NonNull List fieldValueTypeInformations( - @NonNull Class targetClass, @NonNull Schema schema) { - return schemaFieldDescriptors(targetClass, schema).values().stream() - .map(descriptor -> fieldValueTypeInfo(targetClass, descriptor.fieldName)) + @NonNull TypeDescriptor targetTypeDescriptor, @NonNull Schema schema) { + return schemaFieldDescriptors(targetTypeDescriptor.getRawType(), schema).values().stream() + .map( + descriptor -> + fieldValueTypeInfo(targetTypeDescriptor.getRawType(), descriptor.fieldName)) .collect(Collectors.toList()); } @@ -252,10 +254,11 @@ private FieldValueTypeInformation fieldValueTypeInfo(Class type, String field @Override public @NonNull SchemaUserTypeCreator schemaTypeCreator( - @NonNull Class targetClass, @NonNull Schema schema) { + @NonNull TypeDescriptor targetTypeDescriptor, @NonNull Schema schema) { final Map fieldDescriptors = - schemaFieldDescriptors(targetClass, schema); - return params -> restoreThriftObject(targetClass, fieldDescriptors, params); + schemaFieldDescriptors(targetTypeDescriptor.getRawType(), schema); + return params -> + restoreThriftObject(targetTypeDescriptor.getRawType(), fieldDescriptors, params); } @SuppressWarnings("nullness")