Skip to content

Commit

Permalink
Update typecheck err msg (#32880)
Browse files Browse the repository at this point in the history
* Update typecheck err msg

* update a few typed_pipeline_test unit tests

* Move new logic to only apply to main element

* fix tests

* remove please comment

* update tests again

* remove debug str

* fix more tests

* fix pubsub test

* revert accidental pyproject change

* revert pyrpoject change

* fix ptransform_test tests

* add explanatory comments for similar looking code
  • Loading branch information
hjtran authored Nov 4, 2024
1 parent 242b0b2 commit 76c5d56
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 86 deletions.
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/io/gcp/pubsub_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,8 @@ def test_write_messages_with_attributes_error(self, mock_pubsub):

options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with self.assertRaisesRegex(Exception, r'Type hint violation'):
with self.assertRaisesRegex(Exception,
r'requires.*PubsubMessage.*applied.*str'):
with TestPipeline(options=options) as p:
_ = (
p
Expand Down
25 changes: 21 additions & 4 deletions sdks/python/apache_beam/transforms/ptransform.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,13 +497,12 @@ def type_check_inputs_or_outputs(self, pvalueish, input_or_output):
at_context = ' %s %s' % (input_or_output, context) if context else ''
raise TypeCheckError(
'{type} type hint violation at {label}{context}: expected {hint}, '
'got {actual_type}\nFull type hint:\n{debug_str}'.format(
'got {actual_type}'.format(
type=input_or_output.title(),
label=self.label,
context=at_context,
hint=hint,
actual_type=pvalue_.element_type,
debug_str=type_hints.debug_str()))
actual_type=pvalue_.element_type))

def _infer_output_coder(self, input_type=None, input_coder=None):
# type: (...) -> Optional[coders.Coder]
Expand Down Expand Up @@ -939,7 +938,25 @@ def element_type(side_input):
bindings = getcallargs_forhints(argspec_fn, *arg_types, **kwargs_types)
hints = getcallargs_forhints(
argspec_fn, *input_types[0], **input_types[1])
for arg, hint in hints.items():

# First check the main input.
arg_hints = iter(hints.items())
element_arg, element_hint = next(arg_hints)
if not typehints.is_consistent_with(
bindings.get(element_arg, typehints.Any), element_hint):
transform_nest_level = self.label.count("/")
split_producer_label = pvalueish.producer.full_label.split("/")
producer_label = "/".join(
split_producer_label[:transform_nest_level + 1])
raise TypeCheckError(
f"The transform '{self.label}' requires "
f"PCollections of type '{element_hint}' "
f"but was applied to a PCollection of type"
f" '{bindings[element_arg]}' "
f"(produced by the transform '{producer_label}'). ")

# Now check the side inputs.
for arg, hint in arg_hints:
if arg.startswith('__unknown__'):
continue
if hint is None:
Expand Down
90 changes: 26 additions & 64 deletions sdks/python/apache_beam/transforms/ptransform_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1298,17 +1298,13 @@ class ToUpperCaseWithPrefix(beam.DoFn):
def process(self, element, prefix):
return [prefix + element.upper()]

with self.assertRaises(typehints.TypeCheckError) as e:
with self.assertRaisesRegex(typehints.TypeCheckError,
r'Upper.*requires.*str.*applied.*int'):
(
self.p
| 'T' >> beam.Create([1, 2, 3]).with_output_types(int)
| 'Upper' >> beam.ParDo(ToUpperCaseWithPrefix(), 'hello'))

self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'Upper': "
"requires {} but got {} for element".format(str, int))

def test_do_fn_pipeline_runtime_type_check_satisfied(self):
self.p._options.view_as(TypeOptions).runtime_type_check = True

Expand All @@ -1335,18 +1331,14 @@ class AddWithNum(beam.DoFn):
def process(self, element, num):
return [element + num]

with self.assertRaises(typehints.TypeCheckError) as e:
with self.assertRaisesRegex(typehints.TypeCheckError,
r'Add.*requires.*int.*applied.*str'):
(
self.p
| 'T' >> beam.Create(['1', '2', '3']).with_output_types(str)
| 'Add' >> beam.ParDo(AddWithNum(), 5))
self.p.run()

self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'Add': "
"requires {} but got {} for element".format(int, str))

