Skip to content
Draft
2 changes: 1 addition & 1 deletion src/common/base/src/runtime/perf/query_perf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl QueryPerf {
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.set_filter_func(filter_closure)
.build()
.map_err(|_e| ErrorCode::Internal("Failed to create profiler"))?;
.map_err(|e| ErrorCode::Internal(format!("Failed to create profiler, {e}")))?;
debug!("starting perf with frequency: {}", frequency);
let mut payload = ThreadTracker::new_tracking_payload();
payload.perf_enabled = true;
Expand Down
17 changes: 14 additions & 3 deletions src/query/expression/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,18 @@ impl BlockEntry {
BlockEntry::Column(column) => Ok(ColumnView::Column(T::try_downcast_column(column)?)),
}
}

pub fn into_nullable(self) -> BlockEntry {
match self {
BlockEntry::Const(scalar, data_type, n) if !data_type.is_nullable_or_null() => {
BlockEntry::Const(scalar, DataType::Nullable(Box::new(data_type)), n)
}
entry @ BlockEntry::Const(_, _, _)
| entry @ BlockEntry::Column(Column::Nullable(_))
| entry @ BlockEntry::Column(Column::Null { .. }) => entry,
BlockEntry::Column(column) => column.wrap_nullable(None).into(),
}
}
}

impl From<Column> for BlockEntry {
Expand Down Expand Up @@ -846,10 +858,9 @@ impl DataBlock {
pub fn project(mut self, projections: &ColumnSet) -> Self {
let mut entries = Vec::with_capacity(projections.len());
for (index, column) in self.entries.into_iter().enumerate() {
if !projections.contains(&index) {
continue;
if projections.contains(&index) {
entries.push(column);
}
entries.push(column);
}
self.entries = entries;
self
Expand Down
6 changes: 6 additions & 0 deletions src/query/expression/src/filter/filter_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ impl FilterExecutor {
// Filter a DataBlock, return the filtered DataBlock.
pub fn filter(&mut self, data_block: DataBlock) -> Result<DataBlock> {
if self.func_ctx.enable_selector_executor {
debug_assert!(data_block.num_rows() <= self.max_block_size);
let origin_count = data_block.num_rows();
let result_count = self.select(&data_block)?;
self.take(data_block, origin_count, result_count)
Expand All @@ -100,6 +101,7 @@ impl FilterExecutor {

// Store the filtered indices of data_block in `true_selection` and return the number of filtered indices.
pub fn select(&mut self, data_block: &DataBlock) -> Result<usize> {
debug_assert!(data_block.num_rows() <= self.max_block_size);
let evaluator = Evaluator::new(data_block, &self.func_ctx, self.fn_registry);
let selector = Selector::new(evaluator, data_block.num_rows());
selector.select(
Expand Down Expand Up @@ -208,4 +210,8 @@ impl FilterExecutor {
pub fn mutable_true_selection(&mut self) -> &mut [u32] {
&mut self.true_selection
}

pub fn max_block_size(&self) -> usize {
self.max_block_size
}
}
15 changes: 15 additions & 0 deletions src/query/expression/src/kernels/take_ranges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,21 @@ impl DataBlock {
self.get_meta().cloned(),
))
}

pub fn merge_indices_to_ranges(indices: &[u32]) -> Vec<Range<u32>> {
debug_assert!(indices.is_sorted());
let mut ranges: Vec<Range<u32>> = Vec::with_capacity(indices.len() / 2);
for &index in indices {
if let Some(cur) = ranges.last_mut()
&& cur.end == index
{
cur.end += 1;
} else {
ranges.push(index..index + 1)
}
}
ranges
}
}

struct TakeRangeVisitor<'a> {
Expand Down
7 changes: 5 additions & 2 deletions src/query/expression/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,10 @@ impl DataSchema {
}
}

pub fn new_ref(fields: Vec<DataField>) -> Arc<Self> {
Self::new(fields).into()
}

pub fn new_from(fields: Vec<DataField>, metadata: BTreeMap<String, String>) -> Self {
Self { fields, metadata }
}
Expand Down Expand Up @@ -416,8 +420,7 @@ impl DataSchema {
let mut valid_fields: Vec<String> = self.fields.iter().map(|f| f.name().clone()).collect();
valid_fields.truncate(16);
Err(ErrorCode::BadArguments(format!(
"Unable to get field named \"{}\". Valid fields: {:?} ...",
name, valid_fields
"Unable to get field named {name:?}. Valid fields: {valid_fields:?} ...",
)))
}

Expand Down
1 change: 1 addition & 0 deletions src/query/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#![allow(clippy::diverging_sub_expression)]
#![allow(clippy::arc_with_non_send_sync)]
#![feature(debug_closure_helpers)]
#![feature(stmt_expr_attributes)]

extern crate core;

Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/physical_plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub use physical_exchange::Exchange;
pub use physical_exchange_sink::ExchangeSink;
pub use physical_exchange_source::ExchangeSource;
pub use physical_filter::Filter;
pub use physical_hash_join::HashJoin;
pub use physical_hash_join::*;
pub use physical_limit::Limit;
pub use physical_materialized_cte::*;
pub use physical_multi_table_insert::*;
Expand Down
Loading
Loading