From 48ce4e4b2952cdd4137808f0e4d163a83f9db5d9 Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Wed, 5 Feb 2025 13:23:53 -0800 Subject: [PATCH] =?UTF-8?q?`fix/k8s`=20=E2=86=92=20`development`:=20Add=20?= =?UTF-8?q?Retry=20Logic=20to=20Executor=20Job=20and=20Logs=20(#1141)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add initial TODO's for Executor Pod Error Handling * Add initial retry logic to Executor Job creation and Log reading * Update retry logic for k8s executor logs * Wait for K8s Executior pod to be running before reading logs * Update pod status function to return Executor pod logs - The only way an error will arise is if we try to check the Executor pod's logs before or while it's being created. This commit resolves by simply waiting for the pod to be created. --- worker/kubernetes.go | 78 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 66 insertions(+), 12 deletions(-) diff --git a/worker/kubernetes.go b/worker/kubernetes.go index 49987127..f0316872 100644 --- a/worker/kubernetes.go +++ b/worker/kubernetes.go @@ -5,8 +5,10 @@ import ( "context" "fmt" "io" + "log" "strings" "text/template" + "time" "github.com/ohsu-comp-bio/funnel/tes" v1 "k8s.io/api/batch/v1" @@ -94,8 +96,22 @@ func (kcmd KubernetesCommand) Run(ctx context.Context) error { _, err = client.Create(ctx, job, metav1.CreateOptions{}) + var maxRetries = 5 + if err != nil { - return fmt.Errorf("creating job in worker: %v", err) + // Retry creating the Executor Pod on failure + var retryCount int + for retryCount < maxRetries { + _, err = client.Create(ctx, job, metav1.CreateOptions{}) + if err == nil { + break + } + retryCount++ + time.Sleep(2 * time.Second) + } + if retryCount == maxRetries { + return fmt.Errorf("Funnel Worker: Failed to create Executor Job after %v attempts: %v", maxRetries, err) + } } // Wait until the job finishes @@ -109,27 +125,65 @@ func (kcmd KubernetesCommand) Run(ctx context.Context) error { } for _, v := range pods.Items { - req := clientset.CoreV1().Pods(kcmd.Namespace).GetLogs(v.Name, &corev1.PodLogOptions{}) - podLogs, err := req.Stream(ctx) - + // Wait for the pod to reach Running state + pod, err := waitForPodRunning(ctx, kcmd.Namespace, v.Name, 5*time.Minute) if err != nil { - return err + log.Fatalf("Error waiting for pod: %v", err) } - defer podLogs.Close() - buf := new(bytes.Buffer) - _, err = io.Copy(buf, podLogs) + // Stream logs from the running pod + err = streamPodLogs(ctx, kcmd.Namespace, pod.Name, kcmd.Stdout) if err != nil { - return err + log.Fatalf("Error streaming logs: %v", err) } - - var bytes = buf.Bytes() - kcmd.Stdout.Write(bytes) } return nil } +func waitForPodRunning(ctx context.Context, namespace string, podName string, timeout time.Duration) (*corev1.Pod, error) { + clientset, err := getKubernetesClientset() + if err != nil { + return nil, fmt.Errorf("failed getting kubernetes clientset: %v", err) + } + + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + + timeoutCh := time.After(timeout) + + for { + select { + case <-timeoutCh: + return nil, fmt.Errorf("timed out waiting for pod %s to be in running state", podName) + case <-ticker.C: + pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("getting pod %s: %v", podName, err) + } + + return pod, nil + } + } +} + +func streamPodLogs(ctx context.Context, namespace string, podName string, stdout io.Writer) error { + clientset, err := getKubernetesClientset() + if err != nil { + return fmt.Errorf("getting kubernetes clientset: %v", err) + } + + req := clientset.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{}) + podLogs, err := req.Stream(ctx) + if err != nil { + return fmt.Errorf("streaming logs: %v", err) + } + defer podLogs.Close() + + _, err = io.Copy(stdout, podLogs) + return err +} + // Deletes the job running the task. func (kcmd KubernetesCommand) Stop() error { clientset, err := getKubernetesClientset()