Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import os
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path

import duckdb
from faker import Faker
Expand Down Expand Up @@ -174,11 +173,11 @@ def generate_dedicated_storage_info(fake) -> tuple[int, int, datetime, int]:
}


def build_mock_synapse_extract(extract_db_name: str, path_prefix: Path = Path("/tmp/data/synapse_assessment")) -> str:
def build_mock_synapse_extract(extract_db_name: str, path_prefix: Path) -> Path:
synapse_extract_path = path_prefix
os.makedirs(synapse_extract_path, exist_ok=True)
full_synapse_extract_path = f"{synapse_extract_path}/{extract_db_name}.db"
builder = SynapseProfilerBuilder(table_definitions, full_synapse_extract_path)
synapse_extract_path.mkdir(parents=True, exist_ok=False)
full_synapse_extract_path = synapse_extract_path / f"{extract_db_name}.db"
builder = SynapseProfilerBuilder(table_definitions, db_path=str(full_synapse_extract_path))
builder.create_sample_data()
builder.shutdown()
return full_synapse_extract_path
24 changes: 14 additions & 10 deletions tests/integration/assessments/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,43 +7,47 @@
from databricks.labs.lakebridge.assessments.profiler_config import Step, PipelineConfig


@pytest.fixture(scope="module")
def pipeline_config():
@pytest.fixture
def pipeline_config(tmp_path: Path) -> PipelineConfig:
prefix = Path(__file__).parent
config_path = f"{prefix}/../../resources/assessments/pipeline_config.yml"
config = PipelineClass.load_config_from_yaml(config_path)
config.extract_folder = str(tmp_path / "pipeline_output")

for step in config.steps:
step.extract_source = f"{prefix}/../../{step.extract_source}"
return config


@pytest.fixture(scope="module")
def pipeline_dep_failure_config():
@pytest.fixture
def pipeline_dep_failure_config(tmp_path: Path) -> PipelineConfig:
prefix = Path(__file__).parent
config_path = f"{prefix}/../../resources/assessments/pipeline_config_failure_dependency.yml"
config = PipelineClass.load_config_from_yaml(config_path)
config.extract_folder = str(tmp_path / "pipeline_output")

for step in config.steps:
step.extract_source = f"{prefix}/../../{step.extract_source}"
return config


@pytest.fixture(scope="module")
def sql_failure_config():
@pytest.fixture
def sql_failure_config(tmp_path: Path) -> PipelineConfig:
prefix = Path(__file__).parent
config_path = f"{prefix}/../../resources/assessments/pipeline_config_sql_failure.yml"
config = PipelineClass.load_config_from_yaml(config_path)
config.extract_folder = str(tmp_path / "pipeline_output")
for step in config.steps:
step.extract_source = f"{prefix}/../../{step.extract_source}"
return config


@pytest.fixture(scope="module")
def python_failure_config():
@pytest.fixture
def python_failure_config(tmp_path: Path) -> PipelineConfig:
prefix = Path(__file__).parent
config_path = f"{prefix}/../../resources/assessments/pipeline_config_python_failure.yml"
config = PipelineClass.load_config_from_yaml(config_path)
config.extract_folder = str(tmp_path / "pipeline_output")
for step in config.steps:
step.extract_source = f"{prefix}/../../{step.extract_source}"
return config
Expand Down Expand Up @@ -130,11 +134,11 @@ def test_pipeline_config_comments():
pipeline_w_comments = PipelineConfig(
name="warehouse_profiler",
version="1.0",
extract_folder="/tmp/extracts",
extract_folder="/the/output/path",
comment="A pipeline for extracting warehouse usage.",
)
pipeline_wo_comments = PipelineConfig(
name="another_warehouse_profiler", version="1.0", extract_folder="/tmp/extracts"
name="another_warehouse_profiler", version="1.0", extract_folder="/the/output/path"
)
assert pipeline_w_comments.comment == "A pipeline for extracting warehouse usage."
assert pipeline_wo_comments.comment is None
Expand Down
50 changes: 26 additions & 24 deletions tests/integration/assessments/test_profiler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from pathlib import Path

