From 1e0a883ccab7eb56c99e6e756dd1ba933e623bef Mon Sep 17 00:00:00 2001 From: sundarshankar89 Date: Tue, 2 Dec 2025 12:59:00 +0530 Subject: [PATCH 01/13] Handle empty SQL resultsets gracefully in assessment pipeline --- .../labs/lakebridge/assessments/pipeline.py | 20 +++++++++---- .../integration/assessments/test_pipeline.py | 30 +++++++++++++++++++ 2 files changed, 44 insertions(+), 6 deletions(-) diff --git a/src/databricks/labs/lakebridge/assessments/pipeline.py b/src/databricks/labs/lakebridge/assessments/pipeline.py index 11450b1154..28fe535558 100644 --- a/src/databricks/labs/lakebridge/assessments/pipeline.py +++ b/src/databricks/labs/lakebridge/assessments/pipeline.py @@ -236,9 +236,19 @@ def _run_python_script(venv_exec_cmd, script_path, db_path, credential_config): def _save_to_db(self, result: FetchResult, step_name: str, mode: str): db_path = str(self.db_path_prefix / DB_NAME) + # Check row count and log appropriately and skip data insertion if 0 rows + row_count = len(result.rows) + if row_count == 0: + logging.warning( + f"Query for step '{step_name}' returned 0 rows. Skipping table creation and data insertion." + ) + return + + logging.info(f"Query for step '{step_name}' returned {row_count} rows.") + with duckdb.connect(db_path) as conn: # TODO: Add support for figuring out data types from SQLALCHEMY result object result.cursor.description is not reliable - schema = ' STRING, '.join(result.columns) + ' STRING' + schema = ', '.join(f"{col} STRING" for col in result.columns) # Handle write modes if mode == 'overwrite': @@ -246,11 +256,9 @@ def _save_to_db(self, result: FetchResult, step_name: str, mode: str): elif mode == 'append' and step_name not in conn.get_table_names(""): conn.execute(f"CREATE TABLE {step_name} ({schema})") - # Batch insert using prepared statements - placeholders = ', '.join(['?' for _ in result.columns]) - insert_query = f"INSERT INTO {step_name} VALUES ({placeholders})" - - conn.executemany(insert_query, result.rows) + placeholders = ', '.join(['?'] * len(result.columns)) + conn.executemany(f"INSERT INTO {step_name} VALUES ({placeholders})", result.rows) + logging.info(f"Successfully inserted {row_count} rows into table '{step_name}'.") @staticmethod def _create_dir(dir_path: Path): diff --git a/tests/integration/assessments/test_pipeline.py b/tests/integration/assessments/test_pipeline.py index bdac1e6664..2dbdb107eb 100644 --- a/tests/integration/assessments/test_pipeline.py +++ b/tests/integration/assessments/test_pipeline.py @@ -49,6 +49,16 @@ def python_failure_config(): return config +@pytest.fixture(scope="module") +def empty_result_config(): + prefix = Path(__file__).parent + config_path = f"{prefix}/../../resources/assessments/pipeline_config_empty_result.yml" + config = PipelineClass.load_config_from_yaml(config_path) + for step in config.steps: + step.extract_source = f"{prefix}/../../{step.extract_source}" + return config + + def test_run_pipeline(sandbox_sqlserver, pipeline_config, get_logger): pipeline = PipelineClass(config=pipeline_config, executor=sandbox_sqlserver) results = pipeline.execute() @@ -160,3 +170,23 @@ def test_pipeline_step_comments(): ) assert step_w_comment.comment == "This is a step comment." 
assert step_wo_comment.comment is None + + +def test_run_empty_result_pipeline(sandbox_sqlserver, empty_result_config, get_logger): + pipeline = PipelineClass(config=empty_result_config, executor=sandbox_sqlserver) + results = pipeline.execute() + + # Verify step completed successfully despite empty results + assert len(results) == 1 + assert results[0].status == StepExecutionStatus.COMPLETE + assert results[0].step_name == "empty_result_step" + + # Verify that no table was created (processing was skipped for empty resultset) + conn = duckdb.connect(str(Path(empty_result_config.extract_folder)) + "/" + DB_NAME) + tables = conn.execute("SHOW TABLES").fetchall() + table_names = [table[0] for table in tables] + + # Table should NOT be created when resultset is empty + assert "empty_result_step" not in table_names, "Empty resultset should skip table creation" + + conn.close() From 57a3f9d0a34add5c56052df95ae543d3816d2ab4 Mon Sep 17 00:00:00 2001 From: sundarshankar89 Date: Tue, 2 Dec 2025 13:15:23 +0530 Subject: [PATCH 02/13] added resources --- tests/resources/assessments/empty_resultset.sql | 6 ++++++ .../assessments/pipeline_config_empty_result.yml | 10 ++++++++++ 2 files changed, 16 insertions(+) create mode 100644 tests/resources/assessments/empty_resultset.sql create mode 100644 tests/resources/assessments/pipeline_config_empty_result.yml diff --git a/tests/resources/assessments/empty_resultset.sql b/tests/resources/assessments/empty_resultset.sql new file mode 100644 index 0000000000..bd26433107 --- /dev/null +++ b/tests/resources/assessments/empty_resultset.sql @@ -0,0 +1,6 @@ +-- Query that returns valid schema but 0 rows +SELECT + 'test' as col1, + 'test' as col2, + 'test' as col3 +WHERE 1 = 0 \ No newline at end of file diff --git a/tests/resources/assessments/pipeline_config_empty_result.yml b/tests/resources/assessments/pipeline_config_empty_result.yml new file mode 100644 index 0000000000..45bacc29d2 --- /dev/null +++ b/tests/resources/assessments/pipeline_config_empty_result.yml @@ -0,0 +1,10 @@ +name: test_empty_result_pipeline +version: 1.0 +extract_folder: /tmp/lakebridge_test_empty_result +steps: + - name: empty_result_step + type: sql + extract_source: resources/assessments/empty_resultset.sql + mode: overwrite + frequency: once + flag: active \ No newline at end of file From 3cad168c6842dbdb4f02576a9b2f301efc7017ad Mon Sep 17 00:00:00 2001 From: sundarshankar89 Date: Wed, 3 Dec 2025 13:44:05 +0530 Subject: [PATCH 03/13] Fixed Review Comments --- .../labs/lakebridge/assessments/pipeline.py | 4 +- .../integration/assessments/test_pipeline.py | 72 +++++++++++++------ .../resources/assessments/empty_resultset.sql | 2 +- .../pipeline_config_empty_result.yml | 2 +- 4 files changed, 54 insertions(+), 26 deletions(-) diff --git a/src/databricks/labs/lakebridge/assessments/pipeline.py b/src/databricks/labs/lakebridge/assessments/pipeline.py index 28fe535558..dc392b4f20 100644 --- a/src/databricks/labs/lakebridge/assessments/pipeline.py +++ b/src/databricks/labs/lakebridge/assessments/pipeline.py @@ -237,13 +237,13 @@ def _save_to_db(self, result: FetchResult, step_name: str, mode: str): db_path = str(self.db_path_prefix / DB_NAME) # Check row count and log appropriately and skip data insertion if 0 rows - row_count = len(result.rows) - if row_count == 0: + if not result.rows == 0: logging.warning( f"Query for step '{step_name}' returned 0 rows. Skipping table creation and data insertion." 
) return + row_count = len(result.rows) logging.info(f"Query for step '{step_name}' returned {row_count} rows.") with duckdb.connect(db_path) as conn: diff --git a/tests/integration/assessments/test_pipeline.py b/tests/integration/assessments/test_pipeline.py index 2dbdb107eb..95e9ce583f 100644 --- a/tests/integration/assessments/test_pipeline.py +++ b/tests/integration/assessments/test_pipeline.py @@ -1,14 +1,21 @@ from pathlib import Path +from logging import Logger + import duckdb import pytest -from databricks.labs.lakebridge.assessments.pipeline import PipelineClass, DB_NAME, StepExecutionStatus - +from databricks.labs.lakebridge.assessments.pipeline import ( + PipelineClass, + DB_NAME, + StepExecutionStatus, + StepExecutionResult, +) from databricks.labs.lakebridge.assessments.profiler_config import Step, PipelineConfig +from databricks.labs.lakebridge.connections.database_manager import DatabaseManager @pytest.fixture(scope="module") -def pipeline_config(): +def pipeline_config() -> PipelineConfig: prefix = Path(__file__).parent config_path = f"{prefix}/../../resources/assessments/pipeline_config.yml" config = PipelineClass.load_config_from_yaml(config_path) @@ -19,7 +26,7 @@ def pipeline_config(): @pytest.fixture(scope="module") -def pipeline_dep_failure_config(): +def pipeline_dep_failure_config() -> PipelineConfig: prefix = Path(__file__).parent config_path = f"{prefix}/../../resources/assessments/pipeline_config_failure_dependency.yml" config = PipelineClass.load_config_from_yaml(config_path) @@ -30,7 +37,7 @@ def pipeline_dep_failure_config(): @pytest.fixture(scope="module") -def sql_failure_config(): +def sql_failure_config() -> PipelineConfig: prefix = Path(__file__).parent config_path = f"{prefix}/../../resources/assessments/pipeline_config_sql_failure.yml" config = PipelineClass.load_config_from_yaml(config_path) @@ -40,7 +47,7 @@ def sql_failure_config(): @pytest.fixture(scope="module") -def python_failure_config(): +def python_failure_config() -> PipelineConfig: prefix = Path(__file__).parent config_path = f"{prefix}/../../resources/assessments/pipeline_config_python_failure.yml" config = PipelineClass.load_config_from_yaml(config_path) @@ -50,7 +57,7 @@ def python_failure_config(): @pytest.fixture(scope="module") -def empty_result_config(): +def empty_result_config() -> PipelineConfig: prefix = Path(__file__).parent config_path = f"{prefix}/../../resources/assessments/pipeline_config_empty_result.yml" config = PipelineClass.load_config_from_yaml(config_path) @@ -59,7 +66,11 @@ def empty_result_config(): return config -def test_run_pipeline(sandbox_sqlserver, pipeline_config, get_logger): +def test_run_pipeline( + sandbox_sqlserver: DatabaseManager, + pipeline_config: PipelineConfig, + get_logger: Logger, +) -> None: pipeline = PipelineClass(config=pipeline_config, executor=sandbox_sqlserver) results = pipeline.execute() @@ -73,7 +84,11 @@ def test_run_pipeline(sandbox_sqlserver, pipeline_config, get_logger): assert verify_output(get_logger, pipeline_config.extract_folder) -def test_run_sql_failure_pipeline(sandbox_sqlserver, sql_failure_config, get_logger): +def test_run_sql_failure_pipeline( + sandbox_sqlserver: DatabaseManager, + sql_failure_config: PipelineConfig, + get_logger: Logger, +) -> None: pipeline = PipelineClass(config=sql_failure_config, executor=sandbox_sqlserver) with pytest.raises(RuntimeError) as e: pipeline.execute() @@ -82,7 +97,11 @@ def test_run_sql_failure_pipeline(sandbox_sqlserver, sql_failure_config, get_log assert "Pipeline execution failed due 
to errors in steps: invalid_sql_step" in str(e.value) -def test_run_python_failure_pipeline(sandbox_sqlserver, python_failure_config, get_logger): +def test_run_python_failure_pipeline( + sandbox_sqlserver: DatabaseManager, + python_failure_config: PipelineConfig, + get_logger: Logger, +) -> None: pipeline = PipelineClass(config=python_failure_config, executor=sandbox_sqlserver) with pytest.raises(RuntimeError) as e: pipeline.execute() @@ -91,7 +110,9 @@ def test_run_python_failure_pipeline(sandbox_sqlserver, python_failure_config, g assert "Pipeline execution failed due to errors in steps: invalid_python_step" in str(e.value) -def test_run_python_dep_failure_pipeline(sandbox_sqlserver, pipeline_dep_failure_config, get_logger): +def test_run_python_dep_failure_pipeline( + sandbox_sqlserver: DatabaseManager, pipeline_dep_failure_config: PipelineConfig, get_logger +): pipeline = PipelineClass(config=pipeline_dep_failure_config, executor=sandbox_sqlserver) with pytest.raises(RuntimeError) as e: pipeline.execute() @@ -100,7 +121,11 @@ def test_run_python_dep_failure_pipeline(sandbox_sqlserver, pipeline_dep_failure assert "Pipeline execution failed due to errors in steps: package_status" in str(e.value) -def test_skipped_steps(sandbox_sqlserver, pipeline_config, get_logger): +def test_skipped_steps( + sandbox_sqlserver: DatabaseManager, + pipeline_config: PipelineConfig, + get_logger: Logger, +) -> None: # Modify config to have some inactive steps for step in pipeline_config.steps: step.flag = "inactive" @@ -136,7 +161,7 @@ def verify_output(get_logger, path): return True -def test_pipeline_config_comments(): +def test_pipeline_config_comments() -> None: pipeline_w_comments = PipelineConfig( name="warehouse_profiler", version="1.0", @@ -150,7 +175,7 @@ def test_pipeline_config_comments(): assert pipeline_wo_comments.comment is None -def test_pipeline_step_comments(): +def test_pipeline_step_comments() -> None: step_w_comment = Step( name="step_w_comment", type="sql", @@ -172,21 +197,24 @@ def test_pipeline_step_comments(): assert step_wo_comment.comment is None -def test_run_empty_result_pipeline(sandbox_sqlserver, empty_result_config, get_logger): +def test_run_empty_result_pipeline( + sandbox_sqlserver: DatabaseManager, + empty_result_config: PipelineConfig, + get_logger: Logger, +) -> None: pipeline = PipelineClass(config=empty_result_config, executor=sandbox_sqlserver) results = pipeline.execute() # Verify step completed successfully despite empty results assert len(results) == 1 - assert results[0].status == StepExecutionStatus.COMPLETE - assert results[0].step_name == "empty_result_step" + assert results == [ + StepExecutionResult(step_name="empty_result_step", status=StepExecutionStatus.COMPLETE, error_message=None) + ] # Verify that no table was created (processing was skipped for empty resultset) - conn = duckdb.connect(str(Path(empty_result_config.extract_folder)) + "/" + DB_NAME) - tables = conn.execute("SHOW TABLES").fetchall() - table_names = [table[0] for table in tables] + with duckdb.connect(str(Path(empty_result_config.extract_folder)) + "/" + DB_NAME) as conn: + tables = conn.execute("SHOW TABLES").fetchall() + table_names = [table[0] for table in tables] # Table should NOT be created when resultset is empty assert "empty_result_step" not in table_names, "Empty resultset should skip table creation" - - conn.close() diff --git a/tests/resources/assessments/empty_resultset.sql b/tests/resources/assessments/empty_resultset.sql index bd26433107..f169c9a70b 100644 --- 
a/tests/resources/assessments/empty_resultset.sql +++ b/tests/resources/assessments/empty_resultset.sql @@ -3,4 +3,4 @@ SELECT 'test' as col1, 'test' as col2, 'test' as col3 -WHERE 1 = 0 \ No newline at end of file +WHERE 1 = 0 diff --git a/tests/resources/assessments/pipeline_config_empty_result.yml b/tests/resources/assessments/pipeline_config_empty_result.yml index 45bacc29d2..d0c83ff175 100644 --- a/tests/resources/assessments/pipeline_config_empty_result.yml +++ b/tests/resources/assessments/pipeline_config_empty_result.yml @@ -7,4 +7,4 @@ steps: extract_source: resources/assessments/empty_resultset.sql mode: overwrite frequency: once - flag: active \ No newline at end of file + flag: active From 839af119faba22f8d53e741f5b4699a1ae465bd4 Mon Sep 17 00:00:00 2001 From: sundarshankar89 Date: Thu, 4 Dec 2025 13:09:33 +0530 Subject: [PATCH 04/13] Fix DuckDB file locking errors in concurrent profiler validator tests --- tests/integration/assessments/test_profiler_validator.py | 6 ++++-- tests/utils/profiler_extract_utils.py | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/integration/assessments/test_profiler_validator.py b/tests/integration/assessments/test_profiler_validator.py index 5d9aebc8e7..4cecb43ae5 100644 --- a/tests/integration/assessments/test_profiler_validator.py +++ b/tests/integration/assessments/test_profiler_validator.py @@ -29,8 +29,10 @@ def failure_pipeline_config_path(): @pytest.fixture(scope="module") -def mock_synapse_profiler_extract(): - synapse_extract_path = build_mock_synapse_extract("mock_profiler_extract") +def mock_synapse_profiler_extract(tmp_path): + # Use pytest's tmp_path to create unique temp directory per test + extract_dir = tmp_path / "synapse_assessment" + synapse_extract_path = build_mock_synapse_extract("mock_profiler_extract", path_prefix=str(extract_dir)) return synapse_extract_path diff --git a/tests/utils/profiler_extract_utils.py b/tests/utils/profiler_extract_utils.py index 0a10b1f29a..8fc9d6a7aa 100644 --- a/tests/utils/profiler_extract_utils.py +++ b/tests/utils/profiler_extract_utils.py @@ -173,8 +173,8 @@ def generate_dedicated_storage_info(fake) -> tuple[int, int, datetime, int]: } -def build_mock_synapse_extract(extract_db_name: str) -> str: - synapse_extract_path = "/tmp/data/synapse_assessment" +def build_mock_synapse_extract(extract_db_name: str, path_prefix: str = "/tmp/data/synapse_assessment") -> str: + synapse_extract_path = path_prefix os.makedirs(synapse_extract_path, exist_ok=True) full_synapse_extract_path = f"{synapse_extract_path}/{extract_db_name}.db" builder = SynapseProfilerBuilder(table_definitions, full_synapse_extract_path) From 7f151f32e93a230de58911efe7a57bed72b61053 Mon Sep 17 00:00:00 2001 From: sundarshankar89 Date: Thu, 4 Dec 2025 13:30:11 +0530 Subject: [PATCH 05/13] Fix DuckDB file locking errors in concurrent profiler validator tests --- tests/integration/assessments/test_profiler_validator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/assessments/test_profiler_validator.py b/tests/integration/assessments/test_profiler_validator.py index 4cecb43ae5..473f3e3d7e 100644 --- a/tests/integration/assessments/test_profiler_validator.py +++ b/tests/integration/assessments/test_profiler_validator.py @@ -28,7 +28,7 @@ def failure_pipeline_config_path(): return config_path -@pytest.fixture(scope="module") +@pytest.fixture(scope="function") def mock_synapse_profiler_extract(tmp_path): # Use pytest's tmp_path to create unique temp directory per test 
extract_dir = tmp_path / "synapse_assessment" From f495f053154f8b44f35e9f307f48e31755906d43 Mon Sep 17 00:00:00 2001 From: sundarshankar89 Date: Thu, 4 Dec 2025 16:28:29 +0530 Subject: [PATCH 06/13] isolating failing tests --- tests/integration/assessments/test_pipeline.py | 10 +++++----- ...pipeline_config.yml => pipeline_config_success.yml} | 0 2 files changed, 5 insertions(+), 5 deletions(-) rename tests/resources/assessments/{pipeline_config.yml => pipeline_config_success.yml} (100%) diff --git a/tests/integration/assessments/test_pipeline.py b/tests/integration/assessments/test_pipeline.py index 95e9ce583f..a7f3e7986a 100644 --- a/tests/integration/assessments/test_pipeline.py +++ b/tests/integration/assessments/test_pipeline.py @@ -15,9 +15,9 @@ @pytest.fixture(scope="module") -def pipeline_config() -> PipelineConfig: +def pipeline_config_success() -> PipelineConfig: prefix = Path(__file__).parent - config_path = f"{prefix}/../../resources/assessments/pipeline_config.yml" + config_path = f"{prefix}/../../resources/assessments/pipeline_config_success.yml" config = PipelineClass.load_config_from_yaml(config_path) for step in config.steps: @@ -68,10 +68,10 @@ def empty_result_config() -> PipelineConfig: def test_run_pipeline( sandbox_sqlserver: DatabaseManager, - pipeline_config: PipelineConfig, + pipeline_config_success: PipelineConfig, get_logger: Logger, ) -> None: - pipeline = PipelineClass(config=pipeline_config, executor=sandbox_sqlserver) + pipeline = PipelineClass(config=pipeline_config_success, executor=sandbox_sqlserver) results = pipeline.execute() # Verify all steps completed successfully @@ -81,7 +81,7 @@ def test_run_pipeline( StepExecutionStatus.SKIPPED, ), f"Step {result.step_name} failed with status {result.status}" - assert verify_output(get_logger, pipeline_config.extract_folder) + assert verify_output(get_logger, pipeline_config_success.extract_folder) def test_run_sql_failure_pipeline( diff --git a/tests/resources/assessments/pipeline_config.yml b/tests/resources/assessments/pipeline_config_success.yml similarity index 100% rename from tests/resources/assessments/pipeline_config.yml rename to tests/resources/assessments/pipeline_config_success.yml From 0ff326e881b18cd72cbf3c77b298ddedf224cbf5 Mon Sep 17 00:00:00 2001 From: sundarshankar89 Date: Thu, 4 Dec 2025 16:29:12 +0530 Subject: [PATCH 07/13] isolating failing tests --- tests/resources/assessments/pipeline_config_sql_failure.yml | 2 +- tests/resources/assessments/pipeline_config_success.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/resources/assessments/pipeline_config_sql_failure.yml b/tests/resources/assessments/pipeline_config_sql_failure.yml index fcc6e61e96..40ec77b7b0 100644 --- a/tests/resources/assessments/pipeline_config_sql_failure.yml +++ b/tests/resources/assessments/pipeline_config_sql_failure.yml @@ -1,6 +1,6 @@ name: "SQL Failure Pipeline" version: "1.0" -extract_folder: /tmp/extracts/ +extract_folder: /tmp/extracts_failure/ steps: - name: invalid_sql_step type: sql diff --git a/tests/resources/assessments/pipeline_config_success.yml b/tests/resources/assessments/pipeline_config_success.yml index 5e466ccb95..ad023bcb94 100644 --- a/tests/resources/assessments/pipeline_config_success.yml +++ b/tests/resources/assessments/pipeline_config_success.yml @@ -1,6 +1,6 @@ name: ExamplePipeline version: "1.0" -extract_folder: /tmp/extracts/ +extract_folder: /tmp/extracts_success/ steps: - name: inventory type: sql From 71211a6e672bb210df2a2843cc212808b85dc33c Mon Sep 17 
00:00:00 2001 From: sundarshankar89 Date: Thu, 4 Dec 2025 19:03:43 +0530 Subject: [PATCH 08/13] isolating failing tests --- src/databricks/labs/lakebridge/assessments/pipeline.py | 2 +- tests/integration/assessments/test_pipeline.py | 10 +++++----- .../integration/assessments/test_profiler_validator.py | 8 +++----- ...pipeline_config_success.yml => pipeline_config.yml} | 2 +- .../assessments/pipeline_config_sql_failure.yml | 2 +- 5 files changed, 11 insertions(+), 13 deletions(-) rename tests/resources/assessments/{pipeline_config_success.yml => pipeline_config.yml} (94%) diff --git a/src/databricks/labs/lakebridge/assessments/pipeline.py b/src/databricks/labs/lakebridge/assessments/pipeline.py index dc392b4f20..a990662194 100644 --- a/src/databricks/labs/lakebridge/assessments/pipeline.py +++ b/src/databricks/labs/lakebridge/assessments/pipeline.py @@ -237,7 +237,7 @@ def _save_to_db(self, result: FetchResult, step_name: str, mode: str): db_path = str(self.db_path_prefix / DB_NAME) # Check row count and log appropriately and skip data insertion if 0 rows - if not result.rows == 0: + if len(result.rows) == 0: logging.warning( f"Query for step '{step_name}' returned 0 rows. Skipping table creation and data insertion." ) diff --git a/tests/integration/assessments/test_pipeline.py b/tests/integration/assessments/test_pipeline.py index a7f3e7986a..95e9ce583f 100644 --- a/tests/integration/assessments/test_pipeline.py +++ b/tests/integration/assessments/test_pipeline.py @@ -15,9 +15,9 @@ @pytest.fixture(scope="module") -def pipeline_config_success() -> PipelineConfig: +def pipeline_config() -> PipelineConfig: prefix = Path(__file__).parent - config_path = f"{prefix}/../../resources/assessments/pipeline_config_success.yml" + config_path = f"{prefix}/../../resources/assessments/pipeline_config.yml" config = PipelineClass.load_config_from_yaml(config_path) for step in config.steps: @@ -68,10 +68,10 @@ def empty_result_config() -> PipelineConfig: def test_run_pipeline( sandbox_sqlserver: DatabaseManager, - pipeline_config_success: PipelineConfig, + pipeline_config: PipelineConfig, get_logger: Logger, ) -> None: - pipeline = PipelineClass(config=pipeline_config_success, executor=sandbox_sqlserver) + pipeline = PipelineClass(config=pipeline_config, executor=sandbox_sqlserver) results = pipeline.execute() # Verify all steps completed successfully @@ -81,7 +81,7 @@ def test_run_pipeline( StepExecutionStatus.SKIPPED, ), f"Step {result.step_name} failed with status {result.status}" - assert verify_output(get_logger, pipeline_config_success.extract_folder) + assert verify_output(get_logger, pipeline_config.extract_folder) def test_run_sql_failure_pipeline( diff --git a/tests/integration/assessments/test_profiler_validator.py b/tests/integration/assessments/test_profiler_validator.py index 473f3e3d7e..5d9aebc8e7 100644 --- a/tests/integration/assessments/test_profiler_validator.py +++ b/tests/integration/assessments/test_profiler_validator.py @@ -28,11 +28,9 @@ def failure_pipeline_config_path(): return config_path -@pytest.fixture(scope="function") -def mock_synapse_profiler_extract(tmp_path): - # Use pytest's tmp_path to create unique temp directory per test - extract_dir = tmp_path / "synapse_assessment" - synapse_extract_path = build_mock_synapse_extract("mock_profiler_extract", path_prefix=str(extract_dir)) +@pytest.fixture(scope="module") +def mock_synapse_profiler_extract(): + synapse_extract_path = build_mock_synapse_extract("mock_profiler_extract") return synapse_extract_path diff --git 
a/tests/resources/assessments/pipeline_config_success.yml b/tests/resources/assessments/pipeline_config.yml similarity index 94% rename from tests/resources/assessments/pipeline_config_success.yml rename to tests/resources/assessments/pipeline_config.yml index ad023bcb94..5e466ccb95 100644 --- a/tests/resources/assessments/pipeline_config_success.yml +++ b/tests/resources/assessments/pipeline_config.yml @@ -1,6 +1,6 @@ name: ExamplePipeline version: "1.0" -extract_folder: /tmp/extracts_success/ +extract_folder: /tmp/extracts/ steps: - name: inventory type: sql diff --git a/tests/resources/assessments/pipeline_config_sql_failure.yml b/tests/resources/assessments/pipeline_config_sql_failure.yml index 40ec77b7b0..fcc6e61e96 100644 --- a/tests/resources/assessments/pipeline_config_sql_failure.yml +++ b/tests/resources/assessments/pipeline_config_sql_failure.yml @@ -1,6 +1,6 @@ name: "SQL Failure Pipeline" version: "1.0" -extract_folder: /tmp/extracts_failure/ +extract_folder: /tmp/extracts/ steps: - name: invalid_sql_step type: sql From 514a7525e78ace44478eb9a3d325ac2d4540acfa Mon Sep 17 00:00:00 2001 From: sundarshankar89 Date: Thu, 4 Dec 2025 19:27:16 +0530 Subject: [PATCH 09/13] isolating failing tests --- src/databricks/labs/lakebridge/assessments/pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/lakebridge/assessments/pipeline.py b/src/databricks/labs/lakebridge/assessments/pipeline.py index a990662194..63a56c29fb 100644 --- a/src/databricks/labs/lakebridge/assessments/pipeline.py +++ b/src/databricks/labs/lakebridge/assessments/pipeline.py @@ -237,7 +237,7 @@ def _save_to_db(self, result: FetchResult, step_name: str, mode: str): db_path = str(self.db_path_prefix / DB_NAME) # Check row count and log appropriately and skip data insertion if 0 rows - if len(result.rows) == 0: + if not result.rows: logging.warning( f"Query for step '{step_name}' returned 0 rows. Skipping table creation and data insertion." 
) From ee008b3e7ee6be32fd15578a4a0812f0add67677 Mon Sep 17 00:00:00 2001 From: sundarshankar89 Date: Thu, 4 Dec 2025 19:48:19 +0530 Subject: [PATCH 10/13] isolating failing tests --- tests/integration/assessments/test_profiler_validator.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/integration/assessments/test_profiler_validator.py b/tests/integration/assessments/test_profiler_validator.py index 5d9aebc8e7..473f3e3d7e 100644 --- a/tests/integration/assessments/test_profiler_validator.py +++ b/tests/integration/assessments/test_profiler_validator.py @@ -28,9 +28,11 @@ def failure_pipeline_config_path(): return config_path -@pytest.fixture(scope="module") -def mock_synapse_profiler_extract(): - synapse_extract_path = build_mock_synapse_extract("mock_profiler_extract") +@pytest.fixture(scope="function") +def mock_synapse_profiler_extract(tmp_path): + # Use pytest's tmp_path to create unique temp directory per test + extract_dir = tmp_path / "synapse_assessment" + synapse_extract_path = build_mock_synapse_extract("mock_profiler_extract", path_prefix=str(extract_dir)) return synapse_extract_path From 55b3b87a3890e1d7d80663aefd8efcdd1379ce9f Mon Sep 17 00:00:00 2001 From: sundarshankar89 Date: Thu, 11 Dec 2025 08:11:41 +0530 Subject: [PATCH 11/13] isolating failing tests --- tests/utils/profiler_extract_utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/utils/profiler_extract_utils.py b/tests/utils/profiler_extract_utils.py index 8fc9d6a7aa..fc3ae0126c 100644 --- a/tests/utils/profiler_extract_utils.py +++ b/tests/utils/profiler_extract_utils.py @@ -1,4 +1,5 @@ import os +from pathlib import Path from datetime import datetime from dataclasses import dataclass from collections.abc import Callable @@ -173,7 +174,7 @@ def generate_dedicated_storage_info(fake) -> tuple[int, int, datetime, int]: } -def build_mock_synapse_extract(extract_db_name: str, path_prefix: str = "/tmp/data/synapse_assessment") -> str: +def build_mock_synapse_extract(extract_db_name: str, path_prefix: Path = Path("/tmp/data/synapse_assessment")) -> str: synapse_extract_path = path_prefix os.makedirs(synapse_extract_path, exist_ok=True) full_synapse_extract_path = f"{synapse_extract_path}/{extract_db_name}.db" From e6d426e95ab1a6c7d2f65f5498264dca5855604f Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Fri, 19 Dec 2025 17:49:01 +0100 Subject: [PATCH 12/13] Revise SQL handling in #2172 (#2203) ## Changes This PR revises #2172 so that SQL is handled more safely: in particular we no longer dynamically build SQL queries to run. ### Relevant implementation details DuckDB can bulk insert from Pandas data frames, and with CTAS will properly (and safely) set the table schema. This is preferable to building SQL statements dynamically. ### Linked issues Revises #2172. 
--- .../labs/lakebridge/assessments/pipeline.py | 41 +++++++++---------- .../connections/database_manager.py | 8 ++++ 2 files changed, 28 insertions(+), 21 deletions(-) diff --git a/src/databricks/labs/lakebridge/assessments/pipeline.py b/src/databricks/labs/lakebridge/assessments/pipeline.py index 63a56c29fb..b61876f038 100644 --- a/src/databricks/labs/lakebridge/assessments/pipeline.py +++ b/src/databricks/labs/lakebridge/assessments/pipeline.py @@ -1,20 +1,19 @@ -from pathlib import Path -from subprocess import run, CalledProcessError, Popen, PIPE, STDOUT, DEVNULL -from dataclasses import dataclass -from enum import Enum - -import sys +import json +import logging import os +import sys import venv import tempfile -import json -import logging -import yaml -import duckdb +from dataclasses import dataclass +from enum import Enum +from pathlib import Path +from subprocess import CalledProcessError, DEVNULL, PIPE, Popen, STDOUT, run -from databricks.labs.lakebridge.connections.credential_manager import cred_file +import duckdb +import yaml from databricks.labs.lakebridge.assessments.profiler_config import PipelineConfig, Step +from databricks.labs.lakebridge.connections.credential_manager import cred_file from databricks.labs.lakebridge.connections.database_manager import DatabaseManager, FetchResult logger = logging.getLogger(__name__) @@ -245,20 +244,20 @@ def _save_to_db(self, result: FetchResult, step_name: str, mode: str): row_count = len(result.rows) logging.info(f"Query for step '{step_name}' returned {row_count} rows.") + # TODO: Add support for figuring out data types from SQLALCHEMY result object result.cursor.description is not reliable + _result_frame = result.to_df().astype(str) with duckdb.connect(db_path) as conn: - # TODO: Add support for figuring out data types from SQLALCHEMY result object result.cursor.description is not reliable - schema = ', '.join(f"{col} STRING" for col in result.columns) - - # Handle write modes + # DuckDB can access _result_frame from the local scope automatically. 
if mode == 'overwrite': - conn.execute(f"CREATE OR REPLACE TABLE {step_name} ({schema})") + statement = f"CREATE OR REPLACE TABLE {step_name} AS SELECT * FROM _result_frame" elif mode == 'append' and step_name not in conn.get_table_names(""): - conn.execute(f"CREATE TABLE {step_name} ({schema})") - - placeholders = ', '.join(['?'] * len(result.columns)) - conn.executemany(f"INSERT INTO {step_name} VALUES ({placeholders})", result.rows) - logging.info(f"Successfully inserted {row_count} rows into table '{step_name}'.") + statement = f"CREATE TABLE {step_name} AS SELECT * FROM _result_frame" + else: + statement = f"INSERT INTO {step_name} SELECT * FROM _result_frame" + logging.debug(f"Inserting {row_count} rows: {statement}") + conn.execute(statement) + logging.info(f"Successfully inserted {row_count} rows into table '{step_name}'.") @staticmethod def _create_dir(dir_path: Path): diff --git a/src/databricks/labs/lakebridge/connections/database_manager.py b/src/databricks/labs/lakebridge/connections/database_manager.py index 501adcd866..08c6b23f97 100644 --- a/src/databricks/labs/lakebridge/connections/database_manager.py +++ b/src/databricks/labs/lakebridge/connections/database_manager.py @@ -4,6 +4,8 @@ from typing import Any from collections.abc import Sequence, Set +import pandas as pd + from sqlalchemy import create_engine from sqlalchemy.engine import Engine, URL from sqlalchemy.engine.row import Row @@ -20,6 +22,12 @@ class FetchResult: columns: Set[str] rows: Sequence[Row[Any]] + def to_df(self) -> pd.DataFrame: + """Create a pandas dataframe based on these results.""" + # Row emulates a named tuple, which Pandas understands natively. So the columns are safely inferred unless + # we have an empty result-set. + return pd.DataFrame(data=self.rows) if self.rows else pd.DataFrame(columns=list(self.columns)) + class DatabaseConnector(ABC): @abstractmethod From 808762c519513822ab5ace325fe20b58905c4c21 Mon Sep 17 00:00:00 2001 From: sundarshankar89 Date: Tue, 20 Jan 2026 14:48:48 +0530 Subject: [PATCH 13/13] fmt fixes --- .../integration/assessments/test_pipeline.py | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/tests/integration/assessments/test_pipeline.py b/tests/integration/assessments/test_pipeline.py index b0b5631c33..258e44d700 100644 --- a/tests/integration/assessments/test_pipeline.py +++ b/tests/integration/assessments/test_pipeline.py @@ -1,10 +1,16 @@ from collections.abc import Callable from pathlib import Path +from logging import Logger from typing import TypeAlias import duckdb import pytest -from databricks.labs.lakebridge.assessments.pipeline import PipelineClass, DB_NAME, StepExecutionStatus +from databricks.labs.lakebridge.assessments.pipeline import ( + PipelineClass, + DB_NAME, + StepExecutionStatus, + StepExecutionResult, +) from databricks.labs.lakebridge.assessments.profiler import Profiler from databricks.labs.lakebridge.assessments.profiler_config import Step, PipelineConfig @@ -49,10 +55,9 @@ def python_failure_config(pipeline_configuration_loader: _Loader) -> PipelineCon def empty_result_config() -> PipelineConfig: prefix = Path(__file__).parent config_path = f"{prefix}/../../resources/assessments/pipeline_config_empty_result.yml" - config = PipelineClass.load_config_from_yaml(config_path) - for step in config.steps: - step.extract_source = f"{prefix}/../../{step.extract_source}" - return config + config: PipelineConfig = PipelineClass.load_config_from_yaml(config_path) + updated_steps = 
[step.copy(extract_source=f"{prefix}/../../{step.extract_source}") for step in config.steps] + return config.copy(steps=updated_steps) def test_run_pipeline( @@ -70,7 +75,7 @@ def test_run_pipeline( StepExecutionStatus.SKIPPED, ), f"Step {result.step_name} failed with status {result.status}" - assert verify_output(get_logger, pipeline_config.extract_folder) + assert verify_output(get_logger, Path(pipeline_config.extract_folder)) def test_run_sql_failure_pipeline( @@ -100,7 +105,9 @@ def test_run_python_failure_pipeline( def test_run_python_dep_failure_pipeline( - sandbox_sqlserver: DatabaseManager, pipeline_dep_failure_config: PipelineConfig, get_logger + sandbox_sqlserver: DatabaseManager, + pipeline_dep_failure_config: PipelineConfig, + get_logger: Logger, ): pipeline = PipelineClass(config=pipeline_dep_failure_config, executor=sandbox_sqlserver) with pytest.raises(RuntimeError) as e: @@ -125,7 +132,7 @@ def test_skipped_steps(sandbox_sqlserver: DatabaseManager, pipeline_config: Pipe assert result.error_message is None, "Skipped steps should not have error messages" -def verify_output(get_logger, path): +def verify_output(get_logger: Logger, path: Path): conn = duckdb.connect(str(Path(path)) + "/" + DB_NAME) expected_tables = ["usage", "inventory", "random_data"] @@ -133,8 +140,8 @@ def verify_output(get_logger, path): for table in expected_tables: try: result = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone() - logger.info(f"Count for {table}: {result[0]}") - if result[0] == 0: + logger.info(f"Count for {table}: {result}") + if result is None or result[0] == 0: logger.debug(f"Table {table} is empty") return False except duckdb.CatalogException: