-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathssh.go
143 lines (125 loc) · 3.7 KB
/
ssh.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package gossh
import (
"fmt"
"github.com/dcapwell/gossh/workpool"
"log"
)
const (
MIN_POOL_SIZE = 1
MAX_POOL_SIZE = 100
)
type Options struct {
Concurrent int
User string
Identity string
Options map[string]string
}
type SshResponse struct {
Code int
Stdout string
Stderr string
}
type SshResponseContext struct {
Hostname string
Duration string
Response SshResponse
}
type SshResponses struct {
Responses chan SshResponseContext
}
type Ssh interface {
Run(hosts []string, cmd string, options Options) (SshResponses, error)
}
func NewSsh() Ssh {
pool, _ := workpool.NewWorkPoolWithMax(MAX_POOL_SIZE)
// error is only returned if max is not positive, so can ignore it
return &sshProcessImpl{pool: pool}
}
func NewSshWithMax(max int) (Ssh, error) {
if max < 1 {
return nil, fmt.Errorf("Unable to create ssh with %d max resources; max must be a positive number\n", max)
}
pool, _ := workpool.NewWorkPoolWithMax(max)
// error is only returned if max is not positive, so can ignore it
return &sshProcessImpl{pool: pool}, nil
}
type sshProcessImpl struct {
pool workpool.WorkPool
}
//TODO should this return a chan? WorkPool returns a chan, and can convert on the go. encode/json doesn't support channels, so couldn't use this in http code then.
func (s *sshProcessImpl) Run(hosts []string, cmd string, options Options) (SshResponses, error) {
// find how many hosts to run concurrently
conc := runConcurrency(options, len(hosts))
// create ssh worker per host, send to pool
tasks := createTasks(hosts, cmd, options)
chanResults, err := s.pool.Run(tasks, MIN_POOL_SIZE, conc)
if err != nil {
return SshResponses{}, err
}
rsp := waitForCompletion(chanResults, len(hosts))
// wait for completion
return rsp, nil
}
func waitForCompletion(results chan workpool.TaskResult, expectedResponses int) SshResponses {
responses := make(chan SshResponseContext, expectedResponses)
go func() {
for result := range results {
rsp, err := taskResultToContext(result)
if err != nil {
// should this be ignored?
log.Printf("[WARNING]\t%v", err)
} else {
responses <- rsp
}
}
close(responses)
}()
return SshResponses{responses}
}
func taskResultToContext(result workpool.TaskResult) (SshResponseContext, error) {
if result.Status == workpool.SUCCESS {
rs, ok := result.Result.(SshResponseContext)
if ok {
return rs, nil
}
return SshResponseContext{}, fmt.Errorf("Unable to convert result %v into SshResponseContext\n", result)
}
// ssh process task should not fail. Error shouldn't be returned since ssh context contains errors as well.
return SshResponseContext{}, fmt.Errorf("Unable to convert TaskResult %v to SshResponseContext; status is not supported %d\n", result, result.Status)
}
func createTasks(hosts []string, cmd string, options Options) chan workpool.Task {
tasks := make(chan workpool.Task, len(hosts))
go func() {
for _, host := range hosts {
task := newSshTask(host, cmd, options)
tasks <- task
}
close(tasks)
}()
return tasks
}
func newSshTask(host string, cmd string, opt Options) workpool.Task {
//TODO find better way to do this. Switching should be a user option, and default based off num hosts
// use this method to switch impls
return newSshProcessTask(host, cmd, opt)
// when num hosts is < 8~, proc seems faster, else native seems faster
// return newSshNativeTask(host, cmd, opt)
}
func runConcurrency(options Options, numHosts int) int {
conc := MAX_POOL_SIZE
if options.Concurrent > 0 {
conc = options.Concurrent
}
if numHosts < conc {
conc = numHosts
}
return conc
}
func createContext(host string) SshResponseContext {
rsp := SshResponse{}
ctx := SshResponseContext{
Hostname: host,
Response: rsp,
}
return ctx
}