Skip to content

Commit

Permalink
[CsvIO] Create CsvIOParse Class (#32028)
Browse files Browse the repository at this point in the history
* [CsvIO] Create CsvIOParse Class

Co-authored-by: Francis O'Hara <[email protected]>

* rough draft

* [CsvIO] Create CsvIOParse Class

Co-authored-by: Francis O'Hara <[email protected]>

* Deleted changes made to CsvIOStringToRecord Class

Co-authored-by: Francis O'Hara <[email protected]>

* [CsvIO] update tests for CsvIO for more coverage

Co-authored-by: Francis O'Hara <[email protected]>

* added more tests for CsvIOParse

Co-authored-by: Francis O'Hara <[email protected]>

* Added documentation for CsvIOParse

Co-authored-by: Francis O'Hara <[email protected]>

---------

Co-authored-by: Francis O'Hara <[email protected]>
  • Loading branch information
lahariguduru and francisohara24 authored Aug 1, 2024
1 parent 21009e6 commit 6aac47c
Show file tree
Hide file tree
Showing 6 changed files with 756 additions and 1 deletion.
162 changes: 162 additions & 0 deletions sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.csv;

import static java.util.Objects.requireNonNull;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.sdk.values.TypeDescriptors.rows;
import static org.apache.beam.sdk.values.TypeDescriptors.strings;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
Expand All @@ -35,15 +36,21 @@
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaProvider;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.commons.csv.CSVFormat;

Expand Down Expand Up @@ -340,6 +347,161 @@ public static Write<Row> writeRows(String to, CSVFormat csvFormat) {
.build();
}

/**
* Instantiates a {@link CsvIOParse} for parsing CSV string records into custom {@link
* Schema}-mapped {@code Class<T>}es from the records' assumed <a
* href="https://www.javadoc.io/doc/org.apache.commons/commons-csv/1.8/org/apache/commons/csv/CSVFormat.html">CsvFormat</a>.
* See the <a
* href="https://beam.apache.org/documentation/programming-guide/#inferring-schemas">Beam
* Programming Guide</a> on how to configure your custom {@code Class<T>} for Beam to infer its
* {@link Schema} using a {@link SchemaProvider} annotation such as {@link AutoValueSchema} or
* {@link JavaBeanSchema}.
*
* <h2>Example usage</h2>
*
* The example below illustrates parsing <a
* href="https://www.javadoc.io/doc/org.apache.commons/commons-csv/1.8/org/apache/commons/csv/CSVFormat.html#DEFAULT">CsvFormat#DEFAULT</a>
* formatted CSV string records, read from {@link TextIO.Read}, into an {@link AutoValueSchema}
* annotated <a
* href="https://github.com/google/auto/blob/main/value/userguide/index.md">AutoValue</a> data
* class {@link PCollection}.
*
* <pre>{@code
* // SomeDataClass is a data class configured for Beam to automatically infer its Schema.
* @DefaultSchema(AutoValueSchema.class)
* @AutoValue
* abstract class SomeDataClass {
*
* abstract String getSomeString();
* abstract Integer getSomeInteger();
*
* @AutoValue.Builder
* abstract static class Builder {
* abstract Builder setSomeString(String value);
* abstract Builder setSomeInteger(Integer value);
*
* abstract SomeDataClass build();
* }
* }
*
* // Pipeline example reads CSV string records from Google Cloud storage and writes to BigQuery.
* Pipeline pipeline = Pipeline.create();
*
* // Read CSV records from Google Cloud storage using TextIO.
* PCollection<String> csvRecords = pipeline
* .apply(TextIO.read().from("gs://bucket/folder/*.csv"));
*
* // Apply the CSV records PCollection<String> to the CsvIOParse transform instantiated using CsvIO.parse.
* CsvIOParseResult<SomeDataClass> result = csvRecords.apply(CsvIO.parse(
* SomeDataClass.class,
* CSVFormat.DEFAULT.withHeader("someString", "someInteger")
* ));
*
* // Acquire any processing errors to either write to logs or apply to a downstream dead letter queue such as BigQuery.
* result.getErrors().apply(BigQueryIO.<CsvIOParseError>write()
* .to("project:dataset.table_of_errors")
* .useBeamSchema()
* .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
* .withWriteDisposition(WriteDisposition.WRITE_APPEND));
*
* // Acquire the successful PCollection<SomeDataClass> output.
* PCollection<SomeDataClass> output = result.getOutput();
*
* // Do something with the output such as write to BigQuery.
* output.apply(BigQueryIO.<SomeDataClass>write()
* .to("project:dataset.table_of_output")
* .useBeamSchema()
* .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
* .withWriteDisposition(WriteDisposition.WRITE_APPEND));
* }</pre>
*/
public static <T> CsvIOParse<T> parse(Class<T> klass, CSVFormat csvFormat) {
  // Check the CSVFormat on its own before any schema inference is attempted.
  CsvIOParseHelpers.validateCsvFormat(csvFormat);

  SchemaProvider schemaProvider = new DefaultSchema.DefaultSchemaProvider();
  TypeDescriptor<T> targetType = TypeDescriptor.of(klass);

  // Resolve the Schema for the target class; a null result aborts with an IllegalStateException.
  Schema targetSchema =
      checkStateNotNull(
          schemaProvider.schemaFor(targetType),
          "Illegal %s: Schema could not be generated from given %s class",
          Schema.class,
          klass);

  // Now that a Schema exists, check the CSVFormat against it.
  CsvIOParseHelpers.validateCsvFormatWithSchema(csvFormat, targetSchema);

  // Conversion functions between Row and the target type; both are required by the coder.
  SerializableFunction<Row, T> rowToTarget =
      checkStateNotNull(
          schemaProvider.fromRowFunction(targetType),
          "FromRowFn could not be generated from the given %s class",
          klass);
  SerializableFunction<T, Row> targetToRow =
      checkStateNotNull(
          schemaProvider.toRowFunction(targetType),
          "ToRowFn could not be generated from the given %s class",
          klass);
  SchemaCoder<T> targetCoder =
      SchemaCoder.of(targetSchema, targetType, targetToRow, rowToTarget);

  // Collect everything into the configuration builder carried by the transform.
  CsvIOParseConfiguration.Builder<T> configBuilder = CsvIOParseConfiguration.builder();
  configBuilder.setCsvFormat(csvFormat);
  configBuilder.setSchema(targetSchema);
  configBuilder.setCoder(targetCoder);
  configBuilder.setFromRowFn(rowToTarget);

  CsvIOParse.Builder<T> transformBuilder = CsvIOParse.builder();
  return transformBuilder.setConfigBuilder(configBuilder).build();
}

/**
* Instantiates a {@link CsvIOParse} for parsing CSV string records into {@link Row}s from the
* records' assumed <a
* href="https://www.javadoc.io/doc/org.apache.commons/commons-csv/1.8/org/apache/commons/csv/CSVFormat.html">CsvFormat</a>
* and expected {@link Schema}.
*
* <h2>Example usage</h2>
*
* The example below illustrates parsing <a
* href="https://www.javadoc.io/doc/org.apache.commons/commons-csv/1.8/org/apache/commons/csv/CSVFormat.html#DEFAULT">CsvFormat#DEFAULT</a>
* formatted CSV string records, read from {@link TextIO.Read}, into a {@link Row} {@link
* PCollection}.
*
* <pre>{@code
* // Define the expected Schema.
* Schema schema = Schema.of(
* Schema.Field.of("someString", FieldType.STRING),
* Schema.Field.of("someInteger", FieldType.INT32)
* );
*
* // Pipeline example reads CSV string records from Google Cloud storage and writes to BigQuery.
* Pipeline pipeline = Pipeline.create();
*
* // Read CSV records from Google Cloud storage using TextIO.
* PCollection<String> csvRecords = pipeline
* .apply(TextIO.read().from("gs://bucket/folder/*.csv"));
*
* // Apply the CSV records PCollection<String> to the CsvIOParse transform instantiated using CsvIO.parseRows.
* CsvIOParseResult<Row> result = csvRecords.apply(CsvIO.parseRows(
* schema,
* CSVFormat.DEFAULT.withHeader("someString", "someInteger")
* ));
*
* // Acquire any processing errors to either write to logs or apply to a downstream dead letter queue such as BigQuery.
* result.getErrors().apply(BigQueryIO.<CsvIOParseError>write()
* .to("project:dataset.table_of_errors")
* .useBeamSchema()
* .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
* .withWriteDisposition(WriteDisposition.WRITE_APPEND));
*
* // Acquire the successful PCollection<Row> output.
* PCollection<Row> output = result.getOutput();
*
* // Do something with the output such as write to BigQuery.
* output.apply(BigQueryIO.<Row>write()
* .to("project:dataset.table_of_output")
* .useBeamSchema()
* .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
* .withWriteDisposition(WriteDisposition.WRITE_APPEND));
* }</pre>
*/
public static CsvIOParse<Row> parseRows(Schema schema, CSVFormat csvFormat) {
  // Check the format on its own first, then against the caller-supplied Schema.
  CsvIOParseHelpers.validateCsvFormat(csvFormat);
  CsvIOParseHelpers.validateCsvFormatWithSchema(csvFormat, schema);

  RowCoder rowCoder = RowCoder.of(schema);

  // Rows are already the target type, so the from-Row conversion hands the Row back unchanged.
  SerializableFunction<Row, Row> passThrough = row -> row;

  CsvIOParseConfiguration.Builder<Row> configBuilder = CsvIOParseConfiguration.builder();
  configBuilder.setCsvFormat(csvFormat);
  configBuilder.setSchema(schema);
  configBuilder.setCoder(rowCoder);
  configBuilder.setFromRowFn(passThrough);

  CsvIOParse.Builder<Row> transformBuilder = CsvIOParse.builder();
  return transformBuilder.setConfigBuilder(configBuilder).build();
}

/** {@link PTransform} for writing CSV files. */
@AutoValue
public abstract static class Write<T> extends PTransform<PCollection<T>, WriteFilesResult<String>>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.csv;

import com.google.auto.value.AutoValue;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;

/**
* {@link PTransform} for Parsing CSV Record Strings into {@link Schema}-mapped target types. {@link
* CsvIOParse} is not instantiated directly but via {@link CsvIO#parse} or {@link CsvIO#parseRows}.
*/
@AutoValue
public abstract class CsvIOParse<T> extends PTransform<PCollection<String>, CsvIOParseResult<T>> {

  /** Tag under which successfully parsed elements are stored in the resulting tuple. */
  final TupleTag<T> outputTag = new TupleTag<T>() {};

  /** Tag under which the combined {@link CsvIOParseError}s are stored in the resulting tuple. */
  final TupleTag<CsvIOParseError> errorTag = new TupleTag<CsvIOParseError>() {};

  /** Returns a fresh builder backed by the AutoValue-generated implementation. */
  static <T> CsvIOParse.Builder<T> builder() {
    return new AutoValue_CsvIOParse.Builder<>();
  }

  /** Builder holding the configuration that {@link #expand} builds and uses. */
  abstract CsvIOParseConfiguration.Builder<T> getConfigBuilder();

  /**
   * Placeholder for custom per-field parsing. At present the supplied map is ignored and this
   * same transform instance is returned unchanged.
   */
  // TODO(https://github.com/apache/beam/issues/31875): Implement in future PR.
  public CsvIOParse<T> withCustomRecordParsing(
      Map<String, SerializableFunction<String, Object>> customProcessingMap) {
    return this;
  }

  /**
   * Applies the two parsing stages in sequence (CSV string to list of cell values, then list of
   * cell values to the target type) and merges the errors emitted by both stages into a single
   * {@link PCollection}.
   */
  @Override
  public CsvIOParseResult<T> expand(PCollection<String> input) {
    CsvIOParseConfiguration<T> config = getConfigBuilder().build();

    // Stage 1: split each CSV string into its list of cell values.
    CsvIOParseResult<List<String>> cellsResult =
        input.apply(new CsvIOStringToCsvRecord(config.getCsvFormat()));

    // Stage 2: convert each list of cell values into the target type.
    CsvIOParseResult<T> objectsResult =
        cellsResult.getOutput().apply(new CsvIORecordToObjects<T>(config));

    // Errors from both stages are flattened into one collection.
    PCollection<CsvIOParseError> allErrors =
        PCollectionList.of(cellsResult.getErrors())
            .and(objectsResult.getErrors())
            .apply(Flatten.pCollections());

    PCollectionTuple tagged =
        PCollectionTuple.of(outputTag, objectsResult.getOutput()).and(errorTag, allErrors);
    return CsvIOParseResult.of(outputTag, config.getCoder(), errorTag, tagged);
  }

  /** AutoValue builder for {@link CsvIOParse}. */
  @AutoValue.Builder
  abstract static class Builder<T> {
    abstract Builder<T> setConfigBuilder(CsvIOParseConfiguration.Builder<T> configBuilder);

    abstract CsvIOParse<T> build();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void process(@Element String line, MultiOutputReceiver receiver) {
for (CSVRecord record : csvParser.getRecords()) {
receiver.get(outputTag).output(csvRecordtoList(record));
}
} catch (IOException e) {
} catch (RuntimeException | IOException e) {
receiver
.get(errorTag)
.output(
Expand Down
Loading

0 comments on commit 6aac47c

Please sign in to comment.