Skip to content

Commit

Permalink
Escape column names
Browse files Browse the repository at this point in the history
  • Loading branch information
psainics committed Oct 23, 2024
1 parent acd47fa commit b8a9f71
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.plugin.cloudsql.mysql;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import io.cdap.cdap.api.annotation.Description;
Expand All @@ -25,6 +26,7 @@
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSink;
Expand All @@ -40,7 +42,11 @@
import io.cdap.plugin.util.CloudSQLUtil;
import io.cdap.plugin.util.DBUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import javax.annotation.Nullable;

/** Sink support for a CloudSQL MySQL database. */
Expand All @@ -52,6 +58,7 @@
public class CloudSQLMySQLSink extends AbstractDBSink<CloudSQLMySQLSink.CloudSQLMySQLSinkConfig> {

private final CloudSQLMySQLSinkConfig cloudsqlMysqlSinkConfig;
private static final Character ESCAPE_CHAR = '`';

public CloudSQLMySQLSink(CloudSQLMySQLSinkConfig cloudsqlMysqlSinkConfig) {
super(cloudsqlMysqlSinkConfig);
Expand All @@ -78,6 +85,24 @@ protected DBRecord getDBRecord(StructuredRecord output) {
return new MysqlDBRecord(output, columnTypes);
}

@Override
protected void setColumnsInfo(List<Schema.Field> fields) {
List<String> columnsList = new ArrayList<>();
StringJoiner columnsJoiner = new StringJoiner(",");
for (Schema.Field field : fields) {
columnsList.add(field.getName());
columnsJoiner.add(ESCAPE_CHAR + field.getName() + ESCAPE_CHAR);
}

super.columns = Collections.unmodifiableList(columnsList);
super.dbColumns = columnsJoiner.toString();
}

@VisibleForTesting
String getDbColumns() {
return dbColumns;
}

@Override
protected LineageRecorder getLineageRecorder(BatchSinkContext context) {
String host;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.cloudsql.mysql;

import io.cdap.cdap.api.data.schema.Schema;
import org.junit.Assert;
import org.junit.Test;

public class CloudSQLMySQLSinkTest {
@Test
public void testSetColumnsInfo() {
Schema outputSchema = Schema.recordOf("output",
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
Schema.Field.of("insert", Schema.of(Schema.Type.STRING)));
CloudSQLMySQLSink cloudSQLMySQLSink = new CloudSQLMySQLSink(new CloudSQLMySQLSink.CloudSQLMySQLSinkConfig());
Assert.assertNotNull(outputSchema.getFields());
cloudSQLMySQLSink.setColumnsInfo(outputSchema.getFields());
Assert.assertEquals("`id`,`name`,`insert`", cloudSQLMySQLSink.getDbColumns());
}
}
24 changes: 24 additions & 0 deletions mysql-plugin/src/main/java/io/cdap/plugin/mysql/MysqlSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.plugin.mysql;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
Expand All @@ -24,6 +25,7 @@
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
Expand All @@ -39,9 +41,12 @@
import io.cdap.plugin.db.sink.FieldsValidator;
import io.cdap.plugin.util.DBUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

/**
Expand All @@ -54,6 +59,7 @@
public class MysqlSink extends AbstractDBSink<MysqlSink.MysqlSinkConfig> {

private final MysqlSinkConfig mysqlSinkConfig;
private static final Character ESCAPE_CHAR = '`';

public MysqlSink(MysqlSinkConfig mysqlSinkConfig) {
super(mysqlSinkConfig);
Expand Down Expand Up @@ -85,6 +91,24 @@ protected SchemaReader getSchemaReader() {
return new MysqlSchemaReader(null);
}

@Override
protected void setColumnsInfo(List<Schema.Field> fields) {
List<String> columnsList = new ArrayList<>();
StringJoiner columnsJoiner = new StringJoiner(",");
for (Schema.Field field : fields) {
columnsList.add(field.getName());
columnsJoiner.add(ESCAPE_CHAR + field.getName() + ESCAPE_CHAR);
}

super.columns = Collections.unmodifiableList(columnsList);
super.dbColumns = columnsJoiner.toString();
}

@VisibleForTesting
String getDbColumns() {
return dbColumns;
}

/**
* MySQL action configuration.
*/
Expand Down
35 changes: 35 additions & 0 deletions mysql-plugin/src/test/java/io/cdap/plugin/mysql/MysqlSinkTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.mysql;

import io.cdap.cdap.api.data.schema.Schema;
import org.junit.Assert;
import org.junit.Test;

public class MysqlSinkTest {
@Test
public void testSetColumnsInfo() {
Schema outputSchema = Schema.recordOf("output",
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
Schema.Field.of("insert", Schema.of(Schema.Type.STRING)));
MysqlSink mySQLSink = new MysqlSink(new MysqlSink.MysqlSinkConfig());
Assert.assertNotNull(outputSchema.getFields());
mySQLSink.setColumnsInfo(outputSchema.getFields());
Assert.assertEquals("`id`,`name`,`insert`", mySQLSink.getDbColumns());
}
}

0 comments on commit b8a9f71

Please sign in to comment.