Skip to content

Commit 1756cdb

Browse files
authored
fix(tesseract): Segment support in pre-aggregations (cube-js#10422)
1 parent 53e9d9e commit 1756cdb

28 files changed: 597 additions and 10 deletions.

packages/cubejs-schema-compiler/test/integration/postgres/sql-generation.test.ts

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3058,6 +3058,7 @@ SELECT 1 AS revenue, cast('2024-01-01' AS timestamp) as time UNION ALL
30583058
preAggregationsSchema: ''
30593059
});
30603060

3061+
query.buildSqlAndParams();
30613062
const preAggregationsDescription: any = query.preAggregations?.preAggregationsDescription()[0];
30623063

30633064
const res = await testWithPreAggregation(preAggregationsDescription, query);

rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/pre_aggregation_description.rs

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,9 @@ pub trait PreAggregationDescription {
3333
#[nbridge(field, optional)]
3434
fn time_dimension_reference(&self) -> Result<Option<Rc<dyn MemberSql>>, CubeError>;
3535

36+
#[nbridge(field, optional)]
37+
fn segment_references(&self) -> Result<Option<Rc<dyn MemberSql>>, CubeError>;
38+
3639
#[nbridge(field, optional)]
3740
fn rollup_references(&self) -> Result<Option<Rc<dyn MemberSql>>, CubeError>;
3841
}

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/compiled_pre_aggregation.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -47,6 +47,7 @@ pub struct CompiledPreAggregation {
4747
pub measures: Vec<Rc<MemberSymbol>>,
4848
pub dimensions: Vec<Rc<MemberSymbol>>,
4949
pub time_dimensions: Vec<Rc<MemberSymbol>>,
50+
pub segments: Vec<Rc<MemberSymbol>>,
5051
pub allow_non_strict_date_range_match: bool,
5152
}
5253

@@ -60,6 +61,7 @@ impl Debug for CompiledPreAggregation {
6061
.field("measures", &self.measures)
6162
.field("dimensions", &self.dimensions)
6263
.field("time_dimensions", &self.time_dimensions)
64+
.field("segments", &self.segments)
6365
.field(
6466
"allow_non_strict_date_range_match",
6567
&self.allow_non_strict_date_range_match,

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/dimension_matcher.rs

Lines changed: 172 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,7 @@ pub struct DimensionMatcher<'a> {
3535
pre_aggregation: &'a CompiledPreAggregation,
3636
pre_aggregation_dimensions: HashMap<String, bool>,
3737
pre_aggregation_time_dimensions: HashMap<String, (Option<Rc<TimeDimensionSymbol>>, bool)>,
38+
pre_aggregation_segments: HashMap<String, bool>,
3839
result: MatchState,
3940
}
4041

@@ -56,11 +57,17 @@ impl<'a> DimensionMatcher<'a> {
5657
}
5758
})
5859
.collect::<HashMap<_, _>>();
60+
let pre_aggregation_segments = pre_aggregation
61+
.segments
62+
.iter()
63+
.map(|s| (s.full_name(), false))
64+
.collect();
5965
Self {
6066
query_tools,
6167
pre_aggregation,
6268
pre_aggregation_dimensions,
6369
pre_aggregation_time_dimensions,
70+
pre_aggregation_segments,
6471
result: MatchState::Full,
6572
}
6673
}
@@ -128,6 +135,12 @@ impl<'a> DimensionMatcher<'a> {
128135
MatchState::Partial
129136
};
130137
self.result = self.result.combine(&time_dimension_coverage_result);
138+
let segment_coverage_result = if self.pre_aggregation_segments.values().all(|v| *v) {
139+
MatchState::Full
140+
} else {
141+
MatchState::Partial
142+
};
143+
self.result = self.result.combine(&segment_coverage_result);
131144
self.result
132145
}
133146

