Skip to content

Commit

Permalink
cleaned up test cases to account for derby handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Aditya Bharadwaj committed Nov 2, 2024
1 parent 4b3a774 commit aa35525
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,8 @@ private String addWhereClause(String query, ImmutableList<String> partitionColum
// range to define the where clause. `col >= range.start() AND (col < range.end() OR
// (range.isLast() = TRUE AND col = range.end()))`
queryBuilder.append(
String.format("(%1$s >= ? AND (%1$s < ? OR (? = TRUE AND %1$s = ?)))", backtick(partitionColumn)));
String.format(
"(%1$s >= ? AND (%1$s < ? OR (? = TRUE AND %1$s = ?)))", safe(partitionColumn)));
queryBuilder.append(")");
firstDone = true;
}
Expand All @@ -588,7 +589,7 @@ private String addWhereClause(String query, ImmutableList<String> partitionColum
*/
@Override
public String getReadQuery(String tableName, ImmutableList<String> partitionColumns) {
return addWhereClause("select * from " + backtick(tableName), partitionColumns);
return addWhereClause("select * from " + safe(tableName), partitionColumns);
}

/**
Expand All @@ -604,8 +605,9 @@ public String getCountQuery(
String tableName, ImmutableList<String> partitionColumns, long timeoutMillis) {
return addWhereClause(
String.format(
"select /*+ MAX_EXECUTION_TIME(%s) */ COUNT(*) from %s", timeoutMillis,
backtick(tableName)), partitionColumns);
"select /*+ MAX_EXECUTION_TIME(%s) */ COUNT(*) from %s",
timeoutMillis, safe(tableName)),
partitionColumns);
}

/**
Expand All @@ -620,12 +622,19 @@ public String getCountQuery(
public String getBoundaryQuery(
String tableName, ImmutableList<String> partitionColumns, String colName) {
return addWhereClause(
String.format("select MIN(%s),MAX(%s) from %s", backtick(colName), backtick(colName),
backtick(tableName)), partitionColumns);
String.format(
"select MIN(%s),MAX(%s) from %s", safe(colName), safe(colName), safe(tableName)),
partitionColumns);
}

private String backtick(String s) {
return "`"+s+"`";
private String safe(String s) {
switch (mySqlVersion) {
case DERBY:
return "\"" + s + "\"";
case DEFAULT:
default:
return "`" + s + "`";
}
}

/**
Expand Down Expand Up @@ -675,6 +684,7 @@ public String getCollationsOrderQuery(String dbCharset, String dbCollation, bool
*/
public enum MySqlVersion {
DEFAULT,
DERBY
}

