diff --git a/src/databricks/labs/remorph/reconcile/constants.py b/src/databricks/labs/remorph/reconcile/constants.py index 91d30d035d..78635ca359 100644 --- a/src/databricks/labs/remorph/reconcile/constants.py +++ b/src/databricks/labs/remorph/reconcile/constants.py @@ -1,5 +1,6 @@ from enum import Enum, auto +COLUMN_PLACEHOLDER = "{}" class AutoName(Enum): """ diff --git a/src/databricks/labs/remorph/reconcile/metadata/__init__.py b/src/databricks/labs/remorph/reconcile/metadata/__init__.py new file mode 100644 index 0000000000..d83f7c65e5 --- /dev/null +++ b/src/databricks/labs/remorph/reconcile/metadata/__init__.py @@ -0,0 +1,36 @@ +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class TableFQN: + catalog: str | None + schema: str + name: str + + @property + def fqn(self) -> str: + if self.catalog: + return f"{self.catalog}.{self.schema}.{self.name}" + else: + return f"{self.schema}.{self.name}" + + +@dataclass +class FieldInfo: + name: str + data_type: str + nullable: bool | None = None + metadata: dict[str, Any] | None = None + + +@dataclass +class TableDefinition: + fqn: TableFQN + location: str | None = None + table_format: str | None = None + view_text: str | None = None + columns: list[FieldInfo] = field(default_factory=list) + primary_keys: list[str] | None = None + size_gb: int = 0 + comment: str | None = None diff --git a/src/databricks/labs/remorph/reconcile/normalization/__init__.py b/src/databricks/labs/remorph/reconcile/normalization/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/databricks/labs/remorph/reconcile/normalization/databricks.py b/src/databricks/labs/remorph/reconcile/normalization/databricks.py new file mode 100644 index 0000000000..10476f8958 --- /dev/null +++ b/src/databricks/labs/remorph/reconcile/normalization/databricks.py @@ -0,0 +1,24 @@ +from sqlglot import parse_one +from sqlglot.dialects import Dialect +from sqlglot.expressions import Cast, Column, Expression, DataType + +from databricks.labs.remorph.recon.normalization.normalizer import Normalizer + + +class DatabricksNormalizer(Normalizer): + def __init__(self, dialect: Dialect, default_normalizations: dict[str, str]): + super().__init__(dialect, default_normalizations) + + def normalize(self, data_type: DataType, column: Column) -> Expression: + if data_type.this == DataType.Type.BOOLEAN: + return self.to_string(Cast(this=column, to=DataType.Type.INT)) + if data_type.this == DataType.Type.DATE: + return parse_one(f"date_format({column.sql()}, 'yyyy-MM-dd')", dialect=self.dialect) + # if TIMESTAMP type + # convert to a string with 'yyyy-MM-dd HH:mm:ss.SSSSSS' format + # if fractional number type + # value = parse_one(f"cast({column.sql()} as decimal(38, {scale}))", dialect=self.dialect) + # if scale > 0: + # value = parse_one(f"format_number({value.sql()}, {scale})", dialect=self.dialect) + # return parse_one(f"replace({self.to_string(value)}, ',', '')", dialect=self.dialect) + return self.apply_default_normalizations(data_type, column) diff --git a/src/databricks/labs/remorph/reconcile/normalization/normalizer.py b/src/databricks/labs/remorph/reconcile/normalization/normalizer.py new file mode 100644 index 0000000000..082a36fb45 --- /dev/null +++ b/src/databricks/labs/remorph/reconcile/normalization/normalizer.py @@ -0,0 +1,46 @@ +from abc import abstractmethod, ABC + +from sqlglot import parse_one +from sqlglot.dialects import Dialect +from sqlglot.expressions import Cast, Column, Expression, DataType + +from databricks.labs.remorph.reconcile.constants import COLUMN_PLACEHOLDER + + +class Normalizer(ABC): + def __init__( + self, + dialect: Dialect, + default_normalizations: dict[str, str], + ): + self._dialect = dialect + self._default_normalizations = self._prep_default_normalizations(default_normalizations) + + @abstractmethod + def normalize(self, data_type: DataType, column: Column) -> Expression: + pass + + @property + def dialect(self) -> Dialect: + return self._dialect + + def apply_default_normalizations(self, data_type: DataType, column: Column) -> Expression: + transformation = self._default_normalizations.get(data_type) + if not transformation: + return column + if COLUMN_PLACEHOLDER in transformation: + return parse_one(transformation.format(column.sql()), dialect=self.dialect) + return parse_one(transformation, dialect=self.dialect) + + def to_string(self, expr: Expression): + return Cast(this=expr, to=DataType.Type.TEXT) + + def _prep_default_normalizations(self, default_normalizations: dict[str, str]) -> dict[DataType, str]: + if not default_normalizations: + return {} + + normalizations = {} + for dt, expr in default_normalizations.items(): + data_type = DataType.build(dt, dialect=self.dialect, udt=True) + normalizations[data_type] = expr + return normalizations diff --git a/src/databricks/labs/remorph/reconcile/normalization/oracle.py b/src/databricks/labs/remorph/reconcile/normalization/oracle.py new file mode 100644 index 0000000000..5c0b06149f --- /dev/null +++ b/src/databricks/labs/remorph/reconcile/normalization/oracle.py @@ -0,0 +1,25 @@ +from sqlglot.dialects import Dialect +from sqlglot.expressions import Cast, Column, Expression, DataType, Trim + +from databricks.labs.remorph.recon.normalization.normalizer import Normalizer + + +class OracleNormalizer(Normalizer): + def __init__(self, dialect: Dialect, default_normalizations: dict[str, str]): + super().__init__(dialect, default_normalizations) + + def normalize(self, data_type: DataType, column: Column) -> Expression: + if data_type.this == DataType.Type.BOOLEAN: + return self.to_string(column) + if data_type.this == DataType.Type.UUID: + return Cast(this=Trim(this=column), to=DataType.build("VARCHAR(36)", dialect=self.dialect)) + if data_type.this == DataType.Type.DATE: + return self.to_string(column) + # if TIMESTAMP type + # convert to a string with 'YYYY-MM-DD HH24:MI:SS.FF6' format + # if fractional number type + # convert to a string with FM999.9990 based format based on precision and scale + return self.apply_default_normalizations(data_type, column) + + def to_string(self, expr: Expression): + return Cast(this=expr, to=DataType.build("VARCHAR(1024)", dialect=self.dialect)) diff --git a/src/databricks/labs/remorph/reconcile/normalization/snow.py b/src/databricks/labs/remorph/reconcile/normalization/snow.py new file mode 100644 index 0000000000..607e8b4097 --- /dev/null +++ b/src/databricks/labs/remorph/reconcile/normalization/snow.py @@ -0,0 +1,22 @@ +from sqlglot.dialects import Dialect +from sqlglot.expressions import Cast, Column, Expression, DataType + +from databricks.labs.remorph.recon.normalization.normalizer import Normalizer + + +class SnowNormalizer(Normalizer): + def __init__(self, dialect: Dialect, default_normalizations: dict[str, str]): + super().__init__(dialect, default_normalizations) + + def normalize(self, data_type: DataType, column: Column) -> Expression: + if data_type.this == DataType.Type.BOOLEAN: + return self.to_string(Cast(this=column, to=DataType.Type.INT)) + if data_type.this == DataType.Type.UUID: + return self.to_string(column) + if data_type.this == DataType.Type.DATE: + return self.to_string(column) + # if TIMESTAMP type + # convert to a string with 'YYYY-MM-DD HH24:MI:SS.FF6' format + # if fractional number type + # convert to a string with CAST(column AS DECIMAL(38, scale)) + return self.apply_default_normalizations(data_type, column)