Changes from all commits — 23 commits, all by asnare on Dec 30, 2025:

3ca7ce2  Use path traversal instead of string interpolation/parsing.
24926ba  Type hint.
081afb6  Use class-method directly.
8f8698d  Tidy up path manipulation to load version file.
39654af  Type hints.
8b57115  Introduce a test fixture that provides the location of resources need…
d0b547d  Refactor existing tests that use `Path(__file__)` to locate test reso…
d58b0ab  Drop unused global.
b6af977  Eliminate more Path(__file__) manipulation.
f50ec68  Use pathlib instead of string interpolation.
00261c9  Add missing import.
2c48403  Set defaults directly instead of during __post_init__().
2007cc3  Make PipelineConfig immutable.
4e0f733  Update the test LSP server to not use Path(__file__) manipulation.
2b4a8a2  Consolidate Presto functional tests, and eliminate Path(__file__) man…
234e6f5  Update Oracle functional tests to eliminate Path(__file__) manipulation.
6bd340c  Consolidate Snowflake functional tests, and eliminate Path(__file__) …
b7e81ce  Update Morpheus integration tests to not use Path(__file__) manipulat…
d991934  Update tests that run our stub LSP server to not use Path(__file__) m…
04e22c5  Remove unused code.
b6b85a4  TODO marker for future work.
32a8519  Update assessment pipeline to avoid Path(__file__) manipulation.
a9faad1  Fix formatting.
25 changes: 15 additions & 10 deletions src/databricks/labs/lakebridge/assessments/dashboards/execute.py

@@ -1,14 +1,18 @@
 import logging
 import os
 import sys
+from collections.abc import Sequence
+from importlib import resources
+from importlib.abc import Traversable
 from pathlib import Path
-import yaml
-from yaml.parser import ParserError
-from yaml.scanner import ScannerError
 
 import duckdb
+import yaml
 from pyspark.sql import SparkSession
+from yaml.parser import ParserError
+from yaml.scanner import ScannerError
 
+import databricks.labs.lakebridge.resources.assessments as assessment_resources
 from databricks.labs.lakebridge.assessments.profiler_validator import (
     EmptyTableValidationCheck,
     build_validation_report,
@@ -34,22 +38,22 @@ def main(*argv) -> None:
         raise ValueError("Corrupt or invalid profiler extract.")
 
 
-def _get_extract_tables(schema_def_path: str) -> list:
+def _get_extract_tables(schema_def_path: Path | Traversable) -> Sequence[tuple[str, str, str]]:
     """
     Given a schema definition file for a source technology, returns a list of table info tuples:
     (schema_name, table_name, fully_qualified_name)
     """
     # First, load the schema definition file
     try:
-        with open(schema_def_path, 'r', encoding="UTF-8") as f:
+        with schema_def_path.open(mode="r", encoding="utf-8") as f:
             data = yaml.safe_load(f)
     except (ParserError, ScannerError) as e:
         raise ValueError(f"Could not read extract schema definition '{schema_def_path}': {e}") from e
     except FileNotFoundError as e:
         raise FileNotFoundError(f"Schema definition not found: {schema_def_path}") from e
     # Iterate through the defined schemas and build a list of
     # table info tuples: (schema_name, table_name, fully_qualified_name)
-    extracted_tables = []
+    extracted_tables: list[tuple[str, str, str]] = []
     for schema_name, schema_def in data.get("schemas", {}).items():
         tables = schema_def.get("tables", {})
         for table_name in tables.keys():
@@ -64,10 +68,11 @@ def _validate_profiler_extract(
 ) -> bool:
     logger.info("Validating the profiler extract file.")
     validation_checks: list[EmptyTableValidationCheck | ExtractSchemaValidationCheck] = []
-    schema_def_path = f"{Path(__file__).parent}/../../resources/assessments/{source_tech}_schema_def.yml"
-    tables = _get_extract_tables(schema_def_path)
+    # TODO: Verify this, I don't think it works? (These files are part of the test resources.)
+    schema_def = resources.files(assessment_resources).joinpath(f"{source_tech}_schema_def.yml")
+    tables = _get_extract_tables(schema_def)
     try:
-        with duckdb.connect(database=extract_location) as duck_conn:
+        with duckdb.connect(database=extract_location) as duck_conn, resources.as_file(schema_def) as schema_def_path:
             for table_info in tables:
                 # Ensure that the table contains data
                 empty_check = EmptyTableValidationCheck(table_info[2])

Review comment on lines +71 to +72 — asnare (Contributor, Author):

    I don't think this worked prior to this PR (or after): this is a pure refactoring.

    But it does raise an interesting point: how is this tested?

    @sundarshankar89: How can I check this?

@@ -79,7 +84,7 @@
                     table_info[1],
                     source_tech=source_tech,
                     extract_path=extract_location,
-                    schema_path=schema_def_path,
+                    schema_path=str(schema_def_path),
                 )
                 validation_checks.append(schema_check)
             report = build_validation_report(validation_checks, duck_conn)
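A note on the TODO and the review question above: the packaged-resource lookup can be probed directly, without running the dashboard pipeline, by exercising the same importlib.resources calls in isolation. A minimal sketch (the synapse_schema_def.yml resource name is an assumed example):

```python
from importlib import resources

import databricks.labs.lakebridge.resources.assessments as assessment_resources

# A Traversable handle into the installed package; nothing is read or extracted yet.
schema_def = resources.files(assessment_resources).joinpath("synapse_schema_def.yml")

# If the YAML ships only with the test resources, as the TODO suspects, this is False.
print(schema_def.is_file())

# as_file() materialises the resource as a real filesystem path (a temporary
# copy when the package is zipped, the original file otherwise).
if schema_def.is_file():
    with resources.as_file(schema_def) as schema_path:
        print(schema_path.read_text(encoding="utf-8")[:200])
```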
5 changes: 2 additions & 3 deletions src/databricks/labs/lakebridge/assessments/profiler.py

@@ -41,9 +41,8 @@ def supported_platforms(cls) -> list[str]:
     def path_modifier(*, config_file: str | Path, path_prefix: Path = PRODUCT_PATH_PREFIX) -> PipelineConfig:
         # TODO: Make this work install during developer mode
         config = PipelineClass.load_config_from_yaml(config_file)
-        for step in config.steps:
-            step.extract_source = f"{path_prefix}/{step.extract_source}"
-        return config
+        new_steps = [step.copy(extract_source=str(path_prefix / step.extract_source)) for step in config.steps]
+        return config.copy(steps=new_steps)
 
     def profile(
         self,
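The rewritten path_modifier returns a fresh PipelineConfig instead of mutating the loaded one in place, which is what makes the frozen dataclasses below workable. A usage sketch, with a hypothetical config file and prefix:

```python
from pathlib import Path

from databricks.labs.lakebridge.assessments.profiler import Profiler

# Load a pipeline definition and rebase every step's extract_source onto a prefix.
config = Profiler.path_modifier(
    config_file=Path("pipeline_config.yml"),        # hypothetical file
    path_prefix=Path("/opt/lakebridge/resources"),  # hypothetical prefix
)
for step in config.steps:
    # Each step is a new, rewritten copy; nothing shared is mutated.
    print(step.extract_source)
```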
23 changes: 11 additions & 12 deletions src/databricks/labs/lakebridge/assessments/profiler_config.py

@@ -1,30 +1,29 @@
+import dataclasses
 from dataclasses import dataclass, field
 
 
-@dataclass
+@dataclass(frozen=True)
 class Step:
     name: str
     type: str | None
     extract_source: str
-    mode: str | None
-    frequency: str | None
-    flag: str | None
+    mode: str = "append"
+    frequency: str = "once"
+    flag: str = "active"
     dependencies: list[str] = field(default_factory=list)
     comment: str | None = None
 
-    def __post_init__(self):
-        if self.frequency is None:
-            self.frequency = "once"
-        if self.flag is None:
-            self.flag = "active"
-        if self.mode is None:
-            self.mode = "append"
+    def copy(self, /, **changes) -> "Step":
+        return dataclasses.replace(self, **changes)
 
 
-@dataclass
+@dataclass(frozen=True)
 class PipelineConfig:
     name: str
     version: str
     extract_folder: str
     comment: str | None = None
     steps: list[Step] = field(default_factory=list)
+
+    def copy(self, /, **changes) -> "PipelineConfig":
+        return dataclasses.replace(self, **changes)
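The frozen=True switch plus a copy() helper is the standard dataclasses recipe for immutable configs: attribute assignment now raises, and dataclasses.replace() produces an updated copy instead. A small self-contained sketch of the pattern, using a cut-down Step for illustration:

```python
import dataclasses
from dataclasses import dataclass


@dataclass(frozen=True)
class Step:
    name: str
    extract_source: str
    flag: str = "active"

    def copy(self, /, **changes) -> "Step":
        return dataclasses.replace(self, **changes)


step = Step(name="tables", extract_source="sql/tables.sql")
try:
    step.flag = "inactive"  # frozen: assignment raises FrozenInstanceError
except dataclasses.FrozenInstanceError:
    pass
updated = step.copy(flag="inactive")  # a new instance; `step` is unchanged
assert step.flag == "active" and updated.flag == "inactive"
```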
src/databricks/labs/lakebridge/assessments/profiler_validator.py

@@ -1,6 +1,7 @@
 import os
 from dataclasses import dataclass
 from collections.abc import Sequence
+from pathlib import Path
 
 import yaml
 from duckdb import DuckDBPyConnection, CatalogException, ParserException, Error
@@ -201,7 +202,7 @@ def validate(self, connection) -> ValidationOutcome:
         )
 
 
-def get_profiler_extract_path(pipeline_config_path: str) -> str:
+def get_profiler_extract_path(pipeline_config_path: Path) -> Path:
     """
     Returns the filesystem path of the profiler extract database.
     input:
@@ -211,7 +212,7 @@ def get_profiler_extract_path(pipeline_config_path: str) -> str:
     """
     pipeline_config = PipelineClass.load_config_from_yaml(pipeline_config_path)
     normalized_db_path = os.path.normpath(pipeline_config.extract_folder)
-    database_path = f"{normalized_db_path}/{PROFILER_DB_NAME}"
+    database_path = Path(normalized_db_path) / PROFILER_DB_NAME
     return database_path
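Returning Path instead of str here is more than cosmetic: the / operator normalises separators, and the result supports further traversal without re-parsing strings. A brief sketch of the difference (the PROFILER_DB_NAME value is assumed for illustration):

```python
import os
from pathlib import Path

PROFILER_DB_NAME = "profiler_extract.db"  # assumed value for illustration

extract_folder = "out/./extracts/"
normalized = os.path.normpath(extract_folder)  # "out/extracts"
database_path = Path(normalized) / PROFILER_DB_NAME

print(database_path)         # out/extracts/profiler_extract.db
print(database_path.parent)  # out/extracts — no string splitting needed
print(database_path.suffix)  # .db
```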
src/databricks/labs/lakebridge/connections/credential_manager.py

@@ -61,11 +61,11 @@ def _get_secret_value(self, key: str) -> str:
 
 
 def _get_home() -> Path:
-    return Path(__file__).home()
+    return Path.home()
 
 
 def cred_file(product_name) -> Path:
-    return Path(f"{_get_home()}/.databricks/labs/{product_name}/.credentials.yml")
+    return _get_home() / ".databricks" / "labs" / product_name / ".credentials.yml"
 
 
 def _load_credentials(path: Path) -> dict:
5 changes: 2 additions & 3 deletions src/databricks/labs/lakebridge/deployment/installation.py

@@ -37,9 +37,8 @@ def __init__(
         self._product_info = product_info
         self._upgrades = upgrades
 
-    def _get_local_version_file_path(self):
-        user_home = f"{Path(__file__).home()}"
-        return Path(f"{user_home}/.databricks/labs/{self._product_info.product_name()}/state/version.json")
+    def _get_local_version_file_path(self) -> Path:
+        return Path.home() / ".databricks" / "labs" / self._product_info.product_name() / "state" / "version.json"
 
     def _get_local_version_file(self, file_path: Path):
         data = None
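Worth noting: Path(__file__).home() did work, because home() is a classmethod that ignores the instance it is called on, but it misleadingly suggests the result depends on __file__. The replacement makes the intent explicit. A quick illustration (the product name "lakebridge" is shown as an example):

```python
from pathlib import Path

# home() is a classmethod: the instance is ignored, so both yield the same path...
assert Path("/anything/at/all").home() == Path.home()

# ...but only the explicit form reads as "the user's home directory".
version_file = Path.home() / ".databricks" / "labs" / "lakebridge" / "state" / "version.json"
```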
32 changes: 16 additions & 16 deletions tests/conftest.py

@@ -33,6 +33,18 @@
 from databricks.labs.lakebridge.reconcile.normalize_recon_config_service import NormalizeReconConfigService
 
 
+@pytest.fixture(scope="session")
+def project_path(pytestconfig: pytest.Config) -> Path:
+    """The path of the directory where this project is located."""
+    return pytestconfig.rootpath
+
+
+@pytest.fixture(scope="session")
+def test_resources(project_path: Path) -> Path:
+    """Obtain the path to where resources used by tests are stored."""
+    return project_path / "tests" / "resources"
+
+
 @pytest.fixture()
 def mock_workspace_client():
     client = create_autospec(WorkspaceClient)
@@ -285,29 +297,17 @@ def mock_data_source():
 
 
 @pytest.fixture(scope="session")
-def bladebridge_artifact() -> Path:
+def bladebridge_artifact(test_resources: Path) -> Path:
     artifact = (
-        Path(__file__).parent
-        / "resources"
-        / "transpiler_configs"
-        / "bladebridge"
-        / "wheel"
-        / "databricks_bb_plugin-0.1.9-py3-none-any.whl"
+        test_resources / "transpiler_configs" / "bladebridge" / "wheel" / "databricks_bb_plugin-0.1.9-py3-none-any.whl"
     )
     assert artifact.exists()
     return artifact
 
 
 @pytest.fixture(scope="session")
-def morpheus_artifact() -> Path:
-    artifact = (
-        Path(__file__).parent
-        / "resources"
-        / "transpiler_configs"
-        / "morpheus"
-        / "jar"
-        / "databricks-morph-plugin-0.4.0.jar"
-    )
+def morpheus_artifact(test_resources: Path) -> Path:
+    artifact = test_resources / "transpiler_configs" / "morpheus" / "jar" / "databricks-morph-plugin-0.4.0.jar"
     assert artifact.exists()
     return artifact
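These two session-scoped fixtures give every test a single, working-directory-independent anchor: pytestconfig.rootpath is pytest's own notion of the project root, so tests can request test_resources instead of climbing parents of Path(__file__). A hypothetical consumer:

```python
from pathlib import Path


def test_reads_a_resource(test_resources: Path) -> None:
    # Hypothetical test: resolve a resource relative to the shared fixture.
    config = test_resources / "assessments" / "pipeline_config.yml"
    assert config.is_file()
```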
60 changes: 25 additions & 35 deletions tests/integration/assessments/test_pipeline.py

@@ -1,52 +1,42 @@
+from collections.abc import Callable
 from pathlib import Path
 import duckdb
 import pytest
 
 from databricks.labs.lakebridge.assessments.pipeline import PipelineClass, DB_NAME, StepExecutionStatus
+from databricks.labs.lakebridge.assessments.profiler import Profiler
 
 from databricks.labs.lakebridge.assessments.profiler_config import Step, PipelineConfig
 from databricks.labs.lakebridge.connections.database_manager import DatabaseManager
 
 
-@pytest.fixture(scope="module")
-def pipeline_config():
-    prefix = Path(__file__).parent
-    config_path = f"{prefix}/../../resources/assessments/pipeline_config.yml"
-    config = PipelineClass.load_config_from_yaml(config_path)
+@pytest.fixture
+def pipeline_configuration_loader(test_resources: Path, project_path: Path) -> Callable[[Path], PipelineConfig]:
+    def _load(resource_name: Path) -> PipelineConfig:
+        config_path = test_resources / "assessments" / resource_name
+        return Profiler.path_modifier(config_file=config_path, path_prefix=test_resources)
 
-    for step in config.steps:
-        step.extract_source = f"{prefix}/../../{step.extract_source}"
-    return config
+    return _load
 
 
-@pytest.fixture(scope="module")
-def pipeline_dep_failure_config():
-    prefix = Path(__file__).parent
-    config_path = f"{prefix}/../../resources/assessments/pipeline_config_failure_dependency.yml"
-    config = PipelineClass.load_config_from_yaml(config_path)
+@pytest.fixture
+def pipeline_config(pipeline_configuration_loader) -> PipelineConfig:
+    return pipeline_configuration_loader(Path("pipeline_config.yml"))
 
-    for step in config.steps:
-        step.extract_source = f"{prefix}/../../{step.extract_source}"
-    return config
+
+@pytest.fixture
+def pipeline_dep_failure_config(pipeline_configuration_loader) -> PipelineConfig:
+    return pipeline_configuration_loader(Path("pipeline_config_failure_dependency.yml"))
 
-@pytest.fixture(scope="module")
-def sql_failure_config():
-    prefix = Path(__file__).parent
-    config_path = f"{prefix}/../../resources/assessments/pipeline_config_sql_failure.yml"
-    config = PipelineClass.load_config_from_yaml(config_path)
-    for step in config.steps:
-        step.extract_source = f"{prefix}/../../{step.extract_source}"
-    return config
 
+@pytest.fixture
+def sql_failure_config(pipeline_configuration_loader) -> PipelineConfig:
+    return pipeline_configuration_loader(Path("pipeline_config_sql_failure.yml"))
 
-@pytest.fixture(scope="module")
-def python_failure_config():
-    prefix = Path(__file__).parent
-    config_path = f"{prefix}/../../resources/assessments/pipeline_config_python_failure.yml"
-    config = PipelineClass.load_config_from_yaml(config_path)
-    for step in config.steps:
-        step.extract_source = f"{prefix}/../../{step.extract_source}"
-    return config
 
+@pytest.fixture
+def python_failure_config(pipeline_configuration_loader) -> PipelineConfig:
+    return pipeline_configuration_loader(Path("pipeline_config_python_failure.yml"))
 
 
 def test_run_pipeline(sandbox_sqlserver, pipeline_config, get_logger):
@@ -90,10 +80,10 @@ def test_run_python_dep_failure_pipeline(sandbox_sqlserver, pipeline_dep_failure
     assert "Pipeline execution failed due to errors in steps: package_status" in str(e.value)
 
 
-def test_skipped_steps(sandbox_sqlserver, pipeline_config, get_logger):
+def test_skipped_steps(sandbox_sqlserver: DatabaseManager, pipeline_config: PipelineConfig) -> None:
     # Modify config to have some inactive steps
-    for step in pipeline_config.steps:
-        step.flag = "inactive"
+    inactive_steps = [step.copy(flag="inactive") for step in pipeline_config.steps]
+    pipeline_config = pipeline_config.copy(steps=inactive_steps)
 
     pipeline = PipelineClass(config=pipeline_config, executor=sandbox_sqlserver)
     results = pipeline.execute()
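The pipeline_configuration_loader fixture is pytest's "factory as fixture" pattern: the four configs differ only by file name, so the fixture returns a callable and each concrete fixture collapses to a one-liner. The scope also drops from module to function, which is safe now that each PipelineConfig is immutable and rebuilt per test. A generic, self-contained sketch of the pattern (names hypothetical):

```python
from collections.abc import Callable

import pytest


@pytest.fixture
def record_loader() -> Callable[[str], dict]:
    # Factory as fixture: return a function and let consumers parameterise it.
    def _load(name: str) -> dict:
        return {"name": name, "active": True}

    return _load


@pytest.fixture
def default_record(record_loader) -> dict:
    return record_loader("default")


def test_default_record(default_record: dict) -> None:
    assert default_record["name"] == "default"
```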
24 changes: 9 additions & 15 deletions tests/integration/assessments/test_profiler.py

@@ -24,37 +24,31 @@ def test_profile_missing_platform_config() -> None:
         profiler.profile()
 
 
-def test_profile_execution() -> None:
+def test_profile_execution(test_resources: Path) -> None:
     """Test successful profiling execution using actual pipeline configuration"""
     profiler = Profiler("synapse")
-    path_prefix = Path(__file__).parent / "../../../"
-    config_file = path_prefix / "tests/resources/assessments/pipeline_config_main.yml"
-    config = profiler.path_modifier(config_file=config_file, path_prefix=path_prefix)
+    config_file = test_resources / "assessments" / "pipeline_config_main.yml"
+    config = profiler.path_modifier(config_file=config_file, path_prefix=test_resources)
     profiler.profile(pipeline_config=config)
     assert Path("/tmp/profiler_main/profiler_extract.db").exists(), "Profiler extract database should be created"
 
 
-def test_profile_execution_with_invalid_config() -> None:
+def test_profile_execution_with_invalid_config(test_resources: Path) -> None:
     """Test profiling execution with invalid configuration"""
     profiler = Profiler("synapse")
-    path_prefix = Path(__file__).parent / "../../../"
     with pytest.raises(FileNotFoundError):
-        config_file = path_prefix / "tests/resources/assessments/invalid_pipeline_config.yml"
-        pipeline_config = profiler.path_modifier(
-            config_file=config_file,
-            path_prefix=path_prefix,
-        )
+        config_file = test_resources / "assessments" / "invalid_pipeline_config.yml"
+        pipeline_config = profiler.path_modifier(config_file=config_file, path_prefix=test_resources)
         profiler.profile(pipeline_config=pipeline_config)
 
 
-def test_profile_execution_config_override() -> None:
+def test_profile_execution_config_override(test_resources: Path) -> None:
     """Test successful profiling execution using actual pipeline configuration with config file override"""
     with tempfile.TemporaryDirectory() as temp_dir:
         # Copy the YAML file and Python script to the temp directory
-        prefix = Path(__file__).parent / ".." / ".."
-        config_file_src = prefix / Path("resources/assessments/pipeline_config_absolute.yml")
+        config_file_src = test_resources / "assessments" / "pipeline_config_absolute.yml"
         config_file_dest = Path(temp_dir) / config_file_src.name
-        script_src = prefix / Path("resources/assessments/db_extract.py")
+        script_src = test_resources / "assessments" / "db_extract.py"
         script_dest = Path(temp_dir) / script_src.name
         shutil.copy(script_src, script_dest)
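A possible follow-up, not part of this PR: the tempfile.TemporaryDirectory block could use pytest's built-in tmp_path fixture, which creates and cleans up a per-test directory and pairs naturally with test_resources. A sketch under that assumption:

```python
import shutil
from pathlib import Path


def test_profile_execution_config_override(test_resources: Path, tmp_path: Path) -> None:
    # tmp_path is a per-test directory managed by pytest; no explicit cleanup needed.
    config_file_src = test_resources / "assessments" / "pipeline_config_absolute.yml"
    script_src = test_resources / "assessments" / "db_extract.py"
    shutil.copy(config_file_src, tmp_path / config_file_src.name)
    shutil.copy(script_src, tmp_path / script_src.name)
```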