Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
296 changes: 286 additions & 10 deletions rust/lance-arrow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,49 @@ impl RecordBatchExt for RecordBatch {
}
}

/// Recursively reshapes `array` so that its nested layout matches `target_field`.
///
/// Struct targets are delegated to [`project`]. `List`, `LargeList` and
/// `FixedSizeList` targets are rebuilt around their recursively projected
/// values, reusing the source offsets (or the target's fixed size) and the
/// source validity. This is what lets fields inside `List<Struct>` columns be
/// reordered.
///
/// Recursion only happens when the array's own data type is the same kind of
/// nested type as the target. For every other combination (leaf targets, or a
/// nested target paired with an array of a different kind) the array is
/// returned unchanged and no type check is made here; rejecting a mismatch is
/// left to whatever assembles the parent array or batch.
///
/// # Errors
///
/// Propagates errors from [`project`], and from the list constructors when the
/// projected values are not valid for the target item field (or, for
/// fixed-size lists, the target size).
fn project_array(array: &ArrayRef, target_field: &Field) -> Result<ArrayRef> {
    // Matching on both types guarantees the downcasts below cannot panic.
    match (target_field.data_type(), array.data_type()) {
        (DataType::Struct(subfields), DataType::Struct(_)) => {
            let projected = project(array.as_struct(), subfields)?;
            Ok(Arc::new(projected))
        }
        (DataType::List(inner_field), DataType::List(_)) => {
            let list_arr: &ListArray = array.as_list();
            let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
            // `try_new` surfaces an invalid combination as an error instead of panicking.
            let rebuilt = ListArray::try_new(
                inner_field.clone(),
                list_arr.offsets().clone(),
                projected_values,
                list_arr.nulls().cloned(),
            )?;
            Ok(Arc::new(rebuilt))
        }
        (DataType::LargeList(inner_field), DataType::LargeList(_)) => {
            let list_arr: &LargeListArray = array.as_list();
            let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
            let rebuilt = LargeListArray::try_new(
                inner_field.clone(),
                list_arr.offsets().clone(),
                projected_values,
                list_arr.nulls().cloned(),
            )?;
            Ok(Arc::new(rebuilt))
        }
        (DataType::FixedSizeList(inner_field, size), DataType::FixedSizeList(_, _)) => {
            let list_arr = array.as_fixed_size_list();
            let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
            // The target's size is used; a size the values cannot satisfy is reported as an error.
            let rebuilt = FixedSizeListArray::try_new(
                inner_field.clone(),
                *size,
                projected_values,
                list_arr.nulls().cloned(),
            )?;
            Ok(Arc::new(rebuilt))
        }
        _ => Ok(array.clone()),
    }
}

