diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java index 2d28279f90b6..92151a808770 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java @@ -312,6 +312,7 @@ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public class FileIO { + private static final Logger LOG = LoggerFactory.getLogger(FileIO.class); /** @@ -389,6 +390,7 @@ public static Write writeDynamic() { /** A utility class for accessing a potentially compressed file. */ public static final class ReadableFile { + private final MatchResult.Metadata metadata; private final Compression compression; @@ -427,16 +429,32 @@ public SeekableByteChannel openSeekable() throws IOException { return (SeekableByteChannel) open(); } - /** Returns the full contents of the file as bytes. */ - public byte[] readFullyAsBytes() throws IOException { + /** Returns the contents of the file as bytes with skipping {@code skipLines}. */ + public byte[] readFullyAsBytes(int skipLines) throws IOException { try (InputStream stream = Channels.newInputStream(open())) { + int count = 0; + int r; + while ((count < skipLines) && ((r = stream.read()) != -1)) { + if (r == '\n') { + count++; + } + } + return StreamUtils.getBytesWithoutClosing(stream); } } /** Returns the full contents of the file as a {@link String} decoded as UTF-8. */ public String readFullyAsUTF8String() throws IOException { - return new String(readFullyAsBytes(), StandardCharsets.UTF_8); + return new String(readFullyAsBytes(0), StandardCharsets.UTF_8); + } + + /** + * Returns the contents of the file as {@link String} decoded as UTF-8 with skipping {@code + * skipLines}. + */ + public String readFullyAsUTF8String(int skipLines) throws IOException { + return new String(readFullyAsBytes(skipLines), StandardCharsets.UTF_8); } @Override @@ -468,6 +486,7 @@ public int hashCode() { */ @AutoValue public abstract static class MatchConfiguration implements HasDisplayData, Serializable { + /** Creates a {@link MatchConfiguration} with the given {@link EmptyMatchTreatment}. */ public static MatchConfiguration create(EmptyMatchTreatment emptyMatchTreatment) { return new AutoValue_FileIO_MatchConfiguration.Builder() @@ -488,6 +507,7 @@ public static MatchConfiguration create(EmptyMatchTreatment emptyMatchTreatment) @AutoValue.Builder abstract static class Builder { + abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment); abstract Builder setMatchUpdatedFiles(boolean matchUpdatedFiles); @@ -570,6 +590,7 @@ public abstract static class Match extends PTransform filepattern); abstract Builder setConfiguration(MatchConfiguration configuration); @@ -643,12 +664,14 @@ public void populateDisplayData(DisplayData.Builder builder) { @AutoValue public abstract static class MatchAll extends PTransform, PCollection> { + abstract MatchConfiguration getConfiguration(); abstract Builder toBuilder(); @AutoValue.Builder abstract static class Builder { + abstract Builder setConfiguration(MatchConfiguration configuration); abstract MatchAll build(); @@ -716,6 +739,7 @@ private Watch.Growth createWatchTrans } private static class MatchFn extends DoFn { + private final EmptyMatchTreatment emptyMatchTreatment; public MatchFn(EmptyMatchTreatment emptyMatchTreatment) { @@ -734,6 +758,7 @@ public void process(ProcessContext c) throws Exception { } private static class MatchPollFn extends PollFn { + @Override public Watch.Growth.PollResult apply(String element, Context c) throws Exception { @@ -746,6 +771,7 @@ public Watch.Growth.PollResult apply(String element, Conte private static class ExtractFilenameFn implements SerializableFunction { + @Override public String apply(MatchResult.Metadata input) { return input.resourceId().toString(); @@ -754,6 +780,7 @@ public String apply(MatchResult.Metadata input) { private static class ExtractFilenameAndLastUpdateFn implements SerializableFunction> { + @Override public KV apply(MatchResult.Metadata input) throws RuntimeException { long timestamp = input.lastModifiedMillis(); @@ -769,6 +796,7 @@ public KV apply(MatchResult.Metadata input) throws RuntimeExceptio @AutoValue public abstract static class ReadMatches extends PTransform, PCollection> { + /** Enum to control how directories are handled. */ public enum DirectoryTreatment { SKIP, @@ -783,6 +811,7 @@ public enum DirectoryTreatment { @AutoValue.Builder abstract static class Builder { + abstract Builder setCompression(Compression compression); abstract Builder setDirectoryTreatment(DirectoryTreatment directoryTreatment); @@ -866,6 +895,7 @@ static ReadableFile matchToReadableFile( } private static class ToReadableFileFn extends DoFn { + private final ReadMatches spec; private ToReadableFileFn(ReadMatches spec) { @@ -888,6 +918,7 @@ public void process(ProcessContext c) { * FileIO#writeDynamic}. A new instance of {@link Sink} is created for every file being written. */ public interface Sink extends Serializable { + /** * Initializes writing to the given channel. Will be invoked once on a given {@link Sink} * instance. @@ -908,8 +939,10 @@ public interface Sink extends Serializable { @AutoValue public abstract static class Write extends PTransform, WriteFilesResult> { + /** A policy for generating names for shard files. */ public interface FileNaming extends Serializable { + /** * Generates the filename. MUST use each argument and return different values for each * combination of the arguments. @@ -1020,6 +1053,7 @@ public static FileNaming relativeFileNaming( @AutoValue.Builder abstract static class Builder { + abstract Builder setDynamic(boolean dynamic); abstract Builder setSinkFn(Contextful>> sink); @@ -1418,6 +1452,7 @@ private Collection> getAllSideInputs() { private static class ViaFileBasedSink extends FileBasedSink { + private final Write spec; private ViaFileBasedSink(Write spec) { @@ -1472,6 +1507,7 @@ protected void finishWrite() throws Exception { private static class DynamicDestinationsAdapter extends DynamicDestinations { + private final Write spec; private transient Fn.@Nullable Context context; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java index b87c9caa1244..e217f1ea672b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java @@ -43,6 +43,7 @@ import java.util.zip.GZIPOutputStream; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.io.FileIO.ReadableFile; import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -70,481 +71,573 @@ import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.experimental.runners.Enclosed; import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.rules.Timeout; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; /** Tests for {@link FileIO}. */ -@RunWith(JUnit4.class) +@RunWith(Enclosed.class) public class FileIOTest implements Serializable { - @Rule public transient TestPipeline p = TestPipeline.create(); - - @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); - - @Rule public transient ExpectedException thrown = ExpectedException.none(); - - @Rule public transient Timeout globalTimeout = Timeout.seconds(1200); - - @Test - @Category(NeedsRunner.class) - public void testMatchAndMatchAll() throws IOException { - Path firstPath = tmpFolder.newFile("first").toPath(); - Path secondPath = tmpFolder.newFile("second").toPath(); - int firstSize = 37; - int secondSize = 42; - long firstModified = 1541097000L; - long secondModified = 1541098000L; - Files.write(firstPath, new byte[firstSize]); - Files.write(secondPath, new byte[secondSize]); - Files.setLastModifiedTime(firstPath, FileTime.fromMillis(firstModified)); - Files.setLastModifiedTime(secondPath, FileTime.fromMillis(secondModified)); - MatchResult.Metadata firstMetadata = metadata(firstPath, firstSize, firstModified); - MatchResult.Metadata secondMetadata = metadata(secondPath, secondSize, secondModified); - - PAssert.that( - p.apply( - "Match existing", - FileIO.match().filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*"))) - .containsInAnyOrder(firstMetadata, secondMetadata); - PAssert.that( - p.apply( - "Match existing with provider", - FileIO.match() - .filepattern(p.newProvider(tmpFolder.getRoot().getAbsolutePath() + "/*")))) - .containsInAnyOrder(firstMetadata, secondMetadata); - PAssert.that( - p.apply("Create existing", Create.of(tmpFolder.getRoot().getAbsolutePath() + "/*")) - .apply("MatchAll existing", FileIO.matchAll())) - .containsInAnyOrder(firstMetadata, secondMetadata); - - PAssert.that( - p.apply( - "Match non-existing ALLOW", - FileIO.match() - .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/blah") - .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))) - .empty(); - PAssert.that( - p.apply( - "Create non-existing", - Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah")) - .apply( - "MatchAll non-existing ALLOW", - FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))) - .empty(); - - PAssert.that( - p.apply( - "Match non-existing ALLOW_IF_WILDCARD", - FileIO.match() - .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/blah*") - .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD))) - .empty(); - PAssert.that( - p.apply( - "Create non-existing wildcard + explicit", - Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah*")) - .apply( - "MatchAll non-existing ALLOW_IF_WILDCARD", - FileIO.matchAll() - .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD))) - .empty(); - PAssert.that( - p.apply( - "Create non-existing wildcard + default", - Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah*")) - .apply("MatchAll non-existing default", FileIO.matchAll())) - .empty(); - - p.run(); - } - @Test - @Category(NeedsRunner.class) - public void testMatchDisallowEmptyDefault() throws IOException { - p.apply("Match", FileIO.match().filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*")); + @RunWith(JUnit4.class) + public static class FileIOMatchTest { + + @Rule public transient TestPipeline p = TestPipeline.create(); + + @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Rule public transient ExpectedException thrown = ExpectedException.none(); + + @Rule public transient Timeout globalTimeout = Timeout.seconds(1200); + + @Test + @Category(NeedsRunner.class) + public void testMatchAndMatchAll() throws IOException { + Path firstPath = tmpFolder.newFile("first").toPath(); + Path secondPath = tmpFolder.newFile("second").toPath(); + int firstSize = 37; + int secondSize = 42; + long firstModified = 1541097000L; + long secondModified = 1541098000L; + Files.write(firstPath, new byte[firstSize]); + Files.write(secondPath, new byte[secondSize]); + Files.setLastModifiedTime(firstPath, FileTime.fromMillis(firstModified)); + Files.setLastModifiedTime(secondPath, FileTime.fromMillis(secondModified)); + MatchResult.Metadata firstMetadata = metadata(firstPath, firstSize, firstModified); + MatchResult.Metadata secondMetadata = metadata(secondPath, secondSize, secondModified); + + PAssert.that( + p.apply( + "Match existing", + FileIO.match().filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*"))) + .containsInAnyOrder(firstMetadata, secondMetadata); + PAssert.that( + p.apply( + "Match existing with provider", + FileIO.match() + .filepattern(p.newProvider(tmpFolder.getRoot().getAbsolutePath() + "/*")))) + .containsInAnyOrder(firstMetadata, secondMetadata); + PAssert.that( + p.apply("Create existing", Create.of(tmpFolder.getRoot().getAbsolutePath() + "/*")) + .apply("MatchAll existing", FileIO.matchAll())) + .containsInAnyOrder(firstMetadata, secondMetadata); + + PAssert.that( + p.apply( + "Match non-existing ALLOW", + FileIO.match() + .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/blah") + .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))) + .empty(); + PAssert.that( + p.apply( + "Create non-existing", + Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah")) + .apply( + "MatchAll non-existing ALLOW", + FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))) + .empty(); + + PAssert.that( + p.apply( + "Match non-existing ALLOW_IF_WILDCARD", + FileIO.match() + .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/blah*") + .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD))) + .empty(); + PAssert.that( + p.apply( + "Create non-existing wildcard + explicit", + Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah*")) + .apply( + "MatchAll non-existing ALLOW_IF_WILDCARD", + FileIO.matchAll() + .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD))) + .empty(); + PAssert.that( + p.apply( + "Create non-existing wildcard + default", + Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah*")) + .apply("MatchAll non-existing default", FileIO.matchAll())) + .empty(); + + p.run(); + } - thrown.expectCause(isA(FileNotFoundException.class)); - p.run(); - } + @Test + @Category(NeedsRunner.class) + public void testMatchDisallowEmptyDefault() throws IOException { + p.apply("Match", FileIO.match().filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*")); - @Test - @Category(NeedsRunner.class) - public void testMatchDisallowEmptyExplicit() throws IOException { - p.apply( - FileIO.match() - .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*") - .withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)); + thrown.expectCause(isA(FileNotFoundException.class)); + p.run(); + } - thrown.expectCause(isA(FileNotFoundException.class)); - p.run(); - } + @Test + @Category(NeedsRunner.class) + public void testMatchDisallowEmptyExplicit() throws IOException { + p.apply( + FileIO.match() + .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/*") + .withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)); - @Test - @Category(NeedsRunner.class) - public void testMatchDisallowEmptyNonWildcard() throws IOException { - p.apply( - FileIO.match() - .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/blah") - .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)); + thrown.expectCause(isA(FileNotFoundException.class)); + p.run(); + } - thrown.expectCause(isA(FileNotFoundException.class)); - p.run(); - } + @Test + @Category(NeedsRunner.class) + public void testMatchDisallowEmptyNonWildcard() throws IOException { + p.apply( + FileIO.match() + .filepattern(tmpFolder.getRoot().getAbsolutePath() + "/blah") + .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)); - @Test - @Category(NeedsRunner.class) - public void testMatchAllDisallowEmptyExplicit() throws IOException { - p.apply(Create.of(tmpFolder.getRoot().getAbsolutePath() + "/*")) - .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)); - thrown.expectCause(isA(FileNotFoundException.class)); - p.run(); - } + thrown.expectCause(isA(FileNotFoundException.class)); + p.run(); + } - @Test - @Category(NeedsRunner.class) - public void testMatchAllDisallowEmptyNonWildcard() throws IOException { - p.apply(Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah")) - .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)); - thrown.expectCause(isA(FileNotFoundException.class)); - p.run(); - } + @Test + @Category(NeedsRunner.class) + public void testMatchAllDisallowEmptyExplicit() throws IOException { + p.apply(Create.of(tmpFolder.getRoot().getAbsolutePath() + "/*")) + .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)); + thrown.expectCause(isA(FileNotFoundException.class)); + p.run(); + } - /** DoFn that copy test files from source to watch path. */ - private static class CopyFilesFn - extends DoFn, MatchResult.Metadata> { - public CopyFilesFn(Path sourcePath, Path watchPath) { - this.sourcePathStr = sourcePath.toString(); - this.watchPathStr = watchPath.toString(); + @Test + @Category(NeedsRunner.class) + public void testMatchAllDisallowEmptyNonWildcard() throws IOException { + p.apply(Create.of(tmpFolder.getRoot().getAbsolutePath() + "/blah")) + .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)); + thrown.expectCause(isA(FileNotFoundException.class)); + p.run(); } - @StateId("count") - @SuppressWarnings("unused") - private final StateSpec> countSpec = StateSpecs.value(VarIntCoder.of()); - - @ProcessElement - public void processElement(ProcessContext context, @StateId("count") ValueState count) - throws IOException, InterruptedException { - int current = firstNonNull(count.read(), 0); - // unpack value as output - context.output(Objects.requireNonNull(context.element()).getValue()); - - CopyOption[] cpOptions = {StandardCopyOption.COPY_ATTRIBUTES}; - CopyOption[] updOptions = {StandardCopyOption.REPLACE_EXISTING}; - final Path sourcePath = Paths.get(sourcePathStr); - final Path watchPath = Paths.get(watchPathStr); - - if (0 == current) { - Thread.sleep(100); - Files.copy(sourcePath.resolve("first"), watchPath.resolve("first"), updOptions); - Files.copy(sourcePath.resolve("second"), watchPath.resolve("second"), cpOptions); - } else if (1 == current) { - Thread.sleep(100); - Files.copy(sourcePath.resolve("first"), watchPath.resolve("first"), updOptions); - Files.copy(sourcePath.resolve("second"), watchPath.resolve("second"), updOptions); - Files.copy(sourcePath.resolve("third"), watchPath.resolve("third"), cpOptions); + /** DoFn that copy test files from source to watch path. */ + private static class CopyFilesFn + extends DoFn, MatchResult.Metadata> { + + public CopyFilesFn(Path sourcePath, Path watchPath) { + this.sourcePathStr = sourcePath.toString(); + this.watchPathStr = watchPath.toString(); + } + + @StateId("count") + @SuppressWarnings("unused") + private final StateSpec> countSpec = StateSpecs.value(VarIntCoder.of()); + + @ProcessElement + public void processElement( + ProcessContext context, @StateId("count") ValueState count) + throws IOException, InterruptedException { + int current = firstNonNull(count.read(), 0); + // unpack value as output + context.output(Objects.requireNonNull(context.element()).getValue()); + + CopyOption[] cpOptions = {StandardCopyOption.COPY_ATTRIBUTES}; + CopyOption[] updOptions = {StandardCopyOption.REPLACE_EXISTING}; + final Path sourcePath = Paths.get(sourcePathStr); + final Path watchPath = Paths.get(watchPathStr); + + if (0 == current) { + Thread.sleep(100); + Files.copy(sourcePath.resolve("first"), watchPath.resolve("first"), updOptions); + Files.copy(sourcePath.resolve("second"), watchPath.resolve("second"), cpOptions); + } else if (1 == current) { + Thread.sleep(100); + Files.copy(sourcePath.resolve("first"), watchPath.resolve("first"), updOptions); + Files.copy(sourcePath.resolve("second"), watchPath.resolve("second"), updOptions); + Files.copy(sourcePath.resolve("third"), watchPath.resolve("third"), cpOptions); + } + count.write(current + 1); } - count.write(current + 1); + + // Member variables need to be serializable. + private final String sourcePathStr; + private final String watchPathStr; } - // Member variables need to be serializable. - private final String sourcePathStr; - private final String watchPathStr; - } + @Test + @Category({NeedsRunner.class, UsesUnboundedSplittableParDo.class}) + public void testMatchWatchForNewFiles() throws IOException, InterruptedException { + // Write some files to a "source" directory. + final Path sourcePath = tmpFolder.getRoot().toPath().resolve("source"); + sourcePath.toFile().mkdir(); + Files.write(sourcePath.resolve("first"), new byte[42]); + Files.write(sourcePath.resolve("second"), new byte[37]); + Files.write(sourcePath.resolve("third"), new byte[99]); + + // Create a "watch" directory that the pipeline will copy files into. + final Path watchPath = tmpFolder.getRoot().toPath().resolve("watch"); + watchPath.toFile().mkdir(); + PCollection matchMetadata = + p.apply( + "match filename through match", + FileIO.match() + .filepattern(watchPath.resolve("*").toString()) + .continuously( + Duration.millis(100), + Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)))); + PCollection matchAllMetadata = + p.apply("create for matchAll new files", Create.of(watchPath.resolve("*").toString())) + .apply( + "match filename through matchAll", + FileIO.matchAll() + .continuously( + Duration.millis(100), + Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)))); + PCollection matchUpdatedMetadata = + p.apply( + "match updated", + FileIO.match() + .filepattern(watchPath.resolve("first").toString()) + .continuously( + Duration.millis(100), + Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)), + true)); + PCollection matchAllUpdatedMetadata = + p.apply("create for matchAll updated files", Create.of(watchPath.resolve("*").toString())) + .apply( + "matchAll updated", + FileIO.matchAll() + .continuously( + Duration.millis(100), + Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)), + true)); + + // write one file at the beginning. This will trigger the first output for matchAll + Files.copy( + sourcePath.resolve("first"), + watchPath.resolve("first"), + new StandardCopyOption[] {StandardCopyOption.COPY_ATTRIBUTES}); + + // using matchMetadata outputs to trigger file copy on-the-fly + matchMetadata = + matchMetadata + .apply( + MapElements.into( + TypeDescriptors.kvs( + TypeDescriptors.strings(), + TypeDescriptor.of(MatchResult.Metadata.class))) + .via((metadata) -> KV.of("dumb key", metadata))) + .apply(ParDo.of(new CopyFilesFn(sourcePath, watchPath))); + + assertEquals(PCollection.IsBounded.UNBOUNDED, matchMetadata.isBounded()); + assertEquals(PCollection.IsBounded.UNBOUNDED, matchAllMetadata.isBounded()); + assertEquals(PCollection.IsBounded.UNBOUNDED, matchUpdatedMetadata.isBounded()); + assertEquals(PCollection.IsBounded.UNBOUNDED, matchAllUpdatedMetadata.isBounded()); + + // We fetch lastModifiedTime from the files in the "source" directory to avoid a race + // condition + // with the writer thread. + List expectedMatchNew = + Arrays.asList( + metadata( + watchPath.resolve("first"), 42, lastModifiedMillis(sourcePath.resolve("first"))), + metadata( + watchPath.resolve("second"), + 37, + lastModifiedMillis(sourcePath.resolve("second"))), + metadata( + watchPath.resolve("third"), 99, lastModifiedMillis(sourcePath.resolve("third")))); + PAssert.that(matchMetadata).containsInAnyOrder(expectedMatchNew); + PAssert.that(matchAllMetadata).containsInAnyOrder(expectedMatchNew); + + List expectedMatchUpdated = Arrays.asList("first", "first", "first"); + PCollection matchUpdatedCount = + matchUpdatedMetadata.apply( + "pick up match file name", + MapElements.into(TypeDescriptors.strings()) + .via((metadata) -> metadata.resourceId().getFilename())); + PAssert.that(matchUpdatedCount).containsInAnyOrder(expectedMatchUpdated); + + // Check watch for file updates. Compare only filename since modified time of copied files are + // uncontrolled. + List expectedMatchAllUpdated = + Arrays.asList("first", "first", "first", "second", "second", "third"); + PCollection matchAllUpdatedCount = + matchAllUpdatedMetadata.apply( + "pick up matchAll file names", + MapElements.into(TypeDescriptors.strings()) + .via((metadata) -> metadata.resourceId().getFilename())); + PAssert.that(matchAllUpdatedCount).containsInAnyOrder(expectedMatchAllUpdated); + + p.run(); + } - @Test - @Category({NeedsRunner.class, UsesUnboundedSplittableParDo.class}) - public void testMatchWatchForNewFiles() throws IOException, InterruptedException { - // Write some files to a "source" directory. - final Path sourcePath = tmpFolder.getRoot().toPath().resolve("source"); - sourcePath.toFile().mkdir(); - Files.write(sourcePath.resolve("first"), new byte[42]); - Files.write(sourcePath.resolve("second"), new byte[37]); - Files.write(sourcePath.resolve("third"), new byte[99]); - - // Create a "watch" directory that the pipeline will copy files into. - final Path watchPath = tmpFolder.getRoot().toPath().resolve("watch"); - watchPath.toFile().mkdir(); - PCollection matchMetadata = - p.apply( - "match filename through match", - FileIO.match() - .filepattern(watchPath.resolve("*").toString()) - .continuously( - Duration.millis(100), - Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)))); - PCollection matchAllMetadata = - p.apply("create for matchAll new files", Create.of(watchPath.resolve("*").toString())) - .apply( - "match filename through matchAll", - FileIO.matchAll() - .continuously( - Duration.millis(100), - Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)))); - PCollection matchUpdatedMetadata = - p.apply( - "match updated", - FileIO.match() - .filepattern(watchPath.resolve("first").toString()) - .continuously( - Duration.millis(100), - Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)), - true)); - PCollection matchAllUpdatedMetadata = - p.apply("create for matchAll updated files", Create.of(watchPath.resolve("*").toString())) - .apply( - "matchAll updated", - FileIO.matchAll() - .continuously( - Duration.millis(100), - Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)), - true)); - - // write one file at the beginning. This will trigger the first output for matchAll - Files.copy( - sourcePath.resolve("first"), - watchPath.resolve("first"), - new StandardCopyOption[] {StandardCopyOption.COPY_ATTRIBUTES}); - - // using matchMetadata outputs to trigger file copy on-the-fly - matchMetadata = - matchMetadata - .apply( - MapElements.into( - TypeDescriptors.kvs( - TypeDescriptors.strings(), - TypeDescriptor.of(MatchResult.Metadata.class))) - .via((metadata) -> KV.of("dumb key", metadata))) - .apply(ParDo.of(new CopyFilesFn(sourcePath, watchPath))); - - assertEquals(PCollection.IsBounded.UNBOUNDED, matchMetadata.isBounded()); - assertEquals(PCollection.IsBounded.UNBOUNDED, matchAllMetadata.isBounded()); - assertEquals(PCollection.IsBounded.UNBOUNDED, matchUpdatedMetadata.isBounded()); - assertEquals(PCollection.IsBounded.UNBOUNDED, matchAllUpdatedMetadata.isBounded()); - - // We fetch lastModifiedTime from the files in the "source" directory to avoid a race condition - // with the writer thread. - List expectedMatchNew = - Arrays.asList( - metadata( - watchPath.resolve("first"), 42, lastModifiedMillis(sourcePath.resolve("first"))), - metadata( - watchPath.resolve("second"), 37, lastModifiedMillis(sourcePath.resolve("second"))), - metadata( - watchPath.resolve("third"), 99, lastModifiedMillis(sourcePath.resolve("third")))); - PAssert.that(matchMetadata).containsInAnyOrder(expectedMatchNew); - PAssert.that(matchAllMetadata).containsInAnyOrder(expectedMatchNew); - - List expectedMatchUpdated = Arrays.asList("first", "first", "first"); - PCollection matchUpdatedCount = - matchUpdatedMetadata.apply( - "pick up match file name", - MapElements.into(TypeDescriptors.strings()) - .via((metadata) -> metadata.resourceId().getFilename())); - PAssert.that(matchUpdatedCount).containsInAnyOrder(expectedMatchUpdated); - - // Check watch for file updates. Compare only filename since modified time of copied files are - // uncontrolled. - List expectedMatchAllUpdated = - Arrays.asList("first", "first", "first", "second", "second", "third"); - PCollection matchAllUpdatedCount = - matchAllUpdatedMetadata.apply( - "pick up matchAll file names", - MapElements.into(TypeDescriptors.strings()) - .via((metadata) -> metadata.resourceId().getFilename())); - PAssert.that(matchAllUpdatedCount).containsInAnyOrder(expectedMatchAllUpdated); - - p.run(); + private static MatchResult.Metadata metadata(Path path, int size, long lastModifiedMillis) { + return MatchResult.Metadata.builder() + .setResourceId(FileSystems.matchNewResource(path.toString(), false /* isDirectory */)) + .setIsReadSeekEfficient(true) + .setSizeBytes(size) + .setLastModifiedMillis(lastModifiedMillis) + .build(); + } + + private static long lastModifiedMillis(Path path) throws IOException { + return Files.getLastModifiedTime(path).toMillis(); + } } - @Test - @Category(NeedsRunner.class) - public void testRead() throws IOException { - final String path = tmpFolder.newFile("file").getAbsolutePath(); - final String pathGZ = tmpFolder.newFile("file.gz").getAbsolutePath(); - Files.write(new File(path).toPath(), "Hello world".getBytes(Charsets.UTF_8)); - try (Writer writer = - new OutputStreamWriter( - new GZIPOutputStream(new FileOutputStream(pathGZ)), Charsets.UTF_8)) { - writer.write("Hello world"); + @RunWith(JUnit4.class) + public static class FileIOReadTest { + + @Rule public transient TestPipeline p = TestPipeline.create(); + + @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Test + @Category(NeedsRunner.class) + public void testRead() throws IOException { + final String path = tmpFolder.newFile("file").getAbsolutePath(); + final String pathGZ = tmpFolder.newFile("file.gz").getAbsolutePath(); + Files.write(new File(path).toPath(), "Hello world".getBytes(Charsets.UTF_8)); + try (Writer writer = + new OutputStreamWriter( + new GZIPOutputStream(new FileOutputStream(pathGZ)), Charsets.UTF_8)) { + writer.write("Hello world"); + } + + PCollection matches = + p.apply("Match", FileIO.match().filepattern(path)); + PCollection decompressedAuto = + matches.apply("Read AUTO", FileIO.readMatches().withCompression(Compression.AUTO)); + PCollection decompressedDefault = + matches.apply("Read default", FileIO.readMatches()); + PCollection decompressedUncompressed = + matches.apply( + "Read UNCOMPRESSED", FileIO.readMatches().withCompression(Compression.UNCOMPRESSED)); + for (PCollection c : + Arrays.asList(decompressedAuto, decompressedDefault, decompressedUncompressed)) { + PAssert.thatSingleton(c) + .satisfies( + input -> { + assertEquals(path, input.getMetadata().resourceId().toString()); + assertEquals("Hello world".length(), input.getMetadata().sizeBytes()); + assertEquals(Compression.UNCOMPRESSED, input.getCompression()); + assertTrue(input.getMetadata().isReadSeekEfficient()); + try { + assertEquals("Hello world", input.readFullyAsUTF8String()); + } catch (IOException e) { + throw new RuntimeException(e); + } + return null; + }); + } + + PCollection matchesGZ = + p.apply("Match GZ", FileIO.match().filepattern(pathGZ)); + PCollection compressionAuto = + matchesGZ.apply("Read GZ AUTO", FileIO.readMatches().withCompression(Compression.AUTO)); + PCollection compressionDefault = + matchesGZ.apply("Read GZ default", FileIO.readMatches()); + PCollection compressionGzip = + matchesGZ.apply("Read GZ GZIP", FileIO.readMatches().withCompression(Compression.GZIP)); + for (PCollection c : + Arrays.asList(compressionAuto, compressionDefault, compressionGzip)) { + PAssert.thatSingleton(c) + .satisfies( + input -> { + assertEquals(pathGZ, input.getMetadata().resourceId().toString()); + assertFalse(input.getMetadata().sizeBytes() == "Hello world".length()); + assertEquals(Compression.GZIP, input.getCompression()); + assertFalse(input.getMetadata().isReadSeekEfficient()); + try { + assertEquals("Hello world", input.readFullyAsUTF8String()); + } catch (IOException e) { + throw new RuntimeException(e); + } + return null; + }); + } + + p.run(); } + } + + @RunWith(Parameterized.class) + public static class FileIOReadWithSkipLinesTest { + + @Rule public transient TestPipeline p = TestPipeline.create(); + + @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); + + private static final String INPUT = "Hello world\nApache Beam\nData Processing"; - PCollection matches = p.apply("Match", FileIO.match().filepattern(path)); - PCollection decompressedAuto = - matches.apply("Read AUTO", FileIO.readMatches().withCompression(Compression.AUTO)); - PCollection decompressedDefault = - matches.apply("Read default", FileIO.readMatches()); - PCollection decompressedUncompressed = - matches.apply( - "Read UNCOMPRESSED", FileIO.readMatches().withCompression(Compression.UNCOMPRESSED)); - for (PCollection c : - Arrays.asList(decompressedAuto, decompressedDefault, decompressedUncompressed)) { - PAssert.thatSingleton(c) - .satisfies( - input -> { - assertEquals(path, input.getMetadata().resourceId().toString()); - assertEquals("Hello world".length(), input.getMetadata().sizeBytes()); - assertEquals(Compression.UNCOMPRESSED, input.getCompression()); - assertTrue(input.getMetadata().isReadSeekEfficient()); - try { - assertEquals("Hello world", input.readFullyAsUTF8String()); - } catch (IOException e) { - throw new RuntimeException(e); - } - return null; - }); + @Parameterized.Parameters(name = "skipLines {1}") + public static Iterable data() { + return ImmutableList.builder() + .add(new Object[] {INPUT, 0, "Hello world\nApache Beam\nData Processing"}) + .add(new Object[] {INPUT, 1, "Apache Beam\nData Processing"}) + .add(new Object[] {INPUT, 2, "Data Processing"}) + .add(new Object[] {INPUT, 3, ""}) + .add(new Object[] {INPUT, 4, ""}) + .build(); } - PCollection matchesGZ = - p.apply("Match GZ", FileIO.match().filepattern(pathGZ)); - PCollection compressionAuto = - matchesGZ.apply("Read GZ AUTO", FileIO.readMatches().withCompression(Compression.AUTO)); - PCollection compressionDefault = - matchesGZ.apply("Read GZ default", FileIO.readMatches()); - PCollection compressionGzip = - matchesGZ.apply("Read GZ GZIP", FileIO.readMatches().withCompression(Compression.GZIP)); - for (PCollection c : - Arrays.asList(compressionAuto, compressionDefault, compressionGzip)) { - PAssert.thatSingleton(c) - .satisfies( - input -> { - assertEquals(pathGZ, input.getMetadata().resourceId().toString()); - assertFalse(input.getMetadata().sizeBytes() == "Hello world".length()); - assertEquals(Compression.GZIP, input.getCompression()); - assertFalse(input.getMetadata().isReadSeekEfficient()); - try { - assertEquals("Hello world", input.readFullyAsUTF8String()); - } catch (IOException e) { - throw new RuntimeException(e); - } - return null; - }); + @Parameterized.Parameter(0) + public String input; + + @Parameterized.Parameter(1) + public int skipLines; + + @Parameterized.Parameter(2) + public String expected; + + @Test + @Category(NeedsRunner.class) + public void testReadWithSkipLines() throws IOException { + final String path = tmpFolder.newFile("file").getAbsolutePath(); + Files.write(new File(path).toPath(), input.getBytes(Charsets.UTF_8)); + + PCollection c = + p.apply("Match", FileIO.match().filepattern(path)) + .apply("readMatches", FileIO.readMatches()) + .apply("readWithSkipLines", ParDo.of(new ReadWithSkipLinesFn(skipLines))); + + PAssert.that(c).containsInAnyOrder(expected); + + p.run(); } - p.run(); - } + private static class ReadWithSkipLinesFn extends DoFn { - private static MatchResult.Metadata metadata(Path path, int size, long lastModifiedMillis) { - return MatchResult.Metadata.builder() - .setResourceId(FileSystems.matchNewResource(path.toString(), false /* isDirectory */)) - .setIsReadSeekEfficient(true) - .setSizeBytes(size) - .setLastModifiedMillis(lastModifiedMillis) - .build(); - } + private final int skipLines; - private static long lastModifiedMillis(Path path) throws IOException { - return Files.getLastModifiedTime(path).toMillis(); - } + public ReadWithSkipLinesFn(int skipLines) { + this.skipLines = skipLines; + } - private static FileIO.Write.FileNaming resolveFileNaming(FileIO.Write write) - throws Exception { - return write.resolveFileNamingFn().getClosure().apply(null, null); + @ProcessElement + public void process(@Element ReadableFile element, OutputReceiver receiver) + throws IOException { + String result = element.readFullyAsUTF8String(skipLines); + receiver.output(result); + } + } } - private static String getDefaultFileName(FileIO.Write write) throws Exception { - return resolveFileNaming(write).getFilename(null, null, 0, 0, null); - } + @RunWith(JUnit4.class) + public static class FileIOWriteTest { + + @Rule public transient TestPipeline p = TestPipeline.create(); + + @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Test + public void testFilenameFnResolution() throws Exception { + FileIO.Write.FileNaming foo = (window, pane, numShards, shardIndex, compression) -> "foo"; + + String expected = + FileSystems.matchNewResource("test", true).resolve("foo", RESOLVE_FILE).toString(); + assertEquals( + "Filenames should be resolved within a relative directory if '.to' is invoked", + expected, + getDefaultFileName(FileIO.writeDynamic().to("test").withNaming(o -> foo))); + assertEquals( + "Filenames should be resolved within a relative directory if '.to' is invoked", + expected, + getDefaultFileName(FileIO.write().to("test").withNaming(foo))); + + assertEquals( + "Filenames should be resolved as the direct result of the filenaming function if '.to' " + + "is not invoked", + "foo", + getDefaultFileName(FileIO.writeDynamic().withNaming(o -> foo))); + assertEquals( + "Filenames should be resolved as the direct result of the filenaming function if '.to' " + + "is not invoked", + "foo", + getDefaultFileName(FileIO.write().withNaming(foo))); + + assertEquals( + "Default to the defaultNaming if a filenaming isn't provided for a non-dynamic write", + "output-00000-of-00000", + resolveFileNaming(FileIO.write()) + .getFilename( + GlobalWindow.INSTANCE, + PaneInfo.ON_TIME_AND_ONLY_FIRING, + 0, + 0, + Compression.UNCOMPRESSED)); + + assertEquals( + "Default Naming should take prefix and suffix into account if provided", + "foo-00000-of-00000.bar", + resolveFileNaming(FileIO.write().withPrefix("foo").withSuffix(".bar")) + .getFilename( + GlobalWindow.INSTANCE, + PaneInfo.ON_TIME_AND_ONLY_FIRING, + 0, + 0, + Compression.UNCOMPRESSED)); + + assertEquals( + "Filenames should be resolved within a relative directory if '.to' is invoked, " + + "even with default naming", + FileSystems.matchNewResource("test", true) + .resolve("output-00000-of-00000", RESOLVE_FILE) + .toString(), + resolveFileNaming(FileIO.write().to("test")) + .getFilename( + GlobalWindow.INSTANCE, + PaneInfo.ON_TIME_AND_ONLY_FIRING, + 0, + 0, + Compression.UNCOMPRESSED)); + } - @Test - public void testFilenameFnResolution() throws Exception { - FileIO.Write.FileNaming foo = (window, pane, numShards, shardIndex, compression) -> "foo"; - - String expected = - FileSystems.matchNewResource("test", true).resolve("foo", RESOLVE_FILE).toString(); - assertEquals( - "Filenames should be resolved within a relative directory if '.to' is invoked", - expected, - getDefaultFileName(FileIO.writeDynamic().to("test").withNaming(o -> foo))); - assertEquals( - "Filenames should be resolved within a relative directory if '.to' is invoked", - expected, - getDefaultFileName(FileIO.write().to("test").withNaming(foo))); - - assertEquals( - "Filenames should be resolved as the direct result of the filenaming function if '.to' " - + "is not invoked", - "foo", - getDefaultFileName(FileIO.writeDynamic().withNaming(o -> foo))); - assertEquals( - "Filenames should be resolved as the direct result of the filenaming function if '.to' " - + "is not invoked", - "foo", - getDefaultFileName(FileIO.write().withNaming(foo))); - - assertEquals( - "Default to the defaultNaming if a filenaming isn't provided for a non-dynamic write", - "output-00000-of-00000", - resolveFileNaming(FileIO.write()) - .getFilename( - GlobalWindow.INSTANCE, - PaneInfo.ON_TIME_AND_ONLY_FIRING, - 0, - 0, - Compression.UNCOMPRESSED)); - - assertEquals( - "Default Naming should take prefix and suffix into account if provided", - "foo-00000-of-00000.bar", - resolveFileNaming(FileIO.write().withPrefix("foo").withSuffix(".bar")) - .getFilename( - GlobalWindow.INSTANCE, - PaneInfo.ON_TIME_AND_ONLY_FIRING, - 0, - 0, - Compression.UNCOMPRESSED)); - - assertEquals( - "Filenames should be resolved within a relative directory if '.to' is invoked, " - + "even with default naming", - FileSystems.matchNewResource("test", true) - .resolve("output-00000-of-00000", RESOLVE_FILE) - .toString(), - resolveFileNaming(FileIO.write().to("test")) - .getFilename( - GlobalWindow.INSTANCE, - PaneInfo.ON_TIME_AND_ONLY_FIRING, - 0, - 0, - Compression.UNCOMPRESSED)); - } + @Test + @Category(NeedsRunner.class) + public void testFileIoDynamicNaming() throws IOException { + // Test for BEAM-6407. + + String outputFileName = tmpFolder.newFile().getAbsolutePath(); + PCollectionView outputFileNameView = + p.apply("outputFileName", Create.of(outputFileName)).apply(View.asSingleton()); + + Contextful.Fn fileNaming = + (element, c) -> + (window, pane, numShards, shardIndex, compression) -> + c.sideInput(outputFileNameView) + "-" + shardIndex; + + p.apply(Create.of("")) + .apply( + "WriteDynamicFilename", + FileIO.writeDynamic() + .by(SerializableFunctions.constant("")) + .withDestinationCoder(StringUtf8Coder.of()) + .via(TextIO.sink()) + .withTempDirectory(tmpFolder.newFolder().getAbsolutePath()) + .withNaming( + Contextful.of( + fileNaming, Requirements.requiresSideInputs(outputFileNameView)))); + + // We need to run the TestPipeline with the default options. + p.run(PipelineOptionsFactory.create()).waitUntilFinish(); + assertTrue( + "Output file shard 0 exists after pipeline completes", + new File(outputFileName + "-0").exists()); + } - @Test - @Category(NeedsRunner.class) - public void testFileIoDynamicNaming() throws IOException { - // Test for BEAM-6407. - - String outputFileName = tmpFolder.newFile().getAbsolutePath(); - PCollectionView outputFileNameView = - p.apply("outputFileName", Create.of(outputFileName)).apply(View.asSingleton()); - - Contextful.Fn fileNaming = - (element, c) -> - (window, pane, numShards, shardIndex, compression) -> - c.sideInput(outputFileNameView) + "-" + shardIndex; - - p.apply(Create.of("")) - .apply( - "WriteDynamicFilename", - FileIO.writeDynamic() - .by(SerializableFunctions.constant("")) - .withDestinationCoder(StringUtf8Coder.of()) - .via(TextIO.sink()) - .withTempDirectory(tmpFolder.newFolder().getAbsolutePath()) - .withNaming( - Contextful.of( - fileNaming, Requirements.requiresSideInputs(outputFileNameView)))); - - // We need to run the TestPipeline with the default options. - p.run(PipelineOptionsFactory.create()).waitUntilFinish(); - assertTrue( - "Output file shard 0 exists after pipeline completes", - new File(outputFileName + "-0").exists()); + private static FileIO.Write.FileNaming resolveFileNaming(FileIO.Write write) + throws Exception { + return write.resolveFileNamingFn().getClosure().apply(null, null); + } + + private static String getDefaultFileName(FileIO.Write write) throws Exception { + return resolveFileNaming(write).getFilename(null, null, 0, 0, null); + } } }