DP-2429 | Introduce incremental schema evolution (#60)
* introduced incremental schema evolution
* reset schema when evolution fails
afiore authored Dec 27, 2023
1 parent dd97de2 commit 101b6c0
Showing 10 changed files with 1,150 additions and 13 deletions.
@@ -0,0 +1,22 @@
/*
* Copyright 2023 Celonis SE
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.celonis.kafka.connect.schema;

import org.apache.kafka.connect.data.Schema;

public interface SchemaEvolution {
Schema evolve(Schema currentSchema, Schema recordSchema) throws SchemaEvolutionException;
}
@@ -0,0 +1,22 @@
/*
* Copyright 2023 Celonis SE
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.celonis.kafka.connect.schema;

public final class SchemaEvolutionException extends RuntimeException {
public SchemaEvolutionException(String message) {
super(message);
}
}
@@ -0,0 +1,36 @@
/*
* Copyright 2023 Celonis SE
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.celonis.kafka.connect.schema;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.util.Optional;

public class SchemaUtils {
public static SchemaBuilder withMetadata(SchemaBuilder builder, Schema schema) {
Optional.ofNullable(schema.parameters())
.filter(params -> !params.isEmpty())
.ifPresent(builder::parameters);

Optional.ofNullable(schema.name()).ifPresent(builder::name);
Optional.ofNullable(schema.doc()).ifPresent(builder::doc);
Optional.ofNullable(schema.defaultValue()).ifPresent(builder::defaultValue);
Optional.ofNullable(schema.version()).ifPresent(builder::version);

return builder;
}
}
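
A hedged usage sketch (illustrative only, not part of this commit): withMetadata is useful when a schema has to be rebuilt field by field, because a plain SchemaBuilder.struct() would otherwise drop the original name, doc, version, default value and parameters.

// Illustration only: rebuild a struct schema while preserving its metadata.
Schema original = SchemaBuilder.struct()
    .name("com.example.Order") // hypothetical schema name
    .doc("An order event")
    .version(2)
    .field("id", Schema.STRING_SCHEMA)
    .build();

SchemaBuilder rebuilt = SchemaUtils.withMetadata(SchemaBuilder.struct(), original);
rebuilt.field("id", Schema.STRING_SCHEMA);
rebuilt.field("amount", Schema.OPTIONAL_FLOAT64_SCHEMA); // newly discovered field
Schema evolved = rebuilt.build(); // still named "com.example.Order", doc and version retained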
@@ -0,0 +1,76 @@
/*
* Copyright 2023 Celonis SE
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.celonis.kafka.connect.schema;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;

import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;

public final class StructSchemaAlignment {
/**
* Align an object to the schema accumulated by incrementally calling SchemaEvolution#evolve.
*
* <p>NOTE: this is needed in order to avoid unnecessarily frequent flushes of the output Parquet
* files due to JSON schema inference producing different schemas for what is effectively the same
* data (see `EmsOutputRecordSink#put`).
*
* @param evolvedSchema the schema this value should align to. Must be a superset of the supplied
* struct's schema.
* @param value the current SinkRecord input struct
* @return a struct with the evolved schema
*/
public static Struct alignTo(Schema evolvedSchema, Struct value) {
return (Struct) align(evolvedSchema, value);
}

private static Object align(Schema evolvedSchema, Object value) {
switch (evolvedSchema.type()) {
case ARRAY:
final var collection = (Collection<?>) value;
return collection.stream()
.map(item -> align(evolvedSchema.valueSchema(), item))
.collect(Collectors.toList());

case MAP:
final var map = (Map<?, ?>) value;
return map.entrySet().stream()
.collect(
Collectors.toMap(
entry -> align(evolvedSchema.keySchema(), entry.getKey()),
entry -> align(evolvedSchema.valueSchema(), entry.getValue())));

case STRUCT:
final var structValue = (Struct) value;
if (structValue.schema() == evolvedSchema) return structValue;
final var newStruct = new Struct(evolvedSchema);

for (final var evolvedField : evolvedSchema.fields()) {
if (structValue.schema().field(evolvedField.name()) != null) {
newStruct.put(
evolvedField.name(),
align(evolvedField.schema(), structValue.get(evolvedField.name())));
}
}

return newStruct;
default:
return value;
}
}
}
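
A hedged example of the alignment step (illustrative, not taken from the commit): a struct produced with an earlier, narrower schema is re-wrapped into the evolved superset schema, so consecutive records share one schema instance and the sink does not treat them as a schema change.

// Illustration only: align a struct built against an older schema to the evolved one.
Schema v1 = SchemaBuilder.struct().name("order")
    .field("id", Schema.STRING_SCHEMA)
    .build();
Schema v2 = SchemaBuilder.struct().name("order")
    .field("id", Schema.STRING_SCHEMA)
    .field("amount", Schema.OPTIONAL_FLOAT64_SCHEMA) // field introduced by a later record
    .build();

Struct narrow  = new Struct(v1).put("id", "A-1");
Struct aligned = StructSchemaAlignment.alignTo(v2, narrow);
// aligned.schema() is v2; "id" is copied over, the missing optional "amount" stays null.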
@@ -0,0 +1,127 @@
/*
* Copyright 2023 Celonis SE
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.celonis.kafka.connect.schema;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.util.Objects;

/**
* StructSchemaEvolution recursively merges the existing schema with the schema of each new
* record, thereby evolving the accumulated schema as records are received from the connector.
*/
public class StructSchemaEvolution implements SchemaEvolution {

/**
* Merge top-level Kafka Connect Struct schemas.
*
* @param currentSchema existing schema, must be of type Struct
* @param recordSchema schema of the new record, must be of type Struct
* @return Schema after merging the existing and new schemas recursively
*/
@Override
public Schema evolve(Schema currentSchema, Schema recordSchema) throws SchemaEvolutionException {
if (currentSchema == recordSchema) return currentSchema;

// RecordTransformer ensures that the top-level schemas are of type Struct.
return mergeSchemas(null, currentSchema, recordSchema);
}

/**
* Merge two Kafka Connect schemas.
*
* @param fieldName current field name, `null` when the recursion starts
* @param currentSchema existing schema (accepted types: MAP, ARRAY, STRUCT or primitive)
* @param recordSchema schema of the new record (accepted types: MAP, ARRAY, STRUCT or primitive)
* @return Schema after merging the existing and new schemas recursively
*/
private Schema mergeSchemas(String fieldName, Schema currentSchema, Schema recordSchema) {
// Validate schema types first
validateSchemasTypes(fieldName, currentSchema, recordSchema);

switch (currentSchema.type()) {
case STRUCT:
return mergeStructs(currentSchema, recordSchema);
case ARRAY:
return SchemaBuilder.array(
mergeSchemas(fieldName, currentSchema.valueSchema(), recordSchema.valueSchema()))
.build();
case MAP:
var keySchema =
mergeSchemas(fieldName, currentSchema.keySchema(), recordSchema.keySchema());
var valueSchema =
mergeSchemas(fieldName, currentSchema.valueSchema(), recordSchema.valueSchema());
return SchemaBuilder.map(keySchema, valueSchema).build();
default:
return currentSchema;
}
}

private Schema mergeStructs(Schema currentSchema, Schema recordSchema)
throws SchemaEvolutionException {
SchemaBuilder result = SchemaUtils.withMetadata(SchemaBuilder.struct(), currentSchema);

// First, merge the fields of the current schema
currentSchema.fields().stream()
.forEach(
currentSchemaField -> {
final var recordSchemaField = recordSchema.field(currentSchemaField.name());
if (recordSchemaField == null) {
// If not present in recordSchema, just add it
result.field(currentSchemaField.name(), currentSchemaField.schema());
} else {
// Recursively evolve otherwise
result.field(
currentSchemaField.name(),
mergeSchemas(
currentSchemaField.name(),
currentSchemaField.schema(),
recordSchemaField.schema()));
}
});

// Just add remaining record schema fields as they are
recordSchema.fields().stream()
.filter(rf -> currentSchema.field(rf.name()) == null)
.forEach(rf -> result.field(rf.name(), rf.schema()));

return result.build();
}

private void validateSchemasTypes(String fieldName, Schema currentSchema, Schema recordSchema) {
if (bothPrimitives(currentSchema, recordSchema) && !sameLogicalType(currentSchema, recordSchema)
|| !currentSchema.type().equals(recordSchema.type())) {

throw new SchemaEvolutionException(
String.format(
"New schema has field '%s' with a different type! "
+ "previous type: %s, current type: %s",
fieldName, currentSchema, recordSchema));
}
}

private boolean bothPrimitives(Schema s1, Schema s2) {
return s1.type().isPrimitive() && s2.type().isPrimitive();
}

private boolean sameLogicalType(Schema s1, Schema s2) {
return Objects.equals(s1.type(), s2.type())
&& Objects.equals(s1.name(), s2.name())
&& Objects.equals(s1.version(), s2.version())
&& Objects.equals(s1.parameters(), s2.parameters());
}
}
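
A hedged sketch of the expected evolution behaviour (illustrative, not lifted from the commit's tests): disjoint struct fields are merged into a superset schema, while a primitive type change for an existing field is rejected with SchemaEvolutionException, which is the signal RecordTransformer uses below to reset its accumulated schema.

// Illustration only: merging compatible schemas and rejecting a type conflict.
SchemaEvolution evolution = new StructSchemaEvolution();

Schema current = SchemaBuilder.struct()
    .field("id", Schema.STRING_SCHEMA)
    .build();
Schema incoming = SchemaBuilder.struct()
    .field("id", Schema.STRING_SCHEMA)
    .field("amount", Schema.OPTIONAL_FLOAT64_SCHEMA)
    .build();

Schema merged = evolution.evolve(current, incoming); // contains both "id" and "amount"

Schema conflicting = SchemaBuilder.struct()
    .field("id", Schema.INT64_SCHEMA) // same field name, different primitive type
    .build();
try {
  evolution.evolve(merged, conflicting); // throws SchemaEvolutionException
} catch (SchemaEvolutionException e) {
  // callers such as RecordTransformer fall back to the record's own schema
}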
@@ -25,12 +25,18 @@ import com.celonis.kafka.connect.ems.errors.FailedObfuscationException
import com.celonis.kafka.connect.ems.model._
import com.celonis.kafka.connect.ems.obfuscation.ObfuscationUtils._
import com.celonis.kafka.connect.ems.storage.PrimaryKeysValidator
import com.celonis.kafka.connect.schema.SchemaEvolutionException
import com.celonis.kafka.connect.schema.StructSchemaAlignment
import com.celonis.kafka.connect.schema.StructSchemaEvolution
import com.celonis.kafka.connect.transform.conversion.ConnectConversion
import com.celonis.kafka.connect.transform.fields.EmbeddedKafkaMetadata
import com.celonis.kafka.connect.transform.fields.FieldInserter
import com.celonis.kafka.connect.transform.flatten.Flattener
import com.typesafe.scalalogging.StrictLogging
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.sink.SinkRecord

/** The main business transformation.
@@ -45,6 +51,10 @@ final class RecordTransformer(
obfuscation: Option[ObfuscationConfig],
inserter: FieldInserter,
) extends StrictLogging {

private var targetSchema: Schema = SchemaBuilder.struct().build();
private val schemaEvolution = new StructSchemaEvolution();

def transform(sinkRecord: SinkRecord): IO[GenericRecord] = {
val (convertedValue, convertedSchema) = preConversion.convert(sinkRecord.value(), Option(sinkRecord.valueSchema()))
val flattenedValue = flattener.flatten(convertedValue, convertedSchema)
@@ -56,8 +66,9 @@ final class RecordTransformer(
EmbeddedKafkaMetadata(sinkRecord.kafkaPartition(), sinkRecord.kafkaOffset(), sinkRecord.timestamp()),
),
)
v <- IO.fromEither(DataConverter.apply(transformedValue))
_ <- IO(logger.debug("[{}] EmsSinkTask:put obfuscation={}", sinkName, obfuscation))
schemaAlignedValue = evolveSchemaAndAlignValue(transformedValue)
v <- IO.fromEither(DataConverter.apply(schemaAlignedValue))
_ <- IO(logger.debug("[{}] EmsSinkTask:put obfuscation={}", sinkName, obfuscation))
value <- obfuscation.fold(IO.pure(v)) { o =>
IO.fromEither(v.obfuscate(o).leftMap(FailedObfuscationException))
}
@@ -68,6 +79,21 @@
_ <- IO.fromEither(pksValidator.validate(value, metadata))
} yield value
}

private def evolveSchemaAndAlignValue(value: Any): Any =
value match {
case struct: Struct =>
try {
targetSchema = schemaEvolution.evolve(targetSchema, struct.schema())
StructSchemaAlignment.alignTo(targetSchema, struct)
} catch {
case exception: SchemaEvolutionException =>
logger.warn(s"resetting incrementally computed schema as evolution failed: ${exception.getMessage}")
targetSchema = struct.schema()
struct
}
case _ => value
}
}

object RecordTransformer {
@@ -78,21 +78,19 @@ class EmsSinkConfiguratorTest extends AnyFunSuite with Matchers {

test(s"throws exception when $FALLBACK_VARCHAR_LENGTH_MAX is too big") {
val props = Map(
"name" -> "ems",
EmsSinkConfigConstants.ENDPOINT_KEY -> "https://celonis.cloud",
EmsSinkConfigConstants.AUTHORIZATION_KEY -> "AppKey key",
EmsSinkConfigConstants.TARGET_TABLE_KEY -> "target-table",
EmsSinkConfigConstants.COMMIT_RECORDS_KEY -> "1",
EmsSinkConfigConstants.COMMIT_SIZE_KEY -> "1000000",
EmsSinkConfigConstants.COMMIT_INTERVAL_KEY -> "3600000",
EmsSinkConfigConstants.TMP_DIRECTORY_KEY -> "/tmp/",
EmsSinkConfigConstants.ERROR_POLICY_KEY -> "CONTINUE",
"name" -> "ems",
EmsSinkConfigConstants.ENDPOINT_KEY -> "https://celonis.cloud",
EmsSinkConfigConstants.AUTHORIZATION_KEY -> "AppKey key",
EmsSinkConfigConstants.TARGET_TABLE_KEY -> "target-table",
EmsSinkConfigConstants.COMMIT_RECORDS_KEY -> "1",
EmsSinkConfigConstants.COMMIT_SIZE_KEY -> "1000000",
EmsSinkConfigConstants.COMMIT_INTERVAL_KEY -> "3600000",
EmsSinkConfigConstants.TMP_DIRECTORY_KEY -> "/tmp/",
EmsSinkConfigConstants.ERROR_POLICY_KEY -> "CONTINUE",
EmsSinkConfigConstants.FALLBACK_VARCHAR_LENGTH_KEY -> "65001",
).asJava

val thrown = the[ConnectException] thrownBy emsSinkConfigurator.getEmsSinkConfig(props)
thrown.getMessage should include regex "^.*Must be greater than 0 and smaller or equal than 65000.*$"
}
}

