diff --git a/docs/large_inputs.md b/docs/large_inputs.md index 83e4b805d..3ef297d7e 100644 --- a/docs/large_inputs.md +++ b/docs/large_inputs.md @@ -14,7 +14,6 @@ Default settings: --worker_machine_type \ --disk_size_gb \ --worker_disk_type \ - --num_bigquery_write_shards \ --partition_config_path \ ``` @@ -98,8 +97,7 @@ transforms (e.g. the sample name is repeated in every record in the BigQuery output rather than just being specified once as in the VCF header), you typically need 3 to 4 times the total size of the raw VCF files. -In addition, if [merging](variant_merging.md) or -[--num_bigquery_write_shards](#--num_bigquery_write_shards) is enabled, you may +In addition, if [merging](variant_merging.md) is enabled, you may need more disk per worker (e.g. 500GB) as the same variants need to be aggregated together on one machine. @@ -110,32 +108,14 @@ more expensive. However, when choosing a large machine (e.g. `n1-standard-16`), they can reduce cost as they can avoid idle CPU cycles due to disk IOPS limitations. -As a result, we recommend using SSDs if [merging](variant_merge.md) or -[--num_bigquery_write_shards](#--num_bigquery_write_shards) is enabled: these -operations require "shuffling" the data (i.e. redistributing the data among -workers), which require significant disk I/O. +As a result, we recommend using SSDs if [merging](variant_merge.md) is enabled: +this operation requires "shuffling" the data (i.e. redistributing the data +among workers), which requires significant disk I/O. Set `--worker_disk_type compute.googleapis.com/projects//zones//diskTypes/pd-ssd` to use SSDs. -### `--num_bigquery_write_shards` - -Currently, the write operation to BigQuery in Dataflow is performed as a -postprocessing step after the main transforms are done. As a workaround for -BigQuery write limitations (more details -[here](https://github.com/googlegenomics/gcp-variant-transforms/issues/199)), -we have added "sharding" when writing to BigQuery. This makes the data load -to BigQuery significantly faster as it parallelizes the process and enables -loading large (>5TB) data to BigQuery at once. - -As a result, we recommend setting `--num_bigquery_write_shards 20` when loading -any data that has more than 1 billion rows (after merging) or 1TB of final -output. You may use a smaller number of write shards (e.g. 5) when using -[partitioned output](#--partition_config_path) as each partition also acts as a -"shard". Note that using a larger value (e.g. 50) can cause BigQuery write to -fail as there is a maximum limit on the number of concurrent writes per table. - ### `--partition_config_path` Partitioning the output can save significant query costs once the data is in @@ -146,4 +126,3 @@ partition). As a result, we recommend setting the partition config for very large data where possible. Please see the [documentation](partitioning.md) for more details. - diff --git a/gcp_variant_transforms/options/variant_transform_options.py b/gcp_variant_transforms/options/variant_transform_options.py index 562331e15..0c849b3bb 100644 --- a/gcp_variant_transforms/options/variant_transform_options.py +++ b/gcp_variant_transforms/options/variant_transform_options.py @@ -195,12 +195,7 @@ def add_arguments(self, parser): parser.add_argument( '--num_bigquery_write_shards', type=int, default=1, - help=('Before writing the final result to output BigQuery, the data is ' - 'sharded to avoid a known failure for very large inputs (issue ' - '#199). Setting this flag to 1 will avoid this extra sharding.' - 'It is recommended to use 20 for loading large inputs without ' - 'merging. Use a smaller value (2 or 3) if both merging and ' - 'optimize_for_large_inputs are enabled.')) + help=('This flag is deprecated and may be removed in future releases.')) parser.add_argument( '--null_numeric_value_replacement', type=int, diff --git a/gcp_variant_transforms/pipeline_common.py b/gcp_variant_transforms/pipeline_common.py index 06a66b9c3..71c767961 100644 --- a/gcp_variant_transforms/pipeline_common.py +++ b/gcp_variant_transforms/pipeline_common.py @@ -75,6 +75,9 @@ def parse_args(argv, command_line_options): if hasattr(known_args, 'input_pattern') or hasattr(known_args, 'input_file'): known_args.all_patterns = _get_all_patterns( known_args.input_pattern, known_args.input_file) + + # Enable new BQ sink experiment. + pipeline_args += ['--experiment', 'use_beam_bq_sink'] return known_args, pipeline_args diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/huge_tests/test_1000_genomes.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/huge_tests/test_1000_genomes.json index 656b60995..29f89fa52 100644 --- a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/huge_tests/test_1000_genomes.json +++ b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/huge_tests/test_1000_genomes.json @@ -9,7 +9,6 @@ "worker_machine_type": "n1-standard-64", "max_num_workers": "64", "num_workers": "20", - "num_bigquery_write_shards": "20", "assertion_configs": [ { "query": ["NUM_ROWS_QUERY"], diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/option_optimize_for_large_inputs.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/option_optimize_for_large_inputs.json index 9defc00ba..f42452698 100644 --- a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/option_optimize_for_large_inputs.json +++ b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/option_optimize_for_large_inputs.json @@ -9,7 +9,6 @@ "worker_machine_type": "n1-standard-16", "max_num_workers": "20", "num_workers": "20", - "num_bigquery_write_shards": "2", "assertion_configs": [ { "query": ["NUM_ROWS_QUERY"], @@ -68,4 +67,3 @@ ] } ] - diff --git a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/platinum_no_merge.json b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/platinum_no_merge.json index c8be9fc61..68a460fb5 100644 --- a/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/platinum_no_merge.json +++ b/gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/platinum_no_merge.json @@ -7,7 +7,6 @@ "worker_machine_type": "n1-standard-16", "max_num_workers": "20", "num_workers": "20", - "num_bigquery_write_shards": "20", "assertion_configs": [ { "query": ["NUM_ROWS_QUERY"], diff --git a/gcp_variant_transforms/transforms/limit_write.py b/gcp_variant_transforms/transforms/limit_write.py deleted file mode 100644 index 0f910dc71..000000000 --- a/gcp_variant_transforms/transforms/limit_write.py +++ /dev/null @@ -1,57 +0,0 @@ -# Copyright 2018 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""A PTransform to limit BQ sink from producing too many files (shards) - -This is a work around to avoid the following failure: - BigQuery execution failed., Error: - Message: Too many sources provided: xxxxx. Limit is 10000. -To limit sink we generate a random dummy key and group by input elements (which -are BigQuery rows) based on that key before writing them to output table. -""" - -from __future__ import absolute_import - -import random -import apache_beam as beam - - -class _RoundRobinKeyFn(beam.DoFn): - def __init__(self, count): - # type: (int) -> None - self._count = count - # This attribute will be properly initiated at each worker by start_bundle() - self._counter = 0 - - def start_bundle(self): - # type: () -> None - self._counter = random.randint(0, self._count - 1) - - def process(self, element): - self._counter += 1 - if self._counter >= self._count: - self._counter -= self._count - yield self._counter, element - - -class LimitWrite(beam.PTransform): - def __init__(self, count): - # type: (int) -> None - self._count = count - - def expand(self, pcoll): - return (pcoll - | beam.ParDo(_RoundRobinKeyFn(self._count)) - | beam.GroupByKey() - | beam.FlatMap(lambda kv: kv[1])) diff --git a/gcp_variant_transforms/transforms/limit_write_test.py b/gcp_variant_transforms/transforms/limit_write_test.py deleted file mode 100644 index 980e1cc87..000000000 --- a/gcp_variant_transforms/transforms/limit_write_test.py +++ /dev/null @@ -1,76 +0,0 @@ -# Copyright 2018 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Tests for limit_write module.""" - -from __future__ import absolute_import - -import unittest - -from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.testing.util import assert_that -from apache_beam.testing.util import equal_to -from apache_beam.transforms import Create - -from gcp_variant_transforms.beam_io import vcfio -from gcp_variant_transforms.transforms import limit_write - - -class LimitWriteTest(unittest.TestCase): - """Test cases for the ``LimitWrite`` PTransform.""" - - def _get_sample_variants(self): - variant1 = vcfio.Variant( - reference_name='chr19', start=11, end=12, reference_bases='C') - variant2 = vcfio.Variant( - reference_name='20', start=123, end=125, reference_bases='CT') - variant3 = vcfio.Variant( - reference_name='20', start=None, end=None, reference_bases=None) - variant4 = vcfio.Variant( - reference_name='20', start=123, end=125, reference_bases='CT') - return [variant1, variant2, variant3, variant4] - - - def test_limit_write_default_shard_limit(self): - variants = self._get_sample_variants() - input_pcoll = Create(variants) - pipeline = TestPipeline() - output_pcoll = ( - pipeline - | input_pcoll - | 'LimitWrite' >> limit_write.LimitWrite(4500)) - assert_that(output_pcoll, equal_to(variants)) - pipeline.run() - - def test_limit_write_shard_limit_4(self): - variants = self._get_sample_variants() - input_pcoll = Create(variants) - pipeline = TestPipeline() - output_pcoll = ( - pipeline - | input_pcoll - | 'LimitWrite' >> limit_write.LimitWrite(4)) - assert_that(output_pcoll, equal_to(variants)) - pipeline.run() - - def test_limit_write_shard_limit_1(self): - variants = self._get_sample_variants() - input_pcoll = Create(variants) - pipeline = TestPipeline() - output_pcoll = ( - pipeline - | input_pcoll - | 'LimitWrite' >> limit_write.LimitWrite(1)) - assert_that(output_pcoll, equal_to(variants)) - pipeline.run() diff --git a/gcp_variant_transforms/transforms/sample_info_to_bigquery.py b/gcp_variant_transforms/transforms/sample_info_to_bigquery.py index 90de4f4d8..e598d21e6 100644 --- a/gcp_variant_transforms/transforms/sample_info_to_bigquery.py +++ b/gcp_variant_transforms/transforms/sample_info_to_bigquery.py @@ -62,6 +62,7 @@ def __init__(self, output_table_prefix, append=False, self._append = append self.samples_span_multiple_files = samples_span_multiple_files self._schema = sample_info_table_schema_generator.generate_schema() + self._temp_location = temp_location def expand(self, pcoll): return (pcoll @@ -76,4 +77,5 @@ def expand(self, pcoll): beam.io.BigQueryDisposition.WRITE_APPEND if self._append else beam.io.BigQueryDisposition.WRITE_TRUNCATE), - method=beam.io.WriteToBigQuery.Method.FILE_LOADS)) + method=beam.io.WriteToBigQuery.Method.FILE_LOADS, + custom_gcs_temp_location=self._temp_location)) diff --git a/gcp_variant_transforms/transforms/variant_to_bigquery.py b/gcp_variant_transforms/transforms/variant_to_bigquery.py index 9121ba353..58f73d1c8 100644 --- a/gcp_variant_transforms/transforms/variant_to_bigquery.py +++ b/gcp_variant_transforms/transforms/variant_to_bigquery.py @@ -16,7 +16,6 @@ from __future__ import absolute_import -import random from typing import Dict, List # pylint: disable=unused-import import apache_beam as beam @@ -29,13 +28,6 @@ from gcp_variant_transforms.libs import processed_variant from gcp_variant_transforms.libs import vcf_field_conflict_resolver from gcp_variant_transforms.libs.variant_merge import variant_merge_strategy # pylint: disable=unused-import -from gcp_variant_transforms.transforms import limit_write - - -# TODO(samanvp): remove this hack when BQ custom sink is added to Python SDK, -# see: https://issues.apache.org/jira/browse/BEAM-2801 -# This has to be less than 10000. -_WRITE_SHARDS_LIMIT = 1000 @beam.typehints.with_input_types(processed_variant.ProcessedVariant) @@ -67,6 +59,7 @@ def __init__( self, output_table, # type: str header_fields, # type: vcf_header_io.VcfHeader + temp_location, # type: str variant_merger=None, # type: variant_merge_strategy.VariantMergeStrategy proc_var_factory=None, # type: processed_variant.ProcessedVariantFactory # TODO(bashir2): proc_var_factory is a required argument and if `None` is @@ -75,8 +68,8 @@ def __init__( update_schema_on_append=False, # type: bool allow_incompatible_records=False, # type: bool omit_empty_sample_calls=False, # type: bool - num_bigquery_write_shards=1, # type: int - null_numeric_value_replacement=None # type: int + null_numeric_value_replacement=None # type: int, + ): # type: (...) -> None """Initializes the transform. @@ -99,8 +92,6 @@ def __init__( + schema if there is a mismatch. omit_empty_sample_calls: If true, samples that don't have a given call will be omitted. - num_bigquery_write_shards: If > 1, we will limit number of sources which - are used for writing to the output BigQuery table. null_numeric_value_replacement: the value to use instead of null for numeric (float/int/long) lists. For instance, [0, None, 1] will become [0, `null_numeric_value_replacement`, 1]. If not set, the value will set @@ -108,6 +99,7 @@ def __init__( """ self._output_table = output_table self._header_fields = header_fields + self._temp_location = temp_location self._variant_merger = variant_merger self._proc_var_factory = proc_var_factory self._append = append @@ -125,7 +117,6 @@ def __init__( self._allow_incompatible_records = allow_incompatible_records self._omit_empty_sample_calls = omit_empty_sample_calls - self._num_bigquery_write_shards = num_bigquery_write_shards if update_schema_on_append: bigquery_util.update_bigquery_schema_on_append(self._schema.fields, self._output_table) @@ -136,35 +127,15 @@ def expand(self, pcoll): self._bigquery_row_generator, self._allow_incompatible_records, self._omit_empty_sample_calls)) - if self._num_bigquery_write_shards > 1: - # We split data into self._num_bigquery_write_shards random partitions - # and then write each part to final BQ by appending them together. - # Combined with LimitWrite transform, this will avoid the BQ failure. - bq_row_partitions = bq_rows | beam.Partition( - lambda _, n: random.randint(0, n - 1), - self._num_bigquery_write_shards) - bq_writes = [] - for i in range(self._num_bigquery_write_shards): - bq_rows = (bq_row_partitions[i] | 'LimitWrite' + str(i) >> - limit_write.LimitWrite(_WRITE_SHARDS_LIMIT)) - bq_writes.append( - bq_rows | 'WriteToBigQuery' + str(i) >> - beam.io.Write(beam.io.BigQuerySink( + return (bq_rows + | 'WriteToBigQuery' >> beam.io.WriteToBigQuery( self._output_table, schema=self._schema, create_disposition=( beam.io.BigQueryDisposition.CREATE_IF_NEEDED), write_disposition=( - beam.io.BigQueryDisposition.WRITE_APPEND)))) - return bq_writes - else: - return (bq_rows - | 'WriteToBigQuery' >> beam.io.Write(beam.io.BigQuerySink( - self._output_table, - schema=self._schema, - create_disposition=( - beam.io.BigQueryDisposition.CREATE_IF_NEEDED), - write_disposition=( - beam.io.BigQueryDisposition.WRITE_APPEND - if self._append - else beam.io.BigQueryDisposition.WRITE_TRUNCATE)))) + beam.io.BigQueryDisposition.WRITE_APPEND + if self._append + else beam.io.BigQueryDisposition.WRITE_TRUNCATE), + method=beam.io.WriteToBigQuery.Method.FILE_LOADS, + custom_gcs_temp_location=self._temp_location)) diff --git a/gcp_variant_transforms/vcf_to_bq.py b/gcp_variant_transforms/vcf_to_bq.py index ed0b8ee93..32e2c93b5 100644 --- a/gcp_variant_transforms/vcf_to_bq.py +++ b/gcp_variant_transforms/vcf_to_bq.py @@ -383,7 +383,8 @@ def _run_annotation_pipeline(known_args, pipeline_args): def _create_sample_info_table(pipeline, # type: beam.Pipeline pipeline_mode, # type: PipelineModes - known_args, # type: argparse.Namespace + known_args, # type: argparse.Namespace, + pipeline_args, # type: List[str] ): # type: (...) -> None headers = pipeline_common.read_headers( @@ -404,7 +405,6 @@ def run(argv=None): logging.info('Command: %s', ' '.join(argv or sys.argv)) known_args, pipeline_args = pipeline_common.parse_args(argv, _COMMAND_LINE_OPTIONS) - if known_args.auto_flags_experiment: _get_input_dimensions(known_args, pipeline_args) @@ -480,6 +480,8 @@ def run(argv=None): num_partitions = 1 if known_args.output_table: + options = pipeline_options.PipelineOptions(pipeline_args) + google_cloud_options = options.view_as(pipeline_options.GoogleCloudOptions) for i in range(num_partitions): table_suffix = '' if partitioner and partitioner.get_partition_name(i): @@ -489,17 +491,18 @@ def run(argv=None): variant_to_bigquery.VariantToBigQuery( table_name, header_fields, + google_cloud_options.temp_location, variant_merger, processed_variant_factory, append=known_args.append, update_schema_on_append=known_args.update_schema_on_append, allow_incompatible_records=known_args.allow_incompatible_records, omit_empty_sample_calls=known_args.omit_empty_sample_calls, - num_bigquery_write_shards=known_args.num_bigquery_write_shards, null_numeric_value_replacement=( known_args.null_numeric_value_replacement))) if known_args.generate_sample_info_table: - _create_sample_info_table(pipeline, pipeline_mode, known_args) + _create_sample_info_table( + pipeline, pipeline_mode, known_args, pipeline_args) if known_args.output_avro_path: # TODO(bashir2): Add an integration test that outputs to Avro files and diff --git a/setup.py b/setup.py index 86351a6ff..8c079679a 100644 --- a/setup.py +++ b/setup.py @@ -42,8 +42,7 @@ # Nucleus needs uptodate protocol buffer compiler (protoc). 'protobuf>=3.6.1', 'mmh3<2.6', - # Refer to issue #528 - 'google-cloud-storage<1.23.0', + 'google-cloud-storage', 'pyfarmhash', 'pyyaml' ]