def test_pardo_does_not_type_check_using_type_hint_decorators(self):
@with_input_types(a=int)
@with_output_types(typing.List[str])
Expand All @@ -1355,17 +1347,13 @@ def int_to_str(a):

# The function above is expecting an int for its only parameter. However, it
# will receive a str instead, which should result in a raised exception.
with self.assertRaises(typehints.TypeCheckError) as e:
with self.assertRaisesRegex(typehints.TypeCheckError,
r'ToStr.*requires.*int.*applied.*str'):
(
self.p
| 'S' >> beam.Create(['b', 'a', 'r']).with_output_types(str)
| 'ToStr' >> beam.FlatMap(int_to_str))

self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'ToStr': "
"requires {} but got {} for a".format(int, str))

def test_pardo_properly_type_checks_using_type_hint_decorators(self):
@with_input_types(a=str)
@with_output_types(typing.List[str])
Expand All @@ -1387,7 +1375,8 @@ def to_all_upper_case(a):
def test_pardo_does_not_type_check_using_type_hint_methods(self):
# The first ParDo outputs pcoll's of type int, however the second ParDo is
# expecting pcoll's of type str instead.
with self.assertRaises(typehints.TypeCheckError) as e:
with self.assertRaisesRegex(typehints.TypeCheckError,
r'Upper.*requires.*str.*applied.*int'):
(
self.p
| 'S' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
Expand All @@ -1398,11 +1387,6 @@ def test_pardo_does_not_type_check_using_type_hint_methods(self):
'Upper' >> beam.FlatMap(lambda x: [x.upper()]).with_input_types(
str).with_output_types(str)))

self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'Upper': "
"requires {} but got {} for x".format(str, int))

def test_pardo_properly_type_checks_using_type_hint_methods(self):
# Pipeline should be created successfully without an error
d = (
Expand All @@ -1419,18 +1403,14 @@ def test_pardo_properly_type_checks_using_type_hint_methods(self):
def test_map_does_not_type_check_using_type_hints_methods(self):
# The transform before 'Map' has indicated that it outputs PCollections with
# int's, while Map is expecting one of str.
with self.assertRaises(typehints.TypeCheckError) as e:
with self.assertRaisesRegex(typehints.TypeCheckError,
r'Upper.*requires.*str.*applied.*int'):
(
self.p
| 'S' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
| 'Upper' >> beam.Map(lambda x: x.upper()).with_input_types(
str).with_output_types(str))

self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'Upper': "
"requires {} but got {} for x".format(str, int))

def test_map_properly_type_checks_using_type_hints_methods(self):
# No error should be raised if this type-checks properly.
d = (
Expand All @@ -1449,17 +1429,13 @@ def upper(s):

# Hinted function above expects a str at pipeline construction.
# However, 'Map' should detect that Create has hinted an int instead.
with self.assertRaises(typehints.TypeCheckError) as e:
with self.assertRaisesRegex(typehints.TypeCheckError,
r'Upper.*requires.*str.*applied.*int'):
(
self.p
| 'S' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
| 'Upper' >> beam.Map(upper))

self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'Upper': "
"requires {} but got {} for s".format(str, int))

def test_map_properly_type_checks_using_type_hints_decorator(self):
@with_input_types(a=bool)
@with_output_types(int)
Expand All @@ -1477,7 +1453,8 @@ def bool_to_int(a):
def test_filter_does_not_type_check_using_type_hints_method(self):
# Filter is expecting an int but instead looks to the 'left' and sees a str
# incoming.
with self.assertRaises(typehints.TypeCheckError) as e:
with self.assertRaisesRegex(typehints.TypeCheckError,
r'Below 3.*requires.*int.*applied.*str'):
(
self.p
| 'Strs' >> beam.Create(['1', '2', '3', '4', '5'
Expand All @@ -1486,11 +1463,6 @@ def test_filter_does_not_type_check_using_type_hints_method(self):
str).with_output_types(str)
| 'Below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int))

self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'Below 3': "
"requires {} but got {} for x".format(int, str))

def test_filter_type_checks_using_type_hints_method(self):
# No error should be raised if this type-checks properly.
d = (
Expand All @@ -1508,17 +1480,13 @@ def more_than_half(a):
return a > 0.50

# Func above was hinted to only take a float, yet a str will be passed.
with self.assertRaises(typehints.TypeCheckError) as e:
with self.assertRaisesRegex(typehints.TypeCheckError,
r'Half.*requires.*float.*applied.*str'):
(
self.p
| 'Ints' >> beam.Create(['1', '2', '3', '4']).with_output_types(str)
| 'Half' >> beam.Filter(more_than_half))

self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'Half': "
"requires {} but got {} for a".format(float, str))

def test_filter_type_checks_using_type_hints_decorator(self):
@with_input_types(b=int)
def half(b):
Expand Down Expand Up @@ -2128,14 +2096,10 @@ def test_mean_globally_pipeline_checking_violated(self):
self.p
| 'C' >> beam.Create(['test']).with_output_types(str)
| 'Mean' >> combine.Mean.Globally())

