Skip to content
This repository has been archived by the owner on Oct 28, 2024. It is now read-only.

add insert support #32

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
<sonar.host.url>https://sonarcloud.io</sonar.host.url>
<sonar.version>3.6.1.1688</sonar.version>
<sonar.version>3.6.1.1688</sonar.version>
<zetasql.version>2021.02.1</zetasql.version>
<zetasql.version>2021.03.2</zetasql.version>
<protoc-jar-maven-plugin.version>3.11.4</protoc-jar-maven-plugin.version>
<truth.version>1.1.2</truth.version>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package com.google.cloud.solutions.datalineage;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.zetasql.Analyzer.extractTableNamesFromStatement;

import com.google.cloud.solutions.datalineage.extractor.ColumnLineageExtractor;
import com.google.cloud.solutions.datalineage.extractor.ColumnLineageExtractorFactory;
import com.google.cloud.solutions.datalineage.extractor.FunctionExpressionsExtractor;
import com.google.cloud.solutions.datalineage.extractor.GroupByExtractor;
import com.google.cloud.solutions.datalineage.extractor.InsertStatementExtractor;
import com.google.cloud.solutions.datalineage.extractor.SimpleAggregateExtractor;
import com.google.cloud.solutions.datalineage.model.LineageMessages.ColumnEntity;
import com.google.cloud.solutions.datalineage.model.LineageMessages.ColumnLineage;
Expand All @@ -48,7 +48,7 @@ public class BigQuerySqlParser {
static {
ColumnLineageExtractorFactory.register(
FunctionExpressionsExtractor.class, GroupByExtractor.class,
SimpleAggregateExtractor.class);
SimpleAggregateExtractor.class, InsertStatementExtractor.class);
}

private final ZetaSqlSchemaLoader tableSchemaLoader;
Expand Down Expand Up @@ -110,7 +110,7 @@ private ImmutableSet<ImmutableMap<ColumnEntity, ColumnLineage>> extractLineages(
}

private static ImmutableSet<String> extractReferencedTables(String sql) {
return extractTableNamesFromStatement(sql).stream()
return Analyzer.extractTableNamesFromStatement(sql, enableAllFeatures()).stream()
.flatMap(List::stream)
.collect(toImmutableSet());
}
Expand All @@ -119,9 +119,11 @@ private ResolvedStatement resolve(String sql) {
return Analyzer.analyzeStatement(sql, enableAllFeatures(), buildCatalogWithQueryTables(sql));
}

