Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 72 additions & 46 deletions internal/tigerfs/db/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -1068,6 +1068,13 @@ func (c *Client) ExecuteUndoTransaction(ctx context.Context, params *UndoTransac
historyTable := qt(params.Schema, params.HistoryTable)
logTable := qt(params.Schema, params.LogTable)

// Captured version_ids from inline SELECTs in Steps 1 and 2, keyed by
// file_id. Step 3 reads from these maps instead of re-querying history
// for "newest row per file_id." See the inline-capture rationale at the
// Step 1 DELETE loop below for why post-hoc lookups are wrong.
capturedDeleteVIDs := make(map[string]string)
capturedRestoreVIDs := make(map[string]string)

// Step 1: DELETE rows that were created after the target point.
for _, fileID := range params.DeleteFileIDs {
_, err := tx.Exec(ctx,
Expand All @@ -1077,6 +1084,34 @@ func (c *Client) ExecuteUndoTransaction(ctx context.Context, params *UndoTransac
if err != nil {
return fmt.Errorf("failed to delete created row %s: %w", fileID, err)
}

// Capture the version_id of the history row that the BEFORE-DELETE
// trigger just wrote for this file_id, NOW, before any later iteration
// in Step 2 can fire trigger B (bump_parent_mtime) and cascade an
// extra archive write onto this file_id's history.
//
// Correctness rests on: trigger A is row-level BEFORE on the source
// table; one row per UPSERT/DELETE => one trigger invocation; no
// other SQL runs between the mutation and this SELECT. Combined with
// PG18's per-session monotonic uuidv7(), our row is guaranteed-newest
// at this instant. If we deferred this capture to Step 3 (as before),
// a later child-restore's cascade onto this row would overwrite the
// "newest" entry with a no-semantic-content edit row, silently
// breaking undo-of-undo for the dir-rename case.
//
// See test/integration/undo_test.go::TestSynth_UndoOfUndo_DirRenameWithChild
// for the regression scenario.
var capturedVID *string
if err := tx.QueryRow(ctx,
fmt.Sprintf(
`SELECT %s FROM %s WHERE %s = $1 ORDER BY %s DESC LIMIT 1`,
qi("version_id"), historyTable,
qi("file_id"), qi("version_id"),
),
fileID,
).Scan(&capturedVID); err == nil && capturedVID != nil {
capturedDeleteVIDs[fileID] = *capturedVID
}
}

// Step 2: UPSERT rows from history for edits/renames/deletes.
Expand Down Expand Up @@ -1163,76 +1198,67 @@ func (c *Client) ExecuteUndoTransaction(ctx context.Context, params *UndoTransac
}
return fmt.Errorf("failed to restore file %s from history: %w", fileID, err)
}
}

// Step 3: Insert undo log entries for each affected file.
// DELETE targets get a log entry too (we're undoing the creation).
// Capture version_id from the history row that the BEFORE-DELETE trigger
// just wrote -- this is what makes undo-of-undo of a DELETE entry able
// to dispatch correctly (the version_id points to a row with
// operation='delete', and the undo-of-undo handler restores from it).
for i, fileID := range params.DeleteFileIDs {
filename := ""
if i < len(params.DeleteFilenames) {
filename = params.DeleteFilenames[i]
}
var latestVersionID *string
err := tx.QueryRow(ctx,
// Capture the version_id of the history row that the BEFORE trigger
// just wrote for this file_id's restore. Same invariant as Step 1's
// inline capture: our row is guaranteed-newest right now; deferring
// to Step 3 would let a later iteration's trigger B cascade pollute
// this file_id's "newest" entry. See the Step 1 comment for the full
// rationale.
var capturedVID *string
if err := tx.QueryRow(ctx,
fmt.Sprintf(
`SELECT %s FROM %s WHERE %s = $1 ORDER BY %s DESC LIMIT 1`,
qi("version_id"), qt(params.Schema, params.HistoryTable),
qi("version_id"), historyTable,
qi("file_id"), qi("version_id"),
),
fileID,
).Scan(&latestVersionID)
if err != nil {
latestVersionID = nil
).Scan(&capturedVID); err == nil && capturedVID != nil {
capturedRestoreVIDs[fileID] = *capturedVID
}
versionIDVal := ""
if latestVersionID != nil {
versionIDVal = *latestVersionID
}

// Step 3: Insert undo log entries for each affected file.
//
// Read version_ids from the captures stashed inline during Step 1
// (deletes) and Step 2 (restores). We deliberately do NOT re-query
// "newest history row for file_id" here: Step 2 iterations cascade onto
// each other's history via trigger B (bump_parent_mtime), so a post-hoc
// "newest" lookup can resolve to a cascade artifact rather than the
// file's own restore snapshot.
//
// The version_id stored on a type='undo' log row is what
// QueryHistoryOperation later dispatches on for undo-of-undo, so it
// must point at the row written by the actual mutation -- a tombstone
// for a recreate, or a rename/edit/delete OLD-state row for the others
// -- not at a cascade artifact from a sibling iteration.
for i, fileID := range params.DeleteFileIDs {
filename := ""
if i < len(params.DeleteFilenames) {
filename = params.DeleteFilenames[i]
}
_, err = tx.Exec(ctx,
versionIDVal := capturedDeleteVIDs[fileID]
if _, err := tx.Exec(ctx,
fmt.Sprintf(
`INSERT INTO %s (%s, %s, %s, %s, %s, %s, %s) VALUES (uuidv7(), $1, 'undo', $2, $3, $4, $5)`,
logTable, qi("log_id"), qi("file_id"), qi("type"), qi("user_id"), qi("filename"), qi("version_id"), qi("description"),
),
fileID, params.UserID, filename, versionIDVal, params.Description,
)
if err != nil {
); err != nil {
return fmt.Errorf("failed to insert undo log entry for delete: %w", err)
}
}
for i, fileID := range params.RestoreFileIDs {
// Capture version_id of the state just before our restore (the BEFORE trigger has fired)
var latestVersionID *string
err := tx.QueryRow(ctx,
fmt.Sprintf(
`SELECT %s FROM %s WHERE %s = $1 ORDER BY %s DESC LIMIT 1`,
qi("version_id"), qt(params.Schema, params.HistoryTable),
qi("file_id"), qi("version_id"),
),
fileID,
).Scan(&latestVersionID)
if err != nil {
latestVersionID = nil
}

versionIDVal := ""
if latestVersionID != nil {
versionIDVal = *latestVersionID
}

_, err = tx.Exec(ctx,
versionIDVal := capturedRestoreVIDs[fileID]
if _, err := tx.Exec(ctx,
fmt.Sprintf(
`INSERT INTO %s (%s, %s, %s, %s, %s, %s, %s) VALUES (uuidv7(), $1, 'undo', $2, $3, $4, $5)`,
logTable,
qi("log_id"), qi("file_id"), qi("type"), qi("user_id"),
qi("filename"), qi("version_id"), qi("description"),
),
fileID, params.UserID, params.RestoreFilenames[i], versionIDVal, params.Description,
)
if err != nil {
); err != nil {
return fmt.Errorf("failed to insert undo log entry for restore: %w", err)
}
}
Expand Down
225 changes: 225 additions & 0 deletions internal/tigerfs/db/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2634,3 +2634,228 @@ func bytesContains(haystack, needle []byte) bool {
}
return false
}

// installCascadeTriggers installs the archive BEFORE trigger and the
// bump_parent_mtime AFTER trigger on env.sourceTable, mirroring the
// production triggers in internal/tigerfs/fs/synth/build.go. Used by
// undo unit tests that need to exercise the cascade chain end-to-end
// (e.g., the inline-version-id-capture test below).
//
// The archive trigger writes a history row for every INSERT/UPDATE/DELETE
// on the source. bump_parent_mtime fires AFTER child changes and runs
// UPDATE parent SET modified_at=now() -- which itself re-fires the
// archive trigger on the parent, producing the cascade artifact that
// motivates Fix 1.
func (env *undoTestEnv) installCascadeTriggers(t *testing.T) {
t.Helper()

archiveFunc := fmt.Sprintf(`CREATE OR REPLACE FUNCTION archive_%s_history()
RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
INSERT INTO %s (file_id, parent_id, filename, filetype, body, modified_at, operation)
VALUES (NEW.id, NEW.parent_id, NEW.filename, NEW.filetype, NEW.body, NEW.modified_at, 'create');
RETURN NEW;
ELSIF TG_OP = 'UPDATE' THEN
INSERT INTO %s (file_id, parent_id, filename, filetype, body, modified_at, operation)
VALUES (OLD.id, OLD.parent_id, OLD.filename, OLD.filetype, OLD.body, OLD.modified_at,
CASE WHEN OLD.filename != NEW.filename OR OLD.parent_id IS DISTINCT FROM NEW.parent_id
THEN 'rename' ELSE 'edit' END);
RETURN NEW;
ELSIF TG_OP = 'DELETE' THEN
INSERT INTO %s (file_id, parent_id, filename, filetype, body, modified_at, operation)
VALUES (OLD.id, OLD.parent_id, OLD.filename, OLD.filetype, OLD.body, OLD.modified_at, 'delete');
RETURN OLD;
END IF;
END;
$$ LANGUAGE plpgsql`, env.sourceTable, env.histQT, env.histQT, env.histQT)

archiveTrigger := fmt.Sprintf(`CREATE TRIGGER trg_archive_%s
BEFORE INSERT OR UPDATE OR DELETE ON %s
FOR EACH ROW EXECUTE FUNCTION archive_%s_history()`,
env.sourceTable, env.srcQT, env.sourceTable)

bumpFunc := fmt.Sprintf(`CREATE OR REPLACE FUNCTION bump_%s_parent_mtime()
RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
IF NEW.parent_id IS NOT NULL THEN
UPDATE %s SET modified_at = now()
WHERE id = NEW.parent_id AND filetype = 'directory';
END IF;
ELSIF TG_OP = 'DELETE' THEN
IF OLD.parent_id IS NOT NULL THEN
UPDATE %s SET modified_at = now()
WHERE id = OLD.parent_id AND filetype = 'directory';
END IF;
ELSIF TG_OP = 'UPDATE' THEN
IF OLD.parent_id IS DISTINCT FROM NEW.parent_id
OR OLD.filename IS DISTINCT FROM NEW.filename THEN
IF OLD.parent_id IS NOT NULL
AND OLD.parent_id IS DISTINCT FROM NEW.parent_id THEN
UPDATE %s SET modified_at = now()
WHERE id = OLD.parent_id AND filetype = 'directory';
END IF;
IF NEW.parent_id IS NOT NULL THEN
UPDATE %s SET modified_at = now()
WHERE id = NEW.parent_id AND filetype = 'directory';
END IF;
END IF;
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql`, env.sourceTable, env.srcQT, env.srcQT, env.srcQT, env.srcQT)

bumpTrigger := fmt.Sprintf(`CREATE TRIGGER trg_bump_%s
AFTER INSERT OR DELETE OR UPDATE OF parent_id, filename ON %s
FOR EACH ROW EXECUTE FUNCTION bump_%s_parent_mtime()`,
env.sourceTable, env.srcQT, env.sourceTable)

for _, sql := range []string{archiveFunc, archiveTrigger, bumpFunc, bumpTrigger} {
if _, err := env.client.pool.Exec(env.ctx, sql); err != nil {
t.Fatalf("install cascade trigger: %v\nsql: %s", err, sql)
}
}
}

// TestExecuteUndoTransaction_CapturesVersionIDInline_NotPostHoc verifies
// the Fix-1 invariant: each affected file's undo-log version_id is the
// row that trigger A wrote during that file's own UPSERT, not whatever
// "newest history row for file_id" happened to be at end-of-transaction.
//
// Pre-Fix-1, Step 3 ran "SELECT version_id ... ORDER BY DESC LIMIT 1" per
// file AFTER all Step 2 UPSERTs were done. The bump_parent_mtime AFTER
// trigger cascades when a child is restored, re-firing the archive
// trigger on the parent dir with a NEW (newer) version_id pointing at a
// no-semantic-content edit row. That cascade artifact became "newest"
// and corrupted the parent's recorded undo-log version_id.
//
// Scenario: dir D (filename='e') with one child C (filename='b.md').
// History contains a "pre-demo" snapshot of D (filename='d', op='rename')
// and a "pre-demo" snapshot of C (filename='a.md', op='rename'). Call
// ExecuteUndoTransaction with parent-first iteration (D before C, the
// bug-exposing order matching file_id ASC). After the call:
// - Without Fix 1: D's undo log entry's version_id resolves to the
// cascade artifact -- a history row with operation='edit' and
// filename='d' (D's post-iteration-1 state, archived when trigger B
// bumped D's mtime during C's UPSERT).
// - With Fix 1: D's undo log entry's version_id resolves to D's own
// restore snapshot -- a history row with operation='rename' and
// filename='e' (D's pre-iteration-1 state, captured inline right
// after D's UPSERT completed).
//
// The test asserts the post-Fix-1 outcome.
func TestExecuteUndoTransaction_CapturesVersionIDInline_NotPostHoc(t *testing.T) {
env, cleanup := setupUndoTestEnv(t)
defer cleanup()
env.installCascadeTriggers(t)

// Generate stable UUIDs upfront so we can refer to them by name.
ids := env.genUUIDs(t, 4)
dirID := ids[0]
childID := ids[1]
dirSnapshotVID := ids[2]
childSnapshotVID := ids[3]

// Seed source with post-demo state: D ("e", a directory) and C
// ("b.md", inside D). Seeding INSERTs fire the archive trigger and
// write 'create' rows to history; that's fine, just background noise.
if _, err := env.client.pool.Exec(env.ctx, fmt.Sprintf(
`INSERT INTO %s (id, parent_id, filename, filetype, body, modified_at)
VALUES ($1, NULL, 'e', 'directory', '', now()),
($2, $1, 'b.md', 'file', 'body b', now())`, env.srcQT),
dirID, childID); err != nil {
t.Fatalf("seed source: %v", err)
}

// Manually insert the "pre-demo" snapshots. These are what
// ExecuteUndoTransaction will UPSERT from -- the rows whose
// version_ids appear in RestoreVersionIDs.
insertSnapshot := func(versionID, fileID, parent, filename, filetype string) {
var parentArg interface{}
if parent != "" {
parentArg = parent
}
if _, err := env.client.pool.Exec(env.ctx, fmt.Sprintf(
`INSERT INTO %s (version_id, file_id, parent_id, filename, filetype, body, modified_at, operation)
VALUES ($1, $2, $3, $4, $5, '', now() - interval '1 hour', 'rename')`, env.histQT),
versionID, fileID, parentArg, filename, filetype); err != nil {
t.Fatalf("seed snapshot for %s: %v", filename, err)
}
}
insertSnapshot(dirSnapshotVID, dirID, "", "d", "directory")
insertSnapshot(childSnapshotVID, childID, dirID, "a.md", "file")

// Call ExecuteUndoTransaction with parent-first iteration order
// (D before C). This matches the production file_id ASC ordering
// when dirs are created before their children, and is the order
// that exposes the cascade-artifact bug.
if err := env.client.ExecuteUndoTransaction(env.ctx, &UndoTransactionParams{
Schema: env.schema,
SourceTable: env.sourceTable,
HistoryTable: env.historyTable,
LogTable: env.logTable,
Description: "cascade-artifact regression",
RestoreVersionIDs: []string{dirSnapshotVID, childSnapshotVID},
RestoreFileIDs: []string{dirID, childID},
RestoreFilenames: []string{"d", "a.md"},
UserID: "test-user",
}); err != nil {
t.Fatalf("ExecuteUndoTransaction failed: %v", err)
}

// Look up the dir's new type='undo' log entry and follow its
// version_id to the underlying history row.
var dirUndoVID string
if err := env.client.pool.QueryRow(env.ctx, fmt.Sprintf(
`SELECT version_id::text FROM %s
WHERE file_id = $1 AND type = 'undo'
ORDER BY log_id DESC LIMIT 1`, env.logQT),
dirID).Scan(&dirUndoVID); err != nil {
t.Fatalf("query dir undo log entry: %v", err)
}

var op, filename string
if err := env.client.pool.QueryRow(env.ctx, fmt.Sprintf(
`SELECT operation, filename FROM %s WHERE version_id = $1`, env.histQT),
dirUndoVID).Scan(&op, &filename); err != nil {
t.Fatalf("query history row %s: %v", dirUndoVID, err)
}

// The dir's undo entry must point at the row trigger A wrote during
// D's own UPSERT -- {operation='rename', filename='e'}. If Step 3
// reads "newest" post-hoc instead (the pre-Fix-1 behavior), this
// resolves to the cascade artifact {operation='edit', filename='d'}.
if op != "rename" {
t.Errorf("dir undo entry version_id resolves to operation=%q, want %q "+
"(pre-Fix-1 would be 'edit' -- the cascade artifact)", op, "rename")
}
if filename != "e" {
t.Errorf("dir undo entry version_id resolves to filename=%q, want %q "+
"(pre-Fix-1 would be 'd' -- the cascade-artifact captures D's post-iteration-1 state)",
filename, "e")
}

// Sanity: child's undo entry should also resolve to its own rename
// snapshot. The child is a leaf -- no cascade lands on it -- so this
// is correct pre-Fix-1 too; the assertion just guards against
// regressions in the leaf path.
var childUndoVID string
if err := env.client.pool.QueryRow(env.ctx, fmt.Sprintf(
`SELECT version_id::text FROM %s
WHERE file_id = $1 AND type = 'undo'
ORDER BY log_id DESC LIMIT 1`, env.logQT),
childID).Scan(&childUndoVID); err != nil {
t.Fatalf("query child undo log entry: %v", err)
}
var childOp, childFilename string
if err := env.client.pool.QueryRow(env.ctx, fmt.Sprintf(
`SELECT operation, filename FROM %s WHERE version_id = $1`, env.histQT),
childUndoVID).Scan(&childOp, &childFilename); err != nil {
t.Fatalf("query child history row %s: %v", childUndoVID, err)
}
if childOp != "rename" || childFilename != "b.md" {
t.Errorf("child undo entry version_id resolves to (op=%q, filename=%q), want (rename, b.md)",
childOp, childFilename)
}
}
Loading