Commit 91426f5

Authored by dtsongclaude and co-authors
fix: harden schema properties and add CODE_MAP.md (#25)
* fix: harden schema properties and add CODE_MAP.md

  Resolves #22 and #23: harden schema property error handling in the Cte,
  Join, and Select classes. Replace `type(s)(...)` with `s.new(...)`, use
  `safezip` for length validation, add a CTE params type validator, and
  wrap inner schema resolution with error context. Add CODE_MAP.md for AI
  agent and contributor codebase navigation.

* fix: address PR review findings

  - Add null guard to Join.schema to prevent AttributeError on None schema
  - Change Join's ValueError to QueryBuilderError for a consistent error hierarchy
  - Use QB_TypeError in the CTE params validator instead of plain TypeError
  - Widen Cte.schema's catch to include ValueError from inner schema resolution
  - Include the CTE name in the wrapped error message for better diagnostics
  - Narrow the try block around safezip to avoid masking errors from s.new()
  - Fix CODE_MAP.md inaccuracies: frozen claim, Schema type alias, errors.py

* fix: include original error in CTE schema wrapping message

  Add the original exception text to the wrapped QueryBuilderError message
  so callers inspecting str(e) see the root cause without needing to walk
  __cause__. Also document why ValueError is in the catch tuple.

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Parent: 7c9e4ad · Commit: 91426f5

File tree

2 files changed · +192 −10 lines changed

CODE_MAP.md

Lines changed: 165 additions & 0 deletions
@@ -0,0 +1,165 @@
# CODE_MAP.md — data-diff Codebase Navigation

> Quick-reference map for AI agents and new contributors. ~150 lines, dense by design.

## Package Layout

```
data_diff/
├── __init__.py              # Public API: connect_to_table(), diff_tables(), Algorithm
├── __main__.py              # CLI entry point (Click): `data-diff` command
├── version.py               # __version__ = "1.0.0"
├── config.py                # TOML config parsing (--conf flag)
├── errors.py                # Custom exceptions (all dbt-integration and config-related)
├── schema.py                # create_schema() factory, RawColumnInfo, Schema type alias
├── utils.py                 # CaseAwareMapping, CaseInsensitiveDict, ArithString, safezip
├── _compat.py               # Compatibility shims (tomllib)

├── queries/                 # SQL query builder (AST-based)
│   ├── api.py               # User-facing: table(), cte(), join(), leftjoin(), or_()
│   ├── ast_classes.py       # AST nodes: Select, Join, Cte, Column, BinOp, DDL stmts
│   ├── base.py              # SqeletonError, SKIP sentinel, args_as_tuple()
│   └── extras.py            # Checksum, NormalizeAsString (diff-specific query helpers)

├── abcs/                    # Abstract base classes
│   ├── database_types.py    # DbPath, ColType hierarchy, Collation
│   └── compiler.py          # AbstractCompiler, Compilable protocol

├── databases/               # Database drivers (one file per backend)
│   ├── base.py              # Database ABC, BaseDialect, connection pooling
│   ├── _connect.py          # connect(dsn) → Database instance
│   ├── postgresql.py        # PostgreSQL (psycopg2)
│   ├── mysql.py             # MySQL (mysql-connector-python)
│   ├── snowflake.py         # Snowflake (snowflake-connector-python)
│   ├── bigquery.py          # BigQuery (google-cloud-bigquery)
│   ├── redshift.py          # Redshift (extends PostgreSQL)
│   ├── databricks.py        # Databricks (databricks-sql-connector)
│   ├── duckdb.py            # DuckDB (duckdb)
│   ├── clickhouse.py        # ClickHouse (clickhouse-driver)
│   ├── mssql.py             # SQL Server (pyodbc)
│   ├── oracle.py            # Oracle (oracledb)
│   ├── trino.py             # Trino (trino)
│   ├── presto.py            # Presto (presto-python-client)
│   └── vertica.py           # Vertica (vertica-python)

├── diff_tables.py           # Algorithm enum, TableDiffer ABC, DiffResultWrapper
├── hashdiff_tables.py       # HashDiffer: cross-DB bisection diff (checksum + download)
├── joindiff_tables.py       # JoinDiffer: same-DB outer-join diff (single query)
├── table_segment.py         # TableSegment: key ranges, split_key_space(), checksums
├── info_tree.py             # InfoTree: hierarchical diff metadata tracking

├── thread_utils.py          # PriorityThreadPoolExecutor, ThreadedYielder
├── query_utils.py           # drop_table(), append_to_table() helpers
├── format.py                # Output formatting (JSONL, human-readable)
├── parse_time.py            # Relative time parsing ("5min", "1day")
├── lexicographic_space.py   # String key range splitting

├── dbt.py                   # dbt integration: dbt_diff()
├── dbt_parser.py            # DbtParser: manifest/profile parsing
└── dbt_config_validators.py # dbt config validation
```

## Entry Points

| Entry Point | Location | Description |
|-------------|----------|-------------|
| CLI | `__main__.py:main()` | Click command: `data-diff db1 table1 db2 table2 -k id` |
| Python API | `__init__.py:diff_tables()` | Primary function: takes two `TableSegment`s, returns diff iterator |
| Python API | `__init__.py:connect_to_table()` | Convenience: DSN string → `TableSegment` |
| pyproject.toml | `[project.scripts]` | `data-diff = "data_diff.__main__:main"` |

## Core Data Flow

```
CLI / API call

connect(dsn) → Database instance

db.query_table_schema() → Schema (column names + types)

TableSegment(db, path, key_columns, schema)

Algorithm selection (AUTO → JOINDIFF if same-db, else HASHDIFF)

├─── HASHDIFF (cross-database) ──────────────────────────┐
│  1. Checksum full table on both sides                  │
│  2. If mismatch → bisect key range (factor=32)         │
│  3. Recurse until segment < threshold (16384 rows)     │
│  4. Download small segments, compare locally           │
│  5. diff_sets() → yield ("+", row) / ("-", row)        │
│                                                        │
├─── JOINDIFF (same database) ───────────────────────────┐
│  1. FULL OUTER JOIN on key columns                     │
│  2. CASE WHEN to detect exclusive/changed rows         │
│  3. Optional: materialize results to temp table        │
│  4. Stream results → yield ("+", row) / ("-", row)     │
│                                                        │

DiffResultWrapper (streaming iterator + stats)

Output: human-readable / JSONL / stats summary
```

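The HASHDIFF branch above can be sketched with plain lists standing in for tables. This is a hypothetical illustration, not data-diff's HashDiffer: `checksum` stands in for the per-segment SQL checksum, and the index-based split stands in for real key-range bisection; only the factor (32) and threshold (16384) come from the diagram.

```python
import hashlib

BISECTION_FACTOR = 32        # diagram: bisect key range (factor=32)
BISECTION_THRESHOLD = 16384  # diagram: download segments below this size

def checksum(rows):
    # Stand-in for the per-segment SQL checksum query.
    return hashlib.md5(repr(rows).encode()).hexdigest()

def hashdiff(a, b):
    """Yield ('-', row) for rows only in `a`, ('+', row) for rows only in `b`."""
    if checksum(a) == checksum(b):
        return                       # segment identical → prune this branch
    if max(len(a), len(b)) <= BISECTION_THRESHOLD:
        sa, sb = set(a), set(b)      # "download" both sides, compare locally
        yield from (("-", r) for r in sorted(sa - sb))
        yield from (("+", r) for r in sorted(sb - sa))
        return
    # Split into BISECTION_FACTOR sub-segments and recurse; real code
    # splits the key space, index slicing is the closest list analogue.
    step = max(1, max(len(a), len(b)) // BISECTION_FACTOR)
    for i in range(0, max(len(a), len(b)), step):
        yield from hashdiff(a[i:i + step], b[i:i + step])
```

The pruning step is the point of the algorithm: identical segments cost one checksum round-trip each, so only differing key ranges are ever downloaded.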
## Query Builder Architecture

```
api.py (user-facing functions)
│  table(), cte(), join(), select(), where()

ast_classes.py (immutable AST nodes)
│  Select, Join, Cte, Column, BinOp, Code, ...

Database.dialect.compile(compiler, node)
│  Each driver overrides compilation for its SQL dialect

Raw SQL string → db.query(sql)
```

Key pattern: Most AST nodes use `@attrs.define(frozen=True)` (some, like `ExprNode` and `_ResolveColumn`, are mutable). Where immutability holds, `attrs.evolve()` returns modified copies.

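The frozen-node pattern described above behaves like stdlib frozen dataclasses; a stdlib-only sketch (using `dataclasses.replace` where data-diff uses `attrs.evolve`, and a toy `Column` rather than the real AST node):

```python
from dataclasses import dataclass, replace, FrozenInstanceError

@dataclass(frozen=True)
class Column:
    name: str
    type: str

c = Column("id", "int")

# Frozen: in-place mutation raises instead of silently aliasing state.
try:
    c.name = "key"
    mutated = True
except FrozenInstanceError:
    mutated = False

# Evolve-style copy-on-write: new node, original untouched.
c2 = replace(c, name="key")
assert not mutated
assert (c.name, c2.name) == ("id", "key")
```

Frozen nodes make it safe to share subtrees between queries: any "modification" produces a new node, so a compiled query can never be changed out from under a compiler pass.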
## Test Organization

```
tests/
├── test_query.py             # Query AST construction + CTE schema tests
├── test_sql.py               # SQL generation across dialects
├── test_database.py          # DB integration tests (skip with --ignore)
├── test_diff_tables.py       # Diff framework + threading
├── test_joindiff.py          # JoinDiffer algorithm
├── test_utils.py             # Utility functions (UUID, case-aware dicts)
├── test_thread_utils.py      # PriorityThreadPoolExecutor
├── test_api.py               # Public API surface
├── test_cli.py               # CLI argument parsing
├── test_duckdb.py            # DuckDB-specific
├── test_postgresql.py        # PostgreSQL-specific
├── test_mssql.py             # SQL Server-specific
├── test_parse_time.py        # Time parsing
├── test_datetime_parsing.py  # Datetime parsing
├── test_format.py            # Output formatting
├── test_config.py            # TOML config
├── test_dbt*.py              # dbt integration (3 files)
├── test_mesh.py              # Multi-dim segmentation
├── test_main.py              # CLI main function
├── test_database_types.py    # Column type system
├── common.py                 # Shared fixtures
└── conftest.py               # pytest configuration
```

Run unit tests: `uv run pytest tests/ -x -q --ignore=tests/test_database.py`
Run query tests only: `uv run pytest tests/test_query.py -x -q`

## Key Types

| Type | Location | Purpose |
|------|----------|---------|
| `Schema` | `schema.py` | Type alias for `CaseAwareMapping` (used as `CaseAwareMapping[str, ColType]`) |
| `TableSegment` | `table_segment.py` | Table + key columns + range bounds |
| `DbPath` | `abcs/database_types.py` | `tuple[str, ...]` — schema-qualified table path |
| `Database` | `databases/base.py` | ABC for all database drivers |
| `ExprNode` | `queries/ast_classes.py` | Base class for all query AST nodes |
| `ITable` | `queries/ast_classes.py` | Interface for table-like query nodes |
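The `Schema` row above hinges on case-aware lookup, since databases disagree on identifier case folding. A toy sketch of the contract only — not the real `CaseAwareMapping`/`CaseInsensitiveDict` from `utils.py`, which is more involved:

```python
class ToyCaseInsensitiveDict(dict):
    """Minimal case-insensitive mapping: keys are folded to lowercase."""

    def __setitem__(self, key, value):
        super().__setitem__(key.lower(), value)

    def __getitem__(self, key):
        return super().__getitem__(key.lower())

    def __contains__(self, key):
        return super().__contains__(key.lower())

schema = ToyCaseInsensitiveDict()
schema["UserID"] = "int"          # Snowflake might report USERID,
assert schema["userid"] == "int"  # PostgreSQL userid; both must resolve
assert "USERID" in schema
```

This is why the type alias is `CaseAwareMapping` rather than `dict`: schema lookups must survive the round-trip through each backend's identifier-casing rules.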

data_diff/queries/ast_classes.py

Lines changed: 27 additions & 10 deletions
@@ -8,7 +8,7 @@
 from data_diff.abcs.database_types import DbPath
 from data_diff.queries.base import SKIP, SqeletonError, args_as_tuple
 from data_diff.schema import Schema
-from data_diff.utils import ArithString
+from data_diff.utils import ArithString, safezip
 
 
 class QueryBuilderError(SqeletonError):
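The hunk above starts importing `safezip` from `data_diff.utils`. Its implementation isn't shown in this commit; below is a minimal sketch of the behavior the later `Cte.schema` hunk relies on (a `zip` that raises `ValueError` on length mismatch), assumed rather than copied from the real code:

```python
def safezip(*iterables):
    """Like zip(), but raise ValueError if the iterables differ in length.

    Assumed behavior, inferred from the Cte.schema hunk catching
    ValueError around its safezip call; the real data_diff.utils.safezip
    may be implemented differently.
    """
    materialized = [list(it) for it in iterables]
    lengths = {len(seq) for seq in materialized}
    if len(lengths) > 1:
        raise ValueError(f"Mismatching lengths in arguments to safezip: {sorted(lengths)}")
    return zip(*materialized)
```

The point of the swap from plain `zip` is that `zip` silently truncates to the shortest input, which would let a too-short `params` list drop schema columns without any error.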
@@ -482,10 +482,12 @@ class Join(ExprNode, ITable, Root):
     @property
     def schema(self) -> Schema:
         if not self.columns:
-            raise ValueError("Join must specify columns explicitly (SELECT * not yet implemented).")
+            raise QueryBuilderError("Join must specify columns explicitly (SELECT * not yet implemented).")
         # No cross-table type validation needed: join combines columns from both tables rather than unioning rows
         s = self.source_tables[0].schema
-        return type(s)({c.name: c.type for c in self.columns})
+        if s is None:
+            raise QueryBuilderError("Cannot resolve Join schema: source table has no schema defined")
+        return s.new({c.name: c.type for c in self.columns})
 
     def on(self, *exprs) -> Self:
         """Add an ON clause, for filtering the result of the cartesian product (i.e. the JOIN)"""
@@ -596,7 +598,7 @@ def schema(self) -> Schema:
         s = self.table.schema
         if s is None or self.columns is None:
             return s
-        return type(s)({c.name: c.type for c in self.columns})
+        return s.new({c.name: c.type for c in self.columns})
 
     @classmethod
     def make(cls, table: ITable, distinct: bool = SKIP, optimizer_hints: str = SKIP, **kwargs):
@@ -641,24 +643,39 @@ def make(cls, table: ITable, distinct: bool = SKIP, optimizer_hints: str = SKIP, **kwargs):
 class Cte(ExprNode, ITable):
     table: Expr
     name: str | None = None
-    params: Sequence[str] | None = None
+    params: Sequence[str] | None = attrs.field(default=None)
+
+    @params.validator
+    def _validate_params(self, attribute, value):
+        if value is not None:
+            for i, p in enumerate(value):
+                if not isinstance(p, str):
+                    raise QB_TypeError(f"CTE params[{i}] must be str, got {type(p).__name__}")
 
     @property
     def source_table(self) -> "ITable":
         return self.table
 
     @property
-    def schema(self) -> Schema:
-        s = self.table.schema
+    def schema(self) -> Schema | None:
+        try:
+            s = self.table.schema
+        except (QueryBuilderError, ValueError) as exc:
+            # ValueError caught because some ITable.schema implementations (e.g. TableOp)
+            # still raise ValueError for validation errors pre-dating QueryBuilderError.
+            name_hint = f" '{self.name}'" if self.name else ""
+            raise QueryBuilderError(f"Failed to resolve schema for CTE{name_hint}: {exc}") from exc
         if not self.params:
             return s
         if s is None:
             raise QueryBuilderError(f"CTE params were provided ({self.params!r}) but the source table has no schema")
-        if len(self.params) != len(s):
+        try:
+            pairs = dict(safezip(self.params, s.values()))
+        except ValueError as e:
             raise QueryBuilderError(
                 f"CTE params length ({len(self.params)}) does not match source schema length ({len(s)})"
-            )
-        result = type(s)(dict(zip(self.params, s.values())))
+            ) from e
+        result = s.new(pairs)
         if len(result) != len(s):
             raise QueryBuilderError(f"CTE params contain duplicate column names: {self.params!r}")
         return result