From b4064aa9683b216b87c1932b7d8ad3b88a03991e Mon Sep 17 00:00:00 2001 From: Hyungrok Ham Date: Thu, 28 Sep 2023 19:59:52 +0900 Subject: [PATCH 1/3] feat: add readFullyAsUTF8String with skipLines --- .../java/org/apache/beam/sdk/io/FileIO.java | 249 +++++++++++++----- 1 file changed, 180 insertions(+), 69 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java index 2d28279f90b6..8433deed97d6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java @@ -83,15 +83,17 @@ import org.slf4j.LoggerFactory; /** - * General-purpose transforms for working with files: listing files (matching), reading and writing. + * General-purpose transforms for working with files: listing files (matching), reading and + * writing. * *

Matching filepatterns

* *

{@link #match} and {@link #matchAll} match filepatterns (respectively either a single - * filepattern or a {@link PCollection} thereof) and return the files that match them as {@link - * PCollection PCollections} of {@link MatchResult.Metadata}. Configuration options for them are in - * {@link MatchConfiguration} and include features such as treatment of filepatterns that don't - * match anything and continuous incremental matching of filepatterns (watching for new files). + * filepattern or a {@link PCollection} thereof) and return the files that match them as + * {@link PCollection PCollections} of {@link MatchResult.Metadata}. Configuration options for them + * are in {@link MatchConfiguration} and include features such as treatment of filepatterns that + * don't match anything and continuous incremental matching of filepatterns (watching for new + * files). * *

Example: Watching a single filepattern for new files

* @@ -108,9 +110,9 @@ * *

Example: Matching a PCollection of filepatterns arriving from Kafka

* - *

This example reads filepatterns from Kafka and matches each one as it arrives, producing again - * an unbounded {@code PCollection}, and failing in case the filepattern doesn't match - * anything. + *

This example reads filepatterns from Kafka and matches each one as it arrives, producing + * again an unbounded {@code PCollection}, and failing in case the filepattern doesn't + * match anything. * *

{@code
  * PCollection filepatterns = p.apply(KafkaIO.read()...);
@@ -309,9 +311,10 @@
  * }
*/ @SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) + "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public class FileIO { + private static final Logger LOG = LoggerFactory.getLogger(FileIO.class); /** @@ -319,12 +322,13 @@ public class FileIO { * resources (both files and directories) as {@link MatchResult.Metadata}. * *

