Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions labs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,9 @@ commands:
flags:
- name: source-tech
description: (Optional) The technology/platform of the sources to Profile

- name: test-profiler-connection
description: (Internal) Test the connection to the source database for profiling
flags:
- name: source-tech
description: The technology/platform of the source to test connection
79 changes: 78 additions & 1 deletion src/databricks/labs/lakebridge/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@

from databricks.labs.lakebridge.config import TranspileConfig, LSPConfigOptionV1
from databricks.labs.lakebridge.contexts.application import ApplicationContext
from databricks.labs.lakebridge.connections.credential_manager import cred_file
from databricks.labs.lakebridge.connections.credential_manager import cred_file, create_credential_manager
from databricks.labs.lakebridge.connections.database_manager import DatabaseManager
from databricks.labs.lakebridge.connections.env_getter import EnvGetter
from databricks.labs.lakebridge.connections.synapse_helpers import validate_synapse_pools
from databricks.labs.lakebridge.helpers.recon_config_utils import ReconConfigPrompts
from databricks.labs.lakebridge.helpers.telemetry_utils import make_alphanum_or_semver
from databricks.labs.lakebridge.install import installer
Expand Down Expand Up @@ -1008,6 +1011,80 @@ def create_profiler_dashboard(
ctx.dashboard_manager.create_profiler_summary_dashboard(source_tech, catalog_name, schema_name)


def _transform_profiler_credentials(source_tech: str, raw_config: dict) -> dict:
"""Transform source-specific credential structures to flat connection config."""
if source_tech == "synapse":
# Synapse has nested structure: extract workspace config and add database
workspace_config = raw_config.get("workspace", {})
jdbc_config = raw_config.get("jdbc", {})

return {
**workspace_config,
"database": "master", # Use master database for connection test
"auth_type": jdbc_config.get("auth_type", "sql_authentication"),
}
return raw_config


@lakebridge.command()
def test_profiler_connection(w: WorkspaceClient, source_tech: str | None = None) -> None:
    """[Internal] Test the connection to the source database for profiling"""
    ctx = ApplicationContext(w)
    ctx.add_user_agent_extra("cmd", "test-profiler-connection")
    prompts = ctx.prompts
    if source_tech is None:
        source_tech = prompts.choice("Select the source technology", PROFILER_SOURCE_SYSTEM)
    source_tech = source_tech.lower()

    if source_tech not in PROFILER_SOURCE_SYSTEM:
        logger.error(f"Only the following source systems are supported: {PROFILER_SOURCE_SYSTEM}")
        raise_validation_exception(f"Invalid source technology {source_tech}")

    ctx.add_user_agent_extra("profiler_source_tech", make_alphanum_or_semver(source_tech))
    user = ctx.current_user
    logger.debug(f"User: {user}")

    # Credentials must have been configured beforehand via configure-database-profiler.
    file = cred_file(PRODUCT_NAME)
    if not file.exists():
        raise_validation_exception(
            f"Connection details not found. Please run `databricks labs lakebridge configure-database-profiler` "
            f"to set up connection details for {source_tech}."
        )

    logger.info(f"Testing connection for source technology: {source_tech}")

    connected = True
    db_manager = None
    try:
        # Create credential manager and get credentials
        cred_manager = create_credential_manager(PRODUCT_NAME, EnvGetter())
        raw_config = cred_manager.get_credentials(source_tech)

        if source_tech == "synapse":
            # Synapse may expose multiple SQL pools; the helper tests each enabled one.
            validate_synapse_pools(raw_config)
        else:
            # Other sources use flat config structure
            config = _transform_profiler_credentials(source_tech, raw_config)
            db_manager = DatabaseManager(source_tech, config)
            connected = db_manager.check_connection()
    except ConnectionError as e:
        logger.error(f"Failed to connect to the source system: {e}")
        raise SystemExit("Connection validation failed. Exiting...") from e
    except KeyError as e:
        logger.error(f"Credential configuration error: {e}")
        raise SystemExit(
            f"Invalid credentials for {source_tech}. Please run `databricks labs lakebridge configure-database-profiler`. Exiting..."
        ) from e
    except Exception as e:
        logger.error(f"Unexpected error during connection test: {e}")
        raise SystemExit("Connection test failed. Exiting...") from e
    finally:
        # Release pooled connections created for the test (mirrors the cleanup
        # done per-pool in synapse_helpers._test_pool_connection).
        if db_manager is not None and hasattr(db_manager, 'connector') and hasattr(db_manager.connector, 'engine'):
            db_manager.connector.engine.dispose()

    # Report the result OUTSIDE the try-block: previously a failed check raised
    # inside it, so the broad `except Exception` handler caught the validation
    # error and mislabelled it as "Unexpected error during connection test".
    if connected:
        logger.info("Connection to the source system successful")
    else:
        logger.error("Connection to the source system failed, check logs in debug mode")
        raise_validation_exception("Connection validation failed")


if __name__ == "__main__":
app = lakebridge
logger = app.get_logger()
Expand Down
33 changes: 33 additions & 0 deletions src/databricks/labs/lakebridge/connections/database_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def _create_connector(db_type: str, config: dict[str, Any]) -> DatabaseConnector
"snowflake": SnowflakeConnector,
"mssql": MSSQLConnector,
"tsql": MSSQLConnector,
"synapse": SynapseConnector,
}

