Changes from all commits (22 commits):
1e0a883 Handle empty SQL resultsets gracefully in assessment pipeline (sundarshankar89, Dec 2, 2025)
57a3f9d added resources (sundarshankar89, Dec 2, 2025)
3cad168 Fixed Review Comments (sundarshankar89, Dec 3, 2025)
c46b6d3 Merge branch 'main' into feature/handle_empty_sql_output (sundarshankar89, Dec 3, 2025)
839af11 Fix DuckDB file locking errors in concurrent profiler validator tests (sundarshankar89, Dec 4, 2025)
7f151f3 Fix DuckDB file locking errors in concurrent profiler validator tests (sundarshankar89, Dec 4, 2025)
aae9455 Merge branch 'patch/profiler_test_tmp_path' into feature/handle_empty… (sundarshankar89, Dec 4, 2025)
f495f05 isolating failing tests (sundarshankar89, Dec 4, 2025)
0ff326e isolating failing tests (sundarshankar89, Dec 4, 2025)
71211a6 isolating failing tests (sundarshankar89, Dec 4, 2025)
514a752 isolating failing tests (sundarshankar89, Dec 4, 2025)
ee008b3 isolating failing tests (sundarshankar89, Dec 4, 2025)
67abfde Merge branch 'main' into patch/profiler_test_tmp_path (sundarshankar89, Dec 10, 2025)
e536890 Merge branch 'patch/profiler_test_tmp_path' into feature/handle_empty… (sundarshankar89, Dec 10, 2025)
a873787 Merge branch 'main' into patch/profiler_test_tmp_path (sundarshankar89, Dec 11, 2025)
6d649ce Merge branch 'patch/profiler_test_tmp_path' into feature/handle_empty… (sundarshankar89, Dec 11, 2025)
55b3b87 isolating failing tests (sundarshankar89, Dec 11, 2025)
7aa7761 Merge branch 'patch/profiler_test_tmp_path' into feature/handle_empty… (sundarshankar89, Dec 11, 2025)
dd357cb Merge branch 'main' into feature/handle_empty_sql_output (sundarshankar89, Dec 15, 2025)
e6d426e Revise SQL handling in #2172 (#2203) (asnare, Dec 19, 2025)
66e1525 Merge branch 'main' into feature/handle_empty_sql_output (sundarshankar89, Dec 22, 2025)
8801636 Merge branch 'main' into feature/handle_empty_sql_output (sundarshankar89, Dec 24, 2025)
53 changes: 30 additions & 23 deletions src/databricks/labs/lakebridge/assessments/pipeline.py
@@ -1,20 +1,19 @@
-from pathlib import Path
-from subprocess import run, CalledProcessError, Popen, PIPE, STDOUT, DEVNULL
-from dataclasses import dataclass
-from enum import Enum
-
-import sys
+import json
+import logging
+import os
+import sys
 import venv
 import tempfile
-import json
-import logging
-import yaml
-import duckdb
+from dataclasses import dataclass
+from enum import Enum
+from pathlib import Path
+from subprocess import CalledProcessError, DEVNULL, PIPE, Popen, STDOUT, run
 
-from databricks.labs.lakebridge.connections.credential_manager import cred_file
+import duckdb
+import yaml
 
 from databricks.labs.lakebridge.assessments.profiler_config import PipelineConfig, Step
+from databricks.labs.lakebridge.connections.credential_manager import cred_file
 from databricks.labs.lakebridge.connections.database_manager import DatabaseManager, FetchResult
 
 logger = logging.getLogger(__name__)
@@ -236,21 +235,29 @@ def _run_python_script(venv_exec_cmd, script_path, db_path, credential_config):
     def _save_to_db(self, result: FetchResult, step_name: str, mode: str):
         db_path = str(self.db_path_prefix / DB_NAME)
 
-        with duckdb.connect(db_path) as conn:
-            # TODO: Add support for figuring out data types from SQLALCHEMY result object result.cursor.description is not reliable
-            schema = ' STRING, '.join(result.columns) + ' STRING'
-
-            # Handle write modes
+        # Check row count and log appropriately and skip data insertion if 0 rows
+        if not result.rows:
+            logging.warning(
+                f"Query for step '{step_name}' returned 0 rows. Skipping table creation and data insertion."
+            )
+            return
+
+        row_count = len(result.rows)
+        logging.info(f"Query for step '{step_name}' returned {row_count} rows.")
+        # TODO: Add support for figuring out data types from SQLALCHEMY result object result.cursor.description is not reliable
+        _result_frame = result.to_df().astype(str)
+
+        # Handle write modes
+        with duckdb.connect(db_path) as conn:
+            # DuckDB can access _result_frame from the local scope automatically.
             if mode == 'overwrite':
-                conn.execute(f"CREATE OR REPLACE TABLE {step_name} ({schema})")
+                statement = f"CREATE OR REPLACE TABLE {step_name} AS SELECT * FROM _result_frame"
             elif mode == 'append' and step_name not in conn.get_table_names(""):
-                conn.execute(f"CREATE TABLE {step_name} ({schema})")
-
-            # Batch insert using prepared statements
-            placeholders = ', '.join(['?' for _ in result.columns])
-            insert_query = f"INSERT INTO {step_name} VALUES ({placeholders})"
-
-            conn.executemany(insert_query, result.rows)
+                statement = f"CREATE TABLE {step_name} AS SELECT * FROM _result_frame"
+            else:
+                statement = f"INSERT INTO {step_name} SELECT * FROM _result_frame"
+            logging.debug(f"Inserting {row_count} rows: {statement}")
+            conn.execute(statement)
+            logging.info(f"Successfully inserted {row_count} rows into table '{step_name}'.")
 
     @staticmethod
     def _create_dir(dir_path: Path):

Review comment on lines +247 to +248 (Contributor): In switching to the DF approach I deliberately preserved the existing all-string behaviour. However this TODO might be resolved trivially if the .astype(str) is dropped: Pandas will infer the column types, which DuckDB will take over.
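For reference, a minimal standalone sketch (not part of the diff) of the DuckDB behaviour the rewritten _save_to_db relies on: a pandas DataFrame in the local Python scope can be queried by name through DuckDB's replacement scans, and the reviewer's suggestion of dropping .astype(str) would let the inferred Pandas dtypes carry through. The names here are illustrative.

import duckdb
import pandas as pd

# Hypothetical stand-in for a step's fetched rows; names are illustrative.
_result_frame = pd.DataFrame({"col1": [1, 2], "col2": ["a", "b"]})

with duckdb.connect() as conn:  # in-memory database
    # DuckDB resolves _result_frame by name from the enclosing Python scope
    # (a "replacement scan"), so no explicit conn.register() call is needed.
    conn.execute("CREATE OR REPLACE TABLE demo AS SELECT * FROM _result_frame")
    print(conn.execute("DESCRIBE demo").fetchall())
    # With .astype(str), as the PR does, both columns would be VARCHAR;
    # without it, col1 would arrive as BIGINT via the inferred Pandas dtype.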
src/databricks/labs/lakebridge/connections/database_manager.py
@@ -4,6 +4,8 @@
 from typing import Any
 from collections.abc import Sequence, Set
 
+import pandas as pd
+
 from sqlalchemy import create_engine
 from sqlalchemy.engine import Engine, URL
 from sqlalchemy.engine.row import Row
@@ -20,6 +22,12 @@ class FetchResult:
     columns: Set[str]
     rows: Sequence[Row[Any]]
 
+    def to_df(self) -> pd.DataFrame:
+        """Create a pandas dataframe based on these results."""
+        # Row emulates a named tuple, which Pandas understands natively. So the columns are safely inferred unless
+        # we have an empty result-set.
+        return pd.DataFrame(data=self.rows) if self.rows else pd.DataFrame(columns=list(self.columns))
+
 
 class DatabaseConnector(ABC):
     @abstractmethod
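As a quick illustration (not from the PR) of the named-tuple inference the to_df() docstring describes; MockRow is a hypothetical stand-in for SQLAlchemy's Row:

from collections import namedtuple

import pandas as pd

# SQLAlchemy's Row emulates a named tuple; MockRow stands in for it here.
MockRow = namedtuple("MockRow", ["name", "value"])

# Non-empty result-set: Pandas infers the column names from the tuple fields.
df = pd.DataFrame(data=[MockRow("a", 1), MockRow("b", 2)])
print(df.columns.tolist())  # ['name', 'value']

# Empty result-set: nothing to infer from, so columns must be passed explicitly.
empty = pd.DataFrame(columns=["name", "value"])
print(empty.shape)  # (0, 2)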
84 changes: 71 additions & 13 deletions tests/integration/assessments/test_pipeline.py
@@ -1,14 +1,21 @@
 from pathlib import Path
+from logging import Logger
 
 import duckdb
 import pytest
 
-from databricks.labs.lakebridge.assessments.pipeline import PipelineClass, DB_NAME, StepExecutionStatus
-
+from databricks.labs.lakebridge.assessments.pipeline import (
+    PipelineClass,
+    DB_NAME,
+    StepExecutionStatus,
+    StepExecutionResult,
+)
 from databricks.labs.lakebridge.assessments.profiler_config import Step, PipelineConfig
 from databricks.labs.lakebridge.connections.database_manager import DatabaseManager
 
 
 @pytest.fixture(scope="module")
-def pipeline_config():
+def pipeline_config() -> PipelineConfig:
     prefix = Path(__file__).parent
     config_path = f"{prefix}/../../resources/assessments/pipeline_config.yml"
     config = PipelineClass.load_config_from_yaml(config_path)
@@ -19,7 +26,7 @@ def pipeline_config():
 
 
 @pytest.fixture(scope="module")
-def pipeline_dep_failure_config():
+def pipeline_dep_failure_config() -> PipelineConfig:
     prefix = Path(__file__).parent
     config_path = f"{prefix}/../../resources/assessments/pipeline_config_failure_dependency.yml"
     config = PipelineClass.load_config_from_yaml(config_path)
@@ -30,7 +37,7 @@ def pipeline_dep_failure_config():
 
 
 @pytest.fixture(scope="module")
-def sql_failure_config():
+def sql_failure_config() -> PipelineConfig:
     prefix = Path(__file__).parent
     config_path = f"{prefix}/../../resources/assessments/pipeline_config_sql_failure.yml"
     config = PipelineClass.load_config_from_yaml(config_path)
@@ -40,7 +47,7 @@ def sql_failure_config():
 
 
 @pytest.fixture(scope="module")
-def python_failure_config():
+def python_failure_config() -> PipelineConfig:
     prefix = Path(__file__).parent
     config_path = f"{prefix}/../../resources/assessments/pipeline_config_python_failure.yml"
     config = PipelineClass.load_config_from_yaml(config_path)
@@ -49,7 +56,21 @@ def python_failure_config():
     return config
 
 
-def test_run_pipeline(sandbox_sqlserver, pipeline_config, get_logger):
+@pytest.fixture(scope="module")
+def empty_result_config() -> PipelineConfig:
+    prefix = Path(__file__).parent
+    config_path = f"{prefix}/../../resources/assessments/pipeline_config_empty_result.yml"
+    config = PipelineClass.load_config_from_yaml(config_path)
+    for step in config.steps:
+        step.extract_source = f"{prefix}/../../{step.extract_source}"
+    return config
+
+
+def test_run_pipeline(
+    sandbox_sqlserver: DatabaseManager,
+    pipeline_config: PipelineConfig,
+    get_logger: Logger,
+) -> None:
     pipeline = PipelineClass(config=pipeline_config, executor=sandbox_sqlserver)
     results = pipeline.execute()
 

Review comment on lines +61 to +65 (Contributor): Note to self, will need adjustment w.r.t. #2210.
@@ -63,7 +84,11 @@ def test_run_pipeline(sandbox_sqlserver, pipeline_config, get_logger):
     assert verify_output(get_logger, pipeline_config.extract_folder)
 
 
-def test_run_sql_failure_pipeline(sandbox_sqlserver, sql_failure_config, get_logger):
+def test_run_sql_failure_pipeline(
+    sandbox_sqlserver: DatabaseManager,
+    sql_failure_config: PipelineConfig,
+    get_logger: Logger,
+) -> None:
     pipeline = PipelineClass(config=sql_failure_config, executor=sandbox_sqlserver)
     with pytest.raises(RuntimeError) as e:
         pipeline.execute()
@@ -72,7 +97,11 @@ def test_run_sql_failure_pipeline(sandbox_sqlserver, sql_failure_config, get_logger):
     assert "Pipeline execution failed due to errors in steps: invalid_sql_step" in str(e.value)
 
 
-def test_run_python_failure_pipeline(sandbox_sqlserver, python_failure_config, get_logger):
+def test_run_python_failure_pipeline(
+    sandbox_sqlserver: DatabaseManager,
+    python_failure_config: PipelineConfig,
+    get_logger: Logger,
+) -> None:
     pipeline = PipelineClass(config=python_failure_config, executor=sandbox_sqlserver)
     with pytest.raises(RuntimeError) as e:
         pipeline.execute()
@@ -81,7 +110,9 @@ def test_run_python_failure_pipeline(sandbox_sqlserver, python_failure_config, get_logger):
     assert "Pipeline execution failed due to errors in steps: invalid_python_step" in str(e.value)
 
 
-def test_run_python_dep_failure_pipeline(sandbox_sqlserver, pipeline_dep_failure_config, get_logger):
+def test_run_python_dep_failure_pipeline(
+    sandbox_sqlserver: DatabaseManager, pipeline_dep_failure_config: PipelineConfig, get_logger
+):
     pipeline = PipelineClass(config=pipeline_dep_failure_config, executor=sandbox_sqlserver)
     with pytest.raises(RuntimeError) as e:
         pipeline.execute()
@@ -90,7 +121,11 @@ def test_run_python_dep_failure_pipeline(sandbox_sqlserver, pipeline_dep_failure_config, get_logger):
     assert "Pipeline execution failed due to errors in steps: package_status" in str(e.value)
 
 
-def test_skipped_steps(sandbox_sqlserver, pipeline_config, get_logger):
+def test_skipped_steps(
+    sandbox_sqlserver: DatabaseManager,
+    pipeline_config: PipelineConfig,
+    get_logger: Logger,
+) -> None:
     # Modify config to have some inactive steps
     for step in pipeline_config.steps:
         step.flag = "inactive"
@@ -126,7 +161,7 @@ def verify_output(get_logger, path):
     return True
 
 
-def test_pipeline_config_comments():
+def test_pipeline_config_comments() -> None:
     pipeline_w_comments = PipelineConfig(
         name="warehouse_profiler",
         version="1.0",
@@ -140,7 +175,7 @@ def test_pipeline_config_comments():
     assert pipeline_wo_comments.comment is None
 
 
-def test_pipeline_step_comments():
+def test_pipeline_step_comments() -> None:
     step_w_comment = Step(
         name="step_w_comment",
         type="sql",
@@ -160,3 +195,26 @@ def test_pipeline_step_comments():
     )
     assert step_w_comment.comment == "This is a step comment."
     assert step_wo_comment.comment is None
+
+
+def test_run_empty_result_pipeline(
+    sandbox_sqlserver: DatabaseManager,
+    empty_result_config: PipelineConfig,
+    get_logger: Logger,
+) -> None:
+    pipeline = PipelineClass(config=empty_result_config, executor=sandbox_sqlserver)
+    results = pipeline.execute()
+
+    # Verify step completed successfully despite empty results
+    assert len(results) == 1
+    assert results == [
+        StepExecutionResult(step_name="empty_result_step", status=StepExecutionStatus.COMPLETE, error_message=None)
+    ]
+
+    # Verify that no table was created (processing was skipped for empty resultset)
+    with duckdb.connect(str(Path(empty_result_config.extract_folder)) + "/" + DB_NAME) as conn:
+        tables = conn.execute("SHOW TABLES").fetchall()
+        table_names = [table[0] for table in tables]
+
+        # Table should NOT be created when resultset is empty
+        assert "empty_result_step" not in table_names, "Empty resultset should skip table creation"

Review comment on the line `assert len(results) == 1` (Contributor): Nit: unnecessary assertion.
6 changes: 6 additions & 0 deletions tests/resources/assessments/empty_resultset.sql
@@ -0,0 +1,6 @@
+-- Query that returns valid schema but 0 rows
+SELECT
+    'test' as col1,
+    'test' as col2,
+    'test' as col3
+WHERE 1 = 0
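To see why this resource exercises the empty-result path, here is a small sketch (run against DuckDB purely for illustration; the test itself targets the SQL Server sandbox) showing that the contradictory predicate yields a defined schema with zero rows:

import duckdb

# Same pattern as empty_resultset.sql: constant projection, contradictory filter.
rel = duckdb.sql("SELECT 'test' AS col1, 'test' AS col2, 'test' AS col3 WHERE 1 = 0")
print(rel.columns)           # ['col1', 'col2', 'col3']: the schema is still known
print(len(rel.fetchall()))   # 0: no rows, so the pipeline skips table creation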
10 changes: 10 additions & 0 deletions tests/resources/assessments/pipeline_config_empty_result.yml
Review comment on this file (Contributor): Another note to self, this will need adjusting w.r.t. #2208 and #2210.

@@ -0,0 +1,10 @@
+name: test_empty_result_pipeline
+version: 1.0
+extract_folder: /tmp/lakebridge_test_empty_result
+steps:
+  - name: empty_result_step
+    type: sql
+    extract_source: resources/assessments/empty_resultset.sql
+    mode: overwrite
+    frequency: once
+    flag: active