Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for readFully API #832

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ interface GhfsInputStreamStatistics extends AutoCloseable, GhfsStatisticInterfac
*/
void readOperationStarted(long pos, long len);

  /**
   * A {@code readFully(long position, byte[] buffer, int offset, int length)} operation has
   * started.
   *
   * @param pos starting position of the read
   * @param len length of bytes to read
   */
  void readFullyOperationStarted(long pos, long len);

/**
* A read operation has completed.
*
Expand Down Expand Up @@ -90,6 +98,9 @@ interface GhfsInputStreamStatistics extends AutoCloseable, GhfsStatisticInterfac
/** The total number of times the read() operation in an input stream has been called. */
long getReadOperations();

  /**
   * The total number of times the readFully() operation in an input stream has been called.
   *
   * @return the count of readFully operations.
   */
  long getReadFullyOperations();

/** The total number of Incomplete read() operations */
long getReadsIncomplete();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ private final class InputStreamStatistics extends AbstractGhfsStatisticsSource
private final AtomicLong readExceptions;
private final AtomicLong readsIncomplete;
private final AtomicLong readOperations;
private final AtomicLong readFullyOperations;
private final AtomicLong seekOperations;

/** Bytes read by the application and any when draining streams . */
Expand Down Expand Up @@ -511,6 +512,8 @@ private InputStreamStatistics(@Nullable FileSystem.Statistics filesystemStatisti
readsIncomplete =
st.getCounterReference(StreamStatisticNames.STREAM_READ_OPERATIONS_INCOMPLETE);
readOperations = st.getCounterReference(StreamStatisticNames.STREAM_READ_OPERATIONS);
readFullyOperations =
st.getCounterReference(StreamStatisticNames.STREAM_READ_FULLY_OPERATIONS);
seekOperations = st.getCounterReference(StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS);
totalBytesRead = st.getCounterReference(StreamStatisticNames.STREAM_READ_TOTAL_BYTES);
setIOStatistics(st);
Expand Down Expand Up @@ -599,6 +602,17 @@ public void readOperationStarted(long pos, long len) {
readOperations.incrementAndGet();
}

  /**
   * A readFully() operation in the input stream has started.
   *
   * <p>Only the invocation count is recorded; {@code pos} and {@code len} are currently unused by
   * this implementation.
   *
   * @param pos starting position of the read
   * @param len length of bytes to read
   */
  @Override
  public void readFullyOperationStarted(long pos, long len) {
    readFullyOperations.incrementAndGet();
  }

/**
* If more data was requested than was actually returned, this was an incomplete read. Increment
* {@link #readsIncomplete}.
Expand Down Expand Up @@ -729,6 +743,16 @@ public long getReadOperations() {
return lookupCounterValue(StreamStatisticNames.STREAM_READ_OPERATIONS);
}

  /**
   * The total number of times the readFully() operation in an input stream has been called.
   *
   * @return the count of readFully operations.
   */
  @Override
  public long getReadFullyOperations() {
    return lookupCounterValue(StreamStatisticNames.STREAM_READ_FULLY_OPERATIONS);
  }

/**
* The total number of Incomplete read() operations
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import com.google.cloud.hadoop.gcsio.FileInfo;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem;
import com.google.common.flogger.GoogleLogger;
import java.io.EOFException;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SeekableByteChannel;
import javax.annotation.Nonnull;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.statistics.IOStatistics;
Expand Down Expand Up @@ -101,9 +103,7 @@ public synchronized int read() throws IOException {
public synchronized int read(@Nonnull byte[] buf, int offset, int length) throws IOException {
streamStatistics.readOperationStarted(getPos(), length);
checkNotNull(buf, "buf must not be null");
if (offset < 0 || length < 0 || length > buf.length - offset) {
throw new IndexOutOfBoundsException();
}
validatePositionedReadArgs(getPos(), buf, offset, length);
int response = 0;
try {
// TODO(user): Wrap this in a while-loop if we ever introduce a non-blocking mode for the
Expand All @@ -124,6 +124,32 @@ public synchronized int read(@Nonnull byte[] buf, int offset, int length) throws
return response;
}

@Override
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
arunkumarchacko marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR comment mentions that this allows to avoid unnecessary seek. May you also clarify why is this important? Is this because we are sending new GCS request for each seek?

streamStatistics.readFullyOperationStarted(position, length);
abmodi marked this conversation as resolved.
Show resolved Hide resolved
checkNotNull(buffer, "buf must not be null");
abmodi marked this conversation as resolved.
Show resolved Hide resolved
validatePositionedReadArgs(position, buffer, offset, length);
abmodi marked this conversation as resolved.
Show resolved Hide resolved
if (length == 0) {
return;
}
int nread = 0;
synchronized (this) {
abmodi marked this conversation as resolved.
Show resolved Hide resolved
long oldPos = getPos();
try {
seek(position);
while (nread < length) {
int nbytes = read(buffer, offset + nread, length - nread);
if (nbytes < 0) {
abmodi marked this conversation as resolved.
Show resolved Hide resolved
throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
}
nread += nbytes;
}
} finally {
seek(oldPos);
abmodi marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

@Override
public synchronized void seek(long pos) throws IOException {
logger.atFiner().log("seek(%d)", pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FileSystem;
import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -107,6 +108,44 @@ public void testAvailable() throws Exception {
assertThrows(ClosedChannelException.class, in::available);
}

@Test
public void testReadFully() throws Exception {
URI path = gcsFsIHelper.getUniqueObjectUri(getClass(), "testReadFully");
GoogleHadoopFileSystem ghfs =
GoogleHadoopFileSystemIntegrationHelper.createGhfs(
path, GoogleHadoopFileSystemIntegrationHelper.getTestConfig());

String testContent = "test content";
gcsFsIHelper.writeTextFile(path, testContent);

byte[] value = new byte[5];
byte[] expected = Arrays.copyOfRange(testContent.getBytes(StandardCharsets.UTF_8), 2, 7);

GoogleHadoopFSInputStream in = createGhfsInputStream(ghfs, path);
try (GoogleHadoopFSInputStream ignore = in) {
in.readFully(2, value);
assertThat(in.getPos()).isEqualTo(0);
}
assertThat(value).isEqualTo(expected);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May you add test that demonstrates impact of this change (we have tests that assert number of GCS requests). For example, do we expect that number of the HTTP JSON GCS requests for readFully calls will be reduced after this change?

}

@Test
public void testReadFully_illegalSize() throws Exception {
URI path = gcsFsIHelper.getUniqueObjectUri(getClass(), "testReadFully");
GoogleHadoopFileSystem ghfs =
GoogleHadoopFileSystemIntegrationHelper.createGhfs(
path, GoogleHadoopFileSystemIntegrationHelper.getTestConfig());

String testContent = "test content";
gcsFsIHelper.writeTextFile(path, testContent);

byte[] value = new byte[20];

GoogleHadoopFSInputStream in = createGhfsInputStream(ghfs, path);
Throwable exception = assertThrows(EOFException.class, () -> in.readFully(2, value));
assertThat(exception).hasMessageThat().contains(FSExceptionMessages.EOF_IN_READ_FULLY);
}

private static GoogleHadoopFSInputStream createGhfsInputStream(
GoogleHadoopFileSystem ghfs, URI path) throws IOException {
return GoogleHadoopFSInputStream.create(
Expand Down