Skip to content

Commit

Permalink
Update aggregated conversion node to only have queried linkable specs (
Browse files Browse the repository at this point in the history
…#1381)

## Context
When you query a conversion metric with a filter on a dimension that
exists in the base measure's semantic model, but do not include that
dimension in the group by, the rendering fails. This is because the
conversion metric builder ended up "requiring" all the linkable specs
from the query (group by + filter), so the dimension referenced only in
the filter was added to the resulting `SELECT` even though it was not
supplied in the group by. This caused an error in the join, which did not
expect that additional element. This PR updates the builder so that,
instead of using all the required linkable specs, it uses only the
queried linkable specs when building the conversion aggregate node.

Resolves #1210 
Resolves SL-2777
  • Loading branch information
WilliamDee authored Oct 25, 2024
1 parent 85dac0f commit 8c2e064
Show file tree
Hide file tree
Showing 161 changed files with 23,482 additions and 13,558 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20240823-123108.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Fixes bug where conversion metric query fails when filter with base semantic model's dimension is provided
time: 2024-08-23T12:31:08.257686-04:00
custom:
Author: WilliamDee
Issue: "1210"
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ def post_aggregation_spec(self) -> MeasureSpec:
fill_nulls_with=self.fill_nulls_with,
)
else:
return self.measure_spec
return MeasureSpec(
element_name=self.measure_spec.element_name,
non_additive_dimension_spec=self.measure_spec.non_additive_dimension_spec,
fill_nulls_with=self.fill_nulls_with,
)


@dataclass(frozen=True)
Expand Down
11 changes: 8 additions & 3 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ def _build_aggregated_conversion_node(
)

