Skip to content

Commit

Permalink
Replace Class with TypeDescriptor in SchemaProvider implementations (a…
Browse files Browse the repository at this point in the history
…pache#31785)

* refactor GetterBasedSchemaProvider's methods' signatures

* update StaticSchemaInference's methods' signatures

* update invocations of schemaFromClass

* update FieldValueTypeSupplier's methods' signatures

* replace ClassWithSchema with TypeDescriptorWithSchema as caching key

* remove the stopgap default method in Factory interface

* use TypeDescriptors in AutoValueUtils' methods

* convert the class to generic type descriptor in DefaultSchema

* deprecate GetterBasedSchemaProvider

* update GetterBasedSchemaProviderV2 javadoc

* delegate the deprecated methods instead of throwing
  • Loading branch information
tilgalas authored Jul 31, 2024
1 parent e7847c9 commit 835630b
Show file tree
Hide file tree
Showing 28 changed files with 505 additions and 267 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,20 @@
"nullness", // TODO(https://github.com/apache/beam/issues/20497)
"rawtypes"
})
public class AutoValueSchema extends GetterBasedSchemaProvider {
public class AutoValueSchema extends GetterBasedSchemaProviderV2 {
/** {@link FieldValueTypeSupplier} that's based on AutoValue getters. */
@VisibleForTesting
public static class AbstractGetterTypeSupplier implements FieldValueTypeSupplier {
public static final AbstractGetterTypeSupplier INSTANCE = new AbstractGetterTypeSupplier();

@Override
public List<FieldValueTypeInformation> get(Class<?> clazz) {
public List<FieldValueTypeInformation> get(TypeDescriptor<?> typeDescriptor) {

// If the generated class is passed in, we want to look at the base class to find the getters.
Class<?> targetClass = AutoValueUtils.getBaseAutoValueClass(clazz);
TypeDescriptor<?> targetTypeDescriptor = AutoValueUtils.getBaseAutoValueClass(typeDescriptor);

List<Method> methods =
ReflectUtils.getMethods(targetClass).stream()
ReflectUtils.getMethods(targetTypeDescriptor.getRawType()).stream()
.filter(ReflectUtils::isGetter)
// All AutoValue getters are marked abstract.
.filter(m -> Modifier.isAbstract(m.getModifiers()))
Expand Down Expand Up @@ -89,27 +89,30 @@ private static void validateFieldNumbers(List<FieldValueTypeInformation> types)
}

@Override
public List<FieldValueGetter> fieldValueGetters(Class<?> targetClass, Schema schema) {
public List<FieldValueGetter> fieldValueGetters(
TypeDescriptor<?> targetTypeDescriptor, Schema schema) {
return JavaBeanUtils.getGetters(
targetClass,
targetTypeDescriptor,
schema,
AbstractGetterTypeSupplier.INSTANCE,
new DefaultTypeConversionsFactory());
}

@Override
public List<FieldValueTypeInformation> fieldValueTypeInformations(
Class<?> targetClass, Schema schema) {
return JavaBeanUtils.getFieldTypes(targetClass, schema, AbstractGetterTypeSupplier.INSTANCE);
TypeDescriptor<?> targetTypeDescriptor, Schema schema) {
return JavaBeanUtils.getFieldTypes(
targetTypeDescriptor, schema, AbstractGetterTypeSupplier.INSTANCE);
}

@Override
public SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema schema) {
public SchemaUserTypeCreator schemaTypeCreator(
TypeDescriptor<?> targetTypeDescriptor, Schema schema) {
// If a static method is marked with @SchemaCreate, use that.
Method annotated = ReflectUtils.getAnnotatedCreateMethod(targetClass);
Method annotated = ReflectUtils.getAnnotatedCreateMethod(targetTypeDescriptor.getRawType());
if (annotated != null) {
return JavaBeanUtils.getStaticCreator(
targetClass,
targetTypeDescriptor,
annotated,
schema,
AbstractGetterTypeSupplier.INSTANCE,
Expand All @@ -119,7 +122,8 @@ public SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema sche
// Try to find a generated builder class. If one exists, use that to generate a
// SchemaTypeCreator for creating AutoValue objects.
SchemaUserTypeCreator creatorFactory =
AutoValueUtils.getBuilderCreator(targetClass, schema, AbstractGetterTypeSupplier.INSTANCE);
AutoValueUtils.getBuilderCreator(
targetTypeDescriptor.getRawType(), schema, AbstractGetterTypeSupplier.INSTANCE);
if (creatorFactory != null) {
return creatorFactory;
}
Expand All @@ -128,9 +132,10 @@ public SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema sche
// class. Use that for creating AutoValue objects.
creatorFactory =
AutoValueUtils.getConstructorCreator(
targetClass, schema, AbstractGetterTypeSupplier.INSTANCE);
targetTypeDescriptor, schema, AbstractGetterTypeSupplier.INSTANCE);
if (creatorFactory == null) {
throw new RuntimeException("Could not find a way to create AutoValue class " + targetClass);
throw new RuntimeException(
"Could not find a way to create AutoValue class " + targetTypeDescriptor);
}

return creatorFactory;
Expand All @@ -139,6 +144,6 @@ public SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema sche
@Override
public <T> @Nullable Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
return JavaBeanUtils.schemaFromJavaBeanClass(
typeDescriptor.getRawType(), AbstractGetterTypeSupplier.INSTANCE);
typeDescriptor, AbstractGetterTypeSupplier.INSTANCE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
Expand All @@ -36,7 +37,7 @@
"rawtypes"
})
public class CachingFactory<CreatedT> implements Factory<CreatedT> {
private transient @Nullable ConcurrentHashMap<Class, CreatedT> cache = null;
private transient @Nullable ConcurrentHashMap<TypeDescriptor<?>, CreatedT> cache = null;

private final Factory<CreatedT> innerFactory;

Expand All @@ -45,16 +46,16 @@ public CachingFactory(Factory<CreatedT> innerFactory) {
}

@Override
public CreatedT create(Class<?> clazz, Schema schema) {
public CreatedT create(TypeDescriptor<?> typeDescriptor, Schema schema) {
if (cache == null) {
cache = new ConcurrentHashMap<>();
}
CreatedT cached = cache.get(clazz);
CreatedT cached = cache.get(typeDescriptor);
if (cached != null) {
return cached;
}
cached = innerFactory.create(clazz, schema);
cache.put(clazz, cached);
cached = innerFactory.create(typeDescriptor, schema);
cache.put(typeDescriptor, cached);
return cached;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@

import java.io.Serializable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.values.TypeDescriptor;

/** A Factory interface for schema-related objects for a specific Java type. */
@Internal
public interface Factory<T> extends Serializable {
T create(Class<?> clazz, Schema schema);
T create(TypeDescriptor<?> typeDescriptor, Schema schema);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.RowWithGetters;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Function;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Collections2;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
Expand All @@ -47,23 +48,28 @@
"rawtypes"
})
class FromRowUsingCreator<T> implements SerializableFunction<Row, T>, Function<Row, T> {
private final Class<T> clazz;
private final TypeDescriptor<T> typeDescriptor;
private final GetterBasedSchemaProvider schemaProvider;
private final Factory<SchemaUserTypeCreator> schemaTypeCreatorFactory;

@SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED")
private transient @MonotonicNonNull Function[] fieldConverters;

public FromRowUsingCreator(Class<T> clazz, GetterBasedSchemaProvider schemaProvider) {
this(clazz, schemaProvider, new CachingFactory<>(schemaProvider::schemaTypeCreator), null);
public FromRowUsingCreator(
TypeDescriptor<T> typeDescriptor, GetterBasedSchemaProvider schemaProvider) {
this(
typeDescriptor,
schemaProvider,
new CachingFactory<>(schemaProvider::schemaTypeCreator),
null);
}

private FromRowUsingCreator(
Class<T> clazz,
TypeDescriptor<T> typeDescriptor,
GetterBasedSchemaProvider schemaProvider,
Factory<SchemaUserTypeCreator> schemaTypeCreatorFactory,
@Nullable Function[] fieldConverters) {
this.clazz = clazz;
this.typeDescriptor = typeDescriptor;
this.schemaProvider = schemaProvider;
this.schemaTypeCreatorFactory = schemaTypeCreatorFactory;
this.fieldConverters = fieldConverters;
Expand All @@ -77,7 +83,7 @@ public T apply(Row row) {
}
if (row instanceof RowWithGetters) {
Object target = ((RowWithGetters) row).getGetterTarget();
if (target.getClass().equals(clazz)) {
if (target.getClass().equals(typeDescriptor.getRawType())) {
// Efficient path: simply extract the underlying object instead of creating a new one.
return (T) target;
}
Expand All @@ -91,21 +97,24 @@ public T apply(Row row) {
for (int i = 0; i < row.getFieldCount(); ++i) {
params[i] = fieldConverters[i].apply(row.getValue(i));
}
SchemaUserTypeCreator creator = schemaTypeCreatorFactory.create(clazz, row.getSchema());
SchemaUserTypeCreator creator =
schemaTypeCreatorFactory.create(typeDescriptor, row.getSchema());
return (T) creator.create(params);
}

private synchronized void initFieldConverters(Schema schema) {
if (fieldConverters == null) {
CachingFactory<List<FieldValueTypeInformation>> typeFactory =
new CachingFactory<>(schemaProvider::fieldValueTypeInformations);
fieldConverters = fieldConverters(clazz, schema, typeFactory);
fieldConverters = fieldConverters(typeDescriptor, schema, typeFactory);
}
}

private Function[] fieldConverters(
Class<?> clazz, Schema schema, Factory<List<FieldValueTypeInformation>> typeFactory) {
List<FieldValueTypeInformation> typeInfos = typeFactory.create(clazz, schema);
TypeDescriptor<?> typeDescriptor,
Schema schema,
Factory<List<FieldValueTypeInformation>> typeFactory) {
List<FieldValueTypeInformation> typeInfos = typeFactory.create(typeDescriptor, schema);
checkState(
typeInfos.size() == schema.getFieldCount(),
"Did not have a matching number of type informations and fields.");
Expand Down Expand Up @@ -133,10 +142,9 @@ private Function fieldConverter(
if (!needsConversion(type)) {
return FieldConverter.IDENTITY;
} else if (TypeName.ROW.equals(type.getTypeName())) {
Function[] converters =
fieldConverters(typeInfo.getRawType(), type.getRowSchema(), typeFactory);
Function[] converters = fieldConverters(typeInfo.getType(), type.getRowSchema(), typeFactory);
return new FromRowUsingCreator(
typeInfo.getRawType(), schemaProvider, schemaTypeCreatorFactory, converters);
typeInfo.getType(), schemaProvider, schemaTypeCreatorFactory, converters);
} else if (TypeName.ARRAY.equals(type.getTypeName())) {
return new ConvertCollection(
fieldConverter(type.getCollectionElementType(), typeInfo.getElementType(), typeFactory));
Expand Down Expand Up @@ -271,11 +279,11 @@ public boolean equals(@Nullable Object o) {
return false;
}
FromRowUsingCreator<?> that = (FromRowUsingCreator<?>) o;
return clazz.equals(that.clazz) && schemaProvider.equals(that.schemaProvider);
return typeDescriptor.equals(that.typeDescriptor) && schemaProvider.equals(that.schemaProvider);
}

@Override
public int hashCode() {
return Objects.hash(clazz, schemaProvider);
return Objects.hash(typeDescriptor, schemaProvider);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,77 @@
/**
* A {@link SchemaProvider} base class that vends schemas and rows based on {@link
* FieldValueGetter}s.
*
* @deprecated new implementations should extend the {@link GetterBasedSchemaProviderV2} class'
* methods which receive {@link TypeDescriptor}s instead of ordinary {@link Class}es as
* arguments, which permits to support generic type signatures during schema inference
*/
@SuppressWarnings({
"nullness", // TODO(https://github.com/apache/beam/issues/20497)
"rawtypes"
})
@Deprecated
public abstract class GetterBasedSchemaProvider implements SchemaProvider {
/** Implementing class should override to return FieldValueGetters. */

/**
* Implementing class should override to return FieldValueGetters.
*
* @deprecated new implementations should override {@link #fieldValueGetters(TypeDescriptor,
* Schema)} and make this method throw an {@link UnsupportedOperationException}
*/
@Deprecated
public abstract List<FieldValueGetter> fieldValueGetters(Class<?> targetClass, Schema schema);

/** Implementing class should override to return a list of type-informations. */
/**
* Delegates to the {@link #fieldValueGetters(Class, Schema)} for backwards compatibility,
* override it if you want to use the richer type signature contained in the {@link
* TypeDescriptor} not subject to the type erasure.
*/
public List<FieldValueGetter> fieldValueGetters(
TypeDescriptor<?> targetTypeDescriptor, Schema schema) {
return fieldValueGetters(targetTypeDescriptor.getRawType(), schema);
}

/**
* Implementing class should override to return a list of type-informations.
*
* @deprecated new implementations should override {@link
* #fieldValueTypeInformations(TypeDescriptor, Schema)} and make this method throw an {@link
* UnsupportedOperationException}
*/
@Deprecated
public abstract List<FieldValueTypeInformation> fieldValueTypeInformations(
Class<?> targetClass, Schema schema);

/** Implementing class should override to return a constructor. */
/**
* Delegates to the {@link #fieldValueTypeInformations(Class, Schema)} for backwards
* compatibility, override it if you want to use the richer type signature contained in the {@link
* TypeDescriptor} not subject to the type erasure.
*/
public List<FieldValueTypeInformation> fieldValueTypeInformations(
TypeDescriptor<?> targetTypeDescriptor, Schema schema) {
return fieldValueTypeInformations(targetTypeDescriptor.getRawType(), schema);
}

/**
* Implementing class should override to return a constructor.
*
* @deprecated new implementations should override {@link #schemaTypeCreator(TypeDescriptor,
* Schema)} and make this method throw an {@link UnsupportedOperationException}
*/
@Deprecated
public abstract SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema schema);

/**
* Delegates to the {@link #schemaTypeCreator(Class, Schema)} for backwards compatibility,
* override it if you want to use the richer type signature contained in the {@link
* TypeDescriptor} not subject to the type erasure.
*/
public SchemaUserTypeCreator schemaTypeCreator(
TypeDescriptor<?> targetTypeDescriptor, Schema schema) {
return schemaTypeCreator(targetTypeDescriptor.getRawType(), schema);
}

private class ToRowWithValueGetters<T> implements SerializableFunction<T, Row> {
private final Schema schema;
private final Factory<List<FieldValueGetter>> getterFactory;
Expand Down Expand Up @@ -113,8 +168,7 @@ public <T> SerializableFunction<T, Row> toRowFunction(TypeDescriptor<T> typeDesc
@Override
@SuppressWarnings("unchecked")
public <T> SerializableFunction<Row, T> fromRowFunction(TypeDescriptor<T> typeDescriptor) {
Class<T> clazz = (Class<T>) typeDescriptor.getType();
return new FromRowUsingCreator<>(clazz, this);
return new FromRowUsingCreator<>(typeDescriptor, this);
}

@Override
Expand All @@ -141,8 +195,8 @@ static Factory<List<FieldValueGetter>> of(Factory<List<FieldValueGetter>> getter
}

@Override
public List<FieldValueGetter> create(Class<?> clazz, Schema schema) {
List<FieldValueGetter> getters = gettersFactory.create(clazz, schema);
public List<FieldValueGetter> create(TypeDescriptor<?> typeDescriptor, Schema schema) {
List<FieldValueGetter> getters = gettersFactory.create(typeDescriptor, schema);
List<FieldValueGetter> rowGetters = new ArrayList<>(getters.size());
for (int i = 0; i < getters.size(); i++) {
rowGetters.add(rowValueGetter(getters.get(i), schema.getField(i).getType()));
Expand Down
Loading

0 comments on commit 835630b

Please sign in to comment.