diff --git a/labs.yml b/labs.yml
index cf34650466..98ed4b24d5 100644
--- a/labs.yml
+++ b/labs.yml
@@ -121,3 +121,9 @@ commands:
     flags:
       - name: source-tech
         description: (Optional) The technology/platform of the sources to Profile
+
+  - name: test-profiler-connection
+    description: (Internal) Test the connection to the source database for profiling
+    flags:
+      - name: source-tech
+        description: The technology/platform of the source to test the connection for
diff --git a/src/databricks/labs/lakebridge/cli.py b/src/databricks/labs/lakebridge/cli.py
index 3e424c6a62..2a0841d00b 100644
--- a/src/databricks/labs/lakebridge/cli.py
+++ b/src/databricks/labs/lakebridge/cli.py
@@ -27,7 +27,10 @@
 from databricks.labs.lakebridge.config import TranspileConfig, LSPConfigOptionV1
 from databricks.labs.lakebridge.contexts.application import ApplicationContext
-from databricks.labs.lakebridge.connections.credential_manager import cred_file
+from databricks.labs.lakebridge.connections.credential_manager import cred_file, create_credential_manager
+from databricks.labs.lakebridge.connections.database_manager import DatabaseManager
+from databricks.labs.lakebridge.connections.env_getter import EnvGetter
+from databricks.labs.lakebridge.connections.synapse_helpers import validate_synapse_pools
 from databricks.labs.lakebridge.helpers.recon_config_utils import ReconConfigPrompts
 from databricks.labs.lakebridge.helpers.telemetry_utils import make_alphanum_or_semver
 from databricks.labs.lakebridge.reconcile.runner import ReconcileRunner
@@ -1037,5 +1040,82 @@ def create_profiler_dashboard(
     ctx.dashboard_manager.create_profiler_summary_dashboard(source_tech, catalog_name, schema_name)
 
 
+def _transform_profiler_credentials(source_tech: str, raw_config: dict) -> dict:
+    """Transform source-specific credential structures to a flat connection config."""
+    if source_tech == "synapse":
+        # Synapse has a nested structure: extract the workspace config and add a database
+        workspace_config = raw_config.get("workspace", {})
+        jdbc_config = raw_config.get("jdbc", {})
+
+        return {
+            **workspace_config,
+            "database": "master",  # Use the master database for the connection test
+            "auth_type": jdbc_config.get("auth_type", "sql_authentication"),
+        }
+    return raw_config
+
+
+@lakebridge.command()
+def test_profiler_connection(w: WorkspaceClient, source_tech: str | None = None) -> None:
+    """[Internal] Test the connection to the source database for profiling"""
+    ctx = ApplicationContext(w)
+    ctx.add_user_agent_extra("cmd", "test-profiler-connection")
+    prompts = ctx.prompts
+
+    source_tech = (
+        source_tech.lower()
+        if source_tech
+        else prompts.choice("Select the source technology", PROFILER_SOURCE_SYSTEM).lower()
+    )
+
+    if source_tech not in PROFILER_SOURCE_SYSTEM:
+        logger.error(f"Only the following source systems are supported: {PROFILER_SOURCE_SYSTEM}")
+        raise_validation_exception(f"Invalid source technology {source_tech}")
+
+    ctx.add_user_agent_extra("profiler_source_tech", make_alphanum_or_semver(source_tech))
+    logger.debug(f"User: {ctx.current_user}")
+
+    # Check whether the credential file exists
+    if not cred_file(PRODUCT_NAME).exists():
+        raise_validation_exception(
+            f"Connection details not found. Please run `databricks labs lakebridge configure-database-profiler` "
+            f"to set up connection details for {source_tech}."
+        )
+
+    logger.info(f"Testing connection for source technology: {source_tech}")
+
+    cred_manager = create_credential_manager(PRODUCT_NAME, EnvGetter())
+
+    try:
+        raw_config = cred_manager.get_credentials(source_tech)
+    except KeyError as e:
+        logger.error(f"Credential configuration error: {e}")
+        raise SystemExit(
+            f"Invalid credentials for {source_tech}. Please run `databricks labs lakebridge configure-database-profiler`. Exiting..."
+        ) from e
+
+    # Transform the credentials and validate the connection for the selected source
+    config = _transform_profiler_credentials(source_tech, raw_config)
+    db_manager = DatabaseManager(source_tech, config)
+
+    try:
+        # Handle synapse-specific validation
+        if source_tech == "synapse":
+            validate_synapse_pools(raw_config)
+            logger.info("Connection to the source system successful")
+            return
+        response = db_manager.check_connection()
+        logger.debug(f"Connection response: {response}")
+        logger.info("Connection to the source system successful")
+    except ConnectionError as e:
+        logger.error(f"Failed to connect to the source system: {e}")
+        if "IM002" in str(e) or "ODBC driver not found" in str(e):
+            raise SystemExit("Missing ODBC driver. Please install the prerequisite ODBC driver. Exiting...") from e
+        raise SystemExit("Connection validation failed. Exiting...") from e
+    except Exception as e:
+        logger.error(f"Unexpected error during connection test: {e}")
+        raise SystemExit("Connection test failed. Exiting...") from e
+
+
 if __name__ == "__main__":
     lakebridge()
diff --git a/src/databricks/labs/lakebridge/connections/database_manager.py b/src/databricks/labs/lakebridge/connections/database_manager.py
index eeecaded1d..f7cd4905c8 100644
--- a/src/databricks/labs/lakebridge/connections/database_manager.py
+++ b/src/databricks/labs/lakebridge/connections/database_manager.py
@@ -60,6 +60,7 @@ def _create_connector(db_type: str, config: dict[str, Any]) -> DatabaseConnector
         "snowflake": SnowflakeConnector,
         "mssql": MSSQLConnector,
         "tsql": MSSQLConnector,
+        "synapse": SynapseConnector,
     }
 
     connector_class = connectors.get(db_type.lower())
