Skip to content

Commit

Permalink
Cleanup MetadataLoader tests (apache#6484)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored Oct 1, 2024
1 parent 551eaab commit e538289
Showing 1 changed file with 24 additions and 15 deletions.
39 changes: 24 additions & 15 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ mod async_tests {

struct MetadataFetchFn<F>(F);

impl<'a, F, Fut> MetadataFetch for &'a mut MetadataFetchFn<F>
impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
where
F: FnMut(Range<usize>) -> Fut + Send,
Fut: Future<Output = Result<Bytes>> + Send,
Expand Down Expand Up @@ -870,68 +870,74 @@ mod async_tests {
let expected = expected.file_metadata().schema();
let fetch_count = AtomicUsize::new(0);

let fetch = |range| {
let mut fetch = |range| {
fetch_count.fetch_add(1, Ordering::SeqCst);
futures::future::ready(read_range(&mut file, range))
};

let mut f = MetadataFetchFn(fetch);
let input = MetadataFetchFn(&mut fetch);
let actual = ParquetMetaDataReader::new()
.load_and_finish(&mut f, len)
.load_and_finish(input, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

// Metadata hint too small - below footer size
fetch_count.store(0, Ordering::SeqCst);
let input = MetadataFetchFn(&mut fetch);
let actual = ParquetMetaDataReader::new()
.with_prefetch_hint(Some(7))
.load_and_finish(&mut f, len)
.load_and_finish(input, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

// Metadata hint too small
fetch_count.store(0, Ordering::SeqCst);
let input = MetadataFetchFn(&mut fetch);
let actual = ParquetMetaDataReader::new()
.with_prefetch_hint(Some(10))
.load_and_finish(&mut f, len)
.load_and_finish(input, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

// Metadata hint too large
fetch_count.store(0, Ordering::SeqCst);
let input = MetadataFetchFn(&mut fetch);
let actual = ParquetMetaDataReader::new()
.with_prefetch_hint(Some(500))
.load_and_finish(&mut f, len)
.load_and_finish(input, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

// Metadata hint exactly correct
fetch_count.store(0, Ordering::SeqCst);
let input = MetadataFetchFn(&mut fetch);
let actual = ParquetMetaDataReader::new()
.with_prefetch_hint(Some(428))
.load_and_finish(&mut f, len)
.load_and_finish(input, len)
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

let input = MetadataFetchFn(&mut fetch);
let err = ParquetMetaDataReader::new()
.load_and_finish(&mut f, 4)
.load_and_finish(input, 4)
.await
.unwrap_err()
.to_string();
assert_eq!(err, "EOF: file size of 4 is less than footer");

let input = MetadataFetchFn(&mut fetch);
let err = ParquetMetaDataReader::new()
.load_and_finish(&mut f, 20)
.load_and_finish(input, 20)
.await
.unwrap_err()
.to_string();
Expand All @@ -948,39 +954,42 @@ mod async_tests {
futures::future::ready(read_range(&mut file, range))
};

let mut f = MetadataFetchFn(&mut fetch);
let f = MetadataFetchFn(&mut fetch);
let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
loader.try_load(&mut f, len).await.unwrap();
loader.try_load(f, len).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
let metadata = loader.finish().unwrap();
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch just footer exactly
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let mut loader = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(1729));
loader.try_load(&mut f, len).await.unwrap();
loader.try_load(f, len).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
let metadata = loader.finish().unwrap();
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch more than footer but not enough
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let mut loader = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(130649));
loader.try_load(&mut f, len).await.unwrap();
loader.try_load(f, len).await.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
let metadata = loader.finish().unwrap();
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch exactly enough
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let metadata = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(130650))
.load_and_finish(&mut f, len)
.load_and_finish(f, len)
.await
.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
Expand Down

0 comments on commit e538289

Please sign in to comment.