Skip to content

Commit f04d335

Browse files
committed
get rid os pipe in favor of sockets
1 parent 53d5b65 commit f04d335

File tree

9 files changed

+249
-368
lines changed

9 files changed

+249
-368
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
*.dll
44
*.so
55
*.dylib
6+
debug.test
67

78
# Test binary, build with `go test -c`
89
*.test

app.go

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,18 @@ CatchSignals:
8989
a.shutdown()
9090
// Fork/Exec a child and shutdown.
9191
case syscall.SIGUSR2:
92-
_, cp, err := forkExec(e.activeFiles())
92+
_, f, err := forkExec(e.activeFiles())
9393
if err != nil {
9494
logger.Printf("ZeroDT: failed to forkExec: '%v'", err)
9595
continue CatchSignals
9696
}
97+
m, err := ListenSocket(f)
98+
if err != nil {
99+
logger.Printf("ZeroDT: failed to listen communication socket: '%v'", err)
100+
continue CatchSignals
101+
}
97102
// Nothing to do with errors.
98-
protocolActAsParent(cp, a.WaitForChildTimeout, func() {
103+
protocolActAsParent(m, a.WaitForChildTimeout, func() {
99104
a.shutdown()
100105
})
101106
}
@@ -108,7 +113,7 @@ CatchSignals:
108113

109114
// Serve TODO
110115
func (a *App) Serve() error {
111-
inherited, cp, err := inherit()
116+
inherited, m, err := inherit()
112117
if err != nil {
113118
logger.Printf("ZeroDT: failed to inherit listeners with: '%v'", err)
114119
return err
@@ -152,8 +157,8 @@ func (a *App) Serve() error {
152157
// Wait for all listeners to start listening.
153158
startWG.Wait()
154159

155-
if cp != nil {
156-
protocolActAsChild(cp, a.WaitForParentTimeout)
160+
if m != nil {
161+
protocolActAsChild(m, a.WaitForParentTimeout)
157162
}
158163

159164
// Allow serverse's goroutines to start serving
@@ -172,7 +177,7 @@ func (a *App) Serve() error {
172177

173178
// forkExec starts another process of youself and passes active
174179
// listeners to a child to perform socket activation.
175-
func forkExec(files []*os.File) (int, *PipeJSONMessenger, error) {
180+
func forkExec(files []*os.File) (int, *os.File, error) {
176181
// Get the path name for the executable that started the current process.
177182
path, err := os.Executable()
178183
if err != nil {
@@ -185,15 +190,14 @@ func forkExec(files []*os.File) (int, *PipeJSONMessenger, error) {
185190
return -1, nil, err
186191
}
187192

188-
cr, cw, err := os.Pipe()
193+
fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
189194
if err != nil {
190195
return -1, nil, err
191196
}
192-
pr, pw, err := os.Pipe()
193-
if err != nil {
194-
return -1, nil, err
195-
}
196-
files = append(files, cr, pw)
197+
f0 := os.NewFile(uintptr(fds[0]), "s|0")
198+
f1 := os.NewFile(uintptr(fds[1]), "s|0")
199+
200+
files = append(files, f1)
197201

198202
// Start the original executable with the original working directory.
199203
process, err := os.StartProcess(path, os.Args, &os.ProcAttr{
@@ -205,7 +209,7 @@ func forkExec(files []*os.File) (int, *PipeJSONMessenger, error) {
205209
return -1, nil, err
206210
}
207211

208-
return process.Pid, ListenPipe(pr, cw), nil
212+
return process.Pid, f0, nil
209213
}
210214

211215
// formatInherited prints info about inherited listeners to a string.
@@ -221,7 +225,7 @@ func formatInherited(e *exchange) string {
221225
return result
222226
}
223227

224-
func protocolActAsParent(m *PipeJSONMessenger, timeout time.Duration, shutdownFn func()) error {
228+
func protocolActAsParent(m *StreamMessenger, timeout time.Duration, shutdownFn func()) error {
225229
defer m.Close()
226230
// Set a timeout for the whole dialog.
227231
m.SetDeadline(time.Now().Add(timeout))
@@ -248,7 +252,7 @@ func protocolActAsParent(m *PipeJSONMessenger, timeout time.Duration, shutdownFn
248252
return nil
249253
}
250254

251-
func protocolActAsChild(m *PipeJSONMessenger, timeout time.Duration) {
255+
func protocolActAsChild(m *StreamMessenger, timeout time.Duration) {
252256
defer m.Close()
253257
// Child->Parent, ready message
254258
logger.Printf("ZeroDT: sending ready to a parent...")

examples/test/main.go

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,12 @@ package main
22

33
import (
44
"fmt"
5-
"net"
65
"net/http"
76
"os"
87
"ssgreg/zerodt"
98
"time"
109

1110
"github.com/Sirupsen/logrus"
12-
"github.com/gorilla/mux"
1311
)
1412

1513
func sleep(w http.ResponseWriter, r *http.Request) {
@@ -24,30 +22,23 @@ func sleep(w http.ResponseWriter, r *http.Request) {
2422
logrus.Printf("Handled message %d!", os.Getpid())
2523
}
2624

27-
type childReadyMsg struct {
28-
Pid int `json:"pid"`
29-
Data [65520]byte
30-
Str int
31-
Str1 string
32-
}
33-
3425
func main() {
3526
zerodt.SetLogger(logrus.StandardLogger())
3627

37-
r := mux.NewRouter()
38-
r.Path("/sleep").Methods("GET").HandlerFunc(sleep)
28+
// r := mux.NewRouter()
29+
// r.Path("/sleep").Methods("GET").HandlerFunc(sleep)
3930

4031
// a := zerodt.NewApp(&http.Server{Addr: "127.0.0.1:8081", Handler: r}, &http.Server{Addr: "127.0.0.1:8082", Handler: r})
4132
// a.Serve()
4233

43-
srv := http.Server{Addr: "127.0.0.1:8081", Handler: r}
34+
// srv := http.Server{Addr: "127.0.0.1:8081", Handler: r}
4435

45-
addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:8081")
46-
l, _ := net.ListenTCP("tcp", addr)
36+
// addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:8081")
37+
// l, _ := net.ListenTCP("tcp", addr)
4738

4839
// err := srv.Shutdown(context.Background())
4940
// fmt.Println(err)
50-
srv.Serve(l)
41+
// srv.Serve(l)
5142

5243
// var wg sync.WaitGroup
5344
// wg.Add(2)

file_listener.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ type fileListenerPair struct {
4747
// inherit returns all inherited listeners with
4848
// duplicated file descriptors wrapped in os.File.
4949
// Can be called only once.
50-
func inherit() ([]*fileListenerPair, *PipeJSONMessenger, error) {
50+
func inherit() ([]*fileListenerPair, *StreamMessenger, error) {
5151
// Are there some listeners to inherit?
5252
fds, err := listenFds()
5353
if err != nil {
@@ -61,13 +61,13 @@ func inherit() ([]*fileListenerPair, *PipeJSONMessenger, error) {
6161
return pairs, cp, nil
6262
}
6363

64-
func inheritWithFDS(fds []int) ([]*fileListenerPair, *PipeJSONMessenger, error) {
64+
func inheritWithFDS(fds []int) ([]*fileListenerPair, *StreamMessenger, error) {
6565
m, err := listenPipeWithFDS(fds)
6666
if err != nil {
6767
return nil, nil, err
6868
}
6969
if m != nil {
70-
fds = fds[0 : len(fds)-2]
70+
fds = fds[0 : len(fds)-1]
7171
}
7272
// Start to listen them.
7373
pairs := make([]*fileListenerPair, len(fds))
@@ -85,23 +85,20 @@ func inheritWithFDS(fds []int) ([]*fileListenerPair, *PipeJSONMessenger, error)
8585
return pairs, m, nil
8686
}
8787

88-
func listenPipeWithFDS(fds []int) (*PipeJSONMessenger, error) {
88+
func listenPipeWithFDS(fds []int) (*StreamMessenger, error) {
8989
count := len(fds)
90-
if count > 1 {
90+
if count > 0 {
9191
ok, err := isSocketTCP(fds[count-1])
9292
if err != nil {
9393
return nil, err
9494
}
9595
if !ok {
96-
ok, err := isSocketTCP(fds[count-2])
96+
s := os.NewFile(uintptr(fds[count-1]), "s|0")
97+
m, err := ListenSocket(s)
9798
if err != nil {
9899
return nil, err
99100
}
100-
if !ok {
101-
r := os.NewFile(uintptr(fds[count-2]), "|0")
102-
w := os.NewFile(uintptr(fds[count-1]), "|1")
103-
return ListenPipe(r, w), nil
104-
}
101+
return m, nil
105102
}
106103
}
107104
return nil, nil
@@ -152,6 +149,15 @@ func isSocketTCP(fd int) (bool, error) {
152149
if socketType != syscall.SOCK_STREAM {
153150
return false, nil
154151
}
152+
lsa, err := syscall.Getsockname(fd)
153+
if err != nil {
154+
return false, err
155+
}
156+
157+
switch lsa.(type) {
158+
case *syscall.SockaddrUnix:
159+
return false, nil
160+
}
155161
return true, nil
156162
}
157163

pipe_json_messenger.go

Lines changed: 0 additions & 109 deletions
This file was deleted.

0 commit comments

Comments
 (0)