expected_msg = \
"Type hint violation for 'CombinePerKey': " \
"requires Tuple[TypeVariable[K], Union[<class 'float'>, <class 'int'>, " \
"<class 'numpy.float64'>, <class 'numpy.int64'>]] " \
"but got Tuple[None, <class 'str'>] for element"

self.assertStartswith(e.exception.args[0], expected_msg)
err_msg = e.exception.args[0]
assert "CombinePerKey" in err_msg
assert "Tuple[TypeVariable[K]" in err_msg
assert "Tuple[None, <class 'str'>" in err_msg

def test_mean_globally_runtime_checking_satisfied(self):
self.p._options.view_as(TypeOptions).runtime_type_check = True
Expand Down Expand Up @@ -2195,14 +2159,12 @@ def test_mean_per_key_pipeline_checking_violated(self):
typing.Tuple[str, str]))
| 'EvenMean' >> combine.Mean.PerKey())
self.p.run()

expected_msg = \
"Type hint violation for 'CombinePerKey(MeanCombineFn)': " \
"requires Tuple[TypeVariable[K], Union[<class 'float'>, <class 'int'>, " \
"<class 'numpy.float64'>, <class 'numpy.int64'>]] " \
"but got Tuple[<class 'str'>, <class 'str'>] for element"

self.assertStartswith(e.exception.args[0], expected_msg)
err_msg = e.exception.args[0]
assert "CombinePerKey(MeanCombineFn)" in err_msg
assert "requires" in err_msg
assert "Tuple[TypeVariable[K]" in err_msg
assert "applied" in err_msg
assert "Tuple[<class 'str'>, <class 'str'>]" in err_msg

def test_mean_per_key_runtime_checking_satisfied(self):
self.p._options.view_as(TypeOptions).runtime_type_check = True
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/typehints/decorators_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ def fn(a: int) -> int:
return a

with self.assertRaisesRegex(TypeCheckError,
r'requires .*int.* but got .*str'):
r'requires .*int.* but was applied .*str'):
_ = ['a', 'b', 'c'] | Map(fn)

# Same pipeline doesn't raise without annotations on fn.
Expand All @@ -423,7 +423,7 @@ def fn(a: int) -> int:
_ = [1, 2, 3] | Map(fn) # Doesn't raise - correct types.

with self.assertRaisesRegex(TypeCheckError,
r'requires .*int.* but got .*str'):
r'requires .*int.* but was applied .*str'):
_ = ['a', 'b', 'c'] | Map(fn)

@decorators.no_annotations
Expand Down
30 changes: 15 additions & 15 deletions sdks/python/apache_beam/typehints/typed_pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ def process(self, element):
self.assertEqual(['1', '2', '3'], sorted(result))

with self.assertRaisesRegex(typehints.TypeCheckError,
r'requires.*int.*got.*str'):
r'requires.*int.*applied.*str'):
['a', 'b', 'c'] | beam.ParDo(MyDoFn())

with self.assertRaisesRegex(typehints.TypeCheckError,
r'requires.*int.*got.*str'):
r'requires.*int.*applied.*str'):
[1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn()))

def test_typed_dofn_method(self):
Expand All @@ -104,11 +104,11 @@ def process(self, element: int) -> typehints.Tuple[str]:
self.assertEqual(['1', '2', '3'], sorted(result))

with self.assertRaisesRegex(typehints.TypeCheckError,
r'requires.*int.*got.*str'):
r'requires.*int.*applied.*str'):
_ = ['a', 'b', 'c'] | beam.ParDo(MyDoFn())

