Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions backend/internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,13 +388,17 @@ func syncSchemaUnlocked(database *gorm.DB) error {
return err
}

// Redis owns normal active-session locking; this index is the atomic fallback when Redis is disabled.
if err := database.Exec(`
CREATE UNIQUE INDEX IF NOT EXISTS ux_remote_browser_sessions_active_user_platform
ON remote_browser_sessions (user_id, platform)
WHERE status IN ('pending', 'ready', 'login_detected', 'capturing')
`).Error; err != nil {
return err
if database.Name() != "postgres" {
// Redis owns normal active-session locking. Non-PostgreSQL local/test databases keep
// a partial unique index as the no-Redis fallback; PostgreSQL uses a scoped advisory
// transaction lock because partitioned unique indexes must include the partition key.
if err := database.Exec(`
CREATE UNIQUE INDEX IF NOT EXISTS ux_remote_browser_sessions_active_user_platform
ON remote_browser_sessions (user_id, platform)
WHERE status IN ('pending', 'ready', 'login_detected', 'capturing')
`).Error; err != nil {
return err
}
}
if database.Name() == "postgres" {
if err := database.Exec(`
Expand Down
1 change: 1 addition & 0 deletions backend/internal/db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func TestMonthlyPartitionedEventModelsUsePartitionCompatiblePrimaryKeys(t *testi
"extension_execution_events",
"project_activities",
"workspace_activities",
"remote_browser_sessions",
} {
primaryKeyColumns := sqlitePrimaryKeyColumns(t, database, tableName)

Expand Down
40 changes: 40 additions & 0 deletions backend/internal/db/monthly_partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,46 @@ var monthlyEventPartitionedTables = []monthlyPartitionedTable{
"created_at",
},
},
{
name: "remote_browser_sessions",
createSQL: `
CREATE TABLE IF NOT EXISTS remote_browser_sessions (
id uuid NOT NULL,
user_id uuid NOT NULL,
workspace_id uuid,
platform_account_id uuid,
platform text NOT NULL,
status text NOT NULL,
worker_session_ref text NOT NULL DEFAULT '',
runtime_reference jsonb NOT NULL DEFAULT '{}'::jsonb,
stream_endpoint_ref text NOT NULL DEFAULT '',
connect_token_hash text NOT NULL,
connect_token_expires_at timestamptz,
error_message text NOT NULL DEFAULT '',
created_at timestamptz NOT NULL,
expires_at timestamptz NOT NULL,
completed_at timestamptz,
CONSTRAINT pk_remote_browser_sessions_partitioned PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at)
`,
columns: []string{
"id",
"user_id",
"workspace_id",
"platform_account_id",
"platform",
"status",
"worker_session_ref",
"runtime_reference",
"stream_endpoint_ref",
"connect_token_hash",
"connect_token_expires_at",
"error_message",
"created_at",
"expires_at",
"completed_at",
},
},
}

func ensureMonthlyEventPartitions(database *gorm.DB, now time.Time) error {
Expand Down
7 changes: 5 additions & 2 deletions backend/internal/models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,8 +632,8 @@ type RemoteBrowserSession struct {
ConnectTokenHash string `gorm:"not null"`
ConnectTokenExpiresAt time.Time
ErrorMessage string `gorm:"not null;default:''"`
CreatedAt time.Time `gorm:"not null;index:idx_remote_browser_sessions_archive_status_created_id,priority:2"`
ExpiresAt time.Time `gorm:"not null"`
CreatedAt time.Time `gorm:"primaryKey;not null;index:idx_remote_browser_sessions_archive_status_created_id,priority:2"`
ExpiresAt time.Time `gorm:"not null;index:idx_remote_browser_sessions_expires_at"`
CompletedAt *time.Time
}

Expand Down Expand Up @@ -927,6 +927,9 @@ func (s *RemoteBrowserSession) BeforeCreate(_ *gorm.DB) (err error) {
if s.ID == uuid.Nil {
s.ID = uuid.New()
}
if s.CreatedAt.IsZero() {
s.CreatedAt = time.Now().UTC()
}
return
}

Expand Down
2 changes: 1 addition & 1 deletion backend/internal/services/browser_session/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (s *BrowserSessionService) CleanupExpiredSessions(ctx context.Context, now

func (s *BrowserSessionService) cleanupExpiredSession(ctx context.Context, sessionID uuid.UUID) error {
var session models.RemoteBrowserSession
if err := s.writerDB(ctx).First(&session, sessionID).Error; err != nil {
if err := s.writerDB(ctx).Where("id = ?", sessionID).First(&session).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return s.removeRedisCleanupMember(ctx, sessionID)
}
Expand Down
65 changes: 52 additions & 13 deletions backend/internal/services/browser_session/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/google/uuid"
"gorm.io/datatypes"
"gorm.io/gorm"

"github.com/kurodakayn/mpp-backend/internal/contracts"
"github.com/kurodakayn/mpp-backend/internal/dto"
Expand Down Expand Up @@ -64,6 +65,12 @@ func validateWorkerRuntimeEndpoint(name string, endpoint contracts.BrowserWorker
return nil
}

const browserSessionActiveGuardNamespace = 776770002

func browserSessionActiveGuardScope(userID uuid.UUID, platform string) string {
return "mpp:browser-session-active:" + userID.String() + ":" + strings.ToLower(strings.TrimSpace(platform))
}

func (s *BrowserSessionService) startSessionForWorkspace(ctx context.Context, userID uuid.UUID, tenantID string, workspaceID uuid.UUID, accountID uuid.UUID, platform string, authorizeTarget bool) (*dto.StartBrowserSessionResponse, error) {
adapter, ok := s.adapters[platform]
if !ok {
Expand All @@ -85,6 +92,7 @@ func (s *BrowserSessionService) startSessionForWorkspace(ctx context.Context, us
expiresAt := now.Add(browserSessionTTL)

// 1. Use Redis as the live active-session lock when available.
var dbSessionCreated bool
if s.coordinationRedisClient != nil {
acquired, err := s.acquireRedisActiveSession(ctx, userID, platform, sessionID, expiresAt)
if err != nil {
Expand Down Expand Up @@ -114,14 +122,6 @@ func (s *BrowserSessionService) startSessionForWorkspace(ctx context.Context, us
_ = s.releaseRedisActiveSession(ctx, userID, platform, sessionID)
return nil, err
}
} else {
activeSessionExists, err := s.activeSessionExists(ctx, userID, platform, now)
if err != nil {
return nil, err
}
if activeSessionExists {
return nil, ErrActiveSessionExists
}
}

// 2. Generate stream token
Expand All @@ -147,12 +147,21 @@ func (s *BrowserSessionService) startSessionForWorkspace(ctx context.Context, us
session.PlatformAccountID = &accountID
}

if err := s.writerDB(ctx).Create(session).Error; err != nil {
_ = s.cleanupRedisSessionForTenant(ctx, userID, tenantID, platform, sessionID, "")
if isActiveSessionUniquenessError(err) {
return nil, ErrActiveSessionExists
if s.coordinationRedisClient == nil {
if err := s.createSessionWithDatabaseActiveGuard(ctx, session, now); err != nil {
return nil, err
}
dbSessionCreated = true
}

if !dbSessionCreated {
if err := s.writerDB(ctx).Create(session).Error; err != nil {
_ = s.cleanupRedisSessionForTenant(ctx, userID, tenantID, platform, sessionID, "")
if isActiveSessionUniquenessError(err) {
return nil, ErrActiveSessionExists
}
return nil, err
}
return nil, err
}
if err := s.saveRedisLiveSession(ctx, browserSessionLiveState{
SessionID: sessionID,
Expand Down Expand Up @@ -265,6 +274,36 @@ func (s *BrowserSessionService) startSessionForWorkspace(ctx context.Context, us
}, nil
}

func (s *BrowserSessionService) createSessionWithDatabaseActiveGuard(ctx context.Context, session *models.RemoteBrowserSession, now time.Time) error {
return s.writerDB(ctx).Transaction(func(tx *gorm.DB) error {
scoped := *s
scoped.db = tx
scoped.router = nil

if tx.Name() == "postgres" {
if err := tx.Exec("SELECT pg_advisory_xact_lock(?, hashtext(?))", browserSessionActiveGuardNamespace, browserSessionActiveGuardScope(session.UserID, session.Platform)).Error; err != nil {
return err
}
}

activeSessionExists, err := scoped.activeSessionExists(ctx, session.UserID, session.Platform, now)
if err != nil {
return err
}
if activeSessionExists {
return ErrActiveSessionExists
}

if err := tx.Create(session).Error; err != nil {
if isActiveSessionUniquenessError(err) {
return ErrActiveSessionExists
}
return err
}
return nil
})
}

func isActiveSessionUniquenessError(err error) bool {
if err == nil {
return false
Expand Down
Loading
Loading