Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion backend/internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ func openPostgresDatabase(dsn string) (*gorm.DB, error) {
database, err := gorm.Open(postgres.New(postgres.Config{
DSN: dsn,
PreferSimpleProtocol: true,
}), &gorm.Config{})
}), &gorm.Config{
TranslateError: true,
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -331,6 +333,10 @@ func syncSchema(database *gorm.DB) error {
}

func syncSchemaUnlocked(database *gorm.DB) error {
if err := ensureMonthlyEventPartitions(database, time.Now().UTC()); err != nil {
return err
}

if err := database.AutoMigrate(
&models.User{},
&models.Workspace{},
Expand Down Expand Up @@ -373,10 +379,14 @@ func syncSchemaUnlocked(database *gorm.DB) error {
&models.CollabDocumentState{},
&models.CollabDocumentUpdateBatch{},
&models.ExtensionCallbackToken{},
&models.ExtensionExecutionEventClaim{},
&models.ExtensionExecutionEvent{},
); err != nil {
return err
}
if err := backfillExtensionExecutionEventClaims(database); err != nil {
return err
}

// Redis owns normal active-session locking; this index is the atomic fallback when Redis is disabled.
if err := database.Exec(`
Expand Down
111 changes: 111 additions & 0 deletions backend/internal/db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package db

import (
"context"
"database/sql"
"testing"
"time"

Expand Down Expand Up @@ -132,6 +133,90 @@ func TestSyncSchemaAddsArchiveScanIndexes(t *testing.T) {
require.True(t, database.Migrator().HasIndex(&models.RemoteBrowserSession{}, "idx_remote_browser_sessions_archive_status_created_id"))
}

func TestMonthlyPartitionedEventModelsUsePartitionCompatiblePrimaryKeys(t *testing.T) {
database, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
require.NoError(t, err)
require.NoError(t, syncSchema(database))

for _, tableName := range []string{
"publish_events",
"extension_execution_events",
"project_activities",
"workspace_activities",
} {
primaryKeyColumns := sqlitePrimaryKeyColumns(t, database, tableName)

require.ElementsMatch(t, []string{"id", "created_at"}, primaryKeyColumns, tableName)
}
require.True(t, database.Migrator().HasTable(&models.ExtensionExecutionEventClaim{}))
}

func TestCreateMonthlyPartitionSQLUsesMonthRange(t *testing.T) {
start := time.Date(2026, 6, 1, 0, 0, 0, 0, time.UTC)

sql := createMonthlyPartitionSQL("publish_events", start, start.AddDate(0, 1, 0))

require.Contains(t, sql, `"publish_events_2026_06"`)
require.Contains(t, sql, `PARTITION OF "publish_events"`)
require.Contains(t, sql, `TIMESTAMPTZ '2026-06-01 00:00:00+00'`)
require.Contains(t, sql, `TIMESTAMPTZ '2026-07-01 00:00:00+00'`)
}

func TestCreateDefaultMonthlyPartitionSQLUsesDefaultPartition(t *testing.T) {
sql := createDefaultMonthlyPartitionSQL("publish_events")

require.Contains(t, sql, `"publish_events_default"`)
require.Contains(t, sql, `PARTITION OF "publish_events"`)
require.Contains(t, sql, " DEFAULT")
}

func TestBackfillExtensionExecutionEventClaims(t *testing.T) {
database, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{
TranslateError: true,
})
require.NoError(t, err)
require.NoError(t, database.AutoMigrate(
&models.ExtensionExecutionEvent{},
&models.ExtensionExecutionEventClaim{},
))

olderID := uuid.New()
newerID := uuid.New()
older := time.Date(2026, 6, 1, 10, 0, 0, 0, time.UTC)
newer := older.Add(time.Hour)
require.NoError(t, database.Create(&models.ExtensionExecutionEvent{
ID: newerID,
CallbackTokenID: uuid.New(),
ExecutionID: "execution-newer",
ProjectID: uuid.New(),
UserID: uuid.New(),
EventID: "event-1",
Platform: "x",
Status: "failed",
Metadata: []byte(`{}`),
CreatedAt: newer,
}).Error)
require.NoError(t, database.Create(&models.ExtensionExecutionEvent{
ID: olderID,
CallbackTokenID: uuid.New(),
ExecutionID: "execution-older",
ProjectID: uuid.New(),
UserID: uuid.New(),
EventID: "event-1",
Platform: "x",
Status: "failed",
Metadata: []byte(`{}`),
CreatedAt: older,
}).Error)

require.NoError(t, backfillExtensionExecutionEventClaims(database))
require.NoError(t, backfillExtensionExecutionEventClaims(database))

var claim models.ExtensionExecutionEventClaim
require.NoError(t, database.First(&claim, "event_id = ?", "event-1").Error)
require.Equal(t, olderID, claim.RecordID)
}

func TestConnectionPoolConfigFromEnvUsesDefaults(t *testing.T) {
clearConnectionPoolEnv(t)

Expand Down Expand Up @@ -448,3 +533,29 @@ func setDatabaseConnectionEnv(t *testing.T) {
t.Setenv("DB_NAME", "poster_db")
t.Setenv("DB_PORT", "5432")
}

func sqlitePrimaryKeyColumns(t *testing.T, database *gorm.DB, tableName string) []string {
t.Helper()

rows, err := database.Raw("PRAGMA table_info(" + tableName + ")").Rows()
require.NoError(t, err)
defer func() {
require.NoError(t, rows.Close())
}()

columns := []string{}
for rows.Next() {
var cid int
var name string
var dataType string
var notNull int
var defaultValue sql.NullString
var primaryKeyPosition int
require.NoError(t, rows.Scan(&cid, &name, &dataType, &notNull, &defaultValue, &primaryKeyPosition))
if primaryKeyPosition > 0 {
columns = append(columns, name)
}
}
require.NoError(t, rows.Err())
return columns
}
42 changes: 42 additions & 0 deletions backend/internal/db/extension_event_claims.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package db

import (
"fmt"

"gorm.io/gorm"
)

func backfillExtensionExecutionEventClaims(database *gorm.DB) error {
if !database.Migrator().HasTable("extension_execution_events") ||
!database.Migrator().HasTable("extension_execution_event_claims") {
return nil
}

switch database.Name() {
case "postgres":
return database.Exec(`
INSERT INTO extension_execution_event_claims (event_id, record_id, created_at)
SELECT DISTINCT ON (event_id) event_id, id, created_at
FROM extension_execution_events
ORDER BY event_id, created_at ASC, id ASC
ON CONFLICT (event_id) DO NOTHING
`).Error
case "sqlite":
return database.Exec(`
INSERT OR IGNORE INTO extension_execution_event_claims (event_id, record_id, created_at)
SELECT e.event_id, e.id, e.created_at
FROM extension_execution_events e
WHERE NOT EXISTS (
SELECT 1
FROM extension_execution_events older
WHERE older.event_id = e.event_id
AND (
older.created_at < e.created_at
OR (older.created_at = e.created_at AND older.id < e.id)
)
)
`).Error
default:
return fmt.Errorf("unsupported database dialect %q for extension event claim backfill", database.Name())
}
}
Loading
Loading