From 2e3023a8cf204ecbd18804f681a79a86e82d6e9b Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Thu, 7 Nov 2024 07:45:33 -0800 Subject: [PATCH] Bulk Load CDK: Mapper Pipeline (#48371) --- .../load/data/AirbyteValueIdentityMapper.kt | 39 +++++- .../airbyte/cdk/load/data/MapperPipeline.kt | 39 ++++++ .../cdk/load/data/SchemalessTypesToJson.kt | 3 +- .../load/data/SchemalessTypesToJsonString.kt | 4 +- .../cdk/load/data/TimeStringToInteger.kt | 3 +- .../load/data/UnionTypeToDisjointRecord.kt | 4 +- .../data/AirbyteValueIdentityMapperTest.kt | 22 ++-- .../cdk/load/data/MapperPipelineTest.kt | 117 ++++++++++++++++++ .../data/SchemalessTypesToJsonStringTest.kt | 5 +- .../load/data/SchemalessTypesToJsonTest.kt | 5 +- .../cdk/load/data/TimeStringToIntegerTest.kt | 11 +- .../cdk/load/test/util/SchemaRecordBuilder.kt | 7 +- .../cdk/load/test/util/ValueTestBuilder.kt | 9 +- 13 files changed, 223 insertions(+), 45 deletions(-) create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/MapperPipeline.kt create mode 100644 airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/MapperPipelineTest.kt diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapper.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapper.kt index f72d908425fd..b10f2c340775 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapper.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapper.kt @@ -8,20 +8,47 @@ import io.airbyte.cdk.load.message.DestinationRecord import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Change import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason -open class AirbyteValueIdentityMapper( - val meta: DestinationRecord.Meta, -) { +interface AirbyteValueMapper { + val collectedChanges: List + + fun map( + value: AirbyteValue, + schema: AirbyteType, + path: List = emptyList(), + ): AirbyteValue +} + +/** An optimized identity mapper that just passes through. */ +class AirbyteValueNoopMapper : AirbyteValueMapper { + override val collectedChanges: List = emptyList() + + override fun map( + value: AirbyteValue, + schema: AirbyteType, + path: List, + ): AirbyteValue = value +} + +open class AirbyteValueIdentityMapper : AirbyteValueMapper { + override val collectedChanges: List + get() = changes.toList().also { changes.clear() } + + private val changes: MutableList = mutableListOf() + private fun collectFailure( path: List, reason: Reason = Reason.DESTINATION_SERIALIZATION_ERROR ) { - meta.changes.add(DestinationRecord.Change(path.joinToString("."), Change.NULLED, reason)) + val joined = path.joinToString(".") + if (changes.none { it.field == joined }) { + changes.add(DestinationRecord.Change(path.joinToString("."), Change.NULLED, reason)) + } } - fun map( + override fun map( value: AirbyteValue, schema: AirbyteType, - path: List = emptyList() + path: List, ): AirbyteValue = try { when (schema) { diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/MapperPipeline.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/MapperPipeline.kt new file mode 100644 index 000000000000..3b3b194d588c --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/MapperPipeline.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.data + +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.message.DestinationRecord.Change + +class MapperPipeline( + inputSchema: AirbyteType, + schemaValueMapperPairs: List>, +) { + private val schemasWithMappers: List> + + val finalSchema: AirbyteType + + init { + val (schemaMappers, valueMappers) = schemaValueMapperPairs.unzip() + val schemas = + schemaMappers.runningFold(inputSchema) { schema, mapper -> mapper.map(schema) } + schemasWithMappers = schemas.zip(valueMappers) + finalSchema = schemas.last() + } + + fun map(data: AirbyteValue): Pair> { + val results = + schemasWithMappers.runningFold(data) { value, (schema, mapper) -> + mapper.map(value, schema) + } + val changesFlattened = + schemasWithMappers.flatMap { it.second.collectedChanges }.toSet().toList() + return results.last() to changesFlattened + } +} + +interface MapperPipelineFactory { + fun create(stream: DestinationStream): MapperPipeline +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJson.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJson.kt index 5ac72691d6d3..2b8dcc7d5169 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJson.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJson.kt @@ -5,7 +5,6 @@ package io.airbyte.cdk.load.data import io.airbyte.cdk.load.data.json.toJson -import io.airbyte.cdk.load.message.DestinationRecord import io.airbyte.cdk.load.util.serializeToString class SchemalessTypesToJson : AirbyteSchemaIdentityMapper { @@ -15,7 +14,7 @@ class SchemalessTypesToJson : AirbyteSchemaIdentityMapper { override fun mapArrayWithoutSchema(schema: ArrayTypeWithoutSchema): AirbyteType = StringType } -class SchemalessValuesToJson(meta: DestinationRecord.Meta) : AirbyteValueIdentityMapper(meta) { +class SchemalessValuesToJson : AirbyteValueIdentityMapper() { override fun mapObjectWithoutSchema( value: ObjectValue, schema: ObjectTypeWithoutSchema, diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonString.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonString.kt index 7946bdc22b9b..e4eaee2663c4 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonString.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonString.kt @@ -5,7 +5,6 @@ package io.airbyte.cdk.load.data import io.airbyte.cdk.load.data.json.toJson -import io.airbyte.cdk.load.message.DestinationRecord import io.airbyte.cdk.load.util.serializeToString class SchemalessTypesToJsonString : AirbyteSchemaIdentityMapper { @@ -15,8 +14,7 @@ class SchemalessTypesToJsonString : AirbyteSchemaIdentityMapper { override fun mapArrayWithoutSchema(schema: ArrayTypeWithoutSchema): AirbyteType = StringType } -class SchemalessValuesToJsonString(meta: DestinationRecord.Meta) : - AirbyteValueIdentityMapper(meta) { +class SchemalessValuesToJsonString : AirbyteValueIdentityMapper() { override fun mapObjectWithoutSchema( value: ObjectValue, schema: ObjectTypeWithoutSchema, diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/TimeStringToInteger.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/TimeStringToInteger.kt index 0bfe95911288..5cee5e5aa8fd 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/TimeStringToInteger.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/TimeStringToInteger.kt @@ -4,7 +4,6 @@ package io.airbyte.cdk.load.data -import io.airbyte.cdk.load.message.DestinationRecord import java.time.LocalDate import java.time.LocalDateTime import java.time.LocalTime @@ -30,7 +29,7 @@ class TimeStringTypeToIntegerType : AirbyteSchemaIdentityMapper { * NOTE: To keep parity with the old avro/parquet code, we will always first try to parse the value * as with timezone, then fall back to without. But in theory we should be more strict. */ -class TimeStringToInteger(meta: DestinationRecord.Meta) : AirbyteValueIdentityMapper(meta) { +class TimeStringToInteger : AirbyteValueIdentityMapper() { companion object { private val DATE_TIME_FORMATTER: DateTimeFormatter = DateTimeFormatter.ofPattern( diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/UnionTypeToDisjointRecord.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/UnionTypeToDisjointRecord.kt index c6af8baad53f..0e9f7a4c9075 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/UnionTypeToDisjointRecord.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/UnionTypeToDisjointRecord.kt @@ -4,8 +4,6 @@ package io.airbyte.cdk.load.data -import io.airbyte.cdk.load.message.DestinationRecord - class UnionTypeToDisjointRecord : AirbyteSchemaIdentityMapper { override fun mapUnion(schema: UnionType): AirbyteType { if (schema.options.size < 2) { @@ -46,7 +44,7 @@ class UnionTypeToDisjointRecord : AirbyteSchemaIdentityMapper { } } -class UnionValueToDisjointRecord(meta: DestinationRecord.Meta) : AirbyteValueIdentityMapper(meta) { +class UnionValueToDisjointRecord : AirbyteValueIdentityMapper() { override fun mapUnion( value: AirbyteValue, schema: UnionType, diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapperTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapperTest.kt index f162f8bfc630..0c9a57cb739d 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapperTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapperTest.kt @@ -4,7 +4,6 @@ package io.airbyte.cdk.load.data -import io.airbyte.cdk.load.message.DestinationRecord import io.airbyte.cdk.load.test.util.Root import io.airbyte.cdk.load.test.util.SchemaRecordBuilder import io.airbyte.cdk.load.test.util.ValueTestBuilder @@ -39,10 +38,10 @@ class AirbyteValueIdentityMapperTest { .endRecord() .build() - val meta = DestinationRecord.Meta() - val values = AirbyteValueIdentityMapper(meta).map(inputValues, inputSchema) + val mapper = AirbyteValueIdentityMapper() + val values = mapper.map(inputValues, inputSchema) Assertions.assertEquals(expectedValues, values) - Assertions.assertTrue(meta.changes.isEmpty()) + Assertions.assertTrue(mapper.collectedChanges.isEmpty()) } @Test @@ -56,16 +55,15 @@ class AirbyteValueIdentityMapperTest { nameOverride = "bad" ) .build() - val meta = DestinationRecord.Meta() - val values = AirbyteValueIdentityMapper(meta).map(inputValues, inputSchema) as ObjectValue - Assertions.assertTrue(meta.changes.isNotEmpty()) + val mapper = AirbyteValueIdentityMapper() + val values = mapper.map(inputValues, inputSchema) as ObjectValue + val changes = mapper.collectedChanges + Assertions.assertTrue(changes.isNotEmpty()) Assertions.assertTrue(values.values["bad"] is NullValue) - Assertions.assertTrue(meta.changes[0].field == "bad") + Assertions.assertTrue(changes[0].field == "bad") + Assertions.assertTrue(changes[0].change == AirbyteRecordMessageMetaChange.Change.NULLED) Assertions.assertTrue( - meta.changes[0].change == AirbyteRecordMessageMetaChange.Change.NULLED - ) - Assertions.assertTrue( - meta.changes[0].reason == + changes[0].reason == AirbyteRecordMessageMetaChange.Reason.DESTINATION_SERIALIZATION_ERROR ) } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/MapperPipelineTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/MapperPipelineTest.kt new file mode 100644 index 000000000000..937d80db21c7 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/MapperPipelineTest.kt @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.data + +import io.airbyte.cdk.load.test.util.Root +import io.airbyte.cdk.load.test.util.SchemaRecordBuilder +import io.airbyte.cdk.load.test.util.ValueTestBuilder +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test + +class MapperPipelineTest { + class TurnSchemalessObjectTypesIntoIntegers : AirbyteSchemaIdentityMapper { + override fun mapObjectWithoutSchema(schema: ObjectTypeWithoutSchema): AirbyteType = + IntegerType + } + + class TurnSchemalessObjectsIntoIntegers : AirbyteValueIdentityMapper() { + override fun mapObjectWithoutSchema( + value: ObjectValue, + schema: ObjectTypeWithoutSchema, + path: List + ): AirbyteValue { + if (value.values.size == 1) { + throw IllegalStateException("Arbitrarily reject 1") + } + return IntegerValue(value.values.size.toLong()) + } + } + + class TurnIntegerTypesIntoStrings : AirbyteSchemaIdentityMapper { + override fun mapInteger(schema: IntegerType): AirbyteType = StringType + } + + class TurnIntegersIntoStrings : AirbyteValueIdentityMapper() { + override fun mapInteger(value: IntegerValue, path: List): AirbyteValue { + if (value.value == 2L) { + throw IllegalStateException("Arbitrarily reject 2") + } + return StringValue(value.value.toString()) + } + } + + private fun makePipeline(schema: AirbyteType) = + MapperPipeline( + schema, + listOf( + TurnIntegerTypesIntoStrings() to TurnIntegersIntoStrings(), + TurnSchemalessObjectTypesIntoIntegers() to TurnSchemalessObjectsIntoIntegers(), + ) + ) + + @Test + fun testSuccessfulPipeline() { + val (inputSchema, expectedSchema) = + SchemaRecordBuilder() + .with(ObjectTypeWithoutSchema, IntegerType) + .with(IntegerType, StringType) + .withRecord() + .with(IntegerType, StringType) + .with(BooleanType, BooleanType) // expect unchanged + .endRecord() + .build() + + val pipeline = makePipeline(inputSchema) + Assertions.assertEquals( + expectedSchema, + pipeline.finalSchema, + "final schema matches expected transformed schema" + ) + } + + @Test + fun testRecordMapping() { + val (inputValue, inputSchema, expectedOutput) = + ValueTestBuilder() + .with( + ObjectValue(linkedMapOf("a" to IntegerValue(1), "b" to IntegerValue(2))), + ObjectTypeWithoutSchema, + IntegerValue(2) + ) + .with(IntegerValue(1), IntegerType, StringValue("1")) + .withRecord() + .with(IntegerValue(3), IntegerType, StringValue("3")) + .with(BooleanValue(true), BooleanType, BooleanValue(true)) // expect unchanged + .endRecord() + .build() + val pipeline = makePipeline(inputSchema) + val (result, changes) = pipeline.map(inputValue) + + Assertions.assertEquals(0, changes.size, "no changes were captured") + Assertions.assertEquals(expectedOutput, result, "data was transformed as expected") + } + + @Test + fun testFailedMapping() { + val (inputValue, inputSchema, _) = + ValueTestBuilder() + .with( + ObjectValue(linkedMapOf("a" to IntegerValue(1))), + ObjectTypeWithoutSchema, + NullValue, + nullable = true + ) // fail: reject size==1 + .with(IntegerValue(1), IntegerType, StringValue("1")) + .withRecord() + .with(IntegerValue(2), IntegerType, NullValue, nullable = true) // fail: reject 2 + .with(BooleanValue(true), BooleanType, BooleanValue(true)) // expect unchanged + .endRecord() + .build() + val pipeline = makePipeline(inputSchema) + val (_, changes) = pipeline.map(inputValue) + + Assertions.assertEquals(2, changes.size, "two failures were captured") + } +} diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonStringTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonStringTest.kt index 27d5d9cf9042..eeac7504c8dc 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonStringTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonStringTest.kt @@ -5,7 +5,6 @@ package io.airbyte.cdk.load.data import io.airbyte.cdk.load.data.json.toAirbyteValue -import io.airbyte.cdk.load.message.DestinationRecord import io.airbyte.cdk.load.test.util.Root import io.airbyte.cdk.load.test.util.SchemaRecordBuilder import io.airbyte.cdk.load.test.util.ValueTestBuilder @@ -85,7 +84,7 @@ class SchemalessTypesToJsonStringTest { ArrayType(FieldType(StringType, nullable = false)) ) .build() - val mapper = SchemalessValuesToJsonString(DestinationRecord.Meta()) + val mapper = SchemalessValuesToJsonString() val output = mapper.map(inputValues, inputSchema) Assertions.assertEquals(expectedOutput, output) } @@ -120,7 +119,7 @@ class SchemalessTypesToJsonStringTest { ArrayType(FieldType(StringType, nullable = false)) ) .build() - val mapper = SchemalessValuesToJsonString(DestinationRecord.Meta()) + val mapper = SchemalessValuesToJsonString() val output = mapper.map(inputValues, inputSchema) Assertions.assertEquals(expectedOutput, output) } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonTest.kt index 7dab6e3d4b1a..53266563a27a 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonTest.kt @@ -5,7 +5,6 @@ package io.airbyte.cdk.load.data import io.airbyte.cdk.load.data.json.toAirbyteValue -import io.airbyte.cdk.load.message.DestinationRecord import io.airbyte.cdk.load.test.util.Root import io.airbyte.cdk.load.test.util.SchemaRecordBuilder import io.airbyte.cdk.load.test.util.ValueTestBuilder @@ -85,7 +84,7 @@ class SchemalessTypesToJsonTest { ArrayType(FieldType(StringType, nullable = false)) ) .build() - val mapper = SchemalessValuesToJson(DestinationRecord.Meta()) + val mapper = SchemalessValuesToJson() val output = mapper.map(inputValues, inputSchema) Assertions.assertEquals(expectedOutput, output) } @@ -120,7 +119,7 @@ class SchemalessTypesToJsonTest { ArrayType(FieldType(StringType, nullable = false)) ) .build() - val mapper = SchemalessValuesToJson(DestinationRecord.Meta()) + val mapper = SchemalessValuesToJson() val output = mapper.map(inputValues, inputSchema) Assertions.assertEquals(expectedOutput, output) } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/TimeStringToIntegerTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/TimeStringToIntegerTest.kt index 6cfc18a622fd..f17896fce9ef 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/TimeStringToIntegerTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/TimeStringToIntegerTest.kt @@ -4,7 +4,6 @@ package io.airbyte.cdk.load.data -import io.airbyte.cdk.load.message.DestinationRecord import io.airbyte.cdk.load.test.util.Root import io.airbyte.cdk.load.test.util.SchemaRecordBuilder import org.junit.jupiter.api.Assertions @@ -14,7 +13,7 @@ class TimeStringToIntegerTest { @Test fun testMapDate() { - val mapper = TimeStringToInteger(DestinationRecord.Meta()) + val mapper = TimeStringToInteger() listOf( "2021-1-1" to 18628, "2021-01-01" to 18628, @@ -65,7 +64,7 @@ class TimeStringToIntegerTest { @Test fun testMapTimestampWithTimezone() { - val mapper = TimeStringToInteger(DestinationRecord.Meta()) + val mapper = TimeStringToInteger() timestampPairs.forEach { Assertions.assertEquals( IntegerValue(it.second), @@ -77,7 +76,7 @@ class TimeStringToIntegerTest { @Test fun testMapTimestampWithoutTimezone() { - val mapper = TimeStringToInteger(DestinationRecord.Meta()) + val mapper = TimeStringToInteger() timestampPairs.forEach { Assertions.assertEquals( IntegerValue(it.second), @@ -100,7 +99,7 @@ class TimeStringToIntegerTest { @Test fun testTimeWithTimezone() { - val mapper = TimeStringToInteger(DestinationRecord.Meta()) + val mapper = TimeStringToInteger() timePairs.forEach { Assertions.assertEquals( IntegerValue(it.second), @@ -112,7 +111,7 @@ class TimeStringToIntegerTest { @Test fun testTimeWithoutTimezone() { - val mapper = TimeStringToInteger(DestinationRecord.Meta()) + val mapper = TimeStringToInteger() timePairs.forEach { Assertions.assertEquals( IntegerValue(it.second), diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/SchemaRecordBuilder.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/SchemaRecordBuilder.kt index b90cd930016e..16d1777e4be1 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/SchemaRecordBuilder.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/SchemaRecordBuilder.kt @@ -34,14 +34,15 @@ class SchemaRecordBuilder( fun with( given: AirbyteType, expected: AirbyteType = given, - nameOverride: String? = null + nameOverride: String? = null, ): SchemaRecordBuilder { return with(FieldType(given, false), FieldType(expected, false), nameOverride) } fun withRecord( nullable: Boolean = false, - nameOverride: String? = null + nameOverride: String? = null, + expectedInstead: ObjectType? = null ): SchemaRecordBuilder> { val name = nameOverride ?: UUID.randomUUID().toString() val inputRecord = ObjectType(properties = LinkedHashMap()) @@ -50,7 +51,7 @@ class SchemaRecordBuilder( expectedSchema.properties[name] = FieldType(outputRecord, nullable = nullable) return SchemaRecordBuilder( inputSchema = inputRecord, - expectedSchema = outputRecord, + expectedSchema = expectedInstead ?: outputRecord, parent = this ) } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/ValueTestBuilder.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/ValueTestBuilder.kt index 922cde30f918..3f8e0e10cd65 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/ValueTestBuilder.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/ValueTestBuilder.kt @@ -6,6 +6,7 @@ package io.airbyte.cdk.load.test.util import io.airbyte.cdk.load.data.AirbyteType import io.airbyte.cdk.load.data.AirbyteValue +import io.airbyte.cdk.load.data.FieldType import io.airbyte.cdk.load.data.ObjectType import io.airbyte.cdk.load.data.ObjectValue import java.util.UUID @@ -21,12 +22,16 @@ data class ValueTestBuilder( inputValue: AirbyteValue, inputSchema: AirbyteType, expectedValue: AirbyteValue = inputValue, - nameOverride: String? = null + nameOverride: String? = null, + nullable: Boolean = false, ): ValueTestBuilder { val name = nameOverride ?: UUID.randomUUID().toString() inputValues.values[name] = inputValue expectedValues.values[name] = expectedValue - (schemaRecordBuilder as SchemaRecordBuilder<*>).with(inputSchema, nameOverride = name) + (schemaRecordBuilder as SchemaRecordBuilder<*>).with( + FieldType(inputSchema, nullable), + nameOverride = name + ) return this }