import shutil
import tempfile
import yaml
import pytest

Expand All @@ -24,14 +23,16 @@ def test_profile_missing_platform_config() -> None:
profiler.profile()


def test_profile_execution() -> None:
def test_profile_execution(tmp_path: Path) -> None:
"""Test successful profiling execution using actual pipeline configuration"""
profiler = Profiler("synapse")
path_prefix = Path(__file__).parent / "../../../"
extract_folder = tmp_path / "profiler_main"
config_file = path_prefix / "tests/resources/assessments/pipeline_config_main.yml"
config = profiler.path_modifier(config_file=config_file, path_prefix=path_prefix)
config.extract_folder = str(extract_folder)
profiler.profile(pipeline_config=config)
assert Path("/tmp/profiler_main/profiler_extract.db").exists(), "Profiler extract database should be created"
assert (extract_folder / "profiler_extract.db").exists(), "Profiler extract database should be created"


def test_profile_execution_with_invalid_config() -> None:
Expand All @@ -47,27 +48,28 @@ def test_profile_execution_with_invalid_config() -> None:
profiler.profile(pipeline_config=pipeline_config)


def test_profile_execution_config_override() -> None:
def test_profile_execution_config_override(tmp_path: Path) -> None:
"""Test successful profiling execution using actual pipeline configuration with config file override"""
with tempfile.TemporaryDirectory() as temp_dir:
# Copy the YAML file and Python script to the temp directory
prefix = Path(__file__).parent / ".." / ".."
config_file_src = prefix / Path("resources/assessments/pipeline_config_absolute.yml")
config_file_dest = Path(temp_dir) / config_file_src.name
script_src = prefix / Path("resources/assessments/db_extract.py")
script_dest = Path(temp_dir) / script_src.name
shutil.copy(script_src, script_dest)
config_dir = tmp_path / "config_dir"
config_dir.mkdir()
extract_folder = tmp_path / "profiler_absolute"
# Copy the YAML file and Python script to the temp directory
prefix = Path(__file__).parent / ".." / ".."
config_file_src = prefix / Path("resources/assessments/pipeline_config_absolute.yml")
config_file_dest = config_dir / config_file_src.name
script_src = prefix / Path("resources/assessments/db_extract.py")
script_dest = config_dir / script_src.name
shutil.copy(script_src, script_dest)

with open(config_file_src, 'r', encoding="utf-8") as file:
config_data = yaml.safe_load(file)
for step in config_data['steps']:
step['extract_source'] = str(script_dest)
with open(config_file_dest, 'w', encoding="utf-8") as file:
yaml.safe_dump(config_data, file)
with open(config_file_src, 'r', encoding="utf-8") as file:
config_data = yaml.safe_load(file)
config_data['extract_folder'] = str(extract_folder)
for step in config_data['steps']:
step['extract_source'] = str(script_dest)
with open(config_file_dest, 'w', encoding="utf-8") as file:
yaml.safe_dump(config_data, file)

profiler = Profiler("synapse")
pipeline_config = PipelineClass.load_config_from_yaml(config_file_dest)
profiler.profile(pipeline_config=pipeline_config)
assert Path(
"/tmp/profiler_absolute/profiler_extract.db"
).exists(), "Profiler extract database should be created"
profiler = Profiler("synapse")
pipeline_config = PipelineClass.load_config_from_yaml(config_file_dest)
profiler.profile(pipeline_config=pipeline_config)
assert (extract_folder / "profiler_extract.db").exists(), "Profiler extract database should be created"
38 changes: 20 additions & 18 deletions tests/integration/assessments/test_profiler_validator.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from pathlib import Path
import tempfile
from collections.abc import Generator
import pytest
from pathlib import Path

import duckdb
import pytest

from databricks.labs.lakebridge.assessments.profiler_validator import (
get_profiler_extract_path,
Expand All @@ -13,7 +14,7 @@
SchemaDefinitionLoadError,
SchemaValidationError,
)
from tests.utils.profiler_extract_utils import build_mock_synapse_extract
from .profiler_extract_utils import build_mock_synapse_extract


@pytest.fixture(scope="module")
Expand All @@ -31,7 +32,8 @@ def failure_pipeline_config_path():


