diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index 2e3e9b301618..73ba8d6abdb6 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -901,7 +901,8 @@ def test_write_messages_with_attributes_error(self, mock_pubsub): options = PipelineOptions([]) options.view_as(StandardOptions).streaming = True - with self.assertRaisesRegex(Exception, r'Type hint violation'): + with self.assertRaisesRegex(Exception, + r'requires.*PubsubMessage.*applied.*str'): with TestPipeline(options=options) as p: _ = ( p diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 6ec741705376..4848dc4aade8 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -497,13 +497,12 @@ def type_check_inputs_or_outputs(self, pvalueish, input_or_output): at_context = ' %s %s' % (input_or_output, context) if context else '' raise TypeCheckError( '{type} type hint violation at {label}{context}: expected {hint}, ' - 'got {actual_type}\nFull type hint:\n{debug_str}'.format( + 'got {actual_type}'.format( type=input_or_output.title(), label=self.label, context=at_context, hint=hint, - actual_type=pvalue_.element_type, - debug_str=type_hints.debug_str())) + actual_type=pvalue_.element_type)) def _infer_output_coder(self, input_type=None, input_coder=None): # type: (...) -> Optional[coders.Coder] @@ -939,7 +938,25 @@ def element_type(side_input): bindings = getcallargs_forhints(argspec_fn, *arg_types, **kwargs_types) hints = getcallargs_forhints( argspec_fn, *input_types[0], **input_types[1]) - for arg, hint in hints.items(): + + # First check the main input. + arg_hints = iter(hints.items()) + element_arg, element_hint = next(arg_hints) + if not typehints.is_consistent_with( + bindings.get(element_arg, typehints.Any), element_hint): + transform_nest_level = self.label.count("/") + split_producer_label = pvalueish.producer.full_label.split("/") + producer_label = "/".join( + split_producer_label[:transform_nest_level + 1]) + raise TypeCheckError( + f"The transform '{self.label}' requires " + f"PCollections of type '{element_hint}' " + f"but was applied to a PCollection of type" + f" '{bindings[element_arg]}' " + f"(produced by the transform '{producer_label}'). ") + + # Now check the side inputs. + for arg, hint in arg_hints: if arg.startswith('__unknown__'): continue if hint is None: diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 0acea547ccdc..7db017a59158 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -1298,17 +1298,13 @@ class ToUpperCaseWithPrefix(beam.DoFn): def process(self, element, prefix): return [prefix + element.upper()] - with self.assertRaises(typehints.TypeCheckError) as e: + with self.assertRaisesRegex(typehints.TypeCheckError, + r'Upper.*requires.*str.*applied.*int'): ( self.p | 'T' >> beam.Create([1, 2, 3]).with_output_types(int) | 'Upper' >> beam.ParDo(ToUpperCaseWithPrefix(), 'hello')) - self.assertStartswith( - e.exception.args[0], - "Type hint violation for 'Upper': " - "requires {} but got {} for element".format(str, int)) - def test_do_fn_pipeline_runtime_type_check_satisfied(self): self.p._options.view_as(TypeOptions).runtime_type_check = True @@ -1335,18 +1331,14 @@ class AddWithNum(beam.DoFn): def process(self, element, num): return [element + num] - with self.assertRaises(typehints.TypeCheckError) as e: + with self.assertRaisesRegex(typehints.TypeCheckError, + r'Add.*requires.*int.*applied.*str'): ( self.p | 'T' >> beam.Create(['1', '2', '3']).with_output_types(str) | 'Add' >> beam.ParDo(AddWithNum(), 5)) self.p.run() - self.assertStartswith( - e.exception.args[0], - "Type hint violation for 'Add': " - "requires {} but got {} for element".format(int, str)) - def test_pardo_does_not_type_check_using_type_hint_decorators(self): @with_input_types(a=int) @with_output_types(typing.List[str]) @@ -1355,17 +1347,13 @@ def int_to_str(a): # The function above is expecting an int for its only parameter. However, it # will receive a str instead, which should result in a raised exception. - with self.assertRaises(typehints.TypeCheckError) as e: + with self.assertRaisesRegex(typehints.TypeCheckError, + r'ToStr.*requires.*int.*applied.*str'): ( self.p | 'S' >> beam.Create(['b', 'a', 'r']).with_output_types(str) | 'ToStr' >> beam.FlatMap(int_to_str)) - self.assertStartswith( - e.exception.args[0], - "Type hint violation for 'ToStr': " - "requires {} but got {} for a".format(int, str)) - def test_pardo_properly_type_checks_using_type_hint_decorators(self): @with_input_types(a=str) @with_output_types(typing.List[str]) @@ -1387,7 +1375,8 @@ def to_all_upper_case(a): def test_pardo_does_not_type_check_using_type_hint_methods(self): # The first ParDo outputs pcoll's of type int, however the second ParDo is # expecting pcoll's of type str instead. - with self.assertRaises(typehints.TypeCheckError) as e: + with self.assertRaisesRegex(typehints.TypeCheckError, + r'Upper.*requires.*str.*applied.*int'): ( self.p | 'S' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) @@ -1398,11 +1387,6 @@ def test_pardo_does_not_type_check_using_type_hint_methods(self): 'Upper' >> beam.FlatMap(lambda x: [x.upper()]).with_input_types( str).with_output_types(str))) - self.assertStartswith( - e.exception.args[0], - "Type hint violation for 'Upper': " - "requires {} but got {} for x".format(str, int)) - def test_pardo_properly_type_checks_using_type_hint_methods(self): # Pipeline should be created successfully without an error d = ( @@ -1419,18 +1403,14 @@ def test_pardo_properly_type_checks_using_type_hint_methods(self): def test_map_does_not_type_check_using_type_hints_methods(self): # The transform before 'Map' has indicated that it outputs PCollections with # int's, while Map is expecting one of str. - with self.assertRaises(typehints.TypeCheckError) as e: + with self.assertRaisesRegex(typehints.TypeCheckError, + r'Upper.*requires.*str.*applied.*int'): ( self.p | 'S' >> beam.Create([1, 2, 3, 4]).with_output_types(int) | 'Upper' >> beam.Map(lambda x: x.upper()).with_input_types( str).with_output_types(str)) - self.assertStartswith( - e.exception.args[0], - "Type hint violation for 'Upper': " - "requires {} but got {} for x".format(str, int)) - def test_map_properly_type_checks_using_type_hints_methods(self): # No error should be raised if this type-checks properly. d = ( @@ -1449,17 +1429,13 @@ def upper(s): # Hinted function above expects a str at pipeline construction. # However, 'Map' should detect that Create has hinted an int instead. - with self.assertRaises(typehints.TypeCheckError) as e: + with self.assertRaisesRegex(typehints.TypeCheckError, + r'Upper.*requires.*str.*applied.*int'): ( self.p | 'S' >> beam.Create([1, 2, 3, 4]).with_output_types(int) | 'Upper' >> beam.Map(upper)) - self.assertStartswith( - e.exception.args[0], - "Type hint violation for 'Upper': " - "requires {} but got {} for s".format(str, int)) - def test_map_properly_type_checks_using_type_hints_decorator(self): @with_input_types(a=bool) @with_output_types(int) @@ -1477,7 +1453,8 @@ def bool_to_int(a): def test_filter_does_not_type_check_using_type_hints_method(self): # Filter is expecting an int but instead looks to the 'left' and sees a str # incoming. - with self.assertRaises(typehints.TypeCheckError) as e: + with self.assertRaisesRegex(typehints.TypeCheckError, + r'Below 3.*requires.*int.*applied.*str'): ( self.p | 'Strs' >> beam.Create(['1', '2', '3', '4', '5' @@ -1486,11 +1463,6 @@ def test_filter_does_not_type_check_using_type_hints_method(self): str).with_output_types(str) | 'Below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int)) - self.assertStartswith( - e.exception.args[0], - "Type hint violation for 'Below 3': " - "requires {} but got {} for x".format(int, str)) - def test_filter_type_checks_using_type_hints_method(self): # No error should be raised if this type-checks properly. d = ( @@ -1508,17 +1480,13 @@ def more_than_half(a): return a > 0.50 # Func above was hinted to only take a float, yet a str will be passed. - with self.assertRaises(typehints.TypeCheckError) as e: + with self.assertRaisesRegex(typehints.TypeCheckError, + r'Half.*requires.*float.*applied.*str'): ( self.p | 'Ints' >> beam.Create(['1', '2', '3', '4']).with_output_types(str) | 'Half' >> beam.Filter(more_than_half)) - self.assertStartswith( - e.exception.args[0], - "Type hint violation for 'Half': " - "requires {} but got {} for a".format(float, str)) - def test_filter_type_checks_using_type_hints_decorator(self): @with_input_types(b=int) def half(b): @@ -2128,14 +2096,10 @@ def test_mean_globally_pipeline_checking_violated(self): self.p | 'C' >> beam.Create(['test']).with_output_types(str) | 'Mean' >> combine.Mean.Globally()) - - expected_msg = \ - "Type hint violation for 'CombinePerKey': " \ - "requires Tuple[TypeVariable[K], Union[, , " \ - ", ]] " \ - "but got Tuple[None, ] for element" - - self.assertStartswith(e.exception.args[0], expected_msg) + err_msg = e.exception.args[0] + assert "CombinePerKey" in err_msg + assert "Tuple[TypeVariable[K]" in err_msg + assert "Tuple[None, " in err_msg def test_mean_globally_runtime_checking_satisfied(self): self.p._options.view_as(TypeOptions).runtime_type_check = True @@ -2195,14 +2159,12 @@ def test_mean_per_key_pipeline_checking_violated(self): typing.Tuple[str, str])) | 'EvenMean' >> combine.Mean.PerKey()) self.p.run() - - expected_msg = \ - "Type hint violation for 'CombinePerKey(MeanCombineFn)': " \ - "requires Tuple[TypeVariable[K], Union[, , " \ - ", ]] " \ - "but got Tuple[, ] for element" - - self.assertStartswith(e.exception.args[0], expected_msg) + err_msg = e.exception.args[0] + assert "CombinePerKey(MeanCombineFn)" in err_msg + assert "requires" in err_msg + assert "Tuple[TypeVariable[K]" in err_msg + assert "applied" in err_msg + assert "Tuple[, ]" in err_msg def test_mean_per_key_runtime_checking_satisfied(self): self.p._options.view_as(TypeOptions).runtime_type_check = True diff --git a/sdks/python/apache_beam/typehints/decorators_test.py b/sdks/python/apache_beam/typehints/decorators_test.py index 3baf9fa8322f..71edc75f31a6 100644 --- a/sdks/python/apache_beam/typehints/decorators_test.py +++ b/sdks/python/apache_beam/typehints/decorators_test.py @@ -409,7 +409,7 @@ def fn(a: int) -> int: return a with self.assertRaisesRegex(TypeCheckError, - r'requires .*int.* but got .*str'): + r'requires .*int.* but was applied .*str'): _ = ['a', 'b', 'c'] | Map(fn) # Same pipeline doesn't raise without annotations on fn. @@ -423,7 +423,7 @@ def fn(a: int) -> int: _ = [1, 2, 3] | Map(fn) # Doesn't raise - correct types. with self.assertRaisesRegex(TypeCheckError, - r'requires .*int.* but got .*str'): + r'requires .*int.* but was applied .*str'): _ = ['a', 'b', 'c'] | Map(fn) @decorators.no_annotations diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 72aed46f5e78..57e7f44f6922 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -88,11 +88,11 @@ def process(self, element): self.assertEqual(['1', '2', '3'], sorted(result)) with self.assertRaisesRegex(typehints.TypeCheckError, - r'requires.*int.*got.*str'): + r'requires.*int.*applied.*str'): ['a', 'b', 'c'] | beam.ParDo(MyDoFn()) with self.assertRaisesRegex(typehints.TypeCheckError, - r'requires.*int.*got.*str'): + r'requires.*int.*applied.*str'): [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn())) def test_typed_dofn_method(self): @@ -104,11 +104,11 @@ def process(self, element: int) -> typehints.Tuple[str]: self.assertEqual(['1', '2', '3'], sorted(result)) with self.assertRaisesRegex(typehints.TypeCheckError, - r'requires.*int.*got.*str'): + r'requires.*int.*applied.*str'): _ = ['a', 'b', 'c'] | beam.ParDo(MyDoFn()) with self.assertRaisesRegex(typehints.TypeCheckError, - r'requires.*int.*got.*str'): + r'requires.*int.*applied.*str'): _ = [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn())) def test_typed_dofn_method_with_class_decorators(self): @@ -124,12 +124,12 @@ def process(self, element: int) -> typehints.Tuple[str]: with self.assertRaisesRegex( typehints.TypeCheckError, - r'requires.*Tuple\[, \].*got.*str'): + r'requires.*Tuple\[, \].*applied.*str'): _ = ['a', 'b', 'c'] | beam.ParDo(MyDoFn()) with self.assertRaisesRegex( typehints.TypeCheckError, - r'requires.*Tuple\[, \].*got.*int'): + r'requires.*Tuple\[, \].*applied.*int'): _ = [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn())) def test_typed_callable_iterable_output(self): @@ -156,11 +156,11 @@ def process(self, element: typehints.Tuple[int, int]) -> \ self.assertEqual(['1', '2', '3'], sorted(result)) with self.assertRaisesRegex(typehints.TypeCheckError, - r'requires.*int.*got.*str'): + r'requires.*int.*applied.*str'): _ = ['a', 'b', 'c'] | beam.ParDo(my_do_fn) with self.assertRaisesRegex(typehints.TypeCheckError, - r'requires.*int.*got.*str'): + r'requires.*int.*applied.*str'): _ = [1, 2, 3] | (beam.ParDo(my_do_fn) | 'again' >> beam.ParDo(my_do_fn)) def test_typed_callable_instance(self): @@ -177,11 +177,11 @@ def do_fn(element: typehints.Tuple[int, int]) -> typehints.Generator[str]: self.assertEqual(['1', '2', '3'], sorted(result)) with self.assertRaisesRegex(typehints.TypeCheckError, - r'requires.*int.*got.*str'): + r'requires.*int.*applied.*str'): _ = ['a', 'b', 'c'] | pardo with self.assertRaisesRegex(typehints.TypeCheckError, - r'requires.*int.*got.*str'): + r'requires.*int.*applied.*str'): _ = [1, 2, 3] | (pardo | 'again' >> pardo) def test_filter_type_hint(self): @@ -430,7 +430,7 @@ def fn(element: float): return pcoll | beam.ParDo(fn) with self.assertRaisesRegex(typehints.TypeCheckError, - r'ParDo.*requires.*float.*got.*str'): + r'ParDo.*requires.*float.*applied.*str'): _ = ['1', '2', '3'] | MyMap() with self.assertRaisesRegex(typehints.TypeCheckError, r'MyMap.*expected.*str.*got.*bytes'): @@ -632,14 +632,14 @@ def produces_unkown(e): return e @typehints.with_input_types(int) - def requires_int(e): + def accepts_int(e): return e class MyPTransform(beam.PTransform): def expand(self, pcoll): unknowns = pcoll | beam.Map(produces_unkown) ints = pcoll | beam.Map(int) - return (unknowns, ints) | beam.Flatten() | beam.Map(requires_int) + return (unknowns, ints) | beam.Flatten() | beam.Map(accepts_int) _ = [1, 2, 3] | MyPTransform() @@ -761,8 +761,8 @@ def test_var_positional_only_side_input_hint(self): with self.assertRaisesRegex( typehints.TypeCheckError, - r'requires Tuple\[Union\[, \], ...\] but ' - r'got Tuple\[Union\[, \], ...\]'): + r'requires.*Tuple\[Union\[, \], ...\].*' + r'applied.*Tuple\[Union\[, \], ...\]'): _ = [1.2] | beam.Map(lambda *_: 'a', 5).with_input_types(int, str) def test_var_keyword_side_input_hint(self):