Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update typecheck err msg #32880

Merged
merged 14 commits into from
Nov 4, 2024
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/io/gcp/pubsub_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,8 @@ def test_write_messages_with_attributes_error(self, mock_pubsub):

options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with self.assertRaisesRegex(Exception, r'Type hint violation'):
with self.assertRaisesRegex(Exception,
r'requires.*PubsubMessage.*applied.*str'):
with TestPipeline(options=options) as p:
_ = (
p
Expand Down
25 changes: 21 additions & 4 deletions sdks/python/apache_beam/transforms/ptransform.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,13 +497,12 @@ def type_check_inputs_or_outputs(self, pvalueish, input_or_output):
at_context = ' %s %s' % (input_or_output, context) if context else ''
raise TypeCheckError(
'{type} type hint violation at {label}{context}: expected {hint}, '
'got {actual_type}\nFull type hint:\n{debug_str}'.format(
'got {actual_type}'.format(
type=input_or_output.title(),
label=self.label,
context=at_context,
hint=hint,
actual_type=pvalue_.element_type,
debug_str=type_hints.debug_str()))
actual_type=pvalue_.element_type))

def _infer_output_coder(self, input_type=None, input_coder=None):
# type: (...) -> Optional[coders.Coder]
Expand Down Expand Up @@ -939,7 +938,25 @@ def element_type(side_input):
bindings = getcallargs_forhints(argspec_fn, *arg_types, **kwargs_types)
hints = getcallargs_forhints(
argspec_fn, *input_types[0], **input_types[1])
for arg, hint in hints.items():

# First check the main input.
arg_hints = iter(hints.items())
element_arg, element_hint = next(arg_hints)
if not typehints.is_consistent_with(
bindings.get(element_arg, typehints.Any), element_hint):
transform_nest_level = self.label.count("/")
split_producer_label = pvalueish.producer.full_label.split("/")
producer_label = "/".join(
split_producer_label[:transform_nest_level + 1])
raise TypeCheckError(
f"The transform '{self.label}' requires "
f"PCollections of type '{element_hint}' "
f"but was applied to a PCollection of type"
f" '{bindings[element_arg]}' "
f"(produced by the transform '{producer_label}'). ")
Comment on lines +943 to +956
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason this type check is done up here rather than modifying the check nested in the for loop? It looks kind of redundant here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is specifically a check for the main element, while the loop is presumably checking the side inputs.


# Now check the side inputs.
for arg, hint in arg_hints:
if arg.startswith('__unknown__'):
continue
if hint is None:
Expand Down
90 changes: 26 additions & 64 deletions sdks/python/apache_beam/transforms/ptransform_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1298,17 +1298,13 @@ class ToUpperCaseWithPrefix(beam.DoFn):
def process(self, element, prefix):
return [prefix + element.upper()]

with self.assertRaises(typehints.TypeCheckError) as e:
with self.assertRaisesRegex(typehints.TypeCheckError,
r'Upper.*requires.*str.*applied.*int'):
(
self.p
| 'T' >> beam.Create([1, 2, 3]).with_output_types(int)
| 'Upper' >> beam.ParDo(ToUpperCaseWithPrefix(), 'hello'))

self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'Upper': "
"requires {} but got {} for element".format(str, int))

def test_do_fn_pipeline_runtime_type_check_satisfied(self):
self.p._options.view_as(TypeOptions).runtime_type_check = True

Expand All @@ -1335,18 +1331,14 @@ class AddWithNum(beam.DoFn):
def process(self, element, num):
return [element + num]

with self.assertRaises(typehints.TypeCheckError) as e:
with self.assertRaisesRegex(typehints.TypeCheckError,
r'Add.*requires.*int.*applied.*str'):
(
self.p
| 'T' >> beam.Create(['1', '2', '3']).with_output_types(str)
| 'Add' >> beam.ParDo(AddWithNum(), 5))
self.p.run()

self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'Add': "
"requires {} but got {} for element".format(int, str))

def test_pardo_does_not_type_check_using_type_hint_decorators(self):
@with_input_types(a=int)
@with_output_types(typing.List[str])
Expand All @@ -1355,17 +1347,13 @@ def int_to_str(a):