@pytest.fixture(scope="session")
def mock_synapse_profiler_extract() -> Generator[str, None, None]:
def mock_synapse_profiler_extract() -> Generator[Path]:
# We don't use tmp_path because this is quite expensive to set up.
# Use context manager for automatic cleanup
with tempfile.TemporaryDirectory(prefix="lakebridge_test_") as temp_dir:
extract_dir = Path(temp_dir) / "synapse_assessment"
Expand All @@ -41,17 +43,17 @@ def mock_synapse_profiler_extract() -> Generator[str, None, None]:

def test_get_profiler_extract_path(pipeline_config_path, failure_pipeline_config_path):
# Parse `extract_folder` **with** a trailing "/" character
expected_db_path = "/tmp/extracts/profiler_extract.db"
expected_db_path = "/replaced/after/loading/profiler_extract.db"
profiler_db_path = get_profiler_extract_path(pipeline_config_path)
assert profiler_db_path == expected_db_path

# Parse `extract_folder` **without** a trailing "/" character
expected_db_path = "tests/resources/assessments/profiler_extract.db"
expected_db_path = "/replaced/after/loading/profiler_extract.db"
profiler_db_path = get_profiler_extract_path(failure_pipeline_config_path)
assert profiler_db_path == expected_db_path


def test_validate_non_empty_tables(mock_synapse_profiler_extract):
def test_validate_non_empty_tables(mock_synapse_profiler_extract: Path) -> None:
with duckdb.connect(database=mock_synapse_profiler_extract) as duck_conn:
validation_checks = []
# Get a list of all tables in profiler extract and add an EmptyTableValidationCheck
Expand All @@ -69,7 +71,7 @@ def test_validate_non_empty_tables(mock_synapse_profiler_extract):
assert num_passing == 2


def test_validate_mixed_checks(mock_synapse_profiler_extract):
def test_validate_mixed_checks(mock_synapse_profiler_extract: Path) -> None:
table_1 = "mock_profiler_extract.main.dedicated_sql_pool_metrics"
table_2 = "mock_profiler_extract.main.workspace_sql_pools"
with duckdb.connect(database=mock_synapse_profiler_extract) as duck_conn:
Expand All @@ -87,7 +89,7 @@ def test_validate_mixed_checks(mock_synapse_profiler_extract):
assert num_passing == 4


def test_validate_invalid_schema_path(mock_synapse_profiler_extract):
def test_validate_invalid_schema_path(mock_synapse_profiler_extract: Path) -> None:
with duckdb.connect(database=mock_synapse_profiler_extract) as duck_conn:
validation_checks = []
# Build a schema check with an invalid schema def path
Expand All @@ -97,7 +99,7 @@ def test_validate_invalid_schema_path(mock_synapse_profiler_extract):
"main",
"dedicated_routines",
source_tech="synapse",
extract_path=mock_synapse_profiler_extract,
extract_path=str(mock_synapse_profiler_extract),
schema_path=schema_def_path,
)
validation_checks.append(schema_check)
Expand All @@ -110,7 +112,7 @@ def test_validate_invalid_schema_path(mock_synapse_profiler_extract):
assert "Schema definition file not found:" in str(exec_info.value)


def test_validate_invalid_source_tech(mock_synapse_profiler_extract):
def test_validate_invalid_source_tech(mock_synapse_profiler_extract: Path) -> None:
with duckdb.connect(database=mock_synapse_profiler_extract) as duck_conn:
validation_checks = []
prefix = Path(__file__).parent
Expand All @@ -120,7 +122,7 @@ def test_validate_invalid_source_tech(mock_synapse_profiler_extract):
"main",
"dedicated_routines",
source_tech="oracle",
extract_path=mock_synapse_profiler_extract,
extract_path=str(mock_synapse_profiler_extract),
schema_path=schema_def_path,
)
validation_checks.append(schema_check)
Expand All @@ -133,7 +135,7 @@ def test_validate_invalid_source_tech(mock_synapse_profiler_extract):
assert "Incorrect schema definition type for source tech" in str(exec_info.value)


