From 19706342b31ce5d574a2e203d231ec7073c2b289 Mon Sep 17 00:00:00 2001 From: mwish Date: Sat, 12 Oct 2024 17:45:33 +0800 Subject: [PATCH 1/2] io::BufferedInput: Fixing the invalid state with SetBufferSize --- cpp/src/arrow/io/buffered.cc | 22 ++++++++++++++++++---- cpp/src/arrow/io/buffered.h | 1 + cpp/src/arrow/io/buffered_test.cc | 13 +++++++++++++ 3 files changed, 32 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/io/buffered.cc b/cpp/src/arrow/io/buffered.cc index b41e4257af215..68d337905feae 100644 --- a/cpp/src/arrow/io/buffered.cc +++ b/cpp/src/arrow/io/buffered.cc @@ -51,6 +51,7 @@ class BufferedBase { return !is_open_; } + // Reset the `buffer_` to a new buffer of size `buffer_size_` Status ResetBuffer() { if (!buffer_) { // On first invocation, or if the buffer has been released, we allocate a @@ -284,17 +285,30 @@ class BufferedInputStream::Impl : public BufferedBase { // Resize internal read buffer. Note that the internal buffer-size // should be not larger than the raw_read_bound_. + // + // SetBufferSize will not change the buffer_pos_ and bytes_buffered_. Status SetBufferSize(int64_t new_buffer_size) { if (new_buffer_size <= 0) { return Status::Invalid("Buffer size should be positive"); } if ((buffer_pos_ + bytes_buffered_) >= new_buffer_size) { - return Status::Invalid("Cannot shrink read buffer if buffered data remains"); + return Status::Invalid( + "Cannot shrink read buffer if buffered data remains, new_buffer_size:", + new_buffer_size, ", buffer_pos:", buffer_pos_, + ", bytes_buffered:", bytes_buffered_, ", buffer_size:", buffer_size_); } if (raw_read_bound_ >= 0) { // No need to reserve space for more than the total remaining number of bytes. - new_buffer_size = std::min(new_buffer_size, - bytes_buffered_ + (raw_read_bound_ - raw_read_total_)); + if (bytes_buffered_ == 0) { + // Special case: we can not keep the current buffer because it does not + // contains any required data. + new_buffer_size = std::min(new_buffer_size, raw_read_bound_ - raw_read_total_); + } else { + // We should keeping the current buffer because it contains data that + // can be read. + new_buffer_size = + std::min(new_buffer_size, buffer_size_ + (raw_read_bound_ - raw_read_total_)); + } } return ResizeBuffer(new_buffer_size); } @@ -350,7 +364,7 @@ class BufferedInputStream::Impl : public BufferedBase { } Status DoBuffer() { - // Fill buffer + // Refill the buffer from the raw stream with `buffer_size_` bytes. if (!buffer_) { RETURN_NOT_OK(ResetBuffer()); } diff --git a/cpp/src/arrow/io/buffered.h b/cpp/src/arrow/io/buffered.h index 01c0a016daba0..22ea7520a5050 100644 --- a/cpp/src/arrow/io/buffered.h +++ b/cpp/src/arrow/io/buffered.h @@ -111,6 +111,7 @@ class ARROW_EXPORT BufferedInputStream int64_t raw_read_bound = -1); /// \brief Resize internal read buffer; calls to Read(...) will read at least + /// this many bytes from the raw InputStream if possible. /// \param[in] new_buffer_size the new read buffer size /// \return Status Status SetBufferSize(int64_t new_buffer_size); diff --git a/cpp/src/arrow/io/buffered_test.cc b/cpp/src/arrow/io/buffered_test.cc index 89fe4b159f341..c77684f37fbce 100644 --- a/cpp/src/arrow/io/buffered_test.cc +++ b/cpp/src/arrow/io/buffered_test.cc @@ -491,6 +491,19 @@ TEST_F(TestBufferedInputStream, BufferSizeLimit) { } } +TEST_F(TestBufferedInputStream, PeekPastBufferedBytesTwice) { + MakeExample1(/*buffer_size=*/10, default_memory_pool(), /*raw_read_bound=*/15); + ASSERT_OK(buffered_->Read(9)); + ASSERT_EQ(1, buffered_->bytes_buffered()); + ASSERT_EQ(10, buffered_->buffer_size()); + ASSERT_OK(buffered_->Peek(6)); + ASSERT_EQ(6, buffered_->bytes_buffered()); + ASSERT_EQ(15, buffered_->buffer_size()); + ASSERT_OK(buffered_->Peek(6)); + ASSERT_EQ(6, buffered_->bytes_buffered()); + ASSERT_EQ(15, buffered_->buffer_size()); +} + class TestBufferedInputStreamBound : public ::testing::Test { public: void SetUp() { CreateExample(/*bounded=*/true); } From e3c7eeac76b8c6b839f25601a31ba11cb6959225 Mon Sep 17 00:00:00 2001 From: mwish Date: Tue, 15 Oct 2024 17:41:22 +0800 Subject: [PATCH 2/2] Fix a bug: buffer_size_ should not regard as bytes_buffered_ + buffer_pos_ --- cpp/src/arrow/io/buffered.cc | 23 ++++++++++++----------- cpp/src/arrow/io/buffered_test.cc | 23 +++++++++++++++++------ 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/cpp/src/arrow/io/buffered.cc b/cpp/src/arrow/io/buffered.cc index 68d337905feae..714493ae68cc9 100644 --- a/cpp/src/arrow/io/buffered.cc +++ b/cpp/src/arrow/io/buffered.cc @@ -51,7 +51,7 @@ class BufferedBase { return !is_open_; } - // Reset the `buffer_` to a new buffer of size `buffer_size_` + // Allocate buffer_ if needed, and resize it to buffer_size_ if required. Status ResetBuffer() { if (!buffer_) { // On first invocation, or if the buffer has been released, we allocate a @@ -284,9 +284,9 @@ class BufferedInputStream::Impl : public BufferedBase { } // Resize internal read buffer. Note that the internal buffer-size - // should be not larger than the raw_read_bound_. - // - // SetBufferSize will not change the buffer_pos_ and bytes_buffered_. + // should not be larger than the raw_read_bound_. + // It might change the buffer_size_, but will not change buffer states + // buffer_pos_ and bytes_buffered_. Status SetBufferSize(int64_t new_buffer_size) { if (new_buffer_size <= 0) { return Status::Invalid("Buffer size should be positive"); @@ -301,13 +301,14 @@ class BufferedInputStream::Impl : public BufferedBase { // No need to reserve space for more than the total remaining number of bytes. if (bytes_buffered_ == 0) { // Special case: we can not keep the current buffer because it does not - // contains any required data. + // contain any required data. new_buffer_size = std::min(new_buffer_size, raw_read_bound_ - raw_read_total_); } else { - // We should keeping the current buffer because it contains data that + // We should keep the current buffer because it contains data that // can be read. new_buffer_size = - std::min(new_buffer_size, buffer_size_ + (raw_read_bound_ - raw_read_total_)); + std::min(new_buffer_size, + buffer_pos_ + bytes_buffered_ + (raw_read_bound_ - raw_read_total_)); } } return ResizeBuffer(new_buffer_size); @@ -325,7 +326,7 @@ class BufferedInputStream::Impl : public BufferedBase { } // Increase the buffer size if needed. - if (nbytes > buffer_->size() - buffer_pos_) { + if (nbytes > buffer_size_ - buffer_pos_) { RETURN_NOT_OK(SetBufferSize(nbytes + buffer_pos_)); DCHECK(buffer_->size() - buffer_pos_ >= nbytes); } @@ -364,7 +365,7 @@ class BufferedInputStream::Impl : public BufferedBase { } Status DoBuffer() { - // Refill the buffer from the raw stream with `buffer_size_` bytes. + // Fill the buffer from the raw stream with at most `buffer_size_` bytes. if (!buffer_) { RETURN_NOT_OK(ResetBuffer()); } @@ -458,8 +459,8 @@ class BufferedInputStream::Impl : public BufferedBase { // The default -1 indicates that it is unbounded int64_t raw_read_bound_; - // Number of remaining bytes in the buffer, to be reduced on each read from - // the buffer + // Number of remaining valid bytes in the buffer, to be reduced on each read + // from the buffer. int64_t bytes_buffered_; }; diff --git a/cpp/src/arrow/io/buffered_test.cc b/cpp/src/arrow/io/buffered_test.cc index c77684f37fbce..d5cbb5455464c 100644 --- a/cpp/src/arrow/io/buffered_test.cc +++ b/cpp/src/arrow/io/buffered_test.cc @@ -491,17 +491,28 @@ TEST_F(TestBufferedInputStream, BufferSizeLimit) { } } -TEST_F(TestBufferedInputStream, PeekPastBufferedBytesTwice) { +TEST_F(TestBufferedInputStream, PeekPastBufferedBytes) { + // GH-43949: Peek and SetBufferSize should not affect the + // buffered bytes. MakeExample1(/*buffer_size=*/10, default_memory_pool(), /*raw_read_bound=*/15); - ASSERT_OK(buffered_->Read(9)); + ASSERT_OK_AND_ASSIGN(auto bytes, buffered_->Read(9)); + EXPECT_EQ(std::string_view(bytes->data_as(), bytes->size()), + kExample1.substr(0, 9)); ASSERT_EQ(1, buffered_->bytes_buffered()); ASSERT_EQ(10, buffered_->buffer_size()); - ASSERT_OK(buffered_->Peek(6)); - ASSERT_EQ(6, buffered_->bytes_buffered()); - ASSERT_EQ(15, buffered_->buffer_size()); - ASSERT_OK(buffered_->Peek(6)); + ASSERT_OK_AND_ASSIGN(auto view, buffered_->Peek(3)); + EXPECT_EQ(view, kExample1.substr(9, 3)); + ASSERT_EQ(3, buffered_->bytes_buffered()); + ASSERT_EQ(12, buffered_->buffer_size()); + ASSERT_OK_AND_ASSIGN(view, buffered_->Peek(10)); + EXPECT_EQ(view, kExample1.substr(9, 6)); ASSERT_EQ(6, buffered_->bytes_buffered()); ASSERT_EQ(15, buffered_->buffer_size()); + // Do read + ASSERT_OK_AND_ASSIGN(bytes, buffered_->Read(6)); + EXPECT_EQ(std::string_view(bytes->data_as(), bytes->size()), + kExample1.substr(9, 6)); + ASSERT_EQ(0, buffered_->bytes_buffered()); } class TestBufferedInputStreamBound : public ::testing::Test {