Support Map in BQ for StorageWrites API for Beam Rows #32512

Contributor

@prodriguezdefino prodriguezdefino commented Sep 20, 2024

Currently, the BigQuery table schema utility and the StorageWrites implementation for Beam Rows do not support sending rows whose schema contains fields of type Map or array of Map.

This PR adds that functionality by transforming the Map into a Message type with two fields, key and value. The translation respects the types coming from the Row schema and mimics the behavior seen when passing TableRows to the BigQueryIO PTransform.

Copied from #22179, which was closed after a long period of inactivity (and I cannot reopen it).
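
As a rough illustration of the shape described above, the sketch below turns a map into a list of key/value entries using only `java.util` types. The class and method names are hypothetical; the actual change builds protobuf messages from a descriptor, which is not reproduced here.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: plain java.util types stand in for the
// protobuf messages that the real conversion produces.
final class MapAsRepeatedEntries {
  private MapAsRepeatedEntries() {}

  /** Turns a map into a list of (key, value) entries, one entry per mapping. */
  static <K, V> List<Map.Entry<K, V>> toEntries(Map<K, V> map) {
    List<Map.Entry<K, V>> entries = new ArrayList<>(map.size());
    for (Map.Entry<K, V> e : map.entrySet()) {
      // Copy into an immutable entry so the result does not alias the source map.
      entries.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
    }
    return entries;
  }
}
```

Each entry corresponds to one element of the repeated key/value field; key and value keep whatever types the map declared.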


@prodriguezdefino
Contributor Author

fixes #23618

@JohnZZGithub FYI

@prodriguezdefino prodriguezdefino marked this pull request as ready for review September 20, 2024 06:30
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @robertwb for label java.
R: @johnjcasey for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@JohnZZGithub
Contributor

The patch LGTM. And we tested it on our GCP env.

@damccorm
Contributor

damccorm commented Oct 3, 2024

@robertwb @johnjcasey could you please take a look at this one?

Contributor

Reminder, please take a look at this pr: @robertwb @johnjcasey

Contributor

Assigning new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @kennknowles for label java.
R: @chamikaramj for label io.

@Nullable FieldType keyType = field.getType().getMapKeyType();
@Nullable FieldType valueType = field.getType().getMapValueType();
if (keyType == null || valueType == null) {
throw new RuntimeException("Unexpected null element type!");
Contributor

Can you add some context to this exception, noting that the error occurred while converting to the Storage API proto? That would help users diagnose their pipelines without needing to know the Beam code as well.

return list.stream()
.map(v -> toProtoValue(fieldDescriptor, arrayElementType, v))
.collect(Collectors.toList());
boolean shouldFlatMap =
Contributor

This looks like it supports one level of nested collection. Is that an intended limit?

Contributor

Also, can you add a comment to that effect?

Contributor Author

Yes I will add the comment.

Handling collection flattening recursively would help cover the other collection types. This PR focuses on the special case of a field in a Row whose type is MAP or ARRAY.

Contributor Author

Removed the flattening of nested container types.

@@ -403,7 +403,7 @@ private static List<TableFieldSchema> toTableFieldSchema(Schema schema) {
       }
       if (type.getTypeName().isCollectionType()) {
         type = Preconditions.checkArgumentNotNull(type.getCollectionElementType());
-        if (type.getTypeName().isCollectionType() || type.getTypeName().isMapType()) {
+        if (type.getTypeName().isCollectionType() && !type.getTypeName().isMapType()) {
Contributor

BQ supports arrays of maps, but not arrays of other collections?

Contributor Author

@prodriguezdefino prodriguezdefino Oct 15, 2024

BigQuery only supports arrays of structs and scalar types, not nested collection/map types.

This change enables storing a simple map or an array of maps (after flattening them) as a repeated struct field.

A separate change could add support for flattening other collection types (arrays of arrays, for example).

Contributor Author

removed this change.
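
With that change removed, the rule discussed in this thread can be sketched as follows. This is a minimal illustration under stated assumptions: the `Kind` and `Type` classes are a hypothetical, simplified type model, not Beam's `Schema` classes, and the returned strings are informal descriptions rather than BigQuery API values.

```java
// Hypothetical, simplified type model for illustration; these are not Beam
// Schema or BigQuery client classes.
final class BqShapeSketch {
  private BqShapeSketch() {}

  enum Kind { SCALAR, ROW, ARRAY, MAP }

  static final class Type {
    final Kind kind;
    final Type element; // element type for ARRAY, null for every other kind

    private Type(Kind kind, Type element) {
      this.kind = kind;
      this.element = element;
    }

    static Type of(Kind kind) {
      if (kind == Kind.ARRAY) {
        throw new IllegalArgumentException("use arrayOf for arrays");
      }
      return new Type(kind, null);
    }

    static Type arrayOf(Type element) {
      return new Type(Kind.ARRAY, element);
    }
  }

  /** Describes the BigQuery column shape that a field type would translate to. */
  static String describe(Type type) {
    switch (type.kind) {
      case SCALAR:
        return "scalar";
      case ROW:
        return "STRUCT";
      case MAP:
        // A map has no native BigQuery type, so it becomes repeated key/value structs.
        return "REPEATED STRUCT<key, value>";
      case ARRAY: {
        Kind inner = type.element.kind;
        if (inner == Kind.ARRAY || inner == Kind.MAP) {
          // Either case would need a repeated field directly inside a repeated field.
          throw new IllegalArgumentException(
              "Array elements of kind " + inner + " are not supported");
        }
        return "REPEATED " + describe(type.element);
      }
      default:
        throw new IllegalStateException("Unknown kind: " + type.kind);
    }
  }
}
```

Arrays of scalars and arrays of structs pass through, a top-level map becomes a repeated struct, and an array of arrays or an array of maps is rejected instead of being flattened.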

Contributor

@robertwb robertwb left a comment

Thanks for this contribution. Supporting maps will be very nice.

@Nullable FieldType keyType = field.getType().getMapKeyType();
@Nullable FieldType valueType = field.getType().getMapValueType();
if (keyType == null || valueType == null) {
throw new RuntimeException("Unexpected null element type!");
Contributor

It'd be more informative if the key and value raised distinct errors.
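
One way to address both this request and the earlier one for more context is sketched below. It assumes a hypothetical `MapField` holder in place of Beam's `Schema.Field`, and the message wording is only a suggestion.

```java
// Hypothetical stand-in for a schema field; this is not Beam's Schema.Field.
final class MapFieldCheck {
  private MapFieldCheck() {}

  static final class MapField {
    final String name;
    final String keyType;   // null models a missing key type
    final String valueType; // null models a missing value type

    MapField(String name, String keyType, String valueType) {
      this.name = name;
      this.keyType = keyType;
      this.valueType = valueType;
    }
  }

  /** Fails with a message naming the field, the missing part, and the conversion step. */
  static void validate(MapField field) {
    if (field.keyType == null) {
      throw new IllegalArgumentException(message(field.name, "key"));
    }
    if (field.valueType == null) {
      throw new IllegalArgumentException(message(field.name, "value"));
    }
  }

  private static String message(String fieldName, String part) {
    return "Cannot convert map field '" + fieldName + "' to a Storage Write API proto: "
        + "the map " + part + " type is null in the Row schema.";
  }
}
```

Separating the two checks tells the user which half of the map type is missing, and the prefix points at the conversion step without requiring knowledge of Beam internals.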

@@ -272,6 +286,8 @@ private static Object messageValueFromRowValue(
if (value == null) {
if (fieldDescriptor.isOptional()) {
return null;
} else if (fieldDescriptor.isRepeated()) {
Contributor

Currently we distinguish between the empty list and a missing value. I think we want to keep that distinction.

Contributor Author

Keeping the return value as null.

It seems the previous code would fail when a schema has a nullable field that is a simple array of strings. Adding this check also fixes that case. I am adding a test to cover those particularities.

Contributor Author

huh, seems like master now has the same code as initially proposed by this PR (see here). For my changes this was not necessary, but maybe other things have been modified.

I will use the Collections.emptyList() reference from there.
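
One reading of where this thread ended up is sketched below. The `Label` enum is a hypothetical stand-in for what a protobuf `FieldDescriptor` reports through `isOptional()` and `isRepeated()`, and the behavior for required fields, including the message text, is an assumption rather than something shown in this thread.

```java
import java.util.Collections;

// Hypothetical sketch: Label stands in for the optional/repeated information
// that a protobuf FieldDescriptor exposes.
final class NullValueSketch {
  private NullValueSketch() {}

  enum Label { OPTIONAL, REPEATED, REQUIRED }

  /** Chooses what to emit when the Row value for a field is null. */
  static Object valueForNull(Label label, String fieldName) {
    switch (label) {
      case OPTIONAL:
        // A missing optional value stays missing.
        return null;
      case REPEATED:
        // A repeated proto field cannot hold null, so emit an empty list.
        return Collections.emptyList();
      default:
        // Assumption: a required field with no value is an error.
        throw new IllegalArgumentException(
            "Received null value for non-nullable field " + fieldName);
    }
  }
}
```

Note that for a repeated field this maps a missing value to an empty list, which is exactly the distinction the reviewer raised above.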

list.stream().map(v -> toProtoValue(fieldDescriptor, arrayElementType, v));

if (shouldFlatMap) {
valueStream = valueStream.flatMap(vs -> ((List) vs).stream());
Contributor

Why are we introducing this flattening here?

Contributor Author

See the comment here: #22179 (comment)

Contributor Author

Also added a comment on the code to explain it.

Contributor Author

removed the flattening of nested container types and added a missing check for array.

.collect(Collectors.toList());
// We currently only support maps as non-row or non-scalar element types
// given that BigQuery does not support nested arrays. If the element type is of map type
// we should flatten it given how is being translated (as a list of proto(key, value).
Contributor

IMHO, if BQ doesn't support arrays of arrays (or arrays of maps) we should reject such rather than implicitly flattening them (which is lossy and could be unexpected from a user's perspective).

Contributor Author

@prodriguezdefino prodriguezdefino Oct 16, 2024

as you correctly stated, BQ does not support those nested container types, but it does not support maps either.

Currently, as a Beam user translating formats for BQ ingestion (from Avro, Thrift, or others that support MAP natively), I need to inspect the schemas or IDLs to find out whether a MAP or a nested container is present and translate it into something that works for BQ. This adds complexity to the pipelines and can hurt overall performance (because of the packing and unpacking that may be needed to translate data from the original format).

This change aims to aid that translation for both cases, the MAP type and ARRAY/ITERABLE of MAPs. Both are supported in Beam Rows (used for simplicity after translating from the original format) but not in the BQ storage write proto translation. For MAP we are making a structural decision for the translation, and likewise for ARRAY/ITERABLE of MAPs.

I agree that this translation decision loses key functionality of the original structures (indexing and key uniqueness, for starters), but I think improved documentation can alert users to these caveats (which do not affect existing pipelines, given that this is a net-new feature).

Contributor

For Maps it's fine as there's no surprise on the user side (e.g. seeing key-value records) plus it can be losslessly translated back if read as a MAP type. But the same cannot be said of the flattening that's done.

If we are concerned about convenience for users, a separate explicit transform that flattens nested structures could be provided (which would be the identity if there are no required unnestings). This should have comparable performance to doing it as part of the write, and its cost is likely negligible relative to the cost of actually talking to the services in question.
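
To make the lossless versus lossy distinction concrete, here is a small sketch with plain `java.util` types. The class and method names are made up for illustration and do not correspond to anything in Beam.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper names; plain collections stand in for Rows and protos.
final class FlatteningLoss {
  private FlatteningLoss() {}

  /** Concatenates the entries of every map in the list, discarding map boundaries. */
  static <K, V> List<Map.Entry<K, V>> flatten(List<Map<K, V>> maps) {
    List<Map.Entry<K, V>> out = new ArrayList<>();
    for (Map<K, V> m : maps) {
      for (Map.Entry<K, V> e : m.entrySet()) {
        out.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
      }
    }
    return out;
  }

  /** Rebuilds a single map from its entries; a later duplicate key overwrites an earlier one. */
  static <K, V> Map<K, V> toMap(List<Map.Entry<K, V>> entries) {
    Map<K, V> m = new LinkedHashMap<>();
    for (Map.Entry<K, V> e : entries) {
      m.put(e.getKey(), e.getValue());
    }
    return m;
  }
}
```

A single map survives the round trip through `flatten` and `toMap`. A list holding two one-entry maps and a list holding one two-entry map, however, flatten to the same entries, so the original grouping cannot be recovered.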

Contributor Author

@prodriguezdefino prodriguezdefino Oct 17, 2024

Sounds good, I will remove the flattening here. FYI @JohnZZGithub.

I noticed that we are not checking for map types here. I will add the check and improve the messaging so users can understand what's going on.

I will also try to work, in a separate PR, on a more general flattening configuration, probably in the form of a lambda, so that we can delegate to users the decision of what to do when a nested container type is encountered during translation.
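
A lambda-based configuration of the kind mentioned here might look roughly like the sketch below. Everything in it is hypothetical: no such API is proposed in this PR, and the names are placeholders chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Entirely hypothetical API shape; nothing here exists in Beam.
final class NestedContainerPolicy {
  private NestedContainerPolicy() {}

  /** Policy that refuses nested lists, matching the behavior agreed on in this thread. */
  static <T> Function<List<List<T>>, List<T>> reject() {
    return nested -> {
      throw new IllegalArgumentException("Nested containers are not supported by BigQuery");
    };
  }

  /** Policy that concatenates the inner lists, which loses their boundaries. */
  static <T> Function<List<List<T>>, List<T>> flatten() {
    return nested -> {
      List<T> out = new ArrayList<>();
      for (List<T> inner : nested) {
        out.addAll(inner);
      }
      return out;
    };
  }

  /** Applies whichever policy the caller supplied to a nested value. */
  static <T> List<T> convert(List<List<T>> nested, Function<List<List<T>>, List<T>> onNested) {
    return onNested.apply(nested);
  }
}
```

With this shape the default could remain rejection, while a user who accepts the lossy result opts in explicitly by passing a flattening function.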

@prodriguezdefino prodriguezdefino changed the title from "Support Map and Arrays of Maps in BQ for StorageWrites API for Beam Rows" to "Support Map in BQ for StorageWrites API for Beam Rows" on Oct 18, 2024
@robertwb robertwb merged commit 4f4853e into apache:master Oct 22, 2024
18 checks passed