Skip to content

Commit 8103228

Browse files
authored
EH: Watch qacct file; improved types (#22)
* EH: Watch qacct file; improved types * Update review.yml
1 parent 4fb3ed1 commit 8103228

File tree

14 files changed

+579
-100
lines changed

14 files changed

+579
-100
lines changed

cmd/simulator/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0
2222
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
2323
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
2424
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
25-
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
26-
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
25+
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
26+
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
2727
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
2828
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
2929
golang.org/x/tools v0.23.0 h1:SGsXPZ+2l4JsgaCKkx+FQ9YZ5XEtA1GZYuoDjenLjvg=

examples/testexample/go.mod

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,7 @@ require (
1010
google.golang.org/protobuf v1.35.1
1111
)
1212

13-
require go.uber.org/multierr v1.10.0 // indirect
13+
require (
14+
github.com/goccy/go-json v0.10.3 // indirect
15+
go.uber.org/multierr v1.10.0 // indirect
16+
)

examples/testexample/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
44
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
55
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
66
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
7+
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
8+
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
79
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
810
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
911
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg=

examples/testexample/testexample.go

Lines changed: 69 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,34 @@ import (
1717
var qacctClient qacct.QAcct
1818
var qstatClient qstat.QStat
1919

20+
var newlyFinishedJobs <-chan qacct.JobDetail
21+
2022
var log *zap.Logger
2123

2224
func init() {
2325
var err error
2426
log, _ = zap.NewProduction()
27+
28+
qstatClient, err = qstat.NewCommandLineQstat(qstat.CommandLineQStatConfig{})
29+
if err != nil {
30+
log.Fatal("Failed to initialize qstat client", zap.String("error",
31+
err.Error()))
32+
}
33+
2534
qacctClient, err = qacct.NewCommandLineQAcct(qacct.CommandLineQAcctConfig{})
2635
if err != nil {
2736
log.Fatal("Failed to initialize qacct client", zap.String("error",
2837
err.Error()))
2938
}
30-
qstatClient, err = qstat.NewCommandLineQstat(qstat.CommandLineQStatConfig{})
39+
40+
// watch for newly finished jobs
41+
newlyFinishedJobs, err = qacct.WatchFile(context.Background(),
42+
qacct.GetDefaultQacctFile(), 1024)
3143
if err != nil {
32-
log.Fatal("Failed to initialize qstat client", zap.String("error",
33-
err.Error()))
44+
log.Fatal("Failed to initialize job watcher",
45+
zap.String("error", err.Error()))
3446
}
47+
3548
}
3649

3750
func main() {
@@ -48,7 +61,7 @@ func run(ctx context.Context) {
4861
log.Info("Context cancelled, stopping ClusterScheduler")
4962
return
5063
default:
51-
finishedJobs, err := GetFinishedJobs()
64+
finishedJobs, err := GetFinishedJobsWithWatcher()
5265
if err != nil {
5366
log.Error("Error getting finished jobs", zap.String("error",
5467
err.Error()))
@@ -107,10 +120,47 @@ type SimpleJob struct {
107120
MasterNode string `json:"master_node"`
108121
}
109122

123+
func GetFinishedJobsWithWatcher() ([]*SimpleJob, error) {
124+
jobs := []*SimpleJob{}
125+
126+
for {
127+
// get next job or timeout after 0.1s of there is no new job
128+
select {
129+
case fjob := <-newlyFinishedJobs:
130+
state := fmt.Sprintf("%d", fjob.ExitStatus)
131+
if state == "0" {
132+
state = "done"
133+
} else {
134+
state = "failed"
135+
}
136+
simpleJob := SimpleJob{
137+
// ignore job arrays for now
138+
JobId: fmt.Sprintf("%d", fjob.JobNumber),
139+
Cluster: fjob.QName,
140+
JobName: fjob.JobName,
141+
Partition: fjob.GrantedPE,
142+
Account: fjob.Account,
143+
User: fjob.Owner,
144+
State: state,
145+
ExitCode: fmt.Sprintf("%d", fjob.ExitStatus),
146+
Submit: parseTimestampInt64(fjob.SubmitTime),
147+
Start: parseTimestampInt64(fjob.StartTime),
148+
End: parseTimestampInt64(fjob.EndTime),
149+
MasterNode: fjob.HostName,
150+
}
151+
jobs = append(jobs, &simpleJob)
152+
case <-time.After(100 * time.Millisecond):
153+
return jobs, nil
154+
}
155+
}
156+
return jobs, nil
157+
}
158+
110159
func GetFinishedJobs() ([]*SimpleJob, error) {
111160
// Use qacct NativeSpecification to get finished jobs
112161
qacctOutput, err := qacctClient.NativeSpecification([]string{"-j", "*"})
113162
if err != nil {
163+
// no job are command failed
114164
return nil, fmt.Errorf("error running qacct command: %v", err)
115165
}
116166

@@ -137,9 +187,9 @@ func GetFinishedJobs() ([]*SimpleJob, error) {
137187
User: job.Owner,
138188
State: state,
139189
ExitCode: fmt.Sprintf("%d", job.ExitStatus),
140-
Submit: parseTimestamp(job.QSubTime),
141-
Start: parseTimestamp(job.StartTime),
142-
End: parseTimestamp(job.EndTime),
190+
Submit: parseTimestampInt64(job.SubmitTime),
191+
Start: parseTimestampInt64(job.StartTime),
192+
End: parseTimestampInt64(job.EndTime),
143193
MasterNode: job.HostName,
144194
}
145195
}
@@ -150,7 +200,8 @@ func GetRunningJobs() ([]*SimpleJob, error) {
150200

151201
qstatOverview, err := qstatClient.NativeSpecification([]string{"-g", "t"})
152202
if err != nil {
153-
return nil, fmt.Errorf("error running qstat command: %v", err)
203+
// no jobs running
204+
return nil, nil
154205
}
155206
jobsByTask, err := qstat.ParseGroupByTask(qstatOverview)
156207
if err != nil {
@@ -193,7 +244,8 @@ func GetRunningJobs() ([]*SimpleJob, error) {
193244
// get running jobs
194245
qstatOutput, err := qstatClient.NativeSpecification([]string{"-j", "*"})
195246
if err != nil {
196-
return nil, fmt.Errorf("error running qstat command: %v", err)
247+
// no jobs running; qstat -j * found 0 jobs (TODO)
248+
return nil, nil
197249
}
198250

199251
jobs, err := qstat.ParseSchedulerJobInfo(qstatOutput)
@@ -242,6 +294,14 @@ func SendJobs(ctx context.Context, jobs []*SimpleJob) (int, error) {
242294
return len(jobs), nil
243295
}
244296

297+
func parseTimestampInt64(ts int64) *timestamppb.Timestamp {
298+
// ts is 6 digits behind the second (microseconds)
299+
sec := ts / 1e6
300+
nsec := (ts - sec*1e6) * 1e3
301+
t := time.Unix(sec, nsec)
302+
return timestamppb.New(t)
303+
}
304+
245305
// 2024-10-24 09:49:59.911136
246306
func parseTimestamp(s string) *timestamppb.Timestamp {
247307
loc, err := time.LoadLocation("Local")

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/hpc-gridware/go-clusterscheduler
33
go 1.22.4
44

55
require (
6+
github.com/goccy/go-json v0.10.3
67
github.com/onsi/ginkgo/v2 v2.19.1
78
github.com/onsi/gomega v1.34.1
89
go.opentelemetry.io/contrib/bridges/otelslog v0.5.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
77
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
88
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
99
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
10+
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
11+
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
1012
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
1113
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
1214
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg=

pkg/qacct/v9.0/file.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package qacct
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"fmt"
7+
"io"
8+
"log"
9+
"os"
10+
"path/filepath"
11+
"time"
12+
13+
"github.com/goccy/go-json"
14+
)
15+
16+
// DefaultQacctFile returns the path to the default accounting file based
17+
// on the SGE_ROOT and SGE_CELL environment variables.
18+
func GetDefaultQacctFile() string {
19+
sgeRoot := os.Getenv("SGE_ROOT")
20+
sgeCell := os.Getenv("SGE_CELL")
21+
return filepath.Join(sgeRoot, sgeCell, "common", "accounting.jsonl")
22+
}
23+
24+
// WatchFile returns a channel that emits all JobDetail objects from the accounting
25+
// file. It continues to emit JobDetail objects as new lines are added to the file.
26+
// The channel is buffered with the given buffer size.
27+
func WatchFile(ctx context.Context, path string, bufferSize int) (<-chan JobDetail, error) {
28+
if path == "" {
29+
path = GetDefaultQacctFile()
30+
}
31+
32+
file, err := os.OpenFile(path, os.O_RDONLY, 0)
33+
if err != nil {
34+
return nil, fmt.Errorf("failed to open file: %v", err)
35+
}
36+
37+
jobDetailsChan := make(chan JobDetail, bufferSize)
38+
39+
// offset points to the last processed line
40+
var offset int64 = 0
41+
42+
go func() {
43+
defer file.Close()
44+
defer close(jobDetailsChan)
45+
46+
scanner := bufio.NewScanner(file)
47+
48+
for {
49+
if _, err := file.Seek(offset, io.SeekStart); err != nil {
50+
log.Printf("failed to seek to file end: %v", err)
51+
return
52+
}
53+
54+
for scanner.Scan() {
55+
var job JobDetail
56+
line := scanner.Text()
57+
// TODO parsing can be done in parallel
58+
err := json.Unmarshal([]byte(line), &job)
59+
if err != nil {
60+
log.Printf("failed to unmarshal line: %v", err)
61+
continue
62+
}
63+
jobDetailsChan <- job
64+
}
65+
66+
if err := scanner.Err(); err != nil {
67+
log.Printf("JSONL parsing error: %v", err)
68+
return
69+
}
70+
71+
// store processed offset
72+
offset, err = file.Seek(0, io.SeekCurrent)
73+
if err != nil {
74+
log.Printf("failed to get current offset: %v", err)
75+
return
76+
}
77+
78+
// wait a little before re-scanning for new data and reset scanner
79+
select {
80+
case <-ctx.Done():
81+
return
82+
default:
83+
<-time.After(1 * time.Second)
84+
scanner = bufio.NewScanner(file)
85+
}
86+
}
87+
}()
88+
89+
return jobDetailsChan, nil
90+
}

pkg/qacct/v9.0/file_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package qacct_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"slices"
8+
9+
. "github.com/onsi/ginkgo/v2"
10+
. "github.com/onsi/gomega"
11+
12+
qacct "github.com/hpc-gridware/go-clusterscheduler/pkg/qacct/v9.0"
13+
qsub "github.com/hpc-gridware/go-clusterscheduler/pkg/qsub/v9.0"
14+
)
15+
16+
var _ = Describe("File", func() {
17+
18+
Context("WatchFile", func() {
19+
20+
It("returns an error when the file does not exist", func() {
21+
_, err := qacct.WatchFile(context.Background(),
22+
"nonexistentfile.txt", 10)
23+
Expect(err).To(HaveOccurred())
24+
})
25+
26+
It("returns a channel that emits JobDetail objects for 10 jobs", func() {
27+
28+
qs, err := qsub.NewCommandLineQSub(qsub.CommandLineQSubConfig{})
29+
Expect(err).NotTo(HaveOccurred())
30+
31+
jobIDs := make([]int, 10)
32+
for i := 0; i < 10; i++ {
33+
jobID, _, err := qs.Submit(context.Background(), qsub.JobOptions{
34+
Command: "echo",
35+
CommandArgs: []string{fmt.Sprintf("job %d", i+1)},
36+
Binary: qsub.ToPtr(true),
37+
})
38+
Expect(err).NotTo(HaveOccurred())
39+
log.Printf("jobID: %d", jobID)
40+
jobIDs[i] = int(jobID)
41+
}
42+
43+
jobDetailsChan, err := qacct.WatchFile(context.Background(),
44+
qacct.GetDefaultQacctFile(), 0)
45+
Expect(err).NotTo(HaveOccurred())
46+
Expect(jobDetailsChan).NotTo(BeNil())
47+
48+
receivedJobs := make(map[int]bool)
49+
Eventually(func() bool {
50+
select {
51+
case jd := <-jobDetailsChan:
52+
log.Printf("job: %+v", jd.JobNumber)
53+
// check if jobID is in the jobIDs list
54+
if slices.Contains(jobIDs, int(jd.JobNumber)) {
55+
Expect(jd.SubmitCommandLine).To(ContainSubstring("echo 'job"))
56+
Expect(jd.JobUsage.Usage.Memory).To(BeNumerically(">=", 0))
57+
receivedJobs[int(jd.JobNumber)] = true
58+
}
59+
default:
60+
return len(receivedJobs) == 10
61+
}
62+
return false
63+
}, "10s").Should(BeTrue())
64+
})
65+
})
66+
})

0 commit comments

Comments
 (0)