diff --git a/backend/internal/db/db.go b/backend/internal/db/db.go index 415c5dfe..5675b033 100644 --- a/backend/internal/db/db.go +++ b/backend/internal/db/db.go @@ -336,6 +336,9 @@ func syncSchemaUnlocked(database *gorm.DB) error { if err := ensureMonthlyEventPartitions(database, time.Now().UTC()); err != nil { return err } + if err := ensureCollabUpdateBatchHashPartitions(database); err != nil { + return err + } if err := database.AutoMigrate( &models.User{}, diff --git a/backend/internal/db/db_test.go b/backend/internal/db/db_test.go index d5fb7995..02820c74 100644 --- a/backend/internal/db/db_test.go +++ b/backend/internal/db/db_test.go @@ -152,6 +152,16 @@ func TestMonthlyPartitionedEventModelsUsePartitionCompatiblePrimaryKeys(t *testi require.True(t, database.Migrator().HasTable(&models.ExtensionExecutionEventClaim{})) } +func TestCollabUpdateBatchModelUsesHashPartitionCompatiblePrimaryKey(t *testing.T) { + database, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + require.NoError(t, err) + require.NoError(t, syncSchema(database)) + + primaryKeyColumns := sqlitePrimaryKeyColumns(t, database, "collab_document_update_batches") + + require.ElementsMatch(t, []string{"id", "document_id"}, primaryKeyColumns) +} + func TestCreateMonthlyPartitionSQLUsesMonthRange(t *testing.T) { start := time.Date(2026, 6, 1, 0, 0, 0, 0, time.UTC) @@ -171,6 +181,36 @@ func TestCreateDefaultMonthlyPartitionSQLUsesDefaultPartition(t *testing.T) { require.Contains(t, sql, " DEFAULT") } +func TestCreateHashPartitionSQLUsesDocumentHashRemainder(t *testing.T) { + sql := createHashPartitionSQL("collab_document_update_batches", 3, 16) + + require.Contains(t, sql, `"collab_document_update_batches_p03"`) + require.Contains(t, sql, `PARTITION OF "collab_document_update_batches"`) + require.Contains(t, sql, "FOR VALUES WITH (MODULUS 16, REMAINDER 3)") +} + +func TestCollabUpdateBatchHashPartitionDefinition(t *testing.T) { + require.Equal(t, "collab_document_update_batches", collabUpdateBatchHashPartitionedTable.name) + require.Equal(t, "document_id", collabUpdateBatchHashPartitionedTable.partitionKey) + require.Equal(t, 16, collabUpdateBatchHashPartitionedTable.partitionCount) + require.Contains(t, collabUpdateBatchHashPartitionedTable.createSQL, "PARTITION BY HASH (document_id)") + require.Contains(t, collabUpdateBatchHashPartitionedTable.createSQL, "PRIMARY KEY (id, document_id)") + require.Contains(t, collabUpdateBatchHashPartitionedTable.columns, "document_id") +} + +func TestRenameLegacySerialSequenceSQLAvoidsBigserialCollision(t *testing.T) { + sql := renameLegacySerialSequenceSQL( + "collab_document_update_batches", + "collab_document_update_batches_legacy_20260623120000", + "id", + ) + + require.Contains(t, sql, "pg_get_serial_sequence('collab_document_update_batches_legacy_20260623120000', 'id')") + require.Contains(t, sql, "to_regclass('collab_document_update_batches_id_seq')") + require.Contains(t, sql, "ALTER SEQUENCE %s RENAME TO %I") + require.Contains(t, sql, "'collab_document_update_batches_legacy_20260623120000_id_seq'") +} + func TestBackfillExtensionExecutionEventClaims(t *testing.T) { database, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{ TranslateError: true, diff --git a/backend/internal/db/hash_partitions.go b/backend/internal/db/hash_partitions.go new file mode 100644 index 00000000..3c889f28 --- /dev/null +++ b/backend/internal/db/hash_partitions.go @@ -0,0 +1,192 @@ +package db + +import ( + "fmt" + "strings" + "time" + + "gorm.io/gorm" +) + +const collabUpdateBatchHashPartitionCount = 16 + +type hashPartitionedTable struct { + name string + partitionKey string + partitionCount int + createSQL string + columns []string +} + +var collabUpdateBatchHashPartitionedTable = hashPartitionedTable{ + name: "collab_document_update_batches", + partitionKey: "document_id", + partitionCount: collabUpdateBatchHashPartitionCount, + createSQL: ` + CREATE TABLE IF NOT EXISTS collab_document_update_batches ( + id bigserial NOT NULL, + document_id uuid NOT NULL, + from_seq bigint NOT NULL, + to_seq bigint NOT NULL, + update_payload bytea NOT NULL, + update_count integer NOT NULL, + payload_size_bytes integer NOT NULL, + actor_user_id uuid, + created_at timestamptz NOT NULL, + CONSTRAINT pk_collab_document_update_batches_partitioned PRIMARY KEY (id, document_id) + ) PARTITION BY HASH (document_id) + `, + columns: []string{ + "id", + "document_id", + "from_seq", + "to_seq", + "update_payload", + "update_count", + "payload_size_bytes", + "actor_user_id", + "created_at", + }, +} + +func ensureCollabUpdateBatchHashPartitions(database *gorm.DB) error { + if database.Name() != "postgres" { + return nil + } + return ensureHashPartitionedTable(database, collabUpdateBatchHashPartitionedTable) +} + +func ensureHashPartitionedTable(database *gorm.DB, table hashPartitionedTable) error { + partitionKey, exists, err := postgresPartitionedTableKey(database, table.name) + if err != nil { + return err + } + + expectedKey := fmt.Sprintf("HASH (%s)", table.partitionKey) + if exists && partitionKey != "" && !strings.EqualFold(partitionKey, expectedKey) { + return fmt.Errorf("%s is partitioned by %s, expected %s", table.name, partitionKey, expectedKey) + } + + if exists && partitionKey == "" { + if err := migrateRegularTableToHashPartitions(database, table); err != nil { + return err + } + } else if !exists { + if err := database.Exec(table.createSQL).Error; err != nil { + return err + } + } + + for partitionIndex := range table.partitionCount { + if err := database.Exec(createHashPartitionSQL(table.name, partitionIndex, table.partitionCount)).Error; err != nil { + return err + } + } + return nil +} + +func postgresPartitionedTableKey(database *gorm.DB, tableName string) (partitionKey string, exists bool, err error) { + err = database.Raw(` + SELECT + to_regclass(?) IS NOT NULL AS exists, + COALESCE(pg_get_partkeydef(to_regclass(?)), '') AS partition_key + `, tableName, tableName).Row().Scan(&exists, &partitionKey) + return partitionKey, exists, err +} + +func migrateRegularTableToHashPartitions(database *gorm.DB, table hashPartitionedTable) error { + legacyName := fmt.Sprintf("%s_legacy_%s", table.name, time.Now().UTC().Format("20060102150405")) + if err := database.Exec(fmt.Sprintf( + "ALTER TABLE %s RENAME TO %s", + quotePostgresIdentifier(table.name), + quotePostgresIdentifier(legacyName), + )).Error; err != nil { + return err + } + if err := renameLegacySerialSequence(database, table.name, legacyName, "id"); err != nil { + return err + } + + if err := database.Exec(table.createSQL).Error; err != nil { + return err + } + for partitionIndex := range table.partitionCount { + if err := database.Exec(createHashPartitionSQL(table.name, partitionIndex, table.partitionCount)).Error; err != nil { + return err + } + } + if err := copyLegacyRowsIntoHashPartitionedTable(database, table, legacyName); err != nil { + return err + } + if err := syncSerialSequenceToMaxID(database, table.name, "id"); err != nil { + return err + } + return database.Exec(fmt.Sprintf("DROP TABLE %s", quotePostgresIdentifier(legacyName))).Error +} + +func copyLegacyRowsIntoHashPartitionedTable(database *gorm.DB, table hashPartitionedTable, legacyName string) error { + columnList := quotedColumnList(table.columns) + return database.Exec(fmt.Sprintf( + "INSERT INTO %s (%s) SELECT %s FROM %s", + quotePostgresIdentifier(table.name), + columnList, + columnList, + quotePostgresIdentifier(legacyName), + )).Error +} + +func renameLegacySerialSequence(database *gorm.DB, tableName string, legacyName string, columnName string) error { + return database.Exec(renameLegacySerialSequenceSQL(tableName, legacyName, columnName)).Error +} + +func renameLegacySerialSequenceSQL(tableName string, legacyName string, columnName string) string { + sequenceName := fmt.Sprintf("%s_%s_seq", tableName, columnName) + legacySequenceName := fmt.Sprintf("%s_%s_seq", legacyName, columnName) + return fmt.Sprintf(` +DO $$ +DECLARE + legacy_sequence text; +BEGIN + SELECT pg_get_serial_sequence(%s, %s) INTO legacy_sequence; + IF legacy_sequence IS NOT NULL + AND to_regclass(legacy_sequence) = to_regclass(%s) + THEN + EXECUTE format('ALTER SEQUENCE %%s RENAME TO %%I', legacy_sequence, %s); + END IF; +END $$`, + quotePostgresStringLiteral(legacyName), + quotePostgresStringLiteral(columnName), + quotePostgresStringLiteral(sequenceName), + quotePostgresStringLiteral(legacySequenceName), + ) +} + +func syncSerialSequenceToMaxID(database *gorm.DB, tableName string, columnName string) error { + return database.Exec(fmt.Sprintf( + `SELECT setval( + pg_get_serial_sequence('%s', '%s'), + COALESCE((SELECT MAX(%s) FROM %s), 1), + EXISTS (SELECT 1 FROM %s) + )`, + strings.ReplaceAll(tableName, "'", "''"), + strings.ReplaceAll(columnName, "'", "''"), + quotePostgresIdentifier(columnName), + quotePostgresIdentifier(tableName), + quotePostgresIdentifier(tableName), + )).Error +} + +func createHashPartitionSQL(tableName string, partitionIndex int, partitionCount int) string { + partitionName := fmt.Sprintf("%s_p%02d", tableName, partitionIndex) + return fmt.Sprintf( + "CREATE TABLE IF NOT EXISTS %s PARTITION OF %s FOR VALUES WITH (MODULUS %d, REMAINDER %d)", + quotePostgresIdentifier(partitionName), + quotePostgresIdentifier(tableName), + partitionCount, + partitionIndex, + ) +} + +func quotePostgresStringLiteral(value string) string { + return "'" + strings.ReplaceAll(value, "'", "''") + "'" +} diff --git a/backend/internal/models/collab.go b/backend/internal/models/collab.go index 5c098558..4df3b463 100644 --- a/backend/internal/models/collab.go +++ b/backend/internal/models/collab.go @@ -58,8 +58,8 @@ type CollabDocumentState struct { } type CollabDocumentUpdateBatch struct { - ID int64 `gorm:"primaryKey;autoIncrement"` - DocumentID uuid.UUID `gorm:"type:uuid;not null;uniqueIndex:ux_collab_update_batch_doc_seq,priority:1;index:idx_collab_update_batches_doc_seq,priority:1"` + ID int64 `gorm:"primaryKey;autoIncrement:false"` + DocumentID uuid.UUID `gorm:"type:uuid;primaryKey;not null;uniqueIndex:ux_collab_update_batch_doc_seq,priority:1;index:idx_collab_update_batches_doc_seq,priority:1"` FromSeq int64 `gorm:"not null;uniqueIndex:ux_collab_update_batch_doc_seq,priority:2"` ToSeq int64 `gorm:"not null;uniqueIndex:ux_collab_update_batch_doc_seq,priority:3;index:idx_collab_update_batches_doc_seq,priority:2,sort:desc"` UpdatePayload []byte `gorm:"type:bytea;not null"` diff --git a/doc/plan/database-optimization.md b/doc/plan/database-optimization.md index 20e5db51..5e903703 100644 --- a/doc/plan/database-optimization.md +++ b/doc/plan/database-optimization.md @@ -11,7 +11,7 @@ Status definitions: - `Not started`: no clear implementation has been found yet. - `Deferred`: not recommended for the current business stage; only trigger conditions are retained. -Current overall progress: about `64%`. This number is manually estimated by phase weight and can be adjusted later according to actual completed items. +Current overall progress: about `67%`. This number is manually estimated by phase weight and can be adjusted later according to actual completed items. | Phase | Weight | Current completion | Status | Completed | Not done / next steps | | ----- | ------ | ------------------ | ------ | --------- | --------------------- | @@ -19,7 +19,7 @@ Current overall progress: about `64%`. This number is manually estimated by phas | Phase 1: Single-database connection pool, indexing, pagination, and lifecycle governance | 15% | 100% | Done | backend/publish-worker/collab-service application connection pools, Redis client connection pool, PgBouncer writer pool, composite indexes, keyset list pagination, list queries avoiding the large `source_content` field, event/session history retention periods, and R2/S3 cold-event archive worker | None; Phase 2 is complete, and later work moves into Phase 3/4 read replicas, partitioning, and recovery flows | | Phase 2: Read models and cache first | 15% | 100% | Done | Redis and Asynq dependencies are reusable; admin dashboard stats, admin project list, and dashboard account summary have short-TTL Redis cache; stats/project list/account cache misses are merged with singleflight; project, prepublish, publish, and account write paths invalidate the related dashboard cache; `workspace_dashboard_stats` and `project_list_summaries` read models are in place, and APIs prefer read models when coverage is complete; async refresh triggers after project save, platform sync, publish completion, and member changes; admin rebuild API, Asynq queue, and worker support full read-model rebuild | None | | Phase 3: Read/write splitting | 15% | 100% | Done | Optional `DB_READER_*` connection, application-level DB Router, signed sticky writer, consistency routing for project/stats/workspace/platform_account/publish/prepublish/mediaasset/browser_session/extension, consistency-level inventories for dashboard/publish/collab-service, self-hosted PostgreSQL read replica, managed `postgres-reader` entry point, PgBouncer reader pool, replica lag monitoring and automatic fallback to writer when over threshold | None; Phase 4 continues partitioning, archiving, and recovery flows | -| Phase 4: Single-database partitioning, archiving, and hot/cold tiering | 15% | 50% | In progress | Collaborative editing already has state + update batch + compaction foundation; event and terminal-session history already have row-level R2/S3 archive worker; `publish_events`, `extension_execution_events`, `project_activities`, `workspace_activities`, and `remote_browser_sessions` have PostgreSQL monthly partition target schema | Hash partitioning for collaborative batches, cold partition export, recovery flow | +| Phase 4: Single-database partitioning, archiving, and hot/cold tiering | 15% | 70% | In progress | Collaborative editing already has state + update batch + compaction foundation; `collab_document_update_batches` has PostgreSQL `document_id` hash partition target schema; event and terminal-session history already have row-level R2/S3 archive worker; `publish_events`, `extension_execution_events`, `project_activities`, `workspace_activities`, and `remote_browser_sessions` have PostgreSQL monthly partition target schema | Cold partition export and recovery flow | | Phase 5: Citus preparation | 20% | 5% | Not started | Workspace model, `projects.workspace_id`, and personal workspace ID already exist | Global `workspace_id`, Citus distribution column/colocation design, unique constraint and foreign-key review | | Phase 6: Citus distributed PostgreSQL operation | 10% | 0% | Deferred | None | Future Citus cluster design, worker/coordinator monitoring and backup, large-tenant isolation strategy | @@ -65,7 +65,7 @@ Atomic commit guidance: | Redis read cache | Done | Redis is already used for queues, locks, OAuth, browser sessions, and short-term coordination; admin dashboard stats, admin project list, and dashboard account summary use 15s TTL cache and bypass scoped/sticky-writer strong-consistency paths; stats/project list/account cache misses use singleflight to prevent process-local stampede; stats and account caches use versioned payloads and semantic validation, and Redis read-error fallback is also merged into one DB computation per key; project create/edit/platform save, prepublish sync/draft update, publish queue/execute/fail, and platform account write paths invalidate the related dashboard cache; full read-model rebuild reuses the Redis/Asynq queue | None | `backend/internal/services/stats/overview.go`, `backend/internal/services/stats/overview_test.go`, `backend/internal/services/project/list_cache.go`, `backend/internal/services/project/list_cache_test.go`, `backend/internal/services/prepublish/drafts.go`, `backend/internal/services/publish/service.go`, `backend/internal/services/publish/queue.go`, `backend/internal/services/publish/publication_flow_test.go`, `backend/internal/services/publish/queue_test.go`, `backend/internal/services/platform_account/account_cache.go`, `backend/internal/services/platform_account/account_cache_test.go`, `backend/internal/services/browser_session/complete.go`, `backend/internal/services/browser_session/service_test.go`, `backend/internal/services/readmodel/queue.go` | | Read/write splitting | Done | Supports optional `DB_READER_*` read-replica connection, `DefaultRouter`, and signed sticky writer; project/stats/workspace/platform_account/publish/prepublish/mediaasset/browser_session/extension are wired to strong/eventual/writer routing; dashboard, publish, and collab-service consistency-level inventories are complete, with collab-service online path kept writer-only; writer/reader pools are in self-hosted Kubernetes, and managed overlay provides a `postgres-reader` ExternalName entry point; `DB_READER_MAX_REPLICA_LAG` configures the replica lag threshold, eventual/analytics reads automatically fall back to writer when over threshold or lag is unknown, and `mpp_db_replica_lag_seconds` and `mpp_db_replica_healthy` metrics are exposed | None | `backend/internal/db/db.go`, `backend/internal/db/router.go`, `backend/internal/db/replica_lag.go`, `backend/internal/services/publish/service.go`, `backend/internal/services/prepublish/service.go`, `backend/internal/services/mediaasset/service.go`, `backend/internal/services/browser_session/service.go`, `backend/internal/services/extension/service.go`, `backend/internal/app/runtime.go`, `deploy/kubernetes/data-services/self-hosted/postgres.yaml`, `deploy/kubernetes/data-services/self-hosted/pgbouncer.yaml`, `deploy/kubernetes/data-services/managed/services.yaml`, `script/kubernetes/validation/data_services.rb` | | Event-table partitioning and archiving | In progress | `publish_events`, `extension_execution_events`, `project_activities`, `workspace_activities`, and terminal `remote_browser_sessions` have default retention periods; the `archive` worker can batch-export JSONL to R2/S3 and delete old hot-table rows after successful upload; PostgreSQL schema initialization now creates monthly `created_at` partitions for `publish_events`, `extension_execution_events`, `project_activities`, `workspace_activities`, and `remote_browser_sessions`, with partition-compatible `(id, created_at)` primary keys and rolling partition creation; PostgreSQL browser-session active-row fallback uses a scoped advisory transaction lock because partitioned unique constraints must include the partition key | Cold partition export and recovery flow are not implemented | `backend/internal/db/monthly_partitions.go`, `backend/internal/db/db.go`, `backend/internal/models/models.go`, `backend/internal/db/db_test.go`, `backend/internal/services/browser_session/start.go`, `backend/internal/services/browser_session/cleanup.go`, `backend/internal/services/archive/worker.go`, `backend/internal/services/archive/worker_test.go` | -| Collaboration batch governance | In progress | `collab_document_states`, `collab_document_update_batches`, and compaction/retention foundations exist | `document_id` hash partitioning and cold archiving are not implemented | `backend/internal/models/collab.go`, `collab-service/src/persistence/document-persistence.ts` | +| Collaboration batch governance | In progress | `collab_document_states`, `collab_document_update_batches`, and compaction/retention foundations exist; PostgreSQL schema initialization creates a 16-way `document_id` hash-partitioned `collab_document_update_batches` target table and migrates existing regular-table rows into it | Cold archiving is not implemented | `backend/internal/db/hash_partitions.go`, `backend/internal/db/db.go`, `backend/internal/models/collab.go`, `backend/internal/db/db_test.go`, `collab-service/src/persistence/document-persistence.ts` | | Outbox/CDC/event stream | In progress | The publishing queue path has a transactional Outbox: `EnqueuePublishProject` writes `outbox_events` in the same transaction and dispatches immediately after commit; publish worker starts an outbox dispatcher and supports retries for failed/stale processing records; Asynq continues to serve as the task-execution queue, and `PublishEvent` continues to serve as publishing audit | Currently covers only `publish.job_requested`; general business-event outbox, Debezium, and Redpanda/Kafka CDC are not implemented | `backend/internal/services/publish/queue.go`, `backend/internal/services/publish/outbox.go`, `backend/internal/models/models.go` | | Citus target state | Not started | Confirmed `workspace_id` as the most suitable distribution-column direction | Citus distributed tables, reference tables, and colocation are not implemented | Phase 5/6 in this document | @@ -119,7 +119,7 @@ Atomic commit guidance: - [x] Keep collaborative editing state + update batch + compaction foundation. - [x] Partition `publish_events`, `extension_execution_events`, and activity tables by month. Verification entry point: `backend/internal/db/monthly_partitions.go`, `backend/internal/models/models.go`, `backend/internal/db/db_test.go`. - [x] Partition `remote_browser_sessions` by time or expiration time. Verification entry point: `backend/internal/db/monthly_partitions.go`, `backend/internal/models/models.go`, `backend/internal/services/browser_session/start.go`, `backend/internal/db/db_test.go`. -- [ ] Hash partition `collab_document_update_batches` by `document_id`. +- [x] Hash partition `collab_document_update_batches` by `document_id`. Verification entry point: `backend/internal/db/hash_partitions.go`, `backend/internal/db/db.go`, `backend/internal/models/collab.go`, `backend/internal/db/db_test.go`. - [ ] Export cold partitions to R2/S3. - [ ] Write archive recovery procedure.