diff --git a/backend/internal/db/db.go b/backend/internal/db/db.go index 260080393..a43b282aa 100644 --- a/backend/internal/db/db.go +++ b/backend/internal/db/db.go @@ -101,7 +101,9 @@ func openPostgresDatabase(dsn string) (*gorm.DB, error) { database, err := gorm.Open(postgres.New(postgres.Config{ DSN: dsn, PreferSimpleProtocol: true, - }), &gorm.Config{}) + }), &gorm.Config{ + TranslateError: true, + }) if err != nil { return nil, err } @@ -331,6 +333,10 @@ func syncSchema(database *gorm.DB) error { } func syncSchemaUnlocked(database *gorm.DB) error { + if err := ensureMonthlyEventPartitions(database, time.Now().UTC()); err != nil { + return err + } + if err := database.AutoMigrate( &models.User{}, &models.Workspace{}, @@ -373,10 +379,14 @@ func syncSchemaUnlocked(database *gorm.DB) error { &models.CollabDocumentState{}, &models.CollabDocumentUpdateBatch{}, &models.ExtensionCallbackToken{}, + &models.ExtensionExecutionEventClaim{}, &models.ExtensionExecutionEvent{}, ); err != nil { return err } + if err := backfillExtensionExecutionEventClaims(database); err != nil { + return err + } // Redis owns normal active-session locking; this index is the atomic fallback when Redis is disabled. 
if err := database.Exec(` diff --git a/backend/internal/db/db_test.go b/backend/internal/db/db_test.go index 1e3163609..e6f317837 100644 --- a/backend/internal/db/db_test.go +++ b/backend/internal/db/db_test.go @@ -2,6 +2,7 @@ package db import ( "context" + "database/sql" "testing" "time" @@ -132,6 +133,90 @@ func TestSyncSchemaAddsArchiveScanIndexes(t *testing.T) { require.True(t, database.Migrator().HasIndex(&models.RemoteBrowserSession{}, "idx_remote_browser_sessions_archive_status_created_id")) } +func TestMonthlyPartitionedEventModelsUsePartitionCompatiblePrimaryKeys(t *testing.T) { + database, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + require.NoError(t, err) + require.NoError(t, syncSchema(database)) + + for _, tableName := range []string{ + "publish_events", + "extension_execution_events", + "project_activities", + "workspace_activities", + } { + primaryKeyColumns := sqlitePrimaryKeyColumns(t, database, tableName) + + require.ElementsMatch(t, []string{"id", "created_at"}, primaryKeyColumns, tableName) + } + require.True(t, database.Migrator().HasTable(&models.ExtensionExecutionEventClaim{})) +} + +func TestCreateMonthlyPartitionSQLUsesMonthRange(t *testing.T) { + start := time.Date(2026, 6, 1, 0, 0, 0, 0, time.UTC) + + sql := createMonthlyPartitionSQL("publish_events", start, start.AddDate(0, 1, 0)) + + require.Contains(t, sql, `"publish_events_2026_06"`) + require.Contains(t, sql, `PARTITION OF "publish_events"`) + require.Contains(t, sql, `TIMESTAMPTZ '2026-06-01 00:00:00+00'`) + require.Contains(t, sql, `TIMESTAMPTZ '2026-07-01 00:00:00+00'`) +} + +func TestCreateDefaultMonthlyPartitionSQLUsesDefaultPartition(t *testing.T) { + sql := createDefaultMonthlyPartitionSQL("publish_events") + + require.Contains(t, sql, `"publish_events_default"`) + require.Contains(t, sql, `PARTITION OF "publish_events"`) + require.Contains(t, sql, " DEFAULT") +} + +func TestBackfillExtensionExecutionEventClaims(t *testing.T) { + database, err := 
gorm.Open(sqlite.Open(":memory:"), &gorm.Config{ + TranslateError: true, + }) + require.NoError(t, err) + require.NoError(t, database.AutoMigrate( + &models.ExtensionExecutionEvent{}, + &models.ExtensionExecutionEventClaim{}, + )) + + olderID := uuid.New() + newerID := uuid.New() + older := time.Date(2026, 6, 1, 10, 0, 0, 0, time.UTC) + newer := older.Add(time.Hour) + require.NoError(t, database.Create(&models.ExtensionExecutionEvent{ + ID: newerID, + CallbackTokenID: uuid.New(), + ExecutionID: "execution-newer", + ProjectID: uuid.New(), + UserID: uuid.New(), + EventID: "event-1", + Platform: "x", + Status: "failed", + Metadata: []byte(`{}`), + CreatedAt: newer, + }).Error) + require.NoError(t, database.Create(&models.ExtensionExecutionEvent{ + ID: olderID, + CallbackTokenID: uuid.New(), + ExecutionID: "execution-older", + ProjectID: uuid.New(), + UserID: uuid.New(), + EventID: "event-1", + Platform: "x", + Status: "failed", + Metadata: []byte(`{}`), + CreatedAt: older, + }).Error) + + require.NoError(t, backfillExtensionExecutionEventClaims(database)) + require.NoError(t, backfillExtensionExecutionEventClaims(database)) + + var claim models.ExtensionExecutionEventClaim + require.NoError(t, database.First(&claim, "event_id = ?", "event-1").Error) + require.Equal(t, olderID, claim.RecordID) +} + func TestConnectionPoolConfigFromEnvUsesDefaults(t *testing.T) { clearConnectionPoolEnv(t) @@ -448,3 +533,29 @@ func setDatabaseConnectionEnv(t *testing.T) { t.Setenv("DB_NAME", "poster_db") t.Setenv("DB_PORT", "5432") } + +func sqlitePrimaryKeyColumns(t *testing.T, database *gorm.DB, tableName string) []string { + t.Helper() + + rows, err := database.Raw("PRAGMA table_info(" + tableName + ")").Rows() + require.NoError(t, err) + defer func() { + require.NoError(t, rows.Close()) + }() + + columns := []string{} + for rows.Next() { + var cid int + var name string + var dataType string + var notNull int + var defaultValue sql.NullString + var primaryKeyPosition int + 
require.NoError(t, rows.Scan(&cid, &name, &dataType, &notNull, &defaultValue, &primaryKeyPosition)) + if primaryKeyPosition > 0 { + columns = append(columns, name) + } + } + require.NoError(t, rows.Err()) + return columns +} diff --git a/backend/internal/db/extension_event_claims.go b/backend/internal/db/extension_event_claims.go new file mode 100644 index 000000000..88cdc3c76 --- /dev/null +++ b/backend/internal/db/extension_event_claims.go @@ -0,0 +1,42 @@ +package db + +import ( + "fmt" + + "gorm.io/gorm" +) + +func backfillExtensionExecutionEventClaims(database *gorm.DB) error { + if !database.Migrator().HasTable("extension_execution_events") || + !database.Migrator().HasTable("extension_execution_event_claims") { + return nil + } + + switch database.Name() { + case "postgres": + return database.Exec(` + INSERT INTO extension_execution_event_claims (event_id, record_id, created_at) + SELECT DISTINCT ON (event_id) event_id, id, created_at + FROM extension_execution_events + ORDER BY event_id, created_at ASC, id ASC + ON CONFLICT (event_id) DO NOTHING + `).Error + case "sqlite": + return database.Exec(` + INSERT OR IGNORE INTO extension_execution_event_claims (event_id, record_id, created_at) + SELECT e.event_id, e.id, e.created_at + FROM extension_execution_events e + WHERE NOT EXISTS ( + SELECT 1 + FROM extension_execution_events older + WHERE older.event_id = e.event_id + AND ( + older.created_at < e.created_at + OR (older.created_at = e.created_at AND older.id < e.id) + ) + ) + `).Error + default: + return fmt.Errorf("unsupported database dialect %q for extension event claim backfill", database.Name()) + } +} diff --git a/backend/internal/db/monthly_partitions.go b/backend/internal/db/monthly_partitions.go new file mode 100644 index 000000000..28a3b7cec --- /dev/null +++ b/backend/internal/db/monthly_partitions.go @@ -0,0 +1,411 @@ +package db + +import ( + "database/sql" + "fmt" + "strings" + "time" + + "gorm.io/gorm" +) + +const ( + monthlyPartitionPastMonths
= 12 + monthlyPartitionFutureMonths = 3 +) + +type monthlyPartitionedTable struct { + name string + createSQL string + columns []string +} + +var monthlyEventPartitionedTables = []monthlyPartitionedTable{ + { + name: "publish_events", + createSQL: ` + CREATE TABLE IF NOT EXISTS publish_events ( + id uuid NOT NULL, + publication_id uuid NOT NULL, + project_id uuid NOT NULL, + user_id uuid NOT NULL, + platform text NOT NULL, + job_id uuid NOT NULL, + idempotency_key text NOT NULL, + event_type text NOT NULL, + status text NOT NULL, + message text, + remote_id text, + publish_url text, + error_message text, + metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL, + CONSTRAINT pk_publish_events_partitioned PRIMARY KEY (id, created_at) + ) PARTITION BY RANGE (created_at) + `, + columns: []string{ + "id", + "publication_id", + "project_id", + "user_id", + "platform", + "job_id", + "idempotency_key", + "event_type", + "status", + "message", + "remote_id", + "publish_url", + "error_message", + "metadata", + "created_at", + }, + }, + { + name: "extension_execution_events", + createSQL: ` + CREATE TABLE IF NOT EXISTS extension_execution_events ( + id uuid NOT NULL, + callback_token_id uuid NOT NULL, + execution_id text NOT NULL, + project_id uuid NOT NULL, + user_id uuid NOT NULL, + event_id text NOT NULL, + platform text NOT NULL, + status text NOT NULL, + message text, + remote_id text, + publish_url text, + error_message text, + metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL, + CONSTRAINT pk_extension_execution_events_partitioned PRIMARY KEY (id, created_at) + ) PARTITION BY RANGE (created_at) + `, + columns: []string{ + "id", + "callback_token_id", + "execution_id", + "project_id", + "user_id", + "event_id", + "platform", + "status", + "message", + "remote_id", + "publish_url", + "error_message", + "metadata", + "created_at", + }, + }, + { + name: "project_activities", + createSQL: ` + CREATE TABLE IF NOT EXISTS 
project_activities ( + id uuid NOT NULL, + project_id uuid NOT NULL, + actor_user_id uuid NOT NULL, + target_user_id uuid, + event_type text NOT NULL, + metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL, + CONSTRAINT pk_project_activities_partitioned PRIMARY KEY (id, created_at) + ) PARTITION BY RANGE (created_at) + `, + columns: []string{ + "id", + "project_id", + "actor_user_id", + "target_user_id", + "event_type", + "metadata", + "created_at", + }, + }, + { + name: "workspace_activities", + createSQL: ` + CREATE TABLE IF NOT EXISTS workspace_activities ( + id uuid NOT NULL, + workspace_id uuid NOT NULL, + actor_user_id uuid NOT NULL, + target_user_id uuid, + event_type text NOT NULL, + metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL, + CONSTRAINT pk_workspace_activities_partitioned PRIMARY KEY (id, created_at) + ) PARTITION BY RANGE (created_at) + `, + columns: []string{ + "id", + "workspace_id", + "actor_user_id", + "target_user_id", + "event_type", + "metadata", + "created_at", + }, + }, +} + +func ensureMonthlyEventPartitions(database *gorm.DB, now time.Time) error { + if database.Name() != "postgres" { + return nil + } + for _, table := range monthlyEventPartitionedTables { + if err := ensureMonthlyPartitionedTable(database, table, now); err != nil { + return err + } + } + return nil +} + +func ensureMonthlyPartitionedTable(database *gorm.DB, table monthlyPartitionedTable, now time.Time) error { + partitioned, exists, err := postgresPartitionedTableState(database, table.name) + if err != nil { + return err + } + if exists && !partitioned { + if err := migrateRegularTableToMonthlyPartitions(database, table, now); err != nil { + return err + } + } else if !exists { + if err := database.Exec(table.createSQL).Error; err != nil { + return err + } + } + if err := ensureDefaultMonthlyPartition(database, table.name); err != nil { + return err + } + return ensureRollingMonthlyPartitions(database, table, now) 
+} + +func postgresPartitionedTableState(database *gorm.DB, tableName string) (partitioned bool, exists bool, err error) { + err = database.Raw(` + SELECT + to_regclass(?) IS NOT NULL AS exists, + EXISTS ( + SELECT 1 + FROM pg_partitioned_table + WHERE partrelid = to_regclass(?) + ) AS partitioned + `, tableName, tableName).Row().Scan(&exists, &partitioned) + return partitioned, exists, err +} + +func migrateRegularTableToMonthlyPartitions(database *gorm.DB, table monthlyPartitionedTable, now time.Time) error { + legacyName := fmt.Sprintf("%s_legacy_%s", table.name, now.UTC().Format("20060102150405")) + if err := database.Exec(fmt.Sprintf( + "ALTER TABLE %s RENAME TO %s", + quotePostgresIdentifier(table.name), + quotePostgresIdentifier(legacyName), + )).Error; err != nil { + return err + } + + if err := database.Exec(table.createSQL).Error; err != nil { + return err + } + if err := ensureDefaultMonthlyPartition(database, table.name); err != nil { + return err + } + if err := ensureRollingMonthlyPartitions(database, table, now); err != nil { + return err + } + if err := ensureLegacyDataMonthlyPartitions(database, table, legacyName); err != nil { + return err + } + if err := copyLegacyRowsIntoPartitionedTable(database, table, legacyName, now); err != nil { + return err + } + return database.Exec(fmt.Sprintf("DROP TABLE %s", quotePostgresIdentifier(legacyName))).Error +} + +func ensureRollingMonthlyPartitions(database *gorm.DB, table monthlyPartitionedTable, now time.Time) error { + start := monthStartUTC(now).AddDate(0, -monthlyPartitionPastMonths, 0) + for offset := 0; offset <= monthlyPartitionPastMonths+monthlyPartitionFutureMonths; offset++ { + if err := ensureMonthlyPartition(database, table, start.AddDate(0, offset, 0)); err != nil { + return err + } + } + return nil +} + +func ensureLegacyDataMonthlyPartitions(database *gorm.DB, table monthlyPartitionedTable, legacyName string) error { + var minCreatedAt, maxCreatedAt sql.NullTime + if err := 
database.Raw(fmt.Sprintf( + "SELECT MIN(created_at), MAX(created_at) FROM %s", + quotePostgresIdentifier(legacyName), + )).Row().Scan(&minCreatedAt, &maxCreatedAt); err != nil { + return err + } + if !minCreatedAt.Valid || !maxCreatedAt.Valid { + return nil + } + + start := monthStartUTC(minCreatedAt.Time) + end := monthStartUTC(maxCreatedAt.Time) + for partitionStart := start; !partitionStart.After(end); partitionStart = partitionStart.AddDate(0, 1, 0) { + if err := ensureMonthlyPartition(database, table, partitionStart); err != nil { + return err + } + } + return nil +} + +func ensureMonthlyPartition(database *gorm.DB, table monthlyPartitionedTable, partitionStart time.Time) error { + partitionStart = monthStartUTC(partitionStart) + partitionEnd := partitionStart.AddDate(0, 1, 0) + hasRows, err := defaultPartitionHasRowsInRange(database, table.name, partitionStart, partitionEnd) + if err != nil { + return err + } + if hasRows { + return drainDefaultRowsIntoMonthlyPartition(database, table, partitionStart, partitionEnd) + } + return database.Exec(createMonthlyPartitionSQL(table.name, partitionStart, partitionEnd)).Error +} + +func ensureDefaultMonthlyPartition(database *gorm.DB, tableName string) error { + return database.Exec(createDefaultMonthlyPartitionSQL(tableName)).Error +} + +func defaultPartitionHasRowsInRange(database *gorm.DB, tableName string, partitionStart time.Time, partitionEnd time.Time) (bool, error) { + defaultPartitionName := defaultMonthlyPartitionName(tableName) + var exists bool + if err := database.Raw("SELECT to_regclass(?) IS NOT NULL", defaultPartitionName).Row().Scan(&exists); err != nil { + return false, err + } + if !exists { + return false, nil + } + + var hasRows bool + err := database.Raw(fmt.Sprintf( + "SELECT EXISTS (SELECT 1 FROM %s WHERE created_at >= ? 
AND created_at < ?)", + quotePostgresIdentifier(defaultPartitionName), + ), partitionStart, partitionEnd).Row().Scan(&hasRows) + return hasRows, err +} + +func drainDefaultRowsIntoMonthlyPartition(database *gorm.DB, table monthlyPartitionedTable, partitionStart time.Time, partitionEnd time.Time) error { + tempTableName := fmt.Sprintf("tmp_%s_%s_%d", table.name, partitionStart.UTC().Format("200601"), time.Now().UTC().UnixNano()) + columnList := quotedColumnList(table.columns) + defaultPartitionName := defaultMonthlyPartitionName(table.name) + + return database.Transaction(func(tx *gorm.DB) error { + if err := tx.Exec(fmt.Sprintf( + "LOCK TABLE %s IN ACCESS EXCLUSIVE MODE", + quotePostgresIdentifier(table.name), + )).Error; err != nil { + return err + } + if err := tx.Exec(fmt.Sprintf( + "CREATE TEMP TABLE %s ON COMMIT DROP AS SELECT %s FROM %s WHERE false", + quotePostgresIdentifier(tempTableName), + columnList, + quotePostgresIdentifier(defaultPartitionName), + )).Error; err != nil { + return err + } + if err := tx.Exec(fmt.Sprintf( + "INSERT INTO %s (%s) SELECT %s FROM %s WHERE created_at >= ? AND created_at < ?", + quotePostgresIdentifier(tempTableName), + columnList, + columnList, + quotePostgresIdentifier(defaultPartitionName), + ), partitionStart, partitionEnd).Error; err != nil { + return err + } + if err := tx.Exec(fmt.Sprintf( + "DELETE FROM %s WHERE created_at >= ? 
AND created_at < ?", + quotePostgresIdentifier(defaultPartitionName), + ), partitionStart, partitionEnd).Error; err != nil { + return err + } + if err := tx.Exec(createMonthlyPartitionSQL(table.name, partitionStart, partitionEnd)).Error; err != nil { + return err + } + return tx.Exec(fmt.Sprintf( + "INSERT INTO %s (%s) SELECT %s FROM %s", + quotePostgresIdentifier(table.name), + columnList, + columnList, + quotePostgresIdentifier(tempTableName), + )).Error + }) +} + +func copyLegacyRowsIntoPartitionedTable(database *gorm.DB, table monthlyPartitionedTable, legacyName string, now time.Time) error { + columnList := quotedColumnList(table.columns) + selectList := legacySelectColumnList(table.columns, now) + return database.Exec(fmt.Sprintf( + "INSERT INTO %s (%s) SELECT %s FROM %s", + quotePostgresIdentifier(table.name), + columnList, + selectList, + quotePostgresIdentifier(legacyName), + )).Error +} + +func createMonthlyPartitionSQL(tableName string, partitionStart time.Time, partitionEnd time.Time) string { + partitionName := fmt.Sprintf("%s_%s", tableName, partitionStart.UTC().Format("2006_01")) + return fmt.Sprintf( + "CREATE TABLE IF NOT EXISTS %s PARTITION OF %s FOR VALUES FROM (%s) TO (%s)", + quotePostgresIdentifier(partitionName), + quotePostgresIdentifier(tableName), + quotePostgresTimestampLiteral(partitionStart), + quotePostgresTimestampLiteral(partitionEnd), + ) +} + +func createDefaultMonthlyPartitionSQL(tableName string) string { + return fmt.Sprintf( + "CREATE TABLE IF NOT EXISTS %s PARTITION OF %s DEFAULT", + quotePostgresIdentifier(defaultMonthlyPartitionName(tableName)), + quotePostgresIdentifier(tableName), + ) +} + +func defaultMonthlyPartitionName(tableName string) string { + return tableName + "_default" +} + +func quotedColumnList(columns []string) string { + quoted := make([]string, 0, len(columns)) + for _, column := range columns { + quoted = append(quoted, quotePostgresIdentifier(column)) + } + return strings.Join(quoted, ", ") +} + +func 
legacySelectColumnList(columns []string, now time.Time) string { + selected := make([]string, 0, len(columns)) + for _, column := range columns { + if column == "created_at" { + selected = append(selected, fmt.Sprintf( + "COALESCE(%s, %s) AS %s", + quotePostgresIdentifier(column), + quotePostgresTimestampLiteral(now), + quotePostgresIdentifier(column), + )) + continue + } + selected = append(selected, quotePostgresIdentifier(column)) + } + return strings.Join(selected, ", ") +} + +func monthStartUTC(value time.Time) time.Time { + value = value.UTC() + return time.Date(value.Year(), value.Month(), 1, 0, 0, 0, 0, time.UTC) +} + +func quotePostgresIdentifier(identifier string) string { + return `"` + strings.ReplaceAll(identifier, `"`, `""`) + `"` +} + +func quotePostgresTimestampLiteral(value time.Time) string { + return "TIMESTAMPTZ '" + value.UTC().Format("2006-01-02 15:04:05-07") + "'" +} diff --git a/backend/internal/handlers/dashboard_test.go b/backend/internal/handlers/dashboard_test.go index 674462ca8..b06cf9925 100644 --- a/backend/internal/handlers/dashboard_test.go +++ b/backend/internal/handlers/dashboard_test.go @@ -297,12 +297,12 @@ func setupHandlerTestDB(t *testing.T) *gorm.DB { )`).Error) require.NoError(t, db.Exec(`CREATE TABLE extension_execution_events ( - id TEXT PRIMARY KEY, + id TEXT NOT NULL, callback_token_id TEXT NOT NULL, execution_id TEXT NOT NULL, project_id TEXT NOT NULL, user_id TEXT NOT NULL, - event_id TEXT NOT NULL UNIQUE, + event_id TEXT NOT NULL, platform TEXT NOT NULL, status TEXT NOT NULL, message TEXT, @@ -310,7 +310,13 @@ func setupHandlerTestDB(t *testing.T) *gorm.DB { publish_url TEXT, error_message TEXT, metadata TEXT NOT NULL DEFAULT '{}', - created_at DATETIME + created_at DATETIME NOT NULL, + PRIMARY KEY (id, created_at) + )`).Error) + require.NoError(t, db.Exec(`CREATE TABLE extension_execution_event_claims ( + event_id TEXT PRIMARY KEY, + record_id TEXT NOT NULL UNIQUE, + created_at DATETIME NOT NULL )`).Error) return db 
diff --git a/backend/internal/models/models.go b/backend/internal/models/models.go index 65c41d9f4..301b51b8d 100644 --- a/backend/internal/models/models.go +++ b/backend/internal/models/models.go @@ -289,7 +289,7 @@ type ProjectActivity struct { TargetUserID *uuid.UUID `gorm:"type:uuid;index"` EventType string `gorm:"not null;index"` Metadata datatypes.JSON `gorm:"type:jsonb;not null;default:'{}'"` - CreatedAt time.Time `gorm:"not null;index:idx_project_activities_project_created_at,priority:2;index:idx_project_activities_archive_created_id,priority:1"` + CreatedAt time.Time `gorm:"primaryKey;not null;index:idx_project_activities_project_created_at,priority:2;index:idx_project_activities_archive_created_id,priority:1"` Project Project `gorm:"foreignKey:ProjectID;constraint:OnDelete:CASCADE"` Actor User `gorm:"foreignKey:ActorUserID;constraint:OnDelete:CASCADE"` TargetUser *User `gorm:"foreignKey:TargetUserID;constraint:OnDelete:SET NULL"` @@ -430,7 +430,7 @@ type WorkspaceActivity struct { TargetUserID *uuid.UUID `gorm:"type:uuid;index"` EventType string `gorm:"not null;index"` Metadata datatypes.JSON `gorm:"type:jsonb;not null;default:'{}'"` - CreatedAt time.Time `gorm:"not null;index:idx_workspace_activities_workspace_created_at,priority:2;index:idx_workspace_activities_archive_created_id,priority:1"` + CreatedAt time.Time `gorm:"primaryKey;not null;index:idx_workspace_activities_workspace_created_at,priority:2;index:idx_workspace_activities_archive_created_id,priority:1"` Workspace Workspace `gorm:"foreignKey:WorkspaceID;constraint:OnDelete:CASCADE"` Actor User `gorm:"foreignKey:ActorUserID;constraint:OnDelete:CASCADE"` TargetUser *User `gorm:"foreignKey:TargetUserID;constraint:OnDelete:SET NULL"` @@ -523,7 +523,7 @@ type PublishEvent struct { PublishURL string ErrorMessage string Metadata datatypes.JSON `gorm:"type:jsonb;not null;default:'{}'"` - CreatedAt time.Time `gorm:"index:idx_publish_events_archive_created_id,priority:1"` + CreatedAt time.Time 
`gorm:"primaryKey;not null;index:idx_publish_events_archive_created_id,priority:1"` } type ScheduledPublication struct { @@ -669,7 +669,7 @@ type ExtensionExecutionEvent struct { ExecutionID string `gorm:"not null;index"` ProjectID uuid.UUID `gorm:"type:uuid;not null;index"` UserID uuid.UUID `gorm:"type:uuid;not null;index"` - EventID string `gorm:"not null;uniqueIndex"` + EventID string `gorm:"not null;index"` Platform string `gorm:"not null;index"` Status string `gorm:"not null;index"` Message string @@ -677,7 +677,13 @@ type ExtensionExecutionEvent struct { PublishURL string ErrorMessage string Metadata datatypes.JSON `gorm:"type:jsonb;not null;default:'{}'"` - CreatedAt time.Time `gorm:"index:idx_extension_execution_events_archive_created_id,priority:1"` + CreatedAt time.Time `gorm:"primaryKey;not null;index:idx_extension_execution_events_archive_created_id,priority:1"` +} + +type ExtensionExecutionEventClaim struct { + EventID string `gorm:"primaryKey;not null"` + RecordID uuid.UUID `gorm:"type:uuid;not null;uniqueIndex"` + CreatedAt time.Time `gorm:"not null;index"` } // BeforeCreate hook to generate UUID if not set @@ -767,6 +773,9 @@ func (e *PublishEvent) BeforeCreate(_ *gorm.DB) (err error) { if e.ID == uuid.Nil { e.ID = uuid.New() } + if e.CreatedAt.IsZero() { + e.CreatedAt = time.Now().UTC() + } return } @@ -820,6 +829,9 @@ func (a *WorkspaceActivity) BeforeCreate(_ *gorm.DB) (err error) { if a.ID == uuid.Nil { a.ID = uuid.New() } + if a.CreatedAt.IsZero() { + a.CreatedAt = time.Now().UTC() + } return } @@ -847,6 +859,9 @@ func (a *ProjectActivity) BeforeCreate(_ *gorm.DB) (err error) { if a.ID == uuid.Nil { a.ID = uuid.New() } + if a.CreatedAt.IsZero() { + a.CreatedAt = time.Now().UTC() + } return } @@ -933,5 +948,15 @@ func (e *ExtensionExecutionEvent) BeforeCreate(_ *gorm.DB) (err error) { if e.ID == uuid.Nil { e.ID = uuid.New() } + if e.CreatedAt.IsZero() { + e.CreatedAt = time.Now().UTC() + } + return +} + +func (c *ExtensionExecutionEventClaim) 
BeforeCreate(_ *gorm.DB) (err error) { + if c.CreatedAt.IsZero() { + c.CreatedAt = time.Now().UTC() + } return } diff --git a/backend/internal/services/archive/worker.go b/backend/internal/services/archive/worker.go index 912e58066..ee7615d4e 100644 --- a/backend/internal/services/archive/worker.go +++ b/backend/internal/services/archive/worker.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/google/uuid" "gorm.io/gorm" "github.com/kurodakayn/mpp-backend/internal/models" @@ -44,6 +45,8 @@ type archiveLine[T any] struct { Row T `json:"row"` } +type archiveBeforeDeleteHook[T any] func(*gorm.DB, []T) error + // NewWorker creates a cold-row archival worker. func NewWorker(db *gorm.DB, storage objectstorage.Client, config Config) *Worker { return &Worker{db: db, storage: storage, config: config} @@ -131,9 +134,9 @@ func (w *Worker) archivePublishEvents(ctx context.Context, db *gorm.DB, now time } func (w *Worker) archiveExtensionExecutionEvents(ctx context.Context, db *gorm.DB, now time.Time) (TableResult, error) { - return archiveModel[models.ExtensionExecutionEvent](ctx, db, w.storage, w.config, "extension_execution_events", w.config.ExtensionExecutionEventRetention, now, func(query *gorm.DB, cutoff time.Time) *gorm.DB { + return archiveModelWithDeleteHook(ctx, db, w.storage, w.config, "extension_execution_events", w.config.ExtensionExecutionEventRetention, now, func(query *gorm.DB, cutoff time.Time) *gorm.DB { return query.Where("created_at < ?", cutoff) - }) + }, deleteExtensionExecutionEventClaims) } func (w *Worker) archiveProjectActivities(ctx context.Context, db *gorm.DB, now time.Time) (TableResult, error) { @@ -168,12 +171,26 @@ func archiveModel[T any]( retention time.Duration, now time.Time, scope func(*gorm.DB, time.Time) *gorm.DB, +) (TableResult, error) { + return archiveModelWithDeleteHook[T](ctx, db, storage, config, table, retention, now, scope, nil) +} + +func archiveModelWithDeleteHook[T any]( + ctx context.Context, + db *gorm.DB, + storage 
objectstorage.Client, + config Config, + table string, + retention time.Duration, + now time.Time, + scope func(*gorm.DB, time.Time) *gorm.DB, + beforeDelete archiveBeforeDeleteHook[T], ) (TableResult, error) { cutoff := now.Add(-retention) result := TableResult{Table: table, Cutoff: cutoff} for batchNumber := 0; ; batchNumber++ { - batchResult, err := archiveModelBatch[T](ctx, db, storage, config, table, cutoff, now, batchNumber, scope) + batchResult, err := archiveModelBatch[T](ctx, db, storage, config, table, cutoff, now, batchNumber, scope, beforeDelete) if err != nil { return result, err } @@ -199,6 +216,7 @@ func archiveModelBatch[T any]( now time.Time, batchNumber int, scope func(*gorm.DB, time.Time) *gorm.DB, + beforeDelete archiveBeforeDeleteHook[T], ) (TableResult, error) { result := TableResult{Table: table, Cutoff: cutoff} var records []T @@ -225,12 +243,22 @@ func archiveModelBatch[T any]( }); err != nil { return result, fmt.Errorf("upload %s archive object: %w", table, err) } - deleteResult := db.WithContext(ctx).Delete(&records) - if deleteResult.Error != nil { - return result, fmt.Errorf("delete archived %s rows: %w", table, deleteResult.Error) - } - if deleteResult.RowsAffected != int64(len(records)) { - return result, fmt.Errorf("delete archived %s rows: expected %d, deleted %d", table, len(records), deleteResult.RowsAffected) + if err := db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + if beforeDelete != nil { + if err := beforeDelete(tx, records); err != nil { + return err + } + } + deleteResult := tx.Delete(&records) + if deleteResult.Error != nil { + return fmt.Errorf("delete archived %s rows: %w", table, deleteResult.Error) + } + if deleteResult.RowsAffected != int64(len(records)) { + return fmt.Errorf("delete archived %s rows: expected %d, deleted %d", table, len(records), deleteResult.RowsAffected) + } + return nil + }); err != nil { + return result, err } result.RowsArchived = len(records) @@ -239,6 +267,17 @@ func 
archiveModelBatch[T any]( return result, nil } +func deleteExtensionExecutionEventClaims(tx *gorm.DB, records []models.ExtensionExecutionEvent) error { + if len(records) == 0 || !tx.Migrator().HasTable(&models.ExtensionExecutionEventClaim{}) { + return nil + } + recordIDs := make([]uuid.UUID, 0, len(records)) + for _, record := range records { + recordIDs = append(recordIDs, record.ID) + } + return tx.Where("record_id IN ?", recordIDs).Delete(&models.ExtensionExecutionEventClaim{}).Error +} + func tryArchiveWorkerLock(ctx context.Context, db *gorm.DB) (bool, error) { var locked bool if err := db.WithContext(ctx).Raw("SELECT pg_try_advisory_lock(?)", archiveWorkerAdvisoryLockKey).Scan(&locked).Error; err != nil { diff --git a/backend/internal/services/archive/worker_test.go b/backend/internal/services/archive/worker_test.go index 432b3b010..5954a92a6 100644 --- a/backend/internal/services/archive/worker_test.go +++ b/backend/internal/services/archive/worker_test.go @@ -129,6 +129,104 @@ func TestWorkerRunOnceArchivesColdEventsAndDeletesRows(t *testing.T) { } } +func TestWorkerRunOnceDeletesExtensionEventClaimsForArchivedRows(t *testing.T) { + db := setupArchiveTestDB(t) + storage := fake.NewClient() + now := time.Date(2026, 6, 11, 10, 0, 0, 0, time.UTC) + coldCreatedAt := now.Add(-181 * 24 * time.Hour) + hotCreatedAt := now.Add(-10 * 24 * time.Hour) + + coldEvent := models.ExtensionExecutionEvent{ + CallbackTokenID: uuid.New(), + ExecutionID: "execution-cold", + ProjectID: uuid.New(), + UserID: uuid.New(), + EventID: "event-cold", + Platform: "wechat", + Status: "failed", + Metadata: datatypes.JSON(`{}`), + CreatedAt: coldCreatedAt, + } + hotEvent := models.ExtensionExecutionEvent{ + CallbackTokenID: uuid.New(), + ExecutionID: "execution-hot", + ProjectID: uuid.New(), + UserID: uuid.New(), + EventID: "event-hot", + Platform: "wechat", + Status: "failed", + Metadata: datatypes.JSON(`{}`), + CreatedAt: hotCreatedAt, + } + if err := db.Create(&coldEvent).Error; err != 
nil { + t.Fatalf("create cold extension event: %v", err) + } + if err := db.Create(&hotEvent).Error; err != nil { + t.Fatalf("create hot extension event: %v", err) + } + if err := db.Create(&models.ExtensionExecutionEventClaim{ + EventID: coldEvent.EventID, + RecordID: coldEvent.ID, + }).Error; err != nil { + t.Fatalf("create cold extension event claim: %v", err) + } + if err := db.Create(&models.ExtensionExecutionEventClaim{ + EventID: hotEvent.EventID, + RecordID: hotEvent.ID, + }).Error; err != nil { + t.Fatalf("create hot extension event claim: %v", err) + } + + worker := NewWorker(db, storage, Config{ + Enabled: true, + Interval: time.Hour, + BatchSize: 10, + ObjectKeyPrefix: "test-archive", + PublishEventRetention: 180 * 24 * time.Hour, + ExtensionExecutionEventRetention: 180 * 24 * time.Hour, + ProjectActivityRetention: 365 * 24 * time.Hour, + WorkspaceActivityRetention: 365 * 24 * time.Hour, + BrowserSessionHistoryRetention: 90 * 24 * time.Hour, + }) + result, err := worker.RunOnce(context.Background(), now) + if err != nil { + t.Fatalf("run archive worker: %v", err) + } + + extensionResult := tableResult(result, "extension_execution_events") + if extensionResult.RowsArchived != 1 { + t.Fatalf("expected one archived extension event, got %d", extensionResult.RowsArchived) + } + + var remainingEvents []models.ExtensionExecutionEvent + if err := db.Find(&remainingEvents).Error; err != nil { + t.Fatalf("query remaining extension events: %v", err) + } + if len(remainingEvents) != 1 || remainingEvents[0].ID != hotEvent.ID { + t.Fatalf("expected only hot extension event to remain, got %#v", remainingEvents) + } + + if countArchiveRows(t, db, &models.ExtensionExecutionEventClaim{}, "record_id = ?", coldEvent.ID) != 0 { + t.Fatalf("expected archived extension event claim to be deleted") + } + if countArchiveRows(t, db, &models.ExtensionExecutionEventClaim{}, "record_id = ?", hotEvent.ID) != 1 { + t.Fatalf("expected hot extension event claim to remain") + } + + body 
:= readObject(t, storage, extensionResult.ObjectKey) + lines := jsonLines(t, body) + if len(lines) != 1 { + t.Fatalf("expected one archived extension event line, got %d", len(lines)) + } + if lines[0]["table"] != "extension_execution_events" { + t.Fatalf("expected extension event table metadata, got %#v", lines[0]["table"]) + } + row := lines[0]["row"].(map[string]any) + if row["ID"] != coldEvent.ID.String() { + t.Fatalf("expected archived extension event ID %s, got %#v", coldEvent.ID, row["ID"]) + } +} + func TestWorkerRunOnceArchivesOnlyTerminalColdBrowserSessions(t *testing.T) { db := setupArchiveTestDB(t) storage := fake.NewClient() @@ -384,3 +482,12 @@ func jsonLines(t *testing.T, body string) []map[string]any { } return lines } + +func countArchiveRows(t *testing.T, db *gorm.DB, model any, query string, args ...any) int64 { + t.Helper() + var count int64 + if err := db.Model(model).Where(query, args...).Count(&count).Error; err != nil { + t.Fatalf("count archive rows: %v", err) + } + return count +} diff --git a/backend/internal/services/extension/dedup.go b/backend/internal/services/extension/dedup.go new file mode 100644 index 000000000..fd2055be9 --- /dev/null +++ b/backend/internal/services/extension/dedup.go @@ -0,0 +1,30 @@ +package extension + +import ( + "errors" + + "gorm.io/gorm" + + "github.com/kurodakayn/mpp-backend/internal/models" +) + +// loadClaimedExtensionEvent copies the stored event that owns eventID's claim into event. +// It returns the claim lookup error (including gorm.ErrRecordNotFound) when no claim exists, +// and leaves event untouched when the claimed event row itself cannot be found. +func loadClaimedExtensionEvent(tx *gorm.DB, eventID string, event *models.ExtensionExecutionEvent) error { + var claim models.ExtensionExecutionEventClaim + if err := tx.First(&claim, "event_id = ?", eventID).Error; err != nil { + return err + } + // Query into a zero-valued struct: GORM adds any non-zero primary key already set on + // the destination as an extra AND condition, and callers pass a pre-populated event. + var stored models.ExtensionExecutionEvent + if err := tx.First(&stored, "id = ?", claim.RecordID).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil + } + return err + } + *event = stored + return nil +} diff --git a/backend/internal/services/extension/handoffs.go b/backend/internal/services/extension/handoffs.go index 8f985ef01..5bae51626 100644 --- a/backend/internal/services/extension/handoffs.go +++
b/backend/internal/services/extension/handoffs.go @@ -9,6 +9,7 @@ import ( "github.com/google/uuid" "gorm.io/datatypes" "gorm.io/gorm" + "gorm.io/gorm/clause" "github.com/kurodakayn/mpp-backend/internal/dto" "github.com/kurodakayn/mpp-backend/internal/models" @@ -212,13 +213,6 @@ func (s *Service) RecordExtensionEvent(req dto.ExtensionEventCallbackRequest) (* return nil, ErrExtensionCallbackTokenInvalid } - var existing models.ExtensionExecutionEvent - if err := s.writerDB().First(&existing, "event_id = ?", eventID).Error; err == nil { - return &dto.ExtensionEventCallbackResponse{Accepted: true, Duplicate: true}, nil - } else if !errors.Is(err, gorm.ErrRecordNotFound) { - return nil, err - } - metadata := datatypes.JSON([]byte(`{}`)) if req.Metadata != nil { payload, err := json.Marshal(req.Metadata) @@ -229,6 +223,7 @@ func (s *Service) RecordExtensionEvent(req dto.ExtensionEventCallbackRequest) (* } event := models.ExtensionExecutionEvent{ + ID: uuid.New(), CallbackTokenID: token.ID, ExecutionID: token.ExecutionID, ProjectID: token.ProjectID, @@ -242,7 +237,23 @@ func (s *Service) RecordExtensionEvent(req dto.ExtensionEventCallbackRequest) (* ErrorMessage: strings.TrimSpace(req.ErrorMessage), Metadata: metadata, } + duplicate := false if err := s.writerDB().Transaction(func(tx *gorm.DB) error { + claim := models.ExtensionExecutionEventClaim{ + EventID: eventID, + RecordID: event.ID, + } + result := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "event_id"}}, + DoNothing: true, + }).Create(&claim) + if result.Error != nil { + return result.Error + } + if result.RowsAffected == 0 { + duplicate = true + return loadClaimedExtensionEvent(tx, eventID, &event) + } if err := tx.Create(&event).Error; err != nil { return err } @@ -251,7 +262,7 @@ func (s *Service) RecordExtensionEvent(req dto.ExtensionEventCallbackRequest) (* return nil, err } - return &dto.ExtensionEventCallbackResponse{Accepted: true, Duplicate: false}, nil + return 
&dto.ExtensionEventCallbackResponse{Accepted: true, Duplicate: duplicate}, nil } func applyExtensionPublicationEvent(tx *gorm.DB, token models.ExtensionCallbackToken, event models.ExtensionExecutionEvent) error { diff --git a/backend/internal/services/extension/handoffs_test.go b/backend/internal/services/extension/handoffs_test.go index 149ef817a..37afe1269 100644 --- a/backend/internal/services/extension/handoffs_test.go +++ b/backend/internal/services/extension/handoffs_test.go @@ -402,6 +402,10 @@ func TestRecordExtensionEventAcceptsKnownTokenAndDeduplicatesEventID(t *testing. assert.Equal(t, "event-1", events[0].EventID) assert.Equal(t, "user_review", events[0].Status) assert.Contains(t, string(events[0].Metadata), "DYNAMIC_DOUYIN") + + var claim models.ExtensionExecutionEventClaim + require.NoError(t, db.First(&claim, "event_id = ?", "event-1").Error) + assert.Equal(t, events[0].ID, claim.RecordID) } func TestRecordExtensionEventMarksXPublicationReadyForUserReview(t *testing.T) { @@ -539,6 +543,18 @@ func TestRecordExtensionEventDoesNotApplyDuplicatePublicationUpdate(t *testing.T require.NoError(t, err) assert.True(t, second.Duplicate) + var eventCount int64 + require.NoError(t, db.Model(&models.ExtensionExecutionEvent{}). + Where("event_id = ?", req.EventID). + Count(&eventCount).Error) + assert.Equal(t, int64(1), eventCount) + + var claimCount int64 + require.NoError(t, db.Model(&models.ExtensionExecutionEventClaim{}). + Where("event_id = ?", req.EventID). + Count(&claimCount).Error) + assert.Equal(t, int64(1), claimCount) + var publication models.ProjectPlatformPublication require.NoError(t, db.First(&publication, "project_id = ? 
AND platform = ?", project.ID, "x").Error) assert.Equal(t, 1, publication.RetryCount) diff --git a/backend/internal/services/project/delete.go b/backend/internal/services/project/delete.go index 2f0aee66d..f8a789c15 100644 --- a/backend/internal/services/project/delete.go +++ b/backend/internal/services/project/delete.go @@ -51,7 +51,6 @@ func (s *Service) DeleteProject(projectID uuid.UUID, userID uuid.UUID) error { cleanup := []any{ &models.PublishEvent{}, &models.ExtensionCallbackToken{}, - &models.ExtensionExecutionEvent{}, &models.MediaAssetUsage{}, &models.PlatformAccountGrant{}, &models.ProjectListSummary{}, @@ -70,6 +69,24 @@ func (s *Service) DeleteProject(projectID uuid.UUID, userID uuid.UUID) error { return err } } + if tx.Migrator().HasTable(&models.ExtensionExecutionEvent{}) { + var extensionEventIDs []uuid.UUID + if err := tx.Model(&models.ExtensionExecutionEvent{}). + Where("project_id = ?", projectID). + Pluck("id", &extensionEventIDs).Error; err != nil { + return err + } + if len(extensionEventIDs) > 0 && tx.Migrator().HasTable(&models.ExtensionExecutionEventClaim{}) { + if err := tx.Where("record_id IN ?", extensionEventIDs). + Delete(&models.ExtensionExecutionEventClaim{}).Error; err != nil { + return err + } + } + if err := tx.Where("project_id = ?", projectID). + Delete(&models.ExtensionExecutionEvent{}).Error; err != nil { + return err + } + } if tx.Migrator().HasTable(&models.MediaAsset{}) { if err := tx.Model(&models.MediaAsset{}). 
diff --git a/backend/internal/services/project/delete_test.go b/backend/internal/services/project/delete_test.go index a0ed7046d..ad50ee143 100644 --- a/backend/internal/services/project/delete_test.go +++ b/backend/internal/services/project/delete_test.go @@ -164,7 +164,7 @@ func TestDeleteProjectRemovesOwnerProjectAndDependents(t *testing.T) { Token: "callback-token", ExpiresAt: time.Now().Add(time.Hour), }).Error) - require.NoError(t, db.Create(&models.ExtensionExecutionEvent{ + extensionEvent := models.ExtensionExecutionEvent{ CallbackTokenID: uuid.New(), ExecutionID: "execution-1", ProjectID: project.ID, @@ -173,6 +173,11 @@ func TestDeleteProjectRemovesOwnerProjectAndDependents(t *testing.T) { Platform: "wechat", Status: "queued", Metadata: datatypes.JSON([]byte(`{}`)), + } + require.NoError(t, db.Create(&extensionEvent).Error) + require.NoError(t, db.Create(&models.ExtensionExecutionEventClaim{ + EventID: extensionEvent.EventID, + RecordID: extensionEvent.ID, }).Error) err := s.DeleteProject(project.ID, owner.ID) @@ -190,6 +195,7 @@ func TestDeleteProjectRemovesOwnerProjectAndDependents(t *testing.T) { require.Zero(t, countRows(t, db, &models.PlatformAccountGrant{}, "project_id = ?", project.ID)) require.Zero(t, countRows(t, db, &models.ExtensionCallbackToken{}, "project_id = ?", project.ID)) require.Zero(t, countRows(t, db, &models.ExtensionExecutionEvent{}, "project_id = ?", project.ID)) + require.Zero(t, countRows(t, db, &models.ExtensionExecutionEventClaim{}, "record_id = ?", extensionEvent.ID)) require.Zero(t, countRows(t, db, &models.PublishEvent{}, "project_id = ?", project.ID)) require.Zero(t, countRows(t, db, &models.ScheduledPublication{}, "project_id = ?", project.ID)) require.Zero(t, countRows(t, db, &models.PublishAttempt{}, "scheduled_publication_id = ?", schedule.ID)) diff --git a/backend/internal/services/testsupport/helpers.go b/backend/internal/services/testsupport/helpers.go index f870fbaae..df1151347 100644 --- 
a/backend/internal/services/testsupport/helpers.go +++ b/backend/internal/services/testsupport/helpers.go @@ -118,7 +118,9 @@ func (f *FakeXOAuth2Provider) Me(_ context.Context, _ string) (pkgx.User, error) } func SetupTestDB() *gorm.DB { - db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{ + TranslateError: true, + }) if err != nil { panic("failed to connect database") } @@ -217,13 +219,14 @@ func SetupTestDB() *gorm.DB { )`) db.Exec(`CREATE TABLE workspace_activities ( - id TEXT PRIMARY KEY, + id TEXT NOT NULL, workspace_id TEXT NOT NULL, actor_user_id TEXT NOT NULL, target_user_id TEXT, event_type TEXT NOT NULL, metadata TEXT NOT NULL DEFAULT '{}', - created_at DATETIME NOT NULL + created_at DATETIME NOT NULL, + PRIMARY KEY (id, created_at) )`) db.Exec(`CREATE TABLE workspace_dashboard_stats ( @@ -292,13 +295,14 @@ func SetupTestDB() *gorm.DB { )`) db.Exec(`CREATE TABLE project_activities ( - id TEXT PRIMARY KEY, + id TEXT NOT NULL, project_id TEXT NOT NULL, actor_user_id TEXT NOT NULL, target_user_id TEXT, event_type TEXT NOT NULL, metadata TEXT NOT NULL DEFAULT '{}', - created_at DATETIME NOT NULL + created_at DATETIME NOT NULL, + PRIMARY KEY (id, created_at) )`) db.Exec(`CREATE TABLE project_comments ( @@ -461,12 +465,12 @@ func SetupTestDB() *gorm.DB { )`) db.Exec(`CREATE TABLE extension_execution_events ( - id TEXT PRIMARY KEY, + id TEXT NOT NULL, callback_token_id TEXT NOT NULL, execution_id TEXT NOT NULL, project_id TEXT NOT NULL, user_id TEXT NOT NULL, - event_id TEXT NOT NULL UNIQUE, + event_id TEXT NOT NULL, platform TEXT NOT NULL, status TEXT NOT NULL, message TEXT, @@ -474,7 +478,15 @@ func SetupTestDB() *gorm.DB { publish_url TEXT, error_message TEXT, metadata TEXT NOT NULL DEFAULT '{}', - created_at DATETIME + created_at DATETIME NOT NULL, + PRIMARY KEY (id, created_at) + )`) + db.Exec(`CREATE INDEX idx_extension_execution_events_event_id ON 
extension_execution_events(event_id)`) + + db.Exec(`CREATE TABLE extension_execution_event_claims ( + event_id TEXT PRIMARY KEY, + record_id TEXT NOT NULL UNIQUE, + created_at DATETIME NOT NULL )`) return db diff --git a/doc/plan/database-optimization.md b/doc/plan/database-optimization.md index a6789dfe7..6cf97efcc 100644 --- a/doc/plan/database-optimization.md +++ b/doc/plan/database-optimization.md @@ -11,7 +11,7 @@ Status definitions: - `Not started`: no clear implementation has been found yet. - `Deferred`: not recommended for the current business stage; only trigger conditions are retained. -Current overall progress: about `58%`. This number is manually estimated by phase weight and can be adjusted later according to actual completed items. +Current overall progress: about `61%`. This number is manually estimated by phase weight and can be adjusted later according to actual completed items. | Phase | Weight | Current completion | Status | Completed | Not done / next steps | | ----- | ------ | ------------------ | ------ | --------- | --------------------- | @@ -19,7 +19,7 @@ Current overall progress: about `58%`. 
This number is manually estimated by phas | Phase 1: Single-database connection pool, indexing, pagination, and lifecycle governance | 15% | 100% | Done | backend/publish-worker/collab-service application connection pools, Redis client connection pool, PgBouncer writer pool, composite indexes, keyset list pagination, list queries avoiding the large `source_content` field, event/session history retention periods, and R2/S3 cold-event archive worker | None; Phase 2 is complete, and later work moves into Phase 3/4 read replicas, partitioning, and recovery flows | | Phase 2: Read models and cache first | 15% | 100% | Done | Redis and Asynq dependencies are reusable; admin dashboard stats, admin project list, and dashboard account summary have short-TTL Redis cache; stats/project list/account cache misses are merged with singleflight; project, prepublish, publish, and account write paths invalidate the related dashboard cache; `workspace_dashboard_stats` and `project_list_summaries` read models are in place, and APIs prefer read models when coverage is complete; async refresh triggers after project save, platform sync, publish completion, and member changes; admin rebuild API, Asynq queue, and worker support full read-model rebuild | None | | Phase 3: Read/write splitting | 15% | 100% | Done | Optional `DB_READER_*` connection, application-level DB Router, signed sticky writer, consistency routing for project/stats/workspace/platform_account/publish/prepublish/mediaasset/browser_session/extension, consistency-level inventories for dashboard/publish/collab-service, self-hosted PostgreSQL read replica, managed `postgres-reader` entry point, PgBouncer reader pool, replica lag monitoring and automatic fallback to writer when over threshold | None; Phase 4 continues partitioning, archiving, and recovery flows | -| Phase 4: Single-database partitioning, archiving, and hot/cold tiering | 15% | 15% | In progress | Collaborative editing already has state + update batch + 
compaction foundation; event and terminal-session history already have row-level R2/S3 archive worker | Time partitioning for event tables, hash partitioning for collaborative batches, cold partition export, recovery flow | +| Phase 4: Single-database partitioning, archiving, and hot/cold tiering | 15% | 35% | In progress | Collaborative editing already has state + update batch + compaction foundation; event and terminal-session history already have row-level R2/S3 archive worker; `publish_events`, `extension_execution_events`, `project_activities`, and `workspace_activities` have PostgreSQL monthly partition target schema | `remote_browser_sessions` time partitioning, hash partitioning for collaborative batches, cold partition export, recovery flow | | Phase 5: Citus preparation | 20% | 5% | Not started | Workspace model, `projects.workspace_id`, and personal workspace ID already exist | Global `workspace_id`, Citus distribution column/colocation design, unique constraint and foreign-key review | | Phase 6: Citus distributed PostgreSQL operation | 10% | 0% | Deferred | None | Future Citus cluster design, worker/coordinator monitoring and backup, large-tenant isolation strategy | @@ -64,7 +64,7 @@ Atomic commit guidance: | Dashboard read models | Done | Added `workspace_dashboard_stats` and `project_list_summaries` read models, idempotently recomputed from fact tables by a centralized readmodel service; async refresh is triggered after project save, platform sync, publish completion, and member changes; admin stats and admin project list prefer read models when coverage is complete; admin rebuild API enqueues through Asynq, and API/worker processes can start readmodel workers for full rebuild from fact tables | None | `backend/internal/models/models.go`, `backend/internal/services/readmodel/service.go`, `backend/internal/services/readmodel/queue.go`, `backend/internal/services/readmodel/service_test.go`, `backend/internal/services/readmodel/queue_test.go`, 
`backend/internal/services/stats/overview.go`, `backend/internal/services/project/lifecycle.go`, `backend/internal/handlers/dashboard.go`, `backend/cmd/api/main.go`, `backend/cmd/publish-worker/main.go` | | Redis read cache | Done | Redis is already used for queues, locks, OAuth, browser sessions, and short-term coordination; admin dashboard stats, admin project list, and dashboard account summary use 15s TTL cache and bypass scoped/sticky-writer strong-consistency paths; stats/project list/account cache misses use singleflight to prevent process-local stampede; stats and account caches use versioned payloads and semantic validation, and Redis read-error fallback is also merged into one DB computation per key; project create/edit/platform save, prepublish sync/draft update, publish queue/execute/fail, and platform account write paths invalidate the related dashboard cache; full read-model rebuild reuses the Redis/Asynq queue | None | `backend/internal/services/stats/overview.go`, `backend/internal/services/stats/overview_test.go`, `backend/internal/services/project/list_cache.go`, `backend/internal/services/project/list_cache_test.go`, `backend/internal/services/prepublish/drafts.go`, `backend/internal/services/publish/service.go`, `backend/internal/services/publish/queue.go`, `backend/internal/services/publish/publication_flow_test.go`, `backend/internal/services/publish/queue_test.go`, `backend/internal/services/platform_account/account_cache.go`, `backend/internal/services/platform_account/account_cache_test.go`, `backend/internal/services/browser_session/complete.go`, `backend/internal/services/browser_session/service_test.go`, `backend/internal/services/readmodel/queue.go` | | Read/write splitting | Done | Supports optional `DB_READER_*` read-replica connection, `DefaultRouter`, and signed sticky writer; project/stats/workspace/platform_account/publish/prepublish/mediaasset/browser_session/extension are wired to strong/eventual/writer routing; dashboard, 
publish, and collab-service consistency-level inventories are complete, with collab-service online path kept writer-only; writer/reader pools are in self-hosted Kubernetes, and managed overlay provides a `postgres-reader` ExternalName entry point; `DB_READER_MAX_REPLICA_LAG` configures the replica lag threshold, eventual/analytics reads automatically fall back to writer when over threshold or lag is unknown, and `mpp_db_replica_lag_seconds` and `mpp_db_replica_healthy` metrics are exposed | None | `backend/internal/db/db.go`, `backend/internal/db/router.go`, `backend/internal/db/replica_lag.go`, `backend/internal/services/publish/service.go`, `backend/internal/services/prepublish/service.go`, `backend/internal/services/mediaasset/service.go`, `backend/internal/services/browser_session/service.go`, `backend/internal/services/extension/service.go`, `backend/internal/app/runtime.go`, `deploy/kubernetes/data-services/self-hosted/postgres.yaml`, `deploy/kubernetes/data-services/self-hosted/pgbouncer.yaml`, `deploy/kubernetes/data-services/managed/services.yaml`, `script/kubernetes/validation/data_services.rb` | -| Event-table partitioning and archiving | In progress | `publish_events`, `extension_execution_events`, `project_activities`, `workspace_activities`, and terminal `remote_browser_sessions` have default retention periods; the `archive` worker can batch-export JSONL to R2/S3 and delete old hot-table rows after successful upload | Time partitioning, cold partition export, and recovery flow are not implemented | `backend/internal/services/archive/config.go`, `backend/internal/services/archive/worker.go`, `backend/internal/services/archive/worker_test.go` | +| Event-table partitioning and archiving | In progress | `publish_events`, `extension_execution_events`, `project_activities`, `workspace_activities`, and terminal `remote_browser_sessions` have default retention periods; the `archive` worker can batch-export JSONL to R2/S3 and delete old hot-table rows after 
successful upload; PostgreSQL schema initialization now creates monthly `created_at` partitions for `publish_events`, `extension_execution_events`, `project_activities`, and `workspace_activities`, with partition-compatible `(id, created_at)` primary keys and rolling partition creation | `remote_browser_sessions` time partitioning, cold partition export, and recovery flow are not implemented | `backend/internal/db/monthly_partitions.go`, `backend/internal/db/db.go`, `backend/internal/models/models.go`, `backend/internal/db/db_test.go`, `backend/internal/services/archive/worker.go`, `backend/internal/services/archive/worker_test.go` | | Collaboration batch governance | In progress | `collab_document_states`, `collab_document_update_batches`, and compaction/retention foundations exist | `document_id` hash partitioning and cold archiving are not implemented | `backend/internal/models/collab.go`, `collab-service/src/persistence/document-persistence.ts` | | Outbox/CDC/event stream | In progress | The publishing queue path has a transactional Outbox: `EnqueuePublishProject` writes `outbox_events` in the same transaction and dispatches immediately after commit; publish worker starts an outbox dispatcher and supports retries for failed/stale processing records; Asynq continues to serve as the task-execution queue, and `PublishEvent` continues to serve as publishing audit | Currently covers only `publish.job_requested`; general business-event outbox, Debezium, and Redpanda/Kafka CDC are not implemented | `backend/internal/services/publish/queue.go`, `backend/internal/services/publish/outbox.go`, `backend/internal/models/models.go` | | Citus target state | Not started | Confirmed `workspace_id` as the most suitable distribution-column direction | Citus distributed tables, reference tables, and colocation are not implemented | Phase 5/6 in this document | @@ -117,7 +117,7 @@ Atomic commit guidance: #### Phase 4: Single-database Partitioning, Archiving, and Hot/cold Tiering - 
[x] Keep collaborative editing state + update batch + compaction foundation. -- [ ] Partition `publish_events`, `extension_execution_events`, and activity tables by month. +- [x] Partition `publish_events`, `extension_execution_events`, and activity tables by month. Verification entry point: `backend/internal/db/monthly_partitions.go`, `backend/internal/models/models.go`, `backend/internal/db/db_test.go`. - [ ] Partition `remote_browser_sessions` by time or expiration time. - [ ] Hash partition `collab_document_update_batches` by `document_id`. - [ ] Export cold partitions to R2/S3.