diff --git a/packages/pam/session/uploader.go b/packages/pam/session/uploader.go index e75d8b8b..dbc65622 100644 --- a/packages/pam/session/uploader.go +++ b/packages/pam/session/uploader.go @@ -375,11 +375,11 @@ func (su *SessionUploader) startUploadRoutine() { defer su.ticker.Stop() defer flushTicker.Stop() - // On startup, re-register any non-expired sessions that were in progress when - // the gateway last shut down or crashed so the flush ticker resumes uploading them. + // On startup, drive final cleanup for any non-expired session files left on disk + // (sessions that were active when the gateway last shut down or crashed). su.resumeInProgressSessions() - // Process any orphaned expired files from previous runs immediately + // Process any orphaned expired files from previous runs immediately. su.uploadExpiredSessionFiles() for { @@ -395,9 +395,12 @@ func (su *SessionUploader) startUploadRoutine() { }() } -// resumeInProgressSessions re-registers all session files found on disk at startup so -// the flush ticker resumes uploading them after a crash or restart. Expired sessions -// will be cleaned up naturally by uploadExpiredSessionFiles on the next tick. +// resumeInProgressSessions drives final cleanup for non-expired recording files at startup. +// A gateway restart kills all proxy connections, so any file on disk is from a session that is +// already over from the customer's perspective. CleanupPAMSession performs the final flush / +// legacy bulk upload, deletes the file, and notifies the platform of session termination. +// Already-expired files are skipped here and handled exclusively by uploadExpiredSessionFiles +// to avoid duplicate back-to-back cleanup attempts on the same file at startup. func (su *SessionUploader) resumeInProgressSessions() { allFiles, err := ListSessionFiles() if err != nil { @@ -405,9 +408,15 @@ func (su *SessionUploader) resumeInProgressSessions() { return } + now := time.Now() for _, fileInfo := range allFiles { - log.Info().Str("sessionId", fileInfo.SessionID).Str("filename", fileInfo.Filename).Msg("Resuming session upload after restart") - su.RegisterSession(fileInfo.SessionID) + if now.After(fileInfo.ExpiresAt) { + continue + } + log.Info().Str("sessionId", fileInfo.SessionID).Str("filename", fileInfo.Filename).Msg("Driving cleanup for leftover session file at startup") + if err := su.CleanupPAMSession(fileInfo.SessionID, "gateway_restart"); err != nil { + log.Error().Err(err).Str("sessionId", fileInfo.SessionID).Str("filename", fileInfo.Filename).Msg("Startup cleanup did not complete successfully") + } } } @@ -456,34 +465,36 @@ func (su *SessionUploader) flushActiveSessions() { su.activeSessionsMu.RUnlock() for _, sessionID := range sessionIDs { - su.flushSession(sessionID, encryptionKey) + _ = su.flushSession(sessionID, encryptionKey) // errors already logged inside flushSession; ticker will retry next cycle } } // flushSession reads new events from the session recording file since the last uploaded offset, -// uploads them as a batch, and advances the offset on success. -func (su *SessionUploader) flushSession(sessionID, encryptionKey string) { +// uploads them as a batch, and advances the offset on success. Returns nil when there is nothing +// to do (session not registered, already in legacy mode, no new events) or when a 404 cleanly +// transitions the session to legacy mode; the caller treats those as success. +func (su *SessionUploader) flushSession(sessionID, encryptionKey string) error { su.activeSessionsMu.RLock() state, ok := su.activeSessions[sessionID] su.activeSessionsMu.RUnlock() if !ok { - return + return nil } state.mu.Lock() defer state.mu.Unlock() if state.legacyMode { - return // Platform does not support batch uploads; bulk upload will happen at session end + return nil // Platform does not support batch uploads; bulk upload will happen at session end } payload, newOffset, err := readFromOffset(state.filename, encryptionKey, state.fileOffset) if err != nil { log.Error().Err(err).Str("sessionId", sessionID).Msg("Failed to read session events for batch upload") - return + return err } if len(payload) == 0 { - return // No new events since last flush + return nil // No new events since last flush } if err := api.CallUploadPamSessionEventBatch(su.httpClient, sessionID, state.fileOffset, payload); err != nil { @@ -492,10 +503,10 @@ func (su *SessionUploader) flushSession(sessionID, encryptionKey string) { // Platform does not support the batch upload endpoint yet; fall back to bulk upload at session end log.Warn().Str("sessionId", sessionID).Msg("Batch upload endpoint not supported by platform, will use legacy bulk upload at session end") state.legacyMode = true - return + return nil } log.Error().Err(err).Str("sessionId", sessionID).Int64("startOffset", state.fileOffset).Msg("Failed to upload session event batch, will retry next tick") - return // Do not advance offset on failure so the batch is retried + return err // Do not advance offset on failure so the batch is retried } state.fileOffset = newOffset @@ -504,6 +515,7 @@ func (su *SessionUploader) flushSession(sessionID, encryptionKey string) { } log.Debug().Str("sessionId", sessionID).Int64("newOffset", newOffset).Msg("Flushed session event batch") + return nil } func (su *SessionUploader) uploadSessionFile(fileInfo *SessionFileInfo) error { @@ -625,15 +637,22 @@ func (su *SessionUploader) CleanupPAMSession(sessionID string, reason string) er su.RegisterSession(sessionID) } - // Final flush: upload any remaining events before we delete the file. + // Final flush: upload any remaining events before we delete the file. Any failure on this path + // (key fetch, batch flush, or legacy bulk upload) returns early with the recording file, registry + // entry, and persisted offset intact so uploadExpiredSessionFiles can retry once the file crosses + // ExpiresAt. Deleting on failure would lose unuploaded events unrecoverably. encryptionKey, err := su.credentialsManager.GetPAMSessionEncryptionKey() if err != nil { - log.Warn().Err(err).Str("sessionId", sessionID).Msg("Could not get encryption key for final flush") - } else { - su.flushSession(sessionID, encryptionKey) + log.Error().Err(err).Str("sessionId", sessionID).Msg("Could not get encryption key for final flush, keeping recording file for retry") + return err + } + if flushErr := su.flushSession(sessionID, encryptionKey); flushErr != nil { + log.Error().Err(flushErr).Str("sessionId", sessionID).Msg("Final batch flush failed at session end, keeping recording file for retry") + return flushErr } - // If the batch endpoint was not supported, fall back to a single bulk upload. + // If the batch endpoint was not supported (or this session was already in legacy mode), + // fall back to a single bulk upload of the whole file. su.activeSessionsMu.RLock() state, stateExists := su.activeSessions[sessionID] su.activeSessionsMu.RUnlock() @@ -642,10 +661,12 @@ func (su *SessionUploader) CleanupPAMSession(sessionID string, reason string) er useLegacy := state.legacyMode state.mu.Unlock() if useLegacy { - if fileInfo, err := FindSessionFileBySessionID(sessionID); err == nil { - if err := su.uploadSessionFile(fileInfo); err != nil { - log.Error().Err(err).Str("sessionId", sessionID).Msg("Legacy bulk upload failed at session end") - } + fileInfo, err := FindSessionFileBySessionID(sessionID) + if err != nil { + log.Warn().Err(err).Str("sessionId", sessionID).Msg("Session file not found for legacy bulk upload") + } else if uploadErr := su.uploadSessionFile(fileInfo); uploadErr != nil { + log.Error().Err(uploadErr).Str("sessionId", sessionID).Str("filename", fileInfo.Filename).Msg("Legacy bulk upload failed at session end, keeping recording file for retry") + return uploadErr } } }