Skip to content

Commit

Permalink
feat(index): add RangeReader trait (#4718)
Browse files Browse the repository at this point in the history
* feat(index): add `RangeReader` trait

Signed-off-by: Zhenchi <[email protected]>

* fix: return content_length as read bytes

Signed-off-by: Zhenchi <[email protected]>

* chore: remove buffer & use `BufMut`

Signed-off-by: Zhenchi <[email protected]>

---------

Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc authored Sep 10, 2024
1 parent ff40d51 commit f252599
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 465 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/common/base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ workspace = true

[dependencies]
anymap = "1.0.0-beta.2"
async-trait.workspace = true
bitvec = "1.0"
bytes.workspace = true
common-error.workspace = true
common-macro.workspace = true
futures.workspace = true
paste = "1.0"
serde = { version = "1.0", features = ["derive"] }
snafu.workspace = true
Expand Down
242 changes: 0 additions & 242 deletions src/common/base/src/buffer.rs

This file was deleted.

6 changes: 6 additions & 0 deletions src/common/base/src/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ impl From<Vec<u8>> for Bytes {
}
}

impl From<Bytes> for Vec<u8> {
    /// Converts the wrapped inner value (field `0`) into a `Vec<u8>`.
    fn from(bytes: Bytes) -> Self {
        Vec::from(bytes.0)
    }
}

impl Deref for Bytes {
type Target = [u8];

Expand Down
2 changes: 1 addition & 1 deletion src/common/base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
// limitations under the License.

pub mod bit_vec;
pub mod buffer;
pub mod bytes;
pub mod plugins;
pub mod range_read;
#[allow(clippy::all)]
pub mod readable_size;
pub mod secrets;
Expand Down
80 changes: 80 additions & 0 deletions src/common/base/src/range_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io;
use std::ops::Range;

use async_trait::async_trait;
use bytes::{BufMut, Bytes};
use futures::{AsyncReadExt, AsyncSeekExt};

/// `Metadata` contains the metadata of a source.
///
/// It is a plain value type, so it can be debug-printed, cloned, and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Metadata {
    /// The length of the source in bytes.
    pub content_length: u64,
}

/// `RangeReader` provides random access to a source by byte range.
#[async_trait]
pub trait RangeReader: Send + Unpin {
    /// Fetches the metadata describing the source.
    async fn metadata(&mut self) -> io::Result<Metadata>;

    /// Fetches the bytes covered by `range`.
    async fn read(&mut self, range: Range<u64>) -> io::Result<Bytes>;

    /// Fetches the bytes covered by `range` and writes them into `buf`.
    ///
    /// What happens when `buf` has too little room depends on the buffer type:
    /// - a growable buffer (e.g., `Vec<u8>`) allocates additional space;
    /// - a fixed-size buffer (e.g., `&mut [u8]`) panics.
    async fn read_into(
        &mut self,
        range: Range<u64>,
        buf: &mut (impl BufMut + Send),
    ) -> io::Result<()> {
        // Delegate to `read`, then copy the fetched bytes into the caller's buffer.
        let chunk = self.read(range).await?;
        buf.put_slice(&chunk[..]);
        Ok(())
    }

    /// Fetches the bytes covered by each of `ranges`.
    ///
    /// The ranges are read one after another, in the given order; the first
    /// failing read aborts the whole call and its error is returned.
    async fn read_vec(&mut self, ranges: &[Range<u64>]) -> io::Result<Vec<Bytes>> {
        let mut chunks = Vec::with_capacity(ranges.len());
        for range in ranges.iter().cloned() {
            let chunk = self.read(range).await?;
            chunks.push(chunk);
        }
        Ok(chunks)
    }
}

/// Implement `RangeReader` for a type that implements `AsyncRead + AsyncSeek`.
///
/// TODO(zhongzc): It's a temporary solution for porting the codebase from `AsyncRead + AsyncSeek` to `RangeReader`.
/// Once the codebase is fully ported to `RangeReader`, remove this implementation.
#[async_trait]
impl<R: futures::AsyncRead + futures::AsyncSeek + Send + Unpin> RangeReader for R {
    /// Determines the source length by seeking to the end of the stream.
    ///
    /// Note that this moves the cursor; `read` always seeks to an absolute
    /// position first, so it is unaffected.
    async fn metadata(&mut self) -> io::Result<Metadata> {
        let content_length = self.seek(io::SeekFrom::End(0)).await?;
        Ok(Metadata { content_length })
    }

    /// Seeks to `range.start` and reads exactly `range.end - range.start` bytes.
    ///
    /// # Errors
    ///
    /// Returns an error of kind `InvalidInput` if `range.end < range.start` or if
    /// the range length does not fit in `usize`; otherwise propagates any error
    /// from the underlying seek or read.
    async fn read(&mut self, range: Range<u64>) -> io::Result<Bytes> {
        // Validate the length up front: a plain `end - start` would underflow on an
        // inverted range, and `as usize` would silently truncate on narrow targets.
        let len = range
            .end
            .checked_sub(range.start)
            .and_then(|len| usize::try_from(len).ok())
            .ok_or_else(|| {
                io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!("invalid read range: {}..{}", range.start, range.end),
                )
            })?;

        let mut buf = vec![0; len];
        self.seek(io::SeekFrom::Start(range.start)).await?;
        self.read_exact(&mut buf).await?;
        Ok(Bytes::from(buf))
    }
}
Loading

0 comments on commit f252599

Please sign in to comment.