This repository was archived by the owner on Mar 5, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtask.go
More file actions
149 lines (118 loc) · 2.64 KB
/
task.go
File metadata and controls
149 lines (118 loc) · 2.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package easyworker
import (
"errors"
"log"
)
/*
EasyTask stores the options and runtime data for one batch of task processing.
It also provides the methods used to queue task inputs and to run them on workers.
*/
type EasyTask struct {
// id is the task number; NewTask assigns it from the package-level counter.
id int
// config input by user.
config Config
// task for worker. It's a slice of slices of params:
// each element is the parameter list of one task.
inputs [][]any
// store runtime workers, keyed by the worker's index.
workerList map[int]*worker
}
/*
NewTask builds an EasyTask from a Config that was prepared beforehand.

Every call increments the package-level task counter and uses the new value
as the id of the returned task. The task starts with an empty input list and
an empty worker map sized for config.worker entries.
The returned error is always nil in the current implementation.

Example:

	task, _ := NewTask(config)
*/
func NewTask(config Config) (ret EasyTask, err error) {
	// ids are handed out sequentially.
	taskLastId++

	ret.id = taskLastId
	ret.config = config
	ret.inputs = make([][]any, 0)
	ret.workerList = make(map[int]*worker, config.worker)

	return ret, err
}
/*
AddTask queues one task on the EasyTask.
All arguments of a single call together form the parameter list of that task.
The arguments are copied into a new slice before being stored.

Example:

	easyTask.AddTask(1, "user")
	easyTask.AddTask(2, "user")
	easyTask.AddTask(1000, "admin")
*/
func (p *EasyTask) AddTask(i ...any) {
	// copy so the stored parameter list does not alias the caller's variadic slice.
	params := append(make([]any, 0, len(i)), i...)
	p.inputs = append(p.inputs, params)
}
/*
Run processes every queued task on a pool of workers and blocks until each
task has produced a result.

It starts config.worker workers, feeds each queued parameter list to them
(tagged with its index in the queue) and collects the messages they send back.
Both successful results and task errors are stored under the id carried by the
message, so the returned slice is indexed by that id; this assumes workers
echo the task index they were given (TODO confirm against worker.run).
Once every task has a result, the workers started by this call are asked to
quit in the background; Run does not wait for them to exit.

Returns an empty slice and an error when no task was added or when the
configured number of workers is less than one.

Example:

	results, err := easyTask.Run()
*/
func (p *EasyTask) Run() (ret []any, retErr error) {
	// callers get an empty, non-nil slice on error.
	ret = make([]any, 0)

	// Snapshot the queue once so the feeder, the completion check and the
	// result slice all agree on the same set of tasks.
	inputs := p.inputs
	if len(inputs) < 1 {
		return ret, errors.New("need params to run")
	}

	workerNum := p.config.worker
	if workerNum < 1 {
		// Without a worker nothing would ever answer on resultCh and the
		// receive loop below would block forever.
		return ret, errors.New("need at least one worker to run")
	}

	// use for send function's params to worker.
	inputCh := make(chan msg, workerNum)

	// use for get result from worker.
	resultCh := make(chan msg, workerNum)

	// Start workers. They are also kept in a slice private to this call so
	// the stop goroutine below never touches p.workerList after Run returned
	// (a later Run on the same task rewrites that map).
	workers := make([]*worker, 0, workerNum)
	for i := 0; i < workerNum; i++ {
		w := &worker{
			id:         int64(i),
			fun:        p.config.fun,
			cmd:        make(chan msg),
			resultCh:   resultCh,
			inputCh:    inputCh,
			retryTimes: p.config.retry,
		}
		p.workerList[i] = w
		workers = append(workers, w)

		go w.run()
	}

	// Send data to worker. This goroutine ends once every input was handed over.
	go func() {
		for index, params := range inputs {
			inputCh <- msg{id: index, msgType: iTASK, data: params}
		}
	}()

	resultMap := make(map[int]any, len(inputs))

	// receive result from worker until every task has one.
	for len(resultMap) < len(inputs) {
		result := <-resultCh

		switch result.msgType {
		case iSUCCESS: // task done
			resultMap[result.id] = result.data
		case iERROR: // task failed, the error value is stored as the task's result
			if printLog {
				log.Println("task", result.id, " is failed, error:", result.data)
			}
			resultMap[result.id] = result.data
		case iFATAL_ERROR: // worker panic
			// NOTE(review): no result is recorded here; if the worker does not
			// report the task some other way, Run keeps waiting — confirm in worker.run.
			if printLog {
				log.Println(result.id, "worker is fatal error")
			}
		case iQUIT: // worker quited
			if printLog {
				log.Println(result.id, " exited")
			}
		}
	}

	// send signal to worker to stop. Only the workers started by this call are signalled.
	go func() {
		for _, w := range workers {
			w.cmd <- msg{msgType: iQUIT}
		}
	}()

	ret = make([]any, len(resultMap))
	for k, v := range resultMap {
		ret[k] = v
	}

	return ret, nil
}