connector_class = connectors.get(db_type.lower())
Expand Down Expand Up @@ -98,6 +99,38 @@ def _connect(self) -> Engine:
return create_engine(connection_string)


class SynapseConnector(MSSQLConnector):
    """
    Azure Synapse SQL Pool connector.

    Adapter that maps Synapse-specific credential keys onto the flat MSSQL
    configuration, then delegates everything else (connection-string building,
    engine creation) to MSSQLConnector — Synapse SQL pools speak the same
    protocol as SQL Server, so only the config shape differs.
    """

    def __init__(self, config: dict[str, Any]):
        # Which endpoint to target: dedicated (default) or serverless pool.
        pool_endpoint = config.get('endpoint_key', 'dedicated_sql_endpoint')

        # Prefer an explicit 'server', then the selected pool endpoint,
        # finally the dedicated endpoint as a last resort.
        server = config.get('server') or config.get(pool_endpoint, config.get('dedicated_sql_endpoint'))

        translated = {
            'driver': config['driver'],
            'server': server,
            'database': config['database'],
            'user': config.get('user') or config.get('sql_user'),
            'password': config.get('password') or config.get('sql_password'),
            'port': config.get('port', 1433),
            'auth_type': config.get('auth_type', 'sql_authentication'),
        }

        # MSSQLConnector.__init__ builds the engine (calls _connect()) from this config.
        super().__init__(translated)


class DatabaseManager:
def __init__(self, db_type: str, config: dict[str, Any]):
self.connector = _create_connector(db_type, config)
Expand Down
85 changes: 85 additions & 0 deletions src/databricks/labs/lakebridge/connections/synapse_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import logging
from databricks.labs.lakebridge.connections.database_manager import DatabaseManager

logger = logging.getLogger(__name__)


def _test_pool_connection(pool_name: str, base_config: dict, endpoint_key: str) -> bool:
    """Test connection to a single Synapse SQL pool with proper resource cleanup."""
    logger.info(f"Testing connection to {pool_name} SQL pool...")
    manager = None

    try:
        manager = DatabaseManager("synapse", {**base_config, "endpoint_key": endpoint_key})
        ok = manager.check_connection()
        if ok:
            logger.info(f"✓ {pool_name.capitalize()} SQL pool connection successful")
        else:
            logger.error(f"✗ {pool_name.capitalize()} SQL pool connection failed")
        return ok
    except Exception as e:  # pylint: disable=broad-exception-caught
        # Any failure mode (network, auth, config, etc.) counts as a failed test
        # rather than an unhandled crash.
        logger.error(f"✗ Failed to connect to {pool_name} SQL pool: {e}")
        return False
    finally:
        # Dispose the engine so each pool test leaves no pooled connections behind.
        if manager and hasattr(manager, 'connector') and hasattr(manager.connector, 'engine'):
            manager.connector.engine.dispose()
            logger.debug(f"Disposed engine for {pool_name} SQL pool")


