From 13a16d3b26360144f15e05eef05d4824f635cb78 Mon Sep 17 00:00:00 2001 From: bernie-g Date: Tue, 28 Apr 2026 17:11:19 -0400 Subject: [PATCH 1/2] fix(pam): keep session recording on upload failure (PAM-205) CleanupPAMSession previously deleted the local recording file and notified the platform of session termination even when the legacy bulk upload (or final batch flush, or encryption-key fetch) failed, silently losing the entire session. Now any upload-side failure returns early with the file, registry entry, and persisted offset all intact, so uploadExpiredSessionFiles can retry once ExpiresAt crosses. flushSession returns its error so CleanupPAMSession can observe the failure; flushActiveSessions discards it since the 10s ticker retries on its own cycle. Also changes resumeInProgressSessions to call CleanupPAMSession instead of RegisterSession for each leftover file at startup. A gateway restart kills every proxy connection, so any file on disk is from a session that is over from the customer's perspective; driving final cleanup is correct and turns gateway restart into a real retry path for stuck legacy uploads. --- packages/pam/session/uploader.go | 62 ++++++++++++++++++++------------ 1 file changed, 39 insertions(+), 23 deletions(-) diff --git a/packages/pam/session/uploader.go b/packages/pam/session/uploader.go index e75d8b8b..35b8b67c 100644 --- a/packages/pam/session/uploader.go +++ b/packages/pam/session/uploader.go @@ -395,9 +395,11 @@ func (su *SessionUploader) startUploadRoutine() { }() } -// resumeInProgressSessions re-registers all session files found on disk at startup so -// the flush ticker resumes uploading them after a crash or restart. Expired sessions -// will be cleaned up naturally by uploadExpiredSessionFiles on the next tick. +// resumeInProgressSessions drives final cleanup for every leftover recording file at startup. +// A gateway restart kills all proxy connections, so any file on disk is from a session that is +// already over from the customer's perspective. CleanupPAMSession performs the final flush / +// legacy bulk upload, deletes the file, and notifies the platform of session termination. +// On failure the file is retained and uploadExpiredSessionFiles will retry once ExpiresAt crosses. func (su *SessionUploader) resumeInProgressSessions() { allFiles, err := ListSessionFiles() if err != nil { @@ -406,8 +408,10 @@ func (su *SessionUploader) resumeInProgressSessions() { } for _, fileInfo := range allFiles { - log.Info().Str("sessionId", fileInfo.SessionID).Str("filename", fileInfo.Filename).Msg("Resuming session upload after restart") - su.RegisterSession(fileInfo.SessionID) + log.Info().Str("sessionId", fileInfo.SessionID).Str("filename", fileInfo.Filename).Msg("Driving cleanup for leftover session file at startup") + if err := su.CleanupPAMSession(fileInfo.SessionID, "gateway_restart"); err != nil { + log.Error().Err(err).Str("sessionId", fileInfo.SessionID).Str("filename", fileInfo.Filename).Msg("Startup cleanup failed, file retained for retry on next ticker") + } } } @@ -456,34 +460,36 @@ func (su *SessionUploader) flushActiveSessions() { su.activeSessionsMu.RUnlock() for _, sessionID := range sessionIDs { - su.flushSession(sessionID, encryptionKey) + _ = su.flushSession(sessionID, encryptionKey) // errors already logged inside flushSession; ticker will retry next cycle } } // flushSession reads new events from the session recording file since the last uploaded offset, -// uploads them as a batch, and advances the offset on success. -func (su *SessionUploader) flushSession(sessionID, encryptionKey string) { +// uploads them as a batch, and advances the offset on success. Returns nil when there is nothing +// to do (session not registered, already in legacy mode, no new events) or when a 404 cleanly +// transitions the session to legacy mode; the caller treats those as success. +func (su *SessionUploader) flushSession(sessionID, encryptionKey string) error { su.activeSessionsMu.RLock() state, ok := su.activeSessions[sessionID] su.activeSessionsMu.RUnlock() if !ok { - return + return nil } state.mu.Lock() defer state.mu.Unlock() if state.legacyMode { - return // Platform does not support batch uploads; bulk upload will happen at session end + return nil // Platform does not support batch uploads; bulk upload will happen at session end } payload, newOffset, err := readFromOffset(state.filename, encryptionKey, state.fileOffset) if err != nil { log.Error().Err(err).Str("sessionId", sessionID).Msg("Failed to read session events for batch upload") - return + return err } if len(payload) == 0 { - return // No new events since last flush + return nil // No new events since last flush } if err := api.CallUploadPamSessionEventBatch(su.httpClient, sessionID, state.fileOffset, payload); err != nil { @@ -492,10 +498,10 @@ func (su *SessionUploader) flushSession(sessionID, encryptionKey string) { // Platform does not support the batch upload endpoint yet; fall back to bulk upload at session end log.Warn().Str("sessionId", sessionID).Msg("Batch upload endpoint not supported by platform, will use legacy bulk upload at session end") state.legacyMode = true - return + return nil } log.Error().Err(err).Str("sessionId", sessionID).Int64("startOffset", state.fileOffset).Msg("Failed to upload session event batch, will retry next tick") - return // Do not advance offset on failure so the batch is retried + return err // Do not advance offset on failure so the batch is retried } state.fileOffset = newOffset @@ -504,6 +510,7 @@ func (su *SessionUploader) flushSession(sessionID, encryptionKey string) { } log.Debug().Str("sessionId", sessionID).Int64("newOffset", newOffset).Msg("Flushed session event batch") + return nil } func (su *SessionUploader) uploadSessionFile(fileInfo *SessionFileInfo) error { @@ -625,15 +632,22 @@ func (su *SessionUploader) CleanupPAMSession(sessionID string, reason string) er su.RegisterSession(sessionID) } - // Final flush: upload any remaining events before we delete the file. + // Final flush: upload any remaining events before we delete the file. Any failure on this path + // (key fetch, batch flush, or legacy bulk upload) returns early with the recording file, registry + // entry, and persisted offset intact so uploadExpiredSessionFiles can retry once the file crosses + // ExpiresAt. Deleting on failure would lose unuploaded events unrecoverably. encryptionKey, err := su.credentialsManager.GetPAMSessionEncryptionKey() if err != nil { - log.Warn().Err(err).Str("sessionId", sessionID).Msg("Could not get encryption key for final flush") - } else { - su.flushSession(sessionID, encryptionKey) + log.Error().Err(err).Str("sessionId", sessionID).Msg("Could not get encryption key for final flush, keeping recording file for retry") + return err + } + if flushErr := su.flushSession(sessionID, encryptionKey); flushErr != nil { + log.Error().Err(flushErr).Str("sessionId", sessionID).Msg("Final batch flush failed at session end, keeping recording file for retry") + return flushErr } - // If the batch endpoint was not supported, fall back to a single bulk upload. + // If the batch endpoint was not supported (or this session was already in legacy mode), + // fall back to a single bulk upload of the whole file. su.activeSessionsMu.RLock() state, stateExists := su.activeSessions[sessionID] su.activeSessionsMu.RUnlock() @@ -642,10 +656,12 @@ func (su *SessionUploader) CleanupPAMSession(sessionID string, reason string) er useLegacy := state.legacyMode state.mu.Unlock() if useLegacy { - if fileInfo, err := FindSessionFileBySessionID(sessionID); err == nil { - if err := su.uploadSessionFile(fileInfo); err != nil { - log.Error().Err(err).Str("sessionId", sessionID).Msg("Legacy bulk upload failed at session end") - } + fileInfo, err := FindSessionFileBySessionID(sessionID) + if err != nil { + log.Warn().Err(err).Str("sessionId", sessionID).Msg("Session file not found for legacy bulk upload") + } else if uploadErr := su.uploadSessionFile(fileInfo); uploadErr != nil { + log.Error().Err(uploadErr).Str("sessionId", sessionID).Str("filename", fileInfo.Filename).Msg("Legacy bulk upload failed at session end, keeping recording file for retry") + return uploadErr } } } From caf9444f93a69e1d15e435932cdc6afcc08160cc Mon Sep 17 00:00:00 2001 From: bernie-g Date: Tue, 28 Apr 2026 19:28:53 -0400 Subject: [PATCH 2/2] fix(pam): tighten startup cleanup behavior Address review feedback on resumeInProgressSessions: - Skip already-expired files at startup; they are handled exclusively by uploadExpiredSessionFiles which fires immediately afterward. Prevents duplicate back-to-back cleanup attempts on the same file when the platform endpoint is flaky. - Soften the failure log to "Startup cleanup did not complete successfully" since CleanupPAMSession can also fail after the recording file has already been deleted (termination-notify error path), in which case "file retained for retry" was inaccurate. - Update the stale inline comment in startUploadRoutine to reflect the new function behavior. --- packages/pam/session/uploader.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/packages/pam/session/uploader.go b/packages/pam/session/uploader.go index 35b8b67c..dbc65622 100644 --- a/packages/pam/session/uploader.go +++ b/packages/pam/session/uploader.go @@ -375,11 +375,11 @@ func (su *SessionUploader) startUploadRoutine() { defer su.ticker.Stop() defer flushTicker.Stop() - // On startup, re-register any non-expired sessions that were in progress when - // the gateway last shut down or crashed so the flush ticker resumes uploading them. + // On startup, drive final cleanup for any non-expired session files left on disk + // (sessions that were active when the gateway last shut down or crashed). su.resumeInProgressSessions() - // Process any orphaned expired files from previous runs immediately + // Process any orphaned expired files from previous runs immediately. su.uploadExpiredSessionFiles() for { @@ -395,11 +395,12 @@ func (su *SessionUploader) startUploadRoutine() { }() } -// resumeInProgressSessions drives final cleanup for every leftover recording file at startup. +// resumeInProgressSessions drives final cleanup for non-expired recording files at startup. // A gateway restart kills all proxy connections, so any file on disk is from a session that is // already over from the customer's perspective. CleanupPAMSession performs the final flush / // legacy bulk upload, deletes the file, and notifies the platform of session termination. -// On failure the file is retained and uploadExpiredSessionFiles will retry once ExpiresAt crosses. +// Already-expired files are skipped here and handled exclusively by uploadExpiredSessionFiles +// to avoid duplicate back-to-back cleanup attempts on the same file at startup. func (su *SessionUploader) resumeInProgressSessions() { allFiles, err := ListSessionFiles() if err != nil { @@ -407,10 +408,14 @@ func (su *SessionUploader) resumeInProgressSessions() { return } + now := time.Now() for _, fileInfo := range allFiles { + if now.After(fileInfo.ExpiresAt) { + continue + } log.Info().Str("sessionId", fileInfo.SessionID).Str("filename", fileInfo.Filename).Msg("Driving cleanup for leftover session file at startup") if err := su.CleanupPAMSession(fileInfo.SessionID, "gateway_restart"); err != nil { - log.Error().Err(err).Str("sessionId", fileInfo.SessionID).Str("filename", fileInfo.Filename).Msg("Startup cleanup failed, file retained for retry on next ticker") + log.Error().Err(err).Str("sessionId", fileInfo.SessionID).Str("filename", fileInfo.Filename).Msg("Startup cleanup did not complete successfully") } } }