Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-43994: [C++][Parquet] Fix schema conversion from two-level encoding nested list #43995

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

wgtmac
Copy link
Member

@wgtmac wgtmac commented Sep 6, 2024

Rationale for this change

The current C++ parquet implementation interprets following parquet schema as `array<struct<array:array>>, which is wrong:

  optional group a (LIST) {
    repeated group array (LIST) {
      repeated int32 array;
    }
  }

What changes are included in this PR?

According to the parquet spec, the above schema should be inferred as array<array<int>>.

Are these changes tested?

Yes, a test case has been added to verify the fix.

Are there any user-facing changes?

No.

@wgtmac
Copy link
Member Author

wgtmac commented Sep 6, 2024

@emkornfield @pitrou @mapleFU Would you mind taking a look? Thanks!

@mapleFU
Copy link
Member

mapleFU commented Sep 6, 2024

https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists

Without legacy:

The element field encodes the list's element type and repetition. Element repetition must be required or optional.

With backward capability:

Some existing data does not include the inner element layer. For backward-compatibility, the type of elements in LIST-annotated structures should always be determined by the following rules:

  1. If the repeated field is not a group, then its type is the element type and elements are required.
  2. If the repeated field is a group with multiple fields, then its type is the element type and elements are required.
  3. If the repeated field is a group with one field and is named either array or uses the LIST-annotated group's name with _tuple appended then the repeated type is the element type and elements are required.
  4. Otherwise, the repeated field's type is the element type with the repeated field's repetition.

So, seems this hit the (1)?

@mapleFU
Copy link
Member

mapleFU commented Sep 6, 2024

Copy link
Member

@mapleFU mapleFU left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@github-actions github-actions bot added awaiting committer review Awaiting committer review and removed awaiting review Awaiting review labels Sep 7, 2024
@wgtmac
Copy link
Member Author

wgtmac commented Sep 8, 2024

  optional group a (LIST) {
    repeated group array (LIST) {
      repeated int32 array;
    }
  }

IMO, the root cause is that the current code recognizes the schema above as a three-level encoding. However, the inner-most field can only be required or optional in three-level encoding, but here the int32 field is repeated. We can decouple the nesting field into two lists as below:

  outer_list:
  optional group a (LIST) {
    repeated group array (LIST) {}
  }

  inner_list:
  repeated group array (LIST) {
    repeated int32 array;
  }

It is obvious that inner_list can simply apply backward-compatibility rule (1). For the outer_list, the current code applies rule (3). I think we need to apply rule (4) here by modifying the rule (3) to below:

If the repeated field is a group with one required or optional field and is named either array or uses the LIST-annotated group's name with _tuple appended then the repeated type is the element type and elements are required.

@mapleFU
Copy link
Member

mapleFU commented Sep 8, 2024

Yes. It's so tricky, I think we can just copying the Java code directly, lol

@mapleFU
Copy link
Member

mapleFU commented Sep 8, 2024

I think we are just missing check of this line

This fixing itself LGTM, but I just think should we test and align more...

@pitrou
Copy link
Member

pitrou commented Sep 10, 2024

The current C++ parquet implementation interprets following parquet schema as `array<structarray:array>, which is wrong:

What is "array"? Do you mean "list"? Can you fix the PR description?

According to the parquet spec, the above schema should be inferred as array<array<int>>.

Where is this in the Parquet spec? I cannot find a similar example.

I have seen an issue when reading a Parquet file created by Hudi.

  1. Can we check with the Parquet ML whether this is really a legitimate schema structure?
  2. If so, can we add a testing file in parquet-testing?

@mapleFU
Copy link
Member

mapleFU commented Sep 10, 2024

Where is this in the Parquet spec? I cannot find a similar example.

The wording of the spec is very ambigious:

If the repeated field is not a group, then its type is the element type and elements are required.
If the repeated field is a group with multiple fields, then its type is the element type and elements are required.
If the repeated field is a group with one field and is named either array or uses the LIST-annotated group's name with _tuple appended then the repeated type is the element type and elements are required.
Otherwise, the repeated field's type is the element type with the repeated field's repetition.

