From 42efefd989267b6d93fa5c912342feaaa07d7f02 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 23 Sep 2024 10:48:02 -0400 Subject: [PATCH] [GCSIO] Fix internal unit test failure (#32518) * Fix internal unit test failure. * Minor refactor and add comment. * Fix test failure in GitHub Actions. * Mock is_soft_delete_enabled only if gcsio can be loaded. * Disable unused import lint. It is used in a mock. * Format --- .../apache_beam/options/pipeline_options.py | 5 ++ .../options/pipeline_options_test.py | 47 ++++++++++++------- 2 files changed, 34 insertions(+), 18 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 50021c4610f2..4497ab0993a4 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -975,6 +975,11 @@ def _create_default_gcs_bucket(self): # Log warning if soft delete policy is enabled in a gcs bucket # that is specified in an argument. def _warn_if_soft_delete_policy_enabled(self, arg_name): + # skip the check if it is in dry-run mode because the later step requires + # internet connection to access GCS + if self.view_as(TestOptions).dry_run: + return + gcs_path = getattr(self, arg_name, None) try: from apache_beam.io.gcp import gcsio diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py index 61b227d9a246..c0616bc6451c 100644 --- a/sdks/python/apache_beam/options/pipeline_options_test.py +++ b/sdks/python/apache_beam/options/pipeline_options_test.py @@ -44,6 +44,12 @@ _LOGGER = logging.getLogger(__name__) +try: + import apache_beam.io.gcp.gcsio # pylint: disable=unused-import + has_gcsio = True +except ImportError: + has_gcsio = False + # Mock runners to use for validations. 
class MockRunners(object): @@ -711,6 +717,16 @@ def test_options_store_false_with_different_dest(self): "the dest and the flag name to the map " "_FLAG_THAT_SETS_FALSE_VALUE in PipelineOptions.py") + def _check_errors(self, options, validator, expected): + if has_gcsio: + with mock.patch('apache_beam.io.gcp.gcsio.GcsIO.is_soft_delete_enabled', + return_value=False): + errors = options._handle_temp_and_staging_locations(validator) + self.assertEqual(errors, expected) + else: + errors = options._handle_temp_and_staging_locations(validator) + self.assertEqual(errors, expected) + def test_validation_good_stg_good_temp(self): runner = MockRunners.DataflowRunner() options = GoogleCloudOptions([ @@ -719,8 +735,7 @@ def test_validation_good_stg_good_temp(self): '--temp_location=gs://beam/tmp' ]) validator = PipelineOptionsValidator(options, runner) - errors = options._handle_temp_and_staging_locations(validator) - self.assertEqual(errors, []) + self._check_errors(options, validator, []) self.assertEqual( options.get_all_options()['staging_location'], "gs://beam/stg") self.assertEqual( @@ -734,8 +749,7 @@ def test_validation_bad_stg_good_temp(self): '--temp_location=gs://beam/tmp' ]) validator = PipelineOptionsValidator(options, runner) - errors = options._handle_temp_and_staging_locations(validator) - self.assertEqual(errors, []) + self._check_errors(options, validator, []) self.assertEqual( options.get_all_options()['staging_location'], "gs://beam/tmp") self.assertEqual( @@ -749,8 +763,7 @@ def test_validation_good_stg_bad_temp(self): '--temp_location=badGSpath' ]) validator = PipelineOptionsValidator(options, runner) - errors = options._handle_temp_and_staging_locations(validator) - self.assertEqual(errors, []) + self._check_errors(options, validator, []) self.assertEqual( options.get_all_options()['staging_location'], "gs://beam/stg") self.assertEqual( @@ -764,8 +777,7 @@ def test_validation_bad_stg_bad_temp_with_default(self): '--temp_location=badGSpath' ]) validator = 
PipelineOptionsValidator(options, runner) - errors = options._handle_temp_and_staging_locations(validator) - self.assertEqual(errors, []) + self._check_errors(options, validator, []) self.assertEqual( options.get_all_options()['staging_location'], "gs://default/bucket") self.assertEqual( @@ -779,16 +791,15 @@ def test_validation_bad_stg_bad_temp_no_default(self): '--temp_location=badGSpath' ]) validator = PipelineOptionsValidator(options, runner) - errors = options._handle_temp_and_staging_locations(validator) - self.assertEqual(len(errors), 2, errors) - self.assertIn( - 'Invalid GCS path (badGSpath), given for the option: temp_location.', - errors, - errors) - self.assertIn( - 'Invalid GCS path (badGSpath), given for the option: staging_location.', - errors, - errors) + self._check_errors( + options, + validator, + [ + 'Invalid GCS path (badGSpath), given for the option: ' \ + 'temp_location.', + 'Invalid GCS path (badGSpath), given for the option: ' \ + 'staging_location.' + ]) if __name__ == '__main__':