339 changes: 253 additions & 86 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions Cargo.toml
@@ -42,14 +42,14 @@ rust-version = "1.87"
anyhow = "1.0.72"
apache-avro = { version = "0.20", features = ["zstandard"] }
array-init = "2"
-arrow-arith = { version = "55.1" }
-arrow-array = { version = "55.1" }
-arrow-buffer = { version = "55.1" }
-arrow-cast = { version = "55.1" }
-arrow-ord = { version = "55.1" }
-arrow-schema = { version = "55.1" }
-arrow-select = { version = "55.1" }
-arrow-string = { version = "55.1" }
+arrow-arith = { git = "https://github.com/vustef/arrow-rs", branch = "feature/parquet-virtual-row-numbers" }
+arrow-array = { git = "https://github.com/vustef/arrow-rs", branch = "feature/parquet-virtual-row-numbers" }
+arrow-buffer = { git = "https://github.com/vustef/arrow-rs", branch = "feature/parquet-virtual-row-numbers" }
+arrow-cast = { git = "https://github.com/vustef/arrow-rs", branch = "feature/parquet-virtual-row-numbers" }
+arrow-ord = { git = "https://github.com/vustef/arrow-rs", branch = "feature/parquet-virtual-row-numbers" }
+arrow-schema = { git = "https://github.com/vustef/arrow-rs", branch = "feature/parquet-virtual-row-numbers" }
+arrow-select = { git = "https://github.com/vustef/arrow-rs", branch = "feature/parquet-virtual-row-numbers" }
+arrow-string = { git = "https://github.com/vustef/arrow-rs", branch = "feature/parquet-virtual-row-numbers" }
as-any = "0.3.2"
async-trait = "0.1.88"
aws-config = "1.8.1"
@@ -99,7 +99,7 @@ num-bigint = "0.4.6"
once_cell = "1.20"
opendal = "0.54.0"
ordered-float = "4"
parquet = "55.1"
parquet = { git = "https://github.com/vustef/arrow-rs", branch = "feature/parquet-virtual-row-numbers" }
pilota = "0.11.10"
port_scanner = "0.1.5"
pretty_assertions = "1.4"
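All of the `arrow-*` crates and `parquet` now resolve from the same fork branch rather than a released version. As a hedged side note (not part of this change): a branch reference moves over time, so pinning the fork to an explicit commit keeps builds reproducible. The `rev` below is a placeholder, not a commit taken from this PR:

```toml
# Sketch only: pin the git dependency to a fixed commit instead of a moving branch.
# "0000000" is a placeholder rev, not the commit this PR actually builds against.
arrow-array = { git = "https://github.com/vustef/arrow-rs", rev = "0000000" }
parquet = { git = "https://github.com/vustef/arrow-rs", rev = "0000000" }
```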
1 change: 1 addition & 0 deletions crates/iceberg/src/arrow/delete_file_loader.rs
@@ -63,6 +63,7 @@ impl BasicDeleteFileLoader {
data_file_path,
self.file_io.clone(),
false,
vec![],
)
.await?
.build()?
106 changes: 98 additions & 8 deletions crates/iceberg/src/arrow/reader.rs
@@ -38,8 +38,8 @@ use parquet::arrow::arrow_reader::{
ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
};
use parquet::arrow::async_reader::AsyncFileReader;
-use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask};
-use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
+use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask, RowNumber};
+use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};

use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
@@ -57,13 +57,20 @@ use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind};

/// Reserved field ID for the row ordinal (_pos) column per Iceberg spec
pub const RESERVED_FIELD_ID_POS: i32 = 2147483645;

/// Column name for the row ordinal metadata column per Iceberg spec
pub const RESERVED_COL_NAME_POS: &str = "_pos";

/// Builder to create ArrowReader
pub struct ArrowReaderBuilder {
batch_size: Option<usize>,
file_io: FileIO,
concurrency_limit_data_files: usize,
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
metadata_columns: Vec<String>,
}

impl ArrowReaderBuilder {
@@ -77,6 +84,7 @@ impl ArrowReaderBuilder {
concurrency_limit_data_files: num_cpus,
row_group_filtering_enabled: true,
row_selection_enabled: false,
metadata_columns: vec![],
}
}

@@ -105,6 +113,15 @@ impl ArrowReaderBuilder {
self
}

/// Sets the metadata columns to include in the result.
///
/// Metadata columns are virtual columns that provide metadata about the rows,
/// such as file paths or row positions. They are defined by the Iceberg spec:
/// https://iceberg.apache.org/spec/#identifier-field-ids
pub fn with_metadata_columns(mut self, metadata_columns: Vec<String>) -> Self {
self.metadata_columns = metadata_columns;
self
}

/// Build the ArrowReader.
pub fn build(self) -> ArrowReader {
ArrowReader {
@@ -117,6 +134,7 @@ impl ArrowReaderBuilder {
concurrency_limit_data_files: self.concurrency_limit_data_files,
row_group_filtering_enabled: self.row_group_filtering_enabled,
row_selection_enabled: self.row_selection_enabled,
metadata_columns: self.metadata_columns,
}
}
}
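For orientation, a minimal usage sketch of the new builder option, assuming `ArrowReaderBuilder` and `ArrowReader` are re-exported from `iceberg::arrow` as elsewhere in the crate; the builder itself is normally constructed by the table scan machinery:

```rust
use iceberg::arrow::{ArrowReader, ArrowReaderBuilder};

// Hedged sketch: request the reserved `_pos` metadata column in addition to the
// projected data columns. Only `with_metadata_columns` is new in this PR; the
// rest mirrors the existing builder flow.
fn reader_with_row_positions(builder: ArrowReaderBuilder) -> ArrowReader {
    builder
        .with_metadata_columns(vec!["_pos".to_string()])
        .build()
}
```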
@@ -133,6 +151,7 @@ pub struct ArrowReader {

row_group_filtering_enabled: bool,
row_selection_enabled: bool,
metadata_columns: Vec<String>,
}

impl ArrowReader {
@@ -156,6 +175,7 @@ impl ArrowReader {
self.delete_file_loader.clone(),
row_group_filtering_enabled,
row_selection_enabled,
self.metadata_columns.clone(),
)
})
.map_err(|err| {
@@ -175,19 +195,88 @@
delete_file_loader: CachingDeleteFileLoader,
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
metadata_columns: Vec<String>,
) -> Result<ArrowRecordBatchStream> {
let should_load_page_index =
(row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();

let delete_filter_rx = delete_file_loader.load_deletes(&task.deletes, task.schema.clone());

let mut virtual_columns: Vec<arrow_schema::Field> = vec![];
for metadata_column in metadata_columns {
if metadata_column == RESERVED_COL_NAME_POS {
let row_number_field = arrow_schema::Field::new(metadata_column, arrow_schema::DataType::Int64, false)
.with_metadata(std::collections::HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
RESERVED_FIELD_ID_POS.to_string(),
)]))
.with_extension_type(RowNumber);
virtual_columns.push(row_number_field);
} else {
return Err(Error::new(
ErrorKind::FeatureUnsupported, // TODO @vustef: This is appropriate only for columns from iceberg spec.
format!("Metadata column '{}' not supported", metadata_column),
));
}
}

let mut record_batch_stream_builder = Self::create_parquet_record_batch_stream_builder(
&task.data_file_path,
file_io.clone(),
should_load_page_index,
virtual_columns.clone(),
)
.await?;

// Extract field IDs from virtual columns and create Iceberg fields
let mut virtual_iceberg_fields = vec![];
let mut virtual_field_ids = vec![];

for field in &virtual_columns {
let field_id_str = field.metadata().get(PARQUET_FIELD_ID_META_KEY)
.ok_or_else(|| Error::new(
ErrorKind::Unexpected,
format!("Virtual field '{}' missing field ID metadata", field.name()),
))?;
let field_id = field_id_str.parse::<i32>()?;

// Create an Iceberg NestedField for the virtual column
if field_id == RESERVED_FIELD_ID_POS {
virtual_field_ids.push(field_id);
let iceberg_field = NestedField::required(
field_id,
field.name(),
Type::Primitive(PrimitiveType::Long),
);
virtual_iceberg_fields.push(Arc::new(iceberg_field));
} else {
return Err(Error::new(
ErrorKind::FeatureUnsupported, // TODO @vustef: This is appropriate only for columns from iceberg spec.
format!("Field ID '{}' not supported", field_id),
));
}
}

// Create an extended schema that includes both regular fields and virtual fields
let extended_schema = if !virtual_iceberg_fields.is_empty() {
let mut all_fields: Vec<_> = task.schema.as_ref().as_struct().fields().iter().cloned().collect();
all_fields.extend(virtual_iceberg_fields);

Arc::new(
Schema::builder()
.with_schema_id(task.schema.schema_id())
.with_fields(all_fields)
.build()
.map_err(|e| Error::new(ErrorKind::Unexpected, format!("Failed to build extended schema: {}", e)))?
)
} else {
task.schema_ref()
};

// Combine regular field IDs with virtual field IDs
let mut all_field_ids = task.project_field_ids.clone();
all_field_ids.extend(virtual_field_ids);

// Create a projection mask for the batch stream to select which columns in the
// Parquet file that we want in the response
let projection_mask = Self::get_arrow_projection_mask(
@@ -202,7 +291,7 @@ impl ArrowReader {
// that come back from the file, such as type promotion, default column insertion
// and column re-ordering
let mut record_batch_transformer =
-RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());
+RecordBatchTransformer::build(extended_schema, &all_field_ids);

if let Some(batch_size) = batch_size {
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
@@ -327,6 +416,7 @@ impl ArrowReader {
data_file_path: &str,
file_io: FileIO,
should_load_page_index: bool,
virtual_columns: Vec<arrow_schema::Field>,
) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead + Sized>>> {
// Get the metadata for the Parquet file we need to read and build
// a reader for the data within
@@ -341,7 +431,7 @@
// Create the record batch stream builder, which wraps the parquet file reader
let record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options(
parquet_file_reader,
-ArrowReaderOptions::new(),
+ArrowReaderOptions::new().with_virtual_columns(virtual_columns),
)
.await?;
Ok(record_batch_stream_builder)
@@ -1380,9 +1470,9 @@ impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
async move {
let reader = ParquetMetaDataReader::new()
.with_prefetch_hint(self.metadata_size_hint)
-.with_column_indexes(self.preload_column_index)
-.with_page_indexes(self.preload_page_index)
-.with_offset_indexes(self.preload_offset_index);
+.with_column_index_policy(PageIndexPolicy::from(self.preload_column_index))
+.with_page_index_policy(PageIndexPolicy::from(self.preload_page_index))
+.with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index));
let size = self.meta.size;
let meta = reader.load_and_finish(self, size).await?;

@@ -1576,7 +1666,7 @@ message schema {
assert_eq!(err.kind(), ErrorKind::DataInvalid);
assert_eq!(
err.to_string(),
"DataInvalid => Unsupported Arrow data type: Duration(Microsecond)".to_string()
"DataInvalid => Unsupported Arrow data type: Duration(µs)".to_string()
);

// Omitting field c2, we still get an error due to c3 being selected
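On the consumer side, the `_pos` virtual column surfaces as an ordinary non-nullable `Int64` array in each returned `RecordBatch`. A hedged sketch of reading it back out, assuming the column was requested under the reserved `_pos` name:

```rust
use arrow_array::{Array, Int64Array, RecordBatch};

// Hedged sketch: fetch the `_pos` column from a batch produced by the reader.
// Returns None if the column was not requested via `with_metadata_columns`.
fn row_positions(batch: &RecordBatch) -> Option<&Int64Array> {
    batch
        .column_by_name("_pos")?
        .as_any()
        .downcast_ref::<Int64Array>()
}
```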