Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor arrow-ipc: Move create_*_array methods into RecordBatchDecoder #7029

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 104 additions & 124 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,15 @@ impl RecordBatchDecoder<'_> {
) -> Result<ArrayRef, ArrowError> {
let data_type = field.data_type();
match data_type {
Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array(
self.next_node(field)?,
data_type,
&[
Utf8 | Binary | LargeBinary | LargeUtf8 => {
let field_node = self.next_node(field)?;
let buffers = [
self.next_buffer()?,
self.next_buffer()?,
self.next_buffer()?,
],
self.require_alignment,
),
];
self.create_primitive_array(field_node, data_type, &buffers)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea here is to call these as methods on `self` (e.g. `self.create_primitive_array(...)`) instead of as free functions like `create_primitive_array`.

}
BinaryView | Utf8View => {
let count = variadic_counts
.pop_front()
Expand All @@ -105,42 +104,25 @@ impl RecordBatchDecoder<'_> {
let buffers = (0..count)
.map(|_| self.next_buffer())
.collect::<Result<Vec<_>, _>>()?;
create_primitive_array(
self.next_node(field)?,
data_type,
&buffers,
self.require_alignment,
)
let field_node = self.next_node(field)?;
self.create_primitive_array(field_node, data_type, &buffers)
}
FixedSizeBinary(_) => {
let field_node = self.next_node(field)?;
let buffers = [self.next_buffer()?, self.next_buffer()?];
self.create_primitive_array(field_node, data_type, &buffers)
}
FixedSizeBinary(_) => create_primitive_array(
self.next_node(field)?,
data_type,
&[self.next_buffer()?, self.next_buffer()?],
self.require_alignment,
),
List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => {
let list_node = self.next_node(field)?;
let list_buffers = [self.next_buffer()?, self.next_buffer()?];
let values = self.create_array(list_field, variadic_counts)?;
create_list_array(
list_node,
data_type,
&list_buffers,
values,
self.require_alignment,
)
self.create_list_array(list_node, data_type, &list_buffers, values)
}
FixedSizeList(ref list_field, _) => {
let list_node = self.next_node(field)?;
let list_buffers = [self.next_buffer()?];
let values = self.create_array(list_field, variadic_counts)?;
create_list_array(
list_node,
data_type,
&list_buffers,
values,
self.require_alignment,
)
self.create_list_array(list_node, data_type, &list_buffers, values)
}
Struct(struct_fields) => {
let struct_node = self.next_node(field)?;
Expand Down Expand Up @@ -205,12 +187,11 @@ impl RecordBatchDecoder<'_> {
))
})?;

create_dictionary_array(
self.create_dictionary_array(
index_node,
data_type,
&index_buffers,
value_array.clone(),
self.require_alignment,
)
}
Union(fields, mode) => {
Expand Down Expand Up @@ -265,107 +246,106 @@ impl RecordBatchDecoder<'_> {
// no buffer increases
Ok(Arc::new(NullArray::from(array_data)))
}
_ => create_primitive_array(
self.next_node(field)?,
data_type,
&[self.next_buffer()?, self.next_buffer()?],
self.require_alignment,
),
_ => {
let field_node = self.next_node(field)?;
let buffers = [self.next_buffer()?, self.next_buffer()?];
self.create_primitive_array(field_node, data_type, &buffers)
}
}
}
}

/// Reads the correct number of buffers based on data type and null_count, and creates a
/// primitive array ref
fn create_primitive_array(
field_node: &FieldNode,
data_type: &DataType,
buffers: &[Buffer],
require_alignment: bool,
) -> Result<ArrayRef, ArrowError> {
let length = field_node.length() as usize;
let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
let builder = match data_type {
Utf8 | Binary | LargeBinary | LargeUtf8 => {
// read 3 buffers: null buffer (optional), offsets buffer and data buffer
ArrayData::builder(data_type.clone())
.len(length)
.buffers(buffers[1..3].to_vec())
.null_bit_buffer(null_buffer)
}
BinaryView | Utf8View => ArrayData::builder(data_type.clone())
.len(length)
.buffers(buffers[1..].to_vec())
.null_bit_buffer(null_buffer),
_ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
// read 2 buffers: null buffer (optional) and data buffer
ArrayData::builder(data_type.clone())
/// Reads the correct number of buffers based on data type and null_count, and creates a
/// primitive array ref
fn create_primitive_array(
&self,
field_node: &FieldNode,
data_type: &DataType,
buffers: &[Buffer],
) -> Result<ArrayRef, ArrowError> {
let length = field_node.length() as usize;
let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
let builder = match data_type {
Utf8 | Binary | LargeBinary | LargeUtf8 => {
// read 3 buffers: null buffer (optional), offsets buffer and data buffer
ArrayData::builder(data_type.clone())
.len(length)
.buffers(buffers[1..3].to_vec())
.null_bit_buffer(null_buffer)
}
BinaryView | Utf8View => ArrayData::builder(data_type.clone())
.len(length)
.add_buffer(buffers[1].clone())
.null_bit_buffer(null_buffer)
}
t => unreachable!("Data type {:?} either unsupported or not primitive", t),
};
.buffers(buffers[1..].to_vec())
.null_bit_buffer(null_buffer),
_ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
// read 2 buffers: null buffer (optional) and data buffer
ArrayData::builder(data_type.clone())
.len(length)
.add_buffer(buffers[1].clone())
.null_bit_buffer(null_buffer)
}
t => unreachable!("Data type {:?} either unsupported or not primitive", t),
};

