Merged
src/databricks/labs/lakebridge/assessments/pipeline.py (41 changes: 20 additions & 21 deletions)
@@ -1,20 +1,19 @@
-from pathlib import Path
-from subprocess import run, CalledProcessError, Popen, PIPE, STDOUT, DEVNULL
-from dataclasses import dataclass
-from enum import Enum
-
-import sys
+import json
+import logging
+import os
+import sys
 import venv
 import tempfile
-import json
-import logging
-import yaml
-import duckdb
+from dataclasses import dataclass
+from enum import Enum
+from pathlib import Path
+from subprocess import CalledProcessError, DEVNULL, PIPE, Popen, STDOUT, run
 
-from databricks.labs.lakebridge.connections.credential_manager import cred_file
-
+import duckdb
+import yaml
+
 from databricks.labs.lakebridge.assessments.profiler_config import PipelineConfig, Step
+from databricks.labs.lakebridge.connections.credential_manager import cred_file
 from databricks.labs.lakebridge.connections.database_manager import DatabaseManager, FetchResult
 
 logger = logging.getLogger(__name__)
 
@@ -245,20 +244,20 @@ def _save_to_db(self, result: FetchResult, step_name: str, mode: str):
 
         row_count = len(result.rows)
         logging.info(f"Query for step '{step_name}' returned {row_count} rows.")
+        # TODO: Add support for figuring out data types from SQLALCHEMY result object result.cursor.description is not reliable
+        _result_frame = result.to_df().astype(str)
 
         with duckdb.connect(db_path) as conn:
-            # TODO: Add support for figuring out data types from SQLALCHEMY result object result.cursor.description is not reliable
-            schema = ', '.join(f"{col} STRING" for col in result.columns)
-
-            # Handle write modes
+            # DuckDB can access _result_frame from the local scope automatically.
             if mode == 'overwrite':
-                conn.execute(f"CREATE OR REPLACE TABLE {step_name} ({schema})")
+                statement = f"CREATE OR REPLACE TABLE {step_name} AS SELECT * FROM _result_frame"
             elif mode == 'append' and step_name not in conn.get_table_names(""):
-                conn.execute(f"CREATE TABLE {step_name} ({schema})")
-
-            placeholders = ', '.join(['?'] * len(result.columns))
-            conn.executemany(f"INSERT INTO {step_name} VALUES ({placeholders})", result.rows)
-            logging.info(f"Successfully inserted {row_count} rows into table '{step_name}'.")
+                statement = f"CREATE TABLE {step_name} AS SELECT * FROM _result_frame"
+            else:
+                statement = f"INSERT INTO {step_name} SELECT * FROM _result_frame"
+            logging.debug(f"Inserting {row_count} rows: {statement}")
+            conn.execute(statement)
+            logging.info(f"Successfully inserted {row_count} rows into table '{step_name}'.")
 
     @staticmethod
     def _create_dir(dir_path: Path):
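Context on the `_save_to_db` rewrite: instead of declaring a `STRING` schema and bulk-inserting rows with `executemany`, every write mode now reduces to a single `CREATE ... AS SELECT` or `INSERT ... SELECT` over the DataFrame, which DuckDB resolves via its replacement scans (a bare identifier like `_result_frame` in SQL is matched against pandas DataFrames visible in the enclosing Python frames). A minimal sketch of the pattern, assuming an in-memory database rather than the pipeline's `db_path`, a hypothetical `events` table, and `SHOW TABLES` in place of the connection's `get_table_names` call:

```python
import duckdb
import pandas as pd

# Hypothetical stand-in for FetchResult.to_df(): the pipeline stringifies
# every column before writing, so this does the same.
_result_frame = pd.DataFrame({"id": [1, 2], "step": ["profile", "extract"]}).astype(str)

def save(conn: duckdb.DuckDBPyConnection, table: str, mode: str) -> None:
    existing = {name for (name,) in conn.execute("SHOW TABLES").fetchall()}
    # DuckDB's replacement scan resolves `_result_frame` against DataFrames
    # in the calling Python scope, so plain SQL can read it directly.
    if mode == "overwrite":
        statement = f"CREATE OR REPLACE TABLE {table} AS SELECT * FROM _result_frame"
    elif mode == "append" and table not in existing:
        statement = f"CREATE TABLE {table} AS SELECT * FROM _result_frame"
    else:
        statement = f"INSERT INTO {table} SELECT * FROM _result_frame"
    conn.execute(statement)

with duckdb.connect() as conn:  # in-memory; the pipeline targets db_path
    save(conn, "events", "overwrite")
    save(conn, "events", "append")  # table now exists, so this INSERTs
    print(conn.execute("SELECT count(*) FROM events").fetchone())  # (4,)
```

One consequence of this design is that a failed query can no longer leave a half-written table behind: each mode executes exactly one statement.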
src/databricks/labs/lakebridge/connections/database_manager.py

@@ -4,6 +4,8 @@
 from typing import Any
 from collections.abc import Sequence, Set
 
+import pandas as pd
+
 from sqlalchemy import create_engine
 from sqlalchemy.engine import Engine, URL
 from sqlalchemy.engine.row import Row
@@ -20,6 +22,12 @@ class FetchResult:
     columns: Set[str]
     rows: Sequence[Row[Any]]
 
+    def to_df(self) -> pd.DataFrame:
+        """Create a pandas dataframe based on these results."""
+        # Row emulates a named tuple, which Pandas understands natively. So the columns are safely inferred unless
+        # we have an empty result-set.
+        return pd.DataFrame(data=self.rows) if self.rows else pd.DataFrame(columns=list(self.columns))
+
 
 class DatabaseConnector(ABC):
     @abstractmethod
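On the `to_df` addition: pandas promotes the `_fields` of named-tuple-like rows to column labels, and `sqlalchemy.engine.row.Row` exposes `_fields`, which is what makes the bare `pd.DataFrame(data=self.rows)` call safe on non-empty results. A small sketch with plain `namedtuple`s standing in for `Row` objects, including the empty-result fallback:

```python
from collections import namedtuple

import pandas as pd

# Stand-in for sqlalchemy.engine.row.Row, which also exposes ._fields.
Row = namedtuple("Row", ["id", "name"])

rows = [Row(1, "profiler"), Row(2, "extractor")]
df = pd.DataFrame(data=rows)      # columns inferred from Row._fields
print(df.columns.tolist())       # ['id', 'name']

# Empty result-set: nothing to infer from, so fall back to known columns.
empty = pd.DataFrame(columns=["id", "name"])
print(empty.columns.tolist())    # ['id', 'name']
```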