|
| 1 | +package progress |
| 2 | + |
| 3 | +import ( |
| 4 | + "fmt" |
| 5 | + "io" |
| 6 | + "runtime" |
| 7 | + "sync/atomic" |
| 8 | + "time" |
| 9 | +) |
| 10 | + |
| 11 | +// ProgressStatus describes current progress. |
| 12 | +type ProgressStatus struct { |
| 13 | + Task string |
| 14 | + DonePercent float64 |
| 15 | + LinesCompleted int64 |
| 16 | + SpeedMBPS float64 |
| 17 | + SpeedLPS float64 |
| 18 | + Elapsed time.Duration |
| 19 | + Remaining time.Duration |
| 20 | + Metrics []ProgressMetric |
| 21 | +} |
| 22 | + |
| 23 | +// Progress reports reading performance. |
| 24 | +type Progress struct { |
| 25 | + Interval time.Duration |
| 26 | + Print func(status ProgressStatus) |
| 27 | + ShowHeapStats bool |
| 28 | + ShowLinesStats bool |
| 29 | + done chan bool |
| 30 | + lines int64 |
| 31 | + task string |
| 32 | + current func() int64 |
| 33 | + prnt func(s ProgressStatus) |
| 34 | + start time.Time |
| 35 | + tot float64 |
| 36 | + metrics []ProgressMetric |
| 37 | +} |
| 38 | + |
| 39 | +// ProgressType describes metric value. |
| 40 | +type ProgressType string |
| 41 | + |
| 42 | +// ProgressType values. |
| 43 | +const ( |
| 44 | + ProgressBytes = ProgressType("bytes") |
| 45 | + ProgressDuration = ProgressType("duration") |
| 46 | + ProgressGauge = ProgressType("gauge") |
| 47 | +) |
| 48 | + |
| 49 | +// ProgressMetric is an operation metric. |
| 50 | +type ProgressMetric struct { |
| 51 | + Name string |
| 52 | + Type ProgressType |
| 53 | + Value *int64 |
| 54 | +} |
| 55 | + |
| 56 | +// DefaultStatus renders ProgressStatus as a string. |
| 57 | +func DefaultStatus(s ProgressStatus) string { |
| 58 | + if s.Task != "" { |
| 59 | + s.Task += ": " |
| 60 | + } |
| 61 | + |
| 62 | + ms := runtime.MemStats{} |
| 63 | + runtime.ReadMemStats(&ms) |
| 64 | + |
| 65 | + heapMB := ms.HeapInuse / (1024 * 1024) |
| 66 | + |
| 67 | + res := fmt.Sprintf(s.Task+"%.1f%% bytes read, %d lines processed, %.1f l/s, %.1f MB/s, elapsed %s, remaining %s, heap %d MB", |
| 68 | + s.DonePercent, s.LinesCompleted, s.SpeedLPS, s.SpeedMBPS, |
| 69 | + s.Elapsed.Round(10*time.Millisecond).String(), s.Remaining.String(), heapMB) |
| 70 | + |
| 71 | + return res |
| 72 | +} |
| 73 | + |
| 74 | +// MetricsStatus renders ProgressStatus metrics as a string. |
| 75 | +func MetricsStatus(s ProgressStatus) string { |
| 76 | + metrics := "" |
| 77 | + |
| 78 | + for _, m := range s.Metrics { |
| 79 | + switch m.Type { |
| 80 | + case ProgressBytes: |
| 81 | + spdMBPS := float64(atomic.LoadInt64(m.Value)) / (s.Elapsed.Seconds() * 1024 * 1024) |
| 82 | + metrics += fmt.Sprintf("%s: %.1f MB/s, ", m.Name, spdMBPS) |
| 83 | + case ProgressDuration: |
| 84 | + metrics += m.Name + ": " + time.Duration(atomic.LoadInt64(m.Value)).String() + ", " |
| 85 | + case ProgressGauge: |
| 86 | + metrics += fmt.Sprintf("%s: %d, ", m.Name, atomic.LoadInt64(m.Value)) |
| 87 | + } |
| 88 | + } |
| 89 | + |
| 90 | + if metrics != "" { |
| 91 | + metrics = metrics[:len(metrics)-2] |
| 92 | + } |
| 93 | + |
| 94 | + return metrics |
| 95 | +} |
| 96 | + |
| 97 | +// Start spawns background progress reporter. |
| 98 | +func (p *Progress) Start(total int64, current func() int64, task string) { |
| 99 | + p.done = make(chan bool) |
| 100 | + atomic.StoreInt64(&p.lines, 0) |
| 101 | + p.task = task |
| 102 | + p.current = current |
| 103 | + |
| 104 | + interval := p.Interval |
| 105 | + if interval == 0 { |
| 106 | + interval = time.Second |
| 107 | + } |
| 108 | + |
| 109 | + p.prnt = p.Print |
| 110 | + if p.prnt == nil { |
| 111 | + p.prnt = func(s ProgressStatus) { |
| 112 | + println(DefaultStatus(s)) |
| 113 | + } |
| 114 | + } |
| 115 | + |
| 116 | + p.start = time.Now() |
| 117 | + p.tot = float64(total) |
| 118 | + done := p.done |
| 119 | + t := time.NewTicker(interval) |
| 120 | + |
| 121 | + go func() { |
| 122 | + for { |
| 123 | + select { |
| 124 | + case <-t.C: |
| 125 | + p.printStatus(false) |
| 126 | + |
| 127 | + case <-done: |
| 128 | + t.Stop() |
| 129 | + |
| 130 | + return |
| 131 | + } |
| 132 | + } |
| 133 | + }() |
| 134 | +} |
| 135 | + |
| 136 | +// AddMetrics adds more metrics to progress status message. |
| 137 | +func (p *Progress) AddMetrics(metrics ...ProgressMetric) { |
| 138 | + p.metrics = append(p.metrics, metrics...) |
| 139 | +} |
| 140 | + |
| 141 | +func (p *Progress) printStatus(last bool) { |
| 142 | + s := ProgressStatus{} |
| 143 | + s.Task = p.task |
| 144 | + s.LinesCompleted = atomic.LoadInt64(&p.lines) |
| 145 | + s.Metrics = p.metrics |
| 146 | + |
| 147 | + b := float64(p.current()) |
| 148 | + s.DonePercent = 100 * b / p.tot |
| 149 | + s.Elapsed = time.Since(p.start) |
| 150 | + s.SpeedMBPS = (b / s.Elapsed.Seconds()) / (1024 * 1024) |
| 151 | + s.SpeedLPS = float64(s.LinesCompleted) / s.Elapsed.Seconds() |
| 152 | + |
| 153 | + s.Remaining = time.Duration(float64(100*s.Elapsed)/s.DonePercent) - s.Elapsed |
| 154 | + s.Remaining = s.Remaining.Truncate(time.Second) |
| 155 | + |
| 156 | + if s.Remaining > 100*time.Millisecond || last { |
| 157 | + p.prnt(s) |
| 158 | + } |
| 159 | +} |
| 160 | + |
| 161 | +// CountLine increments line counter. |
| 162 | +func (p *Progress) CountLine() int64 { |
| 163 | + return atomic.AddInt64(&p.lines, 1) |
| 164 | +} |
| 165 | + |
| 166 | +// Lines returns number of counted lines. |
| 167 | +func (p *Progress) Lines() int64 { |
| 168 | + return atomic.LoadInt64(&p.lines) |
| 169 | +} |
| 170 | + |
| 171 | +// Stop stops progress reporting. |
| 172 | +func (p *Progress) Stop() { |
| 173 | + p.printStatus(true) |
| 174 | + p.metrics = nil |
| 175 | + |
| 176 | + close(p.done) |
| 177 | +} |
| 178 | + |
| 179 | +// CountingReader wraps io.Reader to count bytes. |
| 180 | +type CountingReader struct { |
| 181 | + Reader io.Reader |
| 182 | + |
| 183 | + readBytes int64 |
| 184 | +} |
| 185 | + |
| 186 | +// Read reads and counts bytes. |
| 187 | +func (cr *CountingReader) Read(p []byte) (n int, err error) { |
| 188 | + n, err = cr.Reader.Read(p) |
| 189 | + |
| 190 | + atomic.AddInt64(&cr.readBytes, int64(n)) |
| 191 | + |
| 192 | + return n, err |
| 193 | +} |
| 194 | + |
| 195 | +// Bytes returns number of read bytes. |
| 196 | +func (cr *CountingReader) Bytes() int64 { |
| 197 | + return atomic.LoadInt64(&cr.readBytes) |
| 198 | +} |
| 199 | + |
| 200 | +// CountingWriter wraps io.Writer to count bytes. |
| 201 | +type CountingWriter struct { |
| 202 | + Writer io.Writer |
| 203 | + |
| 204 | + writtenBytes int64 |
| 205 | +} |
| 206 | + |
| 207 | +// Write writes and counts bytes. |
| 208 | +func (cr *CountingWriter) Write(p []byte) (n int, err error) { |
| 209 | + n, err = cr.Writer.Write(p) |
| 210 | + |
| 211 | + atomic.AddInt64(&cr.writtenBytes, int64(n)) |
| 212 | + |
| 213 | + return n, err |
| 214 | +} |
| 215 | + |
| 216 | +// Bytes returns number of written bytes. |
| 217 | +func (cr *CountingWriter) Bytes() int64 { |
| 218 | + return atomic.LoadInt64(&cr.writtenBytes) |
| 219 | +} |
| 220 | + |
| 221 | +// MetricsExposer provides metric counters. |
| 222 | +type MetricsExposer interface { |
| 223 | + Metrics() []ProgressMetric |
| 224 | +} |
0 commit comments