Skip to content

Commit

Permalink
[yaml] Fix PubSub error message
Browse files Browse the repository at this point in the history
Signed-off-by: Jeffrey Kinard <[email protected]>
  • Loading branch information
Polber committed Aug 8, 2024
1 parent c606235 commit 89b1a7f
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions sdks/python/apache_beam/yaml/yaml_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,13 @@ def _create_parser(
format,
schema: Any) -> Tuple[schema_pb2.Schema, Callable[[bytes], beam.Row]]:

if format.islower():
format = format.upper()
logging.warning('Lowercase formats will be deprecated in version 2.60')
format = format.upper()

def _validate_schema():
if not schema:
raise ValueError(
f'{format} format requires valid {format} schema to be passed to '
f'schema parameter.')

if format == 'RAW':
if schema:
Expand All @@ -225,9 +229,11 @@ def _create_parser(
schema_pb2.Schema(fields=[schemas.schema_field('payload', bytes)]),
lambda payload: beam.Row(payload=payload))
elif format == 'JSON':
_validate_schema()
beam_schema = json_utils.json_schema_to_beam_schema(schema)
return beam_schema, json_utils.json_parser(beam_schema, schema)
elif format == 'AVRO':
_validate_schema()
beam_schema = avroio.avro_schema_to_beam_schema(schema)
covert_to_row = avroio.avro_dict_to_beam_row(schema, beam_schema)
# pylint: disable=line-too-long
Expand Down

0 comments on commit 89b1a7f

Please sign in to comment.