Skip to content

Commit 7f32413

Browse files
authored
Merge pull request #1 from bachue/fix/data-corruption
Fix possible data corruption
2 parents 8abea07 + 6f5ccdf commit 7f32413

File tree

9 files changed

+152
-33
lines changed

9 files changed

+152
-33
lines changed

connector/main.go

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -80,23 +80,32 @@ func main() {
8080
os.Exit(1)
8181
}
8282

83-
homeDir, err := os.UserHomeDir()
84-
if err != nil {
85-
homeDir = os.TempDir()
83+
if userConfigDir, err := os.UserConfigDir(); err != nil {
84+
rcloneConfigDir = filepath.Join(os.TempDir(), ".rclone", "config")
85+
} else {
86+
rcloneConfigDir = filepath.Join(userConfigDir, "rclone")
87+
}
88+
89+
if userCacheDir, err := os.UserCacheDir(); err != nil {
90+
rcloneCacheDir = filepath.Join(os.TempDir(), ".rclone", "cache")
91+
} else {
92+
rcloneCacheDir = filepath.Join(userCacheDir, "rclone")
93+
}
94+
95+
if userLogDir, err := userLogDir(); err != nil {
96+
rcloneLogDir = filepath.Join(os.TempDir(), ".rclone", "log")
97+
} else {
98+
rcloneLogDir = filepath.Join(userLogDir, "rclone")
8699
}
87-
rcloneConfigDir = filepath.Join(homeDir, ".rclone", "config")
100+
88101
if err = ensureDirectoryExists(rcloneConfigDir); err != nil {
89102
fmt.Fprintf(os.Stderr, "Failed to ensure directory %s exists: %s", rcloneConfigDir, err)
90103
os.Exit(1)
91104
}
92-
93-
rcloneCacheDir = filepath.Join(homeDir, ".rclone", "cache")
94105
if err = ensureDirectoryExists(rcloneCacheDir); err != nil {
95106
fmt.Fprintf(os.Stderr, "Failed to ensure directory %s exists: %s", rcloneCacheDir, err)
96107
os.Exit(1)
97108
}
98-
99-
rcloneLogDir = filepath.Join(homeDir, ".rclone", "log")
100109
if err = ensureDirectoryExists(rcloneLogDir); err != nil {
101110
fmt.Fprintf(os.Stderr, "Failed to ensure directory %s exists: %s", rcloneLogDir, err)
102111
os.Exit(1)
@@ -270,6 +279,15 @@ func handleConn(conn net.Conn, cmdIn <-chan protocol.Cmd, cmdOut chan<- protocol
270279
log.Infof("Received requestDataCmd: %#v", payload)
271280
cmdOut <- payload
272281
}
282+
case protocol.KodoUmountCmdName:
283+
payload := new(protocol.KodoUmountCmd)
284+
if err := json.Unmarshal([]byte(request.Payload), payload); err != nil {
285+
log.Warnf("Protocol %s payload parse error: %s", request.Cmd, err)
286+
return
287+
} else {
288+
log.Infof("Received kodoUmountCmd: %#v", payload)
289+
cmdOut <- payload
290+
}
273291
default:
274292
log.Warnf("Unrecognized request cmd: %s", request.Cmd)
275293
return
@@ -351,7 +369,7 @@ func handleCmd(cmdOut chan<- protocol.Cmd, cmdIn <-chan protocol.Cmd) {
351369
log.Errorf("Failed to create stderr pipe: %s", err)
352370
return false
353371
}
354-
go outputReader("stderr", stderr, false)
372+
go outputReader("stderr", stderr, true)
355373
go func() {
356374
defer cancel()
357375
err := execCmd.Run()
@@ -389,13 +407,32 @@ func handleCmd(cmdOut chan<- protocol.Cmd, cmdIn <-chan protocol.Cmd) {
389407
log.Warnf("Failed to write rclone config: %s", err)
390408
return
391409
}
410+
uuid := rcloneCacheId(c.MountPath)
411+
volumeCacheDir := filepath.Join(rcloneCacheDir, c.VolumeId, uuid)
412+
if err = ensureDirectoryExists(volumeCacheDir); err != nil {
413+
log.Errorf("Failed to ensure directory %s exists: %s", volumeCacheDir, err)
414+
return
415+
}
416+
rcloneLogFile := filepath.Join(rcloneLogDir, c.VolumeId, uuid+".log")
417+
if err = ensureDirectoryExists(filepath.Dir(rcloneLogFile)); err != nil {
418+
log.Errorf("Failed to ensure directory %s exists: %s", filepath.Dir(rcloneLogFile), err)
419+
return
420+
}
392421
ctx = context.WithValue(ctx, protocol.ContextKeyConfigFilePath, rcloneConfigPath)
393422
ctx = context.WithValue(ctx, protocol.ContextKeyUserAgent, userAgent)
394-
ctx = context.WithValue(ctx, protocol.ContextKeyLogDirPath, rcloneLogDir)
395-
ctx = context.WithValue(ctx, protocol.ContextKeyCacheDirPath, rcloneCacheDir)
423+
ctx = context.WithValue(ctx, protocol.ContextKeyLogFilePath, rcloneLogFile)
424+
ctx = context.WithValue(ctx, protocol.ContextKeyCacheDirPath, volumeCacheDir)
396425
if ok := execCommand(c.ExecCommand(ctx), func() { os.Remove(rcloneConfigPath) }); !ok {
397426
return
398427
}
428+
case *protocol.KodoUmountCmd:
429+
uuid := rcloneCacheId(c.MountPath)
430+
volumeCacheDir := filepath.Join(rcloneCacheDir, c.VolumeId, uuid)
431+
rcloneLogFile := filepath.Join(rcloneLogDir, c.VolumeId, uuid+".log")
432+
os.RemoveAll(volumeCacheDir)
433+
os.Remove(rcloneLogFile)
434+
os.Remove(filepath.Dir(rcloneLogFile))
435+
os.Remove(filepath.Dir(volumeCacheDir))
399436
case *protocol.RequestDataCmd:
400437
if stdin == nil {
401438
log.Warnf("Received RequestDataCmd when process is not started")

connector/utils.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,16 @@ package main
22

33
import (
44
"bytes"
5+
"crypto/md5"
6+
"encoding/hex"
7+
"errors"
58
"fmt"
9+
"io"
610
"os"
711
"os/exec"
812
"path/filepath"
913
"regexp"
14+
"runtime"
1015
"strconv"
1116

1217
"github.com/Unknwon/goconfig"
@@ -34,6 +39,46 @@ const (
3439
RCLONE_CONFIG_BOOL_TRUE = "true"
3540
)
3641

42+
func userLogDir() (string, error) {
43+
var dir string
44+
45+
switch runtime.GOOS {
46+
case "windows":
47+
dir = os.Getenv("LocalAppData")
48+
if dir == "" {
49+
return "", errors.New("%LocalAppData% is not defined")
50+
}
51+
52+
case "darwin", "ios":
53+
dir = os.Getenv("HOME")
54+
if dir == "" {
55+
return "", errors.New("$HOME is not defined")
56+
}
57+
dir += "/Library/Logs"
58+
59+
case "plan9":
60+
dir = os.Getenv("home")
61+
if dir == "" {
62+
return "", errors.New("$home is not defined")
63+
}
64+
dir += "/lib/log"
65+
66+
default: // Unix
67+
dir = "/var/log"
68+
}
69+
70+
return dir, nil
71+
}
72+
73+
func rcloneCacheId(items ...string) string {
74+
hasher := md5.New()
75+
for _, item := range items {
76+
io.WriteString(hasher, item)
77+
hasher.Write([]byte{0})
78+
}
79+
return hex.EncodeToString(hasher.Sum(nil))
80+
}
81+
3782
func ensureDirectoryExists(path string) error {
3883
if fileInfo, err := os.Stat(path); os.IsNotExist(err) {
3984
return os.MkdirAll(path, 0700)

docker/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
FROM debian:bullseye
22

33
ARG KODOFS_VERSION=2.4.18
4-
ARG RCLONE_VERSION=1.60.0
4+
ARG RCLONE_VERSION=1.60.1
55
COPY nsenter /usr/local/bin/nsenter
66
COPY kodofs-v${KODOFS_VERSION} /usr/local/bin/kodofs
77
COPY rclone-v${RCLONE_VERSION} /usr/local/bin/rclone
Binary file not shown.

k8s/kodo/kodo-plugin.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ spec:
5757
capabilities:
5858
add: ["SYS_ADMIN"]
5959
allowPrivilegeEscalation: true
60-
image: kodoproduct/csi-plugin.storage.qiniu.com:v0.1.0
60+
image: kodoproduct/csi-plugin.storage.qiniu.com:v0.1.0-5-g9007c38
6161
imagePullPolicy: Always
6262
args:
6363
- "--endpoint=$(CSI_ENDPOINT)"

k8s/kodofs/kodofs-plugin.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ spec:
5757
capabilities:
5858
add: ["SYS_ADMIN"]
5959
allowPrivilegeEscalation: true
60-
image: kodoproduct/csi-plugin.storage.qiniu.com:v0.1.0
60+
image: kodoproduct/csi-plugin.storage.qiniu.com:v0.1.0-5-g9007c38
6161
imagePullPolicy: Always
6262
args:
6363
- "--endpoint=$(CSI_ENDPOINT)"

plugin/kodo_node_server.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ func (server *kodoNodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.
7070
} else {
7171
log.Infof("NodeUnpublishVolume: umounted kodo volume from path: %s", mountPath)
7272
}
73+
if err = cleanAfterKodoUmount(req.VolumeId, mountPath); err != nil {
74+
log.Warnf("NodeUnpublishVolume: failed to clean kodo volume cache and log files: %s", err)
75+
} else {
76+
log.Infof("NodeUnpublishVolume: kodo volume cache and log files are cleaned")
77+
}
7378
return &csi.NodeUnpublishVolumeResponse{}, nil
7479
}
7580

plugin/utils.go

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -189,19 +189,11 @@ func mountKodoFS(gatewayID, mountPath string, mountServerAddress *url.URL, acces
189189
}
190190
switch cmd.(type) {
191191
case *protocol.InitKodoFSMountCmd:
192-
if err = encoder.Encode(&protocol.Request{
193-
Version: protocol.Version,
194-
Cmd: protocol.InitKodoFsMountCmdName,
195-
Payload: json.RawMessage(buf),
196-
}); err != nil {
192+
if err = encoder.Encode(makeRequest(protocol.InitKodoFsMountCmdName, buf)); err != nil {
197193
return fmt.Errorf("failed to write command to unix socket %s: %w", SocketPath, err)
198194
}
199195
case *protocol.RequestDataCmd:
200-
if err = encoder.Encode(&protocol.Request{
201-
Version: protocol.Version,
202-
Cmd: protocol.RequestDataCmdName,
203-
Payload: json.RawMessage(buf),
204-
}); err != nil {
196+
if err = encoder.Encode(makeRequest(protocol.RequestDataCmdName, buf)); err != nil {
205197
return fmt.Errorf("failed to write command to unix socket %s: %w", SocketPath, err)
206198
}
207199
}
@@ -287,11 +279,7 @@ func mountKodo(volumeId, mountPath, subDir, accessKey, secretKey, bucketId, s3Re
287279
}
288280
switch cmd.(type) {
289281
case *protocol.InitKodoMountCmd:
290-
if err = encoder.Encode(&protocol.Request{
291-
Version: protocol.Version,
292-
Cmd: protocol.InitKodoMountCmdName,
293-
Payload: json.RawMessage(buf),
294-
}); err != nil {
282+
if err = encoder.Encode(makeRequest(protocol.InitKodoMountCmdName, buf)); err != nil {
295283
return fmt.Errorf("failed to write command to unix socket %s: %w", SocketPath, err)
296284
}
297285
}
@@ -411,6 +399,44 @@ func umount(mountPath string) error {
411399
return err
412400
}
413401

402+
func cleanAfterKodoUmount(volumeId, mountPath string) error {
403+
conn, err := net.Dial("unix", SocketPath)
404+
if err != nil {
405+
return fmt.Errorf("failed to dial unix socket %s: %w", SocketPath, err)
406+
}
407+
defer conn.Close()
408+
409+
encoder := json.NewEncoder(conn)
410+
411+
writeCmdToConn := func(encoder *json.Encoder, cmd protocol.Cmd) error {
412+
buf, err := json.Marshal(cmd)
413+
if err != nil {
414+
return fmt.Errorf("failed to marshal json payload: %w", err)
415+
}
416+
switch cmd.(type) {
417+
case *protocol.KodoUmountCmd:
418+
if err = encoder.Encode(makeRequest(protocol.KodoUmountCmdName, buf)); err != nil {
419+
return fmt.Errorf("failed to write command to unix socket %s: %w", SocketPath, err)
420+
}
421+
}
422+
return nil
423+
}
424+
425+
cmd := protocol.KodoUmountCmd{
426+
VolumeId: volumeId,
427+
MountPath: mountPath,
428+
}
429+
return writeCmdToConn(encoder, &cmd)
430+
}
431+
432+
func makeRequest(cmdName string, buf []byte) *protocol.Request {
433+
return &protocol.Request{
434+
Version: protocol.Version,
435+
Cmd: cmdName,
436+
Payload: json.RawMessage(buf),
437+
}
438+
}
439+
414440
const (
415441
FuseTypeKodoFS = "fuse.KodoFS"
416442
FuseTypeKodo = "fuse.rclone"

protocol/protocol.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@ import (
55
"encoding/json"
66
"fmt"
77
"os/exec"
8-
"path/filepath"
98
"strconv"
109
)
1110

1211
const (
1312
Version = "v2"
1413
InitKodoMountCmdName = "init_kodo_mount"
1514
InitKodoFsMountCmdName = "init_kodofs_mount"
15+
KodoUmountCmdName = "umount_kodo"
1616
RequestDataCmdName = "request_data"
1717
ResponseDataCmdName = "response_data"
1818
TerminateCmdName = "terminate"
@@ -68,6 +68,11 @@ type (
6868
DebugFuse bool `json:"debug_fuse,omitempty"`
6969
}
7070

71+
KodoUmountCmd struct {
72+
VolumeId string `json:"volume_id"`
73+
MountPath string `json:"mount_path"`
74+
}
75+
7176
RequestDataCmd struct {
7277
Data string `json:"data"`
7378
}
@@ -93,6 +98,7 @@ type (
9398

9499
func (*InitKodoFSMountCmd) Command() {}
95100
func (*InitKodoMountCmd) Command() {}
101+
func (*KodoUmountCmd) Command() {}
96102
func (*RequestDataCmd) Command() {}
97103
func (*ResponseDataCmd) Command() {}
98104
func (*TerminateCmd) Command() {}
@@ -107,7 +113,7 @@ const (
107113

108114
ContextKeyConfigFilePath contextKey = "config_file_path"
109115
ContextKeyUserAgent contextKey = "user_agent"
110-
ContextKeyLogDirPath contextKey = "log_dir_path"
116+
ContextKeyLogFilePath contextKey = "log_file_path"
111117
ContextKeyCacheDirPath contextKey = "cache_dir_path"
112118
)
113119

@@ -119,14 +125,14 @@ func (c *InitKodoFSMountCmd) ExecCommand(ctx context.Context) *exec.Cmd {
119125
func (c *InitKodoMountCmd) ExecCommand(ctx context.Context) *exec.Cmd {
120126
rcloneConfigFilePath := ctx.Value(ContextKeyConfigFilePath).(string)
121127
userAgent := ctx.Value(ContextKeyUserAgent).(string)
122-
rcloneLogDirPath := ctx.Value(ContextKeyLogDirPath).(string)
128+
rcloneLogFilePath := ctx.Value(ContextKeyLogFilePath).(string)
123129
rcloneCacheDirPath := ctx.Value(ContextKeyCacheDirPath).(string)
124130

125131
var cmdFlags = []string{
126132
"--auto-confirm",
127133
"--config", rcloneConfigFilePath,
128134
"--user-agent", fmt.Sprintf("%s/%s", userAgent, c.VolumeId),
129-
"--log-file", filepath.Join(rcloneLogDirPath, c.VolumeId+".log")}
135+
"--log-file", rcloneLogFilePath}
130136
if c.BufferSize != nil {
131137
cmdFlags = append(cmdFlags, []string{"--buffer-size", formatByteSize(*c.BufferSize)}...)
132138
}

0 commit comments

Comments
 (0)