Use Key fields and Record Projection for deletes (#408)
ismailsimsek authored Sep 8, 2024
1 parent b246015 commit 8269d89
Showing 6 changed files with 291 additions and 2 deletions.
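In short: deletes are now written as equality deletes that contain only the table's identifier (key) fields. A new RecordProjection wrapper projects each incoming record down to the key schema, BaseDeltaTaskWriter calls deleteKey() with that projection instead of delete() with the full row, and IcebergUtil narrows the equality-delete row schema passed to GenericAppenderFactory to match.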
5 changes: 5 additions & 0 deletions debezium-server-iceberg-sink/pom.xml
@@ -301,6 +301,11 @@
       <version>4.2.1</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.testcontainers</groupId>
       <artifactId>testcontainers</artifactId>
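Note: mockito-core is declared without an explicit <version>, so its version is presumably managed by a BOM import or dependencyManagement entry elsewhere in the pom (an assumption — the managing entry is not shown in this diff).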
4 changes: 3 additions & 1 deletion debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java
@@ -23,6 +23,8 @@
 import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
 import org.eclipse.microprofile.config.Config;
 import org.eclipse.microprofile.config.ConfigProvider;
 import org.eclipse.microprofile.config.ConfigValue;
@@ -172,7 +174,7 @@ public static GenericAppenderFactory getTableAppender(Table icebergTable) {
     icebergTable.schema(),
     icebergTable.spec(),
     Ints.toArray(identifierFieldIds),
-    icebergTable.schema(),
+    TypeUtil.select(icebergTable.schema(), Sets.newHashSet(identifierFieldIds)),
     null)
     .setAll(icebergTable.properties());
 }
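The swapped argument is GenericAppenderFactory's equality-delete row schema (eqDeleteRowSchema). Passing TypeUtil.select(schema, identifierFieldIds) instead of the full table schema means equality delete files are written with only the identifier (key) columns, matching the key records that BaseDeltaTaskWriter now emits (see below).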
4 changes: 3 additions & 1 deletion debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/BaseDeltaTaskWriter.java
@@ -24,6 +24,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<Record> {
   private final InternalRecordWrapper keyWrapper;
   private final boolean upsert;
   private final boolean upsertKeepDeletes;
+  private final RecordProjection keyProjection;

   BaseDeltaTaskWriter(PartitionSpec spec,
                       FileFormat format,
@@ -40,6 +41,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<Record> {
     this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
     this.wrapper = new InternalRecordWrapper(schema.asStruct());
     this.keyWrapper = new InternalRecordWrapper(deleteSchema.asStruct());
+    this.keyProjection = RecordProjection.create(schema, deleteSchema);
     this.upsert = upsert;
     this.upsertKeepDeletes = upsertKeepDeletes;
   }
@@ -65,7 +67,7 @@ public void write(Record row) throws IOException {
     } else {
       // UPSERT MODE
       if (!opFieldValue.equals("c")) { // anything that was not just created is deleted first
-        writer.delete(row);
+        writer.deleteKey(keyProjection.wrap(row));
       }
       // when upsertKeepDeletes = FALSE we don't keep the deleted record
       if (upsertKeepDeletes || !opFieldValue.equals("d")) {
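Iceberg's BaseTaskWriter offers both delete(row), which writes the full row into the equality delete file, and deleteKey(key), which writes only the key columns; on read, equality deletes match on the equality field ids either way. By wrapping the incoming row with keyProjection, the delete path no longer depends on the non-key column values of the record being deleted.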
185 changes: 185 additions & 0 deletions debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/RecordProjection.java
@@ -0,0 +1,185 @@
package io.debezium.server.iceberg.tableoperator;

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Types.ListType;
import org.apache.iceberg.types.Types.MapType;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;

import java.util.List;
import java.util.Map;

/**
* This is a copy of the tabular-io iceberg-kafka-connect code:
* https://github.com/tabular-io/iceberg-kafka-connect/blob/main/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordProjection.java
* <p>
* This is modified from {@link org.apache.iceberg.util.StructProjection} to support record types.
*/
public class RecordProjection implements Record {

/**
* Creates a projecting wrapper for {@link Record} rows.
*
* <p>This projection does not work with repeated types like lists and maps.
*
* @param dataSchema schema of rows wrapped by this projection
* @param projectedSchema result schema of the projected rows
* @return a wrapper to project rows
*/
public static RecordProjection create(Schema dataSchema, Schema projectedSchema) {
return new RecordProjection(dataSchema.asStruct(), projectedSchema.asStruct());
}

private final StructType type;
private final int[] positionMap;
private final RecordProjection[] nestedProjections;
private Record record;

private RecordProjection(StructType structType, StructType projection) {
this(structType, projection, false);
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
private RecordProjection(StructType structType, StructType projection, boolean allowMissing) {
this.type = projection;
this.positionMap = new int[projection.fields().size()];
this.nestedProjections = new RecordProjection[projection.fields().size()];

// set up the projection positions and any nested projections that are needed
List<NestedField> dataFields = structType.fields();
for (int pos = 0; pos < positionMap.length; pos += 1) {
NestedField projectedField = projection.fields().get(pos);

boolean found = false;
for (int i = 0; !found && i < dataFields.size(); i += 1) {
NestedField dataField = dataFields.get(i);
if (projectedField.fieldId() == dataField.fieldId()) {
found = true;
positionMap[pos] = i;
switch (projectedField.type().typeId()) {
case STRUCT:
nestedProjections[pos] =
new RecordProjection(
dataField.type().asStructType(), projectedField.type().asStructType());
break;
case MAP:
MapType projectedMap = projectedField.type().asMapType();
MapType originalMap = dataField.type().asMapType();

boolean keyProjectable =
!projectedMap.keyType().isNestedType()
|| projectedMap.keyType().equals(originalMap.keyType());
boolean valueProjectable =
!projectedMap.valueType().isNestedType()
|| projectedMap.valueType().equals(originalMap.valueType());
Preconditions.checkArgument(
keyProjectable && valueProjectable,
"Cannot project a partial map key or value struct. Trying to project %s out of %s",
projectedField,
dataField);

nestedProjections[pos] = null;
break;
case LIST:
ListType projectedList = projectedField.type().asListType();
ListType originalList = dataField.type().asListType();

boolean elementProjectable =
!projectedList.elementType().isNestedType()
|| projectedList.elementType().equals(originalList.elementType());
Preconditions.checkArgument(
elementProjectable,
"Cannot project a partial list element struct. Trying to project %s out of %s",
projectedField,
dataField);

nestedProjections[pos] = null;
break;
default:
nestedProjections[pos] = null;
}
}
}

if (!found && projectedField.isOptional() && allowMissing) {
positionMap[pos] = -1;
nestedProjections[pos] = null;
} else if (!found) {
throw new IllegalArgumentException(
String.format("Cannot find field %s in %s", projectedField, structType));
}
}
}

public RecordProjection wrap(Record newRecord) {
this.record = newRecord;
return this;
}

@Override
public int size() {
return type.fields().size();
}

@Override
public <T> T get(int pos, Class<T> javaClass) {
    // record can be null if wrap is not called before this get call,
    // or if a null record was wrapped.
if (record == null) {
return null;
}

int recordPos = positionMap[pos];
if (nestedProjections[pos] != null) {
Record nestedStruct = record.get(recordPos, Record.class);
if (nestedStruct == null) {
return null;
}

return javaClass.cast(nestedProjections[pos].wrap(nestedStruct));
}

if (recordPos != -1) {
return record.get(recordPos, javaClass);
} else {
return null;
}
}

@Override
public <T> void set(int pos, T value) {
throw new UnsupportedOperationException();
}

@Override
public StructType struct() {
return type;
}

@Override
public Object getField(String name) {
throw new UnsupportedOperationException();
}

@Override
public void setField(String name, Object value) {
throw new UnsupportedOperationException();
}

@Override
public Object get(int pos) {
return get(pos, Object.class);
}

@Override
public Record copy() {
throw new UnsupportedOperationException();
}

@Override
public Record copy(Map<String, Object> overwriteValues) {
throw new UnsupportedOperationException();
}
}
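A minimal usage sketch of the projection (hypothetical schema and values, mirroring how BaseDeltaTaskWriter builds its keyProjection above; not part of this commit):

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class RecordProjectionExample {
  public static void main(String[] args) {
    // hypothetical table schema; field ids 1 and 3 are the identifier (key) fields
    Schema schema = new Schema(
        ImmutableList.of(
            Types.NestedField.required(1, "id", Types.StringType.get()),
            Types.NestedField.required(2, "data", Types.StringType.get()),
            Types.NestedField.required(3, "id2", Types.StringType.get())),
        ImmutableSet.of(1, 3));

    // key-only schema, derived the same way BaseDeltaTaskWriter derives deleteSchema
    Schema deleteSchema = TypeUtil.select(schema, Sets.newHashSet(schema.identifierFieldIds()));
    RecordProjection keyProjection = RecordProjection.create(schema, deleteSchema);

    Record row = GenericRecord.create(schema);
    row.setField("id", "123");
    row.setField("data", "hello world!");
    row.setField("id2", "456");

    // wrap() stashes the record and redirects reads to the key columns only
    Record key = keyProjection.wrap(row);
    System.out.println(key.get(0)); // 123
    System.out.println(key.get(1)); // 456
  }
}

Since wrap() returns the same instance after stashing the record, one RecordProjection can be reused for every row; each wrapped view is only valid until the next wrap() call.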
61 changes: 61 additions & 0 deletions debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/BaseWriterTest.java
@@ -0,0 +1,61 @@
package io.debezium.server.iceberg.tableoperator;

import io.debezium.server.iceberg.IcebergUtil;
import org.apache.iceberg.*;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.encryption.PlaintextEncryptionManager;
import org.apache.iceberg.inmemory.InMemoryFileIO;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.BeforeEach;

import java.util.Set;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class BaseWriterTest {

protected InMemoryFileIO fileIO;
protected Table table;
FileFormat format;
GenericAppenderFactory appenderFactory;
OutputFileFactory fileFactory;
Set<Integer> equalityFieldIds;

protected static final Schema SCHEMA =
new Schema(
ImmutableList.of(
Types.NestedField.required(1, "id", Types.StringType.get()),
Types.NestedField.required(2, "data", Types.StringType.get()),
Types.NestedField.required(3, "id2", Types.StringType.get()),
Types.NestedField.required(4, "__op", Types.StringType.get())
),
ImmutableSet.of(1, 3));

protected static final PartitionSpec SPEC =
PartitionSpec.builderFor(SCHEMA).identity("data").build();

@BeforeEach
public void before() {
fileIO = new InMemoryFileIO();

table = mock(Table.class);
when(table.schema()).thenReturn(SCHEMA);
when(table.spec()).thenReturn(PartitionSpec.unpartitioned());
when(table.io()).thenReturn(fileIO);
when(table.locationProvider())
.thenReturn(LocationProviders.locationsFor("file", ImmutableMap.of()));
when(table.encryption()).thenReturn(PlaintextEncryptionManager.instance());
when(table.properties()).thenReturn(ImmutableMap.of());

format = IcebergUtil.getTableFileFormat(table);
appenderFactory = IcebergUtil.getTableAppender(table);
fileFactory = IcebergUtil.getTableOutputFileFactory(table, format);
equalityFieldIds = table.schema().identifierFieldIds();
}

}
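The Table here is a Mockito mock rather than a catalog-backed table: the stubbed schema(), spec(), io(), locationProvider(), encryption(), and properties() calls are all that IcebergUtil's helpers and the delta writers touch in these tests, so no live catalog or warehouse is needed.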
34 changes: 34 additions & 0 deletions debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/UnpartitionedDeltaWriterTest.java
@@ -0,0 +1,34 @@
package io.debezium.server.iceberg.tableoperator;

import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.WriteResult;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.IOException;

class UnpartitionedDeltaWriterTest extends BaseWriterTest {

@Test
public void testUnpartitionedDeltaWriter() throws IOException {
UnpartitionedDeltaWriter writer = new UnpartitionedDeltaWriter(table.spec(), format, appenderFactory, fileFactory,
table.io(),
Long.MAX_VALUE, table.schema(), equalityFieldIds, true, true);

Record row = GenericRecord.create(SCHEMA);
row.setField("id", "123");
row.setField("data", "hello world!");
row.setField("id2", "123");
row.setField("__op", "u");

writer.write(row);
WriteResult result = writer.complete();

// in upsert mode, each write is a delete + append, so we'll have 1 data file and 1 delete file
    Assertions.assertEquals(1, result.dataFiles().length);
    Assertions.assertEquals(format, result.dataFiles()[0].format());
    Assertions.assertEquals(1, result.deleteFiles().length);
    Assertions.assertEquals(format, result.deleteFiles()[0].format());
}
}
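Given the deleteKey change above, the delete file produced by this test should contain only the identifier columns (id, id2) rather than full rows; read-time matching is unchanged, since equality deletes match on the identifier field ids either way.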