# The function above is expecting an int for its only parameter. However, it
# will receive a str instead, which should result in a raised exception.
with self.assertRaises(typehints.TypeCheckError) as e:
with self.assertRaisesRegex(typehints.TypeCheckError,
r'ToStr.*requires.*int.*applied.*str'):
(
self.p
| 'S' >> beam.Create(['b', 'a', 'r']).with_output_types(str)
| 'ToStr' >> beam.FlatMap(int_to_str))

self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'ToStr': "
"requires {} but got {} for a".format(int, str))

def test_pardo_properly_type_checks_using_type_hint_decorators(self):
@with_input_types(a=str)
@with_output_types(typing.List[str])
Expand All @@ -1387,7 +1375,8 @@ def to_all_upper_case(a):
def test_pardo_does_not_type_check_using_type_hint_methods(self):
# The first ParDo outputs pcoll's of type int, however the second ParDo is
# expecting pcoll's of type str instead.
with self.assertRaises(typehints.TypeCheckError) as e:
with self.assertRaisesRegex(typehints.TypeCheckError,
r'Upper.*requires.*str.*applied.*int'):
(
self.p
| 'S' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
Expand All @@ -1398,11 +1387,6 @@ def test_pardo_does_not_type_check_using_type_hint_methods(self):
'Upper' >> beam.FlatMap(lambda x: [x.upper()]).with_input_types(
str).with_output_types(str)))

self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'Upper': "
"requires {} but got {} for x".format(str, int))

def test_pardo_properly_type_checks_using_type_hint_methods(self):
# Pipeline should be created successfully without an error
d = (
Expand All @@ -1419,18 +1403,14 @@ def test_pardo_properly_type_checks_using_type_hint_methods(self):
def test_map_does_not_type_check_using_type_hints_methods(self):
# The transform before 'Map' has indicated that it outputs PCollections with
# int's, while Map is expecting one of str.
with self.assertRaises(typehints.TypeCheckError) as e:
with self.assertRaisesRegex(typehints.TypeCheckError,
r'Upper.*requires.*str.*applied.*int'):
(
self.p
| 'S' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
| 'Upper' >> beam.Map(lambda x: x.upper()).with_input_types(
str).with_output_types(str))

self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'Upper': "
"requires {} but got {} for x".format(str, int))

def test_map_properly_type_checks_using_type_hints_methods(self):
# No error should be raised if this type-checks properly.
d = (
Expand All @@ -1449,17 +1429,13 @@ def upper(s):

# Hinted function above expects a str at pipeline construction.
# However, 'Map' should detect that Create has hinted an int instead.
with self.assertRaises(typehints.TypeCheckError) as e:
with self.assertRaisesRegex(typehints.TypeCheckError,
r'Upper.*requires.*str.*applied.*int'):
(
self.p
| 'S' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
| 'Upper' >> beam.Map(upper))

self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'Upper': "
"requires {} but got {} for s".format(str, int))

