Skip to content

Commit 9cfa359

Browse files
committed
feat: enable Decimal64 handling throughout the Fuse-table deserialization pipeline
1 parent 8b4a558 commit 9cfa359

File tree

8 files changed

+77
-32
lines changed

8 files changed

+77
-32
lines changed

src/common/native/src/read/batch_read.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,16 @@ fn read_nested_column<R: NativeReadBuf>(
7373
}
7474
),
7575
Decimal(decimal) => match decimal {
76+
DecimalDataType::Decimal64(decimal_size) => {
77+
init.push(InitNested::Primitive(is_nullable));
78+
read_nested_decimal::<i64, i64, _>(
79+
&mut readers.pop().unwrap(),
80+
data_type.clone(),
81+
decimal_size,
82+
init,
83+
page_metas.pop().unwrap(),
84+
)?
85+
}
7686
DecimalDataType::Decimal128(decimal_size) => {
7787
init.push(InitNested::Primitive(is_nullable));
7888
let mut results = read_nested_decimal::<i128, i128, _>(
@@ -115,7 +125,6 @@ fn read_nested_column<R: NativeReadBuf>(
115125
page_metas.pop().unwrap(),
116126
)?
117127
}
118-
_ => unreachable!(),
119128
},
120129
Interval => {
121130
init.push(InitNested::Primitive(is_nullable));

src/common/native/src/read/deserialize.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,15 @@ where
168168
))
169169
}
170170
TableDataType::Decimal(decimal) => match decimal {
171+
DecimalDataType::Decimal64(size) => {
172+
init.push(InitNested::Primitive(is_nullable));
173+
DynIter::new(DecimalNestedIter::<_, i64, i64>::new(
174+
readers.pop().unwrap(),
175+
data_type.clone(),
176+
size,
177+
init,
178+
))
179+
}
171180
DecimalDataType::Decimal128(size) => {
172181
init.push(InitNested::Primitive(is_nullable));
173182
DynIter::new(DecimalNestedIter::<_, i128, i128>::new(
@@ -187,7 +196,6 @@ where
187196
readers.pop().unwrap(), data_type.clone(), size, init
188197
))
189198
}
190-
_ => unreachable!(),
191199
},
192200
t if t.is_physical_binary() => {
193201
init.push(InitNested::Primitive(is_nullable));

src/common/native/src/write/serialize.rs

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,9 @@ use std::io::Write;
1717
use databend_common_column::bitmap::Bitmap;
1818
use databend_common_column::buffer::Buffer;
1919
use databend_common_column::types::i256;
20-
use databend_common_expression::types::AccessType;
2120
use databend_common_expression::types::AnyType;
2221
use databend_common_expression::types::DataType;
2322
use databend_common_expression::types::DecimalColumn;
24-
use databend_common_expression::types::DecimalView;
2523
use databend_common_expression::types::GeographyColumn;
2624
use databend_common_expression::types::NullableColumn;
2725
use databend_common_expression::types::NumberColumn;
@@ -100,16 +98,13 @@ impl<'a, W: Write> ValueVisitor for WriteVisitor<'a, W> {
10098

10199
fn visit_any_decimal(&mut self, column: DecimalColumn) -> Result<()> {
102100
match column {
103-
DecimalColumn::Decimal64(buffer, _) => {
104-
let buffer: Vec<_> = DecimalView::<i64, i128>::iter_column(&buffer).collect();
105-
write_primitive::<_, W>(
106-
self.w,
107-
&buffer.into(),
108-
self.validity.clone(),
109-
&self.write_options,
110-
self.scratch,
111-
)
112-
}
101+
DecimalColumn::Decimal64(buffer, _) => write_primitive::<_, W>(
102+
self.w,
103+
&buffer,
104+
self.validity.clone(),
105+
&self.write_options,
106+
self.scratch,
107+
),
113108
DecimalColumn::Decimal128(column, _) => write_primitive::<_, W>(
114109
self.w,
115110
&column,

src/query/expression/src/converts/arrow/from.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ use crate::types::AnyType;
4545
use crate::types::ArrayColumn;
4646
use crate::types::DataType;
4747
use crate::types::DecimalColumn;
48-
use crate::types::DecimalDataKind;
4948
use crate::types::DecimalDataType;
5049
use crate::types::DecimalSize;
5150
use crate::types::GeographyColumn;
@@ -135,14 +134,17 @@ impl TryFrom<&Field> for TableField {
135134
ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Utf8View => {
136135
TableDataType::String
137136
}
137+
ArrowDataType::Decimal64(precision, scale) if *scale >= 0 => {
138+
TableDataType::Decimal(DecimalDataType::Decimal64(DecimalSize::new(
139+
*precision,
140+
*scale as _,
141+
)?))
142+
}
138143
ArrowDataType::Decimal128(precision, scale) if *scale >= 0 => {
139-
let size = DecimalSize::new(*precision, *scale as _)?;
140-
match size.data_kind() {
141-
DecimalDataKind::Decimal64 | DecimalDataKind::Decimal128 => {
142-
TableDataType::Decimal(DecimalDataType::Decimal128(size))
143-
}
144-
_ => unreachable!(),
145-
}
144+
TableDataType::Decimal(DecimalDataType::Decimal128(DecimalSize::new(
145+
*precision,
146+
*scale as _,
147+
)?))
146148
}
147149
ArrowDataType::Decimal256(precision, scale) if *scale >= 0 => {
148150
TableDataType::Decimal(DecimalDataType::Decimal256(DecimalSize::new(

src/query/expression/src/converts/arrow/to.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,22 @@ impl From<&TableField> for Field {
125125
TableDataType::Number(ty) => with_number_type!(|TYPE| match ty {
126126
NumberDataType::TYPE => ArrowDataType::TYPE,
127127
}),
128-
TableDataType::Decimal(
129-
DecimalDataType::Decimal64(size) | DecimalDataType::Decimal128(size),
130-
) => ArrowDataType::Decimal128(size.precision(), size.scale() as i8),
128+
TableDataType::Decimal(DecimalDataType::Decimal64(size)) => {
129+
ArrowDataType::Decimal64(size.precision(), size.scale() as i8)
130+
}
131+
TableDataType::Decimal(DecimalDataType::Decimal128(size)) => {
132+
// The proto layer of meta supports only Decimal128/256 , so Decimal64 columns
133+
// are still serialized into Decimal128 metadata (see datatype.proto and schema_from_to_protobuf_impl.rs).
134+
//
135+
// For rolling upgrades we leave the proto definition untouched and instead coerce
136+
// the Arrow type here based on the precision.
137+
138+
if size.can_carried_by_64() {
139+
ArrowDataType::Decimal64(size.precision(), size.scale() as i8)
140+
} else {
141+
ArrowDataType::Decimal128(size.precision(), size.scale() as i8)
142+
}
143+
}
131144
TableDataType::Decimal(DecimalDataType::Decimal256(size)) => {
132145
ArrowDataType::Decimal256(size.precision(), size.scale() as i8)
133146
}
@@ -338,7 +351,10 @@ impl From<&Column> for ArrayData {
338351
Column::Decimal(c) => {
339352
let c = c.clone().strict_decimal();
340353
let arrow_type = match c {
341-
DecimalColumn::Decimal64(_, size) | DecimalColumn::Decimal128(_, size) => {
354+
DecimalColumn::Decimal64(_, size) => {
355+
ArrowDataType::Decimal64(size.precision(), size.scale() as _)
356+
}
357+
DecimalColumn::Decimal128(_, size) => {
342358
ArrowDataType::Decimal128(size.precision(), size.scale() as _)
343359
}
344360
DecimalColumn::Decimal256(_, size) => {

src/query/expression/src/schema.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1678,7 +1678,10 @@ pub fn infer_schema_type(data_type: &DataType) -> Result<TableDataType> {
16781678
DataType::Number(number_type) => Ok(TableDataType::Number(*number_type)),
16791679
DataType::Timestamp => Ok(TableDataType::Timestamp),
16801680
DataType::Decimal(size) => match size.data_kind() {
1681-
DecimalDataKind::Decimal64 | DecimalDataKind::Decimal128 => {
1681+
DecimalDataKind::Decimal64 => {
1682+
Ok(TableDataType::Decimal(DecimalDataType::Decimal64(*size)))
1683+
}
1684+
DecimalDataKind::Decimal128 => {
16821685
Ok(TableDataType::Decimal(DecimalDataType::Decimal128(*size)))
16831686
}
16841687
DecimalDataKind::Decimal256 => {

src/query/expression/src/types/decimal.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1809,7 +1809,8 @@ impl DecimalColumn {
18091809
#[cfg(debug_assertions)]
18101810
{
18111811
match (&arrow_type, self) {
1812-
(arrow_schema::DataType::Decimal128(p, s), DecimalColumn::Decimal64(_, size))
1812+
(arrow_schema::DataType::Decimal64(p, s), DecimalColumn::Decimal64(_, size))
1813+
| (arrow_schema::DataType::Decimal128(p, s), DecimalColumn::Decimal64(_, size))
18131814
| (arrow_schema::DataType::Decimal128(p, s), DecimalColumn::Decimal128(_, size))
18141815
| (arrow_schema::DataType::Decimal256(p, s), DecimalColumn::Decimal256(_, size)) => {
18151816
assert_eq!(size.precision, *p);
@@ -1821,8 +1822,12 @@ impl DecimalColumn {
18211822

18221823
let buffer = match self {
18231824
DecimalColumn::Decimal64(col, _) => {
1824-
let builder = Decimal64As128Type::iter_column(col).collect::<Vec<_>>();
1825-
Decimal128Type::build_column(builder).into()
1825+
if matches!(arrow_type, arrow_schema::DataType::Decimal64(_, _)) {
1826+
col.clone().into()
1827+
} else {
1828+
let builder = Decimal64As128Type::iter_column(col).collect::<Vec<_>>();
1829+
Decimal128Type::build_column(builder).into()
1830+
}
18261831
}
18271832
DecimalColumn::Decimal128(col, _) => col.clone().into(),
18281833
DecimalColumn::Decimal256(col, _) => {
@@ -1844,6 +1849,13 @@ impl DecimalColumn {
18441849
pub fn try_from_arrow_data(array: ArrayData) -> Result<Self> {
18451850
let buffer = array.buffers()[0].clone();
18461851
match array.data_type() {
1852+
arrow_schema::DataType::Decimal64(p, s) => {
1853+
let decimal_size = DecimalSize {
1854+
precision: *p,
1855+
scale: *s as u8,
1856+
};
1857+
Ok(Self::Decimal64(buffer.into(), decimal_size).strict_decimal())
1858+
}
18471859
arrow_schema::DataType::Decimal128(p, s) => {
18481860
let decimal_size = DecimalSize {
18491861
precision: *p,

tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_00_basic.result

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,6 @@
1717
2 {"k":"v"}
1818
--- large parquet file should be worked on parallel by rowgroups
1919
5000000
20-
├── partitions total: 6
21-
├── partitions scanned: 6
20+
├── partitions total: 3
21+
├── partitions scanned: 3
2222
5000000

0 commit comments

Comments
 (0)