def validate_synapse_pools(raw_config: dict) -> None:
    """
    Validate connections to enabled Synapse SQL pools based on profiler configuration.

    Reads the nested profiler credential layout (``workspace``/``jdbc``/``profiler``)
    and tests every pool the profiler section does not exclude. Each test disposes
    its database engine afterwards to prevent resource leaks.

    Raises:
        ValueError: if both pools are excluded, or if any tested pool fails.

    Example:
        >>> config = {
        ...     'workspace': {
        ...         'dedicated_sql_endpoint': 'workspace.sql.azuresynapse.net',
        ...         'serverless_sql_endpoint': 'workspace-ondemand.sql.azuresynapse.net',
        ...         'sql_user': 'admin',
        ...         'sql_password': 'pass',
        ...         'driver': 'ODBC Driver 18 for SQL Server',
        ...     },
        ...     'jdbc': {'auth_type': 'sql_authentication'},
        ...     'profiler': {'exclude_serverless_sql_pool': False},
        ... }
        >>> validate_synapse_pools(config)  # Tests both pools
    """
    workspace = raw_config.get("workspace", {})
    jdbc = raw_config.get("jdbc", {})
    profiler = raw_config.get("profiler", {})

    # Config shared by all pools; "master" is enough for a connection test.
    base_config = {
        **workspace,
        "database": "master",
        "auth_type": jdbc.get("auth_type", "sql_authentication"),
    }

    # A pool is tested unless the profiler config explicitly excludes it.
    pools_to_test = []
    if not profiler.get("exclude_dedicated_sql_pools", False):
        pools_to_test.append(("dedicated", "dedicated_sql_endpoint"))
    if not profiler.get("exclude_serverless_sql_pool", False):
        pools_to_test.append(("serverless", "serverless_sql_endpoint"))

    if not pools_to_test:
        logger.warning("Both dedicated and serverless SQL pools are excluded in profiler configuration")
        raise ValueError("No SQL pools enabled for testing")

    # Test enabled pools sequentially, keeping per-pool outcomes.
    results = {name: _test_pool_connection(name, base_config, endpoint) for name, endpoint in pools_to_test}

    failed_pools = [pool for pool, success in results.items() if not success]
    if failed_pools:
        raise ValueError(f"Connection failed for SQL pools: {', '.join(failed_pools)}")
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,36 @@ def get_sqlpool_reader(
endpoint_key: str = 'dedicated_sql_endpoint',
auth_type: str = 'sql_authentication',
) -> DatabaseManager:
"""
Create Synapse SQL pool reader.

Args:
input_cred: Synapse workspace credentials with keys:
- dedicated_sql_endpoint or serverless_sql_endpoint: SQL endpoint
- sql_user: SQL username
- sql_password: SQL password
- driver: ODBC driver name (e.g., "ODBC Driver 18 for SQL Server")
- port (optional): Port number, defaults to 1433
db_name: SQL pool database name
endpoint_key: 'dedicated_sql_endpoint' or 'serverless_sql_endpoint'
auth_type: Authentication type (sql_authentication, ad_passwd_authentication, etc.)

Returns:
DatabaseManager instance configured for Synapse SQL pool
"""
config = {
"endpoint_key": endpoint_key,
"driver": input_cred['driver'],
"server": input_cred[endpoint_key],
"database": db_name,
"sql_user": input_cred['sql_user'],
"sql_password": input_cred['sql_password'],
"user": input_cred['sql_user'],
"password": input_cred['sql_password'],
"port": input_cred.get('port', 1433),
"auth_type": auth_type,
}
# synapse and mssql use the same connector
source = "mssql"
# Use synapse connector which inherits from mssql
source = "synapse"

return DatabaseManager(source, config)
71 changes: 71 additions & 0 deletions tests/integration/connections/test_synapse_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import pytest
from databricks.labs.lakebridge.connections.database_manager import DatabaseManager, SynapseConnector
from databricks.labs.lakebridge.resources.assessments.synapse.common.connector import get_sqlpool_reader


@pytest.fixture()
def sandbox_synapse_config(sandbox_sqlserver_config) -> dict:
    """Convert SQL Server config to Synapse config format.

    The sandbox SQL Server doubles as a Synapse stand-in because both speak
    the same wire protocol; only the credential key names differ.
    """
    mssql = sandbox_sqlserver_config
    return {
        "dedicated_sql_endpoint": mssql["server"],
        "sql_user": mssql["user"],
        "sql_password": mssql["password"],
        "driver": mssql["driver"],
        "database": mssql["database"],
    }


@pytest.fixture()
def sandbox_synapse(sandbox_synapse_config) -> DatabaseManager:
    """Create a DatabaseManager using SynapseConnector."""
    # "synapse" is routed to SynapseConnector by database_manager._create_connector.
    return DatabaseManager("synapse", sandbox_synapse_config)


def test_synapse_connector_connection(sandbox_synapse):
    """Test that SynapseConnector can be instantiated."""
    # The fixture requested "synapse", so the manager must hold a SynapseConnector.
    assert isinstance(sandbox_synapse.connector, SynapseConnector)


def test_synapse_connector_execute_query(sandbox_synapse):
    """Test executing a query through SynapseConnector."""
    # A constant SELECT verifies the full fetch path end-to-end.
    rows = sandbox_synapse.fetch("SELECT 101 AS test_column").rows
    assert rows[0][0] == 101


def test_synapse_connection_check(sandbox_synapse):
    """Test connection check through SynapseConnector."""
    # check_connection() should succeed against the sandbox database.
    assert sandbox_synapse.check_connection()


def test_get_sqlpool_reader_dedicated(sandbox_synapse_config):
    """Test get_sqlpool_reader with dedicated endpoint."""
    manager = get_sqlpool_reader(
        sandbox_synapse_config,
        sandbox_synapse_config["database"],
        endpoint_key='dedicated_sql_endpoint',
        auth_type='sql_authentication',
    )

    # The reader must wrap a Synapse connector and actually reach the database.
    assert isinstance(manager, DatabaseManager)
    assert isinstance(manager.connector, SynapseConnector)
    assert manager.check_connection()


def test_get_sqlpool_reader_query(sandbox_synapse_config):
    """Test get_sqlpool_reader can execute queries."""
    manager = get_sqlpool_reader(
        sandbox_synapse_config,
        sandbox_synapse_config["database"],
        endpoint_key='dedicated_sql_endpoint',
    )

    # A constant SELECT proves the reader can run queries end-to-end.
    rows = manager.fetch("SELECT 202 AS test_column").rows
    assert rows[0][0] == 202
Loading