Skip to content

Commit 797c023

Browse files
authored
refactor: move aggregation spill logic into partial aggregate (#18999)
* refactor: create a new partial
* refactor: move aggregation spill logic into partial aggregate
* refactor: move aggregation spill logic into partial aggregate
* chore: catch up main
* chore: clean up
* enable enable_experiment_aggregate for test
* chore: make writers lazy init
* make max_aggregate_spill_level to 1
* fix: final aggregate must call spill finish in every round
* fix: final aggregate must call spill finish in every round
* fix: final aggregate must call spill finish in every round
* set enable_experiment_aggregate = 0
1 parent 5bc4a5d commit 797c023

19 files changed

+781
-524
lines changed

src/common/base/src/runtime/perf/flamegraph_main_template.html

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@ <h1>Query Performance Flamegraphs</h1>
4848

4949
<script>
5050
function resizeIframe(iframe) {
51-
// A small delay can help ensure the content has been fully rendered
52-
// before calculating its height, especially for complex SVGs.
5351
setTimeout(() => {
5452
try {
5553
const body = iframe.contentWindow.document.body;
@@ -66,4 +64,4 @@ <h1>Query Performance Flamegraphs</h1>
6664
}
6765
</script>
6866
</body>
69-
</html>
67+
</html>

src/query/service/src/physical_plans/physical_aggregate_partial.rs

Lines changed: 58 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,15 @@ use databend_common_sql::IndexType;
3838
use databend_common_storage::DataOperator;
3939
use itertools::Itertools;
4040

41+
use crate::clusters::ClusterHelper;
4142
use crate::physical_plans::explain::PlanStatsInfo;
4243
use crate::physical_plans::format::AggregatePartialFormatter;
4344
use crate::physical_plans::format::PhysicalFormat;
4445
use crate::physical_plans::physical_plan::IPhysicalPlan;
4546
use crate::physical_plans::physical_plan::PhysicalPlan;
4647
use crate::physical_plans::physical_plan::PhysicalPlanMeta;
4748
use crate::pipelines::processors::transforms::aggregator::AggregateInjector;
48-
use crate::pipelines::processors::transforms::aggregator::NewTransformAggregateSpillWriter;
49+
use crate::pipelines::processors::transforms::aggregator::NewTransformPartialAggregate;
4950
use crate::pipelines::processors::transforms::aggregator::PartialSingleStateAggregator;
5051
use crate::pipelines::processors::transforms::aggregator::SharedPartitionStream;
5152
use crate::pipelines::processors::transforms::aggregator::TransformAggregateSpillWriter;
@@ -220,51 +221,66 @@ impl IPhysicalPlan for AggregatePartial {
220221
});
221222
}
222223

223-
builder.main_pipeline.add_transform(|input, output| {
224-
Ok(ProcessorPtr::create(TransformPartialAggregate::try_create(
225-
builder.ctx.clone(),
226-
input,
227-
output,
228-
params.clone(),
229-
partial_agg_config.clone(),
230-
)?))
231-
})?;
224+
if params.enable_experiment_aggregate {
225+
let cluster = &builder.ctx.get_cluster();
226+
let streams_num = if !builder.is_exchange_parent() {
227+
1
228+
} else {
229+
cluster.nodes.len()
230+
};
231+
let local_pos = cluster.ordered_index();
232+
let shared_partition_streams = (0..streams_num)
233+
.map(|_| {
234+
SharedPartitionStream::new(
235+
builder.main_pipeline.output_len(),
236+
max_block_rows,
237+
max_block_bytes,
238+
MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize,
239+
)
240+
})
241+
.collect::<Vec<_>>();
242+
243+
builder.main_pipeline.add_transform(|input, output| {
244+
Ok(ProcessorPtr::create(
245+
NewTransformPartialAggregate::try_create(
246+
builder.ctx.clone(),
247+
input,
248+
output,
249+
params.clone(),
250+
partial_agg_config.clone(),
251+
shared_partition_streams.clone(),
252+
local_pos,
253+
)?,
254+
))
255+
})?;
256+
} else {
257+
builder.main_pipeline.add_transform(|input, output| {
258+
Ok(ProcessorPtr::create(TransformPartialAggregate::try_create(
259+
builder.ctx.clone(),
260+
input,
261+
output,
262+
params.clone(),
263+
partial_agg_config.clone(),
264+
)?))
265+
})?;
266+
}
232267

233268
// In cluster mode, the spill write is completed during exchange serialization, because we need to scatter the block data first
234-
if !builder.is_exchange_parent() {
269+
if !builder.is_exchange_parent() && !params.enable_experiment_aggregate {
235270
let operator = DataOperator::instance().spill_operator();
236271
let location_prefix = builder.ctx.query_id_spill_prefix();
237-
if params.enable_experiment_aggregate {
238-
let shared_partition_stream = SharedPartitionStream::new(
239-
builder.main_pipeline.output_len(),
240-
max_block_rows,
241-
max_block_bytes,
242-
MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize,
243-
);
244-
builder.main_pipeline.add_transform(|input, output| {
245-
Ok(ProcessorPtr::create(
246-
NewTransformAggregateSpillWriter::try_create(
247-
input,
248-
output,
249-
builder.ctx.clone(),
250-
shared_partition_stream.clone(),
251-
)?,
252-
))
253-
})?;
254-
} else {
255-
builder.main_pipeline.add_transform(|input, output| {
256-
Ok(ProcessorPtr::create(
257-
TransformAggregateSpillWriter::try_create(
258-
builder.ctx.clone(),
259-
input,
260-
output,
261-
operator.clone(),
262-
params.clone(),
263-
location_prefix.clone(),
264-
)?,
265-
))
266-
})?;
267-
}
272+
builder.main_pipeline.add_transform(|input, output| {
273+
Ok(ProcessorPtr::create(
274+
TransformAggregateSpillWriter::try_create(
275+
builder.ctx.clone(),
276+
input,
277+
output,
278+
operator.clone(),
279+
params.clone(),
280+
location_prefix.clone(),
281+
)?,
282+
))
283+
})?;
268284
}
269285

270286
builder.exchange_injector = AggregateInjector::create(builder.ctx.clone(), params.clone());

src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs

Lines changed: 2 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use databend_common_expression::DataBlock;
2222
use databend_common_expression::PartitionedPayload;
2323
use databend_common_expression::Payload;
2424
use databend_common_expression::PayloadFlushState;
25-
use databend_common_expression::MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM;
2625
use databend_common_pipeline::core::Pipeline;
2726
use databend_common_pipeline::core::ProcessorPtr;
2827
use databend_common_settings::FlightCompression;
@@ -32,8 +31,6 @@ use crate::pipelines::processors::transforms::aggregator::aggregate_meta::Aggreg
3231
use crate::pipelines::processors::transforms::aggregator::serde::TransformExchangeAggregateSerializer;
3332
use crate::pipelines::processors::transforms::aggregator::serde::TransformExchangeAsyncBarrier;
3433
use crate::pipelines::processors::transforms::aggregator::AggregatorParams;
35-
use crate::pipelines::processors::transforms::aggregator::NewTransformAggregateSpillWriter;
36-
use crate::pipelines::processors::transforms::aggregator::SharedPartitionStream;
3734
use crate::pipelines::processors::transforms::aggregator::TransformAggregateDeserializer;
3835
use crate::pipelines::processors::transforms::aggregator::TransformAggregateSerializer;
3936
use crate::pipelines::processors::transforms::aggregator::TransformAggregateSpillWriter;
@@ -117,7 +114,7 @@ fn scatter_payload(mut payload: Payload, buckets: usize) -> Result<Vec<Payload>>
117114
Ok(buckets)
118115
}
119116

120-
fn scatter_partitioned_payload(
117+
pub fn scatter_partitioned_payload(
121118
partitioned_payload: PartitionedPayload,
122119
buckets: usize,
123120
) -> Result<Vec<PartitionedPayload>> {
@@ -264,25 +261,7 @@ impl ExchangeInjector for AggregateInjector {
264261
) -> Result<()> {
265262
let params = self.aggregator_params.clone();
266263

267-
if self.aggregator_params.enable_experiment_aggregate {
268-
let shared_partition_stream = SharedPartitionStream::new(
269-
pipeline.output_len(),
270-
self.aggregator_params.max_block_rows,
271-
self.aggregator_params.max_block_bytes,
272-
MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize,
273-
);
274-
275-
pipeline.add_transform(|input, output| {
276-
Ok(ProcessorPtr::create(
277-
NewTransformAggregateSpillWriter::try_create(
278-
input,
279-
output,
280-
self.ctx.clone(),
281-
shared_partition_stream.clone(),
282-
)?,
283-
))
284-
})?;
285-
} else {
264+
if !self.aggregator_params.enable_experiment_aggregate {
286265
let operator = DataOperator::instance().spill_operator();
287266
let location_prefix = self.ctx.query_id_spill_prefix();
288267

@@ -322,18 +301,6 @@ impl ExchangeInjector for AggregateInjector {
322301
.position(|x| x == local_id)
323302
.unwrap();
324303

325-
let mut partition_streams = vec![];
326-
if self.aggregator_params.enable_experiment_aggregate {
327-
for _i in 0..shuffle_params.destination_ids.len() {
328-
partition_streams.push(SharedPartitionStream::new(
329-
pipeline.output_len(),
330-
self.aggregator_params.max_block_rows,
331-
self.aggregator_params.max_block_bytes,
332-
MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize,
333-
));
334-
}
335-
}
336-
337304
pipeline.add_transform(|input, output| {
338305
Ok(ProcessorPtr::create(
339306
TransformExchangeAggregateSerializer::try_create(
@@ -345,7 +312,6 @@ impl ExchangeInjector for AggregateInjector {
345312
params.clone(),
346313
compression,
347314
local_pos,
348-
partition_streams.clone(),
349315
)?,
350316
))
351317
})?;

src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,11 @@ pub enum AggregateMeta {
137137
BucketSpilled(BucketSpilledPayload),
138138
Spilled(Vec<BucketSpilledPayload>),
139139

140-
Partitioned { bucket: isize, data: Vec<Self> },
140+
Partitioned {
141+
bucket: isize,
142+
data: Vec<Self>,
143+
activate_worker: Option<usize>,
144+
},
141145

142146
NewBucketSpilled(NewSpilledPayload),
143147
NewSpilled(Vec<NewSpilledPayload>),
@@ -180,8 +184,16 @@ impl AggregateMeta {
180184
Box::new(AggregateMeta::BucketSpilled(payload))
181185
}
182186

183-
pub fn create_partitioned(bucket: isize, data: Vec<Self>) -> BlockMetaInfoPtr {
184-
Box::new(AggregateMeta::Partitioned { data, bucket })
187+
pub fn create_partitioned(
188+
bucket: isize,
189+
data: Vec<Self>,
190+
activate_worker: Option<usize>,
191+
) -> BlockMetaInfoPtr {
192+
Box::new(AggregateMeta::Partitioned {
193+
data,
194+
bucket,
195+
activate_worker,
196+
})
185197
}
186198

187199
pub fn create_new_bucket_spilled(payload: NewSpilledPayload) -> BlockMetaInfoPtr {

src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@ pub use aggregate_exchange_injector::AggregateInjector;
2929
pub use aggregate_meta::*;
3030
pub use aggregator_params::AggregatorParams;
3131
pub use build_partition_bucket::build_partition_bucket;
32-
pub use new_aggregate::NewTransformAggregateSpillWriter;
33-
pub use new_aggregate::SharedPartitionStream;
32+
pub use new_aggregate::*;
3433
pub use transform_aggregate_expand::TransformExpandGroupingSets;
3534
pub use transform_aggregate_final::TransformFinalAggregate;
3635
pub use transform_aggregate_partial::TransformPartialAggregate;

src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/datablock_splitter.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ pub fn split_partitioned_meta_into_datablocks(
3030
let base_chunk_size = total_len / outputs_len;
3131
let remainder = total_len % outputs_len;
3232

33-
let mut result = Vec::with_capacity(outputs_len);
3433
let mut data_iter = data.into_iter();
34+
let mut chunks = Vec::with_capacity(outputs_len);
35+
let mut activated_workers = 0;
3536

3637
for index in 0..outputs_len {
3738
let chunk_size = if index < remainder {
@@ -41,8 +42,17 @@ pub fn split_partitioned_meta_into_datablocks(
4142
};
4243

4344
let chunk: Vec<AggregateMeta> = data_iter.by_ref().take(chunk_size).collect();
45+
if !chunk.is_empty() {
46+
activated_workers += 1;
47+
}
48+
chunks.push(chunk);
49+
}
50+
51+
let activate_worker = Some(activated_workers);
52+
let mut result = Vec::with_capacity(outputs_len);
53+
for chunk in chunks {
4454
result.push(DataBlock::empty_with_meta(
45-
AggregateMeta::create_partitioned(bucket, chunk),
55+
AggregateMeta::create_partitioned(bucket, chunk, activate_worker),
4656
));
4757
}
4858

src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@
1515
mod datablock_splitter;
1616
mod new_aggregate_spiller;
1717
mod new_final_aggregate_state;
18-
mod new_transform_aggregate_spill_writer;
18+
mod new_transform_aggregate_partial;
1919
mod new_transform_final_aggregate;
2020
mod transform_partition_bucket_scatter;
2121

2222
pub use datablock_splitter::split_partitioned_meta_into_datablocks;
2323
pub use new_aggregate_spiller::NewAggregateSpiller;
2424
pub use new_aggregate_spiller::SharedPartitionStream;
2525
pub use new_final_aggregate_state::FinalAggregateSharedState;
26-
pub use new_transform_aggregate_spill_writer::NewTransformAggregateSpillWriter;
26+
pub use new_transform_aggregate_partial::NewTransformPartialAggregate;
2727
pub use new_transform_final_aggregate::NewFinalAggregateTransform;
2828
pub use transform_partition_bucket_scatter::TransformPartitionBucketScatter;

0 commit comments

Comments
 (0)