Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-43949: [C++] io::BufferedInput: Fixing the invalid state with SetBufferSize #44387

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions cpp/src/arrow/io/buffered.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class BufferedBase {
return !is_open_;
}

// Reset the `buffer_` to a new buffer of size `buffer_size_`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Reset the `buffer_` to a new buffer of size `buffer_size_`
// Resize buffer_ to buffer_size_

Let's keep consistent variable names in the comment across this file.

Status ResetBuffer() {
if (!buffer_) {
// On first invocation, or if the buffer has been released, we allocate a
Expand Down Expand Up @@ -284,17 +285,30 @@ class BufferedInputStream::Impl : public BufferedBase {

// Resize internal read buffer. Note that the internal buffer-size
// should be not larger than the raw_read_bound_.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// should be not larger than the raw_read_bound_.
// should not be larger than the raw_read_bound_.

//
// SetBufferSize will not change the buffer_pos_ and bytes_buffered_.
Comment on lines +288 to +289
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
//
// SetBufferSize will not change the buffer_pos_ and bytes_buffered_.
// It will not change buffer states including buffer_pos_ and bytes_buffered_.

Status SetBufferSize(int64_t new_buffer_size) {
if (new_buffer_size <= 0) {
return Status::Invalid("Buffer size should be positive");
}
if ((buffer_pos_ + bytes_buffered_) >= new_buffer_size) {
return Status::Invalid("Cannot shrink read buffer if buffered data remains");
return Status::Invalid(
"Cannot shrink read buffer if buffered data remains, new_buffer_size:",
new_buffer_size, ", buffer_pos:", buffer_pos_,
", bytes_buffered:", bytes_buffered_, ", buffer_size:", buffer_size_);
}
if (raw_read_bound_ >= 0) {
// No need to reserve space for more than the total remaining number of bytes.
new_buffer_size = std::min(new_buffer_size,
bytes_buffered_ + (raw_read_bound_ - raw_read_total_));
if (bytes_buffered_ == 0) {
// Special case: we can not keep the current buffer because it does not
// contains any required data.
new_buffer_size = std::min(new_buffer_size, raw_read_bound_ - raw_read_total_);
} else {
// We should keeping the current buffer because it contains data that
// can be read.
new_buffer_size =
std::min(new_buffer_size, buffer_size_ + (raw_read_bound_ - raw_read_total_));
Comment on lines +309 to +310
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
new_buffer_size =
std::min(new_buffer_size, buffer_size_ + (raw_read_bound_ - raw_read_total_));
new_buffer_size =
std::min(new_buffer_size, buffer_pos_ + buffer_size_ + (raw_read_bound_ - raw_read_total_));

Isn't buffer_pos_ required to be kept as well?

}
}
return ResizeBuffer(new_buffer_size);
}
Expand Down Expand Up @@ -350,7 +364,7 @@ class BufferedInputStream::Impl : public BufferedBase {
}

Status DoBuffer() {
// Fill buffer
// Refill the buffer from the raw stream with `buffer_size_` bytes.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Refill the buffer from the raw stream with `buffer_size_` bytes.
// Fill the buffer from the raw stream with at most `buffer_size_` bytes.

if (!buffer_) {
RETURN_NOT_OK(ResetBuffer());
}
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/io/buffered.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class ARROW_EXPORT BufferedInputStream
int64_t raw_read_bound = -1);

/// \brief Resize internal read buffer; calls to Read(...) will read at least
/// this many bytes from the raw InputStream if possible.
/// \param[in] new_buffer_size the new read buffer size
/// \return Status
Status SetBufferSize(int64_t new_buffer_size);
Expand Down
13 changes: 13 additions & 0 deletions cpp/src/arrow/io/buffered_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,19 @@ TEST_F(TestBufferedInputStream, BufferSizeLimit) {
}
}

TEST_F(TestBufferedInputStream, PeekPastBufferedBytesTwice) {
MakeExample1(/*buffer_size=*/10, default_memory_pool(), /*raw_read_bound=*/15);
ASSERT_OK(buffered_->Read(9));
ASSERT_EQ(1, buffered_->bytes_buffered());
ASSERT_EQ(10, buffered_->buffer_size());
ASSERT_OK(buffered_->Peek(6));
ASSERT_EQ(6, buffered_->bytes_buffered());
ASSERT_EQ(15, buffered_->buffer_size());
ASSERT_OK(buffered_->Peek(6));
ASSERT_EQ(6, buffered_->bytes_buffered());
ASSERT_EQ(15, buffered_->buffer_size());
}

class TestBufferedInputStreamBound : public ::testing::Test {
public:
void SetUp() { CreateExample(/*bounded=*/true); }
Expand Down
Loading