From f0322f8ac9d33a80a84788343adbc98e378272da Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 8 Sep 2023 11:12:18 -0700 Subject: [PATCH] Add Java variants of WriteTo{Csv,Json}. --- .../schemaio-expansion-service/build.gradle | 3 + .../providers/CsvWriteTransformProvider.java | 143 +++++++++ .../sdk/io/csv/providers/package-info.java | 20 ++ sdks/java/io/json/build.gradle | 35 +++ .../org/apache/beam/sdk/io/json/JsonIO.java | 283 ++++++++++++++++++ .../apache/beam/sdk/io/json/package-info.java | 20 ++ .../providers/JsonWriteTransformProvider.java | 142 +++++++++ .../sdk/io/json/providers/package-info.java | 20 ++ .../beam/sdk/io/json/JsonIOWriteTest.java | 145 +++++++++ sdks/python/apache_beam/yaml/standard_io.yaml | 7 + settings.gradle.kts | 1 + 11 files changed, 819 insertions(+) create mode 100644 sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java create mode 100644 sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/package-info.java create mode 100644 sdks/java/io/json/build.gradle create mode 100644 sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/JsonIO.java create mode 100644 sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/package-info.java create mode 100644 sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java create mode 100644 sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/package-info.java create mode 100644 sdks/java/io/json/src/test/java/org/apache/beam/sdk/io/json/JsonIOWriteTest.java diff --git a/sdks/java/extensions/schemaio-expansion-service/build.gradle b/sdks/java/extensions/schemaio-expansion-service/build.gradle index d23330d73c22..9b809e71be0f 100644 --- a/sdks/java/extensions/schemaio-expansion-service/build.gradle +++ b/sdks/java/extensions/schemaio-expansion-service/build.gradle @@ -32,7 +32,10 @@ applyJavaNature( dependencies { implementation project(path: 
":sdks:java:expansion-service") permitUnusedDeclared project(path: ":sdks:java:expansion-service") // BEAM-11761 + implementation project(":sdks:java:extensions:google-cloud-platform-core") + implementation project(":sdks:java:io:csv") implementation project(":sdks:java:io:jdbc") + implementation project(":sdks:java:io:json") permitUnusedDeclared project(":sdks:java:io:jdbc") // BEAM-11761 implementation library.java.postgres permitUnusedDeclared library.java.postgres // BEAM-11761 diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java new file mode 100644 index 000000000000..ca7436b89531 --- /dev/null +++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.csv.providers; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.io.WriteFilesResult; +import org.apache.beam.sdk.io.csv.CsvIO; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.apache.commons.csv.CSVFormat; + +/** + * An implementation of {@link TypedSchemaTransformProvider} for {@link CsvIO#writeRows}. + * + *

Internal only: This class is actively being worked on, and it will likely change. We + * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam + * repository. + */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +@AutoService(SchemaTransformProvider.class) +public class CsvWriteTransformProvider + extends TypedSchemaTransformProvider<CsvWriteTransformProvider.CsvWriteConfiguration> { + private static final String INPUT_ROWS_TAG = "input"; + private static final String WRITE_RESULTS = "output"; + + @Override + protected Class<CsvWriteConfiguration> configurationClass() { + return CsvWriteConfiguration.class; + } + + @Override + protected SchemaTransform from(CsvWriteConfiguration configuration) { + return new CsvWriteTransform(configuration); + } + + @Override + public String identifier() { + return "beam:schematransform:org.apache.beam:csv_write:v1"; + } + + @Override + public List<String> inputCollectionNames() { + return Collections.singletonList(INPUT_ROWS_TAG); + } + + @Override + public List<String> outputCollectionNames() { + return Collections.singletonList(WRITE_RESULTS); + } + + /** Configuration for writing CSV files with {@link CsvIO}; only the output path is required. */ + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class CsvWriteConfiguration { + + public void validate() { + checkArgument( + !Strings.isNullOrEmpty(this.getPath()), "Path for a CSV Write must be specified."); + } + + public static Builder builder() { + return new AutoValue_CsvWriteTransformProvider_CsvWriteConfiguration.Builder(); + } + + @SchemaFieldDescription("The file path to write to.") + public abstract String getPath(); + + /** Builder for {@link CsvWriteConfiguration}. */ + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder setPath(String path); + + /** Builds a {@link CsvWriteConfiguration} instance. 
*/ + public abstract CsvWriteConfiguration build(); + } + } + + /** A {@link SchemaTransform} wrapping {@link CsvIO#writeRows}; outputs written filenames. */ + protected static class CsvWriteTransform extends SchemaTransform { + + private final CsvWriteConfiguration configuration; + + CsvWriteTransform(CsvWriteConfiguration configuration) { + configuration.validate(); + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + WriteFilesResult<?> result = + input.get(INPUT_ROWS_TAG).apply(CsvIO.writeRows(configuration.getPath(), CSVFormat.DEFAULT).withSuffix("")); + Schema outputSchema = Schema.of(Field.of("filename", FieldType.STRING)); + return PCollectionRowTuple.of( + WRITE_RESULTS, + result + .getPerDestinationOutputFilenames() + .apply( + "Collect filenames", + MapElements.into(TypeDescriptors.rows()) + .via( + (destinationAndRow) -> + Row.withSchema(outputSchema) + .withFieldValue("filename", destinationAndRow.getValue()) + .build())) + .setRowSchema(outputSchema)); + } + } +} diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/package-info.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/package-info.java new file mode 100644 index 000000000000..646e69b7cb8c --- /dev/null +++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Transforms for reading and writing CSV files. */ +package org.apache.beam.sdk.io.csv.providers; diff --git a/sdks/java/io/json/build.gradle b/sdks/java/io/json/build.gradle new file mode 100644 index 000000000000..fe1f607a3696 --- /dev/null +++ b/sdks/java/io/json/build.gradle @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { id 'org.apache.beam.module' } +applyJavaNature( + automaticModuleName: 'org.apache.beam.sdk.io.json' +) + +description = "Apache Beam :: SDKs :: Java :: IO :: JSON" +ext.summary = "IO to read and write JSON files." 
+ +dependencies { + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation library.java.vendored_guava_32_1_2_jre + implementation library.java.everit_json_schema + testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") + testImplementation library.java.junit + testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") + testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration") +} \ No newline at end of file diff --git a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/JsonIO.java b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/JsonIO.java new file mode 100644 index 000000000000..3abb29a80427 --- /dev/null +++ b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/JsonIO.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.json; + +import static org.apache.beam.sdk.values.TypeDescriptors.rows; +import static org.apache.beam.sdk.values.TypeDescriptors.strings; + +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileBasedSink; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.ShardNameTemplate; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.WriteFiles; +import org.apache.beam.sdk.io.WriteFilesResult; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.utils.JsonUtils; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; + +/** + * {@link PTransform}s for reading and writing JSON files. + * + *

Reading JSON files

+ * + *

Reading from JSON files is not yet implemented in Java. Please see https://github.com/apache/beam/issues/24552. + * + *

Writing JSON files

+ * + *

To write a {@link PCollection} to one or more line-delimited JSON files, use {@link + * JsonIO.Write}, using {@link JsonIO#writeRows} or {@link JsonIO#write}. {@link JsonIO.Write} + * supports writing {@link Row} or custom Java types using an inferred {@link Schema}. Examples + * below show both scenarios. See the Beam Programming Guide on <a href="https://beam.apache.org/documentation/programming-guide/#inferring-schemas">inferring + * schemas</a> for more information on how to enable Beam to infer a {@link Schema} from a custom + * Java type. + * + *

Example usage:

+ * + *

Suppose we have the following Transaction class annotated with + * {@code @DefaultSchema(JavaBeanSchema.class)} so that Beam can infer its {@link Schema}: + * + *

{@code @DefaultSchema(JavaBeanSchema.class)
+ * public class Transaction {
+ *   public Transaction() { … }
+ *   public Long getTransactionId() { … }
+ *   public void setTransactionId(Long transactionId) { … }
+ *   public String getBank() { … }
+ *   public void setBank(String bank) { … }
+ *   public double getPurchaseAmount() { … }
+ *   public void setPurchaseAmount(double purchaseAmount) { … }
+ * }
+ * }
+ * + *

From a {@code PCollection<Transaction>}, {@link JsonIO.Write} can write one or many JSON + * files. + * + *

{@code
+ * PCollection<Transaction> transactions = ...
+ * transactions.apply(JsonIO.<Transaction>write("path/to/folder/prefix"));
+ * }
+ * + *

The resulting JSON files will look like the following, with one JSON object written per + * line. By default, {@link JsonIO.Write} will write all fields in sorted order of + * the field names. + * + *

{@code
+ * {"bank": "A", "purchaseAmount": 10.23, "transactionId": 12345}
+ * {"bank": "B", "purchaseAmount": 54.65, "transactionId": 54321}
+ * {"bank": "C", "purchaseAmount": 11.76, "transactionId": 98765}
+ * }
+ * + *

A {@link PCollection} of {@link Row}s works just like custom Java types illustrated above, + * except we use {@link JsonIO#writeRows} as shown below for the same {@code Transaction} class. We + * derive {@code Transaction}'s {@link Schema} using a {@link + * org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider}. Note that + * hard-coding the {@link Row}s below is for illustration purposes. Developers are instead + * encouraged to take advantage of {@link + * org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider#toRowFunction}. + * + *

{@code
+ * DefaultSchemaProvider defaultSchemaProvider = new DefaultSchemaProvider();
+ * Schema schema = defaultSchemaProvider.schemaFor(TypeDescriptor.of(Transaction.class));
+ * PCollection<Row> transactions = pipeline.apply(Create.of(
+ *  Row
+ *    .withSchema(schema)
+ *    .withFieldValue("bank", "A")
+ *    .withFieldValue("purchaseAmount", 10.23)
+ *    .withFieldValue("transactionId", 12345L)
+ *    .build(),
+ *  Row
+ *    .withSchema(schema)
+ *    .withFieldValue("bank", "B")
+ *    .withFieldValue("purchaseAmount", 54.65)
+ *    .withFieldValue("transactionId", 54321L)
+ *    .build(),
+ *  Row
+ *    .withSchema(schema)
+ *    .withFieldValue("bank", "C")
+ *    .withFieldValue("purchaseAmount", 11.76)
+ *    .withFieldValue("transactionId", 98765L)
+ *    .build()
+ * ));
+ *
+ * transactions.apply(
+ *  JsonIO
+ *    .writeRows("gs://bucket/path/to/folder/prefix")
+ * );
+ * }
+ * + *

Writing the transactions {@link PCollection} of {@link Row}s would yield the following JSON + * file content. + * + *

{@code
+ * {"bank": "A", "purchaseAmount": 10.23, "transactionId": 12345}
+ * {"bank": "B", "purchaseAmount": 54.65, "transactionId": 54321}
+ * {"bank": "C", "purchaseAmount": 11.76, "transactionId": 98765}
+ * }
+ */ +public class JsonIO { + static final String DEFAULT_FILENAME_SUFFIX = ".json"; + + /** Instantiates a {@link Write} for writing user types with a schema as line-delimited JSON. */ + public static <T> Write<T> write(String to) { + return new AutoValue_JsonIO_Write.Builder<T>() + .setTextIOWrite(createDefaultTextIOWrite(to)) + .build(); + } + + /** Instantiates a {@link Write} for writing {@link Row}s as line-delimited JSON. */ + public static Write<Row> writeRows(String to) { + return new AutoValue_JsonIO_Write.Builder<Row>() + .setTextIOWrite(createDefaultTextIOWrite(to)) + .build(); + } + + /** {@link PTransform} for writing JSON files. */ + @AutoValue + public abstract static class Write<T> + extends PTransform<PCollection<T>, WriteFilesResult<String>> { + + /** Specifies the {@link Compression} of all generated shard files. */ + public Write<T> withCompression(Compression compression) { + return toBuilder().setTextIOWrite(getTextIOWrite().withCompression(compression)).build(); + } + + /** Whether to skip the spilling of data. See {@link WriteFiles#withNoSpilling}. */ + public Write<T> withNoSpilling() { + return toBuilder().setTextIOWrite(getTextIOWrite().withNoSpilling()).build(); + } + + /** + * Specifies to use a given fixed number of shards per window. See {@link + * TextIO.Write#withNumShards}. + */ + public Write<T> withNumShards(Integer numShards) { + return toBuilder().setTextIOWrite(getTextIOWrite().withNumShards(numShards)).build(); + } + + /** + * Forces a single file as output and empty shard name template. See {@link + * TextIO.Write#withoutSharding}. + */ + public Write<T> withoutSharding() { + return toBuilder().setTextIOWrite(getTextIOWrite().withoutSharding()).build(); + } + + /** + * Uses the given {@link ShardNameTemplate} for naming output files. See {@link + * TextIO.Write#withShardNameTemplate}. 
+ */ + public Write<T> withShardTemplate(String shardTemplate) { + return toBuilder() + .setTextIOWrite(getTextIOWrite().withShardNameTemplate(shardTemplate)) + .build(); + } + + /** Configures the filename suffix for written files. See {@link TextIO.Write#withSuffix}. */ + public Write<T> withSuffix(String suffix) { + return toBuilder().setTextIOWrite(getTextIOWrite().withSuffix(suffix)).build(); + } + + /** + * Set the base directory used to generate temporary files. See {@link + * TextIO.Write#withTempDirectory}. + */ + public Write<T> withTempDirectory(ResourceId tempDirectory) { + return toBuilder().setTextIOWrite(getTextIOWrite().withTempDirectory(tempDirectory)).build(); + } + + /** + * Preserves windowing of input elements and writes them to files based on the element's window. + * See {@link TextIO.Write#withWindowedWrites}. + */ + public Write<T> withWindowedWrites() { + return toBuilder().setTextIOWrite(getTextIOWrite().withWindowedWrites()).build(); + } + + /** + * Returns a transform for writing to text files like this one but that has the given {@link + * FileBasedSink.WritableByteChannelFactory} to be used by the {@link FileBasedSink} during + * output. See {@link TextIO.Write#withWritableByteChannelFactory}. + */ + public Write<T> withWritableByteChannelFactory( + FileBasedSink.WritableByteChannelFactory writableByteChannelFactory) { + return toBuilder() + .setTextIOWrite( + getTextIOWrite().withWritableByteChannelFactory(writableByteChannelFactory)) + .build(); + } + + /** The underlying {@link TextIO.Write} that writes converted input to JSON formatted output. */ + abstract TextIO.Write getTextIOWrite(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + + /** + * Sets the underlying {@link TextIO.Write}; JSON lines go out as text, not {@link FileIO.Write}. 
+ */ + abstract Builder<T> setTextIOWrite(TextIO.Write value); + + abstract Write<T> autoBuild(); + + final Write<T> build() { + return autoBuild(); + } + } + + @Override + public WriteFilesResult<String> expand(PCollection<T> input) { + if (!input.hasSchema()) { + throw new IllegalArgumentException( + String.format( + "%s requires an input Schema. Note that only Row or user classes are supported. Consider using TextIO or FileIO directly when writing primitive types", + Write.class.getName())); + } + + Schema schema = input.getSchema(); + + RowCoder rowCoder = RowCoder.of(schema); + + PCollection<Row> rows = + input + .apply("To Rows", MapElements.into(rows()).via(input.getToRowFunction())) + .setCoder(rowCoder); + + SerializableFunction<Row, String> toJsonFn = + JsonUtils.getRowToJsonStringsFunction(schema); + + PCollection<String> json = rows.apply("To JSON", MapElements.into(strings()).via(toJsonFn)); + + return json.apply("Write JSON", getTextIOWrite().withOutputFilenames()); + } + } + + private static TextIO.Write createDefaultTextIOWrite(String to) { + return TextIO.write().to(to).withSuffix(DEFAULT_FILENAME_SUFFIX); + } +} diff --git a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/package-info.java b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/package-info.java new file mode 100644 index 000000000000..1ee191835713 --- /dev/null +++ b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Transforms for reading and writing JSON files. */ +package org.apache.beam.sdk.io.json; diff --git a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java new file mode 100644 index 000000000000..0a18a2c042e6 --- /dev/null +++ b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.json.providers; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.io.WriteFilesResult; +import org.apache.beam.sdk.io.json.JsonIO; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; + +/** + * An implementation of {@link TypedSchemaTransformProvider} for {@link JsonIO#write}. + * + *

Internal only: This class is actively being worked on, and it will likely change. We + * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam + * repository. + */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +@AutoService(SchemaTransformProvider.class) +public class JsonWriteTransformProvider + extends TypedSchemaTransformProvider<JsonWriteTransformProvider.JsonWriteConfiguration> { + private static final String INPUT_ROWS_TAG = "input"; + private static final String WRITE_RESULTS = "output"; + + @Override + protected Class<JsonWriteConfiguration> configurationClass() { + return JsonWriteConfiguration.class; + } + + @Override + protected SchemaTransform from(JsonWriteConfiguration configuration) { + return new JsonWriteTransform(configuration); + } + + @Override + public String identifier() { + return "beam:schematransform:org.apache.beam:json_write:v1"; + } + + @Override + public List<String> inputCollectionNames() { + return Collections.singletonList(INPUT_ROWS_TAG); + } + + @Override + public List<String> outputCollectionNames() { + return Collections.singletonList(WRITE_RESULTS); + } + + /** Configuration for writing JSON files with {@link JsonIO}; only the output path is required. */ + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class JsonWriteConfiguration { + + public void validate() { + checkArgument( + !Strings.isNullOrEmpty(this.getPath()), "Path for a JSON Write must be specified."); + } + + public static Builder builder() { + return new AutoValue_JsonWriteTransformProvider_JsonWriteConfiguration.Builder(); + } + + @SchemaFieldDescription("The file path to write to.") + public abstract String getPath(); + + /** Builder for {@link JsonWriteConfiguration}. */ + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder setPath(String path); + + /** Builds a {@link JsonWriteConfiguration} instance. 
*/ + public abstract JsonWriteConfiguration build(); + } + } + + /** A {@link SchemaTransform} wrapping {@link JsonIO#writeRows}; outputs written filenames. */ + protected static class JsonWriteTransform extends SchemaTransform { + + private final JsonWriteConfiguration configuration; + + JsonWriteTransform(JsonWriteConfiguration configuration) { + configuration.validate(); + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + WriteFilesResult<?> result = + input.get(INPUT_ROWS_TAG).apply(JsonIO.writeRows(configuration.getPath()).withSuffix("")); + Schema outputSchema = Schema.of(Field.of("filename", FieldType.STRING)); + return PCollectionRowTuple.of( + WRITE_RESULTS, + result + .getPerDestinationOutputFilenames() + .apply( + "Collect filenames", + MapElements.into(TypeDescriptors.rows()) + .via( + (destinationAndRow) -> + Row.withSchema(outputSchema) + .withFieldValue("filename", destinationAndRow.getValue()) + .build())) + .setRowSchema(outputSchema)); + } + } +} diff --git a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/package-info.java b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/package-info.java new file mode 100644 index 000000000000..312454f8733b --- /dev/null +++ b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Transforms for reading and writing JSON files. */ +package org.apache.beam.sdk.io.json.providers; diff --git a/sdks/java/io/json/src/test/java/org/apache/beam/sdk/io/json/JsonIOWriteTest.java b/sdks/java/io/json/src/test/java/org/apache/beam/sdk/io/json/JsonIOWriteTest.java new file mode 100644 index 000000000000..71fdcd6b3d94 --- /dev/null +++ b/sdks/java/io/json/src/test/java/org/apache/beam/sdk/io/json/JsonIOWriteTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.json; + +import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.allPrimitiveDataTypes; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import java.io.File; +import java.io.IOException; +import java.math.BigDecimal; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.AllPrimitiveDataTypes; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.SerializableMatcher; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link JsonIO.Write}: writes beans with one pipeline, reads them back with another. */ +@RunWith(JUnit4.class) +public class JsonIOWriteTest { + @Rule public TestPipeline writePipeline = TestPipeline.create(); + + @Rule public TestPipeline readPipeline = TestPipeline.create(); + + @Rule + public TestPipeline errorPipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false); + + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void writesUserDefinedTypes() { + File folder = + createFolder(AllPrimitiveDataTypes.class.getSimpleName(), "writesUserDefinedTypes"); + + PCollection<AllPrimitiveDataTypes> input = + writePipeline.apply( + Create.of( + allPrimitiveDataTypes(false, BigDecimal.TEN, 1.0, 1.0f, 1, 1L, "a"), + allPrimitiveDataTypes( + false, BigDecimal.TEN.add(BigDecimal.TEN), 2.0, 2.0f, 2, 2L, "b"), + allPrimitiveDataTypes( + false, + BigDecimal.TEN.add(BigDecimal.TEN).add(BigDecimal.TEN), + 3.0, + 3.0f, + 3, + 3L, + "c"))); + + input.apply(JsonIO.<AllPrimitiveDataTypes>write(toFilenamePrefix(folder)).withNumShards(1)); + + writePipeline.run().waitUntilFinish(); + +
PAssert.that(readPipeline.apply(TextIO.read().from(toFilenamePrefix(folder) + "*"))) + .containsInAnyOrder( + containsAll( + "\"aDouble\":1.0", + "\"aFloat\":1.0", + "\"aLong\":1", + "\"aString\":\"a\"", + "\"anInteger\":1", + "\"aDecimal\":10", + "\"aBoolean\":false"), + containsAll( + "\"aDouble\":2.0", + "\"aFloat\":2.0", + "\"aLong\":2", + "\"aString\":\"b\"", + "\"anInteger\":2", + "\"aDecimal\":20", + "\"aBoolean\":false"), + containsAll( + "\"aDouble\":3.0", + "\"aFloat\":3.0", + "\"aLong\":3", + "\"aString\":\"c\"", + "\"anInteger\":3", + "\"aDecimal\":30", + "\"aBoolean\":false")); + + readPipeline.run(); + } + + private static SerializableMatcher<String> containsAll(String... needles) { + class Matcher extends BaseMatcher<String> implements SerializableMatcher<String> { + @Override + public boolean matches(Object item) { + if (!(item instanceof String)) { + return false; + } + + String haystack = (String) item; + for (String needle : needles) { + if (!haystack.contains(needle)) { + return false; + } + } + return true; + } + + @Override + public void describeTo(Description description) { + description.appendText("Contains all of: "); + description.appendValueList("[", ",", "]", needles); + } + } + return new Matcher(); + } + + private static String toFilenamePrefix(File folder) { + checkArgument(folder.isDirectory()); + return folder.getAbsolutePath() + "/out"; + } + + private File createFolder(String... 
paths) { + try { + return tempFolder.newFolder(paths); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } +} diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index e60f0026fd25..ada2956a74a0 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -80,3 +80,10 @@ 'WriteToCsv': 'apache_beam.io.WriteToCsv' 'ReadFromJson': 'apache_beam.io.ReadFromJson' 'WriteToJson': 'apache_beam.io.WriteToJson' + +- type: beamJar + transforms: + 'WriteToCsv': 'beam:schematransform:org.apache.beam:csv_write:v1' + 'WriteToJson': 'beam:schematransform:org.apache.beam:json_write:v1' + config: + gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar' diff --git a/settings.gradle.kts b/settings.gradle.kts index 45b8c25101b5..6a14aa094a17 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -244,6 +244,7 @@ include(":sdks:java:io:hbase") include(":sdks:java:io:hcatalog") include(":sdks:java:io:jdbc") include(":sdks:java:io:jms") +include(":sdks:java:io:json") include(":sdks:java:io:kafka") include(":sdks:java:io:kinesis") include(":sdks:java:io:kinesis:expansion-service")