From 5b7af5813e7f86c79b3ea4503533d2ecf36caf10 Mon Sep 17 00:00:00 2001
From: Andrew Snare
Date: Fri, 19 Dec 2025 15:31:04 +0100
Subject: [PATCH 1/3] Use pandas to safely create the DuckDB table instead of building SQL strings.

---
 .../labs/lakebridge/assessments/pipeline.py   | 40 +++++++++----------
 .../connections/database_manager.py           |  8 ++++
 2 files changed, 28 insertions(+), 20 deletions(-)

diff --git a/src/databricks/labs/lakebridge/assessments/pipeline.py b/src/databricks/labs/lakebridge/assessments/pipeline.py
index 63a56c29fb..037c7166d0 100644
--- a/src/databricks/labs/lakebridge/assessments/pipeline.py
+++ b/src/databricks/labs/lakebridge/assessments/pipeline.py
@@ -1,20 +1,19 @@
-from pathlib import Path
-from subprocess import run, CalledProcessError, Popen, PIPE, STDOUT, DEVNULL
-from dataclasses import dataclass
-from enum import Enum
-
-import sys
+import json
+import logging
 import os
+import sys
 import venv
 import tempfile
-import json
-import logging
-import yaml
-import duckdb
+from dataclasses import dataclass
+from enum import Enum
+from pathlib import Path
+from subprocess import CalledProcessError, DEVNULL, PIPE, Popen, STDOUT, run
 
-from databricks.labs.lakebridge.connections.credential_manager import cred_file
+import duckdb
+import yaml
 from databricks.labs.lakebridge.assessments.profiler_config import PipelineConfig, Step
+from databricks.labs.lakebridge.connections.credential_manager import cred_file
 from databricks.labs.lakebridge.connections.database_manager import DatabaseManager, FetchResult
 
 logger = logging.getLogger(__name__)
@@ -245,20 +244,21 @@ def _save_to_db(self, result: FetchResult, step_name: str, mode: str):
         row_count = len(result.rows)
         logging.info(f"Query for step '{step_name}' returned {row_count} rows.")
 
+        # TODO: Add support for figuring out data types from SQLALCHEMY result object result.cursor.description is not reliable
+        _result_frame = result.to_df().astype(str)
+
         with duckdb.connect(db_path) as conn:
-            # TODO: Add support for figuring out data types from SQLALCHEMY result object result.cursor.description is not reliable
-            schema = ', '.join(f"{col} STRING" for col in result.columns)
-            # Handle write modes
+            # DuckDB can access _result_frame from the local scope automatically.
             if mode == 'overwrite':
-                conn.execute(f"CREATE OR REPLACE TABLE {step_name} ({schema})")
+                statement = f"CREATE OR REPLACE TABLE {step_name} AS SELECT * FROM _result_frame"
             elif mode == 'append' and step_name not in conn.get_table_names(""):
-                conn.execute(f"CREATE TABLE {step_name} ({schema})")
-
-            placeholders = ', '.join(['?'] * len(result.columns))
-            conn.executemany(f"INSERT INTO {step_name} VALUES ({placeholders})", result.rows)
-            logging.info(f"Successfully inserted {row_count} rows into table '{step_name}'.")
+                statement = f"CREATE TABLE {step_name} AS SELECT * FROM _result_frame"
+            else:
+                statement = f"INSERT INTO {step_name} SELECT * FROM _result_frame"
+            logging.debug(f"Inserting {row_count} rows: {statement}")
+            conn.execute(statement)
+            logging.info(f"Successfully inserted {row_count} rows into table '{step_name}'.")
 
     @staticmethod
     def _create_dir(dir_path: Path):
diff --git a/src/databricks/labs/lakebridge/connections/database_manager.py b/src/databricks/labs/lakebridge/connections/database_manager.py
index 501adcd866..8318d5743d 100644
--- a/src/databricks/labs/lakebridge/connections/database_manager.py
+++ b/src/databricks/labs/lakebridge/connections/database_manager.py
@@ -4,6 +4,8 @@
 from typing import Any
 from collections.abc import Sequence, Set
 
+import pandas as pd
+
 from sqlalchemy import create_engine
 from sqlalchemy.engine import Engine, URL
 from sqlalchemy.engine.row import Row
@@ -20,6 +22,12 @@ class FetchResult:
     columns: Set[str]
     rows: Sequence[Row[Any]]
 
+    def to_df(self) -> pd.DataFrame:
+        """Create a pandas dataframe based on these results."""
+        # Because columns aren't necessarily in the same order as the row values, we have to do this the hard way.
+        rows = [row.as_dict() for row in self.rows]
+        return pd.DataFrame(data=rows, columns=list(self.columns))
+
 
 class DatabaseConnector(ABC):
     @abstractmethod

From e46419cd56e06190849cb9d1f6164284a8d3ab0b Mon Sep 17 00:00:00 2001
From: Andrew Snare
Date: Fri, 19 Dec 2025 15:33:06 +0100
Subject: [PATCH 2/3] Whitespace.

---
 src/databricks/labs/lakebridge/assessments/pipeline.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/databricks/labs/lakebridge/assessments/pipeline.py b/src/databricks/labs/lakebridge/assessments/pipeline.py
index 037c7166d0..b61876f038 100644
--- a/src/databricks/labs/lakebridge/assessments/pipeline.py
+++ b/src/databricks/labs/lakebridge/assessments/pipeline.py
@@ -248,7 +248,6 @@ def _save_to_db(self, result: FetchResult, step_name: str, mode: str):
         _result_frame = result.to_df().astype(str)
 
         with duckdb.connect(db_path) as conn:
-            # DuckDB can access _result_frame from the local scope automatically.
             if mode == 'overwrite':
                 statement = f"CREATE OR REPLACE TABLE {step_name} AS SELECT * FROM _result_frame"

From 7b67aad6f0d20821aaea8163d13226b10c49a3fc Mon Sep 17 00:00:00 2001
From: Andrew Snare
Date: Fri, 19 Dec 2025 15:56:32 +0100
Subject: [PATCH 3/3] Fix the way we build the dataframe.

---
 .../labs/lakebridge/connections/database_manager.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/databricks/labs/lakebridge/connections/database_manager.py b/src/databricks/labs/lakebridge/connections/database_manager.py
index 8318d5743d..08c6b23f97 100644
--- a/src/databricks/labs/lakebridge/connections/database_manager.py
+++ b/src/databricks/labs/lakebridge/connections/database_manager.py
@@ -24,9 +24,9 @@ class FetchResult:
 
     def to_df(self) -> pd.DataFrame:
         """Create a pandas dataframe based on these results."""
-        # Because columns aren't necessarily in the same order as the row values, we have to do this the hard way.
-        rows = [row.as_dict() for row in self.rows]
-        return pd.DataFrame(data=rows, columns=list(self.columns))
+        # Row emulates a named tuple, which Pandas understands natively. So the columns are safely inferred unless
+        # we have an empty result-set.
+        return pd.DataFrame(data=self.rows) if self.rows else pd.DataFrame(columns=list(self.columns))
 
 
 class DatabaseConnector(ABC):
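
The rewrite in these patches leans on two behaviours worth spelling out. First, pandas can build a DataFrame directly from a sequence of named-tuple-like rows (such as SQLAlchemy's Row), inferring the column names from the tuple fields. Second, DuckDB's replacement scan lets a SQL statement refer to a pandas DataFrame that is in scope by its variable name. The following is a minimal, self-contained sketch of that pattern, not part of the patches themselves: it substitutes a plain namedtuple for SQLAlchemy's Row, and the table name "example" is illustrative only.

    from collections import namedtuple

    import duckdb
    import pandas as pd

    # Stand-in for sqlalchemy.engine.row.Row, which also behaves like a named tuple.
    Row = namedtuple("Row", ["id", "name"])
    rows = [Row(1, "alpha"), Row(2, "beta")]

    # pandas infers the column names ("id", "name") from the named-tuple fields.
    _result_frame = pd.DataFrame(data=rows).astype(str)

    with duckdb.connect() as conn:
        # DuckDB's replacement scan resolves _result_frame from the enclosing scope.
        conn.execute("CREATE OR REPLACE TABLE example AS SELECT * FROM _result_frame")
        print(conn.execute("SELECT * FROM example").fetchall())

Run against an in-memory DuckDB database this should print [('1', 'alpha'), ('2', 'beta')], mirroring how _save_to_db now materialises query results without building per-row INSERT statements.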