def test_map_properly_type_checks_using_type_hints_decorator(self):
@with_input_types(a=bool)
@with_output_types(int)
Expand All @@ -1477,7 +1453,8 @@ def bool_to_int(a):
def test_filter_does_not_type_check_using_type_hints_method(self):
# Filter is expecting an int but instead looks to the 'left' and sees a str
# incoming.
with self.assertRaises(typehints.TypeCheckError) as e:
with self.assertRaisesRegex(typehints.TypeCheckError,
r'Below 3.*requires.*int.*applied.*str'):
(
self.p
| 'Strs' >> beam.Create(['1', '2', '3', '4', '5'
Expand All @@ -1486,11 +1463,6 @@ def test_filter_does_not_type_check_using_type_hints_method(self):
str).with_output_types(str)
| 'Below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int))

self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'Below 3': "
"requires {} but got {} for x".format(int, str))

def test_filter_type_checks_using_type_hints_method(self):
# No error should be raised if this type-checks properly.
d = (
Expand All @@ -1508,17 +1480,13 @@ def more_than_half(a):
return a > 0.50

# Func above was hinted to only take a float, yet a str will be passed.
with self.assertRaises(typehints.TypeCheckError) as e:
with self.assertRaisesRegex(typehints.TypeCheckError,
r'Half.*requires.*float.*applied.*str'):
(
self.p
| 'Ints' >> beam.Create(['1', '2', '3', '4']).with_output_types(str)
| 'Half' >> beam.Filter(more_than_half))

self.assertStartswith(
e.exception.args[0],
"Type hint violation for 'Half': "
"requires {} but got {} for a".format(float, str))

def test_filter_type_checks_using_type_hints_decorator(self):
@with_input_types(b=int)
def half(b):
Expand Down Expand Up @@ -2128,14 +2096,10 @@ def test_mean_globally_pipeline_checking_violated(self):
self.p
| 'C' >> beam.Create(['test']).with_output_types(str)
| 'Mean' >> combine.Mean.Globally())

expected_msg = \
"Type hint violation for 'CombinePerKey': " \
"requires Tuple[TypeVariable[K], Union[<class 'float'>, <class 'int'>, " \
"<class 'numpy.float64'>, <class 'numpy.int64'>]] " \
"but got Tuple[None, <class 'str'>] for element"

self.assertStartswith(e.exception.args[0], expected_msg)
Comment on lines -2131 to -2138
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and one of the other messages got too hard/frustrating to update for use with a regex, so I just did some basic spot checking. I think there are pretty few cases that the old asserts would catch but the new ones wouldn't.

err_msg = e.exception.args[0]
assert "CombinePerKey" in err_msg
assert "Tuple[TypeVariable[K]" in err_msg
assert "Tuple[None, <class 'str'>" in err_msg

def test_mean_globally_runtime_checking_satisfied(self):
self.p._options.view_as(TypeOptions).runtime_type_check = True
Expand Down Expand Up @@ -2195,14 +2159,12 @@ def test_mean_per_key_pipeline_checking_violated(self):
typing.Tuple[str, str]))
| 'EvenMean' >> combine.Mean.PerKey())
self.p.run()

expected_msg = \
"Type hint violation for 'CombinePerKey(MeanCombineFn)': " \
"requires Tuple[TypeVariable[K], Union[<class 'float'>, <class 'int'>, " \
"<class 'numpy.float64'>, <class 'numpy.int64'>]] " \
"but got Tuple[<class 'str'>, <class 'str'>] for element"

self.assertStartswith(e.exception.args[0], expected_msg)
err_msg = e.exception.args[0]
assert "CombinePerKey(MeanCombineFn)" in err_msg
assert "requires" in err_msg
assert "Tuple[TypeVariable[K]" in err_msg
assert "applied" in err_msg
assert "Tuple[<class 'str'>, <class 'str'>]" in err_msg

def test_mean_per_key_runtime_checking_satisfied(self):
self.p._options.view_as(TypeOptions).runtime_type_check = True
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/typehints/decorators_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ def fn(a: int) -> int:
return a

with self.assertRaisesRegex(TypeCheckError,
r'requires .*int.* but got .*str'):
r'requires .*int.* but was applied .*str'):
_ = ['a', 'b', 'c'] | Map(fn)

# Same pipeline doesn't raise without annotations on fn.
Expand All @@ -423,7 +423,7 @@ def fn(a: int) -> int:
_ = [1, 2, 3] | Map(fn) # Doesn't raise - correct types.

with self.assertRaisesRegex(TypeCheckError,
r'requires .*int.* but got .*str'):
r'requires .*int.* but was applied .*str'):
_ = ['a', 'b', 'c'] | Map(fn)

@decorators.no_annotations
Expand Down
30 changes: 15 additions & 15 deletions sdks/python/apache_beam/typehints/typed_pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ def process(self, element):
self.assertEqual(['1', '2', '3'], sorted(result))

with self.assertRaisesRegex(typehints.TypeCheckError,
r'requires.*int.*got.*str'):
r'requires.*int.*applied.*str'):
['a', 'b', 'c'] | beam.ParDo(MyDoFn())

with self.assertRaisesRegex(typehints.TypeCheckError,
r'requires.*int.*got.*str'):
r'requires.*int.*applied.*str'):
[1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn()))

def test_typed_dofn_method(self):
Expand All @@ -104,11 +104,11 @@ def process(self, element: int) -> typehints.Tuple[str]:
self.assertEqual(['1', '2', '3'], sorted(result))

with self.assertRaisesRegex(typehints.TypeCheckError,
r'requires.*int.*got.*str'):
r'requires.*int.*applied.*str'):
_ = ['a', 'b', 'c'] | beam.ParDo(MyDoFn())

