Skip to content

Commit 48ce4e4

Browse files
authored
fix/k8sdevelopment: Add Retry Logic to Executor Job and Logs (#1141)
* Add initial TODOs for Executor Pod Error Handling * Add initial retry logic to Executor Job creation and Log reading * Update retry logic for k8s executor logs * Wait for K8s Executor pod to be running before reading logs * Update pod status function to return Executor pod logs - The only way an error will arise is if we try to check the Executor pod's logs before or while it's being created. This commit resolves this by simply waiting for the pod to be created.
1 parent b6a49af commit 48ce4e4

File tree

1 file changed

+66
-12
lines changed

1 file changed

+66
-12
lines changed

worker/kubernetes.go

+66-12
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ import (
55
"context"
66
"fmt"
77
"io"
8+
"log"
89
"strings"
910
"text/template"
11+
"time"
1012

1113
"github.com/ohsu-comp-bio/funnel/tes"
1214
v1 "k8s.io/api/batch/v1"
@@ -94,8 +96,22 @@ func (kcmd KubernetesCommand) Run(ctx context.Context) error {
9496

9597
_, err = client.Create(ctx, job, metav1.CreateOptions{})
9698

99+
var maxRetries = 5
100+
97101
if err != nil {
98-
return fmt.Errorf("creating job in worker: %v", err)
102+
// Retry creating the Executor Pod on failure
103+
var retryCount int
104+
for retryCount < maxRetries {
105+
_, err = client.Create(ctx, job, metav1.CreateOptions{})
106+
if err == nil {
107+
break
108+
}
109+
retryCount++
110+
time.Sleep(2 * time.Second)
111+
}
112+
if retryCount == maxRetries {
113+
return fmt.Errorf("Funnel Worker: Failed to create Executor Job after %v attempts: %v", maxRetries, err)
114+
}
99115
}
100116

101117
// Wait until the job finishes
@@ -109,27 +125,65 @@ func (kcmd KubernetesCommand) Run(ctx context.Context) error {
109125
}
110126

111127
for _, v := range pods.Items {
112-
req := clientset.CoreV1().Pods(kcmd.Namespace).GetLogs(v.Name, &corev1.PodLogOptions{})
113-
podLogs, err := req.Stream(ctx)
114-
128+
// Wait for the pod to reach Running state
129+
pod, err := waitForPodRunning(ctx, kcmd.Namespace, v.Name, 5*time.Minute)
115130
if err != nil {
116-
return err
131+
log.Fatalf("Error waiting for pod: %v", err)
117132
}
118133

119-
defer podLogs.Close()
120-
buf := new(bytes.Buffer)
121-
_, err = io.Copy(buf, podLogs)
134+
// Stream logs from the running pod
135+
err = streamPodLogs(ctx, kcmd.Namespace, pod.Name, kcmd.Stdout)
122136
if err != nil {
123-
return err
137+
log.Fatalf("Error streaming logs: %v", err)
124138
}
125-
126-
var bytes = buf.Bytes()
127-
kcmd.Stdout.Write(bytes)
128139
}
129140

130141
return nil
131142
}
132143

144+
func waitForPodRunning(ctx context.Context, namespace string, podName string, timeout time.Duration) (*corev1.Pod, error) {
145+
clientset, err := getKubernetesClientset()
146+
if err != nil {
147+
return nil, fmt.Errorf("failed getting kubernetes clientset: %v", err)
148+
}
149+
150+
ticker := time.NewTicker(2 * time.Second)
151+
defer ticker.Stop()
152+
153+
timeoutCh := time.After(timeout)
154+
155+
for {
156+
select {
157+
case <-timeoutCh:
158+
return nil, fmt.Errorf("timed out waiting for pod %s to be in running state", podName)
159+
case <-ticker.C:
160+
pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
161+
if err != nil {
162+
return nil, fmt.Errorf("getting pod %s: %v", podName, err)
163+
}
164+
165+
return pod, nil
166+
}
167+
}
168+
}
169+
170+
func streamPodLogs(ctx context.Context, namespace string, podName string, stdout io.Writer) error {
171+
clientset, err := getKubernetesClientset()
172+
if err != nil {
173+
return fmt.Errorf("getting kubernetes clientset: %v", err)
174+
}
175+
176+
req := clientset.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{})
177+
podLogs, err := req.Stream(ctx)
178+
if err != nil {
179+
return fmt.Errorf("streaming logs: %v", err)
180+
}
181+
defer podLogs.Close()
182+
183+
_, err = io.Copy(stdout, podLogs)
184+
return err
185+
}
186+
133187
// Deletes the job running the task.
134188
func (kcmd KubernetesCommand) Stop() error {
135189
clientset, err := getKubernetesClientset()

0 commit comments

Comments
 (0)