let array_data = builder.align_buffers(!require_alignment).build()?;
let array_data = builder.align_buffers(!self.require_alignment).build()?;

Ok(make_array(array_data))
}
Ok(make_array(array_data))
}

/// Reads the correct number of buffers based on list type and null_count, and creates a
/// list array ref
fn create_list_array(
field_node: &FieldNode,
data_type: &DataType,
buffers: &[Buffer],
child_array: ArrayRef,
require_alignment: bool,
) -> Result<ArrayRef, ArrowError> {
let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
let length = field_node.length() as usize;
let child_data = child_array.into_data();
let builder = match data_type {
List(_) | LargeList(_) | Map(_, _) => ArrayData::builder(data_type.clone())
.len(length)
.add_buffer(buffers[1].clone())
.add_child_data(child_data)
.null_bit_buffer(null_buffer),

FixedSizeList(_, _) => ArrayData::builder(data_type.clone())
.len(length)
.add_child_data(child_data)
.null_bit_buffer(null_buffer),

_ => unreachable!("Cannot create list or map array from {:?}", data_type),
};
/// Reads the correct number of buffers based on list type and null_count, and creates a
/// list array ref
fn create_list_array(
&self,
field_node: &FieldNode,
data_type: &DataType,
buffers: &[Buffer],
child_array: ArrayRef,
) -> Result<ArrayRef, ArrowError> {
let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
let length = field_node.length() as usize;
let child_data = child_array.into_data();
let builder = match data_type {
List(_) | LargeList(_) | Map(_, _) => ArrayData::builder(data_type.clone())
.len(length)
.add_buffer(buffers[1].clone())
.add_child_data(child_data)
.null_bit_buffer(null_buffer),

let array_data = builder.align_buffers(!require_alignment).build()?;
FixedSizeList(_, _) => ArrayData::builder(data_type.clone())
.len(length)
.add_child_data(child_data)
.null_bit_buffer(null_buffer),

Ok(make_array(array_data))
}
_ => unreachable!("Cannot create list or map array from {:?}", data_type),
};

/// Reads the correct number of buffers based on list type and null_count, and creates a
/// list array ref
fn create_dictionary_array(
field_node: &FieldNode,
data_type: &DataType,
buffers: &[Buffer],
value_array: ArrayRef,
require_alignment: bool,
) -> Result<ArrayRef, ArrowError> {
if let Dictionary(_, _) = *data_type {
let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
let array_data = ArrayData::builder(data_type.clone())
.len(field_node.length() as usize)
.add_buffer(buffers[1].clone())
.add_child_data(value_array.into_data())
.null_bit_buffer(null_buffer)
.align_buffers(!require_alignment)
.build()?;
let array_data = builder.align_buffers(!self.require_alignment).build()?;

Ok(make_array(array_data))
} else {
unreachable!("Cannot create dictionary array from {:?}", data_type)
}

/// Reads the dictionary index buffers based on `null_count`, and creates a
/// dictionary array ref whose values are `value_array`.
///
/// `buffers[0]` is the optional validity (null) bitmap for the keys;
/// `buffers[1]` holds the dictionary keys (indices into `value_array`).
/// Buffer alignment is enforced when `self.require_alignment` is set,
/// otherwise buffers are re-aligned by copying as needed.
///
/// # Panics
/// Panics if `data_type` is not `DataType::Dictionary`; callers dispatch on
/// the data type before reaching this method, so any other type is a bug.
fn create_dictionary_array(
    &self,
    field_node: &FieldNode,
    data_type: &DataType,
    buffers: &[Buffer],
    value_array: ArrayRef,
) -> Result<ArrayRef, ArrowError> {
    if let Dictionary(_, _) = *data_type {
        // The validity bitmap is only present when at least one key is null.
        let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone());
        let array_data = ArrayData::builder(data_type.clone())
            .len(field_node.length() as usize)
            .add_buffer(buffers[1].clone())
            .add_child_data(value_array.into_data())
            .null_bit_buffer(null_buffer)
            // align_buffers(true) copies misaligned buffers; when alignment is
            // required we skip the copy and let build() reject misaligned data.
            .align_buffers(!self.require_alignment)
            .build()?;

        Ok(make_array(array_data))
    } else {
        unreachable!("Cannot create dictionary array from {:?}", data_type)
    }
}
}

Expand Down
Loading