Skip to content

Commit 72e7a0c

Browse files
authored
Fix handling of file open error during parallel (#19)
1 parent 3c62370 commit 72e7a0c

File tree

2 files changed

+66
-25
lines changed

2 files changed

+66
-25
lines changed

.golangci.yml

+3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ linters-settings:
2222
linters:
2323
enable-all: true
2424
disable:
25+
- funlen
26+
- nestif
27+
- cyclop
2528
- intrange
2629
- copyloopvar
2730
- lll

cmd/catp/catp/app.go

+63-25
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ type runner struct {
5454
lastErr error
5555
lastStatusTime int64
5656
lastBytesUncompressed int64
57+
58+
noProgress bool
5759
}
5860

5961
// st renders Status as a string.
@@ -80,13 +82,25 @@ func (r *runner) st(s progress.Status) string {
8082
if len(r.sizes) > 1 && r.parallel <= 1 {
8183
pr.CurrentFilePercent = 100 * float64(r.currentFile.Bytes()) / float64(r.currentTotal)
8284

83-
res = fmt.Sprintf("all: %.1f%% bytes read, %s: %.1f%% bytes read, %d lines processed, %.1f l/s, %.1f MB/s, elapsed %s, remaining %s",
84-
s.DonePercent, s.Task, pr.CurrentFilePercent, s.LinesCompleted, s.SpeedLPS, s.SpeedMBPS,
85-
s.Elapsed.Round(10*time.Millisecond).String(), s.Remaining.String())
85+
if s.LinesCompleted != 0 {
86+
res = fmt.Sprintf("all: %.1f%% bytes read, %s: %.1f%% bytes read, %d lines processed, %.1f l/s, %.1f MB/s, elapsed %s, remaining %s",
87+
s.DonePercent, s.Task, pr.CurrentFilePercent, s.LinesCompleted, s.SpeedLPS, s.SpeedMBPS,
88+
s.Elapsed.Round(10*time.Millisecond).String(), s.Remaining.String())
89+
} else {
90+
res = fmt.Sprintf("all: %.1f%% bytes read, %s: %.1f%% bytes read, %.1f MB/s, elapsed %s, remaining %s",
91+
s.DonePercent, s.Task, pr.CurrentFilePercent, s.SpeedMBPS,
92+
s.Elapsed.Round(10*time.Millisecond).String(), s.Remaining.String())
93+
}
8694
} else {
87-
res = fmt.Sprintf("%s: %.1f%% bytes read, %d lines processed, %.1f l/s, %.1f MB/s, elapsed %s, remaining %s",
88-
s.Task, s.DonePercent, s.LinesCompleted, s.SpeedLPS, s.SpeedMBPS,
89-
s.Elapsed.Round(10*time.Millisecond).String(), s.Remaining.String())
95+
if s.LinesCompleted != 0 {
96+
res = fmt.Sprintf("%s: %.1f%% bytes read, %d lines processed, %.1f l/s, %.1f MB/s, elapsed %s, remaining %s",
97+
s.Task, s.DonePercent, s.LinesCompleted, s.SpeedLPS, s.SpeedMBPS,
98+
s.Elapsed.Round(10*time.Millisecond).String(), s.Remaining.String())
99+
} else {
100+
res = fmt.Sprintf("%s: %.1f%% bytes read, %.1f MB/s, elapsed %s, remaining %s",
101+
s.Task, s.DonePercent, s.SpeedMBPS,
102+
s.Elapsed.Round(10*time.Millisecond).String(), s.Remaining.String())
103+
}
90104
}
91105

92106
if currentBytesUncompressed > currentBytes {
@@ -125,6 +139,15 @@ func (r *runner) st(s progress.Status) string {
125139
return res
126140
}
127141

142+
func (r *runner) readFile(rd io.Reader, out io.Writer) {
143+
b := bufio.NewReaderSize(rd, 64*1024)
144+
145+
_, err := io.Copy(out, b)
146+
if err != nil {
147+
log.Fatal(err)
148+
}
149+
}
150+
128151
func (r *runner) scanFile(rd io.Reader, out io.Writer) {
129152
s := bufio.NewScanner(rd)
130153
s.Buffer(make([]byte, 64*1024), 10*1024*1024)
@@ -236,30 +259,35 @@ func (r *runner) cat(filename string) (err error) {
236259
}
237260
}()
238261

239-
cr := progress.NewCountingReader(file)
240-
cr.SetBytes(&r.currentBytes)
241-
cr.SetLines(nil)
262+
rd := io.Reader(file)
242263

243-
if r.parallel <= 1 {
244-
cr = progress.NewCountingReader(file)
264+
if !r.noProgress {
265+
cr := progress.NewCountingReader(file)
266+
cr.SetBytes(&r.currentBytes)
245267
cr.SetLines(nil)
246-
r.currentFile = cr
247-
r.currentTotal = r.sizes[filename]
248-
}
249268

250-
rd := io.Reader(cr)
269+
if r.parallel <= 1 {
270+
cr = progress.NewCountingReader(file)
271+
cr.SetLines(nil)
272+
r.currentFile = cr
273+
r.currentTotal = r.sizes[filename]
274+
}
275+
276+
rd = cr
277+
}
251278

252279
if rd, err = r.openReader(rd, filename); err != nil {
253280
return err
254281
}
255282

256-
crl := progress.NewCountingReader(rd)
283+
if !r.noProgress {
284+
crl := progress.NewCountingReader(rd)
257285

258-
crl.SetBytes(&r.currentBytesUncompressed)
259-
crl.SetLines(&r.currentLines)
260-
crl.SetLines(nil)
286+
crl.SetBytes(&r.currentBytesUncompressed)
287+
crl.SetLines(nil)
261288

262-
rd = crl
289+
rd = crl
290+
}
263291

264292
out := r.output
265293

@@ -285,7 +313,7 @@ func (r *runner) cat(filename string) (err error) {
285313
out = w
286314
}
287315

288-
if r.parallel <= 1 {
316+
if r.parallel <= 1 && !r.noProgress {
289317
r.pr.Start(func(t *progress.Task) {
290318
t.TotalBytes = func() int64 {
291319
return r.totalBytes
@@ -299,9 +327,13 @@ func (r *runner) cat(filename string) (err error) {
299327
})
300328
}
301329

302-
r.scanFile(rd, out)
330+
if len(r.pass) > 0 || len(r.skip) > 0 || r.parallel > 1 {
331+
r.scanFile(rd, out)
332+
} else {
333+
r.readFile(rd, out)
334+
}
303335

304-
if r.parallel <= 1 {
336+
if r.parallel <= 1 && !r.noProgress {
305337
r.pr.Stop()
306338
}
307339

@@ -399,7 +431,7 @@ func Main() error { //nolint:funlen,cyclop,gocognit,gocyclo,maintidx
399431
cpuProfile := flag.String("dbg-cpu-prof", "", "write first 10 seconds of CPU profile to file")
400432
memProfile := flag.String("dbg-mem-prof", "", "write heap profile to file after 10 seconds")
401433
output := flag.String("output", "", "output to file (can have .gz or .zst ext for compression) instead of STDOUT")
402-
noProgress := flag.Bool("no-progress", false, "disable progress printing")
434+
flag.BoolVar(&r.noProgress, "no-progress", false, "disable progress printing")
403435
progressJSON := flag.String("progress-json", "", "write current progress to a file")
404436
ver := flag.Bool("version", false, "print version and exit")
405437

@@ -523,7 +555,7 @@ func Main() error { //nolint:funlen,cyclop,gocognit,gocyclo,maintidx
523555
Print: func(status progress.Status) {
524556
s := r.st(status)
525557

526-
if *noProgress {
558+
if r.noProgress {
527559
return
528560
}
529561

@@ -581,6 +613,12 @@ func Main() error { //nolint:funlen,cyclop,gocognit,gocyclo,maintidx
581613
}
582614

583615
pr.Stop()
616+
617+
close(errs)
618+
619+
if err := <-errs; err != nil {
620+
return err
621+
}
584622
} else {
585623
for i := 0; i < flag.NArg(); i++ {
586624
if err := r.cat(flag.Arg(i)); err != nil {

0 commit comments

Comments
 (0)