Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions internal/tigerfs/db/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,13 @@ type LogWriter interface {
// after an UPDATE/DELETE fires the BEFORE trigger.
QueryLatestVersionID(ctx context.Context, schema, historyTable, fileID string) (string, error)

// QueryHistoryOperation returns the operation marker ('create', 'edit',
// 'rename', 'delete') for a single history row identified by version_id.
// Used by undo-of-undo dispatch to determine whether reversing an undo
// entry means deleting the row (operation='create' tombstone) or restoring
// from the snapshot (operation in 'edit','rename','delete').
QueryHistoryOperation(ctx context.Context, schema, historyTable, versionID string) (string, error)

// QueryUndoAffectedFiles returns the first log entry per file after a target point.
// Uses DISTINCT ON with SkipScan on the (file_id, log_id ASC) index.
// The version_id on each entry is the before-state at the target point.
Expand Down
18 changes: 14 additions & 4 deletions internal/tigerfs/db/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,10 +750,11 @@ func NewMockDBClient() *MockDBClient {

// MockLogWriter implements LogWriter for testing.
type MockLogWriter struct {
LogEntries []MockLogEntry // Recorded log entries for verification
VersionIDs map[string]string // fileID -> latest versionID
MetadataEntries []MetadataEntry // Returned by QueryMetadata
MetadataInserts []MetadataEntry // Recorded for verification
LogEntries []MockLogEntry // Recorded log entries for verification
VersionIDs map[string]string // fileID -> latest versionID
HistoryOperations map[string]string // versionID -> operation marker
MetadataEntries []MetadataEntry // Returned by QueryMetadata
MetadataInserts []MetadataEntry // Recorded for verification
}

// MockLogEntry records a single log entry for test verification.
Expand Down Expand Up @@ -790,6 +791,15 @@ func (m *MockLogWriter) QueryLatestVersionID(ctx context.Context, schema, histor
return "", fmt.Errorf("no history entry for %s", fileID)
}

func (m *MockLogWriter) QueryHistoryOperation(ctx context.Context, schema, historyTable, versionID string) (string, error) {
if m.HistoryOperations != nil {
if op, ok := m.HistoryOperations[versionID]; ok {
return op, nil
}
}
return "", fmt.Errorf("no history row for version %s", versionID)
}

func (m *MockLogWriter) QueryUndoAffectedFiles(ctx context.Context, schema, logTable, afterID, userID string, filters []UndoFilter) ([]UndoAffectedFile, error) {
return nil, nil
}
Expand Down
51 changes: 47 additions & 4 deletions internal/tigerfs/db/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -1167,17 +1167,37 @@ func (c *Client) ExecuteUndoTransaction(ctx context.Context, params *UndoTransac

// Step 3: Insert undo log entries for each affected file.
// DELETE targets get a log entry too (we're undoing the creation).
// Capture version_id from the history row that the BEFORE-DELETE trigger
// just wrote -- this is what makes undo-of-undo of a DELETE entry able
// to dispatch correctly (the version_id points to a row with
// operation='delete', and the undo-of-undo handler restores from it).
for i, fileID := range params.DeleteFileIDs {
filename := ""
if i < len(params.DeleteFilenames) {
filename = params.DeleteFilenames[i]
}
_, err := tx.Exec(ctx,
var latestVersionID *string
err := tx.QueryRow(ctx,
fmt.Sprintf(
`INSERT INTO %s (%s, %s, %s, %s, %s, %s) VALUES (uuidv7(), $1, 'undo', $2, $3, $4)`,
logTable, qi("log_id"), qi("file_id"), qi("type"), qi("user_id"), qi("filename"), qi("description"),
`SELECT %s FROM %s WHERE %s = $1 ORDER BY %s DESC LIMIT 1`,
qi("version_id"), qt(params.Schema, params.HistoryTable),
qi("file_id"), qi("version_id"),
),
fileID, params.UserID, filename, params.Description,
fileID,
).Scan(&latestVersionID)
if err != nil {
latestVersionID = nil
}
versionIDVal := ""
if latestVersionID != nil {
versionIDVal = *latestVersionID
}
_, err = tx.Exec(ctx,
fmt.Sprintf(
`INSERT INTO %s (%s, %s, %s, %s, %s, %s, %s) VALUES (uuidv7(), $1, 'undo', $2, $3, $4, $5)`,
logTable, qi("log_id"), qi("file_id"), qi("type"), qi("user_id"), qi("filename"), qi("version_id"), qi("description"),
),
fileID, params.UserID, filename, versionIDVal, params.Description,
)
if err != nil {
return fmt.Errorf("failed to insert undo log entry for delete: %w", err)
Expand Down Expand Up @@ -1330,6 +1350,29 @@ func (c *Client) QueryHistoryByID(ctx context.Context, schema, historyTable, row
return c.queryRows(ctx, query, rowID)
}

// QueryHistoryOperation returns the operation column of a single history row
// identified by its version_id. Used by undo-of-undo dispatch: the operation
// value ('create'/'edit'/'rename'/'delete') tells the handler whether the undo
// entry should be reversed by re-deleting the row (operation='create') or by
// restoring from the row's snapshot (operation in 'edit','rename','delete').
//
// Returns the empty string and a non-nil error if no row matches. The lookup
// is by the primary key (version_id), so it's O(1) on the hypertable.
func (c *Client) QueryHistoryOperation(ctx context.Context, schema, historyTable, versionID string) (string, error) {
if c.pool == nil {
return "", fmt.Errorf("database connection not initialized")
}
query := fmt.Sprintf(
`SELECT "operation" FROM %s WHERE "version_id" = $1`,
qt(schema, historyTable),
)
var op string
if err := c.pool.QueryRow(ctx, query, versionID).Scan(&op); err != nil {
return "", fmt.Errorf("failed to query history operation for version %s: %w", versionID, err)
}
return op, nil
}

// QueryHistoryDistinctFilenames returns distinct filenames from the history table.
func (c *Client) QueryHistoryDistinctFilenames(ctx context.Context, schema, historyTable string, limit int) ([]string, error) {
query := fmt.Sprintf(
Expand Down
12 changes: 12 additions & 0 deletions internal/tigerfs/fs/operations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1376,6 +1376,9 @@ type mockDBClient struct {
// Version ID return value for QueryLatestVersionID
latestVersionIDs map[string]string // fileID -> versionID

// History operation values for QueryHistoryOperation (versionID -> operation)
historyOperations map[string]string

// Row-by-columns lookup data (for savepoint name-based tests)
rowByColumnsData map[string]*mockRowByColumns

Expand Down Expand Up @@ -2068,6 +2071,15 @@ func (m *mockDBClient) QueryLatestVersionID(ctx context.Context, schema, history
return "", fmt.Errorf("no history entry for %s", fileID)
}

func (m *mockDBClient) QueryHistoryOperation(ctx context.Context, schema, historyTable, versionID string) (string, error) {
if m.historyOperations != nil {
if op, ok := m.historyOperations[versionID]; ok {
return op, nil
}
}
return "", fmt.Errorf("no history row for version %s", versionID)
}

func (m *mockDBClient) QueryUndoAffectedFiles(ctx context.Context, schema, logTable, afterID, userID string, filters []db.UndoFilter) ([]db.UndoAffectedFile, error) {
return m.undoAffectedFiles, nil
}
Expand Down
43 changes: 35 additions & 8 deletions internal/tigerfs/fs/synth/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,11 @@ func GenerateHistorySQL(schema, appName string, format SynthFormat) []string {
// This replaces the old create_hypertable() + ALTER TABLE SET + add_compression_policy() calls.
// Column renames from ADR-017: id->file_id, _history_id->version_id, _operation->operation.
// Added: parent_id, CHECK constraints on filetype/encoding/operation.
// `operation` values: 'create' captures the BEFORE-INSERT state (tombstone --
// makes undo-of-undo for DELETE operations work, since the original delete's
// undo INSERT now has a fresh history row of its own); 'edit'/'rename'/'delete'
// capture the BEFORE-UPDATE/DELETE state. No 'undo' value: undo is realized as
// one of these four physical ops, not a distinct kind of history row.
createTable := fmt.Sprintf(`CREATE TABLE %s (
file_id UUID,
parent_id UUID,
Expand All @@ -316,7 +321,7 @@ func GenerateHistorySQL(schema, appName string, format SynthFormat) []string {
created_at TIMESTAMPTZ,
modified_at TIMESTAMPTZ,
version_id UUID NOT NULL DEFAULT uuidv7() PRIMARY KEY,
operation TEXT NOT NULL CHECK (operation IN ('edit', 'rename', 'delete'))
operation TEXT NOT NULL CHECK (operation IN ('create', 'edit', 'rename', 'delete'))
) WITH (
tsdb.hypertable,
tsdb.partition_column = 'version_id',
Expand All @@ -337,21 +342,43 @@ func GenerateHistorySQL(schema, appName string, format SynthFormat) []string {
qualifiedHistory,
)

// Archive trigger function -- copies OLD row (including parent_id) to history
// table on UPDATE or DELETE. Column list must match the source table's columns.
// Archive trigger function -- captures every row state change in the history
// table. The INSERT branch writes the NEW row with operation='create'
// (tombstone), so undo-of-undo for DELETE operations finds a fresh, self-
// describing history row to dispatch on. The UPDATE/DELETE branch writes the
// OLD row with operation='rename'/'edit'/'delete'. Column lists must match the
// source table's columns.
funcName := fmt.Sprintf("%s.%s", db.QuoteIdent(TigerFSSchema), db.QuoteIdent("archive_"+historyTable))

var insertColumns, insertValues string
// Markdown format adds title/author/headers columns; plain text does not.
var formatNewValues string
if format == FormatMarkdown {
formatNewValues = "NEW.title, NEW.author, NEW.headers, "
}

var insertColumns, oldInsertValues, newInsertValues string
if format == FormatMarkdown {
insertColumns = "file_id, parent_id, filename, filetype, title, author, headers, body, encoding, created_at, modified_at"
insertValues = fmt.Sprintf("OLD.id, OLD.parent_id, OLD.filename, OLD.filetype, %sOLD.body,\n OLD.encoding, OLD.created_at, OLD.modified_at", formatOldValues)
oldInsertValues = fmt.Sprintf("OLD.id, OLD.parent_id, OLD.filename, OLD.filetype, %sOLD.body,\n OLD.encoding, OLD.created_at, OLD.modified_at", formatOldValues)
newInsertValues = fmt.Sprintf("NEW.id, NEW.parent_id, NEW.filename, NEW.filetype, %sNEW.body,\n NEW.encoding, NEW.created_at, NEW.modified_at", formatNewValues)
} else {
insertColumns = "file_id, parent_id, filename, filetype, body, encoding, created_at, modified_at"
insertValues = "OLD.id, OLD.parent_id, OLD.filename, OLD.filetype, OLD.body,\n OLD.encoding, OLD.created_at, OLD.modified_at"
oldInsertValues = "OLD.id, OLD.parent_id, OLD.filename, OLD.filetype, OLD.body,\n OLD.encoding, OLD.created_at, OLD.modified_at"
newInsertValues = "NEW.id, NEW.parent_id, NEW.filename, NEW.filetype, NEW.body,\n NEW.encoding, NEW.created_at, NEW.modified_at"
}

createFunc := fmt.Sprintf(`CREATE OR REPLACE FUNCTION %s() RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
INSERT INTO %s
(%s,
version_id, operation)
VALUES
(%s,
uuidv7(),
'create');
RETURN NEW;
END IF;
INSERT INTO %s
(%s,
version_id, operation)
Expand All @@ -372,11 +399,11 @@ BEGIN
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql`, funcName, qualifiedHistory, insertColumns, insertValues)
$$ LANGUAGE plpgsql`, funcName, qualifiedHistory, insertColumns, newInsertValues, qualifiedHistory, insertColumns, oldInsertValues)

triggerName := db.QuoteIdent("trg_" + historyTable + "_archive")
createTrigger := fmt.Sprintf(`CREATE TRIGGER %s
BEFORE UPDATE OR DELETE ON %s
BEFORE INSERT OR UPDATE OR DELETE ON %s
FOR EACH ROW EXECUTE FUNCTION %s()`,
triggerName, qualifiedTable, funcName)

Expand Down
21 changes: 16 additions & 5 deletions internal/tigerfs/fs/synth/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,8 @@ func TestSynth_GenerateHistorySQL_Markdown(t *testing.T) {
if !strings.Contains(allSQL, "encoding TEXT CHECK (encoding IN ('utf8', 'base64'))") {
t.Errorf("history should have encoding CHECK constraint, got:\n%s", allSQL)
}
if !strings.Contains(allSQL, "CHECK (operation IN ('edit', 'rename', 'delete'))") {
t.Errorf("history should have operation CHECK constraint, got:\n%s", allSQL)
if !strings.Contains(allSQL, "CHECK (operation IN ('create', 'edit', 'rename', 'delete'))") {
t.Errorf("history should have operation CHECK constraint including 'create' tombstone, got:\n%s", allSQL)
}

// History table uses modern CREATE TABLE WITH syntax for hypertable + columnstore
Expand Down Expand Up @@ -376,9 +376,20 @@ func TestSynth_GenerateHistorySQL_Markdown(t *testing.T) {
t.Errorf("markdown trigger should copy title, got:\n%s", allSQL)
}

// Trigger copies parent_id and uses new column names
if !strings.Contains(allSQL, "BEFORE UPDATE OR DELETE") {
t.Errorf("should create BEFORE UPDATE OR DELETE trigger, got:\n%s", allSQL)
// Trigger copies parent_id and uses new column names. After tombstone, the
// trigger fires on INSERT too, capturing operation='create'.
if !strings.Contains(allSQL, "BEFORE INSERT OR UPDATE OR DELETE") {
t.Errorf("should create BEFORE INSERT OR UPDATE OR DELETE trigger (tombstone), got:\n%s", allSQL)
}
// Confirm the INSERT branch is wired in the trigger function.
if !strings.Contains(allSQL, "IF TG_OP = 'INSERT' THEN") {
t.Errorf("trigger function should branch on INSERT for tombstone capture, got:\n%s", allSQL)
}
if !strings.Contains(allSQL, "'create'") {
t.Errorf("trigger function should label INSERT history rows as 'create', got:\n%s", allSQL)
}
if !strings.Contains(allSQL, "NEW.id, NEW.parent_id, NEW.filename") {
t.Errorf("trigger function INSERT branch should reference NEW.*, got:\n%s", allSQL)
}
if !strings.Contains(allSQL, `ON "tigerfs"."memory"`) {
t.Errorf("trigger should be on tigerfs.memory, got:\n%s", allSQL)
Expand Down
79 changes: 77 additions & 2 deletions internal/tigerfs/fs/undo.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (o *Operations) ExecuteUndo(ctx context.Context, schema, tableName, afterID
skipped++ // Created then already deleted -- no-op
}

case "edit", "rename", "delete", "undo":
case "edit", "rename", "delete":
if f.VersionID == "" {
logging.Warn("undo: missing version_id for non-create operation",
zap.String("file_id", f.FileID), zap.String("type", f.Type))
Expand All @@ -180,6 +180,45 @@ func (o *Operations) ExecuteUndo(ctx context.Context, schema, tableName, afterID
restoreFileIDs = append(restoreFileIDs, f.FileID)
restoreFilenames = append(restoreFilenames, f.Filename)

case "undo":
// Undo-of-undo: dispatch by the history row's operation column.
// 'create' tombstone => the original entry restored a row, so
// reversing it means re-deleting. Other ops => the standard
// "restore from version_id" path produces the right state.
if f.VersionID == "" {
logging.Warn("undo: missing version_id for undo entry",
zap.String("file_id", f.FileID))
skipped++
continue
}
histOp, err := o.db.QueryHistoryOperation(ctx, synth.TigerFSSchema, historyTable, f.VersionID)
if err != nil {
logging.Warn("undo: failed to query history operation for undo entry; falling back to restore-from-version_id",
zap.String("file_id", f.FileID), zap.String("version_id", f.VersionID), zap.Error(err))
restoreVersionIDs = append(restoreVersionIDs, f.VersionID)
restoreFileIDs = append(restoreFileIDs, f.FileID)
restoreFilenames = append(restoreFilenames, f.Filename)
continue
}
switch histOp {
case "create":
exists, err := o.db.QueryFileExists(ctx, synth.TigerFSSchema, tableName, f.FileID)
if err != nil || !exists {
skipped++
continue
}
deleteFileIDs = append(deleteFileIDs, f.FileID)
deleteFilenames = append(deleteFilenames, f.Filename)
case "edit", "rename", "delete":
restoreVersionIDs = append(restoreVersionIDs, f.VersionID)
restoreFileIDs = append(restoreFileIDs, f.FileID)
restoreFilenames = append(restoreFilenames, f.Filename)
default:
logging.Warn("undo: unexpected history operation",
zap.String("op", histOp), zap.String("file_id", f.FileID))
skipped++
}

default:
logging.Warn("undo: unknown operation type",
zap.String("type", f.Type), zap.String("file_id", f.FileID))
Expand Down Expand Up @@ -279,14 +318,50 @@ func (o *Operations) ExecuteUndoSingle(ctx context.Context, schema, tableName, l
deleteFilenames = append(deleteFilenames, entry.Filename)
}

case "edit", "rename", "delete", "undo":
case "edit", "rename", "delete":
if entry.VersionID == "" {
return nil, fmt.Errorf("cannot undo %s operation: no version_id (before-state not captured)", entry.Type)
}
restoreVersionIDs = append(restoreVersionIDs, entry.VersionID)
restoreFileIDs = append(restoreFileIDs, entry.FileID)
restoreFilenames = append(restoreFilenames, entry.Filename)

case "undo":
// Undo-of-undo: dispatch by the history row's operation column to
// distinguish the cases that need DELETE (operation='create' tombstone)
// from those that need restore (operation in 'edit','rename','delete').
if entry.VersionID == "" {
return nil, fmt.Errorf("cannot undo undo operation: no version_id (before-state not captured)")
}
histOp, err := o.db.QueryHistoryOperation(ctx, synth.TigerFSSchema, historyTable, entry.VersionID)
if err != nil {
// Legacy undo entry from before the tombstone trigger landed (or a
// transient DB error). Fall back to restore-from-version_id, which
// matches pre-fix behavior for backwards compatibility.
logging.Warn("undo-of-undo: failed to query history operation; falling back to restore-from-version_id",
zap.String("version_id", entry.VersionID), zap.Error(err))
restoreVersionIDs = append(restoreVersionIDs, entry.VersionID)
restoreFileIDs = append(restoreFileIDs, entry.FileID)
restoreFilenames = append(restoreFilenames, entry.Filename)
break
}
switch histOp {
case "create":
exists, err := o.db.QueryFileExists(ctx, synth.TigerFSSchema, tableName, entry.FileID)
if err != nil || !exists {
skipped = 1
} else {
deleteFileIDs = append(deleteFileIDs, entry.FileID)
deleteFilenames = append(deleteFilenames, entry.Filename)
}
case "edit", "rename", "delete":
restoreVersionIDs = append(restoreVersionIDs, entry.VersionID)
restoreFileIDs = append(restoreFileIDs, entry.FileID)
restoreFilenames = append(restoreFilenames, entry.Filename)
default:
return nil, fmt.Errorf("unexpected history operation for undo entry: %s", histOp)
}

default:
return nil, fmt.Errorf("unknown operation type: %s", entry.Type)
}
Expand Down
7 changes: 5 additions & 2 deletions test/integration/history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,10 @@ func TestSynth_HistoryMultipleVersions(t *testing.T) {
require.Nil(t, fsErr, "WriteFile v%d should succeed", i)
}

// Should have 3 history entries (v1, v2, v3 — v4 is current)
// Should have 4 history entries: the create tombstone (capturing v1 at
// INSERT) plus the BEFORE-UPDATE captures of v1, v2, v3 from each of the
// three subsequent writes. v4 is current (lives in the source table, not
// in history).
entries, fsErr := ops.ReadDir(ctx, "/hist_list/.history/evolving.md")
require.Nil(t, fsErr, "ReadDir .history/evolving.md should succeed")

Expand All @@ -241,7 +244,7 @@ func TestSynth_HistoryMultipleVersions(t *testing.T) {
versionCount++
}
}
assert.Equal(t, 3, versionCount, "should have 3 history versions (v1, v2, v3)")
assert.Equal(t, 4, versionCount, "should have 4 history versions (create tombstone of v1, plus pre-update captures of v1, v2, v3)")
}

// TestSynth_HistoryReadOnly tests that writes to .history/ are rejected.
Expand Down
Loading