Skip to content

Commit 8ade48a

Browse files
author
Qingping Hou
committed
feat: avoid unnecessary file override during the initial sync
1 parent 9430b58 commit 8ade48a

File tree

3 files changed

+140
-30
lines changed

3 files changed

+140
-30
lines changed

main.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ var (
2323
FlagRunOnce bool
2424
FlagStatusAddr = ":8087"
2525
FlagExclude []string
26+
FlagScratch bool
2627

2728
metricsSyncTime = prometheus.NewGauge(prometheus.GaugeOpts{
2829
Namespace: "objinsync",
@@ -102,9 +103,10 @@ func main() {
102103

103104
puller := sync.NewPuller()
104105
if FlagExclude != nil {
105-
for _, exclude := range FlagExclude {
106-
puller.AddExcludePattern(exclude)
107-
}
106+
puller.AddExcludePatterns(FlagExclude)
107+
}
108+
if !FlagScratch {
109+
puller.PopulateChecksum(localDir)
108110
}
109111

110112
pull := func() {
@@ -151,6 +153,13 @@ func main() {
151153
&FlagStatusAddr, "status-addr", "s", ":8087", "binding address for status endpoint")
152154
pullCmd.PersistentFlags().StringSliceVarP(
153155
&FlagExclude, "exclude", "e", nil, "exclude files matching given pattern, see https://github.com/bmatcuk/doublestar#patterns for pattern spec")
156+
pullCmd.PersistentFlags().BoolVarP(
157+
&FlagScratch,
158+
"scratch",
159+
"",
160+
false,
161+
"skip checksums calculation and override all files during the initial sync",
162+
)
154163

155164
rootCmd.AddCommand(pullCmd)
156165
rootCmd.Execute()

pkg/sync/pull.go

Lines changed: 119 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package sync
22

33
import (
4+
"crypto/md5"
5+
"encoding/hex"
46
"fmt"
57
"io"
68
"io/ioutil"
@@ -56,8 +58,11 @@ type DownloadTask struct {
5658
Uri string
5759
LocalPath string
5860
Uid string
61+
// uid key is common suffix between local path and remote uri
62+
UidKey string
5963
}
6064

65+
// parse bucket and key out of remote object URI
6166
func parseObjectUri(uri string) (string, string, error) {
6267
parts := strings.SplitN(uri, "//", 2)
6368
if len(parts) != 2 {
@@ -73,6 +78,27 @@ func parseObjectUri(uri string) (string, string, error) {
7378
return pathParts[0], pathParts[1], nil
7479
}
7580

81+
// uidKeyFromLocalPath returns the uid cache key for a local file, which is
// the file's path relative to localDir as computed by filepath.Rel.
//
// It returns an error when localPath cannot be made relative to localDir.
func uidKeyFromLocalPath(localDir string, localPath string) (string, error) {
	uidKey, err := filepath.Rel(localDir, localPath)
	if err != nil {
		return "", err
	}
	return uidKey, nil
}
84+
85+
// uidFromLocalPath computes the UID of a local file: the hex-encoded MD5
// digest of the file content wrapped in double quotes, which is the shape of
// an AWS S3 ETag.
//
// Errors from opening or reading the file are wrapped with %w, so callers can
// inspect the underlying cause with errors.Is / errors.As (for example
// os.ErrNotExist) while the rendered message stays the same.
//
// NOTE(review): S3 ETags of multipart uploads are not plain MD5 digests, so
// such objects would presumably never compare equal to this value - confirm
// that falling back to a re-download is acceptable for them.
func uidFromLocalPath(localPath string) (string, error) {
	f, err := os.Open(localPath)
	if err != nil {
		return "", fmt.Errorf("Invalid file path for checksum calculation: %s, err: %w", localPath, err)
	}
	defer f.Close()

	// stream the content through the hash rather than loading it in memory
	h := md5.New()
	if _, err := io.Copy(h, f); err != nil {
		return "", fmt.Errorf("Failed to calculate checksum for file: %s, err: %w", localPath, err)
	}

	// AWS S3 ETag is a quoted hex string
	return `"` + hex.EncodeToString(h.Sum(nil)) + `"`, nil
}
101+
76102
func (self *Puller) downloadHandler(task DownloadTask, downloader GenericDownloader) {
77103
l := zap.S()
78104

@@ -120,9 +146,19 @@ func (self *Puller) downloadHandler(task DownloadTask, downloader GenericDownloa
120146

121147
// update cache with new object ID
122148
self.uidLock.Lock()
123-
l.Debugw("Updaing uid cache", "key", task.Uri, "val", task.Uid)
124-
self.uidCache[task.Uri] = task.Uid
125-
defer self.uidLock.Unlock()
149+
l.Debugw("Updaing uid cache", "key", task.UidKey, "val", task.Uid)
150+
self.uidCache[task.UidKey] = task.Uid
151+
self.uidLock.Unlock()
152+
}
153+
154+
func (self *Puller) isPathExcluded(path string) bool {
155+
for _, pattern := range self.exclude {
156+
matched, _ := doublestar.Match(pattern, path)
157+
if matched {
158+
return true
159+
}
160+
}
161+
return false
126162
}
127163

128164
func (self *Puller) handlePageList(
@@ -153,16 +189,9 @@ func (self *Puller) handlePageList(
153189
continue
154190
}
155191
// ignore file that matches exclude rules
156-
shouldSkip := false
157-
for _, pattern := range self.exclude {
158-
matched, _ := doublestar.Match(pattern, relPath)
159-
if matched {
160-
l.Debugf("skipped %s due to exclude pattern: %s", uri, pattern)
161-
shouldSkip = true
162-
break
163-
}
164-
}
192+
shouldSkip := self.isPathExcluded(relPath)
165193
if shouldSkip {
194+
l.Debugf("skipped %s due to exclude pattern", uri)
166195
continue
167196
}
168197

@@ -178,10 +207,11 @@ func (self *Puller) handlePageList(
178207

179208
self.fileListedCnt += 1
180209

210+
uidKey := relPath
181211
self.uidLock.Lock()
182-
oldUid, ok := self.uidCache[uri]
212+
oldUid, ok := self.uidCache[uidKey]
183213
self.uidLock.Unlock()
184-
l.Debugf("Comparing object UID: %s <> %s = %v", oldUid, newUid, oldUid == newUid)
214+
l.Debugf("Comparing object UID: %s <> %s", oldUid, newUid)
185215
if ok && oldUid == newUid {
186216
// skip update if uid is the same
187217
continue
@@ -192,6 +222,7 @@ func (self *Puller) handlePageList(
192222
Uri: uri,
193223
LocalPath: localPath,
194224
Uid: newUid,
225+
UidKey: uidKey,
195226
}
196227
}
197228
return true
@@ -219,8 +250,10 @@ type Puller struct {
219250
filePulledCnt int
220251
}
221252

222-
func (self *Puller) AddExcludePattern(pattern string) {
223-
self.exclude = append(self.exclude, pattern)
253+
func (self *Puller) AddExcludePatterns(patterns []string) {
254+
for _, pattern := range patterns {
255+
self.exclude = append(self.exclude, pattern)
256+
}
224257
}
225258

226259
func (self *Puller) Pull(remoteUri string, localDir string) string {
@@ -321,6 +354,76 @@ func (self *Puller) Pull(remoteUri string, localDir string) string {
321354
}
322355
}
323356

357+
func (self *Puller) PopulateChecksum(localDir string) {
358+
l := zap.S()
359+
360+
setFileChecksum := func(path string) {
361+
f, err := os.Open(path)
362+
if err != nil {
363+
l.Errorf("Invalid file path for checksum calculation: %s, err: %s", path, err)
364+
}
365+
defer f.Close()
366+
367+
h := md5.New()
368+
if _, err := io.Copy(h, f); err != nil {
369+
l.Errorf("Failed to calculate checksum for file: %s, err: %s", path, err)
370+
}
371+
372+
uidKey, err := uidKeyFromLocalPath(localDir, path)
373+
if err != nil {
374+
l.Errorf("Failed to calculate uidKey for file: %s under dir: %s, err: %s", path, localDir, err)
375+
return
376+
}
377+
378+
uid, err := uidFromLocalPath(path)
379+
if err != nil {
380+
l.Errorf("Failed to calculate UID: %s", err)
381+
return
382+
}
383+
384+
self.uidLock.Lock()
385+
self.uidCache[uidKey] = uid
386+
self.uidLock.Unlock()
387+
}
388+
389+
err := filepath.Walk(localDir, func(path string, info os.FileInfo, err error) error {
390+
if err != nil {
391+
return err
392+
}
393+
394+
// ignore file that matches exclude rules
395+
shouldSkip := false
396+
relPath, err := filepath.Rel(localDir, path)
397+
if err != nil {
398+
l.Errorf("Got invalid path from filepath.Walk: %s, err: %s", path, err)
399+
shouldSkip = true
400+
} else {
401+
if info.IsDir() {
402+
// this is so that pattern `foo/**` also matches `foo`
403+
relPath += "/"
404+
}
405+
shouldSkip = self.isPathExcluded(relPath)
406+
}
407+
408+
if info.IsDir() {
409+
if shouldSkip {
410+
return filepath.SkipDir
411+
}
412+
} else {
413+
if shouldSkip {
414+
return nil
415+
}
416+
417+
setFileChecksum(path)
418+
}
419+
return nil
420+
})
421+
422+
if err != nil {
423+
l.Errorf("Failed to walk directory for populating file checksum, err: %s", err)
424+
}
425+
}
426+
324427
func NewPuller() *Puller {
325428
return &Puller{
326429
workerCnt: 5,

pkg/sync/pull_internal_test.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func TestDeleteStaleFile(t *testing.T) {
119119
func TestSkipObjectsWithoutChange(t *testing.T) {
120120
p := NewPuller()
121121
p.taskQueue = make(chan DownloadTask, 10)
122-
p.uidCache["s3://foo/home/dags/b.file"] = "1"
122+
p.uidCache["b.file"] = "\"1\""
123123

124124
var wg sync.WaitGroup
125125
wg.Add(1)
@@ -135,11 +135,11 @@ func TestSkipObjectsWithoutChange(t *testing.T) {
135135
Contents: []*s3.Object{
136136
&s3.Object{
137137
Key: aws.String("home/dags/b.file"),
138-
ETag: aws.String("1"),
138+
ETag: aws.String("\"1\""),
139139
},
140140
&s3.Object{
141141
Key: aws.String("home/dags/bar/a.go"),
142-
ETag: aws.String("1"),
142+
ETag: aws.String("\"1\""),
143143
},
144144
},
145145
},
@@ -168,31 +168,29 @@ func TestSkipExcludedObjects(t *testing.T) {
168168
wg.Done()
169169
}()
170170

171-
p.AddExcludePattern("airflow.cfg")
172-
p.AddExcludePattern("webserver_config.py")
173-
p.AddExcludePattern("config/**")
171+
p.AddExcludePatterns([]string{"airflow.cfg", "webserver_config.py", "config/**"})
174172
p.handlePageList(
175173
&s3.ListObjectsV2Output{
176174
Contents: []*s3.Object{
177175
&s3.Object{
178176
Key: aws.String("home/dags/b.file"),
179-
ETag: aws.String("1"),
177+
ETag: aws.String("\"1\""),
180178
},
181179
&s3.Object{
182180
Key: aws.String("home/airflow.cfg"),
183-
ETag: aws.String("2"),
181+
ETag: aws.String("\"2\""),
184182
},
185183
&s3.Object{
186184
Key: aws.String("home/config/a.file"),
187-
ETag: aws.String("3"),
185+
ETag: aws.String("\"3\""),
188186
},
189187
&s3.Object{
190188
Key: aws.String("home/config/subdir/a.file"),
191-
ETag: aws.String("4"),
189+
ETag: aws.String("\"4\""),
192190
},
193191
&s3.Object{
194192
Key: aws.String("home/webserver_config.py"),
195-
ETag: aws.String("5"),
193+
ETag: aws.String("\"5\""),
196194
},
197195
},
198196
},

0 commit comments

Comments
 (0)