diff --git a/src/databricks/labs/remorph/reconcile/recon_capture.py b/src/databricks/labs/remorph/reconcile/recon_capture.py index 656f755b93..9fc8c295e8 100644 --- a/src/databricks/labs/remorph/reconcile/recon_capture.py +++ b/src/databricks/labs/remorph/reconcile/recon_capture.py @@ -360,6 +360,8 @@ def _insert_into_metrics_table( f""" select {recon_table_id} as recon_table_id, named_struct( + 'source_record_count', cast({record_count.source} as bigint), + 'target_record_count', cast({record_count.target} as bigint), 'row_comparison', case when '{self.report_type.lower()}' in ('all', 'row', 'data') and '{exception_msg}' = '' then named_struct( diff --git a/src/databricks/labs/remorph/resources/reconcile/dashboards/reconciliation_metrics/05_0_summary_table.sql b/src/databricks/labs/remorph/resources/reconcile/dashboards/reconciliation_metrics/05_0_summary_table.sql index 4183d241a9..4f4fb64a10 100644 --- a/src/databricks/labs/remorph/resources/reconcile/dashboards/reconciliation_metrics/05_0_summary_table.sql +++ b/src/databricks/labs/remorph/resources/reconcile/dashboards/reconciliation_metrics/05_0_summary_table.sql @@ -21,6 +21,8 @@ SELECT main.recon_id, CONCAT(main.target_table.catalog, '.', main.target_table.schema, '.', main.target_table.table_name) AS target_table, metrics.run_metrics.status AS status, metrics.run_metrics.exception_message AS exception, + metrics.recon_metrics.source_record_count AS source_record_count, + metrics.recon_metrics.target_record_count AS target_record_count, metrics.recon_metrics.row_comparison.missing_in_source AS missing_in_source, metrics.recon_metrics.row_comparison.missing_in_target AS missing_in_target, metrics.recon_metrics.column_comparison.absolute_mismatch AS absolute_mismatch, diff --git a/src/databricks/labs/remorph/resources/reconcile/queries/installation/metrics.sql b/src/databricks/labs/remorph/resources/reconcile/queries/installation/metrics.sql index 33582e2d23..400cc40adb 100644 --- a/src/databricks/labs/remorph/resources/reconcile/queries/installation/metrics.sql +++ b/src/databricks/labs/remorph/resources/reconcile/queries/installation/metrics.sql @@ -1,6 +1,8 @@ CREATE TABLE IF NOT EXISTS metrics ( recon_table_id BIGINT NOT NULL, recon_metrics STRUCT< + source_record_count: BIGINT, + target_record_count: BIGINT, row_comparison: STRUCT< missing_in_source: BIGINT, missing_in_target: BIGINT diff --git a/src/databricks/labs/remorph/upgrades/v0.9.0_add_row_count_metrics.py b/src/databricks/labs/remorph/upgrades/v0.9.0_add_row_count_metrics.py new file mode 100644 index 0000000000..9c9142f8ee --- /dev/null +++ b/src/databricks/labs/remorph/upgrades/v0.9.0_add_row_count_metrics.py @@ -0,0 +1,39 @@ +# pylint: disable=invalid-name + +import logging + +from databricks.labs.blueprint.installation import Installation +from databricks.sdk import WorkspaceClient + +from databricks.labs.remorph.contexts.application import ApplicationContext +from databricks.labs.remorph.helpers import db_sql + +logger = logging.getLogger(__name__) + + +def _upgrade_reconcile_metadata_metrics_table( + installation: Installation, ws: WorkspaceClient, app_context: ApplicationContext +): + reconcile_config = app_context.recon_config + assert reconcile_config, "Reconcile config must be present to upgrade the reconcile metadata metrics table" + table_name = "metrics" + table_identifier = ( + f"{reconcile_config.metadata_config.catalog}.{reconcile_config.metadata_config.schema}.{table_name}" + ) + + sqls: list = [ + f"ALTER TABLE {table_identifier} ADD COLUMN recon_metrics.source_record_count BIGINT", + f"ALTER TABLE {table_identifier} ADD COLUMN recon_metrics.target_record_count BIGINT", + ] + + for sql in sqls: + logger.debug(f"Executing SQL to upgrade metrics table fields: \n{sql}") + db_sql.get_sql_backend(ws).execute(sql) + installation.save(reconcile_config) + logger.debug("Upgraded Reconcile metrics table") + + +def upgrade(installation: Installation, ws: WorkspaceClient): + app_context = ApplicationContext(ws) + if app_context.recon_config is not None: + _upgrade_reconcile_metadata_metrics_table(installation, ws, app_context) diff --git a/tests/conftest.py b/tests/conftest.py index 594475de3d..5c6816656f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -159,6 +159,8 @@ def report_tables_schema(): "recon_metrics", StructType( [ + StructField("source_record_count", IntegerType()), + StructField("target_record_count", IntegerType()), StructField( "row_comparison", StructType( diff --git a/tests/integration/reconcile/query_builder/test_execute.py b/tests/integration/reconcile/query_builder/test_execute.py index c0f2c35906..d4d1f2693b 100644 --- a/tests/integration/reconcile/query_builder/test_execute.py +++ b/tests/integration/reconcile/query_builder/test_execute.py @@ -786,7 +786,7 @@ def test_recon_for_report_type_is_data( data=[ ( 11111, - ((1, 1), (1, 0, "s_address,s_phone"), None), + (3, 3, (1, 1), (1, 0, "s_address,s_phone"), None), (False, "remorph", ""), datetime(2024, 5, 23, 9, 21, 25, 122185), ) @@ -975,7 +975,7 @@ def test_recon_for_report_type_schema( schema=recon_schema, ) expected_remorph_recon_metrics = mock_spark.createDataFrame( - data=[(22222, (None, None, True), (True, "remorph", ""), datetime(2024, 5, 23, 9, 21, 25, 122185))], + data=[(22222, (0, 0, None, None, True), (True, "remorph", ""), datetime(2024, 5, 23, 9, 21, 25, 122185))], schema=metrics_schema, ) expected_remorph_recon_details = mock_spark.createDataFrame( @@ -1187,7 +1187,7 @@ def test_recon_for_report_type_all( data=[ ( 33333, - ((1, 1), (1, 0, "s_address,s_phone"), False), + (3, 3, (1, 1), (1, 0, "s_address,s_phone"), False), (False, "remorph", ""), datetime(2024, 5, 23, 9, 21, 25, 122185), ) @@ -1458,7 +1458,7 @@ def test_recon_for_report_type_is_row( data=[ ( 33333, - ((2, 2), None, None), + (3, 3, (2, 2), None, None), (False, "remorph", ""), datetime(2024, 5, 23, 9, 21, 25, 122185), ) @@ -1599,7 +1599,7 @@ def test_schema_recon_with_data_source_exception( data=[ ( 33333, - (None, None, None), + (0, 0, None, None, None), ( False, "remorph", @@ -1669,7 +1669,7 @@ def test_schema_recon_with_general_exception( data=[ ( 33333, - (None, None, None), + (0, 0, None, None, None), ( False, "remorph", @@ -1740,7 +1740,7 @@ def test_data_recon_with_general_exception( data=[ ( 33333, - (None, None, None), + (3, 3, None, None, None), ( False, "remorph", @@ -1811,7 +1811,7 @@ def test_data_recon_with_source_exception( data=[ ( 33333, - (None, None, None), + (3, 3, None, None, None), ( False, "remorph", diff --git a/tests/integration/reconcile/test_recon_capture.py b/tests/integration/reconcile/test_recon_capture.py index 7511975a12..e9990d4842 100644 --- a/tests/integration/reconcile/test_recon_capture.py +++ b/tests/integration/reconcile/test_recon_capture.py @@ -169,6 +169,8 @@ def test_recon_capture_start_snowflake_all(mock_workspace_client, mock_spark): remorph_recon_metrics_df = spark.sql("select * from DEFAULT.metrics") row = remorph_recon_metrics_df.collect()[0] assert remorph_recon_metrics_df.count() == 1 + assert row.recon_metrics.source_record_count == 5 + assert row.recon_metrics.target_record_count == 5 assert row.recon_metrics.row_comparison.missing_in_source == 3 assert row.recon_metrics.row_comparison.missing_in_target == 4 assert row.recon_metrics.column_comparison.absolute_mismatch == 2