Skip to content

Commit 34db797

Browse files
author
Qingping Hou
committed
Use .objinsync as the working dir to implement atomic updates
The system tmp dir will not work when it is mounted on a different partition (renaming across devices fails with an invalid cross-device link error)
1 parent 91a0137 commit 34db797

File tree

3 files changed

+110
-57
lines changed

3 files changed

+110
-57
lines changed

main.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -96,24 +96,22 @@ func main() {
9696
localDir := args[1]
9797
interval := time.Second * 5
9898

99-
_, err := os.Stat(localDir)
99+
puller, err := sync.NewPuller(remoteUri, localDir)
100100
if err != nil {
101-
log.Fatal(localDir, " is not a valid dir: ", err)
101+
log.Fatal(err)
102102
}
103-
104-
puller := sync.NewPuller()
105103
if FlagExclude != nil {
106104
puller.AddExcludePatterns(FlagExclude)
107105
}
108106
if !FlagScratch {
109-
puller.PopulateChecksum(localDir)
107+
puller.PopulateChecksum()
110108
}
111109

112110
pull := func() {
113111
start := time.Now()
114112
l.Info("Pull started.")
115113

116-
errMsg := puller.Pull(remoteUri, localDir)
114+
errMsg := puller.Pull()
117115
if errMsg != "" {
118116
sentry.CaptureMessage(errMsg)
119117
sentry.Flush(time.Second * 5)

pkg/sync/pull.go

Lines changed: 70 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,32 @@ func uidFromLocalPath(localPath string) (string, error) {
9999
return fmt.Sprintf("\"%s\"", uid), nil
100100
}
101101

102+
type Puller struct {
103+
RemoteUri string
104+
LocalDir string
105+
106+
workingDir string
107+
exclude []string
108+
workerCnt int
109+
uidCache map[string]string
110+
uidLock *sync.Mutex
111+
taskQueue chan DownloadTask
112+
errMsgQueue chan string
113+
// Here is how filesToDelete is being used:
114+
//
115+
// 1. before each pull action, we populate filesToDelete with all files
116+
// (without dirs) from local target directory. During this process, we also
117+
// delete local empty directories.
118+
//
119+
// 2. we list S3 bucket, for any file in the bucket, we remove related
120+
// entry from the delete list
121+
//
122+
// 3. at the end of the pull, we delete files from the list
123+
filesToDelete map[string]bool
124+
fileListedCnt int
125+
filePulledCnt int
126+
}
127+
102128
func (self *Puller) downloadHandler(task DownloadTask, downloader GenericDownloader) {
103129
l := zap.S()
104130

@@ -125,9 +151,9 @@ func (self *Puller) downloadHandler(task DownloadTask, downloader GenericDownloa
125151
}
126152

127153
// create file
128-
tmpfile, err := ioutil.TempFile(os.TempDir(), "objinsync-download-")
154+
tmpfile, err := ioutil.TempFile(self.workingDir, filepath.Base(task.LocalPath))
129155
if err != nil {
130-
self.errMsgQueue <- fmt.Sprintf("Failed to create file %s for download: %v", tmpfile.Name(), err)
156+
self.errMsgQueue <- fmt.Sprintf("Failed to create file for download: %v", err)
131157
return
132158
}
133159
defer tmpfile.Close()
@@ -228,50 +254,41 @@ func (self *Puller) handlePageList(
228254
return true
229255
}
230256

231-
type Puller struct {
232-
exclude []string
233-
workerCnt int
234-
uidCache map[string]string
235-
uidLock *sync.Mutex
236-
taskQueue chan DownloadTask
237-
errMsgQueue chan string
238-
// Here is how filesToDelete is being used:
239-
//
240-
// 1. before each pull action, we populate filesToDelete with all files
241-
// (without dirs) from local target directory. During this process, we also
242-
// delete local empty directories.
243-
//
244-
// 2. we list S3 bucket, for any file in the bucket, we remove related
245-
// entry from the delete list
246-
//
247-
// 3. at the end of the pull, we delete files from the list
248-
filesToDelete map[string]bool
249-
fileListedCnt int
250-
filePulledCnt int
251-
}
252-
253257
func (self *Puller) AddExcludePatterns(patterns []string) {
254258
for _, pattern := range patterns {
255259
self.exclude = append(self.exclude, pattern)
256260
}
257261
}
258262

259-
func (self *Puller) Pull(remoteUri string, localDir string) string {
263+
func (self *Puller) SetupWorkingDir() error {
264+
// create temporary working directory to hold downloads for atomic rename
265+
// TmpDir won't work because it could be in a different partition, which
266+
// will lead to invalid cross-device link error
267+
if _, err := os.Stat(self.workingDir); os.IsNotExist(err) {
268+
err = os.MkdirAll(self.workingDir, os.ModePerm)
269+
if err != nil {
270+
return err
271+
}
272+
}
273+
return nil
274+
}
275+
276+
func (self *Puller) Pull() string {
260277
l := zap.S()
261278

262-
filesToDelete, err := listAndPruneDir(localDir, self.exclude)
279+
filesToDelete, err := listAndPruneDir(self.LocalDir, self.exclude)
263280
if err != nil {
264-
return fmt.Sprintf("Failed to list and prune local dir %s: %v", localDir, err)
281+
return fmt.Sprintf("Failed to list and prune local dir %s: %v", self.LocalDir, err)
265282
}
266283
// handlePageList removes files that exist in the remote source from this list
267284
self.filesToDelete = filesToDelete
268285
defer func() {
269286
self.filesToDelete = nil
270287
}()
271288

272-
bucket, remoteDirPath, err := parseObjectUri(remoteUri)
289+
bucket, remoteDirPath, err := parseObjectUri(self.RemoteUri)
273290
if err != nil {
274-
return fmt.Sprintf("Invalid remote uri %s: %v", remoteUri, err)
291+
return fmt.Sprintf("Invalid remote uri %s: %v", self.RemoteUri, err)
275292
}
276293

277294
self.taskQueue = make(chan DownloadTask, 30)
@@ -292,6 +309,11 @@ func (self *Puller) Pull(remoteUri string, localDir string) string {
292309
svc := s3.New(sess, aws.NewConfig().WithRegion(region))
293310
downloader := s3manager.NewDownloaderWithClient(svc)
294311

312+
if err := self.SetupWorkingDir(); err != nil {
313+
return fmt.Sprintf("Failed to create working directory %s: %v", self.workingDir, err)
314+
}
315+
defer os.RemoveAll(self.workingDir) // purge working dir when downloads are done
316+
295317
// spawn worker goroutines
296318
var wg sync.WaitGroup
297319
for i := 0; i < self.workerCnt; i++ {
@@ -329,7 +351,7 @@ func (self *Puller) Pull(remoteUri string, localDir string) string {
329351

330352
err = svc.ListObjectsV2Pages(listParams,
331353
func(page *s3.ListObjectsV2Output, lastPage bool) bool {
332-
return self.handlePageList(page, lastPage, bucket, remoteDirPath, localDir)
354+
return self.handlePageList(page, lastPage, bucket, remoteDirPath, self.LocalDir)
333355
})
334356
close(self.taskQueue)
335357
wg.Wait()
@@ -339,7 +361,7 @@ func (self *Puller) Pull(remoteUri string, localDir string) string {
339361
metricsFilePulled.Set(float64(self.filePulledCnt))
340362

341363
if err != nil {
342-
return fmt.Sprintf("Failed to list remote uri %s: %v", remoteUri, err)
364+
return fmt.Sprintf("Failed to list remote uri %s: %v", self.RemoteUri, err)
343365
} else {
344366
errMsgWg.Wait()
345367

@@ -354,7 +376,7 @@ func (self *Puller) Pull(remoteUri string, localDir string) string {
354376
}
355377
}
356378

357-
func (self *Puller) PopulateChecksum(localDir string) {
379+
func (self *Puller) PopulateChecksum() {
358380
l := zap.S()
359381

360382
setFileChecksum := func(path string) {
@@ -369,9 +391,9 @@ func (self *Puller) PopulateChecksum(localDir string) {
369391
l.Errorf("Failed to calculate checksum for file: %s, err: %s", path, err)
370392
}
371393

372-
uidKey, err := uidKeyFromLocalPath(localDir, path)
394+
uidKey, err := uidKeyFromLocalPath(self.LocalDir, path)
373395
if err != nil {
374-
l.Errorf("Failed to calculate uidKey for file: %s under dir: %s, err: %s", path, localDir, err)
396+
l.Errorf("Failed to calculate uidKey for file: %s under dir: %s, err: %s", path, self.LocalDir, err)
375397
return
376398
}
377399

@@ -386,14 +408,14 @@ func (self *Puller) PopulateChecksum(localDir string) {
386408
self.uidLock.Unlock()
387409
}
388410

389-
err := filepath.Walk(localDir, func(path string, info os.FileInfo, err error) error {
411+
err := filepath.Walk(self.LocalDir, func(path string, info os.FileInfo, err error) error {
390412
if err != nil {
391413
return err
392414
}
393415

394416
// ignore file that matches exclude rules
395417
shouldSkip := false
396-
relPath, err := filepath.Rel(localDir, path)
418+
relPath, err := filepath.Rel(self.LocalDir, path)
397419
if err != nil {
398420
l.Errorf("Got invalid path from filepath.Walk: %s, err: %s", path, err)
399421
shouldSkip = true
@@ -424,10 +446,17 @@ func (self *Puller) PopulateChecksum(localDir string) {
424446
}
425447
}
426448

427-
func NewPuller() *Puller {
428-
return &Puller{
429-
workerCnt: 5,
430-
uidCache: map[string]string{},
431-
uidLock: &sync.Mutex{},
449+
func NewPuller(remoteUri string, localDir string) (*Puller, error) {
450+
if _, err := os.Stat(localDir); os.IsNotExist(err) {
451+
return nil, fmt.Errorf("local directory `%s` does not exist: %v", localDir, err)
432452
}
453+
454+
return &Puller{
455+
RemoteUri: remoteUri,
456+
LocalDir: localDir,
457+
workingDir: filepath.Join(localDir, ".objinsync"),
458+
workerCnt: 5,
459+
uidCache: map[string]string{},
460+
uidLock: &sync.Mutex{},
461+
}, nil
433462
}

pkg/sync/pull_internal_test.go

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,13 @@ import (
1515
)
1616

1717
func TestSkipParentDir(t *testing.T) {
18-
p := NewPuller()
18+
dir, err := ioutil.TempDir("", "")
19+
assert.Equal(t, nil, err)
20+
defer os.RemoveAll(dir)
21+
p, err := NewPuller("s3://foo/home", dir)
22+
assert.Equal(t, nil, err)
23+
p.SetupWorkingDir()
24+
1925
p.taskQueue = make(chan DownloadTask, 10)
2026
p.handlePageList(
2127
&s3.ListObjectsV2Output{
@@ -33,7 +39,7 @@ func TestSkipParentDir(t *testing.T) {
3339
false,
3440
"foo",
3541
"home",
36-
"abc",
42+
dir,
3743
)
3844
close(p.taskQueue)
3945

@@ -46,6 +52,7 @@ func TestSkipParentDir(t *testing.T) {
4652

4753
func TestDeleteStaleFile(t *testing.T) {
4854
dir, err := ioutil.TempDir("", "")
55+
defer os.RemoveAll(dir)
4956
nonEmptyDir := filepath.Join(dir, "bar")
5057
os.MkdirAll(nonEmptyDir, os.ModePerm)
5158
fileA := filepath.Join(nonEmptyDir, "a.go")
@@ -62,7 +69,9 @@ func TestDeleteStaleFile(t *testing.T) {
6269
err = ioutil.WriteFile(deletedFileB, []byte("test2"), 0644)
6370
assert.Equal(t, nil, err)
6471

65-
p := NewPuller()
72+
p, err := NewPuller("s3://foo/home/dags", dir)
73+
assert.Equal(t, nil, err)
74+
assert.Equal(t, nil, p.SetupWorkingDir())
6675
p.taskQueue = make(chan DownloadTask, 10)
6776
p.filesToDelete, err = listAndPruneDir(dir, nil)
6877
assert.Equal(t, nil, err)
@@ -117,7 +126,12 @@ func TestDeleteStaleFile(t *testing.T) {
117126
}
118127

119128
func TestSkipObjectsWithoutChange(t *testing.T) {
120-
p := NewPuller()
129+
dir, err := ioutil.TempDir("", "")
130+
assert.Equal(t, nil, err)
131+
defer os.RemoveAll(dir)
132+
p, err := NewPuller("s3://foo/home/dags", dir)
133+
assert.Equal(t, nil, err)
134+
assert.Equal(t, nil, p.SetupWorkingDir())
121135
p.taskQueue = make(chan DownloadTask, 10)
122136
p.uidCache["b.file"] = "\"1\""
123137

@@ -146,7 +160,7 @@ func TestSkipObjectsWithoutChange(t *testing.T) {
146160
false,
147161
"foo",
148162
"home/dags",
149-
"bar",
163+
dir,
150164
)
151165
close(p.taskQueue)
152166
wg.Wait()
@@ -156,7 +170,12 @@ func TestSkipObjectsWithoutChange(t *testing.T) {
156170
}
157171

158172
func TestSkipExcludedObjects(t *testing.T) {
159-
p := NewPuller()
173+
dir, err := ioutil.TempDir("", "")
174+
assert.Equal(t, nil, err)
175+
defer os.RemoveAll(dir)
176+
p, err := NewPuller("s3://foo/home", dir)
177+
assert.Equal(t, nil, err)
178+
assert.Equal(t, nil, p.SetupWorkingDir())
160179
p.taskQueue = make(chan DownloadTask, 10)
161180

162181
var wg sync.WaitGroup
@@ -197,7 +216,7 @@ func TestSkipExcludedObjects(t *testing.T) {
197216
false,
198217
"foo",
199218
"home",
200-
"bar",
219+
dir,
201220
)
202221
close(p.taskQueue)
203222
wg.Wait()
@@ -207,7 +226,12 @@ func TestSkipExcludedObjects(t *testing.T) {
207226
}
208227

209228
func TestSkipDirectories(t *testing.T) {
210-
p := NewPuller()
229+
dir, err := ioutil.TempDir("", "")
230+
assert.Equal(t, nil, err)
231+
defer os.RemoveAll(dir)
232+
p, err := NewPuller("s3://foo/home/dags", dir)
233+
assert.Equal(t, nil, err)
234+
assert.Equal(t, nil, p.SetupWorkingDir())
211235
p.taskQueue = make(chan DownloadTask, 10)
212236

213237
var wg sync.WaitGroup
@@ -235,7 +259,7 @@ func TestSkipDirectories(t *testing.T) {
235259
false,
236260
"foo",
237261
"home/dags",
238-
"bar",
262+
dir,
239263
)
240264
close(p.taskQueue)
241265
wg.Wait()
@@ -257,8 +281,10 @@ func TestNestedPathDownload(t *testing.T) {
257281

258282
mockDownloader := MockDownloader{}
259283

260-
p := NewPuller()
284+
p, err := NewPuller("s3://abc/efg", dir)
285+
assert.Equal(t, nil, err)
261286
p.errMsgQueue = make(chan string, 30)
287+
assert.Equal(t, nil, p.SetupWorkingDir())
262288

263289
p.downloadHandler(
264290
DownloadTask{

0 commit comments

Comments
 (0)