From 89b1a7f2028937f70547f165c82e086e308f9e0e Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Tue, 6 Aug 2024 12:24:27 -0400 Subject: [PATCH] [yaml] Fix PubSub error message Signed-off-by: Jeffrey Kinard --- sdks/python/apache_beam/yaml/yaml_io.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py index 6fac3a4b9b5f..689b5d9eb086 100644 --- a/sdks/python/apache_beam/yaml/yaml_io.py +++ b/sdks/python/apache_beam/yaml/yaml_io.py @@ -214,9 +214,13 @@ def _create_parser( format, schema: Any) -> Tuple[schema_pb2.Schema, Callable[[bytes], beam.Row]]: - if format.islower(): - format = format.upper() - logging.warning('Lowercase formats will be deprecated in version 2.60') + format = format.upper() + + def _validate_schema(): + if not schema: + raise ValueError( + f'{format} format requires valid {format} schema to be passed to ' + f'schema parameter.') if format == 'RAW': if schema: @@ -225,9 +229,11 @@ def _create_parser( schema_pb2.Schema(fields=[schemas.schema_field('payload', bytes)]), lambda payload: beam.Row(payload=payload)) elif format == 'JSON': + _validate_schema() beam_schema = json_utils.json_schema_to_beam_schema(schema) return beam_schema, json_utils.json_parser(beam_schema, schema) elif format == 'AVRO': + _validate_schema() beam_schema = avroio.avro_schema_to_beam_schema(schema) covert_to_row = avroio.avro_dict_to_beam_row(schema, beam_schema) # pylint: disable=line-too-long