Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 47 additions & 26 deletions packages/pam/session/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,11 +375,11 @@ func (su *SessionUploader) startUploadRoutine() {
defer su.ticker.Stop()
defer flushTicker.Stop()

// On startup, re-register any non-expired sessions that were in progress when
// the gateway last shut down or crashed so the flush ticker resumes uploading them.
// On startup, drive final cleanup for any non-expired session files left on disk
// (sessions that were active when the gateway last shut down or crashed).
su.resumeInProgressSessions()

// Process any orphaned expired files from previous runs immediately
// Process any orphaned expired files from previous runs immediately.
su.uploadExpiredSessionFiles()

for {
Expand All @@ -395,19 +395,28 @@ func (su *SessionUploader) startUploadRoutine() {
}()
}

// resumeInProgressSessions re-registers all session files found on disk at startup so
// the flush ticker resumes uploading them after a crash or restart. Expired sessions
// will be cleaned up naturally by uploadExpiredSessionFiles on the next tick.
// resumeInProgressSessions drives final cleanup for non-expired recording files at startup.
// A gateway restart kills all proxy connections, so any file on disk is from a session that is
// already over from the customer's perspective. CleanupPAMSession performs the final flush /
// legacy bulk upload, deletes the file, and notifies the platform of session termination.
// Already-expired files are skipped here and handled exclusively by uploadExpiredSessionFiles
// to avoid duplicate back-to-back cleanup attempts on the same file at startup.
func (su *SessionUploader) resumeInProgressSessions() {
allFiles, err := ListSessionFiles()
if err != nil {
log.Error().Err(err).Msg("Failed to list session files for resume on startup")
return
}

now := time.Now()
for _, fileInfo := range allFiles {
log.Info().Str("sessionId", fileInfo.SessionID).Str("filename", fileInfo.Filename).Msg("Resuming session upload after restart")
su.RegisterSession(fileInfo.SessionID)
if now.After(fileInfo.ExpiresAt) {
continue
}
log.Info().Str("sessionId", fileInfo.SessionID).Str("filename", fileInfo.Filename).Msg("Driving cleanup for leftover session file at startup")
if err := su.CleanupPAMSession(fileInfo.SessionID, "gateway_restart"); err != nil {
log.Error().Err(err).Str("sessionId", fileInfo.SessionID).Str("filename", fileInfo.Filename).Msg("Startup cleanup did not complete successfully")
}
Comment thread
bernie-g marked this conversation as resolved.
}
}

Expand Down Expand Up @@ -456,34 +465,36 @@ func (su *SessionUploader) flushActiveSessions() {
su.activeSessionsMu.RUnlock()

for _, sessionID := range sessionIDs {
su.flushSession(sessionID, encryptionKey)
_ = su.flushSession(sessionID, encryptionKey) // errors already logged inside flushSession; ticker will retry next cycle
}
}

// flushSession reads new events from the session recording file since the last uploaded offset,
// uploads them as a batch, and advances the offset on success.
func (su *SessionUploader) flushSession(sessionID, encryptionKey string) {
// uploads them as a batch, and advances the offset on success. Returns nil when there is nothing
// to do (session not registered, already in legacy mode, no new events) or when a 404 cleanly
// transitions the session to legacy mode; the caller treats those as success.
func (su *SessionUploader) flushSession(sessionID, encryptionKey string) error {
su.activeSessionsMu.RLock()
state, ok := su.activeSessions[sessionID]
su.activeSessionsMu.RUnlock()
if !ok {
return
return nil
}

state.mu.Lock()
defer state.mu.Unlock()

if state.legacyMode {
return // Platform does not support batch uploads; bulk upload will happen at session end
return nil // Platform does not support batch uploads; bulk upload will happen at session end
}

payload, newOffset, err := readFromOffset(state.filename, encryptionKey, state.fileOffset)
if err != nil {
log.Error().Err(err).Str("sessionId", sessionID).Msg("Failed to read session events for batch upload")
return
return err
}
if len(payload) == 0 {
return // No new events since last flush
return nil // No new events since last flush
}

if err := api.CallUploadPamSessionEventBatch(su.httpClient, sessionID, state.fileOffset, payload); err != nil {
Expand All @@ -492,10 +503,10 @@ func (su *SessionUploader) flushSession(sessionID, encryptionKey string) {
// Platform does not support the batch upload endpoint yet; fall back to bulk upload at session end
log.Warn().Str("sessionId", sessionID).Msg("Batch upload endpoint not supported by platform, will use legacy bulk upload at session end")
state.legacyMode = true
return
return nil
}
log.Error().Err(err).Str("sessionId", sessionID).Int64("startOffset", state.fileOffset).Msg("Failed to upload session event batch, will retry next tick")
return // Do not advance offset on failure so the batch is retried
return err // Do not advance offset on failure so the batch is retried
}

state.fileOffset = newOffset
Expand All @@ -504,6 +515,7 @@ func (su *SessionUploader) flushSession(sessionID, encryptionKey string) {
}

log.Debug().Str("sessionId", sessionID).Int64("newOffset", newOffset).Msg("Flushed session event batch")
return nil
}

func (su *SessionUploader) uploadSessionFile(fileInfo *SessionFileInfo) error {
Expand Down Expand Up @@ -625,15 +637,22 @@ func (su *SessionUploader) CleanupPAMSession(sessionID string, reason string) er
su.RegisterSession(sessionID)
}

// Final flush: upload any remaining events before we delete the file.
// Final flush: upload any remaining events before we delete the file. Any failure on this path
// (key fetch, batch flush, or legacy bulk upload) returns early with the recording file, registry
// entry, and persisted offset intact so uploadExpiredSessionFiles can retry once the file crosses
// ExpiresAt. Deleting on failure would lose unuploaded events unrecoverably.
encryptionKey, err := su.credentialsManager.GetPAMSessionEncryptionKey()
if err != nil {
log.Warn().Err(err).Str("sessionId", sessionID).Msg("Could not get encryption key for final flush")
} else {
su.flushSession(sessionID, encryptionKey)
log.Error().Err(err).Str("sessionId", sessionID).Msg("Could not get encryption key for final flush, keeping recording file for retry")
return err
}
if flushErr := su.flushSession(sessionID, encryptionKey); flushErr != nil {
log.Error().Err(flushErr).Str("sessionId", sessionID).Msg("Final batch flush failed at session end, keeping recording file for retry")
return flushErr
}

// If the batch endpoint was not supported, fall back to a single bulk upload.
// If the batch endpoint was not supported (or this session was already in legacy mode),
// fall back to a single bulk upload of the whole file.
su.activeSessionsMu.RLock()
state, stateExists := su.activeSessions[sessionID]
su.activeSessionsMu.RUnlock()
Expand All @@ -642,10 +661,12 @@ func (su *SessionUploader) CleanupPAMSession(sessionID string, reason string) er
useLegacy := state.legacyMode
state.mu.Unlock()
if useLegacy {
if fileInfo, err := FindSessionFileBySessionID(sessionID); err == nil {
if err := su.uploadSessionFile(fileInfo); err != nil {
log.Error().Err(err).Str("sessionId", sessionID).Msg("Legacy bulk upload failed at session end")
}
fileInfo, err := FindSessionFileBySessionID(sessionID)
if err != nil {
log.Warn().Err(err).Str("sessionId", sessionID).Msg("Session file not found for legacy bulk upload")
} else if uploadErr := su.uploadSessionFile(fileInfo); uploadErr != nil {
log.Error().Err(uploadErr).Str("sessionId", sessionID).Str("filename", fileInfo.Filename).Msg("Legacy bulk upload failed at session end, keeping recording file for retry")
return uploadErr
}
Comment thread
bernie-g marked this conversation as resolved.
}
}
Expand Down
Loading