Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use new BigQuery sink and remove num_bigquery_write_shards flag usage. #499

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

tneymanov
Copy link
Collaborator

Modify BigQuery writing mechanism to use the new WriteToBigQuery PTransform, and deprecate num_bigquery_write_shards flag.

Previously, issue #199 forced us to use a hack to shard the variants before they are written to BigQuery, which negatively affects the processing speed. With the implementation of the new sink, the flag is no longer need.

The flag will remain as a dummy in order to not break any of the current callers.

@coveralls
Copy link

coveralls commented Jun 25, 2019

Pull Request Test Coverage Report for Build 1767

  • 0 of 2 (0.0%) changed or added relevant lines in 2 files are covered.
  • 2 unchanged lines in 2 files lost coverage.
  • Overall coverage increased (+0.07%) to 89.178%

Changes Missing Coverage Covered Lines Changed/Added Lines %
gcp_variant_transforms/pipeline_common.py 0 1 0.0%
gcp_variant_transforms/transforms/variant_to_bigquery.py 0 1 0.0%
Files with Coverage Reduction New Missed Lines %
gcp_variant_transforms/pipeline_common.py 1 71.63%
gcp_variant_transforms/vcf_to_bq.py 1 33.02%
Totals Coverage Status
Change from base Build 1758: 0.07%
Covered Lines: 7721
Relevant Lines: 8658

💛 - Coveralls