fn project(struct_array: &StructArray, fields: &Fields) -> Result<StructArray> {
if fields.is_empty() {
return Ok(StructArray::new_empty_fields(
Expand All @@ -805,16 +848,8 @@ fn project(struct_array: &StructArray, fields: &Fields) -> Result<StructArray> {
let mut columns: Vec<ArrayRef> = vec![];
for field in fields.iter() {
if let Some(col) = struct_array.column_by_name(field.name()) {
match field.data_type() {
// TODO handle list-of-struct
DataType::Struct(subfields) => {
let projected = project(col.as_struct(), subfields)?;
columns.push(Arc::new(projected));
}
_ => {
columns.push(col.clone());
}
}
let projected = project_array(col, field.as_ref())?;
columns.push(projected);
} else {
return Err(ArrowError::SchemaError(format!(
"field {} does not exist in the RecordBatch",
Expand Down Expand Up @@ -2244,4 +2279,245 @@ mod tests {
let merged_array = merge_with_schema(&left_list_struct, &right_list_struct, &target_fields);
assert_eq!(merged_array.len(), 2);
}

#[test]
fn test_project_by_schema_list_struct_reorder() {
    // Regression test for issue #5702: project_by_schema must reorder the
    // fields of a struct that is nested inside a list.
    let utf8 = |name: &str| Field::new(name, DataType::Utf8, true);
    let item = |item_type: DataType| Arc::new(Field::new("item", item_type, true));
    let strings = |values: Vec<&str>| Arc::new(StringArray::from(values)) as ArrayRef;

    // Source side: the struct inside the list stores its children as c, b, a.
    let source_item_type = DataType::Struct(Fields::from(vec![utf8("c"), utf8("b"), utf8("a")]));
    let source_structs = StructArray::from(vec![
        (Arc::new(utf8("c")), strings(vec!["c1", "c2"])),
        (Arc::new(utf8("b")), strings(vec!["b1", "b2"])),
        (Arc::new(utf8("a")), strings(vec!["a1", "a2"])),
    ]);
    // Two list rows, each holding exactly one struct.
    let source_list = ListArray::new(
        item(source_item_type.clone()),
        OffsetBuffer::from_lengths([1, 1]),
        Arc::new(source_structs),
        None,
    );
    let source_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("data", DataType::List(item(source_item_type)), true),
    ]));
    let ids = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef;
    let batch =
        RecordBatch::try_new(source_schema, vec![ids, Arc::new(source_list) as ArrayRef]).unwrap();

    // Target side: same columns, but the struct children are listed as a, b, c.
    let target_item_type = DataType::Struct(Fields::from(vec![utf8("a"), utf8("b"), utf8("c")]));
    let target_schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("data", DataType::List(item(target_item_type)), true),
    ]);

    let projected = batch.project_by_schema(&target_schema).unwrap();

    // The projected batch must carry exactly the target schema.
    assert_eq!(projected.schema().as_ref(), &target_schema);

    let projected_list = projected.column(1).as_list::<i32>();
    let projected_struct = projected_list.values().as_struct();

    // Expected children, listed in target order.
    let expected: Vec<(&str, StringArray)> = vec![
        ("a", StringArray::from(vec!["a1", "a2"])),
        ("b", StringArray::from(vec!["b1", "b2"])),
        ("c", StringArray::from(vec!["c1", "c2"])),
    ];

    // Lookup by name returns the right data for every child...
    for (name, want) in expected.iter() {
        assert_eq!(
            projected_struct.column_by_name(*name).unwrap().as_ref(),
            want as &dyn Array
        );
    }

    // ...and the positions follow the target order (a=0, b=1, c=2).
    for (position, (_, want)) in expected.iter().enumerate() {
        assert_eq!(
            projected_struct.column(position).as_ref(),
            want as &dyn Array
        );
    }
}

#[test]
fn test_project_by_schema_nested_list_struct() {
    // Projection through List<Struct<.., List<Struct>, ..>> with the field
    // order changed at both struct levels.
    let int32 = |name: &str| Field::new(name, DataType::Int32, true);
    let utf8 = |name: &str| Field::new(name, DataType::Utf8, true);
    let item = |item_type: DataType| Arc::new(Field::new("item", item_type, true));
    let list_of =
        |name: &str, item_type: DataType| Field::new(name, DataType::List(item(item_type)), true);

    // Source layout: innermost struct is (y, x); middle struct is (b, inner_list, a).
    let source_inner_type = DataType::Struct(Fields::from(vec![int32("y"), int32("x")]));
    let source_middle_type = DataType::Struct(Fields::from(vec![
        utf8("b"),
        list_of("inner_list", source_inner_type.clone()),
        utf8("a"),
    ]));
    let source_schema = Arc::new(Schema::new(vec![list_of("outer", source_middle_type)]));

    // Build the data bottom-up: innermost structs, the list around them, the
    // middle struct, and finally the outer list.
    let innermost = StructArray::from(vec![
        (
            Arc::new(int32("y")),
            Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef,
        ),
        (
            Arc::new(int32("x")),
            Arc::new(Int32Array::from(vec![3, 4])) as ArrayRef,
        ),
    ]);
    let inner_lists = ListArray::new(
        item(source_inner_type.clone()),
        OffsetBuffer::from_lengths([2]),
        Arc::new(innermost),
        None,
    );
    let middle = StructArray::from(vec![
        (
            Arc::new(utf8("b")),
            Arc::new(StringArray::from(vec!["b1"])) as ArrayRef,
        ),
        (
            Arc::new(list_of("inner_list", source_inner_type)),
            Arc::new(inner_lists) as ArrayRef,
        ),
        (
            Arc::new(utf8("a")),
            Arc::new(StringArray::from(vec!["a1"])) as ArrayRef,
        ),
    ]);
    let outer = ListArray::new(
        item(middle.data_type().clone()),
        OffsetBuffer::from_lengths([1]),
        Arc::new(middle),
        None,
    );

    let batch = RecordBatch::try_new(source_schema, vec![Arc::new(outer) as ArrayRef]).unwrap();

    // Target layout: innermost struct is (x, y); middle struct is (a, inner_list, b).
    let target_inner_type = DataType::Struct(Fields::from(vec![int32("x"), int32("y")]));
    let target_middle_type = DataType::Struct(Fields::from(vec![
        utf8("a"),
        list_of("inner_list", target_inner_type),
        utf8("b"),
    ]));
    let target_schema = Schema::new(vec![list_of("outer", target_middle_type)]);

    let projected = batch.project_by_schema(&target_schema).unwrap();

    // The projected batch must carry exactly the target schema.
    assert_eq!(projected.schema().as_ref(), &target_schema);

    let projected_outer = projected.column(0).as_list::<i32>();
    let projected_middle = projected_outer.values().as_struct();

    // Middle struct: a moved to the front, b moved to the back.
    let expected_a = StringArray::from(vec!["a1"]);
    let expected_b = StringArray::from(vec!["b1"]);
    assert_eq!(
        projected_middle.column(0).as_ref(),
        &expected_a as &dyn Array
    );
    assert_eq!(
        projected_middle.column(2).as_ref(),
        &expected_b as &dyn Array
    );

    // Innermost struct: x now precedes y.
    let projected_inner_list = projected_middle.column(1).as_list::<i32>();
    let projected_innermost = projected_inner_list.values().as_struct();
    let expected_x = Int32Array::from(vec![3, 4]);
    let expected_y = Int32Array::from(vec![1, 2]);
    assert_eq!(
        projected_innermost.column(0).as_ref(),
        &expected_x as &dyn Array
    );
    assert_eq!(
        projected_innermost.column(1).as_ref(),
        &expected_y as &dyn Array
    );
}
}
34 changes: 34 additions & 0 deletions rust/lance/src/dataset/tests/dataset_migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,3 +375,37 @@ async fn test_max_fragment_id_migration() {
assert_eq!(dataset.manifest.max_fragment_id(), Some(2));
}
}

/// Regression test for issue #5702: reading a dataset whose fragments disagree
/// on the field order inside a `List<Struct>` column.
///
/// The checked-in dataset holds two fragments:
/// - Fragment 0: `List<Struct<a, b, c>>` with every inner field, plus an "extra" column
/// - Fragment 1: `List<Struct<c, b>>`, i.e. inner fields reordered and one missing
///
/// Prior to the fix the scan failed with:
/// "Incorrect datatype for StructArray field expected List(Struct(...)) got List(Struct(...))"
#[tokio::test]
async fn test_list_struct_field_reorder_issue_5702() {
    let test_dir = copy_test_data_to_tmp("v1.0.1/list_struct_reorder.lance")
        .expect("Failed to copy test data");
    let test_uri = test_dir.path_str();

    // The fixture is expected to contain exactly two fragments.
    let dataset = Dataset::open(&test_uri)
        .await
        .expect("Failed to open dataset");
    let fragment_count = dataset.get_fragments().len();
    assert_eq!(fragment_count, 2);

    // Scanning is the step that failed before the fix for #5702.
    let batches = scan_dataset(&test_uri)
        .await
        .expect("Failed to scan dataset");
    let first_schema = batches[0].schema();
    let combined = concat_batches(&first_schema, batches.iter()).expect("Failed to concat");

    // All 4 rows must come back.
    let row_count = combined.num_rows();
    assert_eq!(row_count, 4);

    // Three top-level columns are expected: id, data, extra.
    let column_count = combined.schema().fields().len();
    assert_eq!(column_count, 3);
}
Loading
Loading