diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 0f8457a40a7b..e20e0e8ca046 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -576,6 +576,14 @@ def _add_argparse_args(cls, parser): 'updating a pipeline or reloading the job state. ' 'This is not recommended for streaming jobs.') + parser.add_argument( + '--no_wait_until_finish', + default=False, + action='store_true', + help='By default, the "with" statement waits for the job to ' + 'complete. Set this flag to bypass this behavior and continue ' + 'execution immediately') + class StreamingOptions(PipelineOptions): @classmethod diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index f5183b49bc4b..6209ca1ddae8 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -618,7 +618,13 @@ def __exit__( try: if not exc_type: self.result = self.run() - self.result.wait_until_finish() + if not self._options.view_as(StandardOptions).no_wait_until_finish: + self.result.wait_until_finish() + else: + logging.info( + 'Job execution continues without waiting for completion.' + ' Use "wait_until_finish" in PipelineResult to block' + ' until finished.') finally: self._extra_context.__exit__(exc_type, exc_val, exc_tb) diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 7863352cbefa..61aac350280f 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -274,6 +274,16 @@ def test_reuse_custom_transform_instance(self): 'reloading the job state. This is not recommended for ' 'streaming jobs.') + @mock.patch('logging.info') # Mock the logging.info function + def test_no_wait_until_finish(self, mock_info): + with Pipeline(runner='DirectRunner', + options=PipelineOptions(["--no_wait_until_finish"])) as p: + _ = p | beam.Create(['test']) + mock_info.assert_called_once_with( + 'Job execution continues without waiting for completion. ' + 'Use "wait_until_finish" in PipelineResult to block until finished.') + p.result.wait_until_finish() + def test_auto_unique_labels(self): opts = PipelineOptions(["--auto_unique_labels"]) @@ -773,7 +783,7 @@ def test_windowed_value_param(self): | Map(lambda _, wv=DoFn.WindowedValueParam: (wv.value, wv.windows))) assert_that( pcoll, - equal_to([(1, [IntervalWindow(0, 5)]), (7, [IntervalWindow(5, 10)])])) + equal_to([(1, [IntervalWindow(0, 5)]), (7, [IntervalWindow(5, 10)])])) # pylint: disable=too-many-function-args def test_timestamp_param(self): class TestDoFn(DoFn):