@@ -173,7 +173,8 @@ def add_arguments(self, parser):
parser.add_argument(
'--num_bigquery_write_shards',
type=int, default=1,
help=('Before writing the final result to output BigQuery, the data is '
help=('Note: This flag is now deprecated and should not be used! '

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we nuke all the help text and just say "This flag is deprecated and may be removed in future releases" ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we nuke all the help text and just say "This flag is deprecated and may be removed in future releases" ?

+1

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

operations require "shuffling" the data (i.e. redistributing the data among
workers), which require significant disk I/O.
As a result, we recommend using SSDs if [merging](variant_merge.md) is enabled:
these operations require "shuffling" the data (i.e. redistributing the data
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It is not 'these' any more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -173,7 +173,8 @@ def add_arguments(self, parser):
parser.add_argument(
'--num_bigquery_write_shards',
type=int, default=1,
help=('Before writing the final result to output BigQuery, the data is '
help=('Note: This flag is now deprecated and should not be used! '
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we nuke all the help text and just say "This flag is deprecated and may be removed in future releases" ?

+1

beam.io.BigQueryDisposition.WRITE_APPEND
if self._append
else beam.io.BigQueryDisposition.WRITE_TRUNCATE),
method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you choose to use streaming here? It costs extra money.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Collaborator Author

@tneymanov tneymanov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed the comments.

Also enabled 'use_beam_bq_sink' flag - using this flag, I was able to run the new thousand genomes dataset without receiving the BQ error.

operations require "shuffling" the data (i.e. redistributing the data among
workers), which require significant disk I/O.
As a result, we recommend using SSDs if [merging](variant_merge.md) is enabled:
these operations require "shuffling" the data (i.e. redistributing the data
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -173,7 +173,8 @@ def add_arguments(self, parser):
parser.add_argument(
'--num_bigquery_write_shards',
type=int, default=1,
help=('Before writing the final result to output BigQuery, the data is '
help=('Note: This flag is now deprecated and should not be used! '
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

beam.io.BigQueryDisposition.WRITE_APPEND
if self._append
else beam.io.BigQueryDisposition.WRITE_TRUNCATE),
method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Contributor

@allieychen allieychen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Is the performance of the bq sink similar to the beam sink? Also, use_beam_bq_sink is an experiment flag, I don't know whether there is any known downside or the plan for this flag from Beam's team? It might worth to talk to them.

@tneymanov
Copy link
Collaborator Author

LGTM. Is the performance of the bq sink similar to the beam sink? Also, use_beam_bq_sink is an experiment flag, I don't know whether there is any known downside or the plan for this flag from Beam's team? It might worth to talk to them.

For the new 1k genomes dataset it finished in 47 min 19 sec with 3,712 cores. Without it, it takes more than hour.

Copy link
Member

@samanvp samanvp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Tural,
Please also remove transforms/limit_write.py and transforms/limit_write_test.py

@@ -29,7 +28,6 @@
from gcp_variant_transforms.libs import processed_variant
from gcp_variant_transforms.libs import vcf_field_conflict_resolver
from gcp_variant_transforms.libs.variant_merge import variant_merge_strategy # pylint: disable=unused-import
from gcp_variant_transforms.transforms import limit_write


# TODO(samanvp): remove this hack when BQ custom sink is added to Python SDK,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also remove the TODO and the const under it.

@tneymanov tneymanov force-pushed the beam_sink branch 2 times, most recently from 482151b to 4d4481c Compare January 13, 2020 15:18
@tneymanov
Copy link
Collaborator Author

Addressed Saman's comments from July.

Copy link
Member

@samanvp samanvp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Tural, please address the comments.
After making sure all tests are passing we can submit this PR.

setup.py Outdated
@@ -42,8 +42,7 @@
# Nucleus needs uptodate protocol buffer compiler (protoc).
'protobuf>=3.6.1',
'mmh3<2.6',
# Refer to issue #528
'google-cloud-storage<1.23.0',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still need to pin to 1.22 due to #528

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -383,7 +383,8 @@ def _run_annotation_pipeline(known_args, pipeline_args):

def _create_sample_info_table(pipeline, # type: beam.Pipeline
pipeline_mode, # type: PipelineModes
known_args, # type: argparse.Namespace
known_args, # type: argparse.Namespace,
pipeline_args, # type: List[str]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pipeline_args is not used anywhere in this method.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going to use them to create temp_location/_directory for SampleInfoToBigQuery, but I guess I forgot and there are no tests to catch it...

Removed it, since I can just pass temp_directory from the parent.

@@ -480,6 +480,8 @@ def run(argv=None):
num_partitions = 1

if known_args.output_table:
options = pipeline_options.PipelineOptions(pipeline_args)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This variable is used only once in the next line. Is it possible to combine these two lines?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inlined even further to get temp_directory, which is used in both following calls. Also added a check to prevent using BQ export (since there is also Avro one) without temp_directory, as new sink demands it. The check is in the beginning so that customer doesn't waste compute resources just to be shut down at the end.

Copy link
Collaborator Author

@tneymanov tneymanov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All but 1k genomes test have passed (this one takes a bit longer).

Also modified sample table code a bit, as apparently it was broken. I presume your follow up PR was going to fix it, but I didn't want a broken code to be in the last release for python 2.7. PTAL.

@@ -383,7 +383,8 @@ def _run_annotation_pipeline(known_args, pipeline_args):

def _create_sample_info_table(pipeline, # type: beam.Pipeline
pipeline_mode, # type: PipelineModes
known_args, # type: argparse.Namespace
known_args, # type: argparse.Namespace,
pipeline_args, # type: List[str]
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going to use them to create temp_location/_directory for SampleInfoToBigQuery, but I guess I forgot and there are no tests to catch it...

Removed it, since I can just pass temp_directory from the parent.

@@ -480,6 +480,8 @@ def run(argv=None):
num_partitions = 1

if known_args.output_table:
options = pipeline_options.PipelineOptions(pipeline_args)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inlined even further to get temp_directory, which is used in both following calls. Also added a check to prevent using BQ export (since there is also Avro one) without temp_directory, as new sink demands it. The check is in the beginning so that customer doesn't waste compute resources just to be shut down at the end.

setup.py Outdated
@@ -42,8 +42,7 @@
# Nucleus needs uptodate protocol buffer compiler (protoc).
'protobuf>=3.6.1',
'mmh3<2.6',
# Refer to issue #528
'google-cloud-storage<1.23.0',
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Member

@samanvp samanvp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Tural, after addressing these comments we should be able to land this PR.

I have 2 main concerns:

  • What will happen if pipeline is in DirectRunner mode? Does the new sink still works as expected? Do we need to set the temp_location on GCS or it has to be a local directory on the machine that runs the pipeline? Please make sure we check this scenario thoroughly.
  • We need to run the code for a large input, for example 1000Genome. If you remember that's how we found out this new sink has a bug and drops some of the output rows.

Thanks!

'It is recommended to use 20 for loading large inputs without '
'merging. Use a smaller value (2 or 3) if both merging and '
'optimize_for_large_inputs are enabled.'))
help=('This flag is deprecated and may be removed in future releases.'))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...and will be removed in the next release.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Comment on lines 409 to 410
if known_args.output_table and '--temp_location' not in pipeline_args:
raise ValueError('--temp_location is required for BigQuery imports.')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check seems out of place in this module, could we move it somewhere else?
How about this:
We can conduct this check in pipeline_common.parse_args, perhaps in _raise_error_on_invalid_flags by adding pipeline_args as its second input argument. We should check temp_directory is set, is not null, and is a valid GCS bucket.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Comment on lines 488 to 489
if not temp_directory:
raise ValueError('--temp_location must be set when writing to BigQuery.')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we conduct the check as I mentioned above this check can be removed.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, artifact. Removed.

if self._append
else beam.io.BigQueryDisposition.WRITE_TRUNCATE),
method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
custom_gcs_temp_location=self._temp_location))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we don't need to set this argument as long as w have it in pipeline_args:
https://github.com/apache/beam/blob/7bea94c59d1ead24659e09b0e467beeb82f4cadd/sdks/python/apache_beam/io/gcp/bigquery.py#L1261

custom_gcs_temp_location (str): A GCS location to store files to be used for file loads into BigQuery. By default, this will use the pipeline's temp_location, but for pipelines whose temp_location is not appropriate for BQ File Loads, users should pass a specific one.

Could you please test this and make sure it will work without setting this arg. If that's the case there is no need to pass temp_location to this module as well as sample_info_to_bigquery.py.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested, seems to be the case... removed manual selection of temp_dir.

Copy link
Member

@samanvp samanvp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make sure all the comments are addressed before merging this PR.

@@ -1,23 +0,0 @@
[
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this file is deleted?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are getting 'non-homogenized' list error from this test, as I've told you offline.

Seems like old sink was fixing it on it's own. This test will be reintroduced in the PySam PR.

@tneymanov tneymanov force-pushed the beam_sink branch 2 times, most recently from ad9582b to 1ed2f38 Compare January 14, 2020 21:59
Copy link
Collaborator Author

@tneymanov tneymanov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, Saman.

'It is recommended to use 20 for loading large inputs without '
'merging. Use a smaller value (2 or 3) if both merging and '
'optimize_for_large_inputs are enabled.'))
help=('This flag is deprecated and may be removed in future releases.'))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

if self._append
else beam.io.BigQueryDisposition.WRITE_TRUNCATE),
method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
custom_gcs_temp_location=self._temp_location))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested, seems to be the case... removed manual selection of temp_dir.

Comment on lines 409 to 410
if known_args.output_table and '--temp_location' not in pipeline_args:
raise ValueError('--temp_location is required for BigQuery imports.')
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Comment on lines 488 to 489
if not temp_directory:
raise ValueError('--temp_location must be set when writing to BigQuery.')
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, artifact. Removed.

@@ -1,23 +0,0 @@
[
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are getting 'non-homogenized' list error from this test, as I've told you offline.

Seems like old sink was fixing it on it's own. This test will be reintroduced in the PySam PR.

@samanvp
Copy link
Member

samanvp commented Jan 14, 2020

Thanks Tural.
Please make sure we add the removed test in next PR.
Also this PR shouldn't merge unless we get a successful 1000Genome run without any missing rows.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants