diff --git a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProvider.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProvider.java index b3550e7dbb15..f0070f13919b 100644 --- a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProvider.java +++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProvider.java @@ -118,14 +118,16 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { PCollection files = rowInput.apply("Write Rows", transform); PCollection output = - files.apply( - "Filenames to Rows", - MapElements.into(rows()) - .via( - (String name) -> - Row.withSchema(OUTPUT_SCHEMA) - .withFieldValue(FILE_NAME_FIELD.getName(), name) - .build())); + files + .apply( + "Filenames to Rows", + MapElements.into(rows()) + .via( + (String name) -> + Row.withSchema(OUTPUT_SCHEMA) + .withFieldValue(FILE_NAME_FIELD.getName(), name) + .build())) + .setRowSchema(OUTPUT_SCHEMA); return PCollectionRowTuple.of(OUTPUT_TAG, output); }