def test_validate_table_not_found(mock_synapse_profiler_extract):
def test_validate_table_not_found(mock_synapse_profiler_extract: Path) -> None:
with duckdb.connect(database=mock_synapse_profiler_extract) as duck_conn:
validation_checks = []
prefix = Path(__file__).parent
Expand All @@ -143,7 +145,7 @@ def test_validate_table_not_found(mock_synapse_profiler_extract):
"main",
"table_does_not_exist",
source_tech="synapse",
extract_path=mock_synapse_profiler_extract,
extract_path=str(mock_synapse_profiler_extract),
schema_path=schema_def_path,
)
validation_checks.append(schema_check)
Expand All @@ -156,7 +158,7 @@ def test_validate_table_not_found(mock_synapse_profiler_extract):
assert "could not be found" in str(exec_info.value)


def test_validate_successful_schema_check(mock_synapse_profiler_extract):
def test_validate_successful_schema_check(mock_synapse_profiler_extract: Path) -> None:
with duckdb.connect(database=mock_synapse_profiler_extract) as duck_conn:
validation_checks = []
prefix = Path(__file__).parent
Expand All @@ -167,7 +169,7 @@ def test_validate_successful_schema_check(mock_synapse_profiler_extract):
"main",
"dedicated_sql_pool_metrics",
source_tech="synapse",
extract_path=mock_synapse_profiler_extract,
extract_path=str(mock_synapse_profiler_extract),
schema_path=schema_def_path,
)
validation_checks.append(schema_check)
Expand All @@ -181,7 +183,7 @@ def test_validate_successful_schema_check(mock_synapse_profiler_extract):
assert num_passing == 1


def test_validate_invalid_schema_check(mock_synapse_profiler_extract):
def test_validate_invalid_schema_check(mock_synapse_profiler_extract: Path) -> None:
with duckdb.connect(database=mock_synapse_profiler_extract) as duck_conn:
validation_checks = []
prefix = Path(__file__).parent
Expand All @@ -192,7 +194,7 @@ def test_validate_invalid_schema_check(mock_synapse_profiler_extract):
"main",
"dedicated_storage_info",
source_tech="synapse",
extract_path=mock_synapse_profiler_extract,
extract_path=str(mock_synapse_profiler_extract),
schema_path=schema_def_path,
)
validation_checks.append(schema_check)
Expand Down
3 changes: 2 additions & 1 deletion tests/resources/assessments/pipeline_config.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: ExamplePipeline
version: "1.0"
extract_folder: /tmp/extracts/
# Value replaced prior to actual use:
extract_folder: /replaced/after/loading/
steps:
- name: inventory
type: sql
Expand Down
3 changes: 2 additions & 1 deletion tests/resources/assessments/pipeline_config_absolute.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: ExamplePipeline
version: "1.0"
extract_folder: /tmp/profiler_absolute/
# Value replaced prior to actual use:
extract_folder: /replaced/after/loading/
steps:
- name: random_data
type: python
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: ExamplePipeline
version: "1.0"
extract_folder: /tmp/extracts/
# Value replaced prior to actual use:
extract_folder: /replaced/after/loading/
steps:
- name: package_status
type: python
Expand Down
4 changes: 3 additions & 1 deletion tests/resources/assessments/pipeline_config_main.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
name: ExamplePipeline
version: "1.0"
extract_folder: /tmp/profiler_main/
# Value replaced prior to actual use.
# Note: test_get_profiler_extract_path() requires a trailing '/'.
extract_folder: /replaced/after/loading/
steps:
- name: random_data
type: python
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
name: "Python Failure Pipeline"
version: "1.0"
extract_folder: "tests/resources/assessments"
# Value replaced prior to actual use.
# Note: test_get_profiler_extract_path() requires _no_ trailing '/'.
extract_folder: /replaced/after/loading
steps:
- name: invalid_python_step
type: python
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: "SQL Failure Pipeline"
version: "1.0"
extract_folder: /tmp/extracts/
# Value replaced prior to actual use.
extract_folder: /replaced/after/loading/
steps:
- name: invalid_sql_step
type: sql
Expand Down
Loading
Loading