From df9711340e8b9de7669f659c22c827efaa8136f5 Mon Sep 17 00:00:00 2001 From: atask-g Date: Thu, 8 Aug 2024 10:12:32 -0400 Subject: [PATCH] Added support for search indexes (#1768) * Added support for search indexes * Added HIDDEN column support to AvroSchemaToDdlConverter * Added test for SpannerSchema parsing of TOKENLIST * Addressed review feedback * Ran mvn spotless:apply to fix formatting --- .../spanner/AvroSchemaToDdlConverter.java | 5 + .../cloud/teleport/spanner/AvroUtil.java | 1 + .../spanner/DdlToAvroSchemaConverter.java | 3 + .../teleport/spanner/common/SizedType.java | 5 + .../cloud/teleport/spanner/common/Type.java | 9 ++ .../cloud/teleport/spanner/ddl/Column.java | 14 +++ .../cloud/teleport/spanner/ddl/Index.java | 20 ++- .../spanner/ddl/InformationSchemaScanner.java | 115 +++++++++++++++++- .../sdk/io/gcp/spanner/SpannerSchema.java | 3 + .../spanner/AvroSchemaToDdlConverterTest.java | 12 +- .../spanner/DdlToAvroSchemaConverterTest.java | 29 ++++- .../teleport/spanner/ExportPipelineIT.java | 14 +++ .../teleport/spanner/ImportPipelineIT.java | 8 ++ .../ddl/InformationSchemaScannerIT.java | 18 +++ .../ddl/InformationSchemaScannerTest.java | 9 +- .../sdk/io/gcp/spanner/SpannerSchemaTest.java | 4 +- 16 files changed, 256 insertions(+), 13 deletions(-) diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverter.java b/v1/src/main/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverter.java index 51c10bcd06..9fc8778e22 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverter.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverter.java @@ -17,6 +17,7 @@ import static com.google.cloud.teleport.spanner.AvroUtil.DEFAULT_EXPRESSION; import static com.google.cloud.teleport.spanner.AvroUtil.GENERATION_EXPRESSION; +import static com.google.cloud.teleport.spanner.AvroUtil.HIDDEN; import static com.google.cloud.teleport.spanner.AvroUtil.INPUT; import static 
com.google.cloud.teleport.spanner.AvroUtil.NOT_NULL; import static com.google.cloud.teleport.spanner.AvroUtil.OUTPUT; @@ -258,6 +259,10 @@ public Table toTable(String tableName, Schema schema) { if (Boolean.parseBoolean(stored)) { column.stored(); } + String hidden = f.getProp(HIDDEN); + if (Boolean.parseBoolean(hidden)) { + column.isHidden(true); + } } else { boolean nullable = false; Schema avroType = f.schema(); diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/AvroUtil.java b/v1/src/main/java/com/google/cloud/teleport/spanner/AvroUtil.java index cb73030aa6..fe5febf7b2 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/AvroUtil.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/AvroUtil.java @@ -52,6 +52,7 @@ private AvroUtil() {} public static final String SPANNER_NAMED_SCHEMA = "spannerNamedSchema"; public static final String SPANNER_NAME = "spannerName"; public static final String STORED = "stored"; + public static final String HIDDEN = "hidden"; public static Schema unpackNullable(Schema schema) { if (schema.getType() != Schema.Type.UNION) { diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverter.java b/v1/src/main/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverter.java index 9a8e119d81..636984f127 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverter.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverter.java @@ -19,6 +19,7 @@ import static com.google.cloud.teleport.spanner.AvroUtil.GENERATION_EXPRESSION; import static com.google.cloud.teleport.spanner.AvroUtil.GOOGLE_FORMAT_VERSION; import static com.google.cloud.teleport.spanner.AvroUtil.GOOGLE_STORAGE; +import static com.google.cloud.teleport.spanner.AvroUtil.HIDDEN; import static com.google.cloud.teleport.spanner.AvroUtil.INPUT; import static com.google.cloud.teleport.spanner.AvroUtil.NOT_NULL; import static 
com.google.cloud.teleport.spanner.AvroUtil.OUTPUT; @@ -155,6 +156,7 @@ public Collection convert(Ddl ddl) { fieldBuilder.prop(NOT_NULL, Boolean.toString(cm.notNull())); fieldBuilder.prop(GENERATION_EXPRESSION, cm.generationExpression()); fieldBuilder.prop(STORED, Boolean.toString(cm.isStored())); + fieldBuilder.prop(HIDDEN, Boolean.toString(cm.isHidden())); // Make the type null to allow us not export the generated column values, // which are semantically logical entities. fieldBuilder.type(SchemaBuilder.builder().nullType()).withDefault(null); @@ -334,6 +336,7 @@ private Schema avroType( case BYTES: case PG_BYTEA: case PROTO: + case TOKENLIST: return SchemaBuilder.builder().bytesType(); case TIMESTAMP: case PG_TIMESTAMPTZ: diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/common/SizedType.java b/v1/src/main/java/com/google/cloud/teleport/spanner/common/SizedType.java index 5e2aed8409..8da90f1679 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/common/SizedType.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/common/SizedType.java @@ -86,6 +86,8 @@ public static String typeString(Type type, Integer size, boolean outputAsDdlRepr return "text"; case BYTES: return "BYTES(" + (size == -1 ? 
"MAX" : Integer.toString(size)) + ")"; + case TOKENLIST: + return "TOKENLIST"; case PG_BYTEA: return "bytea"; case DATE: @@ -213,6 +215,9 @@ public static SizedType parseSpannerType(String spannerType, Dialect dialect) { if (spannerType.equals("JSON")) { return t(Type.json(), null); } + if (spannerType.equals("TOKENLIST")) { + return t(Type.tokenlist(), null); + } if (spannerType.startsWith("ARRAY<")) { // Substring "ARRAY or ARRAY(vector_length)" diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/common/Type.java b/v1/src/main/java/com/google/cloud/teleport/spanner/common/Type.java index 70c6a0a10c..46c7d2b8c2 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/common/Type.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/common/Type.java @@ -44,6 +44,7 @@ public final class Type implements Serializable { private static final Type TYPE_STRING = new Type(Code.STRING, null, null); private static final Type TYPE_JSON = new Type(Code.JSON, null, null); private static final Type TYPE_BYTES = new Type(Code.BYTES, null, null); + private static final Type TYPE_TOKENLIST = new Type(Code.TOKENLIST, null, null); private static final Type TYPE_TIMESTAMP = new Type(Code.TIMESTAMP, null, null); private static final Type TYPE_DATE = new Type(Code.DATE, null, null); private static final Type TYPE_ARRAY_BOOL = new Type(Code.ARRAY, TYPE_BOOL, null); @@ -137,6 +138,13 @@ public static Type bytes() { return TYPE_BYTES; } + /** + * Returns the descriptor for the {@code TOKENLIST} type: a collection of unique token strings. + */ + public static Type tokenlist() { + return TYPE_TOKENLIST; + } + /** * Returns the descriptor for the {@code TIMESTAMP} type: a nano precision timestamp in the range * [0000-01-01 00:00:00, 9999-12-31 23:59:59.999999999 UTC]. 
@@ -326,6 +334,7 @@ public enum Code { STRING("STRING", Dialect.GOOGLE_STANDARD_SQL), JSON("JSON", Dialect.GOOGLE_STANDARD_SQL), BYTES("BYTES", Dialect.GOOGLE_STANDARD_SQL), + TOKENLIST("TOKENLIST", Dialect.GOOGLE_STANDARD_SQL), TIMESTAMP("TIMESTAMP", Dialect.GOOGLE_STANDARD_SQL), DATE("DATE", Dialect.GOOGLE_STANDARD_SQL), ARRAY("ARRAY", Dialect.GOOGLE_STANDARD_SQL), diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Column.java b/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Column.java index 1f73d46ccd..f8a1661b59 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Column.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Column.java @@ -55,6 +55,8 @@ public abstract class Column implements Serializable { public abstract Dialect dialect(); + public abstract boolean isHidden(); + @Nullable public abstract String defaultExpression(); @@ -64,6 +66,7 @@ public static Builder builder(Dialect dialect) { .columnOptions(ImmutableList.of()) .notNull(false) .isGenerated(false) + .isHidden(false) .generationExpression("") .isStored(false); } @@ -99,6 +102,11 @@ public void prettyPrint(Appendable appendable) throws IOException { appendable.append(" STORED"); } } + if (isHidden()) { + if (dialect() == Dialect.GOOGLE_STANDARD_SQL) { + appendable.append(" HIDDEN"); + } + } if (columnOptions() == null) { return; } @@ -169,6 +177,8 @@ public Builder notNull() { return notNull(true); } + public abstract Builder isHidden(boolean hidden); + public abstract Builder isGenerated(boolean generated); public abstract Builder generationExpression(String expression); @@ -239,6 +249,10 @@ public Builder pgBytea() { return type(Type.pgBytea()).max(); } + public Builder tokenlist() { + return type(Type.tokenlist()); + } + public Builder timestamp() { return type(Type.timestamp()); } diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Index.java b/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Index.java index 
39bf5f87c4..6bfe3dcf2b 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Index.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Index.java @@ -39,6 +39,9 @@ public abstract class Index implements Serializable { abstract ImmutableList indexColumns(); + @Nullable + abstract ImmutableList options(); + abstract boolean unique(); // restricted for gsql @@ -51,6 +54,9 @@ public abstract class Index implements Serializable { @Nullable abstract String interleaveIn(); + @Nullable + abstract String type(); + public static Builder builder(Dialect dialect) { return new AutoValue_Index.Builder().dialect(dialect).nullFiltered(false).unique(false); } @@ -111,7 +117,9 @@ private void prettyPrintPg(Appendable appendable) throws IOException { private void prettyPrintGsql(Appendable appendable) throws IOException { appendable.append("CREATE"); - if (unique()) { + if (type() != null && type().equals("SEARCH")) { + appendable.append(" SEARCH"); + } else if (unique()) { appendable.append(" UNIQUE"); } if (nullFiltered()) { @@ -143,6 +151,12 @@ private void prettyPrintGsql(Appendable appendable) throws IOException { if (interleaveIn() != null) { appendable.append(", INTERLEAVE IN ").append(quoteIdentifier(interleaveIn(), dialect())); } + if (options() != null) { + String optionsString = String.join(",", options()); + if (!optionsString.isEmpty()) { + appendable.append(" OPTIONS (").append(optionsString).append(")"); + } + } } abstract Builder autoToBuilder(); @@ -190,6 +204,8 @@ public IndexColumn.IndexColumnsBuilder columns() { return columnsBuilder(); } + abstract Builder options(ImmutableList options); + public abstract Builder unique(boolean unique); public Builder unique() { @@ -206,6 +222,8 @@ public Builder nullFiltered() { public abstract Builder interleaveIn(String interleaveIn); + public abstract Builder type(String type); + abstract Index autoBuild(); public Index build() { diff --git 
a/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScanner.java b/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScanner.java index 428f3843b2..bf4244dda5 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScanner.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScanner.java @@ -103,6 +103,9 @@ public Ddl scan() { Map> indexes = Maps.newHashMap(); listIndexes(indexes); listIndexColumns(builder, indexes); + if (dialect == Dialect.GOOGLE_STANDARD_SQL) { + listIndexOptions(builder, indexes); + } for (Map.Entry> tableEntry : indexes.entrySet()) { String tableName = tableEntry.getKey(); @@ -302,12 +305,14 @@ private void listColumns(Ddl.Builder builder) { String generationExpression = resultSet.isNull(7) ? "" : resultSet.getString(7); boolean isStored = !resultSet.isNull(8) && resultSet.getString(8).equalsIgnoreCase("YES"); String defaultExpression = resultSet.isNull(9) ? null : resultSet.getString(9); + boolean isHidden = dialect == Dialect.GOOGLE_STANDARD_SQL ? resultSet.getBoolean(10) : false; builder .createTable(tableName) .column(columnName) .parseType(spannerType) .notNull(!nullable) .isGenerated(isGenerated) + .isHidden(isHidden) .generationExpression(generationExpression) .isStored(isStored) .defaultExpression(defaultExpression) @@ -323,7 +328,8 @@ Statement listColumnsSQL() { return Statement.of( "SELECT c.table_schema, c.table_name, c.column_name," + " c.ordinal_position, c.spanner_type, c.is_nullable," - + " c.is_generated, c.generation_expression, c.is_stored, c.column_default" + + " c.is_generated, c.generation_expression, c.is_stored," + + " c.column_default, c.is_hidden" + " FROM information_schema.columns as c" + " WHERE c.table_schema NOT IN" + " ('INFORMATION_SCHEMA', 'SPANNER_SYS')" @@ -372,6 +378,13 @@ private void listIndexes(Map> indexe ? 
null : resultSet.getString(6); + // Note that 'type' is only queried from GoogleSQL and is not from Postgres and + // the number of columns will be different. + String type = + (dialect == Dialect.GOOGLE_STANDARD_SQL && !resultSet.isNull(6)) + ? resultSet.getString(6) + : null; + Map tableIndexes = indexes.computeIfAbsent(tableName, k -> Maps.newTreeMap()); @@ -383,6 +396,7 @@ private void listIndexes(Map> indexe .unique(unique) .nullFiltered(nullFiltered) .interleaveIn(parent) + .type(type) .filter(filter)); } } @@ -393,11 +407,11 @@ Statement listIndexesSQL() { case GOOGLE_STANDARD_SQL: return Statement.of( "SELECT t.table_schema, t.table_name, t.index_name, t.parent_table_name, t.is_unique," - + " t.is_null_filtered" + + " t.is_null_filtered, t.index_type" + " FROM information_schema.indexes AS t" + " WHERE t.table_schema NOT IN" + " ('INFORMATION_SCHEMA', 'SPANNER_SYS') AND" - + " t.index_type='INDEX' AND t.spanner_is_managed = FALSE" + + " (t.index_type='INDEX' OR t.index_type='SEARCH') AND t.spanner_is_managed = FALSE" + " ORDER BY t.table_name, t.index_name"); case POSTGRESQL: return Statement.of( @@ -422,6 +436,8 @@ private void listIndexColumns( String columnName = resultSet.getString(2); String ordering = resultSet.isNull(3) ? null : resultSet.getString(3); String indexLocalName = resultSet.getString(4); + String indexType = dialect == Dialect.GOOGLE_STANDARD_SQL ? resultSet.getString(5) : null; + String spannerType = dialect == Dialect.GOOGLE_STANDARD_SQL ? 
resultSet.getString(6) : null; if (indexLocalName.equals("PRIMARY_KEY")) { IndexColumn.IndexColumnsBuilder pkBuilder = @@ -432,6 +448,32 @@ private void listIndexColumns( pkBuilder.desc(columnName).end(); } pkBuilder.end().endTable(); + } else if (indexType != null && indexType.equals("SEARCH")) { + if (!spannerType.equals("TOKENLIST") && ordering != null) { + continue; + } + Map tableIndexes = indexes.get(tableName); + if (tableIndexes == null) { + continue; + } + String indexName = + dialect == Dialect.POSTGRESQL + ? indexLocalName + : getQualifiedName(resultSet.getString(0), indexLocalName); + Index.Builder indexBuilder = tableIndexes.get(indexName); + if (indexBuilder == null) { + LOG.warn("Can not find index using name {}", indexName); + continue; + } + IndexColumn.IndexColumnsBuilder indexColumnsBuilder = + indexBuilder.columns().create().name(columnName); + + if (spannerType.equals("TOKENLIST")) { + indexColumnsBuilder.asc(); + } else if (ordering == null) { + indexColumnsBuilder.storing(); + } + indexColumnsBuilder.endIndexColumn().end(); } else { Map tableIndexes = indexes.get(tableName); if (tableIndexes == null) { @@ -475,7 +517,8 @@ Statement listIndexColumnsSQL() { switch (dialect) { case GOOGLE_STANDARD_SQL: return Statement.of( - "SELECT t.table_schema, t.table_name, t.column_name, t.column_ordering, t.index_name " + "SELECT t.table_schema, t.table_name, t.column_name, t.column_ordering, t.index_name," + + " t.index_type, t.spanner_type " + "FROM information_schema.index_columns AS t " + " WHERE t.table_schema NOT IN" + " ('INFORMATION_SCHEMA', 'SPANNER_SYS')" @@ -492,6 +535,70 @@ Statement listIndexColumnsSQL() { } } + private void listIndexOptions( + Ddl.Builder builder, Map> indexes) { + Statement statement = listIndexOptionsSQL(); + + ResultSet resultSet = context.executeQuery(statement); + + Map, ImmutableList.Builder> allOptions = Maps.newHashMap(); + while (resultSet.next()) { + String tableName = getQualifiedName(resultSet.getString(0), 
resultSet.getString(1)); + String indexName = resultSet.getString(2); + String indexType = resultSet.getString(3); + String optionName = resultSet.getString(4); + String optionType = resultSet.getString(5); + String optionValue = resultSet.getString(6); + + KV kv = KV.of(tableName, indexName); + ImmutableList.Builder options = + allOptions.computeIfAbsent(kv, k -> ImmutableList.builder()); + + if (optionType.equalsIgnoreCase("STRING")) { + options.add(optionName + "=\"" + OPTION_STRING_ESCAPER.escape(optionValue) + "\""); + } else if (optionType.equalsIgnoreCase("character varying")) { + options.add(optionName + "='" + OPTION_STRING_ESCAPER.escape(optionValue) + "'"); + } else { + options.add(optionName + "=" + optionValue); + } + } + + for (Map.Entry, ImmutableList.Builder> entry : + allOptions.entrySet()) { + String tableName = entry.getKey().getKey(); + String indexName = entry.getKey().getValue(); + ImmutableList options = entry.getValue().build(); + + Map tableIndexes = indexes.get(tableName); + if (tableIndexes == null) { + continue; + } + Index.Builder indexBuilder = tableIndexes.get(indexName); + if (indexBuilder == null) { + LOG.warn("Can not find index using name {}", indexName); + continue; + } + + indexBuilder.options(options); + } + } + + @VisibleForTesting + Statement listIndexOptionsSQL() { + switch (dialect) { + case GOOGLE_STANDARD_SQL: + return Statement.of( + "SELECT t.table_schema, t.table_name, t.index_name, t.index_type," + + " t.option_name, t.option_type, t.option_value" + + " FROM information_schema.index_options AS t" + + " WHERE t.table_schema NOT IN" + + " ('INFORMATION_SCHEMA', 'SPANNER_SYS')" + + " ORDER BY t.table_name, t.index_name, t.option_name"); + default: + throw new IllegalArgumentException("Unrecognized dialect: " + dialect); + } + } + private void listColumnOptions(Ddl.Builder builder) { Statement statement = listColumnOptionsSQL(); diff --git a/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java 
b/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java index 2d0866f7de..7a197108cf 100644 --- a/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java +++ b/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java @@ -193,6 +193,9 @@ private static Type parseSpannerType(String spannerType, Dialect dialect) { if (spannerType.startsWith("BYTES")) { return Type.bytes(); } + if ("TOKENLIST".equals(spannerType)) { + return Type.bytes(); + } if ("TIMESTAMP".equals(spannerType)) { return Type.timestamp(); } diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverterTest.java b/v1/src/test/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverterTest.java index 6ca28b580e..6aed975dd1 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverterTest.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverterTest.java @@ -170,7 +170,16 @@ public void simple() { + " }, {" + " \"name\" : \"timestamp\"," + " \"type\" : [ \"null\", {\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}]" - + " }]," + + " }, {" + + " \"name\" : \"MyTokens\"," + + " \"type\" : \"null\"," + + " \"default\" : null," + + " \"sqlType\" : \"TOKENLIST\"," + + " \"hidden\" : \"true\"," + + " \"notNull\" : \"false\"," + + " \"generationExpression\" : \"(TOKENIZE_FULLTEXT(MyData))\"," + + " \"stored\" : \"false\"" + + " }]," + " \"googleStorage\" : \"CloudSpanner\"," + " \"spannerParent\" : \"\"," + " \"googleFormatVersion\" : \"booleans\"," @@ -227,6 +236,7 @@ public void simple() { + " `float32` FLOAT32," + " `float64` FLOAT64," + " `timestamp` TIMESTAMP," + + " `MyTokens` TOKENLIST AS ((TOKENIZE_FULLTEXT(MyData))) HIDDEN," + " CONSTRAINT `ck` CHECK(`first_name` != 'last_name')," + " ) PRIMARY KEY (`id` ASC, `gen_id` ASC, `last_name` DESC)" + " CREATE INDEX `UsersByFirstName` ON `Users` (`first_name`)" diff --git 
a/v1/src/test/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverterTest.java b/v1/src/test/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverterTest.java index c964454efc..20813c5f55 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverterTest.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverterTest.java @@ -19,6 +19,7 @@ import static com.google.cloud.teleport.spanner.AvroUtil.GENERATION_EXPRESSION; import static com.google.cloud.teleport.spanner.AvroUtil.GOOGLE_FORMAT_VERSION; import static com.google.cloud.teleport.spanner.AvroUtil.GOOGLE_STORAGE; +import static com.google.cloud.teleport.spanner.AvroUtil.HIDDEN; import static com.google.cloud.teleport.spanner.AvroUtil.INPUT; import static com.google.cloud.teleport.spanner.AvroUtil.NOT_NULL; import static com.google.cloud.teleport.spanner.AvroUtil.OUTPUT; @@ -114,12 +115,21 @@ public void simple() { .generatedAs("MOD(id+1, 64)") .stored() .endColumn() + .column("MyTokens") + .tokenlist() + .isHidden(true) + .generatedAs("(TOKENIZE_FULLTEXT(MyData))") + .endColumn() .primaryKey() .asc("id") .asc("gen_id") .desc("last_name") .end() - .indexes(ImmutableList.of("CREATE INDEX `UsersByFirstName` ON `Users` (`first_name`)")) + .indexes( + ImmutableList.of( + "CREATE INDEX `UsersByFirstName` ON `Users` (`first_name`)", + "CREATE SEARCH INDEX `SearchIndex` ON `Users` (`MyTokens`)" + + " OPTIONS (sort_order_sharding=TRUE)")) .foreignKeys( ImmutableList.of( "ALTER TABLE `Users` ADD CONSTRAINT `fk` FOREIGN KEY (`first_name`)" @@ -142,7 +152,7 @@ public void simple() { List fields = avroSchema.getFields(); - assertThat(fields, hasSize(5)); + assertThat(fields, hasSize(6)); assertThat(fields.get(0).name(), equalTo("id")); // Not null @@ -187,6 +197,16 @@ public void simple() { assertThat(fields.get(4).getProp(STORED), equalTo("true")); assertThat(fields.get(4).getProp(DEFAULT_EXPRESSION), equalTo(null)); + assertThat(fields.get(5).name(), 
equalTo("MyTokens")); + assertThat(fields.get(5).schema(), equalTo(Schema.create(Schema.Type.NULL))); + assertThat(fields.get(5).getProp(SQL_TYPE), equalTo("TOKENLIST")); + assertThat(fields.get(5).getProp(NOT_NULL), equalTo("false")); + assertThat(fields.get(5).getProp(STORED), equalTo("false")); + assertThat(fields.get(5).getProp(HIDDEN), equalTo("true")); + assertThat( + fields.get(5).getProp(GENERATION_EXPRESSION), equalTo("(TOKENIZE_FULLTEXT(MyData))")); + assertThat(fields.get(5).getProp(DEFAULT_EXPRESSION), equalTo(null)); + // spanner pk assertThat(avroSchema.getProp(SPANNER_PRIMARY_KEY + "_0"), equalTo("`id` ASC")); assertThat(avroSchema.getProp(SPANNER_PRIMARY_KEY + "_1"), equalTo("`gen_id` ASC")); @@ -197,6 +217,11 @@ public void simple() { assertThat( avroSchema.getProp(SPANNER_INDEX + "0"), equalTo("CREATE INDEX `UsersByFirstName` ON `Users` (`first_name`)")); + assertThat( + avroSchema.getProp(SPANNER_INDEX + "1"), + equalTo( + "CREATE SEARCH INDEX `SearchIndex` ON `Users` (`MyTokens`)" + + " OPTIONS (sort_order_sharding=TRUE)")); assertThat( avroSchema.getProp(SPANNER_FOREIGN_KEY + "0"), equalTo( diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ExportPipelineIT.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ExportPipelineIT.java index 5f886247bc..350a8e7134 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/ExportPipelineIT.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/ExportPipelineIT.java @@ -141,6 +141,7 @@ public class ExportPipelineIT extends TemplateTestBase { + " }" + " ]" + "}"); + private SpannerResourceManager spannerResourceManager; @After @@ -185,6 +186,8 @@ private void testSpannerToGCSAvroBase( + " FirstName String(1024),\n" + " LastName String(1024),\n" + " Rating FLOAT32,\n" + + " Review String(MAX),\n" + + " `MyTokens` TOKENLIST AS (TOKENIZE_FULLTEXT(Review)) HIDDEN,\n" + ") PRIMARY KEY(Id)", testName); String createModelStructStatement = @@ -194,10 +197,17 @@ private void 
testSpannerToGCSAvroBase( + " OUTPUT (embeddings STRUCT, values ARRAY>) \n" + " REMOTE OPTIONS (endpoint=\"//aiplatform.googleapis.com/projects/span-cloud-testing/locations/us-central1/publishers/google/models/textembedding-gecko\")", testName); + String createSearchIndexStatement = + String.format( + "CREATE SEARCH INDEX `%s_SearchIndex`\n" + + " ON `%s_Singers`(`MyTokens` ASC)\n" + + " OPTIONS (sort_order_sharding=TRUE)", + testName, testName); spannerResourceManager.executeDdlStatement(createEmptyTableStatement); spannerResourceManager.executeDdlStatement(createSingersTableStatement); spannerResourceManager.executeDdlStatement(createModelStructStatement); + spannerResourceManager.executeDdlStatement(createSearchIndexStatement); List expectedData = generateTableRows(String.format("%s_Singers", testName)); spannerResourceManager.write(expectedData); PipelineLauncher.LaunchConfig.Builder options = @@ -226,6 +236,10 @@ private void testSpannerToGCSAvroBase( gcsClient.listArtifacts( "output/", Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", testName, "ModelStruct"))); + List searchIndexArtifacts = + gcsClient.listArtifacts( + "output/", + Pattern.compile(String.format(".*/%s_%s.*\\.avro.*", testName, "SearchIndex"))); assertThat(singersArtifacts).isNotEmpty(); assertThat(emptyArtifacts).isNotEmpty(); assertThat(modelStructArtifacts).isNotEmpty(); diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ImportPipelineIT.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ImportPipelineIT.java index a8b39d6d8c..e2c2db2862 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/ImportPipelineIT.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/ImportPipelineIT.java @@ -158,6 +158,8 @@ private void testGoogleSqlImportPipelineBase( + " Id INT64,\n" + " FirstName STRING(MAX),\n" + " LastName STRING(MAX),\n" + + " Review STRING(MAX),\n" + + " MyTokens TOKENLIST AS (TOKENIZE_FULLTEXT(Review)) HIDDEN,\n" + ") PRIMARY KEY(Id)"; 
spannerResourceManager.executeDdlStatement(createSingersTableStatement); @@ -168,6 +170,12 @@ private void testGoogleSqlImportPipelineBase( + ") PRIMARY KEY(Key)"; spannerResourceManager.executeDdlStatement(createFloat32TableStatement); + String createSearchIndexStatement = + "CREATE SEARCH INDEX `SearchIndex`\n" + + " ON `Singers`(`MyTokens` ASC)\n" + + " OPTIONS (sort_order_sharding=TRUE)"; + spannerResourceManager.executeDdlStatement(createSearchIndexStatement); + PipelineLauncher.LaunchConfig.Builder options = paramsAdder.apply( PipelineLauncher.LaunchConfig.builder(testName, specPath) diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerIT.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerIT.java index b814c5c9e6..eefea33109 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerIT.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerIT.java @@ -568,6 +568,24 @@ public void indexes() throws Exception { assertThat(ddl.prettyPrint(), equalToCompressingWhiteSpace(String.join("", statements))); } + @Test + public void searchIndexes() throws Exception { + // Prefix indexes to ensure ordering. + List statements = + Arrays.asList( + "CREATE TABLE `Base` (" + + " `MyKey` INT64 NOT NULL," + + " `MyData` STRING(MAX)," + + " `MyTokens` TOKENLIST AS (TOKENIZE_FULLTEXT(MyData)) HIDDEN," + + " ) PRIMARY KEY (`MyKey` ASC)", + " CREATE SEARCH INDEX `SearchIndex` ON `Base`(`MyTokens` ASC)" + + " OPTIONS (sort_order_sharding=TRUE)"); + + spannerServer.createDatabase(dbId, statements); + Ddl ddl = getDatabaseDdl(); + assertThat(ddl.prettyPrint(), equalToCompressingWhiteSpace(String.join("", statements))); + } + @Test public void pgIndexes() throws Exception { // Prefix indexes to ensure ordering. 
diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerTest.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerTest.java index d10e1b3569..2faf1380f9 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerTest.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerTest.java @@ -50,7 +50,7 @@ public void testListColumnsSQL() { googleSQLInfoScanner.listColumnsSQL().getSql(), equalToCompressingWhiteSpace( "SELECT c.table_schema, c.table_name, c.column_name, c.ordinal_position, c.spanner_type, c.is_nullable," - + " c.is_generated, c.generation_expression, c.is_stored, c.column_default" + + " c.is_generated, c.generation_expression, c.is_stored, c.column_default, c.is_hidden" + " FROM information_schema.columns as c WHERE c.table_schema NOT IN" + " ('INFORMATION_SCHEMA', 'SPANNER_SYS') AND c.spanner_state = 'COMMITTED' " + " ORDER BY c.table_name, c.ordinal_position")); @@ -71,11 +71,11 @@ public void testListIndexesSQL() { googleSQLInfoScanner.listIndexesSQL().getSql(), equalToCompressingWhiteSpace( "SELECT t.table_schema, t.table_name, t.index_name, t.parent_table_name, t.is_unique," - + " t.is_null_filtered" + + " t.is_null_filtered, t.index_type" + " FROM information_schema.indexes AS t" + " WHERE t.table_schema NOT IN" + " ('INFORMATION_SCHEMA', 'SPANNER_SYS') AND" - + " t.index_type='INDEX' AND t.spanner_is_managed = FALSE" + + " (t.index_type='INDEX' OR t.index_type='SEARCH') AND t.spanner_is_managed = FALSE" + " ORDER BY t.table_name, t.index_name")); assertThat( @@ -94,7 +94,8 @@ public void testListIndexColumnsSQL() { assertThat( googleSQLInfoScanner.listIndexColumnsSQL().getSql(), equalToCompressingWhiteSpace( - "SELECT t.table_schema, t.table_name, t.column_name, t.column_ordering, t.index_name " + "SELECT t.table_schema, t.table_name, t.column_name, t.column_ordering, t.index_name, " + + "t.index_type, 
t.spanner_type " + "FROM information_schema.index_columns AS t " + " WHERE t.table_schema NOT IN" + " ('INFORMATION_SCHEMA', 'SPANNER_SYS')" diff --git a/v1/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java b/v1/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java index c3d6fa2fa6..b9c28ec4c8 100644 --- a/v1/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java +++ b/v1/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java @@ -40,14 +40,16 @@ public void testSingleTable() throws Exception { .addColumn("test", "jsonVal", "JSON") .addColumn("test", "arrayVal", "ARRAY") .addColumn("test", "embeddingVectorVal", "ARRAY(VECTOR_LENGTH=>128)") + .addColumn("test", "tokens", "TOKENLIST") .build(); assertEquals(1, schema.getTables().size()); - assertEquals(6, schema.getColumns("test").size()); + assertEquals(7, schema.getColumns("test").size()); assertEquals(1, schema.getKeyParts("test").size()); assertEquals(Type.json(), schema.getColumns("test").get(3).getType()); assertEquals(Type.array(Type.float64()), schema.getColumns("test").get(4).getType()); assertEquals(Type.array(Type.float64()), schema.getColumns("test").get(5).getType()); + assertEquals(Type.bytes(), schema.getColumns("test").get(6).getType()); } @Test