protected static final class InformationSchemaCols {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,20 +248,6 @@ public void testJdbcIoWrapperDifferentTables() throws RetriableSchemaDiscoveryEx
String testCol = "ID";
SourceColumnType testColType = new SourceColumnType("INTEGER", new Long[] {}, null);
when(mockDialectAdapter.discoverTables(any(), any())).thenReturn(ImmutableList.of("testTable"));
when(mockDialectAdapter.discoverTableIndexes(any(), any(), any()))
.thenReturn(
ImmutableMap.of(
"testTable",
ImmutableList.of(
SourceColumnIndexInfo.builder()
.setIndexType(IndexType.NUMERIC)
.setIndexName("PRIMARY")
.setIsPrimary(true)
.setCardinality(42L)
.setColumnName(testCol)
.setIsUnique(true)
.setOrdinalPosition(1)
.build())));
when(mockDialectAdapter.discoverTableSchema(any(), any(), any()))
.thenReturn(ImmutableMap.of("testTable", ImmutableMap.of(testCol, testColType)));
JdbcIoWrapper jdbcIoWrapper =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.cloud.teleport.v2.source.reader.io.jdbc.dialectadapter.mysql.MysqlDialectAdapter.MySqlVersion;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.BoundarySplitterFactory;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.Range;
import com.google.cloud.teleport.v2.utilities.DerbyUtils;
import com.google.common.collect.ImmutableList;
import java.sql.Connection;
import java.sql.DriverManager;
Expand Down Expand Up @@ -60,17 +61,40 @@ public void initDerby() throws SQLException, ClassNotFoundException {

private void createDerbyTable() throws SQLException {
Statement stmtCreateTable = connection.createStatement();
String tableName = "test_table_column_boundary";
String col1Name = DerbyUtils.quote("col1");
String col2Name = DerbyUtils.quote("col2");
String dataName = DerbyUtils.quote("data");
String createTableSQL =
"CREATE TABLE test_table_column_boundary ("
+ "col1 INT,"
+ "col2 INT,"
+ "PRIMARY KEY (col1, col2)"
"CREATE TABLE "
+ DerbyUtils.quote(tableName)
+ " ("
+ col1Name
+ " INT,"
+ col2Name
+ " INT,"
+ dataName
+ " VARCHAR(20),"
+ "PRIMARY KEY ("
+ col1Name
+ ", "
+ col2Name
+ ")"
+ ")";
stmtCreateTable.executeUpdate(createTableSQL);
stmtCreateTable.executeUpdate(DerbyUtils.modifyQuery(createTableSQL));

// 2.2 Insert Data (Using PreparedStatement for Efficiency & Security)
String insertSQL = "INSERT INTO test_table_column_boundary (col1, col2) VALUES (?, ?)";
PreparedStatement stmtInsert = connection.prepareStatement(insertSQL);
String insertSQL =
"INSERT INTO "
+ DerbyUtils.quote(tableName)
+ "("
+ col1Name
+ ", "
+ col2Name
+ ", "
+ dataName
+ ") VALUES (?, ?, ?)";
PreparedStatement stmtInsert = connection.prepareStatement(DerbyUtils.modifyQuery(insertSQL));

// Batch the insert operations
stmtInsert.setInt(1, 10);
Expand All @@ -94,7 +118,7 @@ private void createDerbyTable() throws SQLException {

private void dropDerbyTable() throws SQLException {
Statement statement = connection.createStatement();
statement.executeUpdate("drop table test_table_column_boundary");
statement.executeUpdate("drop table " + DerbyUtils.quote("test_table_column_boundary"));
}

@Test
Expand Down Expand Up @@ -124,12 +148,12 @@ public void setParameters() throws Exception {
.build();

String boundaryQueryCol1 =
new MysqlDialectAdapter(MySqlVersion.DEFAULT)
new MysqlDialectAdapter(MySqlVersion.DERBY)
.getBoundaryQuery("test_table_column_boundary", partitionCols, "col1");
PreparedStatement boundaryStmtCol1 = connection.prepareStatement(boundaryQueryCol1);

String boundaryQueryCol2 =
new MysqlDialectAdapter(MySqlVersion.DEFAULT)
new MysqlDialectAdapter(MySqlVersion.DERBY)
.getBoundaryQuery("test_table_column_boundary", partitionCols, "col2");
PreparedStatement boundaryStmtCol2 = connection.prepareStatement(boundaryQueryCol2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.cloud.teleport.v2.source.reader.io.jdbc.dialectadapter.mysql.MysqlDialectAdapter;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.dialectadapter.mysql.MysqlDialectAdapter.MySqlVersion;
import com.google.cloud.teleport.v2.utilities.DerbyUtils;
import com.google.common.collect.ImmutableList;
import java.sql.Connection;
import java.sql.DriverManager;
Expand Down Expand Up @@ -66,7 +67,8 @@ private void createDerbyTable() throws SQLException {
stmtCreateTable.executeUpdate(createTableSQL);

// 2.2 Insert Data (Using PreparedStatement for Efficiency & Security)
String insertSQL = "INSERT INTO \"test_table_range_setter\" (\"col1\", \"col2\", \"data\") VALUES (?, ?, ?)";
String insertSQL =
"INSERT INTO \"test_table_range_setter\" (\"col1\", \"col2\", \"data\") VALUES (?, ?, ?)";
PreparedStatement stmtInsert = connection.prepareStatement(insertSQL);

// Batch the insert operations
Expand Down Expand Up @@ -127,7 +129,8 @@ public void testSetParameters() throws Exception {
String readQuery =
new MysqlDialectAdapter(MySqlVersion.DEFAULT)
.getReadQuery("test_table_range_setter", partitionCols);
PreparedStatement readStmtSingleColNonLast = connection.prepareStatement(changeQueryForDerby(readQuery));
PreparedStatement readStmtSingleColNonLast =
connection.prepareStatement(DerbyUtils.modifyQuery(readQuery));
rangePreparedStatementSetter.setParameters(singleColNonLastRange, readStmtSingleColNonLast);
ResultSet readStmtSingleColNonLastResultSet = readStmtSingleColNonLast.executeQuery();
ImmutableList.Builder<String> readSingleColNonLastRangedataPointsBuilder =
Expand All @@ -140,7 +143,8 @@ public void testSetParameters() throws Exception {
String countQuery =
new MysqlDialectAdapter(MySqlVersion.DEFAULT)
.getCountQuery("test_table_range_setter", partitionCols, 0);
PreparedStatement countStmtSingleColNonLast = connection.prepareStatement(changeQueryForDerby(countQuery));
PreparedStatement countStmtSingleColNonLast =
connection.prepareStatement(DerbyUtils.modifyQuery(countQuery));
rangePreparedStatementSetter.setParameters(singleColNonLastRange, countStmtSingleColNonLast);
ResultSet countStmtSingleColNonLastResultSet = countStmtSingleColNonLast.executeQuery();
countStmtSingleColNonLastResultSet.next();
Expand Down Expand Up @@ -173,7 +177,8 @@ public void testSetParameters() throws Exception {
.setEnd(40)
.build(),
null);
PreparedStatement countStmtBothCol = connection.prepareStatement(changeQueryForDerby(countQuery));
PreparedStatement countStmtBothCol =
connection.prepareStatement(DerbyUtils.modifyQuery(countQuery));
rangePreparedStatementSetter.setParameters(bothColRange, countStmtBothCol);
ResultSet countStmtBothColResultSet = countStmtBothCol.executeQuery();
countStmtBothColResultSet.next();
Expand All @@ -192,8 +197,4 @@ public void exitDerby() throws SQLException {
dropDerbyTable();
connection.close();
}

public String changeQueryForDerby(String query) {
return query.replaceAll("`","\"");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void testRangeCountTransform() throws Exception {
testPipeline.apply(Create.of(firstColumnForBoundaryQuery, secondColumnForBoundaryQuery));
RangeBoundaryTransform rangeBoundaryTransform =
RangeBoundaryTransform.builder()
.setDbAdapter(new MysqlDialectAdapter(MySqlVersion.DEFAULT))
.setDbAdapter(new MysqlDialectAdapter(MySqlVersion.DERBY))
.setPartitionColumns(partitionCols)
.setDataSourceProviderFn(dataSourceProviderFn)
.setTableName(tableName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void testRangeCountTransform() throws Exception {
PCollection<Range> input = testPipeline.apply(Create.of(singleColNonLastRange, bothColRange));
RangeCountTransform rangeCountTransform =
RangeCountTransform.builder()
.setDbAdapter(new MysqlDialectAdapter(MySqlVersion.DEFAULT))
.setDbAdapter(new MysqlDialectAdapter(MySqlVersion.DERBY))
.setPartitionColumns(partitionCols)
.setDataSourceProviderFn(dataSourceProviderFn)
.setTimeoutMillis(42L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ private ReadWithUniformPartitions getReadWithUniformPartitionsForTest(
.setColumnName("col2")
.setColumnClass(Integer.class)
.build()))
.setDbAdapter(new MysqlDialectAdapter(MySqlVersion.DEFAULT))
.setDbAdapter(new MysqlDialectAdapter(MySqlVersion.DERBY))
.setDataSourceProviderFn(dataSourceProviderFn)
.setAdditionalOperationsOnRanges(testRangesPeek)
.setRowMapper(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.transforms;

import com.google.cloud.teleport.v2.utilities.DerbyUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
Expand All @@ -34,20 +35,39 @@ private TransformTestUtils() {}
static void createDerbyTable(String tableName) throws SQLException {
try (java.sql.Connection connection = getConnection()) {
Statement stmtCreateTable = connection.createStatement();
String col1Name = DerbyUtils.quote("col1");
String col2Name = DerbyUtils.quote("col2");
String dataName = DerbyUtils.quote("data");
String createTableSQL =
"CREATE TABLE "
+ tableName
+ DerbyUtils.quote(tableName)
+ " ("
+ "col1 INT,"
+ "col2 INT,"
+ "data VARCHAR(20),"
+ "PRIMARY KEY (col1, col2)"
+ col1Name
+ " INT,"
+ col2Name
+ " INT,"
+ dataName
+ " VARCHAR(20),"
+ "PRIMARY KEY ("
+ col1Name
+ ", "
+ col2Name
+ ")"
+ ")";
stmtCreateTable.executeUpdate(createTableSQL);
stmtCreateTable.executeUpdate(DerbyUtils.modifyQuery(createTableSQL));

// 2.2 Insert Data (Using PreparedStatement for Efficiency & Security)
String insertSQL = "INSERT INTO " + tableName + " (col1, col2, data) VALUES (?, ?, ?)";
PreparedStatement stmtInsert = connection.prepareStatement(insertSQL);
String insertSQL =
"INSERT INTO "
+ DerbyUtils.quote(tableName)
+ "("
+ col1Name
+ ", "
+ col2Name
+ ", "
+ dataName
+ ") VALUES (?, ?, ?)";
PreparedStatement stmtInsert = connection.prepareStatement(DerbyUtils.modifyQuery(insertSQL));

// Batch the insert operations
stmtInsert.setInt(1, 10);
Expand Down Expand Up @@ -87,7 +107,7 @@ static void createDerbyTable(String tableName) throws SQLException {
static void dropDerbyTable(String tableName) throws SQLException {
try (Connection connection = getConnection()) {
Statement statement = connection.createStatement();
statement.executeUpdate("drop table " + tableName);
statement.executeUpdate(DerbyUtils.modifyQuery("drop table " + DerbyUtils.quote(tableName)));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.utilities;

/**
* Utilities to run test cases on derby
*
* @see org.apache.beam.it.jdbc.MySQLResourceManager
*/
public class DerbyUtils {

public static String quote(String element) {
return "\"" + element + "\"";
}

public static String modifyQuery(String query) {
return query.replaceAll("`", "\"");
}
}

0 comments on commit aa35525

Please sign in to comment.