Skip to content

Commit b74dd23

Browse files
committed
memoryWatcher: move definition to a separate file
1 parent 0e8133e commit b74dd23

2 files changed

Lines changed: 145 additions & 133 deletions

File tree

pipe/memory_watcher.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package pipe
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"time"
8+
)
9+
10+
// memoryWatcher monitors a `LimitedStage`'s memory usage, optionally
11+
// logging its peak RSS when it finishes, and optionally killing the
12+
// stage if it exceeds a configured memory limit.
13+
type memoryWatcher struct {
14+
stage LimitableStage
15+
eventHandler func(e *Event)
16+
17+
limit *uint64 // non-nil enables kill-at-limit
18+
observe bool // log peak RSS when the stage exits
19+
20+
maxRSS uint64
21+
samples int
22+
errCount int
23+
consecutiveErrors int
24+
}
25+
26+
// MemoryWatchOption configures a MemoryWatch stage.
27+
type MemoryWatchOption func(*memoryWatcher)
28+
29+
// WithMemoryLimit makes MemoryWatch kill the stage when its RSS exceeds
30+
// byteLimit.
31+
func WithMemoryLimit(byteLimit uint64) MemoryWatchOption {
32+
return func(mw *memoryWatcher) {
33+
mw.limit = &byteLimit
34+
}
35+
}
36+
37+
// WithPeakUsageLogging makes MemoryWatch log the peak RSS when the stage
38+
// exits.
39+
func WithPeakUsageLogging() MemoryWatchOption {
40+
return func(mw *memoryWatcher) {
41+
mw.observe = true
42+
}
43+
}
44+
45+
// watch is a `memoryWatchFunc` that watches the memory usage of the
46+
// specified `stage`.
47+
func (mw *memoryWatcher) watch(ctx context.Context) {
48+
t := time.NewTicker(memoryPollInterval)
49+
50+
watchLoop:
51+
for {
52+
select {
53+
case <-ctx.Done():
54+
break watchLoop
55+
case <-t.C:
56+
if mw.update(ctx) {
57+
// The stage was killed.
58+
break watchLoop
59+
}
60+
}
61+
}
62+
63+
t.Stop()
64+
65+
if mw.observe {
66+
<-ctx.Done()
67+
mw.reportPeakUsage()
68+
}
69+
}
70+
71+
// update samples the current memory usage and updates internal stats.
72+
// Return true if the stage was killed for exceeding the memory limit.
73+
func (mw *memoryWatcher) update(ctx context.Context) bool {
74+
rss, err := mw.stage.GetRSSAnon(ctx)
75+
if err != nil {
76+
mw.handleGetRSSError(err)
77+
return false
78+
}
79+
80+
mw.consecutiveErrors = 0
81+
mw.samples++
82+
if rss > mw.maxRSS {
83+
mw.maxRSS = rss
84+
}
85+
86+
if mw.limit != nil && rss >= *mw.limit {
87+
mw.killStage(rss)
88+
return true
89+
}
90+
91+
return false
92+
}
93+
94+
// handleGetRSSError deals with error `err` that happened when trying
95+
// to get `stage`'s memory usage.
96+
func (mw *memoryWatcher) handleGetRSSError(err error) {
97+
if !errors.Is(err, errProcessInfoMissing) {
98+
mw.errCount++
99+
mw.consecutiveErrors++
100+
if mw.consecutiveErrors == 2 {
101+
mw.eventHandler(&Event{
102+
Command: mw.stage.Name(),
103+
Msg: "error getting RSS",
104+
Err: err,
105+
})
106+
}
107+
} else {
108+
mw.consecutiveErrors = 0
109+
}
110+
}
111+
112+
// killStage kills the stage and reports and event saying what it did.
113+
func (mw *memoryWatcher) killStage(rss uint64) {
114+
// Guarantee the over-limit stage is killed even if
115+
// the user's event handler panics.
116+
defer mw.stage.Kill(ErrMemoryLimitExceeded)
117+
118+
mw.eventHandler(&Event{
119+
Command: mw.stage.Name(),
120+
Msg: "stage exceeded allowed memory use",
121+
Err: fmt.Errorf("stage exceeded allowed memory use"),
122+
Context: map[string]any{
123+
"limit": *mw.limit,
124+
"used": rss,
125+
},
126+
})
127+
}
128+
129+
// reportPeakUsage sends an event reporting the peak usage that has
130+
// been seen for `stage`.
131+
func (mw *memoryWatcher) reportPeakUsage() {
132+
mw.eventHandler(&Event{
133+
Command: mw.stage.Name(),
134+
Msg: "peak memory usage",
135+
Context: map[string]any{
136+
"max_rss_bytes": mw.maxRSS,
137+
"samples": mw.samples,
138+
"errors": mw.errCount,
139+
},
140+
})
141+
}

pipe/memorylimit.go

Lines changed: 4 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -24,29 +24,11 @@ type LimitableStage interface {
2424
Kill(error)
2525
}
2626

