From de3a39e1925d2a41d06c5bcab0b2fae60363c525 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Sun, 21 Jun 2026 20:43:47 +0800 Subject: [PATCH 1/6] feat(db): add monthly event partitions Event and activity tables need a partition-compatible target schema before Phase 4 can move beyond row-level archiving. Create PostgreSQL monthly partition parents and rolling partitions for publish, extension, project activity, and workspace activity events. Event tables now use created_at-aware primary keys, and extension callback deduplication stays serialized without relying on a global unique event id. --- backend/internal/db/db.go | 8 +- backend/internal/db/monthly_partitions.go | 316 ++++++++++++++++++ backend/internal/models/models.go | 22 +- backend/internal/services/extension/dedup.go | 10 + .../internal/services/extension/handoffs.go | 25 +- 5 files changed, 367 insertions(+), 14 deletions(-) create mode 100644 backend/internal/db/monthly_partitions.go create mode 100644 backend/internal/services/extension/dedup.go diff --git a/backend/internal/db/db.go b/backend/internal/db/db.go index 26008039..742696f0 100644 --- a/backend/internal/db/db.go +++ b/backend/internal/db/db.go @@ -101,7 +101,9 @@ func openPostgresDatabase(dsn string) (*gorm.DB, error) { database, err := gorm.Open(postgres.New(postgres.Config{ DSN: dsn, PreferSimpleProtocol: true, - }), &gorm.Config{}) + }), &gorm.Config{ + TranslateError: true, + }) if err != nil { return nil, err } @@ -331,6 +333,10 @@ func syncSchema(database *gorm.DB) error { } func syncSchemaUnlocked(database *gorm.DB) error { + if err := ensureMonthlyEventPartitions(database, time.Now().UTC()); err != nil { + return err + } + if err := database.AutoMigrate( &models.User{}, &models.Workspace{}, diff --git a/backend/internal/db/monthly_partitions.go b/backend/internal/db/monthly_partitions.go new file mode 100644 index 00000000..b1c220b2 --- /dev/null +++ b/backend/internal/db/monthly_partitions.go @@ -0,0 +1,316 @@ +package db + +import ( + "database/sql" + "fmt" + "strings" + "time" + + "gorm.io/gorm" +) + +const ( + monthlyPartitionPastMonths = 12 + monthlyPartitionFutureMonths = 3 +) + +type monthlyPartitionedTable struct { + name string + createSQL string + columns []string +} + +var monthlyEventPartitionedTables = []monthlyPartitionedTable{ + { + name: "publish_events", + createSQL: ` + CREATE TABLE IF NOT EXISTS publish_events ( + id uuid NOT NULL, + publication_id uuid NOT NULL, + project_id uuid NOT NULL, + user_id uuid NOT NULL, + platform text NOT NULL, + job_id uuid NOT NULL, + idempotency_key text NOT NULL, + event_type text NOT NULL, + status text NOT NULL, + message text, + remote_id text, + publish_url text, + error_message text, + metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL, + CONSTRAINT pk_publish_events_partitioned PRIMARY KEY (id, created_at) + ) PARTITION BY RANGE (created_at) + `, + columns: []string{ + "id", + "publication_id", + "project_id", + "user_id", + "platform", + "job_id", + "idempotency_key", + "event_type", + "status", + "message", + "remote_id", + "publish_url", + "error_message", + "metadata", + "created_at", + }, + }, + { + name: "extension_execution_events", + createSQL: ` + CREATE TABLE IF NOT EXISTS extension_execution_events ( + id uuid NOT NULL, + callback_token_id uuid NOT NULL, + execution_id text NOT NULL, + project_id uuid NOT NULL, + user_id uuid NOT NULL, + event_id text NOT NULL, + platform text NOT NULL, + status text NOT NULL, + message text, + remote_id text, + publish_url text, + error_message text, + metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL, + CONSTRAINT pk_extension_execution_events_partitioned PRIMARY KEY (id, created_at) + ) PARTITION BY RANGE (created_at) + `, + columns: []string{ + "id", + "callback_token_id", + "execution_id", + "project_id", + "user_id", + "event_id", + "platform", + "status", + "message", + "remote_id", + "publish_url", + "error_message", + "metadata", + "created_at", + }, + }, + { + name: "project_activities", + createSQL: ` + CREATE TABLE IF NOT EXISTS project_activities ( + id uuid NOT NULL, + project_id uuid NOT NULL, + actor_user_id uuid NOT NULL, + target_user_id uuid, + event_type text NOT NULL, + metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL, + CONSTRAINT pk_project_activities_partitioned PRIMARY KEY (id, created_at) + ) PARTITION BY RANGE (created_at) + `, + columns: []string{ + "id", + "project_id", + "actor_user_id", + "target_user_id", + "event_type", + "metadata", + "created_at", + }, + }, + { + name: "workspace_activities", + createSQL: ` + CREATE TABLE IF NOT EXISTS workspace_activities ( + id uuid NOT NULL, + workspace_id uuid NOT NULL, + actor_user_id uuid NOT NULL, + target_user_id uuid, + event_type text NOT NULL, + metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL, + CONSTRAINT pk_workspace_activities_partitioned PRIMARY KEY (id, created_at) + ) PARTITION BY RANGE (created_at) + `, + columns: []string{ + "id", + "workspace_id", + "actor_user_id", + "target_user_id", + "event_type", + "metadata", + "created_at", + }, + }, +} + +func ensureMonthlyEventPartitions(database *gorm.DB, now time.Time) error { + if database.Name() != "postgres" { + return nil + } + for _, table := range monthlyEventPartitionedTables { + if err := ensureMonthlyPartitionedTable(database, table, now); err != nil { + return err + } + } + return nil +} + +func ensureMonthlyPartitionedTable(database *gorm.DB, table monthlyPartitionedTable, now time.Time) error { + partitioned, exists, err := postgresPartitionedTableState(database, table.name) + if err != nil { + return err + } + if exists && !partitioned { + if err := migrateRegularTableToMonthlyPartitions(database, table, now); err != nil { + return err + } + } else if !exists { + if err := database.Exec(table.createSQL).Error; err != nil { + return err + } + } + return ensureRollingMonthlyPartitions(database, table.name, now) +} + +func postgresPartitionedTableState(database *gorm.DB, tableName string) (partitioned bool, exists bool, err error) { + err = database.Raw(` + SELECT + to_regclass(?) IS NOT NULL AS exists, + EXISTS ( + SELECT 1 + FROM pg_partitioned_table + WHERE partrelid = to_regclass(?) + ) AS partitioned + `, tableName, tableName).Row().Scan(&exists, &partitioned) + return partitioned, exists, err +} + +func migrateRegularTableToMonthlyPartitions(database *gorm.DB, table monthlyPartitionedTable, now time.Time) error { + legacyName := fmt.Sprintf("%s_legacy_%s", table.name, now.UTC().Format("20060102150405")) + if err := database.Exec(fmt.Sprintf( + "ALTER TABLE %s RENAME TO %s", + quotePostgresIdentifier(table.name), + quotePostgresIdentifier(legacyName), + )).Error; err != nil { + return err + } + + if err := database.Exec(table.createSQL).Error; err != nil { + return err + } + if err := ensureRollingMonthlyPartitions(database, table.name, now); err != nil { + return err + } + if err := ensureLegacyDataMonthlyPartitions(database, table.name, legacyName); err != nil { + return err + } + if err := copyLegacyRowsIntoPartitionedTable(database, table, legacyName, now); err != nil { + return err + } + return database.Exec(fmt.Sprintf("DROP TABLE %s", quotePostgresIdentifier(legacyName))).Error +} + +func ensureRollingMonthlyPartitions(database *gorm.DB, tableName string, now time.Time) error { + start := monthStartUTC(now).AddDate(0, -monthlyPartitionPastMonths, 0) + for offset := 0; offset <= monthlyPartitionPastMonths+monthlyPartitionFutureMonths; offset++ { + if err := ensureMonthlyPartition(database, tableName, start.AddDate(0, offset, 0)); err != nil { + return err + } + } + return nil +} + +func ensureLegacyDataMonthlyPartitions(database *gorm.DB, tableName string, legacyName string) error { + var minCreatedAt, maxCreatedAt sql.NullTime + if err := database.Raw(fmt.Sprintf( + "SELECT MIN(created_at), MAX(created_at) FROM %s", + quotePostgresIdentifier(legacyName), + )).Row().Scan(&minCreatedAt, &maxCreatedAt); err != nil { + return err + } + if !minCreatedAt.Valid || !maxCreatedAt.Valid { + return nil + } + + start := monthStartUTC(minCreatedAt.Time) + end := monthStartUTC(maxCreatedAt.Time) + for partitionStart := start; !partitionStart.After(end); partitionStart = partitionStart.AddDate(0, 1, 0) { + if err := ensureMonthlyPartition(database, tableName, partitionStart); err != nil { + return err + } + } + return nil +} + +func ensureMonthlyPartition(database *gorm.DB, tableName string, partitionStart time.Time) error { + partitionStart = monthStartUTC(partitionStart) + partitionEnd := partitionStart.AddDate(0, 1, 0) + return database.Exec(createMonthlyPartitionSQL(tableName, partitionStart, partitionEnd)).Error +} + +func copyLegacyRowsIntoPartitionedTable(database *gorm.DB, table monthlyPartitionedTable, legacyName string, now time.Time) error { + columnList := quotedColumnList(table.columns) + selectList := legacySelectColumnList(table.columns, now) + return database.Exec(fmt.Sprintf( + "INSERT INTO %s (%s) SELECT %s FROM %s", + quotePostgresIdentifier(table.name), + columnList, + selectList, + quotePostgresIdentifier(legacyName), + )).Error +} + +func createMonthlyPartitionSQL(tableName string, partitionStart time.Time, partitionEnd time.Time) string { + partitionName := fmt.Sprintf("%s_%s", tableName, partitionStart.UTC().Format("2006_01")) + return fmt.Sprintf( + "CREATE TABLE IF NOT EXISTS %s PARTITION OF %s FOR VALUES FROM (%s) TO (%s)", + quotePostgresIdentifier(partitionName), + quotePostgresIdentifier(tableName), + quotePostgresTimestampLiteral(partitionStart), + quotePostgresTimestampLiteral(partitionEnd), + ) +} + +func quotedColumnList(columns []string) string { + quoted := make([]string, 0, len(columns)) + for _, column := range columns { + quoted = append(quoted, quotePostgresIdentifier(column)) + } + return strings.Join(quoted, ", ") +} + +func legacySelectColumnList(columns []string, now time.Time) string { + selected := make([]string, 0, len(columns)) + for _, column := range columns { + if column == "created_at" { + selected = append(selected, fmt.Sprintf( + "COALESCE(%s, %s) AS %s", + quotePostgresIdentifier(column), + quotePostgresTimestampLiteral(now), + quotePostgresIdentifier(column), + )) + continue + } + selected = append(selected, quotePostgresIdentifier(column)) + } + return strings.Join(selected, ", ") +} + +func monthStartUTC(value time.Time) time.Time { + value = value.UTC() + return time.Date(value.Year(), value.Month(), 1, 0, 0, 0, 0, time.UTC) +} + +func quotePostgresIdentifier(identifier string) string { + return `"` + strings.ReplaceAll(identifier, `"`, `""`) + `"` +} + +func quotePostgresTimestampLiteral(value time.Time) string { + return "TIMESTAMPTZ '" + value.UTC().Format("2006-01-02 15:04:05-07") + "'" +} diff --git a/backend/internal/models/models.go b/backend/internal/models/models.go index 65c41d9f..d1281901 100644 --- a/backend/internal/models/models.go +++ b/backend/internal/models/models.go @@ -289,7 +289,7 @@ type ProjectActivity struct { TargetUserID *uuid.UUID `gorm:"type:uuid;index"` EventType string `gorm:"not null;index"` Metadata datatypes.JSON `gorm:"type:jsonb;not null;default:'{}'"` - CreatedAt time.Time `gorm:"not null;index:idx_project_activities_project_created_at,priority:2;index:idx_project_activities_archive_created_id,priority:1"` + CreatedAt time.Time `gorm:"primaryKey;not null;index:idx_project_activities_project_created_at,priority:2;index:idx_project_activities_archive_created_id,priority:1"` Project Project `gorm:"foreignKey:ProjectID;constraint:OnDelete:CASCADE"` Actor User `gorm:"foreignKey:ActorUserID;constraint:OnDelete:CASCADE"` TargetUser *User `gorm:"foreignKey:TargetUserID;constraint:OnDelete:SET NULL"` @@ -430,7 +430,7 @@ type WorkspaceActivity struct { TargetUserID *uuid.UUID `gorm:"type:uuid;index"` EventType string `gorm:"not null;index"` Metadata datatypes.JSON `gorm:"type:jsonb;not null;default:'{}'"` - CreatedAt time.Time `gorm:"not null;index:idx_workspace_activities_workspace_created_at,priority:2;index:idx_workspace_activities_archive_created_id,priority:1"` + CreatedAt time.Time `gorm:"primaryKey;not null;index:idx_workspace_activities_workspace_created_at,priority:2;index:idx_workspace_activities_archive_created_id,priority:1"` Workspace Workspace `gorm:"foreignKey:WorkspaceID;constraint:OnDelete:CASCADE"` Actor User `gorm:"foreignKey:ActorUserID;constraint:OnDelete:CASCADE"` TargetUser *User `gorm:"foreignKey:TargetUserID;constraint:OnDelete:SET NULL"` @@ -523,7 +523,7 @@ type PublishEvent struct { PublishURL string ErrorMessage string Metadata datatypes.JSON `gorm:"type:jsonb;not null;default:'{}'"` - CreatedAt time.Time `gorm:"index:idx_publish_events_archive_created_id,priority:1"` + CreatedAt time.Time `gorm:"primaryKey;not null;index:idx_publish_events_archive_created_id,priority:1"` } type ScheduledPublication struct { @@ -669,7 +669,7 @@ type ExtensionExecutionEvent struct { ExecutionID string `gorm:"not null;index"` ProjectID uuid.UUID `gorm:"type:uuid;not null;index"` UserID uuid.UUID `gorm:"type:uuid;not null;index"` - EventID string `gorm:"not null;uniqueIndex"` + EventID string `gorm:"not null;index"` Platform string `gorm:"not null;index"` Status string `gorm:"not null;index"` Message string @@ -677,7 +677,7 @@ type ExtensionExecutionEvent struct { PublishURL string ErrorMessage string Metadata datatypes.JSON `gorm:"type:jsonb;not null;default:'{}'"` - CreatedAt time.Time `gorm:"index:idx_extension_execution_events_archive_created_id,priority:1"` + CreatedAt time.Time `gorm:"primaryKey;not null;index:idx_extension_execution_events_archive_created_id,priority:1"` } // BeforeCreate hook to generate UUID if not set @@ -767,6 +767,9 @@ func (e *PublishEvent) BeforeCreate(_ *gorm.DB) (err error) { if e.ID == uuid.Nil { e.ID = uuid.New() } + if e.CreatedAt.IsZero() { + e.CreatedAt = time.Now().UTC() + } return } @@ -820,6 +823,9 @@ func (a *WorkspaceActivity) BeforeCreate(_ *gorm.DB) (err error) { if a.ID == uuid.Nil { a.ID = uuid.New() } + if a.CreatedAt.IsZero() { + a.CreatedAt = time.Now().UTC() + } return } @@ -847,6 +853,9 @@ func (a *ProjectActivity) BeforeCreate(_ *gorm.DB) (err error) { if a.ID == uuid.Nil { a.ID = uuid.New() } + if a.CreatedAt.IsZero() { + a.CreatedAt = time.Now().UTC() + } return } @@ -933,5 +942,8 @@ func (e *ExtensionExecutionEvent) BeforeCreate(_ *gorm.DB) (err error) { if e.ID == uuid.Nil { e.ID = uuid.New() } + if e.CreatedAt.IsZero() { + e.CreatedAt = time.Now().UTC() + } return } diff --git a/backend/internal/services/extension/dedup.go b/backend/internal/services/extension/dedup.go new file mode 100644 index 00000000..2d516af7 --- /dev/null +++ b/backend/internal/services/extension/dedup.go @@ -0,0 +1,10 @@ +package extension + +import "gorm.io/gorm" + +func lockExtensionEventID(tx *gorm.DB, eventID string) error { + if tx == nil || tx.Name() != "postgres" { + return nil + } + return tx.Exec("SELECT pg_advisory_xact_lock(hashtextextended(?, 0))", eventID).Error +} diff --git a/backend/internal/services/extension/handoffs.go b/backend/internal/services/extension/handoffs.go index 8f985ef0..b51e129c 100644 --- a/backend/internal/services/extension/handoffs.go +++ b/backend/internal/services/extension/handoffs.go @@ -212,13 +212,6 @@ func (s *Service) RecordExtensionEvent(req dto.ExtensionEventCallbackRequest) (* return nil, ErrExtensionCallbackTokenInvalid } - var existing models.ExtensionExecutionEvent - if err := s.writerDB().First(&existing, "event_id = ?", eventID).Error; err == nil { - return &dto.ExtensionEventCallbackResponse{Accepted: true, Duplicate: true}, nil - } else if !errors.Is(err, gorm.ErrRecordNotFound) { - return nil, err - } - metadata := datatypes.JSON([]byte(`{}`)) if req.Metadata != nil { payload, err := json.Marshal(req.Metadata) @@ -242,8 +235,24 @@ func (s *Service) RecordExtensionEvent(req dto.ExtensionEventCallbackRequest) (* ErrorMessage: strings.TrimSpace(req.ErrorMessage), Metadata: metadata, } + duplicate := false if err := s.writerDB().Transaction(func(tx *gorm.DB) error { + if err := lockExtensionEventID(tx, eventID); err != nil { + return err + } + var existing models.ExtensionExecutionEvent + if err := tx.First(&existing, "event_id = ?", eventID).Error; err == nil { + event = existing + duplicate = true + return nil + } else if !errors.Is(err, gorm.ErrRecordNotFound) { + return err + } if err := tx.Create(&event).Error; err != nil { + if errors.Is(err, gorm.ErrDuplicatedKey) { + duplicate = true + return tx.First(&event, "event_id = ?", eventID).Error + } return err } return applyExtensionPublicationEvent(tx, token, event) @@ -251,7 +260,7 @@ func (s *Service) RecordExtensionEvent(req dto.ExtensionEventCallbackRequest) (* return nil, err } - return &dto.ExtensionEventCallbackResponse{Accepted: true, Duplicate: false}, nil + return &dto.ExtensionEventCallbackResponse{Accepted: true, Duplicate: duplicate}, nil } func applyExtensionPublicationEvent(tx *gorm.DB, token models.ExtensionCallbackToken, event models.ExtensionExecutionEvent) error { From f365b1bd07433c04b4e8c178834b568371836e9e Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Sun, 21 Jun 2026 20:44:36 +0800 Subject: [PATCH 2/6] test(db): cover event partition keys The partitioned event schema changes the primary key shape for append-only event and activity tables. Add schema assertions for the created_at-aware keys and mirror that shape in the SQLite service test fixture. Tests now catch regressions that would make the monthly partition DDL incompatible with the Go models. --- backend/internal/db/db_test.go | 55 +++++++++++++++++++ .../internal/services/testsupport/helpers.go | 22 +++++--- 2 files changed, 69 insertions(+), 8 deletions(-) diff --git a/backend/internal/db/db_test.go b/backend/internal/db/db_test.go index 1e316360..b7c7f123 100644 --- a/backend/internal/db/db_test.go +++ b/backend/internal/db/db_test.go @@ -2,6 +2,7 @@ package db import ( "context" + "database/sql" "testing" "time" @@ -132,6 +133,34 @@ func TestSyncSchemaAddsArchiveScanIndexes(t *testing.T) { require.True(t, database.Migrator().HasIndex(&models.RemoteBrowserSession{}, "idx_remote_browser_sessions_archive_status_created_id")) } +func TestMonthlyPartitionedEventModelsUsePartitionCompatiblePrimaryKeys(t *testing.T) { + database, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + require.NoError(t, err) + require.NoError(t, syncSchema(database)) + + for _, tableName := range []string{ + "publish_events", + "extension_execution_events", + "project_activities", + "workspace_activities", + } { + primaryKeyColumns := sqlitePrimaryKeyColumns(t, database, tableName) + + require.ElementsMatch(t, []string{"id", "created_at"}, primaryKeyColumns, tableName) + } +} + +func TestCreateMonthlyPartitionSQLUsesMonthRange(t *testing.T) { + start := time.Date(2026, 6, 1, 0, 0, 0, 0, time.UTC) + + sql := createMonthlyPartitionSQL("publish_events", start, start.AddDate(0, 1, 0)) + + require.Contains(t, sql, `"publish_events_2026_06"`) + require.Contains(t, sql, `PARTITION OF "publish_events"`) + require.Contains(t, sql, `TIMESTAMPTZ '2026-06-01 00:00:00+00'`) + require.Contains(t, sql, `TIMESTAMPTZ '2026-07-01 00:00:00+00'`) +} + func TestConnectionPoolConfigFromEnvUsesDefaults(t *testing.T) { clearConnectionPoolEnv(t) @@ -448,3 +477,29 @@ func setDatabaseConnectionEnv(t *testing.T) { t.Setenv("DB_NAME", "poster_db") t.Setenv("DB_PORT", "5432") } + +func sqlitePrimaryKeyColumns(t *testing.T, database *gorm.DB, tableName string) []string { + t.Helper() + + rows, err := database.Raw("PRAGMA table_info(" + tableName + ")").Rows() + require.NoError(t, err) + defer func() { + require.NoError(t, rows.Close()) + }() + + columns := []string{} + for rows.Next() { + var cid int + var name string + var dataType string + var notNull int + var defaultValue sql.NullString + var primaryKeyPosition int + require.NoError(t, rows.Scan(&cid, &name, &dataType, ¬Null, &defaultValue, &primaryKeyPosition)) + if primaryKeyPosition > 0 { + columns = append(columns, name) + } + } + require.NoError(t, rows.Err()) + return columns +} diff --git a/backend/internal/services/testsupport/helpers.go b/backend/internal/services/testsupport/helpers.go index f870fbaa..2433b4ec 100644 --- a/backend/internal/services/testsupport/helpers.go +++ b/backend/internal/services/testsupport/helpers.go @@ -118,7 +118,9 @@ func (f *FakeXOAuth2Provider) Me(_ context.Context, _ string) (pkgx.User, error) } func SetupTestDB() *gorm.DB { - db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{ + TranslateError: true, + }) if err != nil { panic("failed to connect database") } @@ -217,13 +219,14 @@ func SetupTestDB() *gorm.DB { )`) db.Exec(`CREATE TABLE workspace_activities ( - id TEXT PRIMARY KEY, + id TEXT NOT NULL, workspace_id TEXT NOT NULL, actor_user_id TEXT NOT NULL, target_user_id TEXT, event_type TEXT NOT NULL, metadata TEXT NOT NULL DEFAULT '{}', - created_at DATETIME NOT NULL + created_at DATETIME NOT NULL, + PRIMARY KEY (id, created_at) )`) db.Exec(`CREATE TABLE workspace_dashboard_stats ( @@ -292,13 +295,14 @@ func SetupTestDB() *gorm.DB { )`) db.Exec(`CREATE TABLE project_activities ( - id TEXT PRIMARY KEY, + id TEXT NOT NULL, project_id TEXT NOT NULL, actor_user_id TEXT NOT NULL, target_user_id TEXT, event_type TEXT NOT NULL, metadata TEXT NOT NULL DEFAULT '{}', - created_at DATETIME NOT NULL + created_at DATETIME NOT NULL, + PRIMARY KEY (id, created_at) )`) db.Exec(`CREATE TABLE project_comments ( @@ -461,12 +465,12 @@ func SetupTestDB() *gorm.DB { )`) db.Exec(`CREATE TABLE extension_execution_events ( - id TEXT PRIMARY KEY, + id TEXT NOT NULL, callback_token_id TEXT NOT NULL, execution_id TEXT NOT NULL, project_id TEXT NOT NULL, user_id TEXT NOT NULL, - event_id TEXT NOT NULL UNIQUE, + event_id TEXT NOT NULL, platform TEXT NOT NULL, status TEXT NOT NULL, message TEXT, @@ -474,8 +478,10 @@ func SetupTestDB() *gorm.DB { publish_url TEXT, error_message TEXT, metadata TEXT NOT NULL DEFAULT '{}', - created_at DATETIME + created_at DATETIME NOT NULL, + PRIMARY KEY (id, created_at) )`) + db.Exec(`CREATE INDEX idx_extension_execution_events_event_id ON extension_execution_events(event_id)`) return db } From 76b1ccb0a783989d7b32e1aa90be752315da2d4c Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Sun, 21 Jun 2026 20:45:21 +0800 Subject: [PATCH 3/6] docs(database): update partitioning progress The database optimization plan should reflect the completed monthly partitioning slice for event and activity tables. Mark the Phase 4 checklist item complete and add the new schema and test files as verification entry points. The roadmap now shows the remaining Phase 4 work around browser sessions, collaboration batches, cold export, and recovery. --- doc/plan/database-optimization.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/doc/plan/database-optimization.md b/doc/plan/database-optimization.md index a6789dfe..6cf97efc 100644 --- a/doc/plan/database-optimization.md +++ b/doc/plan/database-optimization.md @@ -11,7 +11,7 @@ Status definitions: - `Not started`: no clear implementation has been found yet. - `Deferred`: not recommended for the current business stage; only trigger conditions are retained. -Current overall progress: about `58%`. This number is manually estimated by phase weight and can be adjusted later according to actual completed items. +Current overall progress: about `61%`. This number is manually estimated by phase weight and can be adjusted later according to actual completed items. | Phase | Weight | Current completion | Status | Completed | Not done / next steps | | ----- | ------ | ------------------ | ------ | --------- | --------------------- | @@ -19,7 +19,7 @@ Current overall progress: about `58%`. This number is manually estimated by phas | Phase 1: Single-database connection pool, indexing, pagination, and lifecycle governance | 15% | 100% | Done | backend/publish-worker/collab-service application connection pools, Redis client connection pool, PgBouncer writer pool, composite indexes, keyset list pagination, list queries avoiding the large `source_content` field, event/session history retention periods, and R2/S3 cold-event archive worker | None; Phase 2 is complete, and later work moves into Phase 3/4 read replicas, partitioning, and recovery flows | | Phase 2: Read models and cache first | 15% | 100% | Done | Redis and Asynq dependencies are reusable; admin dashboard stats, admin project list, and dashboard account summary have short-TTL Redis cache; stats/project list/account cache misses are merged with singleflight; project, prepublish, publish, and account write paths invalidate the related dashboard cache; `workspace_dashboard_stats` and `project_list_summaries` read models are in place, and APIs prefer read models when coverage is complete; async refresh triggers after project save, platform sync, publish completion, and member changes; admin rebuild API, Asynq queue, and worker support full read-model rebuild | None | | Phase 3: Read/write splitting | 15% | 100% | Done | Optional `DB_READER_*` connection, application-level DB Router, signed sticky writer, consistency routing for project/stats/workspace/platform_account/publish/prepublish/mediaasset/browser_session/extension, consistency-level inventories for dashboard/publish/collab-service, self-hosted PostgreSQL read replica, managed `postgres-reader` entry point, PgBouncer reader pool, replica lag monitoring and automatic fallback to writer when over threshold | None; Phase 4 continues partitioning, archiving, and recovery flows | -| Phase 4: Single-database partitioning, archiving, and hot/cold tiering | 15% | 15% | In progress | Collaborative editing already has state + update batch + compaction foundation; event and terminal-session history already have row-level R2/S3 archive worker | Time partitioning for event tables, hash partitioning for collaborative batches, cold partition export, recovery flow | +| Phase 4: Single-database partitioning, archiving, and hot/cold tiering | 15% | 35% | In progress | Collaborative editing already has state + update batch + compaction foundation; event and terminal-session history already have row-level R2/S3 archive worker; `publish_events`, `extension_execution_events`, `project_activities`, and `workspace_activities` have PostgreSQL monthly partition target schema | `remote_browser_sessions` time partitioning, hash partitioning for collaborative batches, cold partition export, recovery flow | | Phase 5: Citus preparation | 20% | 5% | Not started | Workspace model, `projects.workspace_id`, and personal workspace ID already exist | Global `workspace_id`, Citus distribution column/colocation design, unique constraint and foreign-key review | | Phase 6: Citus distributed PostgreSQL operation | 10% | 0% | Deferred | None | Future Citus cluster design, worker/coordinator monitoring and backup, large-tenant isolation strategy | @@ -64,7 +64,7 @@ Atomic commit guidance: | Dashboard read models | Done | Added `workspace_dashboard_stats` and `project_list_summaries` read models, idempotently recomputed from fact tables by a centralized readmodel service; async refresh is triggered after project save, platform sync, publish completion, and member changes; admin stats and admin project list prefer read models when coverage is complete; admin rebuild API enqueues through Asynq, and API/worker processes can start readmodel workers for full rebuild from fact tables | None | `backend/internal/models/models.go`, `backend/internal/services/readmodel/service.go`, `backend/internal/services/readmodel/queue.go`, `backend/internal/services/readmodel/service_test.go`, `backend/internal/services/readmodel/queue_test.go`, `backend/internal/services/stats/overview.go`, `backend/internal/services/project/lifecycle.go`, `backend/internal/handlers/dashboard.go`, `backend/cmd/api/main.go`, `backend/cmd/publish-worker/main.go` | | Redis read cache | Done | Redis is already used for queues, locks, OAuth, browser sessions, and short-term coordination; admin dashboard stats, admin project list, and dashboard account summary use 15s TTL cache and bypass scoped/sticky-writer strong-consistency paths; stats/project list/account cache misses use singleflight to prevent process-local stampede; stats and account caches use versioned payloads and semantic validation, and Redis read-error fallback is also merged into one DB computation per key; project create/edit/platform save, prepublish sync/draft update, publish queue/execute/fail, and platform account write paths invalidate the related dashboard cache; full read-model rebuild reuses the Redis/Asynq queue | None | `backend/internal/services/stats/overview.go`, `backend/internal/services/stats/overview_test.go`, `backend/internal/services/project/list_cache.go`, `backend/internal/services/project/list_cache_test.go`, `backend/internal/services/prepublish/drafts.go`, `backend/internal/services/publish/service.go`, `backend/internal/services/publish/queue.go`, `backend/internal/services/publish/publication_flow_test.go`, `backend/internal/services/publish/queue_test.go`, `backend/internal/services/platform_account/account_cache.go`, `backend/internal/services/platform_account/account_cache_test.go`, `backend/internal/services/browser_session/complete.go`, `backend/internal/services/browser_session/service_test.go`, `backend/internal/services/readmodel/queue.go` | | Read/write splitting | Done | Supports optional `DB_READER_*` read-replica connection, `DefaultRouter`, and signed sticky writer; project/stats/workspace/platform_account/publish/prepublish/mediaasset/browser_session/extension are wired to strong/eventual/writer routing; dashboard, publish, and collab-service consistency-level inventories are complete, with collab-service online path kept writer-only; writer/reader pools are in self-hosted Kubernetes, and managed overlay provides a `postgres-reader` ExternalName entry point; `DB_READER_MAX_REPLICA_LAG` configures the replica lag threshold, eventual/analytics reads automatically fall back to writer when over threshold or lag is unknown, and `mpp_db_replica_lag_seconds` and `mpp_db_replica_healthy` metrics are exposed | None | `backend/internal/db/db.go`, `backend/internal/db/router.go`, `backend/internal/db/replica_lag.go`, `backend/internal/services/publish/service.go`, `backend/internal/services/prepublish/service.go`, `backend/internal/services/mediaasset/service.go`, `backend/internal/services/browser_session/service.go`, `backend/internal/services/extension/service.go`, `backend/internal/app/runtime.go`, `deploy/kubernetes/data-services/self-hosted/postgres.yaml`, `deploy/kubernetes/data-services/self-hosted/pgbouncer.yaml`, `deploy/kubernetes/data-services/managed/services.yaml`, `script/kubernetes/validation/data_services.rb` | -| Event-table partitioning and archiving | In progress | `publish_events`, `extension_execution_events`, `project_activities`, `workspace_activities`, and terminal `remote_browser_sessions` have default retention periods; the `archive` worker can batch-export JSONL to R2/S3 and delete old hot-table rows after successful upload | Time partitioning, cold partition export, and recovery flow are not implemented | `backend/internal/services/archive/config.go`, `backend/internal/services/archive/worker.go`, `backend/internal/services/archive/worker_test.go` | +| Event-table partitioning and archiving | In progress | `publish_events`, `extension_execution_events`, `project_activities`, `workspace_activities`, and terminal `remote_browser_sessions` have default retention periods; the `archive` worker can batch-export JSONL to R2/S3 and delete old hot-table rows after successful upload; PostgreSQL schema initialization now creates monthly `created_at` partitions for `publish_events`, `extension_execution_events`, `project_activities`, and `workspace_activities`, with partition-compatible `(id, created_at)` primary keys and rolling partition creation | `remote_browser_sessions` time partitioning, cold partition export, and recovery flow are not implemented | `backend/internal/db/monthly_partitions.go`, `backend/internal/db/db.go`, `backend/internal/models/models.go`, `backend/internal/db/db_test.go`, `backend/internal/services/archive/worker.go`, `backend/internal/services/archive/worker_test.go` | | Collaboration batch governance | In progress | `collab_document_states`, `collab_document_update_batches`, and compaction/retention foundations exist | `document_id` hash partitioning and cold archiving are not implemented | `backend/internal/models/collab.go`, `collab-service/src/persistence/document-persistence.ts` | | Outbox/CDC/event stream | In progress | The publishing queue path has a transactional Outbox: `EnqueuePublishProject` writes `outbox_events` in the same transaction and dispatches immediately after commit; publish worker starts an outbox dispatcher and supports retries for failed/stale processing records; Asynq continues to serve as the task-execution queue, and `PublishEvent` continues to serve as publishing audit | Currently covers only `publish.job_requested`; general business-event outbox, Debezium, and Redpanda/Kafka CDC are not implemented | `backend/internal/services/publish/queue.go`, `backend/internal/services/publish/outbox.go`, `backend/internal/models/models.go` | | Citus target state | Not started | Confirmed `workspace_id` as the most suitable distribution-column direction | Citus distributed tables, reference tables, and colocation are not implemented | Phase 5/6 in this document | @@ -117,7 +117,7 @@ Atomic commit guidance: #### Phase 4: Single-database Partitioning, Archiving, and Hot/cold Tiering - [x] Keep collaborative editing state + update batch + compaction foundation. -- [ ] Partition `publish_events`, `extension_execution_events`, and activity tables by month. +- [x] Partition `publish_events`, `extension_execution_events`, and activity tables by month. Verification entry point: `backend/internal/db/monthly_partitions.go`, `backend/internal/models/models.go`, `backend/internal/db/db_test.go`. - [ ] Partition `remote_browser_sessions` by time or expiration time. - [ ] Hash partition `collab_document_update_batches` by `document_id`. - [ ] Export cold partitions to R2/S3. From bc60196e665deb2c3f73a90dc41341a7a3f5ead1 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Sun, 21 Jun 2026 21:09:22 +0800 Subject: [PATCH 4/6] fix(db): harden event partition idempotency Review found that monthly partitions could age out in long-lived processes and extension event deduplication was no longer a durable database invariant. Add default partitions for event tables and introduce an unpartitioned extension event claim table keyed by event id. Future-dated event inserts now have a partition fallback, and duplicate extension callbacks are blocked consistently across database backends. --- backend/internal/db/db.go | 1 + backend/internal/db/db_test.go | 9 ++++ backend/internal/db/monthly_partitions.go | 47 +++++++++++++++++++ backend/internal/handlers/dashboard_test.go | 12 +++-- backend/internal/models/models.go | 13 +++++ backend/internal/services/extension/dedup.go | 23 +++++++-- .../internal/services/extension/handoffs.go | 21 ++++----- .../services/extension/handoffs_test.go | 16 +++++++ backend/internal/services/project/delete.go | 19 +++++++- .../internal/services/project/delete_test.go | 8 +++- .../internal/services/testsupport/helpers.go | 6 +++ 11 files changed, 153 insertions(+), 22 deletions(-) diff --git a/backend/internal/db/db.go b/backend/internal/db/db.go index 742696f0..c8824828 100644 --- a/backend/internal/db/db.go +++ b/backend/internal/db/db.go @@ -379,6 +379,7 @@ func syncSchemaUnlocked(database *gorm.DB) error { &models.CollabDocumentState{}, &models.CollabDocumentUpdateBatch{}, &models.ExtensionCallbackToken{}, + &models.ExtensionExecutionEventClaim{}, &models.ExtensionExecutionEvent{}, ); err != nil { return err diff --git a/backend/internal/db/db_test.go b/backend/internal/db/db_test.go index b7c7f123..4d790961 100644 --- a/backend/internal/db/db_test.go +++ b/backend/internal/db/db_test.go @@ -148,6 +148,7 @@ func TestMonthlyPartitionedEventModelsUsePartitionCompatiblePrimaryKeys(t *testi require.ElementsMatch(t, []string{"id", "created_at"}, primaryKeyColumns, tableName) } + require.True(t, database.Migrator().HasTable(&models.ExtensionExecutionEventClaim{})) } func TestCreateMonthlyPartitionSQLUsesMonthRange(t *testing.T) { @@ -161,6 +162,14 @@ func TestCreateMonthlyPartitionSQLUsesMonthRange(t *testing.T) { require.Contains(t, sql, `TIMESTAMPTZ '2026-07-01 00:00:00+00'`) } +func TestCreateDefaultMonthlyPartitionSQLUsesDefaultPartition(t *testing.T) { + sql := createDefaultMonthlyPartitionSQL("publish_events") + + require.Contains(t, sql, `"publish_events_default"`) + require.Contains(t, sql, `PARTITION OF "publish_events"`) + require.Contains(t, sql, " DEFAULT") +} + func TestConnectionPoolConfigFromEnvUsesDefaults(t *testing.T) { clearConnectionPoolEnv(t) diff --git a/backend/internal/db/monthly_partitions.go b/backend/internal/db/monthly_partitions.go index b1c220b2..a511bd97 100644 --- a/backend/internal/db/monthly_partitions.go +++ b/backend/internal/db/monthly_partitions.go @@ -175,6 +175,9 @@ func ensureMonthlyPartitionedTable(database *gorm.DB, table monthlyPartitionedTa return err } } + if err := ensureDefaultMonthlyPartition(database, table.name); err != nil { + return err + } return ensureRollingMonthlyPartitions(database, table.name, now) } @@ -204,6 +207,9 @@ func migrateRegularTableToMonthlyPartitions(database *gorm.DB, table monthlyPart if err := database.Exec(table.createSQL).Error; err != nil { return err } + if err := ensureDefaultMonthlyPartition(database, table.name); err != nil { + return err + } if err := ensureRollingMonthlyPartitions(database, table.name, now); err != nil { return err } @@ -251,9 +257,38 @@ func ensureLegacyDataMonthlyPartitions(database *gorm.DB, tableName string, lega func ensureMonthlyPartition(database *gorm.DB, tableName string, partitionStart time.Time) error { partitionStart = monthStartUTC(partitionStart) partitionEnd := partitionStart.AddDate(0, 1, 0) + hasRows, err := defaultPartitionHasRowsInRange(database, tableName, partitionStart, partitionEnd) + if err != nil { + return err + } + if hasRows { + return nil + } return database.Exec(createMonthlyPartitionSQL(tableName, partitionStart, partitionEnd)).Error } +func ensureDefaultMonthlyPartition(database *gorm.DB, tableName string) error { + return database.Exec(createDefaultMonthlyPartitionSQL(tableName)).Error +} + +func defaultPartitionHasRowsInRange(database *gorm.DB, tableName string, partitionStart time.Time, partitionEnd time.Time) (bool, error) { + defaultPartitionName := defaultMonthlyPartitionName(tableName) + var exists bool + if err := database.Raw("SELECT to_regclass(?) IS NOT NULL", defaultPartitionName).Row().Scan(&exists); err != nil { + return false, err + } + if !exists { + return false, nil + } + + var hasRows bool + err := database.Raw(fmt.Sprintf( + "SELECT EXISTS (SELECT 1 FROM %s WHERE created_at >= ? AND created_at < ?)", + quotePostgresIdentifier(defaultPartitionName), + ), partitionStart, partitionEnd).Row().Scan(&hasRows) + return hasRows, err +} + func copyLegacyRowsIntoPartitionedTable(database *gorm.DB, table monthlyPartitionedTable, legacyName string, now time.Time) error { columnList := quotedColumnList(table.columns) selectList := legacySelectColumnList(table.columns, now) @@ -277,6 +312,18 @@ func createMonthlyPartitionSQL(tableName string, partitionStart time.Time, parti ) } +func createDefaultMonthlyPartitionSQL(tableName string) string { + return fmt.Sprintf( + "CREATE TABLE IF NOT EXISTS %s PARTITION OF %s DEFAULT", + quotePostgresIdentifier(defaultMonthlyPartitionName(tableName)), + quotePostgresIdentifier(tableName), + ) +} + +func defaultMonthlyPartitionName(tableName string) string { + return tableName + "_default" +} + func quotedColumnList(columns []string) string { quoted := make([]string, 0, len(columns)) for _, column := range columns { diff --git a/backend/internal/handlers/dashboard_test.go b/backend/internal/handlers/dashboard_test.go index 674462ca..b06cf992 100644 --- a/backend/internal/handlers/dashboard_test.go +++ b/backend/internal/handlers/dashboard_test.go @@ -297,12 +297,12 @@ func setupHandlerTestDB(t *testing.T) *gorm.DB { )`).Error) require.NoError(t, db.Exec(`CREATE TABLE extension_execution_events ( - id TEXT PRIMARY KEY, + id TEXT NOT NULL, callback_token_id TEXT NOT NULL, execution_id TEXT NOT NULL, project_id TEXT NOT NULL, user_id TEXT NOT NULL, - event_id TEXT NOT NULL UNIQUE, + event_id TEXT NOT NULL, platform TEXT NOT NULL, status TEXT NOT NULL, message TEXT, @@ -310,7 +310,13 @@ func setupHandlerTestDB(t *testing.T) *gorm.DB { publish_url TEXT, error_message TEXT, metadata TEXT NOT NULL DEFAULT '{}', - created_at DATETIME + created_at DATETIME NOT NULL, + PRIMARY KEY (id, created_at) + )`).Error) + require.NoError(t, db.Exec(`CREATE TABLE extension_execution_event_claims ( + event_id TEXT PRIMARY KEY, + record_id TEXT NOT NULL UNIQUE, + created_at DATETIME NOT NULL )`).Error) return db diff --git a/backend/internal/models/models.go b/backend/internal/models/models.go index d1281901..301b51b8 100644 --- a/backend/internal/models/models.go +++ b/backend/internal/models/models.go @@ -680,6 +680,12 @@ type ExtensionExecutionEvent struct { CreatedAt time.Time `gorm:"primaryKey;not null;index:idx_extension_execution_events_archive_created_id,priority:1"` } +type ExtensionExecutionEventClaim struct { + EventID string `gorm:"primaryKey;not null"` + RecordID uuid.UUID `gorm:"type:uuid;not null;uniqueIndex"` + CreatedAt time.Time `gorm:"not null;index"` +} + // BeforeCreate hook to generate UUID if not set func (u *User) BeforeCreate(_ *gorm.DB) (err error) { if u.ID == uuid.Nil { @@ -947,3 +953,10 @@ func (e *ExtensionExecutionEvent) BeforeCreate(_ *gorm.DB) (err error) { } return } + +func (c *ExtensionExecutionEventClaim) BeforeCreate(_ *gorm.DB) (err error) { + if c.CreatedAt.IsZero() { + c.CreatedAt = time.Now().UTC() + } + return +} diff --git a/backend/internal/services/extension/dedup.go b/backend/internal/services/extension/dedup.go index 2d516af7..fd2055be 100644 --- a/backend/internal/services/extension/dedup.go +++ b/backend/internal/services/extension/dedup.go @@ -1,10 +1,23 @@ package extension -import "gorm.io/gorm" +import ( + "errors" -func lockExtensionEventID(tx *gorm.DB, eventID string) error { - if tx == nil || tx.Name() != "postgres" { - return nil + "gorm.io/gorm" + + "github.com/kurodakayn/mpp-backend/internal/models" +) + +func loadClaimedExtensionEvent(tx *gorm.DB, eventID string, event *models.ExtensionExecutionEvent) error { + var claim models.ExtensionExecutionEventClaim + if err := tx.First(&claim, "event_id = ?", eventID).Error; err != nil { + return err + } + if err := tx.First(event, "id = ?", claim.RecordID).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil + } + return err } - return tx.Exec("SELECT pg_advisory_xact_lock(hashtextextended(?, 0))", eventID).Error + return nil } diff --git a/backend/internal/services/extension/handoffs.go b/backend/internal/services/extension/handoffs.go index b51e129c..a2b1e108 100644 --- a/backend/internal/services/extension/handoffs.go +++ b/backend/internal/services/extension/handoffs.go @@ -222,6 +222,7 @@ func (s *Service) RecordExtensionEvent(req dto.ExtensionEventCallbackRequest) (* } event := models.ExtensionExecutionEvent{ + ID: uuid.New(), CallbackTokenID: token.ID, ExecutionID: token.ExecutionID, ProjectID: token.ProjectID, @@ -237,24 +238,20 @@ func (s *Service) RecordExtensionEvent(req dto.ExtensionEventCallbackRequest) (* } duplicate := false if err := s.writerDB().Transaction(func(tx *gorm.DB) error { - if err := lockExtensionEventID(tx, eventID); err != nil { - return err - } - var existing models.ExtensionExecutionEvent - if err := tx.First(&existing, "event_id = ?", eventID).Error; err == nil { - event = existing - duplicate = true - return nil - } else if !errors.Is(err, gorm.ErrRecordNotFound) { - return err + claim := models.ExtensionExecutionEventClaim{ + EventID: eventID, + RecordID: event.ID, } - if err := tx.Create(&event).Error; err != nil { + if err := tx.Create(&claim).Error; err != nil { if errors.Is(err, gorm.ErrDuplicatedKey) { duplicate = true - return tx.First(&event, "event_id = ?", eventID).Error + return loadClaimedExtensionEvent(tx, eventID, &event) } return err } + if err := tx.Create(&event).Error; err != nil { + return err + } return applyExtensionPublicationEvent(tx, token, event) }); err != nil { return nil, err diff --git a/backend/internal/services/extension/handoffs_test.go b/backend/internal/services/extension/handoffs_test.go index 149ef817..37afe126 100644 --- a/backend/internal/services/extension/handoffs_test.go +++ b/backend/internal/services/extension/handoffs_test.go @@ -402,6 +402,10 @@ func TestRecordExtensionEventAcceptsKnownTokenAndDeduplicatesEventID(t *testing. assert.Equal(t, "event-1", events[0].EventID) assert.Equal(t, "user_review", events[0].Status) assert.Contains(t, string(events[0].Metadata), "DYNAMIC_DOUYIN") + + var claim models.ExtensionExecutionEventClaim + require.NoError(t, db.First(&claim, "event_id = ?", "event-1").Error) + assert.Equal(t, events[0].ID, claim.RecordID) } func TestRecordExtensionEventMarksXPublicationReadyForUserReview(t *testing.T) { @@ -539,6 +543,18 @@ func TestRecordExtensionEventDoesNotApplyDuplicatePublicationUpdate(t *testing.T require.NoError(t, err) assert.True(t, second.Duplicate) + var eventCount int64 + require.NoError(t, db.Model(&models.ExtensionExecutionEvent{}). + Where("event_id = ?", req.EventID). + Count(&eventCount).Error) + assert.Equal(t, int64(1), eventCount) + + var claimCount int64 + require.NoError(t, db.Model(&models.ExtensionExecutionEventClaim{}). + Where("event_id = ?", req.EventID). + Count(&claimCount).Error) + assert.Equal(t, int64(1), claimCount) + var publication models.ProjectPlatformPublication require.NoError(t, db.First(&publication, "project_id = ? AND platform = ?", project.ID, "x").Error) assert.Equal(t, 1, publication.RetryCount) diff --git a/backend/internal/services/project/delete.go b/backend/internal/services/project/delete.go index 2f0aee66..f8a789c1 100644 --- a/backend/internal/services/project/delete.go +++ b/backend/internal/services/project/delete.go @@ -51,7 +51,6 @@ func (s *Service) DeleteProject(projectID uuid.UUID, userID uuid.UUID) error { cleanup := []any{ &models.PublishEvent{}, &models.ExtensionCallbackToken{}, - &models.ExtensionExecutionEvent{}, &models.MediaAssetUsage{}, &models.PlatformAccountGrant{}, &models.ProjectListSummary{}, @@ -70,6 +69,24 @@ func (s *Service) DeleteProject(projectID uuid.UUID, userID uuid.UUID) error { return err } } + if tx.Migrator().HasTable(&models.ExtensionExecutionEvent{}) { + var extensionEventIDs []uuid.UUID + if err := tx.Model(&models.ExtensionExecutionEvent{}). + Where("project_id = ?", projectID). + Pluck("id", &extensionEventIDs).Error; err != nil { + return err + } + if len(extensionEventIDs) > 0 && tx.Migrator().HasTable(&models.ExtensionExecutionEventClaim{}) { + if err := tx.Where("record_id IN ?", extensionEventIDs). + Delete(&models.ExtensionExecutionEventClaim{}).Error; err != nil { + return err + } + } + if err := tx.Where("project_id = ?", projectID). + Delete(&models.ExtensionExecutionEvent{}).Error; err != nil { + return err + } + } if tx.Migrator().HasTable(&models.MediaAsset{}) { if err := tx.Model(&models.MediaAsset{}). diff --git a/backend/internal/services/project/delete_test.go b/backend/internal/services/project/delete_test.go index a0ed7046..ad50ee14 100644 --- a/backend/internal/services/project/delete_test.go +++ b/backend/internal/services/project/delete_test.go @@ -164,7 +164,7 @@ func TestDeleteProjectRemovesOwnerProjectAndDependents(t *testing.T) { Token: "callback-token", ExpiresAt: time.Now().Add(time.Hour), }).Error) - require.NoError(t, db.Create(&models.ExtensionExecutionEvent{ + extensionEvent := models.ExtensionExecutionEvent{ CallbackTokenID: uuid.New(), ExecutionID: "execution-1", ProjectID: project.ID, @@ -173,6 +173,11 @@ func TestDeleteProjectRemovesOwnerProjectAndDependents(t *testing.T) { Platform: "wechat", Status: "queued", Metadata: datatypes.JSON([]byte(`{}`)), + } + require.NoError(t, db.Create(&extensionEvent).Error) + require.NoError(t, db.Create(&models.ExtensionExecutionEventClaim{ + EventID: extensionEvent.EventID, + RecordID: extensionEvent.ID, }).Error) err := s.DeleteProject(project.ID, owner.ID) @@ -190,6 +195,7 @@ func TestDeleteProjectRemovesOwnerProjectAndDependents(t *testing.T) { require.Zero(t, countRows(t, db, &models.PlatformAccountGrant{}, "project_id = ?", project.ID)) require.Zero(t, countRows(t, db, &models.ExtensionCallbackToken{}, "project_id = ?", project.ID)) require.Zero(t, countRows(t, db, &models.ExtensionExecutionEvent{}, "project_id = ?", project.ID)) + require.Zero(t, countRows(t, db, &models.ExtensionExecutionEventClaim{}, "record_id = ?", extensionEvent.ID)) require.Zero(t, countRows(t, db, &models.PublishEvent{}, "project_id = ?", project.ID)) require.Zero(t, countRows(t, db, &models.ScheduledPublication{}, "project_id = ?", project.ID)) require.Zero(t, countRows(t, db, &models.PublishAttempt{}, "scheduled_publication_id = ?", schedule.ID)) diff --git a/backend/internal/services/testsupport/helpers.go b/backend/internal/services/testsupport/helpers.go index 2433b4ec..df115134 100644 --- a/backend/internal/services/testsupport/helpers.go +++ b/backend/internal/services/testsupport/helpers.go @@ -483,6 +483,12 @@ func SetupTestDB() *gorm.DB { )`) db.Exec(`CREATE INDEX idx_extension_execution_events_event_id ON extension_execution_events(event_id)`) + db.Exec(`CREATE TABLE extension_execution_event_claims ( + event_id TEXT PRIMARY KEY, + record_id TEXT NOT NULL UNIQUE, + created_at DATETIME NOT NULL + )`) + return db } From 263e664293d2f8ef97218acbe018b0936bf89534 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Sun, 21 Jun 2026 22:44:12 +0800 Subject: [PATCH 5/6] fix(db): resolve event partition review issues Duplicate extension callbacks need idempotent claim handling on PostgreSQL without aborting the transaction. Schema sync now backfills extension event claims and drains default rows into real monthly partitions. Existing callbacks stay deduplicated and future partition maintenance preserves pruning. --- backend/internal/db/db.go | 3 + backend/internal/db/db_test.go | 47 +++++++++++++ backend/internal/db/extension_event_claims.go | 42 +++++++++++ backend/internal/db/monthly_partitions.go | 70 ++++++++++++++++--- .../internal/services/extension/handoffs.go | 17 +++-- 5 files changed, 162 insertions(+), 17 deletions(-) create mode 100644 backend/internal/db/extension_event_claims.go diff --git a/backend/internal/db/db.go b/backend/internal/db/db.go index c8824828..a43b282a 100644 --- a/backend/internal/db/db.go +++ b/backend/internal/db/db.go @@ -384,6 +384,9 @@ func syncSchemaUnlocked(database *gorm.DB) error { ); err != nil { return err } + if err := backfillExtensionExecutionEventClaims(database); err != nil { + return err + } // Redis owns normal active-session locking; this index is the atomic fallback when Redis is disabled. if err := database.Exec(` diff --git a/backend/internal/db/db_test.go b/backend/internal/db/db_test.go index 4d790961..e6f31783 100644 --- a/backend/internal/db/db_test.go +++ b/backend/internal/db/db_test.go @@ -170,6 +170,53 @@ func TestCreateDefaultMonthlyPartitionSQLUsesDefaultPartition(t *testing.T) { require.Contains(t, sql, " DEFAULT") } +func TestBackfillExtensionExecutionEventClaims(t *testing.T) { + database, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{ + TranslateError: true, + }) + require.NoError(t, err) + require.NoError(t, database.AutoMigrate( + &models.ExtensionExecutionEvent{}, + &models.ExtensionExecutionEventClaim{}, + )) + + olderID := uuid.New() + newerID := uuid.New() + older := time.Date(2026, 6, 1, 10, 0, 0, 0, time.UTC) + newer := older.Add(time.Hour) + require.NoError(t, database.Create(&models.ExtensionExecutionEvent{ + ID: newerID, + CallbackTokenID: uuid.New(), + ExecutionID: "execution-newer", + ProjectID: uuid.New(), + UserID: uuid.New(), + EventID: "event-1", + Platform: "x", + Status: "failed", + Metadata: []byte(`{}`), + CreatedAt: newer, + }).Error) + require.NoError(t, database.Create(&models.ExtensionExecutionEvent{ + ID: olderID, + CallbackTokenID: uuid.New(), + ExecutionID: "execution-older", + ProjectID: uuid.New(), + UserID: uuid.New(), + EventID: "event-1", + Platform: "x", + Status: "failed", + Metadata: []byte(`{}`), + CreatedAt: older, + }).Error) + + require.NoError(t, backfillExtensionExecutionEventClaims(database)) + require.NoError(t, backfillExtensionExecutionEventClaims(database)) + + var claim models.ExtensionExecutionEventClaim + require.NoError(t, database.First(&claim, "event_id = ?", "event-1").Error) + require.Equal(t, olderID, claim.RecordID) +} + func TestConnectionPoolConfigFromEnvUsesDefaults(t *testing.T) { clearConnectionPoolEnv(t) diff --git a/backend/internal/db/extension_event_claims.go b/backend/internal/db/extension_event_claims.go new file mode 100644 index 00000000..88cdc3c7 --- /dev/null +++ b/backend/internal/db/extension_event_claims.go @@ -0,0 +1,42 @@ +package db + +import ( + "fmt" + + "gorm.io/gorm" +) + +func backfillExtensionExecutionEventClaims(database *gorm.DB) error { + if !database.Migrator().HasTable("extension_execution_events") || + !database.Migrator().HasTable("extension_execution_event_claims") { + return nil + } + + switch database.Name() { + case "postgres": + return database.Exec(` + INSERT INTO extension_execution_event_claims (event_id, record_id, created_at) + SELECT DISTINCT ON (event_id) event_id, id, created_at + FROM extension_execution_events + ORDER BY event_id, created_at ASC, id ASC + ON CONFLICT (event_id) DO NOTHING + `).Error + case "sqlite": + return database.Exec(` + INSERT OR IGNORE INTO extension_execution_event_claims (event_id, record_id, created_at) + SELECT e.event_id, e.id, e.created_at + FROM extension_execution_events e + WHERE NOT EXISTS ( + SELECT 1 + FROM extension_execution_events older + WHERE older.event_id = e.event_id + AND ( + older.created_at < e.created_at + OR (older.created_at = e.created_at AND older.id < e.id) + ) + ) + `).Error + default: + return fmt.Errorf("unsupported database dialect %q for extension event claim backfill", database.Name()) + } +} diff --git a/backend/internal/db/monthly_partitions.go b/backend/internal/db/monthly_partitions.go index a511bd97..28a3b7ce 100644 --- a/backend/internal/db/monthly_partitions.go +++ b/backend/internal/db/monthly_partitions.go @@ -178,7 +178,7 @@ func ensureMonthlyPartitionedTable(database *gorm.DB, table monthlyPartitionedTa if err := ensureDefaultMonthlyPartition(database, table.name); err != nil { return err } - return ensureRollingMonthlyPartitions(database, table.name, now) + return ensureRollingMonthlyPartitions(database, table, now) } func postgresPartitionedTableState(database *gorm.DB, tableName string) (partitioned bool, exists bool, err error) { @@ -210,10 +210,10 @@ func migrateRegularTableToMonthlyPartitions(database *gorm.DB, table monthlyPart if err := ensureDefaultMonthlyPartition(database, table.name); err != nil { return err } - if err := ensureRollingMonthlyPartitions(database, table.name, now); err != nil { + if err := ensureRollingMonthlyPartitions(database, table, now); err != nil { return err } - if err := ensureLegacyDataMonthlyPartitions(database, table.name, legacyName); err != nil { + if err := ensureLegacyDataMonthlyPartitions(database, table, legacyName); err != nil { return err } if err := copyLegacyRowsIntoPartitionedTable(database, table, legacyName, now); err != nil { @@ -222,17 +222,17 @@ func migrateRegularTableToMonthlyPartitions(database *gorm.DB, table monthlyPart return database.Exec(fmt.Sprintf("DROP TABLE %s", quotePostgresIdentifier(legacyName))).Error } -func ensureRollingMonthlyPartitions(database *gorm.DB, tableName string, now time.Time) error { +func ensureRollingMonthlyPartitions(database *gorm.DB, table monthlyPartitionedTable, now time.Time) error { start := monthStartUTC(now).AddDate(0, -monthlyPartitionPastMonths, 0) for offset := 0; offset <= monthlyPartitionPastMonths+monthlyPartitionFutureMonths; offset++ { - if err := ensureMonthlyPartition(database, tableName, start.AddDate(0, offset, 0)); err != nil { + if err := ensureMonthlyPartition(database, table, start.AddDate(0, offset, 0)); err != nil { return err } } return nil } -func ensureLegacyDataMonthlyPartitions(database *gorm.DB, tableName string, legacyName string) error { +func ensureLegacyDataMonthlyPartitions(database *gorm.DB, table monthlyPartitionedTable, legacyName string) error { var minCreatedAt, maxCreatedAt sql.NullTime if err := database.Raw(fmt.Sprintf( "SELECT MIN(created_at), MAX(created_at) FROM %s", @@ -247,24 +247,24 @@ func ensureLegacyDataMonthlyPartitions(database *gorm.DB, tableName string, lega start := monthStartUTC(minCreatedAt.Time) end := monthStartUTC(maxCreatedAt.Time) for partitionStart := start; !partitionStart.After(end); partitionStart = partitionStart.AddDate(0, 1, 0) { - if err := ensureMonthlyPartition(database, tableName, partitionStart); err != nil { + if err := ensureMonthlyPartition(database, table, partitionStart); err != nil { return err } } return nil } -func ensureMonthlyPartition(database *gorm.DB, tableName string, partitionStart time.Time) error { +func ensureMonthlyPartition(database *gorm.DB, table monthlyPartitionedTable, partitionStart time.Time) error { partitionStart = monthStartUTC(partitionStart) partitionEnd := partitionStart.AddDate(0, 1, 0) - hasRows, err := defaultPartitionHasRowsInRange(database, tableName, partitionStart, partitionEnd) + hasRows, err := defaultPartitionHasRowsInRange(database, table.name, partitionStart, partitionEnd) if err != nil { return err } if hasRows { - return nil + return drainDefaultRowsIntoMonthlyPartition(database, table, partitionStart, partitionEnd) } - return database.Exec(createMonthlyPartitionSQL(tableName, partitionStart, partitionEnd)).Error + return database.Exec(createMonthlyPartitionSQL(table.name, partitionStart, partitionEnd)).Error } func ensureDefaultMonthlyPartition(database *gorm.DB, tableName string) error { @@ -289,6 +289,54 @@ func defaultPartitionHasRowsInRange(database *gorm.DB, tableName string, partiti return hasRows, err } +func drainDefaultRowsIntoMonthlyPartition(database *gorm.DB, table monthlyPartitionedTable, partitionStart time.Time, partitionEnd time.Time) error { + tempTableName := fmt.Sprintf("tmp_%s_%s_%d", table.name, partitionStart.UTC().Format("200601"), time.Now().UTC().UnixNano()) + columnList := quotedColumnList(table.columns) + defaultPartitionName := defaultMonthlyPartitionName(table.name) + + return database.Transaction(func(tx *gorm.DB) error { + if err := tx.Exec(fmt.Sprintf( + "LOCK TABLE %s IN ACCESS EXCLUSIVE MODE", + quotePostgresIdentifier(table.name), + )).Error; err != nil { + return err + } + if err := tx.Exec(fmt.Sprintf( + "CREATE TEMP TABLE %s ON COMMIT DROP AS SELECT %s FROM %s WHERE false", + quotePostgresIdentifier(tempTableName), + columnList, + quotePostgresIdentifier(defaultPartitionName), + )).Error; err != nil { + return err + } + if err := tx.Exec(fmt.Sprintf( + "INSERT INTO %s (%s) SELECT %s FROM %s WHERE created_at >= ? AND created_at < ?", + quotePostgresIdentifier(tempTableName), + columnList, + columnList, + quotePostgresIdentifier(defaultPartitionName), + ), partitionStart, partitionEnd).Error; err != nil { + return err + } + if err := tx.Exec(fmt.Sprintf( + "DELETE FROM %s WHERE created_at >= ? AND created_at < ?", + quotePostgresIdentifier(defaultPartitionName), + ), partitionStart, partitionEnd).Error; err != nil { + return err + } + if err := tx.Exec(createMonthlyPartitionSQL(table.name, partitionStart, partitionEnd)).Error; err != nil { + return err + } + return tx.Exec(fmt.Sprintf( + "INSERT INTO %s (%s) SELECT %s FROM %s", + quotePostgresIdentifier(table.name), + columnList, + columnList, + quotePostgresIdentifier(tempTableName), + )).Error + }) +} + func copyLegacyRowsIntoPartitionedTable(database *gorm.DB, table monthlyPartitionedTable, legacyName string, now time.Time) error { columnList := quotedColumnList(table.columns) selectList := legacySelectColumnList(table.columns, now) diff --git a/backend/internal/services/extension/handoffs.go b/backend/internal/services/extension/handoffs.go index a2b1e108..5bae5162 100644 --- a/backend/internal/services/extension/handoffs.go +++ b/backend/internal/services/extension/handoffs.go @@ -9,6 +9,7 @@ import ( "github.com/google/uuid" "gorm.io/datatypes" "gorm.io/gorm" + "gorm.io/gorm/clause" "github.com/kurodakayn/mpp-backend/internal/dto" "github.com/kurodakayn/mpp-backend/internal/models" @@ -242,12 +243,16 @@ func (s *Service) RecordExtensionEvent(req dto.ExtensionEventCallbackRequest) (* EventID: eventID, RecordID: event.ID, } - if err := tx.Create(&claim).Error; err != nil { - if errors.Is(err, gorm.ErrDuplicatedKey) { - duplicate = true - return loadClaimedExtensionEvent(tx, eventID, &event) - } - return err + result := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "event_id"}}, + DoNothing: true, + }).Create(&claim) + if result.Error != nil { + return result.Error + } + if result.RowsAffected == 0 { + duplicate = true + return loadClaimedExtensionEvent(tx, eventID, &event) } if err := tx.Create(&event).Error; err != nil { return err From c177dbb676b17bb42f36f405314fc959a0f41890 Mon Sep 17 00:00:00 2001 From: Kuroda Kayn Date: Sun, 21 Jun 2026 23:30:43 +0800 Subject: [PATCH 6/6] fix(archive): delete extension event claims Archived extension events leave durable callback claims behind after the event rows are removed. The archive delete phase now runs an optional hook and removes claims for archived extension event IDs in the same transaction. Old claims stop blocking reused event IDs while hot event claims remain intact. --- backend/internal/services/archive/worker.go | 57 ++++++++-- .../internal/services/archive/worker_test.go | 107 ++++++++++++++++++ 2 files changed, 155 insertions(+), 9 deletions(-) diff --git a/backend/internal/services/archive/worker.go b/backend/internal/services/archive/worker.go index 912e5806..ee7615d4 100644 --- a/backend/internal/services/archive/worker.go +++ b/backend/internal/services/archive/worker.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/google/uuid" "gorm.io/gorm" "github.com/kurodakayn/mpp-backend/internal/models" @@ -44,6 +45,8 @@ type archiveLine[T any] struct { Row T `json:"row"` } +type archiveBeforeDeleteHook[T any] func(*gorm.DB, []T) error + // NewWorker creates a cold-row archival worker. func NewWorker(db *gorm.DB, storage objectstorage.Client, config Config) *Worker { return &Worker{db: db, storage: storage, config: config} @@ -131,9 +134,9 @@ func (w *Worker) archivePublishEvents(ctx context.Context, db *gorm.DB, now time } func (w *Worker) archiveExtensionExecutionEvents(ctx context.Context, db *gorm.DB, now time.Time) (TableResult, error) { - return archiveModel[models.ExtensionExecutionEvent](ctx, db, w.storage, w.config, "extension_execution_events", w.config.ExtensionExecutionEventRetention, now, func(query *gorm.DB, cutoff time.Time) *gorm.DB { + return archiveModelWithDeleteHook(ctx, db, w.storage, w.config, "extension_execution_events", w.config.ExtensionExecutionEventRetention, now, func(query *gorm.DB, cutoff time.Time) *gorm.DB { return query.Where("created_at < ?", cutoff) - }) + }, deleteExtensionExecutionEventClaims) } func (w *Worker) archiveProjectActivities(ctx context.Context, db *gorm.DB, now time.Time) (TableResult, error) { @@ -168,12 +171,26 @@ func archiveModel[T any]( retention time.Duration, now time.Time, scope func(*gorm.DB, time.Time) *gorm.DB, +) (TableResult, error) { + return archiveModelWithDeleteHook[T](ctx, db, storage, config, table, retention, now, scope, nil) +} + +func archiveModelWithDeleteHook[T any]( + ctx context.Context, + db *gorm.DB, + storage objectstorage.Client, + config Config, + table string, + retention time.Duration, + now time.Time, + scope func(*gorm.DB, time.Time) *gorm.DB, + beforeDelete archiveBeforeDeleteHook[T], ) (TableResult, error) { cutoff := now.Add(-retention) result := TableResult{Table: table, Cutoff: cutoff} for batchNumber := 0; ; batchNumber++ { - batchResult, err := archiveModelBatch[T](ctx, db, storage, config, table, cutoff, now, batchNumber, scope) + batchResult, err := archiveModelBatch[T](ctx, db, storage, config, table, cutoff, now, batchNumber, scope, beforeDelete) if err != nil { return result, err } @@ -199,6 +216,7 @@ func archiveModelBatch[T any]( now time.Time, batchNumber int, scope func(*gorm.DB, time.Time) *gorm.DB, + beforeDelete archiveBeforeDeleteHook[T], ) (TableResult, error) { result := TableResult{Table: table, Cutoff: cutoff} var records []T @@ -225,12 +243,22 @@ func archiveModelBatch[T any]( }); err != nil { return result, fmt.Errorf("upload %s archive object: %w", table, err) } - deleteResult := db.WithContext(ctx).Delete(&records) - if deleteResult.Error != nil { - return result, fmt.Errorf("delete archived %s rows: %w", table, deleteResult.Error) - } - if deleteResult.RowsAffected != int64(len(records)) { - return result, fmt.Errorf("delete archived %s rows: expected %d, deleted %d", table, len(records), deleteResult.RowsAffected) + if err := db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + if beforeDelete != nil { + if err := beforeDelete(tx, records); err != nil { + return err + } + } + deleteResult := tx.Delete(&records) + if deleteResult.Error != nil { + return fmt.Errorf("delete archived %s rows: %w", table, deleteResult.Error) + } + if deleteResult.RowsAffected != int64(len(records)) { + return fmt.Errorf("delete archived %s rows: expected %d, deleted %d", table, len(records), deleteResult.RowsAffected) + } + return nil + }); err != nil { + return result, err } result.RowsArchived = len(records) @@ -239,6 +267,17 @@ func archiveModelBatch[T any]( return result, nil } +func deleteExtensionExecutionEventClaims(tx *gorm.DB, records []models.ExtensionExecutionEvent) error { + if len(records) == 0 || !tx.Migrator().HasTable(&models.ExtensionExecutionEventClaim{}) { + return nil + } + recordIDs := make([]uuid.UUID, 0, len(records)) + for _, record := range records { + recordIDs = append(recordIDs, record.ID) + } + return tx.Where("record_id IN ?", recordIDs).Delete(&models.ExtensionExecutionEventClaim{}).Error +} + func tryArchiveWorkerLock(ctx context.Context, db *gorm.DB) (bool, error) { var locked bool if err := db.WithContext(ctx).Raw("SELECT pg_try_advisory_lock(?)", archiveWorkerAdvisoryLockKey).Scan(&locked).Error; err != nil { diff --git a/backend/internal/services/archive/worker_test.go b/backend/internal/services/archive/worker_test.go index 432b3b01..5954a92a 100644 --- a/backend/internal/services/archive/worker_test.go +++ b/backend/internal/services/archive/worker_test.go @@ -129,6 +129,104 @@ func TestWorkerRunOnceArchivesColdEventsAndDeletesRows(t *testing.T) { } } +func TestWorkerRunOnceDeletesExtensionEventClaimsForArchivedRows(t *testing.T) { + db := setupArchiveTestDB(t) + storage := fake.NewClient() + now := time.Date(2026, 6, 11, 10, 0, 0, 0, time.UTC) + coldCreatedAt := now.Add(-181 * 24 * time.Hour) + hotCreatedAt := now.Add(-10 * 24 * time.Hour) + + coldEvent := models.ExtensionExecutionEvent{ + CallbackTokenID: uuid.New(), + ExecutionID: "execution-cold", + ProjectID: uuid.New(), + UserID: uuid.New(), + EventID: "event-cold", + Platform: "wechat", + Status: "failed", + Metadata: datatypes.JSON(`{}`), + CreatedAt: coldCreatedAt, + } + hotEvent := models.ExtensionExecutionEvent{ + CallbackTokenID: uuid.New(), + ExecutionID: "execution-hot", + ProjectID: uuid.New(), + UserID: uuid.New(), + EventID: "event-hot", + Platform: "wechat", + Status: "failed", + Metadata: datatypes.JSON(`{}`), + CreatedAt: hotCreatedAt, + } + if err := db.Create(&coldEvent).Error; err != nil { + t.Fatalf("create cold extension event: %v", err) + } + if err := db.Create(&hotEvent).Error; err != nil { + t.Fatalf("create hot extension event: %v", err) + } + if err := db.Create(&models.ExtensionExecutionEventClaim{ + EventID: coldEvent.EventID, + RecordID: coldEvent.ID, + }).Error; err != nil { + t.Fatalf("create cold extension event claim: %v", err) + } + if err := db.Create(&models.ExtensionExecutionEventClaim{ + EventID: hotEvent.EventID, + RecordID: hotEvent.ID, + }).Error; err != nil { + t.Fatalf("create hot extension event claim: %v", err) + } + + worker := NewWorker(db, storage, Config{ + Enabled: true, + Interval: time.Hour, + BatchSize: 10, + ObjectKeyPrefix: "test-archive", + PublishEventRetention: 180 * 24 * time.Hour, + ExtensionExecutionEventRetention: 180 * 24 * time.Hour, + ProjectActivityRetention: 365 * 24 * time.Hour, + WorkspaceActivityRetention: 365 * 24 * time.Hour, + BrowserSessionHistoryRetention: 90 * 24 * time.Hour, + }) + result, err := worker.RunOnce(context.Background(), now) + if err != nil { + t.Fatalf("run archive worker: %v", err) + } + + extensionResult := tableResult(result, "extension_execution_events") + if extensionResult.RowsArchived != 1 { + t.Fatalf("expected one archived extension event, got %d", extensionResult.RowsArchived) + } + + var remainingEvents []models.ExtensionExecutionEvent + if err := db.Find(&remainingEvents).Error; err != nil { + t.Fatalf("query remaining extension events: %v", err) + } + if len(remainingEvents) != 1 || remainingEvents[0].ID != hotEvent.ID { + t.Fatalf("expected only hot extension event to remain, got %#v", remainingEvents) + } + + if countArchiveRows(t, db, &models.ExtensionExecutionEventClaim{}, "record_id = ?", coldEvent.ID) != 0 { + t.Fatalf("expected archived extension event claim to be deleted") + } + if countArchiveRows(t, db, &models.ExtensionExecutionEventClaim{}, "record_id = ?", hotEvent.ID) != 1 { + t.Fatalf("expected hot extension event claim to remain") + } + + body := readObject(t, storage, extensionResult.ObjectKey) + lines := jsonLines(t, body) + if len(lines) != 1 { + t.Fatalf("expected one archived extension event line, got %d", len(lines)) + } + if lines[0]["table"] != "extension_execution_events" { + t.Fatalf("expected extension event table metadata, got %#v", lines[0]["table"]) + } + row := lines[0]["row"].(map[string]any) + if row["ID"] != coldEvent.ID.String() { + t.Fatalf("expected archived extension event ID %s, got %#v", coldEvent.ID, row["ID"]) + } +} + func TestWorkerRunOnceArchivesOnlyTerminalColdBrowserSessions(t *testing.T) { db := setupArchiveTestDB(t) storage := fake.NewClient() @@ -384,3 +482,12 @@ func jsonLines(t *testing.T, body string) []map[string]any { } return lines } + +func countArchiveRows(t *testing.T, db *gorm.DB, model any, query string, args ...any) int64 { + t.Helper() + var count int64 + if err := db.Model(model).Where(query, args...).Count(&count).Error; err != nil { + t.Fatalf("count archive rows: %v", err) + } + return count +}