I think this just following the rule(4): repeated field's type is the element type with the repeated field's repetition.

Can we check with the Parquet ML whether this is really a legitimate schema structure?
If so, can we add a testing file in parquet-testing?

I think maybe a testfile would be better

@wgtmac
Copy link
Member Author

wgtmac commented Sep 11, 2024

I‘m using Hive schema, so that's why it is array<array<int>>. The file could be easily produced by Spark Sql like below:

package org.example

import org.apache.spark.sql.SparkSession

object ParquetTwoLevelList {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[1]")
      .appName("NestedListTest")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
      .getOrCreate()
    spark.sql("CREATE TABLE nested_list_test (a array<array<int>>) USING HUDI")
    spark.sql("INSERT INTO nested_list_test VALUES ( array(array(1,2), array(3,4)) )")
  }

}

The parquet-cli prints the following metadata:

File path:  /Users/gangwu/Projects/hudi-spark-generator/spark-warehouse/nested_list_test/f92ed4b5-c063-4b94-90a4-5ef997db1a6c-0_0-13-12_20240911093900996.parquet
Created by: parquet-mr version 1.12.3 (build f8dced182c4c1fbdec6ccb3185537b5a01e6ed6b)
Properties:
  hoodie_bloom_filter_type_code: DYNAMIC_V0
  org.apache.hudi.bloomfilter: ***
  hoodie_min_record_key: 20240911093900996_0_0
  parquet.avro.schema: {"type":"record","name":"nested_list_test_record","namespace":"hoodie.nested_list_test","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"a","type":["null",{"type":"array","items":["null",{"type":"array","items":["null","int"]}]}],"default":null}]}
  writer.model.name: avro
  hoodie_max_record_key: 20240911093900996_0_0
Schema:
message hoodie.nested_list_test.nested_list_test_record {
  optional binary _hoodie_commit_time (STRING);
  optional binary _hoodie_commit_seqno (STRING);
  optional binary _hoodie_record_key (STRING);
  optional binary _hoodie_partition_path (STRING);
  optional binary _hoodie_file_name (STRING);
  optional group a (LIST) {
    repeated group array (LIST) {
      repeated int32 array;
    }
  }
}


Row group 0:  count: 1  441.00 B records  start: 4  total(compressed): 441 B total(uncompressed):349 B
--------------------------------------------------------------------------------
                        type      encodings count     avg size   nulls   min / max
_hoodie_commit_time     BINARY    G   _     1         68.00 B    0       "20240911093900996" / "20240911093900996"
_hoodie_commit_seqno    BINARY    G   _     1         72.00 B    0       "20240911093900996_0_0" / "20240911093900996_0_0"
_hoodie_record_key      BINARY    G   _     1         72.00 B    0       "20240911093900996_0_0" / "20240911093900996_0_0"
_hoodie_partition_path  BINARY    G   _     1         50.00 B    0       "" / ""
_hoodie_file_name       BINARY    G   _     1         116.00 B   0       "f92ed4b5-c063-4b94-90a4-5..." / "f92ed4b5-c063-4b94-90a4-5..."
a.array.array           INT32     G   _     4         15.75 B    0       "1" / "4"

-------------

@mapleFU
Copy link
Member

mapleFU commented Sep 11, 2024

@wgtmac Would you mind check testing file and add one if not exists in parquet-testing?

@wgtmac
Copy link
Member Author

wgtmac commented Sep 13, 2024

I will try to use parquet-java to create a minimal file and add it to parquet-testing. The file created by Hudi is too large due to a file-level bloom filter embedded in the file footer.

@wgtmac
Copy link
Member Author

wgtmac commented Oct 17, 2024

Gentle ping :) @emkornfield @pitrou @mapleFU

Copy link
Member

@mapleFU mapleFU left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@wgtmac wgtmac requested a review from pitrou October 24, 2024 14:06
auto arrow_inner_list =
field("array", list(field("array", ::arrow::int32(), /*nullable=*/false)),
/*nullable=*/false);
auto arrow_outer_list = list(arrow_inner_list);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be not-null as well?

Copy link
Member Author

@wgtmac wgtmac Oct 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed. list() cannot accept nullable parameter and it is set by field("a", arrow_outer_list, /*nullable=*/false) in the next line.

