Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add skipLines in FileIO.ReadableFile.readFullyAsUTF8String #28728

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class FileIO {

private static final Logger LOG = LoggerFactory.getLogger(FileIO.class);

/**
Expand Down Expand Up @@ -389,6 +390,7 @@ public static <DestT, InputT> Write<DestT, InputT> writeDynamic() {

/** A utility class for accessing a potentially compressed file. */
public static final class ReadableFile {

private final MatchResult.Metadata metadata;
private final Compression compression;

Expand Down Expand Up @@ -427,16 +429,32 @@ public SeekableByteChannel openSeekable() throws IOException {
return (SeekableByteChannel) open();
}

/** Returns the full contents of the file as bytes. */
public byte[] readFullyAsBytes() throws IOException {
/** Returns the contents of the file as bytes with skipping {@code skipLines}. */
public byte[] readFullyAsBytes(int skipLines) throws IOException {
try (InputStream stream = Channels.newInputStream(open())) {
int count = 0;
int r;
while ((count < skipLines) && ((r = stream.read()) != -1)) {
if (r == '\n') {
count++;
}
}

return StreamUtils.getBytesWithoutClosing(stream);
}
}

/** Returns the full contents of the file as a {@link String} decoded as UTF-8. */
public String readFullyAsUTF8String() throws IOException {
return new String(readFullyAsBytes(), StandardCharsets.UTF_8);
return new String(readFullyAsBytes(0), StandardCharsets.UTF_8);
}

/**
* Returns the contents of the file as {@link String} decoded as UTF-8 with skipping {@code
* skipLines}.
*/
public String readFullyAsUTF8String(int skipLines) throws IOException {
return new String(readFullyAsBytes(skipLines), StandardCharsets.UTF_8);
}

@Override
Expand Down Expand Up @@ -468,6 +486,7 @@ public int hashCode() {
*/
@AutoValue
public abstract static class MatchConfiguration implements HasDisplayData, Serializable {

/** Creates a {@link MatchConfiguration} with the given {@link EmptyMatchTreatment}. */
public static MatchConfiguration create(EmptyMatchTreatment emptyMatchTreatment) {
return new AutoValue_FileIO_MatchConfiguration.Builder()
Expand All @@ -488,6 +507,7 @@ public static MatchConfiguration create(EmptyMatchTreatment emptyMatchTreatment)

@AutoValue.Builder
abstract static class Builder {

abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);

abstract Builder setMatchUpdatedFiles(boolean matchUpdatedFiles);
Expand Down Expand Up @@ -570,6 +590,7 @@ public abstract static class Match extends PTransform<PBegin, PCollection<MatchR

@AutoValue.Builder
abstract static class Builder {

abstract Builder setFilepattern(ValueProvider<String> filepattern);

abstract Builder setConfiguration(MatchConfiguration configuration);
Expand Down Expand Up @@ -643,12 +664,14 @@ public void populateDisplayData(DisplayData.Builder builder) {
@AutoValue
public abstract static class MatchAll
extends PTransform<PCollection<String>, PCollection<MatchResult.Metadata>> {

abstract MatchConfiguration getConfiguration();

abstract Builder toBuilder();

@AutoValue.Builder
abstract static class Builder {

abstract Builder setConfiguration(MatchConfiguration configuration);

abstract MatchAll build();
Expand Down Expand Up @@ -716,6 +739,7 @@ private <KeyT> Watch.Growth<String, MatchResult.Metadata, KeyT> createWatchTrans
}

private static class MatchFn extends DoFn<String, MatchResult.Metadata> {

private final EmptyMatchTreatment emptyMatchTreatment;

public MatchFn(EmptyMatchTreatment emptyMatchTreatment) {
Expand All @@ -734,6 +758,7 @@ public void process(ProcessContext c) throws Exception {
}

private static class MatchPollFn extends PollFn<String, MatchResult.Metadata> {

@Override
public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
throws Exception {
Expand All @@ -746,6 +771,7 @@ public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Conte

private static class ExtractFilenameFn
implements SerializableFunction<MatchResult.Metadata, String> {

@Override
public String apply(MatchResult.Metadata input) {
return input.resourceId().toString();
Expand All @@ -754,6 +780,7 @@ public String apply(MatchResult.Metadata input) {

private static class ExtractFilenameAndLastUpdateFn
implements SerializableFunction<MatchResult.Metadata, KV<String, Long>> {

@Override
public KV<String, Long> apply(MatchResult.Metadata input) throws RuntimeException {
long timestamp = input.lastModifiedMillis();
Expand All @@ -769,6 +796,7 @@ public KV<String, Long> apply(MatchResult.Metadata input) throws RuntimeExceptio
@AutoValue
public abstract static class ReadMatches
extends PTransform<PCollection<MatchResult.Metadata>, PCollection<ReadableFile>> {

/** Enum to control how directories are handled. */
public enum DirectoryTreatment {
SKIP,
Expand All @@ -783,6 +811,7 @@ public enum DirectoryTreatment {

@AutoValue.Builder
abstract static class Builder {

abstract Builder setCompression(Compression compression);

abstract Builder setDirectoryTreatment(DirectoryTreatment directoryTreatment);
Expand Down Expand Up @@ -866,6 +895,7 @@ static ReadableFile matchToReadableFile(
}

private static class ToReadableFileFn extends DoFn<MatchResult.Metadata, ReadableFile> {

private final ReadMatches spec;

private ToReadableFileFn(ReadMatches spec) {
Expand All @@ -888,6 +918,7 @@ public void process(ProcessContext c) {
* FileIO#writeDynamic}. A new instance of {@link Sink} is created for every file being written.
*/
public interface Sink<ElementT> extends Serializable {

/**
* Initializes writing to the given channel. Will be invoked once on a given {@link Sink}
* instance.
Expand All @@ -908,8 +939,10 @@ public interface Sink<ElementT> extends Serializable {
@AutoValue
public abstract static class Write<DestinationT, UserT>
extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>> {

/** A policy for generating names for shard files. */
public interface FileNaming extends Serializable {

/**
* Generates the filename. MUST use each argument and return different values for each
* combination of the arguments.
Expand Down Expand Up @@ -1020,6 +1053,7 @@ public static FileNaming relativeFileNaming(

@AutoValue.Builder
abstract static class Builder<DestinationT, UserT> {

abstract Builder<DestinationT, UserT> setDynamic(boolean dynamic);

abstract Builder<DestinationT, UserT> setSinkFn(Contextful<Fn<DestinationT, Sink<?>>> sink);
Expand Down Expand Up @@ -1418,6 +1452,7 @@ private Collection<PCollectionView<?>> getAllSideInputs() {

private static class ViaFileBasedSink<UserT, DestinationT, OutputT>
extends FileBasedSink<UserT, DestinationT, OutputT> {

private final Write<DestinationT, UserT> spec;

private ViaFileBasedSink(Write<DestinationT, UserT> spec) {
Expand Down Expand Up @@ -1472,6 +1507,7 @@ protected void finishWrite() throws Exception {

private static class DynamicDestinationsAdapter<UserT, DestinationT, OutputT>
extends DynamicDestinations<UserT, DestinationT, OutputT> {

private final Write<DestinationT, UserT> spec;
private transient Fn.@Nullable Context context;

Expand Down
Loading
Loading