By default, matches the filepattern once and produces a bounded {@link PCollection}. To - * continuously watch the filepattern for new matches, use {@link MatchAll#continuously(Duration, - * TerminationCondition)} - this will produce an unbounded {@link PCollection}. + * continuously watch the filepattern for new matches, use + * {@link MatchAll#continuously(Duration, TerminationCondition)} - this will produce an unbounded + * {@link PCollection}. * *

By default, a filepattern matching no resources is treated according to {@link - * EmptyMatchTreatment#DISALLOW}. To configure this behavior, use {@link - * Match#withEmptyMatchTreatment}. + * EmptyMatchTreatment#DISALLOW}. To configure this behavior, use + * {@link Match#withEmptyMatchTreatment}. * *

Returned {@link MatchResult.Metadata} are deduplicated by filename. For example, if this * transform observes a file with the same name several times with different metadata (e.g. @@ -344,8 +348,8 @@ public static Match match() { * multiple filepatterns, it will be produced multiple times. * *

By default, a filepattern matching no resources is treated according to {@link - * EmptyMatchTreatment#ALLOW_IF_WILDCARD}. To configure this behavior, use {@link - * MatchAll#withEmptyMatchTreatment}. + * EmptyMatchTreatment#ALLOW_IF_WILDCARD}. To configure this behavior, use + * {@link MatchAll#withEmptyMatchTreatment}. */ public static MatchAll matchAll() { return new AutoValue_FileIO_MatchAll.Builder() @@ -364,7 +368,9 @@ public static ReadMatches readMatches() { .build(); } - /** Writes elements to files using a {@link Sink}. See class-level documentation. */ + /** + * Writes elements to files using a {@link Sink}. See class-level documentation. + */ public static Write write() { return new AutoValue_FileIO_Write.Builder() .setDynamic(false) @@ -387,8 +393,11 @@ public static Write writeDynamic() { .build(); } - /** A utility class for accessing a potentially compressed file. */ + /** + * A utility class for accessing a potentially compressed file. + */ public static final class ReadableFile { + private final MatchResult.Metadata metadata; private final Compression compression; @@ -397,12 +406,16 @@ public static final class ReadableFile { this.compression = compression; } - /** Returns the {@link MatchResult.Metadata} of the file. */ + /** + * Returns the {@link MatchResult.Metadata} of the file. + */ public MatchResult.Metadata getMetadata() { return metadata; } - /** Returns the method with which this file will be decompressed in {@link #open}. */ + /** + * Returns the method with which this file will be decompressed in {@link #open}. + */ public Compression getCompression() { return compression; } @@ -427,16 +440,36 @@ public SeekableByteChannel openSeekable() throws IOException { return (SeekableByteChannel) open(); } - /** Returns the full contents of the file as bytes. */ - public byte[] readFullyAsBytes() throws IOException { + /** + * Returns the contents of the file as bytes with skipping {@code skipLines}. + */ + public byte[] readFullyAsBytes(int skipLines) throws IOException { try (InputStream stream = Channels.newInputStream(open())) { + int count = 0; + int r; + while ((count < skipLines) && ((r = stream.read()) != -1)) { + if (r == '\n') { + count++; + } + } + return StreamUtils.getBytesWithoutClosing(stream); } } - /** Returns the full contents of the file as a {@link String} decoded as UTF-8. */ + /** + * Returns the full contents of the file as a {@link String} decoded as UTF-8. + */ public String readFullyAsUTF8String() throws IOException { - return new String(readFullyAsBytes(), StandardCharsets.UTF_8); + return new String(readFullyAsBytes(0), StandardCharsets.UTF_8); + } + + /** + * Returns the contents of the file as {@link String} decoded as UTF-8 with skipping + * {@code skipLines}. + */ + public String readFullyAsUTF8String(int skipLines) throws IOException { + return new String(readFullyAsBytes(skipLines), StandardCharsets.UTF_8); } @Override @@ -468,7 +501,10 @@ public int hashCode() { */ @AutoValue public abstract static class MatchConfiguration implements HasDisplayData, Serializable { - /** Creates a {@link MatchConfiguration} with the given {@link EmptyMatchTreatment}. */ + + /** + * Creates a {@link MatchConfiguration} with the given {@link EmptyMatchTreatment}. + */ public static MatchConfiguration create(EmptyMatchTreatment emptyMatchTreatment) { return new AutoValue_FileIO_MatchConfiguration.Builder() .setEmptyMatchTreatment(emptyMatchTreatment) @@ -488,6 +524,7 @@ public static MatchConfiguration create(EmptyMatchTreatment emptyMatchTreatment) @AutoValue.Builder abstract static class Builder { + abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment); abstract Builder setMatchUpdatedFiles(boolean matchUpdatedFiles); @@ -499,7 +536,9 @@ abstract static class Builder { abstract MatchConfiguration build(); } - /** Sets the {@link EmptyMatchTreatment}. */ + /** + * Sets the {@link EmptyMatchTreatment}. + */ public MatchConfiguration withEmptyMatchTreatment(EmptyMatchTreatment treatment) { return toBuilder().setEmptyMatchTreatment(treatment).build(); } @@ -509,8 +548,8 @@ public MatchConfiguration withEmptyMatchTreatment(EmptyMatchTreatment treatment) * condition is reached, where the input to the condition is the filepattern. * *

If {@code matchUpdatedFiles} is set, also watches for files with timestamp change, with - * the watching frequency given by the {@code interval}. The pipeline will throw a {@code - * RuntimeError} if timestamp extraction for the matched file has failed, suggesting the + * the watching frequency given by the {@code interval}. The pipeline will throw a + * {@code RuntimeError} if timestamp extraction for the matched file has failed, suggesting the * timestamp metadata is not available with the IO connector. * *

Matching continuously scales poorly, as it is stateful, and requires storing file ids in @@ -558,7 +597,9 @@ public void populateDisplayData(DisplayData.Builder builder) { } } - /** Implementation of {@link #match}. */ + /** + * Implementation of {@link #match}. + */ @AutoValue public abstract static class Match extends PTransform> { @@ -570,6 +611,7 @@ public abstract static class Match extends PTransform filepattern); abstract Builder setConfiguration(MatchConfiguration configuration); @@ -577,22 +619,30 @@ abstract static class Builder { abstract Match build(); } - /** Matches the given filepattern. */ + /** + * Matches the given filepattern. + */ public Match filepattern(String filepattern) { return this.filepattern(StaticValueProvider.of(filepattern)); } - /** Like {@link #filepattern(String)} but using a {@link ValueProvider}. */ + /** + * Like {@link #filepattern(String)} but using a {@link ValueProvider}. + */ public Match filepattern(ValueProvider filepattern) { return toBuilder().setFilepattern(filepattern).build(); } - /** Sets the {@link MatchConfiguration}. */ + /** + * Sets the {@link MatchConfiguration}. + */ public Match withConfiguration(MatchConfiguration configuration) { return toBuilder().setConfiguration(configuration).build(); } - /** See {@link MatchConfiguration#withEmptyMatchTreatment(EmptyMatchTreatment)}. */ + /** + * See {@link MatchConfiguration#withEmptyMatchTreatment(EmptyMatchTreatment)}. + */ public Match withEmptyMatchTreatment(EmptyMatchTreatment treatment) { return withConfiguration(getConfiguration().withEmptyMatchTreatment(treatment)); } @@ -639,32 +689,42 @@ public void populateDisplayData(DisplayData.Builder builder) { } } - /** Implementation of {@link #matchAll}. */ + /** + * Implementation of {@link #matchAll}. + */ @AutoValue public abstract static class MatchAll extends PTransform, PCollection> { + abstract MatchConfiguration getConfiguration(); abstract Builder toBuilder(); @AutoValue.Builder abstract static class Builder { + abstract Builder setConfiguration(MatchConfiguration configuration); abstract MatchAll build(); } - /** Like {@link Match#withConfiguration}. */ + /** + * Like {@link Match#withConfiguration}. + */ public MatchAll withConfiguration(MatchConfiguration configuration) { return toBuilder().setConfiguration(configuration).build(); } - /** Like {@link Match#withEmptyMatchTreatment}. */ + /** + * Like {@link Match#withEmptyMatchTreatment}. + */ public MatchAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) { return withConfiguration(getConfiguration().withEmptyMatchTreatment(treatment)); } - /** Like {@link Match#continuously(Duration, TerminationCondition, boolean)}. */ + /** + * Like {@link Match#continuously(Duration, TerminationCondition, boolean)}. + */ public MatchAll continuously( Duration pollInterval, TerminationCondition terminationCondition, @@ -673,7 +733,9 @@ public MatchAll continuously( getConfiguration().continuously(pollInterval, terminationCondition, matchUpdatedFiles)); } - /** Like {@link Match#continuously(Duration, TerminationCondition)}. */ + /** + * Like {@link Match#continuously(Duration, TerminationCondition)}. + */ public MatchAll continuously( Duration pollInterval, TerminationCondition terminationCondition) { return continuously(pollInterval, terminationCondition, false); @@ -707,7 +769,9 @@ public void populateDisplayData(DisplayData.Builder builder) { builder.include("configuration", getConfiguration()); } - /** Helper function creating a watch transform based on outputKeyFn. */ + /** + * Helper function creating a watch transform based on outputKeyFn. + */ private Watch.Growth createWatchTransform( SerializableFunction outputKeyFn) { return Watch.growthOf(Contextful.of(new MatchPollFn(), Requirements.empty()), outputKeyFn) @@ -716,6 +780,7 @@ private Watch.Growth createWatchTrans } private static class MatchFn extends DoFn { + private final EmptyMatchTreatment emptyMatchTreatment; public MatchFn(EmptyMatchTreatment emptyMatchTreatment) { @@ -734,6 +799,7 @@ public void process(ProcessContext c) throws Exception { } private static class MatchPollFn extends PollFn { + @Override public Watch.Growth.PollResult apply(String element, Context c) throws Exception { @@ -746,6 +812,7 @@ public Watch.Growth.PollResult apply(String element, Conte private static class ExtractFilenameFn implements SerializableFunction { + @Override public String apply(MatchResult.Metadata input) { return input.resourceId().toString(); @@ -754,6 +821,7 @@ public String apply(MatchResult.Metadata input) { private static class ExtractFilenameAndLastUpdateFn implements SerializableFunction> { + @Override public KV apply(MatchResult.Metadata input) throws RuntimeException { long timestamp = input.lastModifiedMillis(); @@ -765,11 +833,16 @@ public KV apply(MatchResult.Metadata input) throws RuntimeExceptio } } - /** Implementation of {@link #readMatches}. */ + /** + * Implementation of {@link #readMatches}. + */ @AutoValue public abstract static class ReadMatches extends PTransform, PCollection> { - /** Enum to control how directories are handled. */ + + /** + * Enum to control how directories are handled. + */ public enum DirectoryTreatment { SKIP, PROHIBIT @@ -783,6 +856,7 @@ public enum DirectoryTreatment { @AutoValue.Builder abstract static class Builder { + abstract Builder setCompression(Compression compression); abstract Builder setDirectoryTreatment(DirectoryTreatment directoryTreatment); @@ -790,15 +864,17 @@ abstract static class Builder { abstract ReadMatches build(); } - /** Reads files using the given {@link Compression}. Default is {@link Compression#AUTO}. */ + /** + * Reads files using the given {@link Compression}. Default is {@link Compression#AUTO}. + */ public ReadMatches withCompression(Compression compression) { checkArgument(compression != null, "compression can not be null"); return toBuilder().setCompression(compression).build(); } /** - * Controls how to handle directories in the input {@link PCollection}. Default is {@link - * DirectoryTreatment#SKIP}. + * Controls how to handle directories in the input {@link PCollection}. Default is + * {@link DirectoryTreatment#SKIP}. */ public ReadMatches withDirectoryTreatment(DirectoryTreatment directoryTreatment) { checkArgument(directoryTreatment != null, "directoryTreatment can not be null"); @@ -818,10 +894,10 @@ public void populateDisplayData(DisplayData.Builder builder) { /** * @return True if metadata is a directory and directory Treatment is SKIP. - * @throws java.lang.IllegalArgumentException if metadata is a directory and directoryTreatment - * is Prohibited. + * @throws java.lang.IllegalArgumentException if metadata is a directory and + * directoryTreatment is Prohibited. * @throws java.lang.UnsupportedOperationException if metadata is a directory and - * directoryTreatment is not SKIP or PROHIBIT. + * directoryTreatment is not SKIP or PROHIBIT. */ static boolean shouldSkipDirectory( MatchResult.Metadata metadata, DirectoryTreatment directoryTreatment) { @@ -843,8 +919,8 @@ static boolean shouldSkipDirectory( } /** - * Converts metadata to readableFile. Make sure {@link - * #shouldSkipDirectory(org.apache.beam.sdk.io.fs.MatchResult.Metadata, + * Converts metadata to readableFile. Make sure + * {@link #shouldSkipDirectory(org.apache.beam.sdk.io.fs.MatchResult.Metadata, * org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment)} returns false before using. */ static ReadableFile matchToReadableFile( @@ -866,6 +942,7 @@ static ReadableFile matchToReadableFile( } private static class ToReadableFileFn extends DoFn { + private final ReadMatches spec; private ToReadableFileFn(ReadMatches spec) { @@ -884,17 +961,21 @@ public void process(ProcessContext c) { } /** - * Specifies how to write elements to individual files in {@link FileIO#write} and {@link - * FileIO#writeDynamic}. A new instance of {@link Sink} is created for every file being written. + * Specifies how to write elements to individual files in {@link FileIO#write} and + * {@link FileIO#writeDynamic}. A new instance of {@link Sink} is created for every file being + * written. */ public interface Sink extends Serializable { + /** * Initializes writing to the given channel. Will be invoked once on a given {@link Sink} * instance. */ void open(WritableByteChannel channel) throws IOException; - /** Appends a single element to the file. May be invoked zero or more times. */ + /** + * Appends a single element to the file. May be invoked zero or more times. + */ void write(ElementT element) throws IOException; /** @@ -904,12 +985,18 @@ public interface Sink extends Serializable { void flush() throws IOException; } - /** Implementation of {@link #write} and {@link #writeDynamic}. */ + /** + * Implementation of {@link #write} and {@link #writeDynamic}. + */ @AutoValue public abstract static class Write extends PTransform, WriteFilesResult> { - /** A policy for generating names for shard files. */ + + /** + * A policy for generating names for shard files. + */ public interface FileNaming extends Serializable { + /** * Generates the filename. MUST use each argument and return different values for each * combination of the arguments. @@ -929,7 +1016,8 @@ public static FileNaming defaultNaming(final String prefix, final String suffix) /** * Defines a default {@link FileNaming} which will use the prefix and suffix supplied to create * a name based on the window, pane, number of shards, shard index, and compression. Removes - * window when in the {@link GlobalWindow} and pane info when it is the only firing of the pane. + * window when in the {@link GlobalWindow} and pane info when it is the only firing of the + * pane. */ public static FileNaming defaultNaming( final ValueProvider prefix, final ValueProvider suffix) { @@ -1020,6 +1108,7 @@ public static FileNaming relativeFileNaming( @AutoValue.Builder abstract static class Builder { + abstract Builder setDynamic(boolean dynamic); abstract Builder setSinkFn(Contextful>> sink); @@ -1065,13 +1154,17 @@ abstract Builder setSharding( abstract Write build(); } - /** Specifies how to partition elements into groups ("destinations"). */ + /** + * Specifies how to partition elements into groups ("destinations"). + */ public Write by(SerializableFunction destinationFn) { checkArgument(destinationFn != null, "destinationFn can not be null"); return by(fn(destinationFn)); } - /** Like {@link #by}, but with access to context such as side inputs. */ + /** + * Like {@link #by}, but with access to context such as side inputs. + */ public Write by(Contextful> destinationFn) { checkArgument(destinationFn != null, "destinationFn can not be null"); return toBuilder().setDestinationFn(destinationFn).build(); @@ -1090,7 +1183,9 @@ public Write via( return toBuilder().setSinkFn((Contextful) sinkFn).setOutputFn(outputFn).build(); } - /** Like {@link #via(Contextful, Contextful)}, but uses the same sink for all destinations. */ + /** + * Like {@link #via(Contextful, Contextful)}, but uses the same sink for all destinations. + */ public Write via( Contextful> outputFn, final Sink sink) { checkArgument(sink != null, "sink can not be null"); @@ -1111,7 +1206,9 @@ public Write via(Contextful>> .build(); } - /** Like {@link #via(Contextful)}, but uses the same {@link Sink} for all destinations. */ + /** + * Like {@link #via(Contextful)}, but uses the same {@link Sink} for all destinations. + */ public Write via(Sink sink) { checkArgument(sink != null, "sink can not be null"); return via(fn(SerializableFunctions.clonesOf(sink))); @@ -1119,15 +1216,17 @@ public Write via(Sink sink) { /** * Specifies a common directory for all generated files. A temporary generated sub-directory of - * this directory will be used as the temp directory, unless overridden by {@link - * #withTempDirectory}. + * this directory will be used as the temp directory, unless overridden by + * {@link #withTempDirectory}. */ public Write to(String directory) { checkArgument(directory != null, "directory can not be null"); return to(StaticValueProvider.of(directory)); } - /** Like {@link #to(String)} but with a {@link ValueProvider}. */ + /** + * Like {@link #to(String)} but with a {@link ValueProvider}. + */ public Write to(ValueProvider directory) { checkArgument(directory != null, "directory can not be null"); return toBuilder().setOutputDirectory(directory).build(); @@ -1142,7 +1241,9 @@ public Write withPrefix(String prefix) { return withPrefix(StaticValueProvider.of(prefix)); } - /** Like {@link #withPrefix(String)} but with a {@link ValueProvider}. */ + /** + * Like {@link #withPrefix(String)} but with a {@link ValueProvider}. + */ public Write withPrefix(ValueProvider prefix) { checkArgument(prefix != null, "prefix can not be null"); return toBuilder().setFilenamePrefix(prefix).build(); @@ -1157,7 +1258,9 @@ public Write withSuffix(String suffix) { return withSuffix(StaticValueProvider.of(suffix)); } - /** Like {@link #withSuffix(String)} but with a {@link ValueProvider}. */ + /** + * Like {@link #withSuffix(String)} but with a {@link ValueProvider}. + */ public Write withSuffix(ValueProvider suffix) { checkArgument(suffix != null, "suffix can not be null"); return toBuilder().setFilenameSuffix(suffix).build(); @@ -1200,13 +1303,17 @@ public Write withNaming( return toBuilder().setFileNamingFn(namingFn).build(); } - /** Specifies a directory into which all temporary files will be placed. */ + /** + * Specifies a directory into which all temporary files will be placed. + */ public Write withTempDirectory(String tempDirectory) { checkArgument(tempDirectory != null, "tempDirectory can not be null"); return withTempDirectory(StaticValueProvider.of(tempDirectory)); } - /** Like {@link #withTempDirectory(String)}. */ + /** + * Like {@link #withTempDirectory(String)}. + */ public Write withTempDirectory(ValueProvider tempDirectory) { checkArgument(tempDirectory != null, "tempDirectory can not be null"); return toBuilder().setTempDirectory(tempDirectory).build(); @@ -1233,8 +1340,8 @@ public Write withEmptyGlobalWindowDestination( } /** - * Specifies a {@link Coder} for the destination type, if it can not be inferred from {@link - * #by}. + * Specifies a {@link Coder} for the destination type, if it can not be inferred from + * {@link #by}. */ public Write withDestinationCoder(Coder destinationCoder) { checkArgument(destinationCoder != null, "destinationCoder can not be null"); @@ -1276,14 +1383,16 @@ public Write withSharding( * window with the default trigger. * * @deprecated Avoid usage of this method: its effects are complex and it will be removed in - * future versions of Beam. Right now it exists for compatibility with {@link WriteFiles}. + * future versions of Beam. Right now it exists for compatibility with {@link WriteFiles}. */ @Deprecated public Write withIgnoreWindowing() { return toBuilder().setIgnoreWindowing(true).build(); } - /** See {@link WriteFiles#withNoSpilling()}. */ + /** + * See {@link WriteFiles#withNoSpilling()}. + */ public Write withNoSpilling() { return toBuilder().setNoSpilling(true).build(); } @@ -1418,6 +1527,7 @@ private Collection> getAllSideInputs() { private static class ViaFileBasedSink extends FileBasedSink { + private final Write spec; private ViaFileBasedSink(Write spec) { @@ -1472,6 +1582,7 @@ protected void finishWrite() throws Exception { private static class DynamicDestinationsAdapter extends DynamicDestinations { + private final Write spec; private transient Fn.@Nullable Context context; From b44e4124258e3a4ad9292cc9b2887c9a1158b1f6 Mon Sep 17 00:00:00 2001 From: Hyungrok Ham Date: Fri, 29 Sep 2023 04:37:23 +0900 Subject: [PATCH 2/3] test: add parameterized FileIOReadWithSkipLinesTest --- .../org/apache/beam/sdk/io/FileIOTest.java | 975 ++++++++++-------- 1 file changed, 540 insertions(+), 435 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java index b87c9caa1244..38f88fbdf222 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java @@ -43,6 +43,7 @@ import java.util.zip.GZIPOutputStream; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.io.FileIO.ReadableFile; import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -70,481 +71,585 @@ import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.experimental.runners.Enclosed; import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.rules.Timeout; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; -/** Tests for {@link FileIO}. */ -@RunWith(JUnit4.class) +/** + * Tests for {@link FileIO}. + */ +@RunWith(Enclosed.class) public class FileIOTest implements Serializable { - @Rule public transient TestPipeline p = TestPipeline.create(); - - @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); - - @Rule public transient ExpectedException thrown = ExpectedException.none(); - - @Rule public transient Timeout globalTimeout = Timeout.seconds(1200); - - @Test - @Category(NeedsRunner.class) - public void testMatchAndMatchAll() throws IOException { - Path firstPath = tmpFolder.newFile("first").toPath(); - Path secondPath = tmpFolder.newFile("second").toPath(); - int firstSize = 37; - int secondSize = 42; - long firstModified = 1541097000L; - long secondModified = 1541098000L; - Files.write(firstPath, new byte[firstSize]); - Files.write(secondPath, new byte[secondSize]); - Files.setLastModifiedTime(firstPath, FileTime.fromMillis(firstModified)); - Files.setLastModifiedTime(secondPath, FileTime.fromMillis(secondModified)); - MatchResult.Metadata firstMetadata = metadata(firstPath, firstSize, firstModified); - MatchResult.Metadata secondMetadata = metadata(secondPath, secondSize, secondModified); - - PAssert.that( - p.apply( - "Match existing", - FileIO.match().filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*"))) - .containsInAnyOrder(firstMetadata, secondMetadata); - PAssert.that( - p.apply( - "Match existing with provider", - FileIO.match() - .filepattern(p.newProvider(tmpFolder.getRoot().getAbsolutePath() + "/*")))) - .containsInAnyOrder(firstMetadata, secondMetadata); - PAssert.that( - p.apply("Create existing", Create.of(tmpFolder.getRoot().getAbsolutePath() + "/*")) - .apply("MatchAll existing", FileIO.matchAll())) - .containsInAnyOrder(firstMetadata, secondMetadata); - - PAssert.that( - p.apply( - "Match non-existing ALLOW", - FileIO.match() - .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/blah") - .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))) - .empty(); - PAssert.that( - p.apply( - "Create non-existing", - Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah")) - .apply( - "MatchAll non-existing ALLOW", - FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))) - .empty(); - - PAssert.that( - p.apply( - "Match non-existing ALLOW_IF_WILDCARD", - FileIO.match() - .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/blah*") - .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD))) - .empty(); - PAssert.that( - p.apply( - "Create non-existing wildcard + explicit", - Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah*")) - .apply( - "MatchAll non-existing ALLOW_IF_WILDCARD", - FileIO.matchAll() - .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD))) - .empty(); - PAssert.that( - p.apply( - "Create non-existing wildcard + default", - Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah*")) - .apply("MatchAll non-existing default", FileIO.matchAll())) - .empty(); - - p.run(); - } - @Test - @Category(NeedsRunner.class) - public void testMatchDisallowEmptyDefault() throws IOException { - p.apply("Match", FileIO.match().filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*")); + @RunWith(JUnit4.class) + public static class FileIOMatchTest { + + @Rule + public transient TestPipeline p = TestPipeline.create(); + + @Rule + public transient TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Rule + public transient ExpectedException thrown = ExpectedException.none(); + + @Rule + public transient Timeout globalTimeout = Timeout.seconds(1200); + + @Test + @Category(NeedsRunner.class) + public void testMatchAndMatchAll() throws IOException { + Path firstPath = tmpFolder.newFile("first").toPath(); + Path secondPath = tmpFolder.newFile("second").toPath(); + int firstSize = 37; + int secondSize = 42; + long firstModified = 1541097000L; + long secondModified = 1541098000L; + Files.write(firstPath, new byte[firstSize]); + Files.write(secondPath, new byte[secondSize]); + Files.setLastModifiedTime(firstPath, FileTime.fromMillis(firstModified)); + Files.setLastModifiedTime(secondPath, FileTime.fromMillis(secondModified)); + MatchResult.Metadata firstMetadata = metadata(firstPath, firstSize, firstModified); + MatchResult.Metadata secondMetadata = metadata(secondPath, secondSize, secondModified); + + PAssert.that( + p.apply( + "Match existing", + FileIO.match().filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*"))) + .containsInAnyOrder(firstMetadata, secondMetadata); + PAssert.that( + p.apply( + "Match existing with provider", + FileIO.match() + .filepattern(p.newProvider(tmpFolder.getRoot().getAbsolutePath() + "/*")))) + .containsInAnyOrder(firstMetadata, secondMetadata); + PAssert.that( + p.apply("Create existing", Create.of(tmpFolder.getRoot().getAbsolutePath() + "/*")) + .apply("MatchAll existing", FileIO.matchAll())) + .containsInAnyOrder(firstMetadata, secondMetadata); + + PAssert.that( + p.apply( + "Match non-existing ALLOW", + FileIO.match() + .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/blah") + .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))) + .empty(); + PAssert.that( + p.apply( + "Create non-existing", + Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah")) + .apply( + "MatchAll non-existing ALLOW", + FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))) + .empty(); + + PAssert.that( + p.apply( + "Match non-existing ALLOW_IF_WILDCARD", + FileIO.match() + .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/blah*") + .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD))) + .empty(); + PAssert.that( + p.apply( + "Create non-existing wildcard + explicit", + Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah*")) + .apply( + "MatchAll non-existing ALLOW_IF_WILDCARD", + FileIO.matchAll() + .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD))) + .empty(); + PAssert.that( + p.apply( + "Create non-existing wildcard + default", + Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah*")) + .apply("MatchAll non-existing default", FileIO.matchAll())) + .empty(); + + p.run(); + } - thrown.expectCause(isA(FileNotFoundException.class)); - p.run(); - } + @Test + @Category(NeedsRunner.class) + public void testMatchDisallowEmptyDefault() throws IOException { + p.apply("Match", FileIO.match().filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*")); - @Test - @Category(NeedsRunner.class) - public void testMatchDisallowEmptyExplicit() throws IOException { - p.apply( - FileIO.match() - .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*") - .withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)); + thrown.expectCause(isA(FileNotFoundException.class)); + p.run(); + } - thrown.expectCause(isA(FileNotFoundException.class)); - p.run(); - } + @Test + @Category(NeedsRunner.class) + public void testMatchDisallowEmptyExplicit() throws IOException { + p.apply( + FileIO.match() + .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*") + .withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)); - @Test - @Category(NeedsRunner.class) - public void testMatchDisallowEmptyNonWildcard() throws IOException { - p.apply( - FileIO.match() - .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/blah") - .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)); + thrown.expectCause(isA(FileNotFoundException.class)); + p.run(); + } - thrown.expectCause(isA(FileNotFoundException.class)); - p.run(); - } + @Test + @Category(NeedsRunner.class) + public void testMatchDisallowEmptyNonWildcard() throws IOException { + p.apply( + FileIO.match() + .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/blah") + .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)); - @Test - @Category(NeedsRunner.class) - public void testMatchAllDisallowEmptyExplicit() throws IOException { - p.apply(Create.of(tmpFolder.getRoot().getAbsolutePath() + "/*")) - .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)); - thrown.expectCause(isA(FileNotFoundException.class)); - p.run(); - } + thrown.expectCause(isA(FileNotFoundException.class)); + p.run(); + } - @Test - @Category(NeedsRunner.class) - public void testMatchAllDisallowEmptyNonWildcard() throws IOException { - p.apply(Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah")) - .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)); - thrown.expectCause(isA(FileNotFoundException.class)); - p.run(); - } + @Test + @Category(NeedsRunner.class) + public void testMatchAllDisallowEmptyExplicit() throws IOException { + p.apply(Create.of(tmpFolder.getRoot().getAbsolutePath() + "/*")) + .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)); + thrown.expectCause(isA(FileNotFoundException.class)); + p.run(); + } - /** DoFn that copy test files from source to watch path. */ - private static class CopyFilesFn - extends DoFn, MatchResult.Metadata> { - public CopyFilesFn(Path sourcePath, Path watchPath) { - this.sourcePathStr = sourcePath.toString(); - this.watchPathStr = watchPath.toString(); + @Test + @Category(NeedsRunner.class) + public void testMatchAllDisallowEmptyNonWildcard() throws IOException { + p.apply(Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah")) + .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)); + thrown.expectCause(isA(FileNotFoundException.class)); + p.run(); } - @StateId("count") - @SuppressWarnings("unused") - private final StateSpec> countSpec = StateSpecs.value(VarIntCoder.of()); - - @ProcessElement - public void processElement(ProcessContext context, @StateId("count") ValueState count) - throws IOException, InterruptedException { - int current = firstNonNull(count.read(), 0); - // unpack value as output - context.output(Objects.requireNonNull(context.element()).getValue()); - - CopyOption[] cpOptions = {StandardCopyOption.COPY_ATTRIBUTES}; - CopyOption[] updOptions = {StandardCopyOption.REPLACE_EXISTING}; - final Path sourcePath = Paths.get(sourcePathStr); - final Path watchPath = Paths.get(watchPathStr); - - if (0 == current) { - Thread.sleep(100); - Files.copy(sourcePath.resolve("first"), watchPath.resolve("first"), updOptions); - Files.copy(sourcePath.resolve("second"), watchPath.resolve("second"), cpOptions); - } else if (1 == current) { - Thread.sleep(100); - Files.copy(sourcePath.resolve("first"), watchPath.resolve("first"), updOptions); - Files.copy(sourcePath.resolve("second"), watchPath.resolve("second"), updOptions); - Files.copy(sourcePath.resolve("third"), watchPath.resolve("third"), cpOptions); + /** + * DoFn that copy test files from source to watch path. + */ + private static class CopyFilesFn + extends DoFn, MatchResult.Metadata> { + + public CopyFilesFn(Path sourcePath, Path watchPath) { + this.sourcePathStr = sourcePath.toString(); + this.watchPathStr = watchPath.toString(); } - count.write(current + 1); + + @StateId("count") + @SuppressWarnings("unused") + private final StateSpec> countSpec = StateSpecs.value(VarIntCoder.of()); + + @ProcessElement + public void processElement(ProcessContext context, + @StateId("count") ValueState count) + throws IOException, InterruptedException { + int current = firstNonNull(count.read(), 0); + // unpack value as output + context.output(Objects.requireNonNull(context.element()).getValue()); + + CopyOption[] cpOptions = {StandardCopyOption.COPY_ATTRIBUTES}; + CopyOption[] updOptions = {StandardCopyOption.REPLACE_EXISTING}; + final Path sourcePath = Paths.get(sourcePathStr); + final Path watchPath = Paths.get(watchPathStr); + + if (0 == current) { + Thread.sleep(100); + Files.copy(sourcePath.resolve("first"), watchPath.resolve("first"), updOptions); + Files.copy(sourcePath.resolve("second"), watchPath.resolve("second"), cpOptions); + } else if (1 == current) { + Thread.sleep(100); + Files.copy(sourcePath.resolve("first"), watchPath.resolve("first"), updOptions); + Files.copy(sourcePath.resolve("second"), watchPath.resolve("second"), updOptions); + Files.copy(sourcePath.resolve("third"), watchPath.resolve("third"), cpOptions); + } + count.write(current + 1); + } + + // Member variables need to be serializable. + private final String sourcePathStr; + private final String watchPathStr; } - // Member variables need to be serializable. - private final String sourcePathStr; - private final String watchPathStr; - } + @Test + @Category({NeedsRunner.class, UsesUnboundedSplittableParDo.class}) + public void testMatchWatchForNewFiles() throws IOException, InterruptedException { + // Write some files to a "source" directory. + final Path sourcePath = tmpFolder.getRoot().toPath().resolve("source"); + sourcePath.toFile().mkdir(); + Files.write(sourcePath.resolve("first"), new byte[42]); + Files.write(sourcePath.resolve("second"), new byte[37]); + Files.write(sourcePath.resolve("third"), new byte[99]); + + // Create a "watch" directory that the pipeline will copy files into. + final Path watchPath = tmpFolder.getRoot().toPath().resolve("watch"); + watchPath.toFile().mkdir(); + PCollection matchMetadata = + p.apply( + "match filename through match", + FileIO.match() + .filepattern(watchPath.resolve("*").toString()) + .continuously( + Duration.millis(100), + Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)))); + PCollection matchAllMetadata = + p.apply("create for matchAll new files", Create.of(watchPath.resolve("*").toString())) + .apply( + "match filename through matchAll", + FileIO.matchAll() + .continuously( + Duration.millis(100), + Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)))); + PCollection matchUpdatedMetadata = + p.apply( + "match updated", + FileIO.match() + .filepattern(watchPath.resolve("first").toString()) + .continuously( + Duration.millis(100), + Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)), + true)); + PCollection matchAllUpdatedMetadata = + p.apply("create for matchAll updated files", Create.of(watchPath.resolve("*").toString())) + .apply( + "matchAll updated", + FileIO.matchAll() + .continuously( + Duration.millis(100), + Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)), + true)); + + // write one file at the beginning. This will trigger the first output for matchAll + Files.copy( + sourcePath.resolve("first"), + watchPath.resolve("first"), + new StandardCopyOption[]{StandardCopyOption.COPY_ATTRIBUTES}); + + // using matchMetadata outputs to trigger file copy on-the-fly + matchMetadata = + matchMetadata + .apply( + MapElements.into( + TypeDescriptors.kvs( + TypeDescriptors.strings(), + TypeDescriptor.of(MatchResult.Metadata.class))) + .via((metadata) -> KV.of("dumb key", metadata))) + .apply(ParDo.of(new CopyFilesFn(sourcePath, watchPath))); + + assertEquals(PCollection.IsBounded.UNBOUNDED, matchMetadata.isBounded()); + assertEquals(PCollection.IsBounded.UNBOUNDED, matchAllMetadata.isBounded()); + assertEquals(PCollection.IsBounded.UNBOUNDED, matchUpdatedMetadata.isBounded()); + assertEquals(PCollection.IsBounded.UNBOUNDED, matchAllUpdatedMetadata.isBounded()); + + // We fetch lastModifiedTime from the files in the "source" directory to avoid a race condition + // with the writer thread. + List expectedMatchNew = + Arrays.asList( + metadata( + watchPath.resolve("first"), 42, lastModifiedMillis(sourcePath.resolve("first"))), + metadata( + watchPath.resolve("second"), 37, + lastModifiedMillis(sourcePath.resolve("second"))), + metadata( + watchPath.resolve("third"), 99, lastModifiedMillis(sourcePath.resolve("third")))); + PAssert.that(matchMetadata).containsInAnyOrder(expectedMatchNew); + PAssert.that(matchAllMetadata).containsInAnyOrder(expectedMatchNew); + + List expectedMatchUpdated = Arrays.asList("first", "first", "first"); + PCollection matchUpdatedCount = + matchUpdatedMetadata.apply( + "pick up match file name", + MapElements.into(TypeDescriptors.strings()) + .via((metadata) -> metadata.resourceId().getFilename())); + PAssert.that(matchUpdatedCount).containsInAnyOrder(expectedMatchUpdated); + + // Check watch for file updates. Compare only filename since modified time of copied files are + // uncontrolled. + List expectedMatchAllUpdated = + Arrays.asList("first", "first", "first", "second", "second", "third"); + PCollection matchAllUpdatedCount = + matchAllUpdatedMetadata.apply( + "pick up matchAll file names", + MapElements.into(TypeDescriptors.strings()) + .via((metadata) -> metadata.resourceId().getFilename())); + PAssert.that(matchAllUpdatedCount).containsInAnyOrder(expectedMatchAllUpdated); + + p.run(); + } + + private static MatchResult.Metadata metadata(Path path, int size, long lastModifiedMillis) { + return MatchResult.Metadata.builder() + .setResourceId(FileSystems.matchNewResource(path.toString(), false /* isDirectory */)) + .setIsReadSeekEfficient(true) + .setSizeBytes(size) + .setLastModifiedMillis(lastModifiedMillis) + .build(); + } - @Test - @Category({NeedsRunner.class, UsesUnboundedSplittableParDo.class}) - public void testMatchWatchForNewFiles() throws IOException, InterruptedException { - // Write some files to a "source" directory. - final Path sourcePath = tmpFolder.getRoot().toPath().resolve("source"); - sourcePath.toFile().mkdir(); - Files.write(sourcePath.resolve("first"), new byte[42]); - Files.write(sourcePath.resolve("second"), new byte[37]); - Files.write(sourcePath.resolve("third"), new byte[99]); - - // Create a "watch" directory that the pipeline will copy files into. - final Path watchPath = tmpFolder.getRoot().toPath().resolve("watch"); - watchPath.toFile().mkdir(); - PCollection matchMetadata = - p.apply( - "match filename through match", - FileIO.match() - .filepattern(watchPath.resolve("*").toString()) - .continuously( - Duration.millis(100), - Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)))); - PCollection matchAllMetadata = - p.apply("create for matchAll new files", Create.of(watchPath.resolve("*").toString())) - .apply( - "match filename through matchAll", - FileIO.matchAll() - .continuously( - Duration.millis(100), - Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)))); - PCollection matchUpdatedMetadata = - p.apply( - "match updated", - FileIO.match() - .filepattern(watchPath.resolve("first").toString()) - .continuously( - Duration.millis(100), - Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)), - true)); - PCollection matchAllUpdatedMetadata = - p.apply("create for matchAll updated files", Create.of(watchPath.resolve("*").toString())) - .apply( - "matchAll updated", - FileIO.matchAll() - .continuously( - Duration.millis(100), - Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)), - true)); - - // write one file at the beginning. This will trigger the first output for matchAll - Files.copy( - sourcePath.resolve("first"), - watchPath.resolve("first"), - new StandardCopyOption[] {StandardCopyOption.COPY_ATTRIBUTES}); - - // using matchMetadata outputs to trigger file copy on-the-fly - matchMetadata = - matchMetadata - .apply( - MapElements.into( - TypeDescriptors.kvs( - TypeDescriptors.strings(), - TypeDescriptor.of(MatchResult.Metadata.class))) - .via((metadata) -> KV.of("dumb key", metadata))) - .apply(ParDo.of(new CopyFilesFn(sourcePath, watchPath))); - - assertEquals(PCollection.IsBounded.UNBOUNDED, matchMetadata.isBounded()); - assertEquals(PCollection.IsBounded.UNBOUNDED, matchAllMetadata.isBounded()); - assertEquals(PCollection.IsBounded.UNBOUNDED, matchUpdatedMetadata.isBounded()); - assertEquals(PCollection.IsBounded.UNBOUNDED, matchAllUpdatedMetadata.isBounded()); - - // We fetch lastModifiedTime from the files in the "source" directory to avoid a race condition - // with the writer thread. - List expectedMatchNew = - Arrays.asList( - metadata( - watchPath.resolve("first"), 42, lastModifiedMillis(sourcePath.resolve("first"))), - metadata( - watchPath.resolve("second"), 37, lastModifiedMillis(sourcePath.resolve("second"))), - metadata( - watchPath.resolve("third"), 99, lastModifiedMillis(sourcePath.resolve("third")))); - PAssert.that(matchMetadata).containsInAnyOrder(expectedMatchNew); - PAssert.that(matchAllMetadata).containsInAnyOrder(expectedMatchNew); - - List expectedMatchUpdated = Arrays.asList("first", "first", "first"); - PCollection matchUpdatedCount = - matchUpdatedMetadata.apply( - "pick up match file name", - MapElements.into(TypeDescriptors.strings()) - .via((metadata) -> metadata.resourceId().getFilename())); - PAssert.that(matchUpdatedCount).containsInAnyOrder(expectedMatchUpdated); - - // Check watch for file updates. Compare only filename since modified time of copied files are - // uncontrolled. - List expectedMatchAllUpdated = - Arrays.asList("first", "first", "first", "second", "second", "third"); - PCollection matchAllUpdatedCount = - matchAllUpdatedMetadata.apply( - "pick up matchAll file names", - MapElements.into(TypeDescriptors.strings()) - .via((metadata) -> metadata.resourceId().getFilename())); - PAssert.that(matchAllUpdatedCount).containsInAnyOrder(expectedMatchAllUpdated); - - p.run(); + private static long lastModifiedMillis(Path path) throws IOException { + return Files.getLastModifiedTime(path).toMillis(); + } } - @Test - @Category(NeedsRunner.class) - public void testRead() throws IOException { - final String path = tmpFolder.newFile("file").getAbsolutePath(); - final String pathGZ = tmpFolder.newFile("file.gz").getAbsolutePath(); - Files.write(new File(path).toPath(), "Hello world".getBytes(Charsets.UTF_8)); - try (Writer writer = - new OutputStreamWriter( - new GZIPOutputStream(new FileOutputStream(pathGZ)), Charsets.UTF_8)) { - writer.write("Hello world"); + @RunWith(JUnit4.class) + public static class FileIOReadTest { + + @Rule + public transient TestPipeline p = TestPipeline.create(); + + @Rule + public transient TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Test + @Category(NeedsRunner.class) + public void testRead() throws IOException { + final String path = tmpFolder.newFile("file").getAbsolutePath(); + final String pathGZ = tmpFolder.newFile("file.gz").getAbsolutePath(); + Files.write(new File(path).toPath(), "Hello world".getBytes(Charsets.UTF_8)); + try (Writer writer = + new OutputStreamWriter( + new GZIPOutputStream(new FileOutputStream(pathGZ)), Charsets.UTF_8)) { + writer.write("Hello world"); + } + + PCollection matches = p.apply("Match", + FileIO.match().filepattern(path)); + PCollection decompressedAuto = + matches.apply("Read AUTO", FileIO.readMatches().withCompression(Compression.AUTO)); + PCollection decompressedDefault = + matches.apply("Read default", FileIO.readMatches()); + PCollection decompressedUncompressed = + matches.apply( + "Read UNCOMPRESSED", FileIO.readMatches().withCompression(Compression.UNCOMPRESSED)); + for (PCollection c : + Arrays.asList(decompressedAuto, decompressedDefault, decompressedUncompressed)) { + PAssert.thatSingleton(c) + .satisfies( + input -> { + assertEquals(path, input.getMetadata().resourceId().toString()); + assertEquals("Hello world".length(), input.getMetadata().sizeBytes()); + assertEquals(Compression.UNCOMPRESSED, input.getCompression()); + assertTrue(input.getMetadata().isReadSeekEfficient()); + try { + assertEquals("Hello world", input.readFullyAsUTF8String()); + } catch (IOException e) { + throw new RuntimeException(e); + } + return null; + }); + } + + PCollection matchesGZ = + p.apply("Match GZ", FileIO.match().filepattern(pathGZ)); + PCollection compressionAuto = + matchesGZ.apply("Read GZ AUTO", FileIO.readMatches().withCompression(Compression.AUTO)); + PCollection compressionDefault = + matchesGZ.apply("Read GZ default", FileIO.readMatches()); + PCollection compressionGzip = + matchesGZ.apply("Read GZ GZIP", FileIO.readMatches().withCompression(Compression.GZIP)); + for (PCollection c : + Arrays.asList(compressionAuto, compressionDefault, compressionGzip)) { + PAssert.thatSingleton(c) + .satisfies( + input -> { + assertEquals(pathGZ, input.getMetadata().resourceId().toString()); + assertFalse(input.getMetadata().sizeBytes() == "Hello world".length()); + assertEquals(Compression.GZIP, input.getCompression()); + assertFalse(input.getMetadata().isReadSeekEfficient()); + try { + assertEquals("Hello world", input.readFullyAsUTF8String()); + } catch (IOException e) { + throw new RuntimeException(e); + } + return null; + }); + } + + p.run(); } + } + + @RunWith(Parameterized.class) + public static class FileIOReadWithSkipLinesTest { + + @Rule + public transient TestPipeline p = TestPipeline.create(); - PCollection matches = p.apply("Match", FileIO.match().filepattern(path)); - PCollection decompressedAuto = - matches.apply("Read AUTO", FileIO.readMatches().withCompression(Compression.AUTO)); - PCollection decompressedDefault = - matches.apply("Read default", FileIO.readMatches()); - PCollection decompressedUncompressed = - matches.apply( - "Read UNCOMPRESSED", FileIO.readMatches().withCompression(Compression.UNCOMPRESSED)); - for (PCollection c : - Arrays.asList(decompressedAuto, decompressedDefault, decompressedUncompressed)) { - PAssert.thatSingleton(c) - .satisfies( - input -> { - assertEquals(path, input.getMetadata().resourceId().toString()); - assertEquals("Hello world".length(), input.getMetadata().sizeBytes()); - assertEquals(Compression.UNCOMPRESSED, input.getCompression()); - assertTrue(input.getMetadata().isReadSeekEfficient()); - try { - assertEquals("Hello world", input.readFullyAsUTF8String()); - } catch (IOException e) { - throw new RuntimeException(e); - } - return null; - }); + @Rule + public transient TemporaryFolder tmpFolder = new TemporaryFolder(); + + private static final String INPUT = "Hello world\nApache Beam\nData Processing"; + + @Parameterized.Parameters(name = "skipLines {1}") + public static Iterable data() { + return ImmutableList.builder() + .add(new Object[]{INPUT, 0, "Hello world\nApache Beam\nData Processing"}) + .add(new Object[]{INPUT, 1, "Apache Beam\nData Processing"}) + .add(new Object[]{INPUT, 2, "Data Processing"}) + .add(new Object[]{INPUT, 3, ""}) + .add(new Object[]{INPUT, 4, ""}) + .build(); } - PCollection matchesGZ = - p.apply("Match GZ", FileIO.match().filepattern(pathGZ)); - PCollection compressionAuto = - matchesGZ.apply("Read GZ AUTO", FileIO.readMatches().withCompression(Compression.AUTO)); - PCollection compressionDefault = - matchesGZ.apply("Read GZ default", FileIO.readMatches()); - PCollection compressionGzip = - matchesGZ.apply("Read GZ GZIP", FileIO.readMatches().withCompression(Compression.GZIP)); - for (PCollection c : - Arrays.asList(compressionAuto, compressionDefault, compressionGzip)) { - PAssert.thatSingleton(c) - .satisfies( - input -> { - assertEquals(pathGZ, input.getMetadata().resourceId().toString()); - assertFalse(input.getMetadata().sizeBytes() == "Hello world".length()); - assertEquals(Compression.GZIP, input.getCompression()); - assertFalse(input.getMetadata().isReadSeekEfficient()); - try { - assertEquals("Hello world", input.readFullyAsUTF8String()); - } catch (IOException e) { - throw new RuntimeException(e); - } - return null; - }); + @Parameterized.Parameter(0) + public String input; + + @Parameterized.Parameter(1) + public int skipLines; + + @Parameterized.Parameter(2) + public String expected; + + @Test + @Category(NeedsRunner.class) + public void testReadWithSkipLines() throws IOException { + final String path = tmpFolder.newFile("file").getAbsolutePath(); + Files.write(new File(path).toPath(), input.getBytes(Charsets.UTF_8)); + + PCollection c = p.apply("Match", FileIO.match().filepattern(path)) + .apply("readMatches", FileIO.readMatches()) + .apply("readWithSkipLines", ParDo.of(new ReadWithSkipLinesFn(skipLines))); + + PAssert.that(c) + .containsInAnyOrder(expected); + + p.run(); } - p.run(); - } + private static class ReadWithSkipLinesFn extends DoFn { - private static MatchResult.Metadata metadata(Path path, int size, long lastModifiedMillis) { - return MatchResult.Metadata.builder() - .setResourceId(FileSystems.matchNewResource(path.toString(), false /* isDirectory */)) - .setIsReadSeekEfficient(true) - .setSizeBytes(size) - .setLastModifiedMillis(lastModifiedMillis) - .build(); - } + private final int skipLines; - private static long lastModifiedMillis(Path path) throws IOException { - return Files.getLastModifiedTime(path).toMillis(); - } + public ReadWithSkipLinesFn(int skipLines) { + this.skipLines = skipLines; + } - private static FileIO.Write.FileNaming resolveFileNaming(FileIO.Write write) - throws Exception { - return write.resolveFileNamingFn().getClosure().apply(null, null); + @ProcessElement + public void process(@Element ReadableFile element, OutputReceiver receiver) + throws IOException { + String result = element.readFullyAsUTF8String(skipLines); + receiver.output(result); + } + } } - private static String getDefaultFileName(FileIO.Write write) throws Exception { - return resolveFileNaming(write).getFilename(null, null, 0, 0, null); - } + @RunWith(JUnit4.class) + public static class FileIOWriteTest { + + @Rule + public transient TestPipeline p = TestPipeline.create(); + + @Rule + public transient TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Test + public void testFilenameFnResolution() throws Exception { + FileIO.Write.FileNaming foo = (window, pane, numShards, shardIndex, compression) -> "foo"; + + String expected = + FileSystems.matchNewResource("test", true).resolve("foo", RESOLVE_FILE).toString(); + assertEquals( + "Filenames should be resolved within a relative directory if '.to' is invoked", + expected, + getDefaultFileName(FileIO.writeDynamic().to("test").withNaming(o -> foo))); + assertEquals( + "Filenames should be resolved within a relative directory if '.to' is invoked", + expected, + getDefaultFileName(FileIO.write().to("test").withNaming(foo))); + + assertEquals( + "Filenames should be resolved as the direct result of the filenaming function if '.to' " + + "is not invoked", + "foo", + getDefaultFileName(FileIO.writeDynamic().withNaming(o -> foo))); + assertEquals( + "Filenames should be resolved as the direct result of the filenaming function if '.to' " + + "is not invoked", + "foo", + getDefaultFileName(FileIO.write().withNaming(foo))); + + assertEquals( + "Default to the defaultNaming if a filenaming isn't provided for a non-dynamic write", + "output-00000-of-00000", + resolveFileNaming(FileIO.write()) + .getFilename( + GlobalWindow.INSTANCE, + PaneInfo.ON_TIME_AND_ONLY_FIRING, + 0, + 0, + Compression.UNCOMPRESSED)); + + assertEquals( + "Default Naming should take prefix and suffix into account if provided", + "foo-00000-of-00000.bar", + resolveFileNaming(FileIO.write().withPrefix("foo").withSuffix(".bar")) + .getFilename( + GlobalWindow.INSTANCE, + PaneInfo.ON_TIME_AND_ONLY_FIRING, + 0, + 0, + Compression.UNCOMPRESSED)); + + assertEquals( + "Filenames should be resolved within a relative directory if '.to' is invoked, " + + "even with default naming", + FileSystems.matchNewResource("test", true) + .resolve("output-00000-of-00000", RESOLVE_FILE) + .toString(), + resolveFileNaming(FileIO.write().to("test")) + .getFilename( + GlobalWindow.INSTANCE, + PaneInfo.ON_TIME_AND_ONLY_FIRING, + 0, + 0, + Compression.UNCOMPRESSED)); + } - @Test - public void testFilenameFnResolution() throws Exception { - FileIO.Write.FileNaming foo = (window, pane, numShards, shardIndex, compression) -> "foo"; - - String expected = - FileSystems.matchNewResource("test", true).resolve("foo", RESOLVE_FILE).toString(); - assertEquals( - "Filenames should be resolved within a relative directory if '.to' is invoked", - expected, - getDefaultFileName(FileIO.writeDynamic().to("test").withNaming(o -> foo))); - assertEquals( - "Filenames should be resolved within a relative directory if '.to' is invoked", - expected, - getDefaultFileName(FileIO.write().to("test").withNaming(foo))); - - assertEquals( - "Filenames should be resolved as the direct result of the filenaming function if '.to' " - + "is not invoked", - "foo", - getDefaultFileName(FileIO.writeDynamic().withNaming(o -> foo))); - assertEquals( - "Filenames should be resolved as the direct result of the filenaming function if '.to' " - + "is not invoked", - "foo", - getDefaultFileName(FileIO.write().withNaming(foo))); - - assertEquals( - "Default to the defaultNaming if a filenaming isn't provided for a non-dynamic write", - "output-00000-of-00000", - resolveFileNaming(FileIO.write()) - .getFilename( - GlobalWindow.INSTANCE, - PaneInfo.ON_TIME_AND_ONLY_FIRING, - 0, - 0, - Compression.UNCOMPRESSED)); - - assertEquals( - "Default Naming should take prefix and suffix into account if provided", - "foo-00000-of-00000.bar", - resolveFileNaming(FileIO.write().withPrefix("foo").withSuffix(".bar")) - .getFilename( - GlobalWindow.INSTANCE, - PaneInfo.ON_TIME_AND_ONLY_FIRING, - 0, - 0, - Compression.UNCOMPRESSED)); - - assertEquals( - "Filenames should be resolved within a relative directory if '.to' is invoked, " - + "even with default naming", - FileSystems.matchNewResource("test", true) - .resolve("output-00000-of-00000", RESOLVE_FILE) - .toString(), - resolveFileNaming(FileIO.write().to("test")) - .getFilename( - GlobalWindow.INSTANCE, - PaneInfo.ON_TIME_AND_ONLY_FIRING, - 0, - 0, - Compression.UNCOMPRESSED)); - } + @Test + @Category(NeedsRunner.class) + public void testFileIoDynamicNaming() throws IOException { + // Test for BEAM-6407. + + String outputFileName = tmpFolder.newFile().getAbsolutePath(); + PCollectionView outputFileNameView = + p.apply("outputFileName", Create.of(outputFileName)).apply(View.asSingleton()); + + Contextful.Fn fileNaming = + (element, c) -> + (window, pane, numShards, shardIndex, compression) -> + c.sideInput(outputFileNameView) + "-" + shardIndex; + + p.apply(Create.of("")) + .apply( + "WriteDynamicFilename", + FileIO.writeDynamic() + .by(SerializableFunctions.constant("")) + .withDestinationCoder(StringUtf8Coder.of()) + .via(TextIO.sink()) + .withTempDirectory(tmpFolder.newFolder().getAbsolutePath()) + .withNaming( + Contextful.of( + fileNaming, Requirements.requiresSideInputs(outputFileNameView)))); + + // We need to run the TestPipeline with the default options. + p.run(PipelineOptionsFactory.create()).waitUntilFinish(); + assertTrue( + "Output file shard 0 exists after pipeline completes", + new File(outputFileName + "-0").exists()); + } - @Test - @Category(NeedsRunner.class) - public void testFileIoDynamicNaming() throws IOException { - // Test for BEAM-6407. - - String outputFileName = tmpFolder.newFile().getAbsolutePath(); - PCollectionView outputFileNameView = - p.apply("outputFileName", Create.of(outputFileName)).apply(View.asSingleton()); - - Contextful.Fn fileNaming = - (element, c) -> - (window, pane, numShards, shardIndex, compression) -> - c.sideInput(outputFileNameView) + "-" + shardIndex; - - p.apply(Create.of("")) - .apply( - "WriteDynamicFilename", - FileIO.writeDynamic() - .by(SerializableFunctions.constant("")) - .withDestinationCoder(StringUtf8Coder.of()) - .via(TextIO.sink()) - .withTempDirectory(tmpFolder.newFolder().getAbsolutePath()) - .withNaming( - Contextful.of( - fileNaming, Requirements.requiresSideInputs(outputFileNameView)))); - - // We need to run the TestPipeline with the default options. - p.run(PipelineOptionsFactory.create()).waitUntilFinish(); - assertTrue( - "Output file shard 0 exists after pipeline completes", - new File(outputFileName + "-0").exists()); + private static FileIO.Write.FileNaming resolveFileNaming(FileIO.Write write) + throws Exception { + return write.resolveFileNamingFn().getClosure().apply(null, null); + } + + private static String getDefaultFileName(FileIO.Write write) throws Exception { + return resolveFileNaming(write).getFilename(null, null, 0, 0, null); + } } } From 72deaf769d68e7310987040e82cc9623c34e3db1 Mon Sep 17 00:00:00 2001 From: Hyungrok Ham Date: Fri, 29 Sep 2023 04:39:01 +0900 Subject: [PATCH 3/3] fix: spotless --- .../java/org/apache/beam/sdk/io/FileIO.java | 213 ++++++------------ .../org/apache/beam/sdk/io/FileIOTest.java | 74 +++--- 2 files changed, 100 insertions(+), 187 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java index 8433deed97d6..92151a808770 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java @@ -83,17 +83,15 @@ import org.slf4j.LoggerFactory; /** - * General-purpose transforms for working with files: listing files (matching), reading and - * writing. + * General-purpose transforms for working with files: listing files (matching), reading and writing. * *

Matching filepatterns

* *

{@link #match} and {@link #matchAll} match filepatterns (respectively either a single - * filepattern or a {@link PCollection} thereof) and return the files that match them as - * {@link PCollection PCollections} of {@link MatchResult.Metadata}. Configuration options for them - * are in {@link MatchConfiguration} and include features such as treatment of filepatterns that - * don't match anything and continuous incremental matching of filepatterns (watching for new - * files). + * filepattern or a {@link PCollection} thereof) and return the files that match them as {@link + * PCollection PCollections} of {@link MatchResult.Metadata}. Configuration options for them are in + * {@link MatchConfiguration} and include features such as treatment of filepatterns that don't + * match anything and continuous incremental matching of filepatterns (watching for new files). * *

Example: Watching a single filepattern for new files

* @@ -110,9 +108,9 @@ * *

Example: Matching a PCollection of filepatterns arriving from Kafka

* - *

This example reads filepatterns from Kafka and matches each one as it arrives, producing - * again an unbounded {@code PCollection}, and failing in case the filepattern doesn't - * match anything. + *

This example reads filepatterns from Kafka and matches each one as it arrives, producing again + * an unbounded {@code PCollection}, and failing in case the filepattern doesn't match + * anything. * *

{@code
  * PCollection filepatterns = p.apply(KafkaIO.read()...);
@@ -311,7 +309,7 @@
  * }
*/ @SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) + "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public class FileIO { @@ -322,13 +320,12 @@ public class FileIO { * resources (both files and directories) as {@link MatchResult.Metadata}. * *

By default, matches the filepattern once and produces a bounded {@link PCollection}. To - * continuously watch the filepattern for new matches, use - * {@link MatchAll#continuously(Duration, TerminationCondition)} - this will produce an unbounded - * {@link PCollection}. + * continuously watch the filepattern for new matches, use {@link MatchAll#continuously(Duration, + * TerminationCondition)} - this will produce an unbounded {@link PCollection}. * *

By default, a filepattern matching no resources is treated according to {@link - * EmptyMatchTreatment#DISALLOW}. To configure this behavior, use - * {@link Match#withEmptyMatchTreatment}. + * EmptyMatchTreatment#DISALLOW}. To configure this behavior, use {@link + * Match#withEmptyMatchTreatment}. * *

Returned {@link MatchResult.Metadata} are deduplicated by filename. For example, if this * transform observes a file with the same name several times with different metadata (e.g. @@ -348,8 +345,8 @@ public static Match match() { * multiple filepatterns, it will be produced multiple times. * *

By default, a filepattern matching no resources is treated according to {@link - * EmptyMatchTreatment#ALLOW_IF_WILDCARD}. To configure this behavior, use - * {@link MatchAll#withEmptyMatchTreatment}. + * EmptyMatchTreatment#ALLOW_IF_WILDCARD}. To configure this behavior, use {@link + * MatchAll#withEmptyMatchTreatment}. */ public static MatchAll matchAll() { return new AutoValue_FileIO_MatchAll.Builder() @@ -368,9 +365,7 @@ public static ReadMatches readMatches() { .build(); } - /** - * Writes elements to files using a {@link Sink}. See class-level documentation. - */ + /** Writes elements to files using a {@link Sink}. See class-level documentation. */ public static Write write() { return new AutoValue_FileIO_Write.Builder() .setDynamic(false) @@ -393,9 +388,7 @@ public static Write writeDynamic() { .build(); } - /** - * A utility class for accessing a potentially compressed file. - */ + /** A utility class for accessing a potentially compressed file. */ public static final class ReadableFile { private final MatchResult.Metadata metadata; @@ -406,16 +399,12 @@ public static final class ReadableFile { this.compression = compression; } - /** - * Returns the {@link MatchResult.Metadata} of the file. - */ + /** Returns the {@link MatchResult.Metadata} of the file. */ public MatchResult.Metadata getMetadata() { return metadata; } - /** - * Returns the method with which this file will be decompressed in {@link #open}. - */ + /** Returns the method with which this file will be decompressed in {@link #open}. */ public Compression getCompression() { return compression; } @@ -440,9 +429,7 @@ public SeekableByteChannel openSeekable() throws IOException { return (SeekableByteChannel) open(); } - /** - * Returns the contents of the file as bytes with skipping {@code skipLines}. - */ + /** Returns the contents of the file as bytes with skipping {@code skipLines}. */ public byte[] readFullyAsBytes(int skipLines) throws IOException { try (InputStream stream = Channels.newInputStream(open())) { int count = 0; @@ -457,16 +444,14 @@ public byte[] readFullyAsBytes(int skipLines) throws IOException { } } - /** - * Returns the full contents of the file as a {@link String} decoded as UTF-8. - */ + /** Returns the full contents of the file as a {@link String} decoded as UTF-8. */ public String readFullyAsUTF8String() throws IOException { return new String(readFullyAsBytes(0), StandardCharsets.UTF_8); } /** - * Returns the contents of the file as {@link String} decoded as UTF-8 with skipping - * {@code skipLines}. + * Returns the contents of the file as {@link String} decoded as UTF-8 with skipping {@code + * skipLines}. */ public String readFullyAsUTF8String(int skipLines) throws IOException { return new String(readFullyAsBytes(skipLines), StandardCharsets.UTF_8); @@ -502,9 +487,7 @@ public int hashCode() { @AutoValue public abstract static class MatchConfiguration implements HasDisplayData, Serializable { - /** - * Creates a {@link MatchConfiguration} with the given {@link EmptyMatchTreatment}. - */ + /** Creates a {@link MatchConfiguration} with the given {@link EmptyMatchTreatment}. */ public static MatchConfiguration create(EmptyMatchTreatment emptyMatchTreatment) { return new AutoValue_FileIO_MatchConfiguration.Builder() .setEmptyMatchTreatment(emptyMatchTreatment) @@ -536,9 +519,7 @@ abstract static class Builder { abstract MatchConfiguration build(); } - /** - * Sets the {@link EmptyMatchTreatment}. - */ + /** Sets the {@link EmptyMatchTreatment}. */ public MatchConfiguration withEmptyMatchTreatment(EmptyMatchTreatment treatment) { return toBuilder().setEmptyMatchTreatment(treatment).build(); } @@ -548,8 +529,8 @@ public MatchConfiguration withEmptyMatchTreatment(EmptyMatchTreatment treatment) * condition is reached, where the input to the condition is the filepattern. * *

If {@code matchUpdatedFiles} is set, also watches for files with timestamp change, with - * the watching frequency given by the {@code interval}. The pipeline will throw a - * {@code RuntimeError} if timestamp extraction for the matched file has failed, suggesting the + * the watching frequency given by the {@code interval}. The pipeline will throw a {@code + * RuntimeError} if timestamp extraction for the matched file has failed, suggesting the * timestamp metadata is not available with the IO connector. * *

Matching continuously scales poorly, as it is stateful, and requires storing file ids in @@ -597,9 +578,7 @@ public void populateDisplayData(DisplayData.Builder builder) { } } - /** - * Implementation of {@link #match}. - */ + /** Implementation of {@link #match}. */ @AutoValue public abstract static class Match extends PTransform> { @@ -619,30 +598,22 @@ abstract static class Builder { abstract Match build(); } - /** - * Matches the given filepattern. - */ + /** Matches the given filepattern. */ public Match filepattern(String filepattern) { return this.filepattern(StaticValueProvider.of(filepattern)); } - /** - * Like {@link #filepattern(String)} but using a {@link ValueProvider}. - */ + /** Like {@link #filepattern(String)} but using a {@link ValueProvider}. */ public Match filepattern(ValueProvider filepattern) { return toBuilder().setFilepattern(filepattern).build(); } - /** - * Sets the {@link MatchConfiguration}. - */ + /** Sets the {@link MatchConfiguration}. */ public Match withConfiguration(MatchConfiguration configuration) { return toBuilder().setConfiguration(configuration).build(); } - /** - * See {@link MatchConfiguration#withEmptyMatchTreatment(EmptyMatchTreatment)}. - */ + /** See {@link MatchConfiguration#withEmptyMatchTreatment(EmptyMatchTreatment)}. */ public Match withEmptyMatchTreatment(EmptyMatchTreatment treatment) { return withConfiguration(getConfiguration().withEmptyMatchTreatment(treatment)); } @@ -689,9 +660,7 @@ public void populateDisplayData(DisplayData.Builder builder) { } } - /** - * Implementation of {@link #matchAll}. - */ + /** Implementation of {@link #matchAll}. */ @AutoValue public abstract static class MatchAll extends PTransform, PCollection> { @@ -708,23 +677,17 @@ abstract static class Builder { abstract MatchAll build(); } - /** - * Like {@link Match#withConfiguration}. - */ + /** Like {@link Match#withConfiguration}. */ public MatchAll withConfiguration(MatchConfiguration configuration) { return toBuilder().setConfiguration(configuration).build(); } - /** - * Like {@link Match#withEmptyMatchTreatment}. - */ + /** Like {@link Match#withEmptyMatchTreatment}. */ public MatchAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) { return withConfiguration(getConfiguration().withEmptyMatchTreatment(treatment)); } - /** - * Like {@link Match#continuously(Duration, TerminationCondition, boolean)}. - */ + /** Like {@link Match#continuously(Duration, TerminationCondition, boolean)}. */ public MatchAll continuously( Duration pollInterval, TerminationCondition terminationCondition, @@ -733,9 +696,7 @@ public MatchAll continuously( getConfiguration().continuously(pollInterval, terminationCondition, matchUpdatedFiles)); } - /** - * Like {@link Match#continuously(Duration, TerminationCondition)}. - */ + /** Like {@link Match#continuously(Duration, TerminationCondition)}. */ public MatchAll continuously( Duration pollInterval, TerminationCondition terminationCondition) { return continuously(pollInterval, terminationCondition, false); @@ -769,9 +730,7 @@ public void populateDisplayData(DisplayData.Builder builder) { builder.include("configuration", getConfiguration()); } - /** - * Helper function creating a watch transform based on outputKeyFn. - */ + /** Helper function creating a watch transform based on outputKeyFn. */ private Watch.Growth createWatchTransform( SerializableFunction outputKeyFn) { return Watch.growthOf(Contextful.of(new MatchPollFn(), Requirements.empty()), outputKeyFn) @@ -833,16 +792,12 @@ public KV apply(MatchResult.Metadata input) throws RuntimeExceptio } } - /** - * Implementation of {@link #readMatches}. - */ + /** Implementation of {@link #readMatches}. */ @AutoValue public abstract static class ReadMatches extends PTransform, PCollection> { - /** - * Enum to control how directories are handled. - */ + /** Enum to control how directories are handled. */ public enum DirectoryTreatment { SKIP, PROHIBIT @@ -864,17 +819,15 @@ abstract static class Builder { abstract ReadMatches build(); } - /** - * Reads files using the given {@link Compression}. Default is {@link Compression#AUTO}. - */ + /** Reads files using the given {@link Compression}. Default is {@link Compression#AUTO}. */ public ReadMatches withCompression(Compression compression) { checkArgument(compression != null, "compression can not be null"); return toBuilder().setCompression(compression).build(); } /** - * Controls how to handle directories in the input {@link PCollection}. Default is - * {@link DirectoryTreatment#SKIP}. + * Controls how to handle directories in the input {@link PCollection}. Default is {@link + * DirectoryTreatment#SKIP}. */ public ReadMatches withDirectoryTreatment(DirectoryTreatment directoryTreatment) { checkArgument(directoryTreatment != null, "directoryTreatment can not be null"); @@ -894,10 +847,10 @@ public void populateDisplayData(DisplayData.Builder builder) { /** * @return True if metadata is a directory and directory Treatment is SKIP. - * @throws java.lang.IllegalArgumentException if metadata is a directory and - * directoryTreatment is Prohibited. + * @throws java.lang.IllegalArgumentException if metadata is a directory and directoryTreatment + * is Prohibited. * @throws java.lang.UnsupportedOperationException if metadata is a directory and - * directoryTreatment is not SKIP or PROHIBIT. + * directoryTreatment is not SKIP or PROHIBIT. */ static boolean shouldSkipDirectory( MatchResult.Metadata metadata, DirectoryTreatment directoryTreatment) { @@ -919,8 +872,8 @@ static boolean shouldSkipDirectory( } /** - * Converts metadata to readableFile. Make sure - * {@link #shouldSkipDirectory(org.apache.beam.sdk.io.fs.MatchResult.Metadata, + * Converts metadata to readableFile. Make sure {@link + * #shouldSkipDirectory(org.apache.beam.sdk.io.fs.MatchResult.Metadata, * org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment)} returns false before using. */ static ReadableFile matchToReadableFile( @@ -961,9 +914,8 @@ public void process(ProcessContext c) { } /** - * Specifies how to write elements to individual files in {@link FileIO#write} and - * {@link FileIO#writeDynamic}. A new instance of {@link Sink} is created for every file being - * written. + * Specifies how to write elements to individual files in {@link FileIO#write} and {@link + * FileIO#writeDynamic}. A new instance of {@link Sink} is created for every file being written. */ public interface Sink extends Serializable { @@ -973,9 +925,7 @@ public interface Sink extends Serializable { */ void open(WritableByteChannel channel) throws IOException; - /** - * Appends a single element to the file. May be invoked zero or more times. - */ + /** Appends a single element to the file. May be invoked zero or more times. */ void write(ElementT element) throws IOException; /** @@ -985,16 +935,12 @@ public interface Sink extends Serializable { void flush() throws IOException; } - /** - * Implementation of {@link #write} and {@link #writeDynamic}. - */ + /** Implementation of {@link #write} and {@link #writeDynamic}. */ @AutoValue public abstract static class Write extends PTransform, WriteFilesResult> { - /** - * A policy for generating names for shard files. - */ + /** A policy for generating names for shard files. */ public interface FileNaming extends Serializable { /** @@ -1016,8 +962,7 @@ public static FileNaming defaultNaming(final String prefix, final String suffix) /** * Defines a default {@link FileNaming} which will use the prefix and suffix supplied to create * a name based on the window, pane, number of shards, shard index, and compression. Removes - * window when in the {@link GlobalWindow} and pane info when it is the only firing of the - * pane. + * window when in the {@link GlobalWindow} and pane info when it is the only firing of the pane. */ public static FileNaming defaultNaming( final ValueProvider prefix, final ValueProvider suffix) { @@ -1154,17 +1099,13 @@ abstract Builder setSharding( abstract Write build(); } - /** - * Specifies how to partition elements into groups ("destinations"). - */ + /** Specifies how to partition elements into groups ("destinations"). */ public Write by(SerializableFunction destinationFn) { checkArgument(destinationFn != null, "destinationFn can not be null"); return by(fn(destinationFn)); } - /** - * Like {@link #by}, but with access to context such as side inputs. - */ + /** Like {@link #by}, but with access to context such as side inputs. */ public Write by(Contextful> destinationFn) { checkArgument(destinationFn != null, "destinationFn can not be null"); return toBuilder().setDestinationFn(destinationFn).build(); @@ -1183,9 +1124,7 @@ public Write via( return toBuilder().setSinkFn((Contextful) sinkFn).setOutputFn(outputFn).build(); } - /** - * Like {@link #via(Contextful, Contextful)}, but uses the same sink for all destinations. - */ + /** Like {@link #via(Contextful, Contextful)}, but uses the same sink for all destinations. */ public Write via( Contextful> outputFn, final Sink sink) { checkArgument(sink != null, "sink can not be null"); @@ -1206,9 +1145,7 @@ public Write via(Contextful>> .build(); } - /** - * Like {@link #via(Contextful)}, but uses the same {@link Sink} for all destinations. - */ + /** Like {@link #via(Contextful)}, but uses the same {@link Sink} for all destinations. */ public Write via(Sink sink) { checkArgument(sink != null, "sink can not be null"); return via(fn(SerializableFunctions.clonesOf(sink))); @@ -1216,17 +1153,15 @@ public Write via(Sink sink) { /** * Specifies a common directory for all generated files. A temporary generated sub-directory of - * this directory will be used as the temp directory, unless overridden by - * {@link #withTempDirectory}. + * this directory will be used as the temp directory, unless overridden by {@link + * #withTempDirectory}. */ public Write to(String directory) { checkArgument(directory != null, "directory can not be null"); return to(StaticValueProvider.of(directory)); } - /** - * Like {@link #to(String)} but with a {@link ValueProvider}. - */ + /** Like {@link #to(String)} but with a {@link ValueProvider}. */ public Write to(ValueProvider directory) { checkArgument(directory != null, "directory can not be null"); return toBuilder().setOutputDirectory(directory).build(); @@ -1241,9 +1176,7 @@ public Write withPrefix(String prefix) { return withPrefix(StaticValueProvider.of(prefix)); } - /** - * Like {@link #withPrefix(String)} but with a {@link ValueProvider}. - */ + /** Like {@link #withPrefix(String)} but with a {@link ValueProvider}. */ public Write withPrefix(ValueProvider prefix) { checkArgument(prefix != null, "prefix can not be null"); return toBuilder().setFilenamePrefix(prefix).build(); @@ -1258,9 +1191,7 @@ public Write withSuffix(String suffix) { return withSuffix(StaticValueProvider.of(suffix)); } - /** - * Like {@link #withSuffix(String)} but with a {@link ValueProvider}. - */ + /** Like {@link #withSuffix(String)} but with a {@link ValueProvider}. */ public Write withSuffix(ValueProvider suffix) { checkArgument(suffix != null, "suffix can not be null"); return toBuilder().setFilenameSuffix(suffix).build(); @@ -1303,17 +1234,13 @@ public Write withNaming( return toBuilder().setFileNamingFn(namingFn).build(); } - /** - * Specifies a directory into which all temporary files will be placed. - */ + /** Specifies a directory into which all temporary files will be placed. */ public Write withTempDirectory(String tempDirectory) { checkArgument(tempDirectory != null, "tempDirectory can not be null"); return withTempDirectory(StaticValueProvider.of(tempDirectory)); } - /** - * Like {@link #withTempDirectory(String)}. - */ + /** Like {@link #withTempDirectory(String)}. */ public Write withTempDirectory(ValueProvider tempDirectory) { checkArgument(tempDirectory != null, "tempDirectory can not be null"); return toBuilder().setTempDirectory(tempDirectory).build(); @@ -1340,8 +1267,8 @@ public Write withEmptyGlobalWindowDestination( } /** - * Specifies a {@link Coder} for the destination type, if it can not be inferred from - * {@link #by}. + * Specifies a {@link Coder} for the destination type, if it can not be inferred from {@link + * #by}. */ public Write withDestinationCoder(Coder destinationCoder) { checkArgument(destinationCoder != null, "destinationCoder can not be null"); @@ -1383,16 +1310,14 @@ public Write withSharding( * window with the default trigger. * * @deprecated Avoid usage of this method: its effects are complex and it will be removed in - * future versions of Beam. Right now it exists for compatibility with {@link WriteFiles}. + * future versions of Beam. Right now it exists for compatibility with {@link WriteFiles}. */ @Deprecated public Write withIgnoreWindowing() { return toBuilder().setIgnoreWindowing(true).build(); } - /** - * See {@link WriteFiles#withNoSpilling()}. - */ + /** See {@link WriteFiles#withNoSpilling()}. */ public Write withNoSpilling() { return toBuilder().setNoSpilling(true).build(); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java index 38f88fbdf222..e217f1ea672b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java @@ -84,26 +84,20 @@ import org.junit.runners.JUnit4; import org.junit.runners.Parameterized; -/** - * Tests for {@link FileIO}. - */ +/** Tests for {@link FileIO}. */ @RunWith(Enclosed.class) public class FileIOTest implements Serializable { @RunWith(JUnit4.class) public static class FileIOMatchTest { - @Rule - public transient TestPipeline p = TestPipeline.create(); + @Rule public transient TestPipeline p = TestPipeline.create(); - @Rule - public transient TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); - @Rule - public transient ExpectedException thrown = ExpectedException.none(); + @Rule public transient ExpectedException thrown = ExpectedException.none(); - @Rule - public transient Timeout globalTimeout = Timeout.seconds(1200); + @Rule public transient Timeout globalTimeout = Timeout.seconds(1200); @Test @Category(NeedsRunner.class) @@ -230,9 +224,7 @@ public void testMatchAllDisallowEmptyNonWildcard() throws IOException { p.run(); } - /** - * DoFn that copy test files from source to watch path. - */ + /** DoFn that copy test files from source to watch path. */ private static class CopyFilesFn extends DoFn, MatchResult.Metadata> { @@ -246,8 +238,8 @@ public CopyFilesFn(Path sourcePath, Path watchPath) { private final StateSpec> countSpec = StateSpecs.value(VarIntCoder.of()); @ProcessElement - public void processElement(ProcessContext context, - @StateId("count") ValueState count) + public void processElement( + ProcessContext context, @StateId("count") ValueState count) throws IOException, InterruptedException { int current = firstNonNull(count.read(), 0); // unpack value as output @@ -328,7 +320,7 @@ public void testMatchWatchForNewFiles() throws IOException, InterruptedException Files.copy( sourcePath.resolve("first"), watchPath.resolve("first"), - new StandardCopyOption[]{StandardCopyOption.COPY_ATTRIBUTES}); + new StandardCopyOption[] {StandardCopyOption.COPY_ATTRIBUTES}); // using matchMetadata outputs to trigger file copy on-the-fly matchMetadata = @@ -346,14 +338,16 @@ public void testMatchWatchForNewFiles() throws IOException, InterruptedException assertEquals(PCollection.IsBounded.UNBOUNDED, matchUpdatedMetadata.isBounded()); assertEquals(PCollection.IsBounded.UNBOUNDED, matchAllUpdatedMetadata.isBounded()); - // We fetch lastModifiedTime from the files in the "source" directory to avoid a race condition + // We fetch lastModifiedTime from the files in the "source" directory to avoid a race + // condition // with the writer thread. List expectedMatchNew = Arrays.asList( metadata( watchPath.resolve("first"), 42, lastModifiedMillis(sourcePath.resolve("first"))), metadata( - watchPath.resolve("second"), 37, + watchPath.resolve("second"), + 37, lastModifiedMillis(sourcePath.resolve("second"))), metadata( watchPath.resolve("third"), 99, lastModifiedMillis(sourcePath.resolve("third")))); @@ -399,11 +393,9 @@ private static long lastModifiedMillis(Path path) throws IOException { @RunWith(JUnit4.class) public static class FileIOReadTest { - @Rule - public transient TestPipeline p = TestPipeline.create(); + @Rule public transient TestPipeline p = TestPipeline.create(); - @Rule - public transient TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); @Test @Category(NeedsRunner.class) @@ -417,8 +409,8 @@ public void testRead() throws IOException { writer.write("Hello world"); } - PCollection matches = p.apply("Match", - FileIO.match().filepattern(path)); + PCollection matches = + p.apply("Match", FileIO.match().filepattern(path)); PCollection decompressedAuto = matches.apply("Read AUTO", FileIO.readMatches().withCompression(Compression.AUTO)); PCollection decompressedDefault = @@ -477,22 +469,20 @@ public void testRead() throws IOException { @RunWith(Parameterized.class) public static class FileIOReadWithSkipLinesTest { - @Rule - public transient TestPipeline p = TestPipeline.create(); + @Rule public transient TestPipeline p = TestPipeline.create(); - @Rule - public transient TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); private static final String INPUT = "Hello world\nApache Beam\nData Processing"; @Parameterized.Parameters(name = "skipLines {1}") public static Iterable data() { return ImmutableList.builder() - .add(new Object[]{INPUT, 0, "Hello world\nApache Beam\nData Processing"}) - .add(new Object[]{INPUT, 1, "Apache Beam\nData Processing"}) - .add(new Object[]{INPUT, 2, "Data Processing"}) - .add(new Object[]{INPUT, 3, ""}) - .add(new Object[]{INPUT, 4, ""}) + .add(new Object[] {INPUT, 0, "Hello world\nApache Beam\nData Processing"}) + .add(new Object[] {INPUT, 1, "Apache Beam\nData Processing"}) + .add(new Object[] {INPUT, 2, "Data Processing"}) + .add(new Object[] {INPUT, 3, ""}) + .add(new Object[] {INPUT, 4, ""}) .build(); } @@ -511,12 +501,12 @@ public void testReadWithSkipLines() throws IOException { final String path = tmpFolder.newFile("file").getAbsolutePath(); Files.write(new File(path).toPath(), input.getBytes(Charsets.UTF_8)); - PCollection c = p.apply("Match", FileIO.match().filepattern(path)) - .apply("readMatches", FileIO.readMatches()) - .apply("readWithSkipLines", ParDo.of(new ReadWithSkipLinesFn(skipLines))); + PCollection c = + p.apply("Match", FileIO.match().filepattern(path)) + .apply("readMatches", FileIO.readMatches()) + .apply("readWithSkipLines", ParDo.of(new ReadWithSkipLinesFn(skipLines))); - PAssert.that(c) - .containsInAnyOrder(expected); + PAssert.that(c).containsInAnyOrder(expected); p.run(); } @@ -541,11 +531,9 @@ public void process(@Element ReadableFile element, OutputReceiver receiv @RunWith(JUnit4.class) public static class FileIOWriteTest { - @Rule - public transient TestPipeline p = TestPipeline.create(); + @Rule public transient TestPipeline p = TestPipeline.create(); - @Rule - public transient TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); @Test public void testFilenameFnResolution() throws Exception {