with self.assertRaisesRegex(typehints.TypeCheckError,
r'requires.*int.*got.*str'):
r'requires.*int.*applied.*str'):
_ = [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn()))

def test_typed_dofn_method_with_class_decorators(self):
Expand All @@ -124,12 +124,12 @@ def process(self, element: int) -> typehints.Tuple[str]:

with self.assertRaisesRegex(
typehints.TypeCheckError,
r'requires.*Tuple\[<class \'int\'>, <class \'int\'>\].*got.*str'):
r'requires.*Tuple\[<class \'int\'>, <class \'int\'>\].*applied.*str'):
_ = ['a', 'b', 'c'] | beam.ParDo(MyDoFn())

with self.assertRaisesRegex(
typehints.TypeCheckError,
r'requires.*Tuple\[<class \'int\'>, <class \'int\'>\].*got.*int'):
r'requires.*Tuple\[<class \'int\'>, <class \'int\'>\].*applied.*int'):
_ = [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn()))

def test_typed_callable_iterable_output(self):
Expand All @@ -156,11 +156,11 @@ def process(self, element: typehints.Tuple[int, int]) -> \
self.assertEqual(['1', '2', '3'], sorted(result))

with self.assertRaisesRegex(typehints.TypeCheckError,
r'requires.*int.*got.*str'):
r'requires.*int.*applied.*str'):
_ = ['a', 'b', 'c'] | beam.ParDo(my_do_fn)

with self.assertRaisesRegex(typehints.TypeCheckError,
r'requires.*int.*got.*str'):
r'requires.*int.*applied.*str'):
_ = [1, 2, 3] | (beam.ParDo(my_do_fn) | 'again' >> beam.ParDo(my_do_fn))

def test_typed_callable_instance(self):
Expand All @@ -177,11 +177,11 @@ def do_fn(element: typehints.Tuple[int, int]) -> typehints.Generator[str]:
self.assertEqual(['1', '2', '3'], sorted(result))

with self.assertRaisesRegex(typehints.TypeCheckError,
r'requires.*int.*got.*str'):
r'requires.*int.*applied.*str'):
_ = ['a', 'b', 'c'] | pardo

with self.assertRaisesRegex(typehints.TypeCheckError,
r'requires.*int.*got.*str'):
r'requires.*int.*applied.*str'):
_ = [1, 2, 3] | (pardo | 'again' >> pardo)

def test_filter_type_hint(self):
Expand Down Expand Up @@ -430,7 +430,7 @@ def fn(element: float):
return pcoll | beam.ParDo(fn)

with self.assertRaisesRegex(typehints.TypeCheckError,
r'ParDo.*requires.*float.*got.*str'):
r'ParDo.*requires.*float.*applied.*str'):
_ = ['1', '2', '3'] | MyMap()
with self.assertRaisesRegex(typehints.TypeCheckError,
r'MyMap.*expected.*str.*got.*bytes'):
Expand Down Expand Up @@ -632,14 +632,14 @@ def produces_unkown(e):
return e

@typehints.with_input_types(int)
def requires_int(e):
def accepts_int(e):
return e

class MyPTransform(beam.PTransform):
def expand(self, pcoll):
unknowns = pcoll | beam.Map(produces_unkown)
ints = pcoll | beam.Map(int)
return (unknowns, ints) | beam.Flatten() | beam.Map(requires_int)
return (unknowns, ints) | beam.Flatten() | beam.Map(accepts_int)

_ = [1, 2, 3] | MyPTransform()

Expand Down Expand Up @@ -761,8 +761,8 @@ def test_var_positional_only_side_input_hint(self):

with self.assertRaisesRegex(
typehints.TypeCheckError,
r'requires Tuple\[Union\[<class \'int\'>, <class \'str\'>\], ...\] but '
r'got Tuple\[Union\[<class \'float\'>, <class \'int\'>\], ...\]'):
r'requires.*Tuple\[Union\[<class \'int\'>, <class \'str\'>\], ...\].*'
r'applied.*Tuple\[Union\[<class \'float\'>, <class \'int\'>\], ...\]'):
_ = [1.2] | beam.Map(lambda *_: 'a', 5).with_input_types(int, str)

def test_var_keyword_side_input_hint(self):
Expand Down
Loading