Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions internal/tigerfs/db/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,17 @@ type LogWriter interface {
// from the snapshot (operation in 'edit','rename','delete').
QueryHistoryOperation(ctx context.Context, schema, historyTable, versionID string) (string, error)

// QueryUndoAffectedFiles returns the first log entry per file after a target point.
// Uses DISTINCT ON with SkipScan on the (file_id, log_id ASC) index.
// The version_id on each entry is the before-state at the target point.
// Optional userID filter limits to operations by a specific user.
QueryUndoAffectedFiles(ctx context.Context, schema, logTable, afterID, userID string, filters []UndoFilter) ([]UndoAffectedFile, error)
// QueryUndoAffectedFiles returns the first log entry per file after a
// target point, ordered child-first (topological by parent_id depth,
// deepest first). The inner DISTINCT ON picks each file's oldest log
// entry in the window (SkipScan on (file_id, log_id ASC)); the outer
// recursive CTE walks each affected file's snapshot parent_id (from
// history when the file is in the affected set, otherwise current
// source.parent_id) to compute depth and sort. The version_id on each
// entry is the before-state at the target point. Optional userID filter
// limits to operations by a specific user. See QueryUndoAffectedFiles in
// query.go for the full design rationale.
QueryUndoAffectedFiles(ctx context.Context, schema, logTable, sourceTable, historyTable, afterID, userID string, filters []UndoFilter) ([]UndoAffectedFile, error)

// QueryLogEntry fetches a single log entry by log_id.
QueryLogEntry(ctx context.Context, schema, logTable, logID string) (*UndoAffectedFile, error)
Expand Down
2 changes: 1 addition & 1 deletion internal/tigerfs/db/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,7 @@ func (m *MockLogWriter) QueryHistoryOperation(ctx context.Context, schema, histo
return "", fmt.Errorf("no history row for version %s", versionID)
}

func (m *MockLogWriter) QueryUndoAffectedFiles(ctx context.Context, schema, logTable, afterID, userID string, filters []UndoFilter) ([]UndoAffectedFile, error) {
func (m *MockLogWriter) QueryUndoAffectedFiles(ctx context.Context, schema, logTable, sourceTable, historyTable, afterID, userID string, filters []UndoFilter) ([]UndoAffectedFile, error) {
return nil, nil
}

Expand Down
171 changes: 162 additions & 9 deletions internal/tigerfs/db/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,15 +949,133 @@ func (c *Client) QueryCurrentPath(ctx context.Context, schema, table, fileID str
return *path, nil
}

// QueryUndoAffectedFiles returns the first log entry per file after a target point.
// Uses DISTINCT ON to find one entry per file_id, ordered by log_id ASC (oldest first).
// TimescaleDB's SkipScan optimizes this on the (file_id, log_id ASC) index.
func (c *Client) QueryUndoAffectedFiles(ctx context.Context, schema, logTable, afterID, userID string, filters []UndoFilter) ([]UndoAffectedFile, error) {
// QueryUndoAffectedFiles returns the first log entry per file after a target
// point, ordered child-first (topological by parent_id depth, deepest first,
// with file_id ASC as the same-depth tiebreaker).
//
// Why ordering matters at all
// ===========================
//
// ExecuteUndoTransaction processes one UPSERT per affected file. The
// bump_parent_mtime AFTER trigger cascades on every child UPSERT: it runs
// UPDATE source SET modified_at=now() WHERE id=<child's parent_id>, which
// re-fires the archive trigger on the parent dir and writes an extra history
// row (no-semantic-content "edit" -- only modified_at changed). The cascade
// row's relationship to the parent's OWN restore row depends on iteration
// order:
//
// - Parent-first iteration: the parent's own restore runs first and writes
// a history row. Then each child UPSERT cascades onto the now-existing
// parent, writing a fresh cascade row with a NEWER UUIDv7 than the
// parent's own row. The cascade row ends up "newest" for the parent's
// file_id.
//
// - Child-first iteration: each child UPSERT cascades onto a parent that
// either (a) doesn't exist in source yet -- the cascade UPDATE matches
// zero rows, no row is written -- or (b) exists in its pre-restore
// state, in which case the cascade row IS written but is then superseded
// by the parent's own subsequent UPSERT writing an even newer row.
//
// Under child-first iteration, the parent's own restore row is always
// "newest" for the parent's file_id at end-of-transaction.
//
// Why this is defense in depth, not correctness
// =============================================
//
// ExecuteUndoTransaction captures each affected file's new undo-log
// version_id INLINE -- right after that file's own UPSERT/DELETE returns,
// before any other iteration runs. That capture is guaranteed to be the row
// the archive trigger wrote for THIS file's mutation, because no other SQL
// runs between the mutation and the inline SELECT and PG18's per-session
// monotonic uuidv7() makes our own write the largest version_id for this
// file at that instant. So undo-of-undo correctness does NOT depend on this
// query's iteration order.
//
// Earlier, version_id was captured POST-HOC at the end of the transaction
// by re-querying "newest history row for file_id." Under parent-first
// iteration, that re-query returned a cascade artifact instead of the
// parent's own restore snapshot, so a subsequent undo-of-undo silently
// restored from a snapshot with the WRONG filename (a no-op for the
// directory rename case). Moving the capture inline removed the
// iteration-order dependency.
//
// With the inline capture in place, this query could in principle be
// simplified back to a flat DISTINCT ON without the walk -- the cascade
// rows still exist in history, but they no longer corrupt the undo-log's
// version_id pointers, so correctness holds under any order. For reference,
// the simpler form looks like:
//
// SELECT DISTINCT ON ("file_id")
// "file_id", "type", "version_id", "filename", "log_id", "user_id"
// FROM <log_table>
// WHERE <conditions>
// ORDER BY "file_id", "log_id" ASC
//
// That single statement returns one row per affected file, ordered by
// file_id ASC (i.e., creation order, which is approximately parent-first
// because dirs are created before their contents). It would be correct
// today thanks to the inline capture, and is what this function looked
// like before the recursive-CTE rewrite below.
//
// The child-first ordering here is intentional DEFENSE IN DEPTH. If the
// inline capture is ever refactored away, bypassed, replaced with a wrapper
// that defers the lookup, or otherwise regresses, the child-first iteration
// preserves the original correctness invariant: cascades onto absent or
// pre-restore parents leave the parent's own restore row as the newest
// history row, which is the correct version_id pointer for undo-of-undo
// regardless of when it's captured. Iteration order also reduces cascade-
// noise rows in history for the common rm-rf-then-undo pattern, where
// cascade UPDATEs target not-yet-restored parents and write nothing.
//
// Algorithm
// =========
//
// 1. `affected` CTE: DISTINCT ON (file_id) over the log table, picking the
// oldest log_id per file_id in the undo window. TimescaleDB SkipScan on
// the (file_id, log_id ASC) index makes this O(distinct_affected_files).
// This is the same inner query as the flat form above.
//
// 2. `walk` recursive CTE: starting from each affected file at distance 0,
// walk UP the parent_id chain one step per iteration. For each step, the
// ancestor's parent_id is looked up via one of two sources, in priority:
//
// (a) If the ancestor is itself in the affected set AND has a non-null
// version_id, use the parent_id from THAT entry's snapshot (via
// affected.version_id -> history row). This is the topology the
// undo will restore the ancestor to, and -- crucially for the
// rm-rf-then-undo case -- it works even when the ancestor's source
// row is currently gone.
//
// (b) Otherwise (ancestor not affected, or affected with NULL
// version_id which means a forward-create entry), use the current
// source.parent_id. The ancestor wasn't modified in this window,
// so source is the topology truth.
//
// Recursion terminates when the next-step parent_id is NULL (the
// workspace root, or the lookup yielded no row). A safety bound of
// distance < 100 prevents runaway recursion in the theoretical case of
// a cycle in history (history's parent_id has no FK constraint;
// source's does, so cycles can only appear via append-only history).
//
// 3. Final SELECT: join `affected` with MAX(distance) per file_id (via a
// grouped subquery), sort by depth DESC (deepest first) with file_id
// ASC as a stable tiebreaker among same-depth entries.
//
// Performance for typical workloads
// =================================
//
// Every JOIN in the walk's recursive step is a PRIMARY KEY lookup (affected
// is a small in-memory CTE; history.version_id is the PK; source.id is the
// PK). For an undo with M affected files and tree depth D, the walk
// produces ~M*D rows with 3 PK probes per row -- single-digit milliseconds
// for M=10, D=3 in a workspace with O(1k) source rows and O(10k) history
// rows. Scales linearly with M*D thereafter.
func (c *Client) QueryUndoAffectedFiles(ctx context.Context, schema, logTable, sourceTable, historyTable, afterID, userID string, filters []UndoFilter) ([]UndoAffectedFile, error) {
if c.pool == nil {
return nil, fmt.Errorf("database connection not initialized")
}

// Build WHERE clause
// Build WHERE clause for the inner DISTINCT ON.
conditions := []string{fmt.Sprintf("%s > $1", qi("log_id"))}
args := []interface{}{afterID}
argIdx := 2
Expand All @@ -976,13 +1094,48 @@ func (c *Client) QueryUndoAffectedFiles(ctx context.Context, schema, logTable, a

where := strings.Join(conditions, " AND ")

// Recursive-CTE form. See the function comment for the full design.
// The duplicated CASE in SELECT and WHERE intentionally evaluates
// twice rather than introducing a LATERAL subquery -- it's easier to
// read and PG's planner handles the duplication fine.
query := fmt.Sprintf(
`SELECT DISTINCT ON (%s) %s, %s, %s, %s, %s, %s FROM %s WHERE %s ORDER BY %s, %s ASC`,
qi("file_id"),
qi("file_id"), qi("type"), qi("version_id"), qi("filename"), qi("log_id"), qi("user_id"),
`WITH RECURSIVE
affected AS (
SELECT DISTINCT ON ("file_id") "file_id", "type", "version_id", "filename", "log_id", "user_id"
FROM %s
WHERE %s
ORDER BY "file_id", "log_id" ASC
),
walk("file_id", ancestor_id, distance) AS (
SELECT "file_id", "file_id", 0 FROM affected
UNION ALL
SELECT w."file_id",
CASE
WHEN a."version_id" IS NOT NULL THEN h_snap."parent_id"
ELSE s."parent_id"
END,
w.distance + 1
FROM walk w
LEFT JOIN affected a ON a."file_id" = w.ancestor_id
LEFT JOIN %s h_snap ON h_snap."version_id" = a."version_id"
LEFT JOIN %s s ON s."id" = w.ancestor_id
WHERE w.distance < 100
AND CASE
WHEN a."version_id" IS NOT NULL THEN h_snap."parent_id"
ELSE s."parent_id"
END IS NOT NULL
),
depths AS (
SELECT "file_id", MAX(distance) AS depth FROM walk GROUP BY "file_id"
)
SELECT a."file_id", a."type", a."version_id", a."filename", a."log_id", a."user_id"
FROM affected a
LEFT JOIN depths d USING ("file_id")
ORDER BY COALESCE(d.depth, 0) DESC, a."file_id" ASC`,
qt(schema, logTable),
where,
qi("file_id"), qi("log_id"),
qt(schema, historyTable),
qt(schema, sourceTable),
)

rows, err := c.pool.Query(ctx, query, args...)
Expand Down
131 changes: 131 additions & 0 deletions internal/tigerfs/db/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2859,3 +2859,134 @@ func TestExecuteUndoTransaction_CapturesVersionIDInline_NotPostHoc(t *testing.T)
childOp, childFilename)
}
}

// TestQueryUndoAffectedFiles_TopologicalOrder verifies that the affected-files
// query returns rows sorted child-first: deepest by source.parent_id chain
// comes first, with file_id ASC as a stable tiebreaker among rows at the
// same depth. Files whose source row doesn't exist (deleted from source --
// would be recreated by undo) default to depth 0 via the COALESCE and sort
// to the end among the depth-0 group.
//
// Scenario: workspace with a 4-deep dir tree A->B->C->leaf.md, a sibling
// S.md at root, and a "ghost" file whose log entry exists but whose source
// row was deleted (simulating an affected file that the undo will INSERT
// back from history). Total 6 affected files, expected order:
// 1. leaf.md (depth 3)
// 2. C (depth 2)
// 3. B (depth 1)
// 4-6. [A, S.md, ghost] sorted by file_id ASC (all depth 0)
func TestQueryUndoAffectedFiles_TopologicalOrder(t *testing.T) {
env, cleanup := setupUndoTestEnv(t)
defer cleanup()

ids := env.genUUIDs(t, 6)
aID := ids[0] // depth 0 dir
bID := ids[1] // depth 1 (in A)
cID := ids[2] // depth 2 (in B)
leafID := ids[3] // depth 3 (in C)
sID := ids[4] // depth 0 sibling
ghostID := ids[5] // depth 0 default (no source row)

// Seed source in dependency order (parent before child) to satisfy the
// self-FK at insertion time. The "ghost" file_id is intentionally NOT
// inserted into source -- the affected-files query should still include
// it (it has a log entry) and sort it to the end with depth=0.
if _, err := env.client.pool.Exec(env.ctx, fmt.Sprintf(
`INSERT INTO %s (id, parent_id, filename, filetype, body, modified_at) VALUES
($1, NULL, 'A', 'directory', '', now()),
($2, NULL, 'S.md', 'file', '', now()),
($3, $1, 'B', 'directory', '', now()),
($4, $3, 'C', 'directory', '', now()),
($5, $4, 'leaf.md', 'file', '', now())`, env.srcQT),
aID, sID, bID, cID, leafID); err != nil {
t.Fatalf("seed source: %v", err)
}

// Seed log: one 'edit' entry per file (including the ghost). log_id
// auto-generated via DEFAULT uuidv7(), so all are > afterID below.
seedLog := func(fileID, filename string) {
if _, err := env.client.pool.Exec(env.ctx, fmt.Sprintf(
`INSERT INTO %s (file_id, type, user_id, filename, version_id)
VALUES ($1, 'edit', 'u', $2, NULL)`, env.logQT),
fileID, filename); err != nil {
t.Fatalf("seed log (%s): %v", filename, err)
}
}
seedLog(aID, "A")
seedLog(bID, "B")
seedLog(cID, "C")
seedLog(leafID, "leaf.md")
seedLog(sID, "S.md")
seedLog(ghostID, "ghost.md")

// afterID is the all-zeros UUID -- smaller than any UUIDv7 the log
// inserts above generated, so the WHERE log_id > afterID picks them all.
afterID := "00000000-0000-0000-0000-000000000000"

result, err := env.client.QueryUndoAffectedFiles(env.ctx,
env.schema, env.logTable, env.sourceTable, env.historyTable, afterID, "", nil)
if err != nil {
t.Fatalf("QueryUndoAffectedFiles: %v", err)
}

if len(result) != 6 {
t.Fatalf("expected 6 affected files, got %d: %+v", len(result), result)
}

// Expected per-file depths.
depthOf := map[string]int{
aID: 0,
sID: 0,
ghostID: 0,
bID: 1,
cID: 2,
leafID: 3,
}

// 1. Result must be monotonically non-increasing in depth.
for i := 0; i < len(result)-1; i++ {
curDepth := depthOf[result[i].FileID]
nextDepth := depthOf[result[i+1].FileID]
if curDepth < nextDepth {
t.Errorf("result[%d] (file=%s, depth=%d) sorts before result[%d] (file=%s, depth=%d) -- depth must be DESC",
i, result[i].FileID, curDepth, i+1, result[i+1].FileID, nextDepth)
}
}

// 2. The deepest single file (leaf.md, depth 3) must be first.
if result[0].FileID != leafID {
t.Errorf("result[0] should be leaf.md (depth 3), got file_id=%s", result[0].FileID)
}

// 3. C (depth 2) must be second; B (depth 1) third.
if result[1].FileID != cID {
t.Errorf("result[1] should be C (depth 2), got file_id=%s", result[1].FileID)
}
if result[2].FileID != bID {
t.Errorf("result[2] should be B (depth 1), got file_id=%s", result[2].FileID)
}

// 4. The last three (depth 0) must be the depth-0 file_ids sorted ASC
// by the file_id tiebreaker.
depth0Want := []string{aID, sID, ghostID}
sortStringsAsc(depth0Want)
depth0Got := []string{result[3].FileID, result[4].FileID, result[5].FileID}
for i := 0; i < 3; i++ {
if depth0Got[i] != depth0Want[i] {
t.Errorf("depth-0 tiebreaker order[%d]: got %s, want %s (full got=%v, want=%v)",
i, depth0Got[i], depth0Want[i], depth0Got, depth0Want)
}
}
}

// sortStringsAsc sorts a slice of strings in ascending order in place.
// Helper for TestQueryUndoAffectedFiles_TopologicalOrder tiebreaker check.
func sortStringsAsc(s []string) {
for i := 0; i < len(s); i++ {
for j := i + 1; j < len(s); j++ {
if s[j] < s[i] {
s[i], s[j] = s[j], s[i]
}
}
}
}
2 changes: 1 addition & 1 deletion internal/tigerfs/fs/operations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2080,7 +2080,7 @@ func (m *mockDBClient) QueryHistoryOperation(ctx context.Context, schema, histor
return "", fmt.Errorf("no history row for version %s", versionID)
}

func (m *mockDBClient) QueryUndoAffectedFiles(ctx context.Context, schema, logTable, afterID, userID string, filters []db.UndoFilter) ([]db.UndoAffectedFile, error) {
func (m *mockDBClient) QueryUndoAffectedFiles(ctx context.Context, schema, logTable, sourceTable, historyTable, afterID, userID string, filters []db.UndoFilter) ([]db.UndoAffectedFile, error) {
return m.undoAffectedFiles, nil
}

Expand Down
Loading