Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,7 @@ export class BaseQuery {
ungrouped: this.options.ungrouped,
exportAnnotatedSql: false,
preAggregationQuery: this.options.preAggregationQuery,
preAggregationId: this.options.preAggregationId || null,
securityContext: this.contextSymbols.securityContext,
cubestoreSupportMultistage: this.options.cubestoreSupportMultistage ?? getEnv('cubeStoreRollingWindowJoin'),
disableExternalPreAggregations: !!this.options.disableExternalPreAggregations,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ pub struct BaseQueryOptionsStatic {
pub cubestore_support_multistage: Option<bool>,
#[serde(rename = "disableExternalPreAggregations")]
pub disable_external_pre_aggregations: bool,
#[serde(rename = "preAggregationId")]
pub pre_aggregation_id: Option<String>,
}

#[nativebridge::native_bridge(BaseQueryOptionsStatic)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,7 @@ impl<'a> DimensionMatcher<'a> {
add_to_matched_dimension: bool,
) -> Result<MatchState, CubeError> {
let granularity = if self.pre_aggregation.allow_non_strict_date_range_match {
if let Some(granularity) = time_dimension.granularity_obj() {
granularity.min_granularity()?
} else {
time_dimension.granularity().clone()
}
time_dimension.granularity().clone()
} else {
time_dimension.rollup_granularity(self.query_tools.clone())?
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ impl PreAggregationOptimizer {
&mut self,
plan: Rc<Query>,
disable_external_pre_aggregations: bool,
pre_aggregation_id: Option<&str>,
) -> Result<Option<Rc<Query>>, CubeError> {
let cube_names = collect_cube_names_from_node(&plan)?;
let mut compiler = PreAggregationsCompiler::try_new(self.query_tools.clone(), &cube_names)?;
Expand All @@ -36,6 +37,12 @@ impl PreAggregationOptimizer {
compiler.compile_all_pre_aggregations(disable_external_pre_aggregations)?;

for pre_aggregation in compiled_pre_aggregations.iter() {
if let Some(id) = pre_aggregation_id {
let full_name = format!("{}.{}", pre_aggregation.cube_name, pre_aggregation.name);
if full_name != id {
continue;
}
}
let new_query = self.try_rewrite_query(plan.clone(), pre_aggregation)?;
if new_query.is_some() {
return Ok(new_query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ pub struct QueryProperties {
query_join_hints: Rc<Vec<JoinHintItem>>,
allow_multi_stage: bool,
disable_external_pre_aggregations: bool,
pre_aggregation_id: Option<String>,
}

impl QueryProperties {
Expand Down Expand Up @@ -410,6 +411,7 @@ impl QueryProperties {
let total_query = options.static_data().total_query.unwrap_or(false);
let disable_external_pre_aggregations =
options.static_data().disable_external_pre_aggregations;
let pre_aggregation_id = options.static_data().pre_aggregation_id.clone();

let mut res = Self {
measures,
Expand All @@ -431,6 +433,7 @@ impl QueryProperties {
query_join_hints,
allow_multi_stage: true,
disable_external_pre_aggregations,
pre_aggregation_id,
};
res.apply_static_filters()?;
Ok(Rc::new(res))
Expand Down Expand Up @@ -482,6 +485,7 @@ impl QueryProperties {
query_join_hints,
allow_multi_stage,
disable_external_pre_aggregations,
pre_aggregation_id: None,
};
res.apply_static_filters()?;

Expand Down Expand Up @@ -753,6 +757,10 @@ impl QueryProperties {
self.disable_external_pre_aggregations
}

pub fn pre_aggregation_id(&self) -> Option<&str> {
self.pre_aggregation_id.as_deref()
}

pub fn all_filters(&self) -> Option<Filter> {
let items = self
.time_dimensions_filters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,12 @@ impl TopLevelPlanner {
);
let disable_external_pre_aggregations =
self.request.disable_external_pre_aggregations();
if let Some(result) = pre_aggregation_optimizer
.try_optimize(plan.clone(), disable_external_pre_aggregations)?
{
let pre_aggregation_id = self.request.pre_aggregation_id();
if let Some(result) = pre_aggregation_optimizer.try_optimize(
plan.clone(),
disable_external_pre_aggregations,
pre_aggregation_id,
)? {
if pre_aggregation_optimizer.get_used_pre_aggregations().len() == 1 {
(
result,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ pub struct MockBaseQueryOptions {
cubestore_support_multistage: Option<bool>,
#[builder(default = false)]
disable_external_pre_aggregations: bool,
#[builder(default)]
pre_aggregation_id: Option<String>,
}

impl_static_data!(
Expand All @@ -82,7 +84,8 @@ impl_static_data!(
pre_aggregation_query,
total_query,
cubestore_support_multistage,
disable_external_pre_aggregations
disable_external_pre_aggregations,
pre_aggregation_id
);

pub fn members_from_strings<S: ToString>(strings: Vec<S>) -> Vec<OptionsMember> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl MockDimensionDefinition {
pub fn from_yaml(yaml: &str) -> Result<Rc<Self>, CubeError> {
let yaml_def: YamlDimensionDefinition = serde_yaml::from_str(yaml)
.map_err(|e| CubeError::user(format!("Failed to parse YAML: {}", e)))?;
Ok(yaml_def.build())
Ok(Rc::new(yaml_def.build().definition))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,23 +233,29 @@ impl CubeEvaluator for MockCubeEvaluator {

let granularity = &path[3];

let valid_granularities = [
// Check custom granularities in schema first
if let Some(custom) = self.schema.get_granularity(&path[0], &path[1], granularity) {
return Ok(custom as Rc<dyn GranularityDefinition>);
}

// Fall back to predefined granularities
let predefined = [
"second", "minute", "hour", "day", "week", "month", "quarter", "year",
];

if !valid_granularities.contains(&granularity.as_str()) {
return Err(CubeError::user(format!(
"Unsupported granularity: '{}'. Supported: second, minute, hour, day, week, month, quarter, year",
if predefined.contains(&granularity.as_str()) {
use crate::test_fixtures::cube_bridge::MockGranularityDefinition;
Ok(Rc::new(
MockGranularityDefinition::builder()
.interval(format!("1 {}", granularity))
.build(),
) as Rc<dyn GranularityDefinition>)
} else {
Err(CubeError::user(format!(
"Granularity '{}' not found",
granularity
)));
)))
}

use crate::test_fixtures::cube_bridge::MockGranularityDefinition;
Ok(Rc::new(
MockGranularityDefinition::builder()
.interval(granularity.clone())
.build(),
) as Rc<dyn GranularityDefinition>)
}

fn pre_aggregations_for_cube_as_array(
Expand Down Expand Up @@ -308,3 +314,72 @@ impl CubeEvaluator for MockCubeEvaluator {
self
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::test_fixtures::cube_bridge::MockSchema;

fn create_custom_granularity_schema() -> MockSchema {
MockSchema::from_yaml_file("common/custom_granularity_test.yaml")
}

fn resolve(
evaluator: &MockCubeEvaluator,
granularity: &str,
) -> Result<Rc<dyn GranularityDefinition>, CubeError> {
evaluator.resolve_granularity(vec![
"orders".to_string(),
"created_at".to_string(),
"granularities".to_string(),
granularity.to_string(),
])
}

#[test]
fn test_resolve_predefined_granularity() {
let schema = create_custom_granularity_schema();
let evaluator = schema.create_evaluator();

let result = resolve(&evaluator, "day").expect("should resolve predefined granularity");
assert_eq!(result.static_data().interval, "1 day");
assert_eq!(result.static_data().origin, None);
assert_eq!(result.static_data().offset, None);
}

#[test]
fn test_resolve_custom_granularity() {
let schema = create_custom_granularity_schema();
let evaluator = schema.create_evaluator();

let result = resolve(&evaluator, "half_year").expect("should resolve custom granularity");
assert_eq!(result.static_data().interval, "6 months");
assert_eq!(result.static_data().origin, Some("2024-01-01".to_string()));
assert_eq!(result.static_data().offset, None);
}

#[test]
fn test_resolve_custom_granularity_with_offset() {
let schema = create_custom_granularity_schema();
let evaluator = schema.create_evaluator();

let result = resolve(&evaluator, "fiscal_year").expect("should resolve custom granularity");
assert_eq!(result.static_data().interval, "1 year");
assert_eq!(result.static_data().offset, Some("1 month".to_string()));
}

#[test]
fn test_resolve_unknown_granularity_error() {
let schema = create_custom_granularity_schema();
let evaluator = schema.create_evaluator();

let result = resolve(&evaluator, "nonexistent");
assert!(result.is_err());
let err = result.err().unwrap();
assert!(
err.message.contains("Granularity 'nonexistent' not found"),
"unexpected error: {}",
err.message
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use typed_builder::TypedBuilder;
pub struct MockGranularityDefinition {
#[builder(setter(into))]
interval: String,
#[builder(default, setter(strip_option, into))]
#[builder(default, setter(strip_option(fallback = origin_opt), into))]
origin: Option<String>,
#[builder(default, setter(strip_option, into))]
#[builder(default, setter(strip_option(fallback = offset_opt), into))]
offset: Option<String>,
#[builder(default, setter(strip_option))]
sql: Option<Rc<dyn MemberSql>>,
Expand Down
Loading
Loading