Skip to content

Commit

Permalink
Add support for tailing Kubernetes pod logs
Browse files Browse the repository at this point in the history
This introduces support for reading logs via the Kubernetes API server
using the `kubectl logs` utility.
  • Loading branch information
josephglanville committed Mar 10, 2023
1 parent b426a5f commit e8e6d24
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 1 deletion.
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ type ServerConfig struct {
// on the specifed "hostname:port" for Postgres log messages
LogSyslogServer string `ini:"db_log_syslog_server"`

// Configure the collector to tail a container within a Kubernetes pod
LogKubernetesPod string `ini:"db_log_kubernetes_pod"`
// The container name within the kubernetes pod to tail
LogKubernetesContainer string `ini:"db_log_kubernetes_container"`

// Configures the collector to use the "pg_read_file" (superuser) or
// "pganalyze.read_log_file" (helper) function to retrieve log data
// directly over the Postgres connection. This only works when superuser
Expand Down
6 changes: 6 additions & 0 deletions config/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ func getDefaultConfig() *ServerConfig {
if logSyslogServer := os.Getenv("LOG_SYSLOG_SERVER"); logSyslogServer != "" {
config.LogSyslogServer = logSyslogServer
}
if logKubernetesPodName := os.Getenv("LOG_KUBERNETES_POD"); logKubernetesPodName != "" {
config.LogKubernetesPod = logKubernetesPodName
}
if logKubernetesContainerName := os.Getenv("LOG_KUBERNETES_CONTAINER"); logKubernetesContainerName != "" {
config.LogKubernetesContainer = logKubernetesContainerName
}
if alwaysCollectSystemData := os.Getenv("PGA_ALWAYS_COLLECT_SYSTEM_DATA"); alwaysCollectSystemData != "" {
config.AlwaysCollectSystemData = parseConfigBool(alwaysCollectSystemData)
}
Expand Down
67 changes: 67 additions & 0 deletions input/system/selfhosted/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,15 @@ func SetupLogTailForServer(ctx context.Context, wg *sync.WaitGroup, globalCollec
return setupLogLocationTail(ctx, server.Config.LogLocation, logStream, logger)
}

func SetupLogTailForPod(ctx context.Context, wg *sync.WaitGroup, globalCollectionOpts state.CollectionOpts, logger *util.Logger, server *state.Server, parsedLogStream chan state.ParsedLogStreamItem) error {
if globalCollectionOpts.DebugLogs || globalCollectionOpts.TestRun {
logger.PrintInfo("Setting up log tail for pod: %s, container: %s", server.Config.LogKubernetesPod, server.Config.LogKubernetesContainer)
}

logStream := setupLogTransformer(ctx, wg, server, globalCollectionOpts, logger, parsedLogStream)
return setupKubernetesTail(ctx, server.Config.LogKubernetesPod, server.Config.LogKubernetesContainer, logStream, logger)
}

// SetupLogTails - Sets up continuously running log tails for all servers with a
// local log directory or file specified
func SetupLogTails(ctx context.Context, wg *sync.WaitGroup, globalCollectionOpts state.CollectionOpts, logger *util.Logger, servers []*state.Server, parsedLogStream chan state.ParsedLogStreamItem) {
Expand All @@ -155,6 +164,16 @@ func SetupLogTails(ctx context.Context, wg *sync.WaitGroup, globalCollectionOpts
if err != nil {
prefixedLogger.PrintError("ERROR - %s", err)
}
} else if server.Config.LogKubernetesPod != "" && server.Config.LogKubernetesContainer != "" {
if globalCollectionOpts.DebugLogs || globalCollectionOpts.TestRun {
prefixedLogger.PrintInfo("Setting up kubectl logs tail for %s", server.Config.LogKubernetesPod)
}

logStream := setupLogTransformer(ctx, wg, server, globalCollectionOpts, prefixedLogger, parsedLogStream)
err := setupKubernetesTail(ctx, server.Config.LogKubernetesPod, server.Config.LogKubernetesContainer, logStream, prefixedLogger)
if err != nil {
prefixedLogger.PrintError("Error - %s", err)
}
} else if server.Config.LogSyslogServer != "" {
logStream := setupLogTransformer(ctx, wg, server, globalCollectionOpts, prefixedLogger, parsedLogStream)
err := setupSyslogHandler(ctx, server.Config.LogSyslogServer, logStream, prefixedLogger)
Expand Down Expand Up @@ -372,6 +391,54 @@ func setupDockerTail(ctx context.Context, containerName string, out chan<- SelfH
return nil
}

func setupKubernetesTail(ctx context.Context, podName string, containerName string, out chan<- SelfHostedLogStreamItem, prefixedLogger *util.Logger) error {
go func() {
for {
var cancelled bool
cmd := exec.CommandContext(ctx, "kubectl", "logs", podName, "-c", containerName, "-f", "--tail=0")
// Set a custom Cancel, so we can interpret the difference between the command exiting due to failure vs context cancellation
cmd.Cancel = func() error {
cancelled = true
return cmd.Process.Kill()
}

stdout, _ := cmd.StdoutPipe()
stderr, _ := cmd.StderrPipe()

scanner := bufio.NewScanner(stdout)
go func() {
for scanner.Scan() {
out <- SelfHostedLogStreamItem{Line: scanner.Text()}
}
}()

errScanner := bufio.NewScanner(stderr)
go func() {
for errScanner.Scan() {
prefixedLogger.PrintError("kubectl err: %s", errScanner.Text())
}
}()

if err := cmd.Start(); err != nil {
prefixedLogger.PrintError("Failed to start kubectl logs, err: %s", err.Error())
time.Sleep(1 * time.Second)
continue
}

processState, _ := cmd.Process.Wait()
if cancelled {
prefixedLogger.PrintVerbose("kubectl log tail received stop signal")
return
}
exitCode := processState.ExitCode()
prefixedLogger.PrintVerbose("kubectl logs exited with exitCode: %d, restarting", exitCode)
time.Sleep(1 * time.Second)
}
}()

return nil
}

func setupLogTransformer(ctx context.Context, wg *sync.WaitGroup, server *state.Server, globalCollectionOpts state.CollectionOpts, prefixedLogger *util.Logger, parsedLogStream chan state.ParsedLogStreamItem) chan<- SelfHostedLogStreamItem {
logStream := make(chan SelfHostedLogStreamItem)

Expand Down
20 changes: 19 additions & 1 deletion runner/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func SetupLogCollection(ctx context.Context, wg *sync.WaitGroup, servers []*stat
if server.Config.DisableLogs {
continue
}
if server.Config.LogLocation != "" || server.Config.LogDockerTail != "" || server.Config.LogSyslogServer != "" {
if server.Config.LogLocation != "" || server.Config.LogDockerTail != "" || server.Config.LogSyslogServer != "" || server.Config.LogKubernetesPod != "" {
hasAnyLogTails = true
} else if server.Config.SupportsLogDownload() {
hasAnyLogDownloads = true
Expand Down Expand Up @@ -390,6 +390,8 @@ func TestLogsForAllServers(ctx context.Context, servers []*state.Server, globalC
} else {
success = false
}
} else if server.Config.LogKubernetesPod != "" && server.Config.LogKubernetesContainer != "" {
success = testKubernetesLogTail(ctx, &wg, server, globalCollectionOpts, prefixedLogger)
} else if server.Config.SupportsLogDownload() {
success = testLogDownload(ctx, &wg, server, globalCollectionOpts, prefixedLogger)
} else if server.Config.AzureDbServerName != "" && server.Config.AzureEventhubNamespace != "" && server.Config.AzureEventhubName != "" {
Expand Down Expand Up @@ -434,6 +436,22 @@ func testLocalLogTail(ctx context.Context, wg *sync.WaitGroup, server *state.Ser
return true
}

func testKubernetesLogTail(ctx context.Context, wg *sync.WaitGroup, server *state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) bool {
logger.PrintInfo("Testing log collection (kubernetes)...")

logTestSucceeded := make(chan bool, 1)
parsedLogStream := setupLogStreamer(ctx, wg, globalCollectionOpts, logger, []*state.Server{server}, logTestSucceeded, stream.LogTestCollectorIdentify)

err := selfhosted.SetupLogTailForPod(ctx, wg, globalCollectionOpts, logger, server, parsedLogStream)
if err != nil {
logger.PrintError("ERROR - Could not tail logs for server: %s", err)
return false
}

logger.PrintInfo(" Kubernetes log test successful")
return true
}

func testLogDownload(ctx context.Context, wg *sync.WaitGroup, server *state.Server, globalCollectionOpts state.CollectionOpts, prefixedLogger *util.Logger) bool {
prefixedLogger.PrintInfo("Testing log download...")
_, _, err := downloadLogsForServer(ctx, server, globalCollectionOpts, prefixedLogger)
Expand Down

0 comments on commit e8e6d24

Please sign in to comment.