Skip to content

Commit

Permalink
-
Browse files Browse the repository at this point in the history
  • Loading branch information
kramerul committed Nov 20, 2023
1 parent e15ba78 commit 173e896
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 63 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.calcite.adapter.jdbc;

import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.schema.SchemaPlus;

import org.checkerframework.checker.nullness.qual.Nullable;

public class JdbcCorrelationDataContext implements DataContext {
public final static int OFFSET= 10000;

private final DataContext delegate;
private final Object[] parameters;

public JdbcCorrelationDataContext(DataContext delegate, Object[] parameters) {
this.delegate = delegate;
this.parameters = parameters;
}
@Override
public @Nullable SchemaPlus getRootSchema() {
return delegate.getRootSchema();
}

@Override
public JavaTypeFactory getTypeFactory() {
return delegate.getTypeFactory();
}

@Override
public QueryProvider getQueryProvider() {
return delegate.getQueryProvider();
}

@Override
public @Nullable Object get(String name) {
if ( name.startsWith("?")) {
int index = Integer.parseInt(name.substring(1));
if ( index >= OFFSET && index < OFFSET+parameters.length) {
return parameters[index-OFFSET];
}
}
return delegate.get(name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@

import java.lang.reflect.Type;

public interface JdbcCorrelationVariableBuilder {
int createCorrelationVariable(CorrelationId id, int ordinal, Type type);
public interface JdbcCorrelationDataContextBuilder {
int add(CorrelationId id, int ordinal, Type type);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.calcite.adapter.jdbc;

import com.google.common.collect.ImmutableList;

import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
import org.apache.calcite.linq4j.tree.BlockBuilder;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.linq4j.tree.Types;
import org.apache.calcite.rel.core.CorrelationId;

import java.lang.reflect.Constructor;
import java.lang.reflect.Type;

public class JdbcCorrelationDataContextBuilderImpl implements JdbcCorrelationDataContextBuilder {
private static final Constructor NEW = Types.lookupConstructor(JdbcCorrelationDataContext.class, DataContext.class, Object[].class);
private final ImmutableList.Builder<Expression> parameters = new ImmutableList.Builder<>();
private int offset = JdbcCorrelationDataContext.OFFSET;
private final EnumerableRelImplementor implementor;
private final BlockBuilder builder;
private final Expression dataContext;

public JdbcCorrelationDataContextBuilderImpl(EnumerableRelImplementor implementor,BlockBuilder builder, Expression dataContext) {
this.implementor = implementor;
this.builder = builder;
this.dataContext = dataContext;
}

@Override
public int add(CorrelationId id, int ordinal, Type type) {
parameters.add(implementor.getCorrelVariableGetter(id.getName()).field(builder,ordinal,type));
return offset++;
}

public Expression build() {
return Expressions.new_(NEW,dataContext,
Expressions.newArrayInit(Object.class,1,parameters.build()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,41 +21,35 @@
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.rel2sql.RelToSqlConverter;
import org.apache.calcite.rel.rel2sql.SqlImplementor;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.rex.RexCorrelVariable;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlDynamicParam;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.Util;

import java.lang.reflect.Type;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Set;

/**
* State for generating a SQL statement.
*/
public class JdbcImplementor extends RelToSqlConverter {

private final JdbcCorrelationVariableBuilder correlationVariableBuilder;
private final JdbcCorrelationDataContextBuilder dataContextBuilder;
private final JavaTypeFactory typeFactory;

public JdbcImplementor(SqlDialect dialect, JavaTypeFactory typeFactory, JdbcCorrelationVariableBuilder correlationVariableBuilder) {
public JdbcImplementor(SqlDialect dialect, JavaTypeFactory typeFactory, JdbcCorrelationDataContextBuilder dataContextBuilder) {
super(dialect);
this. typeFactory= typeFactory;
this.correlationVariableBuilder = correlationVariableBuilder;
this.dataContextBuilder = dataContextBuilder;
}

public JdbcImplementor(SqlDialect dialect, JavaTypeFactory typeFactory) {
this(dialect, typeFactory, new JdbcCorrelationVariableBuilder() {
this(dialect, typeFactory, new JdbcCorrelationDataContextBuilder() {
private int counter = 1;
@Override
public int createCorrelationVariable(CorrelationId id, int ordinal, Type type) {
public int add(CorrelationId id, int ordinal, Type type) {
return counter++;
}
});
Expand All @@ -74,7 +68,7 @@ protected Context getAliasContext(RexCorrelVariable variable){
@Override
public SqlNode field(int ordinal) {
RelDataTypeField field = fieldList.get(ordinal);
return new SqlDynamicParam(correlationVariableBuilder.createCorrelationVariable(variable.id,ordinal, typeFactory.getJavaClass(field.getType())),SqlParserPos.ZERO);
return new SqlDynamicParam(dataContextBuilder.add(variable.id,ordinal, typeFactory.getJavaClass(field.getType())),SqlParserPos.ZERO);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,29 @@
*/
package org.apache.calcite.adapter.jdbc;

import com.google.common.collect.ImmutableList;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.enumerable.*;
import org.apache.calcite.adapter.enumerable.EnumerableRel;
import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
import org.apache.calcite.adapter.enumerable.JavaRowFormat;
import org.apache.calcite.adapter.enumerable.PhysType;
import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
import org.apache.calcite.adapter.enumerable.RexImpTable;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.config.CalciteSystemProperty;
import org.apache.calcite.linq4j.tree.*;
import org.apache.calcite.plan.*;
import org.apache.calcite.linq4j.tree.BlockBuilder;
import org.apache.calcite.linq4j.tree.ConstantExpression;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.linq4j.tree.ParameterExpression;
import org.apache.calcite.linq4j.tree.Primitive;
import org.apache.calcite.linq4j.tree.UnaryExpression;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterImpl;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.runtime.Hook;
Expand All @@ -35,23 +48,24 @@
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.util.SqlString;
import org.apache.calcite.util.BuiltInMethod;

import org.checkerframework.checker.nullness.qual.Nullable;

import javax.sql.DataSource;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Type;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.TimeZone;
import java.util.stream.Collectors;
import javax.sql.DataSource;

import static java.util.Objects.requireNonNull;
import static org.apache.calcite.linq4j.Nullness.castNonNull;

import static java.util.Objects.requireNonNull;

/**
* Relational expression representing a scan of a table in a JDBC data source.
*/
Expand Down Expand Up @@ -91,16 +105,9 @@ protected JdbcToEnumerableConverter(
final JdbcConvention jdbcConvention =
(JdbcConvention) requireNonNull(child.getConvention(),
() -> "child.getConvention() is null for " + child);
ImmutableList.Builder<Expression> parameterBuilder = new ImmutableList.Builder<>();
JdbcCorrelationVariableBuilder correlationVariableBuilder = new JdbcCorrelationVariableBuilder() {
int index = 1;
@Override
public int createCorrelationVariable(CorrelationId id, int ordinal, Type type) {
parameterBuilder.add(implementor.getCorrelVariableGetter(id.getName()).field(builder0,ordinal,type));
return index++;
}
};
SqlString sqlString = generateSql(jdbcConvention.dialect,correlationVariableBuilder);
JdbcCorrelationDataContextBuilderImpl dataContextBuilder = new JdbcCorrelationDataContextBuilderImpl(
implementor,builder0,DataContext.ROOT);
SqlString sqlString = generateSql(jdbcConvention.dialect,dataContextBuilder);
String sql = sqlString.getSql();
if (CalciteSystemProperty.DEBUG.value()) {
System.out.println("[" + sql + "]");
Expand Down Expand Up @@ -169,21 +176,13 @@ public int createCorrelationVariable(CorrelationId id, int ordinal, Type type) {

if (sqlString.getDynamicParameters() != null
&& !sqlString.getDynamicParameters().isEmpty()) {
ImmutableList<Expression> parameters = parameterBuilder.build();
final Expression preparedStatementConsumer_;
if (parameters.isEmpty()) {
preparedStatementConsumer_ =
builder0.append("preparedStatementConsumer",
Expressions.call(BuiltInMethod.CREATE_ENRICHER.method,
Expressions.newArrayInit(Integer.class, 1,
toIndexesTableExpression(sqlString)),
DataContext.ROOT));
}
else {
preparedStatementConsumer_ = builder0.append("preparedStatementConsumer",
Expressions.call(BuiltInMethod.CREATE_CORRELATION_ENRICHER.method,
Expressions.newArrayInit(Object.class,1,parameters)));
}
final Expression preparedStatementConsumer_ =
builder0.append("preparedStatementConsumer",
Expressions.call(BuiltInMethod.CREATE_ENRICHER.method,
Expressions.newArrayInit(Integer.class, 1,
toIndexesTableExpression(sqlString)),
dataContextBuilder.build()));

enumerable =
builder0.append("enumerable",
Expressions.call(
Expand Down Expand Up @@ -359,10 +358,10 @@ private static String jdbcGetMethod(@Nullable Primitive primitive) {
: "get" + SqlFunctions.initcap(castNonNull(primitive.primitiveName));
}

private SqlString generateSql(SqlDialect dialect, JdbcCorrelationVariableBuilder correlationVariableBuilder) {
private SqlString generateSql(SqlDialect dialect, JdbcCorrelationDataContextBuilder dataContextBuilder) {
final JdbcImplementor jdbcImplementor =
new JdbcImplementor(dialect,
(JavaTypeFactory) getCluster().getTypeFactory(), correlationVariableBuilder);
(JavaTypeFactory) getCluster().getTypeFactory(), dataContextBuilder);
final JdbcImplementor.Result result =
jdbcImplementor.visitRoot(this.getInput());
return result.asStatement().toSqlString(dialect);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ public SqlNode toSql(@Nullable RexProgram program, RexNode rex) {
assert lastAccess != null;
SqlNode node = correlAliasContext
.field(lastAccess.getField().getIndex());
if ( ! ( node instanceof SqlIdentifier)) {
if ( node instanceof SqlDynamicParam) {
return node;
}
sqlIdentifier = (SqlIdentifier) node;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,34 @@
import org.apache.calcite.linq4j.function.Function1;
import org.apache.calcite.linq4j.tree.Primitive;
import org.apache.calcite.util.Static;

import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;
import java.math.BigDecimal;
import java.net.URL;
import java.sql.*;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.Date;
import java.sql.NClob;
import java.sql.PreparedStatement;
import java.sql.Ref;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.RowId;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLXML;
import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;

import static org.apache.calcite.linq4j.Nullness.castNonNull;

Expand Down Expand Up @@ -182,14 +198,6 @@ public static PreparedStatementEnricher createEnricher(Integer[] indexes,
};
}

public static PreparedStatementEnricher createCorrelationEnricher(Object[] values) {
return preparedStatement -> {
for (int i = 0; i < values.length; i++) {
setDynamicParam(preparedStatement, i+1,values[i]);
}
};
}

/** Assigns a value to a dynamic parameter in a prepared statement, calling
* the appropriate {@code setXxx} method based on the type of the value. */
private static void setDynamicParam(PreparedStatement preparedStatement,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,7 @@ public enum BuiltInMethod {
DataSource.class, String.class, Function1.class,
ResultSetEnumerable.PreparedStatementEnricher.class),
CREATE_ENRICHER(ResultSetEnumerable.class, "createEnricher", Integer[].class,
DataContext.class),
CREATE_CORRELATION_ENRICHER(ResultSetEnumerable.class, "createCorrelationEnricher", Object[].class),
DataContext.class),
HASH_JOIN(ExtendedEnumerable.class, "hashJoin", Enumerable.class,
Function1.class,
Function1.class, Function2.class, EqualityComparer.class,
Expand Down

0 comments on commit 173e896

Please sign in to comment.