Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update aggregated conversion node to only have queried linkable specs #1381

Merged
merged 7 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20240823-123108.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Fixes bug where conversion metric query fails when filter with base semantic model's dimension is provided
time: 2024-08-23T12:31:08.257686-04:00
custom:
Author: WilliamDee
Issue: "1210"
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ def post_aggregation_spec(self) -> MeasureSpec:
fill_nulls_with=self.fill_nulls_with,
)
else:
return self.measure_spec
return MeasureSpec(
element_name=self.measure_spec.element_name,
non_additive_dimension_spec=self.measure_spec.non_additive_dimension_spec,
fill_nulls_with=self.fill_nulls_with,
)


@dataclass(frozen=True)
Expand Down
11 changes: 8 additions & 3 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ def _build_aggregated_conversion_node(
)

# Build measure recipes
base_required_linkable_specs, _ = self.__get_required_and_extraneous_linkable_specs(
base_required_linkable_specs, extraneous_linkable_specs = self.__get_required_and_extraneous_linkable_specs(
queried_linkable_specs=queried_linkable_specs,
filter_specs=base_measure_spec.filter_spec_set.all_filter_specs,
)
Expand Down Expand Up @@ -333,6 +333,11 @@ def _build_aggregated_conversion_node(
unaggregated_base_measure_node = JoinOnEntitiesNode.create(
left_node=unaggregated_base_measure_node, join_targets=base_measure_recipe.join_targets
)
if len(base_measure_spec.filter_spec_set.all_filter_specs) > 0:
unaggregated_base_measure_node = WhereConstraintNode.create(
parent_node=unaggregated_base_measure_node,
where_specs=base_measure_spec.filter_spec_set.all_filter_specs,
)
filtered_unaggregated_base_node = FilterElementsNode.create(
parent_node=unaggregated_base_measure_node,
include_specs=group_specs_by_type(required_local_specs)
Expand All @@ -359,8 +364,8 @@ def _build_aggregated_conversion_node(
# Aggregate the conversion events with the JoinConversionEventsNode as the source node
recipe_with_join_conversion_source_node = SourceNodeRecipe(
source_node=join_conversion_node,
required_local_linkable_specs=base_measure_recipe.required_local_linkable_specs,
join_linkable_instances_recipes=base_measure_recipe.join_linkable_instances_recipes,
required_local_linkable_specs=queried_linkable_specs.as_tuple,
WilliamDee marked this conversation as resolved.
Show resolved Hide resolved
join_linkable_instances_recipes=(),
)
# TODO: Refine conversion metric configuration to fit into the standard dataflow plan building model
# In this case we override the measure recipe, which currently results in us bypassing predicate pushdown
Expand Down
178 changes: 178 additions & 0 deletions tests_metricflow/integration/query_output/test_conversion_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
from __future__ import annotations

import datetime

import pytest
from _pytest.fixtures import FixtureRequest
from metricflow_semantics.test_helpers.config_helpers import MetricFlowTestConfiguration

from metricflow.engine.metricflow_engine import MetricFlowQueryRequest
from metricflow.protocols.sql_client import SqlClient
from tests_metricflow.integration.conftest import IntegrationTestHelpers
from tests_metricflow.snapshot_utils import assert_str_snapshot_equal


@pytest.mark.sql_engine_snapshot
def test_conversion_metric(
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
sql_client: SqlClient,
it_helpers: IntegrationTestHelpers,
) -> None:
"""Test query against a conversion metric."""
query_result = it_helpers.mf_engine.query(
MetricFlowQueryRequest.create_with_random_request_id(
metric_names=("visit_buy_conversion_rate",),
group_by_names=("metric_time",),
order_by_names=("metric_time",),
)
)
assert query_result.result_df is not None, "Unexpected empty result."

assert_str_snapshot_equal(
request=request,
mf_test_configuration=mf_test_configuration,
snapshot_id="query_output",
snapshot_str=query_result.result_df.text_format(),
sql_engine=sql_client.sql_engine_type,
)


@pytest.mark.sql_engine_snapshot
def test_conversion_metric_with_window(
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
sql_client: SqlClient,
it_helpers: IntegrationTestHelpers,
) -> None:
"""Test query against a conversion metric with a window."""
query_result = it_helpers.mf_engine.query(
MetricFlowQueryRequest.create_with_random_request_id(
metric_names=("visit_buy_conversion_rate_7days",),
group_by_names=("metric_time",),
order_by_names=("metric_time",),
)
)
assert query_result.result_df is not None, "Unexpected empty result."

assert_str_snapshot_equal(
request=request,
mf_test_configuration=mf_test_configuration,
snapshot_id="query_output",
snapshot_str=query_result.result_df.text_format(),
sql_engine=sql_client.sql_engine_type,
)


@pytest.mark.sql_engine_snapshot
def test_conversion_metric_with_categorical_filter(
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
sql_client: SqlClient,
it_helpers: IntegrationTestHelpers,
) -> None:
"""Test query against a conversion metric with a categorical filter."""
query_result = it_helpers.mf_engine.query(
MetricFlowQueryRequest.create_with_random_request_id(
metric_names=("visit_buy_conversion_rate",),
group_by_names=("metric_time", "visit__referrer_id"),
order_by_names=("metric_time", "visit__referrer_id"),
where_constraints=("{{ Dimension('visit__referrer_id') }} = 'fb_ad_1'",),
)
)
assert query_result.result_df is not None, "Unexpected empty result."

assert_str_snapshot_equal(
request=request,
mf_test_configuration=mf_test_configuration,
snapshot_id="query_output",
snapshot_str=query_result.result_df.text_format(),
sql_engine=sql_client.sql_engine_type,
)


@pytest.mark.sql_engine_snapshot
def test_conversion_metric_with_time_constraint(
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
sql_client: SqlClient,
it_helpers: IntegrationTestHelpers,
) -> None:
"""Test query against a conversion metric with a time constraint and categorical filter."""
query_result = it_helpers.mf_engine.query(
MetricFlowQueryRequest.create_with_random_request_id(
metric_names=("visit_buy_conversion_rate",),
group_by_names=("visit__referrer_id",),
order_by_names=("visit__referrer_id",),
where_constraints=("{{ Dimension('visit__referrer_id') }} = 'fb_ad_1'",),
time_constraint_start=datetime.datetime(2020, 1, 1),
time_constraint_end=datetime.datetime(2020, 1, 2),
)
)
assert query_result.result_df is not None, "Unexpected empty result."

assert_str_snapshot_equal(
request=request,
mf_test_configuration=mf_test_configuration,
snapshot_id="query_output",
snapshot_str=query_result.result_df.text_format(),
sql_engine=sql_client.sql_engine_type,
)


@pytest.mark.sql_engine_snapshot
def test_conversion_metric_with_window_and_time_constraint(
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
sql_client: SqlClient,
it_helpers: IntegrationTestHelpers,
) -> None:
"""Test query against a conversion metric with a window, time constraint, and categorical filter."""
query_result = it_helpers.mf_engine.query(
MetricFlowQueryRequest.create_with_random_request_id(
metric_names=("visit_buy_conversion_rate_7days",),
group_by_names=(
"metric_time",
"visit__referrer_id",
),
order_by_names=("metric_time", "visit__referrer_id"),
where_constraints=("{{ Dimension('visit__referrer_id') }} = 'fb_ad_1'",),
time_constraint_start=datetime.datetime(2020, 1, 1),
time_constraint_end=datetime.datetime(2020, 1, 2),
)
)
assert query_result.result_df is not None, "Unexpected empty result."

assert_str_snapshot_equal(
request=request,
mf_test_configuration=mf_test_configuration,
snapshot_id="query_output",
snapshot_str=query_result.result_df.text_format(),
sql_engine=sql_client.sql_engine_type,
)


@pytest.mark.sql_engine_snapshot
def test_conversion_metric_with_filter_not_in_group_by(
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
sql_client: SqlClient,
it_helpers: IntegrationTestHelpers,
) -> None:
"""Test query against a conversion metric with a filter that doesn't exist in group by."""
query_result = it_helpers.mf_engine.query(
MetricFlowQueryRequest.create_with_random_request_id(
metric_names=("visit_buy_conversions",),
time_constraint_start=datetime.datetime(2020, 1, 1),
time_constraint_end=datetime.datetime(2020, 1, 1),
)
)
assert query_result.result_df is not None, "Unexpected empty result."

assert_str_snapshot_equal(
request=request,
mf_test_configuration=mf_test_configuration,
snapshot_id="query_output",
snapshot_str=query_result.result_df.text_format(),
sql_engine=sql_client.sql_engine_type,
)
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,59 @@ def test_conversion_metric_with_window_and_time_constraint(
dataflow_plan_builder=dataflow_plan_builder,
query_spec=parsed_query.query_spec,
)


@pytest.mark.sql_engine_snapshot
def test_conversion_metric_with_filter(
WilliamDee marked this conversation as resolved.
Show resolved Hide resolved
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
sql_client: SqlClient,
query_parser: MetricFlowQueryParser,
create_source_tables: bool,
) -> None:
"""Test rendering a query against a conversion metric."""
parsed_query = query_parser.parse_and_validate_query(
metric_names=("visit_buy_conversion_rate",),
where_constraints=(
PydanticWhereFilter(where_sql_template=("{{ TimeDimension('metric_time', 'day') }} = '2020-01-01'")),
),
)

render_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
dataflow_plan_builder=dataflow_plan_builder,
query_spec=parsed_query.query_spec,
)


@pytest.mark.sql_engine_snapshot
def test_conversion_metric_with_filter_not_in_group_by(
request: FixtureRequest,
mf_test_configuration: MetricFlowTestConfiguration,
dataflow_plan_builder: DataflowPlanBuilder,
dataflow_to_sql_converter: DataflowToSqlQueryPlanConverter,
sql_client: SqlClient,
query_parser: MetricFlowQueryParser,
create_source_tables: bool,
) -> None:
"""Test rendering a query against a conversion metric."""
parsed_query = query_parser.parse_and_validate_query(
metric_names=("visit_buy_conversions",),
where_constraints=(
PydanticWhereFilter(where_sql_template=("{{ Dimension('visit__referrer_id') }} = 'ref_id_01'")),
),
)

render_and_check(
request=request,
mf_test_configuration=mf_test_configuration,
dataflow_to_sql_converter=dataflow_to_sql_converter,
sql_client=sql_client,
dataflow_plan_builder=dataflow_plan_builder,
query_spec=parsed_query.query_spec,
)
Loading
Loading