Skip to content

Commit

Permalink
less invasive version
Browse files Browse the repository at this point in the history
  • Loading branch information
adriangb committed Jan 20, 2025
1 parent dfe74d1 commit 02619ec
Showing 1 changed file with 38 additions and 20 deletions.
58 changes: 38 additions & 20 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ impl ParquetMetaDataReader {
mut fetch: F,
file_size: usize,
) -> Result<()> {
let (metadata, fetched) =
let (metadata, remainder) =
Self::load_metadata(&mut fetch, file_size, self.get_prefetch_size()).await?;

self.metadata = Some(metadata);
Expand All @@ -382,7 +382,7 @@ impl ParquetMetaDataReader {
return Ok(());
}

self.load_page_index_with_remainder(fetch, fetched).await
self.load_page_index_with_remainder(fetch, remainder).await
}

/// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already
Expand All @@ -396,7 +396,7 @@ impl ParquetMetaDataReader {
async fn load_page_index_with_remainder<F: MetadataFetch>(
&mut self,
mut fetch: F,
fetched: Option<(usize, Bytes)>,
remaineder: Option<(usize, Bytes)>,
) -> Result<()> {
if self.metadata.is_none() {
return Err(general_err!("Footer metadata is not present"));
Expand All @@ -409,21 +409,12 @@ impl ParquetMetaDataReader {
None => return Ok(()),
};

let bytes = match &fetched {
Some((fetched_start, fetched)) if *fetched_start <= range.start => {
// `fetched`` is an amount of data spanning from fetched_start to the end of the file
// We want to slice out the range we need from that data, but need to adjust the
// range we are looking for to be relative to fetched_start.
let fetched_start = *fetched_start;
let range = range.start - fetched_start..range.end - fetched_start;
// santity check: `fetched` should always go until the end of the file
// so if our range is beyond that, something is wrong!
assert!(
range.end <= fetched_start + fetched.len(),
"range: {range:?}, fetched: {}, fetched_start: {fetched_start}",
fetched.len()
);
fetched.slice(range)
let bytes = match &remaineder {
Some((remainder_start, remaineder)) if *remainder_start <= range.start => {
let offset = range.start - *remainder_start;
let end = offset + range.end - range.start;
assert!(end <= remaineder.len());
remaineder.slice(offset..end)
}
// Note: this will potentially fetch data already in remainder, this keeps things simple
_ => fetch.fetch(range.start..range.end).await?,
Expand Down Expand Up @@ -591,7 +582,10 @@ impl ParquetMetaDataReader {
} else {
let metadata_start = file_size - length - FOOTER_SIZE - footer_start;
let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
Ok((Self::decode_metadata(slice)?, Some((footer_start, suffix))))
Ok((
Self::decode_metadata(slice)?,
Some((footer_start, suffix.slice(..metadata_start))),
))
}
}

Expand Down Expand Up @@ -1061,7 +1055,19 @@ mod async_tests {
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch more than enough
// Prefetch more than enough but less than the entire file
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let metadata = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(len - 1000)) // prefetch more than enough, but less than the entire file
.load_and_finish(f, len)
.await
.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch the entire file
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let metadata = ParquetMetaDataReader::new()
Expand All @@ -1072,5 +1078,17 @@ mod async_tests {
.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

// Prefetch more than the entire file
fetch_count.store(0, Ordering::SeqCst);
let f = MetadataFetchFn(&mut fetch);
let metadata = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_prefetch_hint(Some(len + 1000)) // prefetch more than the entire file
.load_and_finish(f, len)
.await
.unwrap();
assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
}
}

0 comments on commit 02619ec

Please sign in to comment.