@@ -143,7 +156,30 @@ impl<'a> DimensionMatcher<'a> {
143156
MemberSymbol::TimeDimension(time_dimension) => {
144157
self.try_match_time_dimension(time_dimension, add_to_matched_dimension)
145158
}
146-
MemberSymbol::MemberExpression(_member_expression) => Ok(MatchState::NotMatched), //TODO We don't allow to use pre-aggregations with member expressions before SQL API is ready for it
159+
MemberSymbol::MemberExpression(me) => {
160+
if let Some(found) = self.pre_aggregation_segments.get_mut(&me.full_name()) {
161+
if add_to_matched_dimension {
162+
*found = true;
163+
}
164+
Ok(MatchState::Full)
165+
} else {
166+
let dependencies = me.get_dependencies();
167+
if dependencies.is_empty() {
168+
Ok(MatchState::NotMatched)
169+
} else {
170+
let mut result = MatchState::Partial;
171+
for dep in dependencies {
172+
let dep_match =
173+
self.try_match_symbol(&dep, add_to_matched_dimension)?;
174+
if dep_match == MatchState::NotMatched {
175+
return Ok(MatchState::NotMatched);
176+
}
177+
result = result.combine(&dep_match);
178+
}
179+
Ok(result)
180+
}
181+
}
182+
}
147183
_ => Ok(MatchState::NotMatched),
148184
}
149185
}
@@ -655,4 +691,139 @@ mod tests {
655691
MatchState::Full,
656692
);
657693
}
694+
695+
#[test]
696+
fn test_segment_full_match() {
697+
let ctx = create_test_context();
698+
assert_eq!(
699+
match_pre_agg(
700+
&ctx,
701+
"segment_rollup",
702+
indoc! {"
703+
measures:
704+
- orders.count
705+
dimensions:
706+
- orders.status
707+
segments:
708+
- orders.high_priority
709+
time_dimensions:
710+
- dimension: orders.created_at
711+
granularity: day
712+
"},
713+
),
714+
MatchState::Full,
715+
);
716+
}
717+
718+
#[test]
719+
fn test_segment_partial_match_unused_segment() {
720+
let ctx = create_test_context();
721+
// No segment in query, but pre-agg has segment => partial (unused segment coverage)
722+
assert_eq!(
723+
match_pre_agg(
724+
&ctx,
725+
"segment_rollup",
726+
indoc! {"
727+
measures:
728+
- orders.count
729+
dimensions:
730+
- orders.status
731+
time_dimensions:
732+
- dimension: orders.created_at
733+
granularity: day
734+
"},
735+
),
736+
MatchState::Partial,
737+
);
738+
}
739+
740+
#[test]
741+
fn test_segment_not_matched_missing_in_pre_agg() {
742+
let ctx = create_test_context();
743+
// Segment in query, but pre-agg without segments => NotMatched
744+
assert_eq!(
745+
match_pre_agg(
746+
&ctx,
747+
"main_rollup",
748+
indoc! {"
749+
measures:
750+
- orders.count
751+
dimensions:
752+
- orders.status
753+
- orders.city
754+
segments:
755+
- orders.high_priority
756+
"},
757+
),
758+
MatchState::NotMatched,
759+
);
760+
}
761+
762+
#[test]
763+
fn test_expression_segment_matched_via_dimensions() {
764+
let ctx = create_test_context();
765+
// status_filter depends on {CUBE.status}, status is in main_rollup dimensions
766+
assert_eq!(
767+
match_pre_agg(
768+
&ctx,
769+
"main_rollup",
770+
indoc! {"
771+
measures:
772+
- orders.count
773+
dimensions:
774+
- orders.status
775+
- orders.city
776+
segments:
777+
- orders.status_filter
778+
"},
779+
),
780+
MatchState::Partial,
781+
);
782+
}
783+
784+
#[test]
785+
fn test_expression_segment_not_matched_missing_dependency() {
786+
let ctx = create_test_context();
787+
// status_filter depends on {CUBE.status}, but daily_countries_rollup has no status dimension
788+
assert_eq!(
789+
match_pre_agg(
790+
&ctx,
791+
"daily_countries_rollup",
792+
indoc! {"
793+
measures:
794+
- orders.count
795+
dimensions:
796+
- orders.country
797+
segments:
798+
- orders.status_filter
799+
time_dimensions:
800+
- dimension: orders.created_at
801+
granularity: day
802+
"},
803+
),
804+
MatchState::NotMatched,
805+
);
806+
}
807+
808+
#[test]
809+
fn test_plain_segment_not_matched_via_dimensions() {
810+
let ctx = create_test_context();
811+
// high_priority has no {CUBE.x} deps, main_rollup has no segments => NotMatched
812+
assert_eq!(
813+
match_pre_agg(
814+
&ctx,
815+
"main_rollup",
816+
indoc! {"
817+
measures:
818+
- orders.count
819+
dimensions:
820+
- orders.status
821+
- orders.city
822+
segments:
823+
- orders.high_priority
824+
"},
825+
),
826+
MatchState::NotMatched,
827+
);
828+
}
658829
}

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -399,7 +399,8 @@ impl PreAggregationOptimizer {
399399
.dimensions
400400
.iter()
401401
.cloned()
402-
.chain(pre_aggregation.time_dimensions.iter().map(|d| d.clone()))
402+
.chain(pre_aggregation.time_dimensions.iter().cloned())
403+
.chain(pre_aggregation.segments.iter().cloned())
403404
.collect(),
404405
measures: filtered_measures.clone(),
405406
multiplied_measures: HashSet::new(),
@@ -414,6 +415,7 @@ impl PreAggregationOptimizer {
414415
.name(pre_aggregation.name.clone())
415416
.time_dimensions(pre_aggregation.time_dimensions.clone())
416417
.dimensions(pre_aggregation.dimensions.clone())
418+
.segments(pre_aggregation.segments.clone())
417419
.measures(filtered_measures)
418420
.schema(Rc::new(schema))
419421
.external(pre_aggregation.external.unwrap_or_default())

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/pre_aggregations_compiler.rs

Lines changed: 57 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -165,6 +165,16 @@ impl PreAggregationsCompiler {
165165
} else {
166166
Vec::new()
167167
};
168+
let segments = if let Some(refs) = description.segment_references()? {
169+
Self::symbols_from_ref(
170+
self.query_tools.clone(),
171+
&name.cube_name,
172+
refs,
173+
Self::check_is_segment,
174+
)?
175+
} else {
176+
Vec::new()
177+
};
168178
let allow_non_strict_date_range_match = description
169179
.static_data()
170180
.allow_non_strict_date_range_match
@@ -208,6 +218,7 @@ impl PreAggregationsCompiler {
208218
measures,
209219
dimensions,
210220
time_dimensions,
221+
segments,
211222
allow_non_strict_date_range_match,
212223
});
213224
self.compiled_cache.insert(name.clone(), res.clone());
@@ -265,6 +276,7 @@ impl PreAggregationsCompiler {
265276
let measures = pre_aggrs_for_lambda[0].measures.clone();
266277
let dimensions = pre_aggrs_for_lambda[0].dimensions.clone();
267278
let time_dimensions = pre_aggrs_for_lambda[0].time_dimensions.clone();
279+
let segments = pre_aggrs_for_lambda[0].segments.clone();
268280
let allow_non_strict_date_range_match = description
269281
.static_data()
270282
.allow_non_strict_date_range_match
@@ -282,6 +294,7 @@ impl PreAggregationsCompiler {
282294
measures,
283295
dimensions,
284296
time_dimensions,
297+
segments,
285298
allow_non_strict_date_range_match,
286299
});
287300
self.compiled_cache.insert(name.clone(), res.clone());
@@ -504,6 +517,15 @@ impl PreAggregationsCompiler {
504517
}
505518
Ok(())
506519
}
520+
521+
fn check_is_segment(symbol: &MemberSymbol) -> Result<(), CubeError> {
522+
symbol.as_member_expression().map_err(|_| {
523+
CubeError::user(
524+
"Pre-aggregation segment reference must be a member expression".to_string(),
525+
)
526+
})?;
527+
Ok(())
528+
}
507529
}
508530

509531
#[cfg(test)]
@@ -759,4 +781,39 @@ mod tests {
759781
let result2 = PreAggregationFullName::from_string("too.many.parts");
760782
assert!(result2.is_err());
761783
}
784+
785+
#[test]
786+
fn test_compile_rollup_with_segments() {
787+
let schema = MockSchema::from_yaml_file("common/pre_aggregation_matching_test.yaml");
788+
let test_context = TestContext::new(schema).unwrap();
789+
let query_tools = test_context.query_tools().clone();
790+
791+
let cube_names = vec!["orders".to_string()];
792+
let mut compiler = PreAggregationsCompiler::try_new(query_tools, &cube_names).unwrap();
793+
794+
let pre_agg_name =
795+
PreAggregationFullName::new("orders".to_string(), "segment_rollup".to_string());
796+
let compiled = compiler.compile_pre_aggregation(&pre_agg_name).unwrap();
797+
798+
assert_eq!(compiled.name, "segment_rollup");
799+
assert_eq!(compiled.cube_name, "orders");
800+
assert_eq!(compiled.granularity, Some("day".to_string()));
801+
802+
// Check segments
803+
assert_eq!(compiled.segments.len(), 1);
804+
assert_eq!(
805+
compiled.segments[0].full_name(),
806+
"expr:orders.high_priority"
807+
);
808+
809+
// Check measures
810+
assert_eq!(compiled.measures.len(), 2);
811+
let measure_names: Vec<String> = compiled.measures.iter().map(|m| m.full_name()).collect();
812+
assert!(measure_names.contains(&"orders.count".to_string()));
813+
assert!(measure_names.contains(&"orders.total_amount".to_string()));
814+
815+
// Check dimensions
816+
assert_eq!(compiled.dimensions.len(), 1);
817+
assert_eq!(compiled.dimensions[0].full_name(), "orders.status");
818+
}
762819
}

rust/cubesqlplanner/cubesqlplanner/src/logical_plan/pre_aggregation.rs

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,8 @@ pub struct PreAggregation {
1515
dimensions: Vec<Rc<MemberSymbol>>,
1616
#[builder(default)]
1717
time_dimensions: Vec<Rc<MemberSymbol>>,
18+
#[builder(default)]
19+
segments: Vec<Rc<MemberSymbol>>,
1820
external: bool,
1921
#[builder(default)]
2022
granularity: Option<String>,
@@ -43,6 +45,10 @@ impl PreAggregation {
4345
&self.time_dimensions
4446
}
4547

48+
pub fn segments(&self) -> &Vec<Rc<MemberSymbol>> {
49+
&self.segments
50+
}
51+
4652
pub fn external(&self) -> bool {
4753
self.external
4854
}
@@ -115,6 +121,11 @@ impl PreAggregation {
115121
);
116122
}
117123

124+
for segment in self.segments().iter() {
125+
let alias = segment.alias();
126+
res.insert(segment.full_name(), QualifiedColumnName::new(None, alias));
127+
}
128+
118129
if let PreAggregationSource::Join(join) = self.source().as_ref() {
119130
for item in join.items.iter() {
120131
for member in item.from_members.iter().chain(item.to_members.iter()) {

rust/cubesqlplanner/cubesqlplanner/src/planner/filter/base_segment.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -34,6 +34,7 @@ impl BaseSegment {
3434
name.clone(),
3535
MemberExpressionExpression::SqlCall(expression),
3636
None,
37+
None,
3738
query_tools.base_tools().clone(),
3839
)?;
3940
let full_name = full_name.unwrap_or(member_expression_symbol.full_name());

0 commit comments

Comments
 (0)