Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions sqlmesh/core/plan/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""

import abc
import itertools
import logging
import typing as t
from sqlmesh.core import analytics
Expand Down Expand Up @@ -379,6 +380,12 @@ def visit_migrate_schemas_stage(
allow_destructive_snapshots=plan.allow_destructive_models,
allow_additive_snapshots=plan.allow_additive_models,
deployability_index=stage.deployability_index,
directly_or_indirectly_modified_snapshots_ids=set(
itertools.chain(
*plan.indirectly_modified_snapshots.values(),
plan.directly_modified_snapshots,
)
),
)
except NodeExecutionFailedError as ex:
raise PlanError(str(ex.__cause__) if ex.__cause__ else str(ex))
Expand Down
39 changes: 26 additions & 13 deletions sqlmesh/core/snapshot/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ def migrate(
allow_destructive_snapshots: t.Optional[t.Set[str]] = None,
allow_additive_snapshots: t.Optional[t.Set[str]] = None,
deployability_index: t.Optional[DeployabilityIndex] = None,
directly_or_indirectly_modified_snapshots_ids: t.Optional[t.Set[SnapshotId]] = None,
) -> None:
"""Alters a physical snapshot table to match its snapshot's schema for the given collection of snapshots.
Expand All @@ -486,6 +487,7 @@ def migrate(
allow_destructive_snapshots: Set of snapshots that are allowed to have destructive schema changes.
allow_additive_snapshots: Set of snapshots that are allowed to have additive schema changes.
deployability_index: Determines snapshots that are deployable in the context of this evaluation.
directly_or_indirectly_modified_snapshots_ids: Set of SnapshotIds with direct or indirect changes.
"""
deployability_index = deployability_index or DeployabilityIndex.all_deployable()
target_data_objects = self._get_physical_data_objects(target_snapshots, deployability_index)
Expand All @@ -510,6 +512,10 @@ def migrate(
allow_additive_snapshots,
self.get_adapter(s.model_gateway),
deployability_index,
only_metadata_changes=s.snapshot_id
not in directly_or_indirectly_modified_snapshots_ids
if directly_or_indirectly_modified_snapshots_ids is not None
else False,
),
self.ddl_concurrent_tasks,
)
Expand Down Expand Up @@ -1111,6 +1117,7 @@ def _migrate_snapshot(
allow_additive_snapshots: t.Set[str],
adapter: EngineAdapter,
deployability_index: DeployabilityIndex,
only_metadata_changes: bool,
) -> None:
if not snapshot.is_model or snapshot.is_symbolic:
return
Expand Down Expand Up @@ -1154,6 +1161,7 @@ def _migrate_snapshot(
allow_destructive_snapshots=allow_destructive_snapshots,
allow_additive_snapshots=allow_additive_snapshots,
run_pre_post_statements=True,
only_metadata_changes=only_metadata_changes,
)
else:
self._execute_create(
Expand Down Expand Up @@ -1190,6 +1198,7 @@ def _migrate_target_table(
allow_destructive_snapshots: t.Set[str],
allow_additive_snapshots: t.Set[str],
run_pre_post_statements: bool = False,
only_metadata_changes: bool = False,
) -> None:
adapter = self.get_adapter(snapshot.model.gateway)

Expand Down Expand Up @@ -1226,6 +1235,7 @@ def _migrate_target_table(
ignore_destructive=snapshot.model.on_destructive_change.is_ignore,
ignore_additive=snapshot.model.on_additive_change.is_ignore,
deployability_index=deployability_index,
only_metadata_changes=only_metadata_changes,
)
finally:
if snapshot.is_materialized:
Expand Down Expand Up @@ -2760,20 +2770,23 @@ def migrate(
**kwargs: t.Any,
) -> None:
logger.info("Migrating view '%s'", target_table_name)
model = snapshot.model
render_kwargs = dict(
execution_time=now(), snapshots=kwargs["snapshots"], engine_adapter=self.adapter
)
if not (
kwargs["only_metadata_changes"] and self.adapter.COMMENT_CREATION_VIEW.is_unsupported
):
model = snapshot.model
render_kwargs = dict(
execution_time=now(), snapshots=kwargs["snapshots"], engine_adapter=self.adapter
)

self.adapter.create_view(
target_table_name,
model.render_query_or_raise(**render_kwargs),
model.columns_to_types,
materialized=self._is_materialized_view(model),
view_properties=model.render_physical_properties(**render_kwargs),
table_description=model.description,
column_descriptions=model.column_descriptions,
)
self.adapter.create_view(
target_table_name,
model.render_query_or_raise(**render_kwargs),
model.columns_to_types,
materialized=self._is_materialized_view(model),
view_properties=model.render_physical_properties(**render_kwargs),
table_description=model.description,
column_descriptions=model.column_descriptions,
)

# Apply grants after view migration
deployability_index = kwargs.get("deployability_index")
Expand Down
36 changes: 36 additions & 0 deletions tests/core/test_snapshot_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
DataObject,
DataObjectType,
InsertOverwriteStrategy,
CommentCreationView,
)
from sqlmesh.core.environment import EnvironmentNamingInfo
from sqlmesh.core.macros import RuntimeStage, macro, MacroEvaluator, MacroFunc
Expand Down Expand Up @@ -1519,6 +1520,41 @@ def test_migrate_view(
)


def test_migrate_view_recreation_not_needed(
mocker: MockerFixture,
make_snapshot,
make_mocked_engine_adapter,
):
model = SqlModel(
name="test_schema.test_model",
kind=ViewKind(),
description="my_description",
query=parse_one("SELECT c, a FROM tbl"),
)
snapshot = make_snapshot(model, version="1")
snapshot.change_category = SnapshotChangeCategory.METADATA
snapshot.forward_only = False

adapter = make_mocked_engine_adapter(EngineAdapter)
adapter.COMMENT_CREATION_VIEW = CommentCreationView.UNSUPPORTED
adapter.with_settings = lambda **kwargs: adapter
mocker.patch(
"sqlmesh.core.engine_adapter.base.EngineAdapter.get_data_objects",
return_value=[
DataObject(
schema="sqlmesh__test_schema",
name=f"test_schema__test_model__{snapshot.version}",
type="view",
)
],
)

evaluator = SnapshotEvaluator(adapter)
evaluator.migrate([snapshot], {}, directly_or_indirectly_modified_snapshots_ids=set())

adapter.cursor.execute.assert_not_called()


def test_migrate_snapshot_data_object_type_mismatch(
mocker: MockerFixture,
make_snapshot,
Expand Down