Skip to content

Commit 29cd685

Browse files
rambleraptor, etseidl, and scovich
authored
Change some panics to errors in parquet decoder (#8602)
# Rationale for this change

We've caused some unexpected panics from our internal testing. We've put in error checks for all of these so that they don't affect other users.

# What changes are included in this PR?

Various error checks to ensure panics don't occur.

# Are these changes tested?

Tests should continue to pass. If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? Existing tests should cover these changes.

# Are there any user-facing changes?

None.

---------

Co-authored-by: Ed Seidl <[email protected]>
Co-authored-by: Ryan Johnson <[email protected]>
1 parent 7f3d3ae commit 29cd685

File tree

8 files changed

+190
-22
lines changed

8 files changed

+190
-22
lines changed

parquet/src/column/page.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use crate::file::statistics::{Statistics, page_stats_to_thrift};
3131
/// List of supported pages.
3232
/// These are 1-to-1 mapped from the equivalent Thrift definitions, except `buf` which
3333
/// used to store uncompressed bytes of the page.
34-
#[derive(Clone)]
34+
#[derive(Clone, Debug)]
3535
pub enum Page {
3636
/// Data page Parquet format v1.
3737
DataPage {

parquet/src/column/reader.rs

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -569,11 +569,16 @@ fn parse_v1_level(
569569
match encoding {
570570
Encoding::RLE => {
571571
let i32_size = std::mem::size_of::<i32>();
572-
let data_size = read_num_bytes::<i32>(i32_size, buf.as_ref()) as usize;
573-
Ok((
574-
i32_size + data_size,
575-
buf.slice(i32_size..i32_size + data_size),
576-
))
572+
if i32_size <= buf.len() {
573+
let data_size = read_num_bytes::<i32>(i32_size, buf.as_ref()) as usize;
574+
let end = i32_size
575+
.checked_add(data_size)
576+
.ok_or(general_err!("invalid level length"))?;
577+
if end <= buf.len() {
578+
return Ok((end, buf.slice(i32_size..end)));
579+
}
580+
}
581+
Err(general_err!("not enough data to read levels"))
577582
}
578583
#[allow(deprecated)]
579584
Encoding::BIT_PACKED => {
@@ -597,6 +602,25 @@ mod tests {
597602
use crate::util::test_common::page_util::InMemoryPageReader;
598603
use crate::util::test_common::rand_gen::make_pages;
599604

605+
#[test]
606+
fn test_parse_v1_level_invalid_length() {
607+
// Say length is 10, but buffer is only 4
608+
let buf = Bytes::from(vec![10, 0, 0, 0]);
609+
let err = parse_v1_level(1, 100, Encoding::RLE, buf).unwrap_err();
610+
assert_eq!(
611+
err.to_string(),
612+
"Parquet error: not enough data to read levels"
613+
);
614+
615+
// Say length is 4, but buffer is only 3
616+
let buf = Bytes::from(vec![4, 0, 0]);
617+
let err = parse_v1_level(1, 100, Encoding::RLE, buf).unwrap_err();
618+
assert_eq!(
619+
err.to_string(),
620+
"Parquet error: not enough data to read levels"
621+
);
622+
}
623+
600624
const NUM_LEVELS: usize = 128;
601625
const NUM_PAGES: usize = 2;
602626
const MAX_DEF_LEVEL: i16 = 5;

parquet/src/encodings/decoding.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,17 @@ impl<T: DataType> DictDecoder<T> {
381381
impl<T: DataType> Decoder<T> for DictDecoder<T> {
382382
fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
383383
// First byte in `data` is bit width
384+
if data.is_empty() {
385+
return Err(eof_err!("Not enough bytes to decode bit_width"));
386+
}
387+
384388
let bit_width = data.as_ref()[0];
389+
if bit_width > 32 {
390+
return Err(general_err!(
391+
"Invalid or corrupted RLE bit width {}. Max allowed is 32",
392+
bit_width
393+
));
394+
}
385395
let mut rle_decoder = RleDecoder::new(bit_width);
386396
rle_decoder.set_data(data.slice(1..));
387397
self.num_values = num_values;
@@ -1395,6 +1405,13 @@ mod tests {
13951405
test_plain_skip::<FixedLenByteArrayType>(Bytes::from(data_bytes), 3, 6, 4, &[]);
13961406
}
13971407

1408+
#[test]
1409+
fn test_dict_decoder_empty_data() {
1410+
let mut decoder = DictDecoder::<Int32Type>::new();
1411+
let err = decoder.set_data(Bytes::new(), 10).unwrap_err();
1412+
assert_eq!(err.to_string(), "EOF: Not enough bytes to decode bit_width");
1413+
}
1414+
13981415
fn test_plain_decode<T: DataType>(
13991416
data: Bytes,
14001417
num_values: usize,

parquet/src/encodings/rle.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -513,7 +513,10 @@ impl RleDecoder {
513513
self.rle_left = (indicator_value >> 1) as u32;
514514
let value_width = bit_util::ceil(self.bit_width as usize, 8);
515515
self.current_value = bit_reader.get_aligned::<u64>(value_width);
516-
assert!(self.current_value.is_some());
516+
assert!(
517+
self.current_value.is_some(),
518+
"parquet_data_error: not enough data for RLE decoding"
519+
);
517520
}
518521
true
519522
} else {

parquet/src/file/reader.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,25 @@ impl ChunkReader for Bytes {
124124

125125
fn get_read(&self, start: u64) -> Result<Self::T> {
126126
let start = start as usize;
127+
if start > self.len() {
128+
return Err(eof_err!(
129+
"Expected to read at offset {start}, while file has length {}",
130+
self.len()
131+
));
132+
}
127133
Ok(self.slice(start..).reader())
128134
}
129135

130136
fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
131137
let start = start as usize;
138+
if start > self.len() || start + length > self.len() {
139+
return Err(eof_err!(
140+
"Expected to read {} bytes at offset {}, while file has length {}",
141+
length,
142+
start,
143+
self.len()
144+
));
145+
}
132146
Ok(self.slice(start..start + length))
133147
}
134148
}
@@ -274,3 +288,34 @@ impl Iterator for FilePageIterator {
274288
}
275289

276290
impl PageIterator for FilePageIterator {}
291+
292+
#[cfg(test)]
293+
mod tests {
294+
use super::*;
295+
296+
#[test]
297+
fn test_bytes_chunk_reader_get_read_out_of_bounds() {
298+
let data = Bytes::from(vec![0, 1, 2, 3]);
299+
let err = data.get_read(5).unwrap_err();
300+
assert_eq!(
301+
err.to_string(),
302+
"EOF: Expected to read at offset 5, while file has length 4"
303+
);
304+
}
305+
306+
#[test]
307+
fn test_bytes_chunk_reader_get_bytes_out_of_bounds() {
308+
let data = Bytes::from(vec![0, 1, 2, 3]);
309+
let err = data.get_bytes(5, 1).unwrap_err();
310+
assert_eq!(
311+
err.to_string(),
312+
"EOF: Expected to read 1 bytes at offset 5, while file has length 4"
313+
);
314+
315+
let err = data.get_bytes(2, 3).unwrap_err();
316+
assert_eq!(
317+
err.to_string(),
318+
"EOF: Expected to read 3 bytes at offset 2, while file has length 4"
319+
);
320+
}
321+
}

parquet/src/file/serialized_reader.rs

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,9 @@ pub(crate) fn decode_page(
392392
let buffer = match decompressor {
393393
Some(decompressor) if can_decompress => {
394394
let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
395+
if offset > buffer.len() || offset > uncompressed_page_size {
396+
return Err(general_err!("Invalid page header"));
397+
}
395398
let decompressed_size = uncompressed_page_size - offset;
396399
let mut decompressed = Vec::with_capacity(uncompressed_page_size);
397400
decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
@@ -458,7 +461,10 @@ pub(crate) fn decode_page(
458461
}
459462
_ => {
460463
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
461-
unimplemented!("Page type {:?} is not supported", page_header.r#type)
464+
return Err(general_err!(
465+
"Page type {:?} is not supported",
466+
page_header.r#type
467+
));
462468
}
463469
};
464470

@@ -1130,6 +1136,7 @@ mod tests {
11301136
use crate::column::reader::ColumnReader;
11311137
use crate::data_type::private::ParquetValueType;
11321138
use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
1139+
use crate::file::metadata::thrift::DataPageHeaderV2;
11331140
#[allow(deprecated)]
11341141
use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
11351142
use crate::file::writer::SerializedFileWriter;
@@ -1139,6 +1146,72 @@ mod tests {
11391146

11401147
use super::*;
11411148

1149+
#[test]
1150+
fn test_decode_page_invalid_offset() {
1151+
let page_header = PageHeader {
1152+
r#type: PageType::DATA_PAGE_V2,
1153+
uncompressed_page_size: 10,
1154+
compressed_page_size: 10,
1155+
data_page_header: None,
1156+
index_page_header: None,
1157+
dictionary_page_header: None,
1158+
crc: None,
1159+
data_page_header_v2: Some(DataPageHeaderV2 {
1160+
num_nulls: 0,
1161+
num_rows: 0,
1162+
num_values: 0,
1163+
encoding: Encoding::PLAIN,
1164+
definition_levels_byte_length: 11,
1165+
repetition_levels_byte_length: 0,
1166+
is_compressed: None,
1167+
statistics: None,
1168+
}),
1169+
};
1170+
1171+
let buffer = Bytes::new();
1172+
let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
1173+
assert!(
1174+
err.to_string()
1175+
.contains("DataPage v2 header contains implausible values")
1176+
);
1177+
}
1178+
1179+
#[test]
1180+
fn test_decode_unsupported_page() {
1181+
let mut page_header = PageHeader {
1182+
r#type: PageType::INDEX_PAGE,
1183+
uncompressed_page_size: 10,
1184+
compressed_page_size: 10,
1185+
data_page_header: None,
1186+
index_page_header: None,
1187+
dictionary_page_header: None,
1188+
crc: None,
1189+
data_page_header_v2: None,
1190+
};
1191+
let buffer = Bytes::new();
1192+
let err = decode_page(page_header.clone(), buffer.clone(), Type::INT32, None).unwrap_err();
1193+
assert_eq!(
1194+
err.to_string(),
1195+
"Parquet error: Page type INDEX_PAGE is not supported"
1196+
);
1197+
1198+
page_header.data_page_header_v2 = Some(DataPageHeaderV2 {
1199+
num_nulls: 0,
1200+
num_rows: 0,
1201+
num_values: 0,
1202+
encoding: Encoding::PLAIN,
1203+
definition_levels_byte_length: 11,
1204+
repetition_levels_byte_length: 0,
1205+
is_compressed: None,
1206+
statistics: None,
1207+
});
1208+
let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
1209+
assert!(
1210+
err.to_string()
1211+
.contains("DataPage v2 header contains implausible values")
1212+
);
1213+
}
1214+
11421215
#[test]
11431216
fn test_cursor_and_file_has_the_same_behaviour() {
11441217
let mut buf: Vec<u8> = Vec::new();

parquet/src/schema/types.rs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1348,19 +1348,23 @@ fn schema_from_array_helper<'a>(
13481348
.with_logical_type(logical_type)
13491349
.with_fields(fields)
13501350
.with_id(field_id);
1351-
if let Some(rep) = repetition {
1352-
// Sometimes parquet-cpp and parquet-mr set repetition level REQUIRED or
1353-
// REPEATED for root node.
1354-
//
1355-
// We only set repetition for group types that are not top-level message
1356-
// type. According to parquet-format:
1357-
// Root of the schema does not have a repetition_type.
1358-
// All other types must have one.
1359-
if !is_root_node {
1360-
builder = builder.with_repetition(rep);
1361-
}
1351+
1352+
// Sometimes parquet-cpp and parquet-mr set repetition level REQUIRED or
1353+
// REPEATED for root node.
1354+
//
1355+
// We only set repetition for group types that are not top-level message
1356+
// type. According to parquet-format:
1357+
// Root of the schema does not have a repetition_type.
1358+
// All other types must have one.
1359+
if !is_root_node {
1360+
let Some(rep) = repetition else {
1361+
return Err(general_err!(
1362+
"Repetition level must be defined for non-root types"
1363+
));
1364+
};
1365+
builder = builder.with_repetition(rep);
13621366
}
1363-
Ok((next_index, Arc::new(builder.build().unwrap())))
1367+
Ok((next_index, Arc::new(builder.build()?)))
13641368
}
13651369
}
13661370
}

parquet/tests/arrow_reader/bad_data.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,12 @@ fn test_parquet_1481() {
8484
}
8585

8686
#[test]
87-
#[should_panic(expected = "assertion failed: self.current_value.is_some()")]
8887
fn test_arrow_gh_41321() {
8988
let err = read_file("ARROW-GH-41321.parquet").unwrap_err();
90-
assert_eq!(err.to_string(), "TBD (currently panics)");
89+
assert_eq!(
90+
err.to_string(),
91+
"External: Parquet argument error: Parquet error: Invalid or corrupted RLE bit width 254. Max allowed is 32"
92+
);
9193
}
9294

9395
#[test]

0 commit comments

Comments (0)