diff --git a/src/databricks/labs/lakebridge/assessments/pipeline.py b/src/databricks/labs/lakebridge/assessments/pipeline.py
index 63a56c29fb..b61876f038 100644
--- a/src/databricks/labs/lakebridge/assessments/pipeline.py
+++ b/src/databricks/labs/lakebridge/assessments/pipeline.py
@@ -1,20 +1,19 @@
-from pathlib import Path
-from subprocess import run, CalledProcessError, Popen, PIPE, STDOUT, DEVNULL
-from dataclasses import dataclass
-from enum import Enum
-
-import sys
+import json
+import logging
 import os
+import sys
 import venv
 import tempfile
-import json
-import logging
-import yaml
-import duckdb
+from dataclasses import dataclass
+from enum import Enum
+from pathlib import Path
+from subprocess import CalledProcessError, DEVNULL, PIPE, Popen, STDOUT, run
 
-from databricks.labs.lakebridge.connections.credential_manager import cred_file
+import duckdb
+import yaml
 from databricks.labs.lakebridge.assessments.profiler_config import PipelineConfig, Step
+from databricks.labs.lakebridge.connections.credential_manager import cred_file
 from databricks.labs.lakebridge.connections.database_manager import DatabaseManager, FetchResult
 
 logger = logging.getLogger(__name__)
 
@@ -245,20 +244,20 @@ def _save_to_db(self, result: FetchResult, step_name: str, mode: str):
         row_count = len(result.rows)
         logging.info(f"Query for step '{step_name}' returned {row_count} rows.")
 
+        # TODO: Add support for figuring out data types from SQLALCHEMY result object result.cursor.description is not reliable
+        _result_frame = result.to_df().astype(str)
         with duckdb.connect(db_path) as conn:
-            # TODO: Add support for figuring out data types from SQLALCHEMY result object result.cursor.description is not reliable
-            schema = ', '.join(f"{col} STRING" for col in result.columns)
-
-            # Handle write modes
+            # DuckDB can access _result_frame from the local scope automatically.
             if mode == 'overwrite':
-                conn.execute(f"CREATE OR REPLACE TABLE {step_name} ({schema})")
+                statement = f"CREATE OR REPLACE TABLE {step_name} AS SELECT * FROM _result_frame"
             elif mode == 'append' and step_name not in conn.get_table_names(""):
-                conn.execute(f"CREATE TABLE {step_name} ({schema})")
-
-            placeholders = ', '.join(['?'] * len(result.columns))
-            conn.executemany(f"INSERT INTO {step_name} VALUES ({placeholders})", result.rows)
-            logging.info(f"Successfully inserted {row_count} rows into table '{step_name}'.")
+                statement = f"CREATE TABLE {step_name} AS SELECT * FROM _result_frame"
+            else:
+                statement = f"INSERT INTO {step_name} SELECT * FROM _result_frame"
+            logging.debug(f"Inserting {row_count} rows: {statement}")
+            conn.execute(statement)
+            logging.info(f"Successfully inserted {row_count} rows into table '{step_name}'.")
 
     @staticmethod
     def _create_dir(dir_path: Path):
diff --git a/src/databricks/labs/lakebridge/connections/database_manager.py b/src/databricks/labs/lakebridge/connections/database_manager.py
index 501adcd866..08c6b23f97 100644
--- a/src/databricks/labs/lakebridge/connections/database_manager.py
+++ b/src/databricks/labs/lakebridge/connections/database_manager.py
@@ -4,6 +4,8 @@
 from typing import Any
 from collections.abc import Sequence, Set
 
+import pandas as pd
+
 from sqlalchemy import create_engine
 from sqlalchemy.engine import Engine, URL
 from sqlalchemy.engine.row import Row
@@ -20,6 +22,12 @@ class FetchResult:
     columns: Set[str]
     rows: Sequence[Row[Any]]
 
+    def to_df(self) -> pd.DataFrame:
+        """Create a pandas dataframe based on these results."""
+        # Row emulates a named tuple, which Pandas understands natively. So the columns are safely inferred unless
+        # we have an empty result-set.
+        return pd.DataFrame(data=self.rows) if self.rows else pd.DataFrame(columns=list(self.columns))
+
 
 class DatabaseConnector(ABC):
     @abstractmethod
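
Note: the new "DuckDB can access _result_frame from the local scope automatically" comment refers to DuckDB's replacement-scan behaviour: when a query names a table the database does not contain, the Python client resolves the name against pandas DataFrames visible in the calling scope. A minimal, self-contained sketch of that mechanism (the table name and data below are illustrative, not taken from the lakebridge codebase):

import duckdb
import pandas as pd

# Stand-in for result.to_df().astype(str); values are illustrative only.
_result_frame = pd.DataFrame({"id": ["1", "2"], "name": ["a", "b"]})

with duckdb.connect() as conn:  # no argument -> in-memory database
    # "_result_frame" is not a table in the catalog, so DuckDB falls back to
    # a replacement scan and reads the local DataFrame of that name directly.
    conn.execute("CREATE OR REPLACE TABLE example AS SELECT * FROM _result_frame")
    print(conn.execute("SELECT count(*) FROM example").fetchone())  # (2,)

This is also why the DataFrame is built before entering the with block: it only needs to be a plain local variable at the point where conn.execute() runs.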
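Likewise, FetchResult.to_df() leans on pandas inferring column names from named-tuple-like rows; sqlalchemy.engine.row.Row exposes that interface. A sketch with a plain namedtuple standing in for Row (DemoRow and its fields are hypothetical):

from collections import namedtuple

import pandas as pd

# Hypothetical stand-in for sqlalchemy.engine.row.Row, which also behaves
# like a named tuple.
DemoRow = namedtuple("DemoRow", ["id", "name"])
rows = [DemoRow(1, "a"), DemoRow(2, "b")]

df = pd.DataFrame(data=rows)
print(list(df.columns))  # ['id', 'name'], inferred from the row fields

# An empty result set leaves nothing to infer from, hence the explicit
# fallback to the column names carried on FetchResult:
empty = pd.DataFrame(columns=["id", "name"])
print(empty.shape)  # (0, 2)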