Skip to content

Commit 43f68ca

Browse files
handle some edge cases
1 parent 5160555 commit 43f68ca

File tree

4 files changed

+69
-19
lines changed

4 files changed

+69
-19
lines changed

cmd/ctrlc/root/run/exec/exec.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package exec
22

33
import (
4+
"context"
45
"fmt"
56
"os"
67
"os/signal"
78
"runtime"
89
"syscall"
10+
"time"
911

1012
"github.com/MakeNowJust/heredoc/v2"
1113
"github.com/charmbracelet/log"
@@ -17,8 +19,9 @@ import (
1719

1820
func NewRunExecCmd() *cobra.Command {
1921
var (
20-
name string
21-
jobAgentType = "exec-bash"
22+
name string
23+
jobAgentType = "exec-bash"
24+
maxJobTimeoutMin int
2225
)
2326

2427
if runtime.GOOS == "windows" {
@@ -46,6 +49,15 @@ func NewRunExecCmd() *cobra.Command {
4649
return fmt.Errorf("workspace is required")
4750
}
4851

52+
maxJobTimeoutMin := viper.GetInt("max-job-timeout")
53+
if maxJobTimeoutMin <= 0 {
54+
maxJobTimeoutMin = 30
55+
}
56+
57+
// Create a context with timeout for job execution
58+
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(maxJobTimeoutMin)*time.Minute)
59+
defer cancel()
60+
4961
runner := NewExecRunner(client)
5062
jobAgentConfig := api.UpsertJobAgentJSONRequestBody{
5163
Name: name,
@@ -69,10 +81,11 @@ func NewRunExecCmd() *cobra.Command {
6981
<-c
7082
log.Info("Shutting down gracefully...")
7183
runner.ExitAll(true)
84+
cancel() // Cancel the context when shutting down
7285
}()
7386

74-
// Run job check - AddIntervalSupport will handle repeated execution
75-
if err := ja.RunQueuedJobs(); err != nil {
87+
// Run job check with timeout context - AddIntervalSupport will handle repeated execution
88+
if err := ja.RunQueuedJobs(ctx); err != nil {
7689
return fmt.Errorf("failed to run queued jobs: %w", err)
7790
}
7891

@@ -81,6 +94,7 @@ func NewRunExecCmd() *cobra.Command {
8194
}
8295

8396
cmd.Flags().StringVar(&name, "name", "", "Name of the job agent")
97+
cmd.Flags().IntVar(&maxJobTimeoutMin, "max-job-timeout", 0, "Maximum job execution time in minutes (default 30)")
8498
cmd.MarkFlagRequired("name")
8599
cmd.MarkFlagRequired("workspace")
86100
return cmd

cmd/ctrlc/root/run/exec/runner.go

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,20 @@ func NewExecRunner(client *api.ClientWithResponses) *ExecRunner {
5252
sig := <-c
5353
log.Info("Received shutdown signal, terminating all jobs", "signal", sig)
5454

55+
// Set up a short timeout for handling additional signals
56+
timeout := make(chan bool, 1)
57+
go func() {
58+
time.Sleep(5 * time.Second)
59+
timeout <- true
60+
}()
61+
62+
// Handle additional signals
63+
go func() {
64+
sig := <-c
65+
log.Warn("Received second signal, forcing immediate exit", "signal", sig)
66+
os.Exit(1)
67+
}()
68+
5569
// Update all job statuses before exiting
5670
runner.mu.Lock()
5771
for _, runningJob := range runner.runningJobs {
@@ -67,14 +81,26 @@ func NewExecRunner(client *api.ClientWithResponses) *ExecRunner {
6781
// Now terminate all processes
6882
runner.ExitAll(true)
6983
log.Info("Shutdown complete, exiting")
70-
os.Exit(0)
7184
}()
7285

7386
return runner
7487
}
7588

89+
// cleanupJob safely removes a job from the runningJobs map with proper mutex handling
90+
func (r *ExecRunner) cleanupJob(handle string) {
91+
r.mu.Lock()
92+
defer r.mu.Unlock()
93+
if job, exists := r.runningJobs[handle]; exists {
94+
log.Debug("Job cleanup complete", "id", job.jobID)
95+
delete(r.runningJobs, handle)
96+
}
97+
}
98+
7699
// Start creates a temporary script file, starts the process, and updates job status when the process completes.
77100
func (r *ExecRunner) Start(ctx context.Context, job *api.JobWithDetails) (api.JobStatus, error) {
101+
// Ensure job has the current context
102+
job = job.WithContext(ctx)
103+
78104
// Template the script using the job
79105
script, err := job.TemplateJobDetails()
80106
if err != nil {
@@ -148,12 +174,7 @@ func (r *ExecRunner) Start(ctx context.Context, job *api.JobWithDetails) (api.Jo
148174
// Spawn a goroutine to wait for the process to finish and update the job status
149175
go func(handle, scriptPath string) {
150176
defer os.Remove(scriptPath)
151-
defer func() {
152-
r.mu.Lock()
153-
delete(r.runningJobs, handle)
154-
r.mu.Unlock()
155-
log.Debug("Job cleanup complete", "id", runningJob.jobID)
156-
}()
177+
defer r.cleanupJob(handle)
157178

158179
log.Debug("Waiting for command to complete", "id", runningJob.jobID, "handle", handle)
159180
err := cmd.Wait()

internal/api/job.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,25 @@ type JobWithDetails struct {
1414
Details map[string]interface{}
1515
Client *ClientWithResponses
1616
ExternalID string
17+
ctx context.Context
1718
}
1819

1920
// NewJobWithDetails creates a new JobWithDetails with the client and job data
2021
func NewJobWithDetails(client *ClientWithResponses, job Job) (*JobWithDetails, error) {
22+
return NewJobWithDetailsContext(context.Background(), client, job)
23+
}
24+
25+
// NewJobWithDetailsContext creates a new JobWithDetails with a specific context
26+
func NewJobWithDetailsContext(ctx context.Context, client *ClientWithResponses, job Job) (*JobWithDetails, error) {
2127
j := &JobWithDetails{
2228
Job: job,
2329
Client: client,
30+
ctx: ctx,
2431
}
2532

2633
// Fetch job details
2734
var err error
28-
j.Details, err = j.GetJobDetails(context.Background())
35+
j.Details, err = j.GetJobDetails()
2936
if err != nil {
3037
return nil, fmt.Errorf("failed to get job details: %w", err)
3138
}
@@ -34,8 +41,8 @@ func NewJobWithDetails(client *ClientWithResponses, job Job) (*JobWithDetails, e
3441
}
3542

3643
// GetJobDetails retrieves job details for templating
37-
func (j *JobWithDetails) GetJobDetails(ctx context.Context) (map[string]interface{}, error) {
38-
resp, err := j.Client.GetJobWithResponse(ctx, j.Id.String())
44+
func (j *JobWithDetails) GetJobDetails() (map[string]interface{}, error) {
45+
resp, err := j.Client.GetJobWithResponse(j.ctx, j.Id.String())
3946
if err != nil {
4047
return nil, fmt.Errorf("failed to get job details: %w", err)
4148
}
@@ -54,6 +61,14 @@ func (j *JobWithDetails) GetJobDetails(ctx context.Context) (map[string]interfac
5461
return details, nil
5562
}
5663

64+
// WithContext returns a copy of the job with a new context
65+
func (j *JobWithDetails) WithContext(ctx context.Context) *JobWithDetails {
66+
// Create a shallow copy with the new context
67+
jobCopy := *j
68+
jobCopy.ctx = ctx
69+
return &jobCopy
70+
}
71+
5772
// TemplateJobDetails applies the job details template to the script
5873
func (j *JobWithDetails) TemplateJobDetails() (string, error) {
5974
// Extract script from JobAgentConfig
@@ -89,7 +104,7 @@ func (j *JobWithDetails) UpdateStatus(status JobStatus, message string) error {
89104
body.ExternalId = &j.ExternalID
90105
}
91106

92-
resp, err := j.Client.UpdateJobWithResponse(context.Background(), j.Id.String(), body)
107+
resp, err := j.Client.UpdateJobWithResponse(j.ctx, j.Id.String(), body)
93108
if err != nil {
94109
return fmt.Errorf("failed to update job status: %w", err)
95110
}

pkg/jobagent/runner.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ type JobAgent struct {
4747
// RunQueuedJobs retrieves and executes any queued jobs for this agent.
4848
// For each job, it starts execution using the runner's Start method, which
4949
// will update the job status when the job completes.
50-
func (a *JobAgent) RunQueuedJobs() error {
51-
jobs, err := a.client.GetNextJobsWithResponse(context.Background(), a.id)
50+
func (a *JobAgent) RunQueuedJobs(ctx context.Context) error {
51+
jobs, err := a.client.GetNextJobsWithResponse(ctx, a.id)
5252
if err != nil {
5353
return err
5454
}
@@ -65,7 +65,7 @@ func (a *JobAgent) RunQueuedJobs() error {
6565
// continue
6666
// }
6767

68-
job, err := api.NewJobWithDetails(a.client, apiJob)
68+
job, err := api.NewJobWithDetailsContext(ctx, a.client, apiJob)
6969
if err != nil {
7070
log.Error("Failed to create job with details", "error", err, "jobId", apiJob.Id.String())
7171
continue
@@ -76,7 +76,7 @@ func (a *JobAgent) RunQueuedJobs() error {
7676
defer wg.Done()
7777

7878
// Start the job - status updates happen inside Start
79-
status, err := a.runner.Start(context.Background(), job)
79+
status, err := a.runner.Start(ctx, job)
8080

8181
if err != nil {
8282
log.Error("Failed to start job", "error", err, "jobId", job.Id.String())

0 commit comments

Comments
 (0)