Skip to content

Commit

Permalink
Drop NULL columns from INSERT INTO (#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
ryannedolan authored Jul 15, 2024
1 parent 92f1864 commit 701ce75
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 9 deletions.
2 changes: 1 addition & 1 deletion deploy/samples/subscriptions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ kind: Subscription
metadata:
name: names
spec:
sql: SELECT NAME, NAME AS KEY FROM DATAGEN.PERSON
sql: SELECT NAME, NULL AS KEY FROM DATAGEN.PERSON
database: RAWKAFKA


Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
public enum DataType {

VARCHAR(x -> x.createTypeWithNullability(x.createSqlType(SqlTypeName.VARCHAR), true)),
VARCHAR_NOT_NULL(x -> x.createTypeWithNullability(x.createSqlType(SqlTypeName.VARCHAR), false));
VARCHAR_NOT_NULL(x -> x.createTypeWithNullability(x.createSqlType(SqlTypeName.VARCHAR), false)),
NULL(x -> x.createSqlType(SqlTypeName.NULL));

public static final RelDataTypeFactory DEFAULT_TYPE_FACTORY = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
private final RelProtoDataType protoType;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.hoptimator.catalog;

import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
Expand Down Expand Up @@ -27,11 +28,13 @@
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.pretty.SqlPrettyWriter;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.util.SqlShuttle;

import java.util.Map;
import java.util.List;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -171,6 +174,7 @@ public SqlNode visit(SqlCall call) {
*
* N.B. the following magic:
* - field 'PRIMARY_KEY' is treated as a PRIMARY KEY
* - NULL fields are promoted to BYTES
*/
class ConnectorImplementor implements ScriptImplementor {
private final String database;
Expand Down Expand Up @@ -228,7 +232,12 @@ public void implement(SqlWriter w) {
}
}

/** Implements an INSERT INTO statement */
/** Implements an INSERT INTO statement.
*
* N.B. the following magic:
* - NULL columns (e.g. `NULL AS KEY`) are elided from the pipeline
*
* */
class InsertImplementor implements ScriptImplementor {
private final String database;
private final String name;
Expand All @@ -245,11 +254,24 @@ public void implement(SqlWriter w) {
w.keyword("INSERT INTO");
(new CompoundIdentifierImplementor(database, name)).implement(w);
SqlWriter.Frame frame1 = w.startList("(", ")");
(new ColumnListImplementor(relNode.getRowType())).implement(w);
RelNode project = dropNullFields(relNode);
(new ColumnListImplementor(project.getRowType())).implement(w);
w.endList(frame1);
(new QueryImplementor(relNode)).implement(w);
(new QueryImplementor(project)).implement(w);
w.literal(";");
}

private static RelNode dropNullFields(RelNode relNode) {
List<Integer> cols = new ArrayList<>();
int i = 0;
for (RelDataTypeField field : relNode.getRowType().getFieldList()) {
if (!field.getType().getSqlTypeName().equals(SqlTypeName.NULL)) {
cols.add(i);
}
i++;
}
return RelOptUtil.createProject(relNode, cols);
}
}

/** Implements a CREATE DATABASE IF NOT EXISTS statement */
Expand Down Expand Up @@ -288,7 +310,11 @@ public void implement(SqlWriter w) {
}
}

/** Implements row type specs, e.g. `NAME VARCHAR(20), AGE INTEGER` */
/** Implements row type specs, e.g. `NAME VARCHAR(20), AGE INTEGER`.
*
* N.B. the following magic:
* - NULL fields are promoted to BYTES
*/
class RowTypeSpecImplementor implements ScriptImplementor {
private final RelDataType dataType;

Expand All @@ -309,7 +335,11 @@ public void implement(SqlWriter w) {
for (int i = 0; i < fieldNames.size(); i++) {
w.sep(",");
fieldNames.get(i).unparse(w, 0, 0);
fieldTypes.get(i).unparse(w, 0, 0);
if (fieldTypes.get(i).getTypeName().getSimple().equals("NULL")) {
w.literal("BYTES"); // promote NULL fields to BYTES
} else {
fieldTypes.get(i).unparse(w, 0, 0);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,27 @@ public void implementsFlinkCreateTableDDL() {
assertTrue(out, out.contains("'topic'='topic1'"));
assertFalse(out, out.contains("Row"));
}

@Test
public void magicPrimaryKey() {
SqlWriter w = new SqlPrettyWriter();
RelDataType rowType = DataType.struct().with("F1", DataType.VARCHAR)
.with("PRIMARY_KEY", DataType.VARCHAR).rel();
HopTable table = new HopTable("DATABASE", "TABLE1", rowType, ConfigProvider.empty().config("x"));
table.implement(w);
String out = w.toString();
assertTrue(out, out.contains("PRIMARY KEY (PRIMARY_KEY)"));
}

@Test
public void magicNullFields() {
SqlWriter w = new SqlPrettyWriter();
RelDataType rowType = DataType.struct().with("F1", DataType.VARCHAR)
.with("KEY", DataType.NULL).rel();
HopTable table = new HopTable("DATABASE", "TABLE1", rowType, ConfigProvider.empty().config("x"));
table.implement(w);
String out = w.toString();
assertTrue(out, out.contains("\"KEY\" BYTES")); // NULL fields are promoted to BYTES.
assertFalse(out, out.contains("\"KEY\" NULL")); // Without magic, this is what you'd get.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> o
ConfigProvider connectorConfigProvider = ConfigProvider.from(clientConfig)
.withPrefix("properties.")
.with("connector", "kafka")
.with("key.format", "csv")
.with("key.format", "raw")
.with("key.fields", "KEY")
.with("value.format", "csv")
.with("value.fields-include", "EXCEPT_KEY")
.with("scan.startup.mode", "earliest-offset")
.with("topic", x -> x);
TableLister tableLister = () -> {
AdminClient client = AdminClient.create(clientConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ public ScriptImplementor query() {
/** Script ending in INSERT INTO ... */
public ScriptImplementor insertInto(HopTable sink) {
RelOptUtil.eq(sink.name(), sink.rowType(), "subscription", rowType(), Litmus.THROW);
RelNode castRel = RelOptUtil.createCastRel(relNode, sink.rowType(), true);
return script.database(sink.database()).with(sink)
.insert(sink.database(), sink.name(), relNode);
.insert(sink.database(), sink.name(), castRel);
}

/** Add any resources, SQL, DDL etc required to access the table. */
Expand Down

0 comments on commit 701ce75

Please sign in to comment.