@@ -105,6 +106,38 @@ def _connect(self) -> Engine:
         return create_engine(connection_string)
 
 
+class SynapseConnector(MSSQLConnector):
+    """
+    Azure Synapse SQL Pool connector.
+
+    This is an adapter that translates Synapse-specific configuration
+    to MSSQL connection format, then delegates to MSSQLConnector.
+
+    Synapse SQL pools use the same protocol as SQL Server, so this
+    connector inherits from MSSQLConnector and only transforms the config.
+ """ + + def __init__(self, config: dict[str, Any]): + # Synapse config may have endpoint_key for dedicated/serverless pools + # Transform to MSSQL-compatible config + endpoint_key = config.get('endpoint_key', 'dedicated_sql_endpoint') + + # Build MSSQL-compatible configuration + mssql_config = { + 'driver': config['driver'], + 'server': config.get('server') or config.get(endpoint_key, config.get('dedicated_sql_endpoint')), + 'database': config['database'], + 'user': config.get('user') or config.get('sql_user'), + 'password': config.get('password') or config.get('sql_password'), + 'port': config.get('port', 1433), + 'auth_type': config.get('auth_type', 'sql_authentication'), + } + + # Initialize parent with transformed config + # This will call MSSQLConnector.__init__ which calls _connect() + super().__init__(mssql_config) + + class DatabaseManager: def __init__(self, db_type: str, config: dict[str, Any]): self.connector = _create_connector(db_type, config) @@ -112,9 +145,10 @@ def __init__(self, db_type: str, config: dict[str, Any]): def fetch(self, query: str) -> FetchResult: try: return self.connector.fetch(query) - except OperationalError: - logger.error("Error connecting to the database check credentials") - raise ConnectionError("Error connecting to the database check credentials") from None + except OperationalError as e: + error_msg = f"Error connecting to the database: {e}" + logger.error(error_msg) + raise ConnectionError(error_msg) from e def check_connection(self) -> bool: query = "SELECT 101 AS test_column" diff --git a/src/databricks/labs/lakebridge/connections/synapse_helpers.py b/src/databricks/labs/lakebridge/connections/synapse_helpers.py new file mode 100644 index 0000000000..db08325d62 --- /dev/null +++ b/src/databricks/labs/lakebridge/connections/synapse_helpers.py @@ -0,0 +1,97 @@ +import logging +from databricks.labs.lakebridge.connections.database_manager import DatabaseManager + +logger = logging.getLogger(__name__) + + +def _test_pool_connection(pool_name: str, base_config: dict, endpoint_key: str) -> tuple[bool, str | None]: + """Test connection to a single Synapse SQL pool with proper resource cleanup. + + Returns: + Tuple of (success, error_message). error_message is None if successful. + """ + logger.info(f"Testing connection to {pool_name} SQL pool...") + pool_config = {**base_config, "endpoint_key": endpoint_key} + db_manager = None + + try: + db_manager = DatabaseManager("synapse", pool_config) + if db_manager.check_connection(): + logger.info(f"✓ {pool_name.capitalize()} SQL pool connection successful") + return True, None + logger.error(f"✗ {pool_name.capitalize()} SQL pool connection failed") + return False, f"{pool_name.capitalize()} SQL pool connection check failed" + except Exception as e: # pylint: disable=broad-exception-caught + # Catch all exceptions to gracefully handle any connection failure (network, auth, config, etc.) + error_msg = f"Failed to connect to {pool_name} SQL pool: {e}" + logger.error(f"✗ {error_msg}") + return False, error_msg + finally: + # Clean up database engine resources + if db_manager and hasattr(db_manager, 'connector') and hasattr(db_manager.connector, 'engine'): + db_manager.connector.engine.dispose() + logger.debug(f"Disposed engine for {pool_name} SQL pool") + + +def validate_synapse_pools(raw_config: dict) -> None: + """ + Validate connections to enabled Synapse SQL pools based on profiler configuration. + Each connection is properly cleaned up after testing to prevent resource leaks. 
+
+    Example:
+        >>> config = {
+        ...     'workspace': {
+        ...         'dedicated_sql_endpoint': 'workspace.sql.azuresynapse.net',
+        ...         'serverless_sql_endpoint': 'workspace-ondemand.sql.azuresynapse.net',
+        ...         'sql_user': 'admin',
+        ...         'sql_password': 'pass',
+        ...         'driver': 'ODBC Driver 18 for SQL Server',
+        ...     },
+        ...     'jdbc': {'auth_type': 'sql_authentication'},
+        ...     'profiler': {'exclude_serverless_sql_pool': False},
+        ... }
+        >>> validate_synapse_pools(config)  # Tests both pools
+    """
+    workspace_config = raw_config.get("workspace", {})
+    jdbc_config = raw_config.get("jdbc", {})
+    profiler_config = raw_config.get("profiler", {})
+
+    auth_type = jdbc_config.get("auth_type", "sql_authentication")
+
+    # Build base config shared by all pools
+    base_config = {
+        **workspace_config,
+        "database": "master",
+        "auth_type": auth_type,
+    }
+
+    # Determine which pools to test
+    test_dedicated = not profiler_config.get("exclude_dedicated_sql_pools", False)
+    test_serverless = not profiler_config.get("exclude_serverless_sql_pool", False)
+
+    if not test_dedicated and not test_serverless:
+        logger.warning("Both dedicated and serverless SQL pools are excluded in profiler configuration")
+        raise ValueError("No SQL pools enabled for testing")
+
+    # Track results and error messages
+    results = {}
+    error_messages = {}
+
+    # Test enabled pools sequentially
+    if test_dedicated:
+        success, error_msg = _test_pool_connection("dedicated", base_config, "dedicated_sql_endpoint")
+        results["dedicated"] = success
+        if error_msg:
+            error_messages["dedicated"] = error_msg
+
+    if test_serverless:
+        success, error_msg = _test_pool_connection("serverless", base_config, "serverless_sql_endpoint")
+        results["serverless"] = success
+        if error_msg:
+            error_messages["serverless"] = error_msg
+
+    # Check if any pools failed
+    if not all(results.values()):
+        failed_pools = [pool for pool, success in results.items() if not success]
+        error_details = "; ".join([f"{pool}: {error_messages.get(pool, 'Unknown error')}" for pool in failed_pools])
+        raise ConnectionError(f"Connection failed for SQL pools - {error_details}")
diff --git a/src/databricks/labs/lakebridge/resources/assessments/synapse/common/connector.py b/src/databricks/labs/lakebridge/resources/assessments/synapse/common/connector.py
index 66a7da342c..b695057a22 100644
--- a/src/databricks/labs/lakebridge/resources/assessments/synapse/common/connector.py
+++ b/src/databricks/labs/lakebridge/resources/assessments/synapse/common/connector.py
@@ -8,16 +8,22 @@ def get_sqlpool_reader(
     endpoint_key: str = 'dedicated_sql_endpoint',
     auth_type: str = 'sql_authentication',
 ) -> DatabaseManager:
+    """
+    Create Synapse SQL pool reader.
+ """ config = { + "endpoint_key": endpoint_key, "driver": input_cred['driver'], "server": input_cred[endpoint_key], "database": db_name, + "sql_user": input_cred['sql_user'], + "sql_password": input_cred['sql_password'], "user": input_cred['sql_user'], "password": input_cred['sql_password'], "port": input_cred.get('port', 1433), "auth_type": auth_type, } - # synapse and mssql use the same connector - source = "mssql" + # Use synapse connector which inherits from mssql + source = "synapse" return DatabaseManager(source, config) diff --git a/tests/integration/connections/test_synapse_connector.py b/tests/integration/connections/test_synapse_connector.py new file mode 100644 index 0000000000..6f3af7a20b --- /dev/null +++ b/tests/integration/connections/test_synapse_connector.py @@ -0,0 +1,71 @@ +import pytest +from databricks.labs.lakebridge.connections.database_manager import DatabaseManager, SynapseConnector +from databricks.labs.lakebridge.resources.assessments.synapse.common.connector import get_sqlpool_reader + + +@pytest.fixture() +def sandbox_synapse_config(sandbox_sqlserver_config) -> dict: + """Convert SQL Server config to Synapse config format.""" + # Transform MSSQL config to Synapse format + # In testing, we use SQL Server as a stand-in for Synapse since they use the same protocol + return { + "dedicated_sql_endpoint": sandbox_sqlserver_config["server"], + "sql_user": sandbox_sqlserver_config["user"], + "sql_password": sandbox_sqlserver_config["password"], + "driver": sandbox_sqlserver_config["driver"], + "database": sandbox_sqlserver_config["database"], + } + + +@pytest.fixture() +def sandbox_synapse(sandbox_synapse_config) -> DatabaseManager: + """Create a DatabaseManager using SynapseConnector.""" + return DatabaseManager("synapse", sandbox_synapse_config) + + +def test_synapse_connector_connection(sandbox_synapse): + """Test that SynapseConnector can be instantiated.""" + assert isinstance(sandbox_synapse.connector, SynapseConnector) + + +def test_synapse_connector_execute_query(sandbox_synapse): + """Test executing a query through SynapseConnector.""" + query = "SELECT 101 AS test_column" + result = sandbox_synapse.fetch(query).rows + assert result[0][0] == 101 + + +def test_synapse_connection_check(sandbox_synapse): + """Test connection check through SynapseConnector.""" + assert sandbox_synapse.check_connection() + + +def test_get_sqlpool_reader_dedicated(sandbox_synapse_config): + """Test get_sqlpool_reader with dedicated endpoint.""" + db_name = sandbox_synapse_config["database"] + + manager = get_sqlpool_reader( + sandbox_synapse_config, + db_name, + endpoint_key='dedicated_sql_endpoint', + auth_type='sql_authentication', + ) + + assert isinstance(manager, DatabaseManager) + assert isinstance(manager.connector, SynapseConnector) + assert manager.check_connection() + + +def test_get_sqlpool_reader_query(sandbox_synapse_config): + """Test get_sqlpool_reader can execute queries.""" + db_name = sandbox_synapse_config["database"] + + manager = get_sqlpool_reader( + sandbox_synapse_config, + db_name, + endpoint_key='dedicated_sql_endpoint', + ) + + query = "SELECT 202 AS test_column" + result = manager.fetch(query).rows + assert result[0][0] == 202