# Build measure recipes
base_required_linkable_specs, _ = self.__get_required_and_extraneous_linkable_specs(
base_required_linkable_specs, extraneous_linkable_specs = self.__get_required_and_extraneous_linkable_specs(
queried_linkable_specs=queried_linkable_specs,
filter_specs=base_measure_spec.filter_spec_set.all_filter_specs,
)
Expand Down Expand Up @@ -333,6 +333,11 @@ def _build_aggregated_conversion_node(
unaggregated_base_measure_node = JoinOnEntitiesNode.create(
left_node=unaggregated_base_measure_node, join_targets=base_measure_recipe.join_targets
)
if len(base_measure_spec.filter_spec_set.all_filter_specs) > 0:
unaggregated_base_measure_node = WhereConstraintNode.create(
parent_node=unaggregated_base_measure_node,
where_specs=base_measure_spec.filter_spec_set.all_filter_specs,
)
filtered_unaggregated_base_node = FilterElementsNode.create(
parent_node=unaggregated_base_measure_node,
include_specs=group_specs_by_type(required_local_specs)
Expand All @@ -359,8 +364,8 @@ def _build_aggregated_conversion_node(
# Aggregate the conversion events with the JoinConversionEventsNode as the source node
recipe_with_join_conversion_source_node = SourceNodeRecipe(
source_node=join_conversion_node,
required_local_linkable_specs=base_measure_recipe.required_local_linkable_specs,
join_linkable_instances_recipes=base_measure_recipe.join_linkable_instances_recipes,
required_local_linkable_specs=queried_linkable_specs.as_tuple,
join_linkable_instances_recipes=(),
)
# TODO: Refine conversion metric configuration to fit into the standard dataflow plan building model
# In this case we override the measure recipe, which currently results in us bypassing predicate pushdown
Expand Down
178 changes: 178 additions & 0 deletions tests_metricflow/integration/query_output/test_conversion_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
from __future__ import annotations

import datetime

import pytest
from _pytest.fixtures import FixtureRequest
from metricflow_semantics.test_helpers.config_helpers import MetricFlowTestConfiguration

from metricflow.engine.metricflow_engine import MetricFlowQueryRequest
from metricflow.protocols.sql_client import SqlClient
from tests_metricflow.integration.conftest import IntegrationTestHelpers
from tests_metricflow.snapshot_utils import assert_str_snapshot_equal


@pytest.mark.sql_engine_snapshot
def test_conversion_metric(
    request: FixtureRequest,
    mf_test_configuration: MetricFlowTestConfiguration,
    sql_client: SqlClient,
    it_helpers: IntegrationTestHelpers,
) -> None:
    """Query `visit_buy_conversion_rate` grouped and ordered by `metric_time`, then snapshot the output."""
    mf_request = MetricFlowQueryRequest.create_with_random_request_id(
        metric_names=("visit_buy_conversion_rate",),
        group_by_names=("metric_time",),
        order_by_names=("metric_time",),
    )
    result_df = it_helpers.mf_engine.query(mf_request).result_df
    # A missing dataframe means the engine produced no result table at all.
    assert result_df is not None, "Unexpected empty result."

    assert_str_snapshot_equal(
        request=request,
        mf_test_configuration=mf_test_configuration,
        snapshot_id="query_output",
        snapshot_str=result_df.text_format(),
        sql_engine=sql_client.sql_engine_type,
    )


@pytest.mark.sql_engine_snapshot
def test_conversion_metric_with_window(
    request: FixtureRequest,
    mf_test_configuration: MetricFlowTestConfiguration,
    sql_client: SqlClient,
    it_helpers: IntegrationTestHelpers,
) -> None:
    """Query the windowed metric `visit_buy_conversion_rate_7days` by `metric_time` and snapshot the output."""
    mf_request = MetricFlowQueryRequest.create_with_random_request_id(
        metric_names=("visit_buy_conversion_rate_7days",),
        group_by_names=("metric_time",),
        order_by_names=("metric_time",),
    )
    result_df = it_helpers.mf_engine.query(mf_request).result_df
    # A missing dataframe means the engine produced no result table at all.
    assert result_df is not None, "Unexpected empty result."

    assert_str_snapshot_equal(
        request=request,
        mf_test_configuration=mf_test_configuration,
        snapshot_id="query_output",
        snapshot_str=result_df.text_format(),
        sql_engine=sql_client.sql_engine_type,
    )


@pytest.mark.sql_engine_snapshot
def test_conversion_metric_with_categorical_filter(
    request: FixtureRequest,
    mf_test_configuration: MetricFlowTestConfiguration,
    sql_client: SqlClient,
    it_helpers: IntegrationTestHelpers,
) -> None:
    """Query a conversion metric with a where filter on `visit__referrer_id`, which is also grouped by."""
    # The same items are used for grouping and ordering.
    grouping = ("metric_time", "visit__referrer_id")
    mf_request = MetricFlowQueryRequest.create_with_random_request_id(
        metric_names=("visit_buy_conversion_rate",),
        group_by_names=grouping,
        order_by_names=grouping,
        where_constraints=("{{ Dimension('visit__referrer_id') }} = 'fb_ad_1'",),
    )
    result_df = it_helpers.mf_engine.query(mf_request).result_df
    assert result_df is not None, "Unexpected empty result."

    assert_str_snapshot_equal(
        request=request,
        mf_test_configuration=mf_test_configuration,
        snapshot_id="query_output",
        snapshot_str=result_df.text_format(),
        sql_engine=sql_client.sql_engine_type,
    )


@pytest.mark.sql_engine_snapshot
def test_conversion_metric_with_time_constraint(
    request: FixtureRequest,
    mf_test_configuration: MetricFlowTestConfiguration,
    sql_client: SqlClient,
    it_helpers: IntegrationTestHelpers,
) -> None:
    """Query a conversion metric with both a time constraint and a `visit__referrer_id` where filter."""
    window_start = datetime.datetime(2020, 1, 1)
    window_end = datetime.datetime(2020, 1, 2)
    mf_request = MetricFlowQueryRequest.create_with_random_request_id(
        metric_names=("visit_buy_conversion_rate",),
        group_by_names=("visit__referrer_id",),
        order_by_names=("visit__referrer_id",),
        where_constraints=("{{ Dimension('visit__referrer_id') }} = 'fb_ad_1'",),
        time_constraint_start=window_start,
        time_constraint_end=window_end,
    )
    result_df = it_helpers.mf_engine.query(mf_request).result_df
    assert result_df is not None, "Unexpected empty result."

    assert_str_snapshot_equal(
        request=request,
        mf_test_configuration=mf_test_configuration,
        snapshot_id="query_output",
        snapshot_str=result_df.text_format(),
        sql_engine=sql_client.sql_engine_type,
    )


@pytest.mark.sql_engine_snapshot
def test_conversion_metric_with_window_and_time_constraint(
    request: FixtureRequest,
    mf_test_configuration: MetricFlowTestConfiguration,
    sql_client: SqlClient,
    it_helpers: IntegrationTestHelpers,
) -> None:
    """Query the windowed conversion metric with a time constraint and a `visit__referrer_id` where filter."""
    window_start = datetime.datetime(2020, 1, 1)
    window_end = datetime.datetime(2020, 1, 2)
    mf_request = MetricFlowQueryRequest.create_with_random_request_id(
        metric_names=("visit_buy_conversion_rate_7days",),
        group_by_names=("metric_time", "visit__referrer_id"),
        order_by_names=("metric_time", "visit__referrer_id"),
        where_constraints=("{{ Dimension('visit__referrer_id') }} = 'fb_ad_1'",),
        time_constraint_start=window_start,
        time_constraint_end=window_end,
    )
    result_df = it_helpers.mf_engine.query(mf_request).result_df
    assert result_df is not None, "Unexpected empty result."

    assert_str_snapshot_equal(
        request=request,
        mf_test_configuration=mf_test_configuration,
        snapshot_id="query_output",
        snapshot_str=result_df.text_format(),
        sql_engine=sql_client.sql_engine_type,
    )


@pytest.mark.sql_engine_snapshot
def test_conversion_metric_with_filter_not_in_group_by(
    request: FixtureRequest,
    mf_test_configuration: MetricFlowTestConfiguration,
    sql_client: SqlClient,
    it_helpers: IntegrationTestHelpers,
) -> None:
    """Test query against a conversion metric with a filter that doesn't exist in group by.

    The where filter references `visit__referrer_id` while the query has no group-by items,
    so the filtered dimension must not appear in the query output.
    """
    query_result = it_helpers.mf_engine.query(
        MetricFlowQueryRequest.create_with_random_request_id(
            metric_names=("visit_buy_conversions",),
            # The filtered dimension is intentionally absent from the group by; without this
            # where filter the test would only exercise the time-range constraint.
            # NOTE(review): the stored snapshot for this test must be regenerated.
            where_constraints=("{{ Dimension('visit__referrer_id') }} = 'fb_ad_1'",),
            time_constraint_start=datetime.datetime(2020, 1, 1),
            time_constraint_end=datetime.datetime(2020, 1, 1),
        )
    )
    assert query_result.result_df is not None, "Unexpected empty result."

    assert_str_snapshot_equal(
        request=request,
        mf_test_configuration=mf_test_configuration,
        snapshot_id="query_output",
        snapshot_str=query_result.result_df.text_format(),
        sql_engine=sql_client.sql_engine_type,
    )
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,59 @@ def test_conversion_metric_with_window_and_time_constraint(
dataflow_plan_builder=dataflow_plan_builder,
query_spec=parsed_query.query_spec,
)


@pytest.mark.sql_engine_snapshot
def test_conversion_metric_with_filter(
    request: FixtureRequest,
    mf_test_configuration: MetricFlowTestConfiguration,
    dataflow_plan_builder: DataflowPlanBuilder,
    dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
    sql_client: SqlClient,
    query_parser: MetricFlowQueryParser,
    create_source_tables: bool,
) -> None:
    """Test rendering a conversion metric query that has a `metric_time` where filter and no group by."""
    day_filter = PydanticWhereFilter(
        where_sql_template="{{ TimeDimension('metric_time', 'day') }} = '2020-01-01'"
    )
    query_spec = query_parser.parse_and_validate_query(
        metric_names=("visit_buy_conversion_rate",),
        where_constraints=(day_filter,),
    ).query_spec

    render_and_check(
        request=request,
        mf_test_configuration=mf_test_configuration,
        dataflow_to_sql_converter=dataflow_to_sql_converter,
        sql_client=sql_client,
        dataflow_plan_builder=dataflow_plan_builder,
        query_spec=query_spec,
    )


@pytest.mark.sql_engine_snapshot
def test_conversion_metric_with_filter_not_in_group_by(
    request: FixtureRequest,
    mf_test_configuration: MetricFlowTestConfiguration,
    dataflow_plan_builder: DataflowPlanBuilder,
    dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
    sql_client: SqlClient,
    query_parser: MetricFlowQueryParser,
    create_source_tables: bool,
) -> None:
    """Test rendering a conversion metric query whose where filter uses a dimension not in the group by."""
    # `visit__referrer_id` appears only in the filter; the query supplies no group-by items.
    referrer_filter = PydanticWhereFilter(
        where_sql_template="{{ Dimension('visit__referrer_id') }} = 'ref_id_01'"
    )
    query_spec = query_parser.parse_and_validate_query(
        metric_names=("visit_buy_conversions",),
        where_constraints=(referrer_filter,),
    ).query_spec

    render_and_check(
        request=request,
        mf_test_configuration=mf_test_configuration,
        dataflow_to_sql_converter=dataflow_to_sql_converter,
        sql_client=sql_client,
        dataflow_plan_builder=dataflow_plan_builder,
        query_spec=query_spec,
    )
Loading

0 comments on commit 8c2e064

Please sign in to comment.