private AnalyzerOptions enableAllFeatures() {
private static AnalyzerOptions enableAllFeatures() {
LanguageOptions languageOptions = new LanguageOptions().enableMaximumLanguageFeatures();
languageOptions.setSupportsAllStatementKinds();
AnalyzerOptions analyzerOptions = new AnalyzerOptions();
analyzerOptions.setLanguageOptions(new LanguageOptions().enableMaximumLanguageFeatures());
analyzerOptions.setLanguageOptions(languageOptions);
analyzerOptions.setPruneUnusedColumns(true);

return analyzerOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.google.zetasql.SimpleCatalog;
import com.google.zetasql.resolvedast.ResolvedNodes.ResolvedStatement;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;

/**
Expand Down Expand Up @@ -91,22 +90,10 @@ public QueryColumns outputColumns() {
}

/**
* Returns a set of applicable Extractors for the provided SQL query.
* Returns a set of applicable Extractors
*/
public ImmutableSet<ColumnLineageExtractor> buildExtractors() {
return outputColumns()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The extractor types and ordering is still necessary - this has just been temporarily disabled. One question is how extractors work in order (or nested?). An insert with SET col = CONCAT(colA, colB) will be parsed correctly with colA and colB properly set as parents of col, but the evaluation occurs after the DML statement evaluation and the type is a '$insert'. I think this just needs some extension to allow the ordering of extractors to change, as well as allowing an extractor registered on the table itself instead of columnType (I think this is necessary for DML, but still wrapping my head around it). Any advice appreciated.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They work in order

.getProcessedColumnTypes().stream()
.map(this::buildExtractorFor)
.flatMap(Collection::stream)
.distinct()
.collect(toImmutableSet());
}

/**
* Returns a set of extractors for given columnType or empty set, if not find.
*/
public ImmutableSet<ColumnLineageExtractor> buildExtractorFor(String columnType) {
return extractorTypeMap.get(columnType).stream()
return extractorTypeMap.values().stream()
.map(clazz -> buildExtractor(clazz, resolvedStatement))
.filter(Optional::isPresent)
.map(Optional::get)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.google.cloud.solutions.datalineage.extractor;

import static com.google.cloud.solutions.datalineage.converter.ResolvedColumnToColumnEntityConverter.convertToColumnEntity;

import com.google.cloud.solutions.datalineage.model.LineageMessages.ColumnEntity;
import com.google.cloud.solutions.datalineage.model.LineageMessages.ColumnLineage;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.zetasql.resolvedast.ResolvedColumn;
import com.google.zetasql.resolvedast.ResolvedNodes;
import java.util.HashMap;
import java.util.stream.IntStream;

public class InsertStatementExtractor extends ColumnLineageExtractor {

public InsertStatementExtractor(ResolvedNodes.ResolvedStatement resolvedStatement) {
super(resolvedStatement);
}

@Override
public String getSupportedColumnType() {
// TODO skuehn this PR determine how to support column types and the extractor instantiation
// optimizations
return null;
}

@Override
public ImmutableMap<ColumnEntity, ColumnLineage> extract() {
HashMap<ColumnEntity, ColumnLineage> exprLineageMapBuilder = new HashMap<>();

resolvedStatement.accept(
new ResolvedNodes.Visitor() {
@Override
public void visit(ResolvedNodes.ResolvedInsertStmt insertStmt) {
if (!insertStmt.getQueryOutputColumnList().isEmpty()) {
// For inserts, BQ requires values to be added in the same order as the specified
// columns, and the number of values added must match the number of specified columns.
IntStream.range(0, insertStmt.getInsertColumnList().size()).forEach(columnIndex -> {
ColumnEntity outputColumn =
convertToColumnEntity(insertStmt.getInsertColumnList().get(columnIndex));
ResolvedColumn sourceColumn = insertStmt.getQueryOutputColumnList()
.get(columnIndex);

exprLineageMapBuilder.put(
outputColumn,
ColumnLineage.newBuilder()
.setTarget(outputColumn)
.addAllParents(ImmutableList.of(convertToColumnEntity(sourceColumn)))
.build());
});
}
super.visit(insertStmt);
}
});

return ImmutableMap.copyOf(exprLineageMapBuilder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.cloud.solutions.datalineage.model.LineageMessages.ColumnEntity;
import com.google.cloud.solutions.datalineage.model.QueryColumns;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.zetasql.resolvedast.ResolvedColumn;
import com.google.zetasql.resolvedast.ResolvedNodes;
import com.google.zetasql.resolvedast.ResolvedNodes.ResolvedStatement;
Expand All @@ -42,7 +41,6 @@ public OutputColumnExtractor(ResolvedStatement resolvedStatement) {
*/
public QueryColumns extract() {
ImmutableMap.Builder<String, ColumnEntity> outputColumnBuilder = ImmutableMap.builder();
ImmutableSet.Builder<String> nonTableColumnTypes = ImmutableSet.builder();

resolvedStatement.accept(
new ResolvedNodes.Visitor() {
Expand All @@ -53,12 +51,16 @@ public void visit(ResolvedNodes.ResolvedOutputColumn outputColumn) {
outputColumn.getName(),
convertToColumnEntity(resolvedColumn));

if (resolvedColumn.getTableName().startsWith("$")) {
nonTableColumnTypes.add(resolvedColumn.getTableName());
}

super.visit(outputColumn);
}

@Override
public void visit(ResolvedNodes.ResolvedInsertStmt insertStmt) {
insertStmt.getInsertColumnList().forEach(resolvedColumn ->
outputColumnBuilder.put(resolvedColumn.getName(),
convertToColumnEntity(resolvedColumn)));
super.visit(insertStmt);
}
});

return QueryColumns.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,12 @@

package com.google.cloud.solutions.datalineage.model;

import static com.google.common.collect.ImmutableSet.toImmutableSet;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import com.google.auto.value.AutoValue;
import com.google.cloud.solutions.datalineage.model.LineageMessages.ColumnEntity;
import com.google.cloud.solutions.datalineage.model.LineageMessages.DataEntity.DataEntityTypes;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
Expand All @@ -40,15 +36,6 @@ public abstract class QueryColumns {

public abstract ImmutableMap<String, ColumnEntity> getColumnMap();

public final ImmutableSet<String> getProcessedColumnTypes() {
return getColumnMap().values().stream()
.filter(columnEntity -> columnEntity.getTable().getKind()
.equals(DataEntityTypes.QUERY_LEVEL_TABLE))
.map(columnEntity -> columnEntity.getTable().getSqlResource())
.filter(tableName -> tableName.startsWith("$"))
.collect(toImmutableSet());
}

@SchemaCreate
public static QueryColumns create(ImmutableMap<String, ColumnEntity> columnMap) {
return builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;


@RunWith(JUnit4.class)
public final class BigQuerySqlParserTest {

@Test
public void extractColumnLineage_concatColumns_correctColumnNames() {
public void queryExtractColumnLineage_concatColumns_correctColumnNames() {
FakeBigQueryServiceFactory fakeBigqueryFactory =
FakeBigQueryServiceFactory
.forTableSchemas(
Expand Down Expand Up @@ -72,7 +73,7 @@ public void extractColumnLineage_concatColumns_correctColumnNames() {
}

@Test
public void extractColumnLineage_multipleOutputColumnsWithAlias_correctColumnLineage() {
public void queryExtractColumnLineage_multipleOutputColumnsWithAlias_correctColumnLineage() {
FakeBigQueryServiceFactory fakeBigqueryFactory =
FakeBigQueryServiceFactory.forTableSchemas(
TestResourceLoader.load("schemas/tableA_schema.json"),
Expand Down Expand Up @@ -116,7 +117,7 @@ public void extractColumnLineage_multipleOutputColumnsWithAlias_correctColumnLin
}

@Test
public void extractColumnLineage_multipleOutputColumnsWithoutAlias_correctColumnLineage() {
public void queryExtractColumnLineage_multipleOutputColumnsWithoutAlias_correctColumnLineage() {
FakeBigQueryServiceFactory fakeBigqueryFactory =
FakeBigQueryServiceFactory.forTableSchemas(
TestResourceLoader.load("schemas/tableA_schema.json"),
Expand Down Expand Up @@ -169,7 +170,7 @@ public void extractColumnLineage_multipleOutputColumnsWithoutAlias_correctColumn

@Test
public void
extractColumnLineage_bigQuerySchemaMultipleOutputColumnsWithoutAlias_correctColumnLineage() {
queryExtractColumnLineage_bigQuerySchemaMultipleOutputColumnsWithoutAlias_correctColumnLineage() {
FakeBigQueryServiceFactory fakeBigqueryFactory =
FakeBigQueryServiceFactory.forTableSchemas(
TestResourceLoader.load("schemas/daily_report_table_schema.json"),
Expand Down Expand Up @@ -278,4 +279,86 @@ public void extractColumnLineage_publicDatasetQuery_correctColumnLineage() {
.setColumn("color").build()))
.build());
}
}

@Test
public void insertExtractColumnLineage_tableA_specifyColumns_correctColumnLineage() {
FakeBigQueryServiceFactory fakeBigQueryFactory =
FakeBigQueryServiceFactory.forTableSchemas(
TestResourceLoader.load("schemas/tableA_schema.json"),
TestResourceLoader.load("schemas/tableB_schema.json"));
BigQueryZetaSqlSchemaLoader fakeSchemaLoader =
new BigQueryZetaSqlSchemaLoader(
BigQueryTableLoadService.usingServiceFactory(fakeBigQueryFactory));

ImmutableSet<ColumnLineage> resolvedStatement =
new BigQuerySqlParser(fakeSchemaLoader)
.extractColumnLineage(
TestResourceLoader
.load(
"sql/tableA_specify_columns_insert.sql"));

assertThat(resolvedStatement)
.containsExactly(
ColumnLineage.newBuilder()
.setTarget(ColumnEntity.newBuilder().setColumn("colA").build())
.addAllParents(
ImmutableSet.of(
ColumnEntity.newBuilder().setTable(
BigQueryTableCreator
.usingBestEffort("project2.datasetB.TableB")
.dataEntity())
.setColumn("colB").build()))
.build(),
ColumnLineage.newBuilder()
.setTarget(ColumnEntity.newBuilder().setColumn("colC").build())
.addAllParents(
ImmutableSet.of(
ColumnEntity.newBuilder().setTable(
BigQueryTableCreator
.usingBestEffort("project2.datasetB.TableB")
.dataEntity())
.setColumn("colC").build()))
.build());
}

@Test
public void insertExtractColumnLineage_tableA_inferColumns_correctColumnLineage() {
FakeBigQueryServiceFactory fakeBigQueryFactory =
FakeBigQueryServiceFactory.forTableSchemas(
TestResourceLoader.load("schemas/tableA_schema.json"),
TestResourceLoader.load("schemas/tableB_schema.json"));
BigQueryZetaSqlSchemaLoader fakeSchemaLoader =
new BigQueryZetaSqlSchemaLoader(
BigQueryTableLoadService.usingServiceFactory(fakeBigQueryFactory));

ImmutableSet<ColumnLineage> resolvedStatement =
new BigQuerySqlParser(fakeSchemaLoader)
.extractColumnLineage(
TestResourceLoader
.load(
"sql/tableA_infer_columns_insert.sql"));

assertThat(resolvedStatement)
.containsExactly(
ColumnLineage.newBuilder()
.setTarget(ColumnEntity.newBuilder().setColumn("colA").build())
.addAllParents(
ImmutableSet.of(
ColumnEntity.newBuilder().setTable(
BigQueryTableCreator
.usingBestEffort("project2.datasetB.TableB")
.dataEntity())
.setColumn("colB").build()))
.build(),
ColumnLineage.newBuilder()
.setTarget(ColumnEntity.newBuilder().setColumn("colC").build())
.addAllParents(
ImmutableSet.of(
ColumnEntity.newBuilder().setTable(
BigQueryTableCreator
.usingBestEffort("project2.datasetB.TableB")
.dataEntity())
.setColumn("colC").build()))
.build());
}
}
36 changes: 36 additions & 0 deletions src/test/resources/sql/tableA_infer_columns_insert.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.

#standardSQL
INSERT
`project1.datasetA.TableA`
SELECT
colB,
colC
FROM
`project2.datasetB.TableB`
Loading