@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting committer review Awaiting committer review labels Oct 29, 2024
@@ -681,6 +681,10 @@ Status ListToSchemaField(const GroupNode& group, LevelInfo current_levels,
// List of primitive type
RETURN_NOT_OK(
NodeToSchemaField(*list_group.field(0), current_levels, ctx, out, child_field));
} else if (list_group.field_count() == 1 && list_group.field(0)->is_repeated()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, it looks like the HasStructListName is not correct for _tuple, as it only checks the name ends in _tuple and not that it is the top level list appended with _tuple.

@@ -681,6 +681,10 @@ Status ListToSchemaField(const GroupNode& group, LevelInfo current_levels,
// List of primitive type
RETURN_NOT_OK(
NodeToSchemaField(*list_group.field(0), current_levels, ctx, out, child_field));
} else if (list_group.field_count() == 1 && list_group.field(0)->is_repeated()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm not sure this is correct, or at least the comments above need to be updated to explain the logic further.

Specifially from the exapmles on logical type.md:

// List<OneTuple<String>> (nullable list, non-null elements)
optional group my_list (LIST) {
  repeated group array {
    required binary str (STRING);
  };
}

This seems to imply that despite how the file was written with Avro bindings there should in fact be an intermediate struct and not a llist<list<>>, its not clear to me if this is a bug in the spec, or a bug in Avro java writer implementation.

Copy link
Member Author

@wgtmac wgtmac Oct 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rule (3) of backward-compatibility rules is that If the repeated field is a group with one field and is named either array or uses the LIST-annotated group's name with _tuple appended then the repeated type is the element type and elements are required.. It says that the repeated type is the element type.

optional group my_list (LIST) {
  repeated group array {
    required binary str (STRING);
  };
}

So for the schema you've just mentioned above, its element type is group array { required binary str (STRING); } which perfectly resolves to OneTuple<String>.

optional group a (LIST) {
  repeated group array (LIST) {
    repeated int32 array;
  }
}

However, for the schema I've mentioned in this issue, its element type is group array (LIST) { repeated int32 array; } and it perfectly resolves to List<int32> according to rule (1) which is If the repeated field is not a group, then its type is the element type and elements are required..

The parquet-java implementation has interpreted this case in the same way: https://github.com/apache/parquet-java/blob/42cf31c0fbe4f000d4ddb1e1092c6634989ea3ca/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java#L588

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have opened apache/parquet-format#466 to clarify things.

Copy link
Contributor

@emkornfield emkornfield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commented, its not clear that this is correct, and the bug might be with Avro writer if I am reading the spec correctly.

::std::string_view name{node.name()};
return name == "array" || EndsWith(name, "_tuple");
return name == "array" || name == (parent.name() + "_tuple");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@emkornfield Fix the matching of _tuple to follow the spec.

RETURN_NOT_OK(GroupToStruct(list_group, current_levels, ctx, out, child_field));
} else if (list_group.field_count() == 1) {
const auto& repeated_field = list_group.field(0);
if (repeated_field->is_repeated()) {
Copy link
Contributor

@emkornfield emkornfield Oct 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the careful explanation on my last questions, after rereading I think I mostly agree that this is a bug that needs to be fixed. I think I missed the second (LIST) annotation. Which even though this check corresponds to the java, code, it seems an important factor here is that the list_group is in fact a (LIST and maybe even possibly a map), not that the inner element is repeated?

So I think the logic might make more sense as (pseudocode):

if (list_group.field_count() > 1) {
   ...
} else if (HasListElementName(list_group, group)) {
   if (IsMap(list_group)) {
           RETURN_NOT_OK(
            ListToMapField(*list_group, current_levels, ctx, out, child_field));
      
   } else if IsList(list_group)) {
      RETURN_NOT_OK(
            ListToSchemaField(*list_group, current_levels, ctx, out, child_field));
  } else {
      RETURN_NOT_OK(GroupToStruct(list_group, current_levels, ctx, out, child_field));
  }
} else {
  ...
}

Does this formulation work?

@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting change review Awaiting change review labels Oct 30, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants