Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[yaml] Fix examples catalog tests #33027

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions sdks/python/apache_beam/yaml/examples/testing/examples_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@


def check_output(expected: List[str]):
def _check_inner(actual: PCollection[str]):
formatted_actual = actual | beam.Map(
def _check_inner(actual: List[PCollection[str]]):
formatted_actual = actual | beam.Flatten() | beam.Map(
lambda row: str(beam.Row(**row._asdict())))
assert_matches_stdout(formatted_actual, expected)

Expand Down Expand Up @@ -84,9 +84,12 @@ def test_yaml_example(self):
pickle_library='cloudpickle',
**yaml_transform.SafeLineLoader.strip_metadata(pipeline_spec.get(
'options', {})))) as p:
actual = yaml_transform.expand_pipeline(p, pipeline_spec)
if not actual:
actual = p.transforms_stack[0].parts[-1].outputs[None]
actual = [yaml_transform.expand_pipeline(p, pipeline_spec)]
if not actual[0]:
actual = list(p.transforms_stack[0].parts[-1].outputs.values())
for transform in p.transforms_stack[0].parts[:-1]:
if transform.transform.label == 'log_for_testing':
actual += list(transform.outputs.values())
check_output(expected)(actual)

return test_yaml_example
Expand Down Expand Up @@ -155,8 +158,12 @@ def _wordcount_test_preprocessor(
env.input_file('kinglear.txt', '\n'.join(lines)))


@YamlExamplesTestSuite.register_test_preprocessor(
['test_simple_filter_yaml', 'test_simple_filter_and_combine_yaml'])
@YamlExamplesTestSuite.register_test_preprocessor([
'test_simple_filter_yaml',
'test_simple_filter_and_combine_yaml',
'test_spanner_read_yaml',
'test_spanner_write_yaml'
])
def _file_io_write_test_preprocessor(
test_spec: dict, expected: List[str], env: TestEnvironment):

Expand All @@ -167,7 +174,8 @@ def _file_io_write_test_preprocessor(
transform['config'] = {
k: v
for k,
v in transform.get('config', {}).items() if k.startswith('__')
v in transform.get('config', {}).items()
if (k.startswith('__') or k == 'error_handling')
}

return test_spec
Expand Down Expand Up @@ -205,6 +213,10 @@ def _file_io_read_test_preprocessor(
'AggregationExamplesTest',
os.path.join(YAML_DOCS_DIR, '../transforms/aggregation/*.yaml')).run()

IOTest = YamlExamplesTestSuite(
'IOExamplesTest', os.path.join(YAML_DOCS_DIR,
'../transforms/io/*.yaml')).run()

if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,6 @@ pipeline:
input: WriteSpanner.my_error_output
config:
path: errors.json

# Expected:
# Row(shipment_id='S5', customer_id='C5', shipment_date='2023-05-09', shipment_cost=300.0, customer_name='Erin', customer_email='[email protected]')
67 changes: 67 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import functools
import inspect
from typing import NamedTuple

from apache_beam.typehints.row_type import RowTypeConstraint

import apache_beam as beam


class ErrorHandlingConfig(NamedTuple):
output: str
# TODO: Other parameters are valid here too, but not common to Java.


def exception_handling_args(error_handling_spec):
if error_handling_spec:
return {
'dead_letter_tag' if k == 'output' else k: v
for (k, v) in error_handling_spec.items()
}
else:
return None


def map_errors_to_standard_format(input_type):
# TODO(https://github.com/apache/beam/issues/24755): Switch to MapTuple.

return beam.Map(
lambda x: beam.Row(
element=x[0], msg=str(x[1][1]), stack=''.join(x[1][2]))
).with_output_types(
RowTypeConstraint.from_fields([("element", input_type), ("msg", str),
("stack", str)]))


def maybe_with_exception_handling(inner_expand):
def expand(self, pcoll):
wrapped_pcoll = beam.core._MaybePValueWithErrors(
pcoll, self._exception_handling_args)
return inner_expand(self, wrapped_pcoll).as_result(
map_errors_to_standard_format(pcoll.element_type))

return expand


def maybe_with_exception_handling_transform_fn(transform_fn):
@functools.wraps(transform_fn)
def expand(pcoll, error_handling=None, **kwargs):
wrapped_pcoll = beam.core._MaybePValueWithErrors(
pcoll, exception_handling_args(error_handling))
return transform_fn(wrapped_pcoll, **kwargs).as_result(
map_errors_to_standard_format(pcoll.element_type))

original_signature = inspect.signature(transform_fn)
new_parameters = list(original_signature.parameters.values())
error_handling_param = inspect.Parameter(
'error_handling',
inspect.Parameter.KEYWORD_ONLY,
default=None,
annotation=ErrorHandlingConfig)
if new_parameters[-1].kind == inspect.Parameter.VAR_KEYWORD:
new_parameters.insert(-1, error_handling_param)
else:
new_parameters.append(error_handling_param)
expand.__signature__ = original_signature.replace(parameters=new_parameters)

return expand
6 changes: 3 additions & 3 deletions sdks/python/apache_beam/yaml/yaml_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
from apache_beam.portability.api import schema_pb2
from apache_beam.typehints import schemas
from apache_beam.yaml import json_utils
from apache_beam.yaml import yaml_mapping
from apache_beam.yaml import yaml_errors
from apache_beam.yaml import yaml_provider


Expand Down Expand Up @@ -289,7 +289,7 @@ def formatter(row):


@beam.ptransform_fn
@yaml_mapping.maybe_with_exception_handling_transform_fn
@yaml_errors.maybe_with_exception_handling_transform_fn
def read_from_pubsub(
root,
*,
Expand Down Expand Up @@ -393,7 +393,7 @@ def mapper(msg):


@beam.ptransform_fn
@yaml_mapping.maybe_with_exception_handling_transform_fn
@yaml_errors.maybe_with_exception_handling_transform_fn
def write_to_pubsub(
pcoll,
*,
Expand Down
73 changes: 6 additions & 67 deletions sdks/python/apache_beam/yaml/yaml_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
#

"""This module defines the basic MapToFields operation."""
import functools
import inspect
import itertools
import re
from collections import abc
Expand All @@ -27,7 +25,6 @@
from typing import Dict
from typing import List
from typing import Mapping
from typing import NamedTuple
from typing import Optional
from typing import TypeVar
from typing import Union
Expand All @@ -41,13 +38,16 @@
from apache_beam.typehints import trivial_inference
from apache_beam.typehints import typehints
from apache_beam.typehints.native_type_compatibility import convert_to_beam_type
from apache_beam.typehints.row_type import RowTypeConstraint
from apache_beam.typehints.schemas import named_fields_from_element_type
from apache_beam.typehints.schemas import schema_from_element_type
from apache_beam.utils import python_callable
from apache_beam.yaml import json_utils
from apache_beam.yaml import options
from apache_beam.yaml import yaml_provider
from apache_beam.yaml.yaml_errors import map_errors_to_standard_format
from apache_beam.yaml.yaml_errors import exception_handling_args
from apache_beam.yaml.yaml_errors import maybe_with_exception_handling
from apache_beam.yaml.yaml_errors import maybe_with_exception_handling_transform_fn
from apache_beam.yaml.yaml_provider import dicts_to_rows

# Import js2py package if it exists
Expand Down Expand Up @@ -417,66 +417,6 @@ def checking_func(row):
return func


class ErrorHandlingConfig(NamedTuple):
output: str
# TODO: Other parameters are valid here too, but not common to Java.


def exception_handling_args(error_handling_spec):
if error_handling_spec:
return {
'dead_letter_tag' if k == 'output' else k: v
for (k, v) in error_handling_spec.items()
}
else:
return None


def _map_errors_to_standard_format(input_type):
# TODO(https://github.com/apache/beam/issues/24755): Switch to MapTuple.

return beam.Map(
lambda x: beam.Row(
element=x[0], msg=str(x[1][1]), stack=''.join(x[1][2]))
).with_output_types(
RowTypeConstraint.from_fields([("element", input_type), ("msg", str),
("stack", str)]))


def maybe_with_exception_handling(inner_expand):
def expand(self, pcoll):
wrapped_pcoll = beam.core._MaybePValueWithErrors(
pcoll, self._exception_handling_args)
return inner_expand(self, wrapped_pcoll).as_result(
_map_errors_to_standard_format(pcoll.element_type))

return expand


def maybe_with_exception_handling_transform_fn(transform_fn):
@functools.wraps(transform_fn)
def expand(pcoll, error_handling=None, **kwargs):
wrapped_pcoll = beam.core._MaybePValueWithErrors(
pcoll, exception_handling_args(error_handling))
return transform_fn(wrapped_pcoll, **kwargs).as_result(
_map_errors_to_standard_format(pcoll.element_type))

original_signature = inspect.signature(transform_fn)
new_parameters = list(original_signature.parameters.values())
error_handling_param = inspect.Parameter(
'error_handling',
inspect.Parameter.KEYWORD_ONLY,
default=None,
annotation=ErrorHandlingConfig)
if new_parameters[-1].kind == inspect.Parameter.VAR_KEYWORD:
new_parameters.insert(-1, error_handling_param)
else:
new_parameters.append(error_handling_param)
expand.__signature__ = original_signature.replace(parameters=new_parameters)

return expand


class _Validate(beam.PTransform):
"""Validates each element of a PCollection against a json schema.

Expand Down Expand Up @@ -780,9 +720,8 @@ def split(element):
splits = pcoll | mapping_transform.with_input_types(T).with_output_types(T)
result = {out: getattr(splits, out) for out in output_set}
if error_output:
result[
error_output] = result[error_output] | _map_errors_to_standard_format(
pcoll.element_type)
result[error_output] = result[error_output] | map_errors_to_standard_format(
pcoll.element_type)
return result


Expand Down
7 changes: 5 additions & 2 deletions sdks/python/apache_beam/yaml/yaml_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
from apache_beam.utils import subprocess_server
from apache_beam.version import __version__ as beam_version
from apache_beam.yaml import json_utils
from apache_beam.yaml.yaml_errors import maybe_with_exception_handling_transform_fn


class Provider:
Expand Down Expand Up @@ -876,8 +877,10 @@ def _parse_window_spec(spec):
return beam.WindowInto(window_fn)

@staticmethod
@beam.ptransform_fn
@maybe_with_exception_handling_transform_fn
def log_for_testing(
level: Optional[str] = 'INFO', prefix: Optional[str] = ''):
pcoll, *, level: Optional[str] = 'INFO', prefix: Optional[str] = ''):
"""Logs each element of its input PCollection.

The output of this transform is a copy of its input for ease of use in
Expand Down Expand Up @@ -918,7 +921,7 @@ def log_and_return(x):
logger(prefix + json.dumps(to_loggable_json_recursive(x)))
return x

return "LogForTesting" >> beam.Map(log_and_return)
return pcoll | "LogForTesting" >> beam.Map(log_and_return)

@staticmethod
def create_builtin_provider():
Expand Down
6 changes: 4 additions & 2 deletions sdks/python/apache_beam/yaml/yaml_transform_scope_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,12 @@ def test_get_pcollection_output(self):
str(scope.get_pcollection("Create")))

self.assertEqual(
"PCollection[Square.None]", str(scope.get_pcollection("Square")))
"PCollection[Square/LogForTesting.None]",
str(scope.get_pcollection("Square")))

self.assertEqual(
"PCollection[Square.None]", str(scope.get_pcollection("LogForTesting")))
"PCollection[Square/LogForTesting.None]",
str(scope.get_pcollection("LogForTesting")))

self.assertTrue(
scope.get_pcollection("Square") == scope.get_pcollection(
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def test_expand_composite_transform_with_name_input(self):
inputs={'elements': elements})
self.assertRegex(
str(expand_composite_transform(spec, scope)['output']),
r"PCollection.*Composite/LogForTesting.*")
r"PCollection.*Composite/log_for_testing/LogForTesting.*")

def test_expand_composite_transform_root(self):
with new_pipeline() as p:
Expand Down
Loading