Skip to content

Commit 11a3627

Browse files
authored
feat: output refresh and logs for job module (#78)
* feat: output refresh for job module * feat: add logging * fix: empty output without crashing * chore: fix comment * refacror: moved log function to module
1 parent 0afa18d commit 11a3627

File tree

9 files changed

+125
-15
lines changed

9 files changed

+125
-15
lines changed

modules/firehose/driver.go

+3
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,9 @@ func mergeChartValues(cur, newVal *ChartValues) (*ChartValues, error) {
382382

383383
func readOutputData(exr module.ExpandedResource) (*Output, error) {
384384
var curOut Output
385+
if len(exr.Resource.State.Output) == 0 {
386+
return &curOut, nil
387+
}
385388
if err := json.Unmarshal(exr.Resource.State.Output, &curOut); err != nil {
386389
return nil, errors.ErrInternal.WithMsgf("corrupted output").WithCausef(err.Error())
387390
}

modules/firehose/module.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"time"
88

99
"helm.sh/helm/v3/pkg/release"
10+
v1 "k8s.io/api/core/v1"
1011

1112
"github.com/goto/entropy/core/module"
1213
"github.com/goto/entropy/modules/kubernetes"
@@ -101,7 +102,10 @@ var Module = module.Descriptor{
101102
if err != nil {
102103
return nil, errors.ErrInternal.WithMsgf("failed to create new kube client on firehose driver kube get pod").WithCausef(err.Error())
103104
}
104-
return kubeCl.GetPodDetails(ctx, ns, labels)
105+
return kubeCl.GetPodDetails(ctx, ns, labels, func(pod v1.Pod) bool {
106+
// allow pods that are in running state and are not marked for deletion
107+
return pod.Status.Phase == v1.PodRunning && pod.DeletionTimestamp == nil
108+
})
105109
},
106110
consumerReset: consumerReset,
107111
}, nil

modules/job/driver/driver.go

+20-3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ type Driver struct {
2121
SuspendJob func(ctx context.Context, conf kube.Config, j *job.Job) error
2222
DeleteJob func(ctx context.Context, conf kube.Config, j *job.Job) error
2323
StartJob func(ctx context.Context, conf kube.Config, j *job.Job) error
24+
GetJobPods func(ctx context.Context, conf kube.Config, labels map[string]string) ([]kube.Pod, error)
25+
StreamLogs func(ctx context.Context, kubeConf kube.Config, filter map[string]string) (<-chan module.LogChunk, error)
2426
}
2527

2628
func (driver *Driver) Plan(_ context.Context, res module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) {
@@ -95,7 +97,7 @@ func (driver *Driver) Sync(ctx context.Context, exr module.ExpandedResource) (*r
9597
return &finalState, nil
9698
}
9799

98-
finalOut, err := driver.refreshOutput(ctx, exr.Resource, *conf, *out, kubeOut)
100+
finalOut, err := driver.refreshOutput(ctx, *conf, *out, kubeOut)
99101
if err != nil {
100102
return nil, err
101103
}
@@ -106,6 +108,21 @@ func (driver *Driver) Sync(ctx context.Context, exr module.ExpandedResource) (*r
106108
return &finalState, nil
107109
}
108110

109-
func (*Driver) Output(context.Context, module.ExpandedResource) (json.RawMessage, error) {
110-
return nil, nil
111+
func (driver *Driver) Output(ctx context.Context, exr module.ExpandedResource) (json.RawMessage, error) {
112+
output, err := ReadOutputData(exr)
113+
if err != nil {
114+
return nil, err
115+
}
116+
117+
conf, err := config.ReadConfig(exr.Resource, exr.Spec.Configs, driver.Conf)
118+
if err != nil {
119+
return nil, errors.ErrInternal.WithCausef(err.Error())
120+
}
121+
122+
var kubeOut kubernetes.Output
123+
if err := json.Unmarshal(exr.Dependencies[KeyKubeDependency].Output, &kubeOut); err != nil {
124+
return nil, errors.ErrInternal.WithMsgf("invalid kube state").WithCausef(err.Error())
125+
}
126+
127+
return driver.refreshOutput(ctx, *conf, *output, kubeOut)
111128
}

modules/job/driver/log.go

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package driver
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
7+
"github.com/goto/entropy/core/module"
8+
"github.com/goto/entropy/modules/job/config"
9+
"github.com/goto/entropy/modules/kubernetes"
10+
"github.com/goto/entropy/pkg/errors"
11+
)
12+
13+
func (driver *Driver) Log(ctx context.Context, res module.ExpandedResource, filter map[string]string) (<-chan module.LogChunk, error) {
14+
conf, err := config.ReadConfig(res.Resource, res.Spec.Configs, driver.Conf)
15+
if err != nil {
16+
return nil, errors.ErrInternal.WithCausef(err.Error())
17+
}
18+
19+
if filter == nil {
20+
filter = map[string]string{}
21+
}
22+
filter["app"] = conf.Name
23+
24+
var kubeOut kubernetes.Output
25+
if err := json.Unmarshal(res.Dependencies[KeyKubeDependency].Output, &kubeOut); err != nil {
26+
return nil, errors.ErrInternal.WithCausef(err.Error())
27+
}
28+
return driver.StreamLogs(ctx, kubeOut.Configs, filter)
29+
}

modules/job/driver/output.go

+16-5
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,34 @@ import (
55
"encoding/json"
66

77
"github.com/goto/entropy/core/module"
8-
"github.com/goto/entropy/core/resource"
8+
"github.com/goto/entropy/modules"
99
"github.com/goto/entropy/modules/job/config"
1010
"github.com/goto/entropy/modules/kubernetes"
1111
"github.com/goto/entropy/pkg/errors"
12+
"github.com/goto/entropy/pkg/kube"
1213
)
1314

1415
type Output struct {
15-
Namespace string `json:"namespace"`
16-
JobName string `json:"jobName"`
16+
Namespace string `json:"namespace"`
17+
JobName string `json:"jobName"`
18+
Pods []kube.Pod `json:"pods"`
1719
}
1820

19-
func (*Driver) refreshOutput(context.Context, resource.Resource, config.Config, Output, kubernetes.Output) (json.RawMessage, error) {
20-
return json.RawMessage{}, nil
21+
func (driver *Driver) refreshOutput(ctx context.Context, conf config.Config, output Output, kubeOut kubernetes.Output) (json.RawMessage, error) {
22+
pods, err := driver.GetJobPods(ctx, kubeOut.Configs, map[string]string{"job-name": conf.Name})
23+
if err != nil {
24+
return nil, errors.ErrInternal.WithCausef(err.Error())
25+
}
26+
output.Pods = pods
27+
28+
return modules.MustJSON(output), nil
2129
}
2230

2331
func ReadOutputData(exr module.ExpandedResource) (*Output, error) {
2432
var curOut Output
33+
if len(exr.Resource.State.Output) == 0 {
34+
return &curOut, nil
35+
}
2536
if err := json.Unmarshal(exr.Resource.State.Output, &curOut); err != nil {
2637
return nil, errors.ErrInternal.WithMsgf("corrupted output").WithCausef(err.Error())
2738
}

modules/job/driver/sync.go

+2
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ func getJob(res resource.Resource, conf *config.Config) *job.Job {
108108
Name: conf.Name,
109109
Containers: containers,
110110
Volumes: volumes,
111+
// This label is to support `app` filter on pod for getting the logs until we find better solution
112+
Labels: map[string]string{"app": conf.Name},
111113
}
112114
limit := backoffLimit
113115
j := &job.Job{

modules/job/module.go

+40
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"encoding/json"
66

7+
v1 "k8s.io/api/core/v1"
8+
79
"github.com/goto/entropy/core/module"
810
"github.com/goto/entropy/modules/job/config"
911
"github.com/goto/entropy/modules/job/driver"
@@ -106,6 +108,44 @@ var Module = module.Descriptor{
106108
}
107109
return processor.UpdateJob(false)
108110
},
111+
GetJobPods: func(ctx context.Context, kubeConf kube.Config, labels map[string]string) ([]kube.Pod, error) {
112+
kubeCl, err := kube.NewClient(ctx, kubeConf)
113+
if err != nil {
114+
return nil, errors.ErrInternal.WithMsgf("failed to create new kube client on driver").WithCausef(err.Error())
115+
}
116+
return kubeCl.GetPodDetails(ctx, conf.Namespace, labels, func(pod v1.Pod) bool {
117+
// allow all pods
118+
return true
119+
})
120+
},
121+
StreamLogs: func(ctx context.Context, kubeConf kube.Config, filter map[string]string) (<-chan module.LogChunk, error) {
122+
kubeCl, err := kube.NewClient(ctx, kubeConf)
123+
if err != nil {
124+
return nil, errors.ErrInternal.WithMsgf("failed to create new kube client on firehose driver Log").WithCausef(err.Error())
125+
}
126+
127+
logs, err := kubeCl.StreamLogs(ctx, conf.Namespace, filter)
128+
if err != nil {
129+
return nil, err
130+
}
131+
132+
mappedLogs := make(chan module.LogChunk)
133+
go func() {
134+
defer close(mappedLogs)
135+
for {
136+
select {
137+
case log, ok := <-logs:
138+
if !ok {
139+
return
140+
}
141+
mappedLogs <- module.LogChunk{Data: log.Data, Labels: log.Labels}
142+
case <-ctx.Done():
143+
return
144+
}
145+
}
146+
}()
147+
return mappedLogs, err
148+
},
109149
}, nil
110150
},
111151
}

pkg/kube/client.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type LogChunk struct {
5252
type Pod struct {
5353
Name string `json:"name"`
5454
Containers []string `json:"containers"`
55+
Status string `json:"status"`
5556
}
5657

5758
type LogOptions struct {
@@ -281,7 +282,7 @@ func (c Client) GetJobProcessor(j *job.Job) (*job.Processor, error) {
281282
return job.NewProcessor(j, clientSet.BatchV1().Jobs(j.Namespace)), nil
282283
}
283284

284-
func (c Client) GetPodDetails(ctx context.Context, namespace string, labelSelectors map[string]string) ([]Pod, error) {
285+
func (c Client) GetPodDetails(ctx context.Context, namespace string, labelSelectors map[string]string, allow func(pod corev1.Pod) bool) ([]Pod, error) {
285286
var podDetails []Pod
286287
var selectors []string
287288
var labelSelector string
@@ -305,13 +306,12 @@ func (c Client) GetPodDetails(ctx context.Context, namespace string, labelSelect
305306
}
306307

307308
for _, pod := range pods.Items {
308-
// not listing pods that are not in running state or are about to terminate
309-
if pod.Status.Phase != corev1.PodRunning || pod.DeletionTimestamp != nil {
309+
if !allow(pod) {
310310
continue
311311
}
312-
313312
podDetail := Pod{
314-
Name: pod.Name,
313+
Name: pod.Name,
314+
Status: string(pod.Status.Phase),
315315
}
316316

317317
for _, container := range pod.Spec.Containers {

pkg/kube/pod/pod.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ type Pod struct {
1212
Name string
1313
Containers []container.Container
1414
Volumes []volume.Volume
15+
Labels map[string]string
1516
}
1617

1718
func (p Pod) Template() corev1.PodTemplateSpec {
@@ -28,7 +29,10 @@ func (p Pod) Template() corev1.PodTemplateSpec {
2829
VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}},
2930
})
3031
return corev1.PodTemplateSpec{
31-
ObjectMeta: metav1.ObjectMeta{Name: p.Name},
32+
ObjectMeta: metav1.ObjectMeta{
33+
Name: p.Name,
34+
Labels: p.Labels,
35+
},
3236
Spec: corev1.PodSpec{
3337
Containers: containers,
3438
Volumes: volumes,

0 commit comments

Comments
 (0)