27-
// MemoryWatchOption configures a MemoryWatch stage.
28-
type MemoryWatchOption func(*memoryWatcher)
29-
30-
// WithMemoryLimit makes MemoryWatch kill the stage when its RSS exceeds
31-
// byteLimit.
32-
func WithMemoryLimit(byteLimit uint64) MemoryWatchOption {
33-
return func(mw *memoryWatcher) {
34-
mw.limit = &byteLimit
35-
}
36-
}
37-
38-
// WithPeakUsageLogging makes MemoryWatch log the peak RSS when the stage
39-
// exits.
40-
func WithPeakUsageLogging() MemoryWatchOption {
41-
return func(mw *memoryWatcher) {
42-
mw.observe = true
43-
}
44-
}
45-
4627
// MemoryWatch watches the memory usage of the stage and reports via
47-
// eventHandler. With WithMemoryLimit it kills the stage when the limit is
48-
// exceeded; with WithPeakUsageLogging it logs the peak RSS when the stage
49-
// exits. At least one of the two options is required.
28+
// eventHandler. `opts` configure its behavior. With WithMemoryLimit
29+
// it kills the stage when the limit is exceeded; with
30+
// WithPeakUsageLogging it logs the peak RSS when the stage exits. At
31+
// least one of these options is required.
5032
//
5133
// If the event handler panics while reporting the over-limit event, the
5234
// stage is still killed. A panic in any other event-handler call (an
@@ -95,117 +77,6 @@ func MemoryWatch(stage Stage, eventHandler func(e *Event), opts ...MemoryWatchOp
9577
}
9678
}
9779

98-
type memoryWatcher struct {
99-
stage LimitableStage
100-
eventHandler func(e *Event)
101-
102-
limit *uint64 // non-nil enables kill-at-limit
103-
observe bool // log peak RSS when the stage exits
104-
105-
maxRSS uint64
106-
samples int
107-
errCount int
108-
consecutiveErrors int
109-
}
110-
111-
// watch is a `memoryWatchFunc` that watches the memory usage of the
112-
// specified `stage`.
113-
func (mw *memoryWatcher) watch(ctx context.Context) {
114-
t := time.NewTicker(memoryPollInterval)
115-
116-
watchLoop:
117-
for {
118-
select {
119-
case <-ctx.Done():
120-
break watchLoop
121-
case <-t.C:
122-
if mw.update(ctx) {
123-
// The stage was killed.
124-
break watchLoop
125-
}
126-
}
127-
}
128-
129-
t.Stop()
130-
131-
if mw.observe {
132-
<-ctx.Done()
133-
mw.reportPeakUsage()
134-
}
135-
}
136-
137-
// update samples the current memory usage and updates internal stats.
138-
// Return true if the stage was killed for exceeding the memory limit.
139-
func (mw *memoryWatcher) update(ctx context.Context) bool {
140-
rss, err := mw.stage.GetRSSAnon(ctx)
141-
if err != nil {
142-
mw.handleGetRSSError(err)
143-
return false
144-
}
145-
146-
mw.consecutiveErrors = 0
147-
mw.samples++
148-
if rss > mw.maxRSS {
149-
mw.maxRSS = rss
150-
}
151-
152-
if mw.limit != nil && rss >= *mw.limit {
153-
mw.killStage(rss)
154-
return true
155-
}
156-
157-
return false
158-
}
159-
160-
// handleGetRSSError deals with error `err` that happened when trying
161-
// to get `stage`'s memory usage.
162-
func (mw *memoryWatcher) handleGetRSSError(err error) {
163-
if !errors.Is(err, errProcessInfoMissing) {
164-
mw.errCount++
165-
mw.consecutiveErrors++
166-
if mw.consecutiveErrors == 2 {
167-
mw.eventHandler(&Event{
168-
Command: mw.stage.Name(),
169-
Msg: "error getting RSS",
170-
Err: err,
171-
})
172-
}
173-
} else {
174-
mw.consecutiveErrors = 0
175-
}
176-
}
177-
178-
// killStage kills the stage and reports and event saying what it did.
179-
func (mw *memoryWatcher) killStage(rss uint64) {
180-
// Guarantee the over-limit stage is killed even if
181-
// the user's event handler panics.
182-
defer mw.stage.Kill(ErrMemoryLimitExceeded)
183-
184-
mw.eventHandler(&Event{
185-
Command: mw.stage.Name(),
186-
Msg: "stage exceeded allowed memory use",
187-
Err: fmt.Errorf("stage exceeded allowed memory use"),
188-
Context: map[string]any{
189-
"limit": *mw.limit,
190-
"used": rss,
191-
},
192-
})
193-
}
194-
195-
// reportPeakUsage sends an event reporting the peak usage that has
196-
// been seen for `stage`.
197-
func (mw *memoryWatcher) reportPeakUsage() {
198-
mw.eventHandler(&Event{
199-
Command: mw.stage.Name(),
200-
Msg: "peak memory usage",
201-
Context: map[string]any{
202-
"max_rss_bytes": mw.maxRSS,
203-
"samples": mw.samples,
204-
"errors": mw.errCount,
205-
},
206-
})
207-
}
208-
20980
type memoryWatchStage struct {
21081
nameSuffix string
21182
stage LimitableStage

0 commit comments

Comments
 (0)