From 528314d83d5c689913f51f6c017575265cb02ae1 Mon Sep 17 00:00:00 2001 From: Abhishek Modi Date: Wed, 20 Jul 2022 16:48:08 -0700 Subject: [PATCH 1/4] Adding support for readFully API --- .../fs/gcs/GhfsInputStreamStatistics.java | 11 ++++++ .../hadoop/fs/gcs/GhfsInstrumentation.java | 24 ++++++++++++ .../fs/gcs/GoogleHadoopFSInputStream.java | 32 +++++++++++++-- ...gleHadoopFSInputStreamIntegrationTest.java | 39 +++++++++++++++++++ 4 files changed, 103 insertions(+), 3 deletions(-) diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsInputStreamStatistics.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsInputStreamStatistics.java index c68b521502..7ed7ec6749 100644 --- a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsInputStreamStatistics.java +++ b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsInputStreamStatistics.java @@ -55,6 +55,14 @@ interface GhfsInputStreamStatistics extends AutoCloseable, GhfsStatisticInterfac */ void readOperationStarted(long pos, long len); + /** + * A {@code readFully(long position, byte[] buf, int off, int len)} operation has started. + * + * @param pos starting position of the read + * @param len length of bytes to read + */ + void readFullyOperationStarted(long pos, long len); + /** * A read operation has completed. * @@ -90,6 +98,9 @@ interface GhfsInputStreamStatistics extends AutoCloseable, GhfsStatisticInterfac /** The total number of times the read() operation in an input stream has been called. */ long getReadOperations(); + /** The total number of times the readFully() operation in an input stream has been called. 
*/ + long getReadFullyOperations(); + /** The total number of Incomplete read() operations */ long getReadsIncomplete(); diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsInstrumentation.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsInstrumentation.java index ba02cc9bdd..48a592de43 100644 --- a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsInstrumentation.java +++ b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsInstrumentation.java @@ -470,6 +470,7 @@ private final class InputStreamStatistics extends AbstractGhfsStatisticsSource private final AtomicLong readExceptions; private final AtomicLong readsIncomplete; private final AtomicLong readOperations; + private final AtomicLong readFullyOperations; private final AtomicLong seekOperations; /** Bytes read by the application and any when draining streams . */ @@ -511,6 +512,8 @@ private InputStreamStatistics(@Nullable FileSystem.Statistics filesystemStatisti readsIncomplete = st.getCounterReference(StreamStatisticNames.STREAM_READ_OPERATIONS_INCOMPLETE); readOperations = st.getCounterReference(StreamStatisticNames.STREAM_READ_OPERATIONS); + readFullyOperations = + st.getCounterReference(StreamStatisticNames.STREAM_READ_FULLY_OPERATIONS); seekOperations = st.getCounterReference(StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS); totalBytesRead = st.getCounterReference(StreamStatisticNames.STREAM_READ_TOTAL_BYTES); setIOStatistics(st); @@ -599,6 +602,17 @@ public void readOperationStarted(long pos, long len) { readOperations.incrementAndGet(); } + /** + * A readFully() operation in the input stream has started. + * + * @param pos starting position of the read + * @param len length of bytes to read + */ + @Override + public void readFullyOperationStarted(long pos, long len) { + readFullyOperations.incrementAndGet(); + } + /** * If more data was requested than was actually returned, this was an incomplete read. Increment * {@link #readsIncomplete}. 
@@ -729,6 +743,16 @@ public long getReadOperations() { return lookupCounterValue(StreamStatisticNames.STREAM_READ_OPERATIONS); } + /** + * The total number of times the readFully() operation in an input stream has been called. + * + * @return the count of readFully operations. + */ + @Override + public long getReadFullyOperations() { + return lookupCounterValue(StreamStatisticNames.STREAM_READ_FULLY_OPERATIONS); + } + /** * The total number of Incomplete read() operations * diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStream.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStream.java index 9695d559bd..842609bb0f 100644 --- a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStream.java +++ b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStream.java @@ -23,12 +23,14 @@ import com.google.cloud.hadoop.gcsio.FileInfo; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem; import com.google.common.flogger.GoogleLogger; +import java.io.EOFException; import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SeekableByteChannel; import javax.annotation.Nonnull; +import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.statistics.IOStatistics; @@ -101,9 +103,7 @@ public synchronized int read() throws IOException { public synchronized int read(@Nonnull byte[] buf, int offset, int length) throws IOException { streamStatistics.readOperationStarted(getPos(), length); checkNotNull(buf, "buf must not be null"); - if (offset < 0 || length < 0 || length > buf.length - offset) { - throw new IndexOutOfBoundsException(); - } + validatePositionedReadArgs(getPos(), buf, offset, length); int response = 0; try { // TODO(user): Wrap this in a while-loop if we ever introduce a non-blocking 
mode for the @@ -124,6 +124,32 @@ public synchronized int read(@Nonnull byte[] buf, int offset, int length) throws return response; } + @Override + public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { + streamStatistics.readFullyOperationStarted(position, length); + checkNotNull(buffer, "buf must not be null"); + validatePositionedReadArgs(position, buffer, offset, length); + if (length == 0) { + return; + } + int nread = 0; + synchronized (this) { + long oldPos = getPos(); + try { + seek(position); + while (nread < length) { + int nbytes = read(buffer, offset + nread, length - nread); + if (nbytes < 0) { + throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY); + } + nread += nbytes; + } + } finally { + seek(oldPos); + } + } + } + @Override public synchronized void seek(long pos) throws IOException { logger.atFiner().log("seek(%d)", pos); diff --git a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStreamIntegrationTest.java b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStreamIntegrationTest.java index bbb4647673..5e5ed9ed74 100644 --- a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStreamIntegrationTest.java +++ b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStreamIntegrationTest.java @@ -24,6 +24,7 @@ import java.nio.channels.ClosedChannelException; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FileSystem; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -107,6 +108,44 @@ public void testAvailable() throws Exception { assertThrows(ClosedChannelException.class, in::available); } + @Test + public void testReadFully() throws Exception { + URI path = gcsFsIHelper.getUniqueObjectUri(getClass(), "testReadFully"); + GoogleHadoopFileSystem ghfs = + GoogleHadoopFileSystemIntegrationHelper.createGhfs( + path, 
GoogleHadoopFileSystemIntegrationHelper.getTestConfig()); + + String testContent = "test content"; + gcsFsIHelper.writeTextFile(path, testContent); + + byte[] value = new byte[5]; + byte[] expected = Arrays.copyOfRange(testContent.getBytes(StandardCharsets.UTF_8), 2, 7); + + GoogleHadoopFSInputStream in = createGhfsInputStream(ghfs, path); + try (GoogleHadoopFSInputStream ignore = in) { + in.readFully(2, value); + assertThat(in.getPos()).isEqualTo(0); + } + assertThat(value).isEqualTo(expected); + } + + @Test + public void testReadFully_illegalSize() throws Exception { + URI path = gcsFsIHelper.getUniqueObjectUri(getClass(), "testReadFully"); + GoogleHadoopFileSystem ghfs = + GoogleHadoopFileSystemIntegrationHelper.createGhfs( + path, GoogleHadoopFileSystemIntegrationHelper.getTestConfig()); + + String testContent = "test content"; + gcsFsIHelper.writeTextFile(path, testContent); + + byte[] value = new byte[20]; + + GoogleHadoopFSInputStream in = createGhfsInputStream(ghfs, path); + Throwable exception = assertThrows(EOFException.class, () -> in.readFully(2, value)); + assertThat(exception).hasMessageThat().contains(FSExceptionMessages.EOF_IN_READ_FULLY); + } + private static GoogleHadoopFSInputStream createGhfsInputStream( GoogleHadoopFileSystem ghfs, URI path) throws IOException { return GoogleHadoopFSInputStream.create( From 2bdf2d364e421be23cedab657a90614badbda0c2 Mon Sep 17 00:00:00 2001 From: Abhishek Modi Date: Wed, 20 Jul 2022 20:05:56 -0700 Subject: [PATCH 2/4] Addressing PR comments --- .../com/google/cloud/hadoop/fs/gcs/GhfsInstrumentation.java | 1 + .../cloud/hadoop/fs/gcs/GoogleHadoopFSInputStream.java | 6 +++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsInstrumentation.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsInstrumentation.java index 48a592de43..66ac033f89 100644 --- a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsInstrumentation.java +++ 
b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsInstrumentation.java @@ -490,6 +490,7 @@ private InputStreamStatistics(@Nullable FileSystem.Statistics filesystemStatisti StreamStatisticNames.STREAM_READ_BYTES, StreamStatisticNames.STREAM_READ_EXCEPTIONS, StreamStatisticNames.STREAM_READ_OPERATIONS, + StreamStatisticNames.STREAM_READ_FULLY_OPERATIONS, StreamStatisticNames.STREAM_READ_OPERATIONS_INCOMPLETE, StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS, StreamStatisticNames.STREAM_READ_SEEK_BACKWARD_OPERATIONS, diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStream.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStream.java index 842609bb0f..1a221d2ea6 100644 --- a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStream.java +++ b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStream.java @@ -145,7 +145,11 @@ public void readFully(long position, byte[] buffer, int offset, int length) thro nread += nbytes; } } finally { - seek(oldPos); + try { + seek(oldPos); + } catch (IOException ie) { + logger.atFiner().log("Ignoring IOE on seek to (%d): (%s)", oldPos, ie.getMessage()); + } } } } From 0d4d4d232c7afa07b44e3afff7522ba26dd263a7 Mon Sep 17 00:00:00 2001 From: Abhishek Modi Date: Thu, 21 Jul 2022 10:12:11 -0700 Subject: [PATCH 3/4] Fixing UTs --- .../google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStream.java | 2 -- .../google/cloud/hadoop/fs/gcs/HadoopFileSystemTestBase.java | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStream.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStream.java index 1a221d2ea6..c2fbc4439a 100644 --- a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStream.java +++ b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStream.java @@ -102,7 +102,6 @@ public synchronized int read() 
throws IOException { @Override public synchronized int read(@Nonnull byte[] buf, int offset, int length) throws IOException { streamStatistics.readOperationStarted(getPos(), length); - checkNotNull(buf, "buf must not be null"); validatePositionedReadArgs(getPos(), buf, offset, length); int response = 0; try { @@ -127,7 +126,6 @@ public synchronized int read(@Nonnull byte[] buf, int offset, int length) throws @Override public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { streamStatistics.readFullyOperationStarted(position, length); - checkNotNull(buffer, "buf must not be null"); validatePositionedReadArgs(position, buffer, offset, length); if (length == 0) { return; diff --git a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/HadoopFileSystemTestBase.java b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/HadoopFileSystemTestBase.java index 7f67354521..833d0ecafc 100644 --- a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/HadoopFileSystemTestBase.java +++ b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/HadoopFileSystemTestBase.java @@ -356,13 +356,13 @@ public void testReadInvalidArgs() throws IOException { assertWithMessage("Expected exactly 1 byte to be read").that(numBytesRead).isEqualTo(1); // Null buffer. 
- testReadInvalidArgsHelper(readStream, null, 0, 1, NullPointerException.class); + testReadInvalidArgsHelper(readStream, null, 0, 1, IllegalArgumentException.class); // offset < 0 testReadInvalidArgsHelper(readStream, buffer, -1, 1, IndexOutOfBoundsException.class); // length < 0 - testReadInvalidArgsHelper(readStream, buffer, 0, -1, IndexOutOfBoundsException.class); + testReadInvalidArgsHelper(readStream, buffer, 0, -1, IllegalArgumentException.class); // length > buffer.length - offset testReadInvalidArgsHelper(readStream, buffer, 0, 2, IndexOutOfBoundsException.class); From eeec92dff8036abcc9ac531b67909b485794ae14 Mon Sep 17 00:00:00 2001 From: Abhishek Modi Date: Thu, 21 Jul 2022 11:46:41 -0700 Subject: [PATCH 4/4] Minor fixes --- .../google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStream.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStream.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStream.java index c2fbc4439a..340c7a7daa 100644 --- a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStream.java +++ b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFSInputStream.java @@ -16,7 +16,6 @@ package com.google.cloud.hadoop.fs.gcs; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static java.lang.Math.max; @@ -146,7 +145,7 @@ public void readFully(long position, byte[] buffer, int offset, int length) thro try { seek(oldPos); } catch (IOException ie) { - logger.atFiner().log("Ignoring IOE on seek to (%d): (%s)", oldPos, ie.getMessage()); + logger.atWarning().log("Ignoring IOE on seek to (%d): (%s)", oldPos, ie.getMessage()); } } }