with self.assertRaisesRegex(typehints.TypeCheckError,
r'requires.*int.*got.*str'):
r'requires.*int.*applied.*str'):
_ = [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn()))

def test_typed_dofn_method_with_class_decorators(self):
Expand All @@ -124,12 +124,12 @@ def process(self, element: int) -> typehints.Tuple[str]:

with self.assertRaisesRegex(
typehints.TypeCheckError,
r'requires.*Tuple\[<class \'int\'>, <class \'int\'>\].*got.*str'):
r'requires.*Tuple\[<class \'int\'>, <class \'int\'>\].*applied.*str'):
_ = ['a', 'b', 'c'] | beam.ParDo(MyDoFn())

with self.assertRaisesRegex(
typehints.TypeCheckError,
r'requires.*Tuple\[<class \'int\'>, <class \'int\'>\].*got.*int'):
r'requires.*Tuple\[<class \'int\'>, <class \'int\'>\].*applied.*int'):
_ = [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn()))

def test_typed_callable_iterable_output(self):
Expand All @@ -156,11 +156,11 @@ def process(self, element: typehints.Tuple[int, int]) -> \
self.assertEqual(['1', '2', '3'], sorted(result))

with self.assertRaisesRegex(typehints.TypeCheckError,
r'requires.*int.*got.*str'):
r'requires.*int.*applied.*str'):
_ = ['a', 'b', 'c'] | beam.ParDo(my_do_fn)

with self.assertRaisesRegex(typehints.TypeCheckError,
r'requires.*int.*got.*str'):
r'requires.*int.*applied.*str'):
_ = [1, 2, 3] | (beam.ParDo(my_do_fn) | 'again' >> beam.ParDo(my_do_fn))

def test_typed_callable_instance(self):
Expand All @@ -177,11 +177,11 @@ def do_fn(element: typehints.Tuple[int, int]) -> typehints.Generator[str]:
self.assertEqual(['1', '2', '3'], sorted(result))

with self.assertRaisesRegex(typehints.TypeCheckError,
r'requires.*int.*got.*str'):
r'requires.*int.*applied.*str'):
_ = ['a', 'b', 'c'] | pardo

with self.assertRaisesRegex(typehints.TypeCheckError,
r'requires.*int.*got.*str'):
r'requires.*int.*applied.*str'):
_ = [1, 2, 3] | (pardo | 'again' >> pardo)

def test_filter_type_hint(self):
Expand Down Expand Up @@ -430,7 +430,7 @@ def fn(element: float):
return pcoll | beam.ParDo(fn)

with self.assertRaisesRegex(typehints.TypeCheckError,
r'ParDo.*requires.*float.*got.*str'):
r'ParDo.*requires.*float.*applied.*str'):
_ = ['1', '2', '3'] | MyMap()
with self.assertRaisesRegex(typehints.TypeCheckError,
r'MyMap.*expected.*str.*got.*bytes'):
Expand Down Expand Up @@ -632,14 +632,14 @@ def produces_unkown(e):
return e

@typehints.with_input_types(int)
def requires_int(e):
def accepts_int(e):
return e

class MyPTransform(beam.PTransform):
def expand(self, pcoll):
unknowns = pcoll | beam.Map(produces_unkown)
ints = pcoll | beam.Map(int)
return (unknowns, ints) | beam.Flatten() | beam.Map(requires_int)
return (unknowns, ints) | beam.Flatten() | beam.Map(accepts_int)

_ = [1, 2, 3] | MyPTransform()

Expand Down Expand Up @@ -761,8 +761,8 @@ def test_var_positional_only_side_input_hint(self):

with self.assertRaisesRegex(
typehints.TypeCheckError,
r'requires Tuple\[Union\[<class \'int\'>, <class \'str\'>\], ...\] but '
r'got Tuple\[Union\[<class \'float\'>, <class \'int\'>\], ...\]'):
r'requires.*Tuple\[Union\[<class \'int\'>, <class \'str\'>\], ...\].*'
r'applied.*Tuple\[Union\[<class \'float\'>, <class \'int\'>\], ...\]'):
_ = [1.2] | beam.Map(lambda *_: 'a', 5).with_input_types(int, str)

def test_var_keyword_side_input_hint(self):
Expand Down

0 comments on commit 76c5d56

Please sign in to comment.