Skip to content

Commit 6f1a001

Browse files
authored
Addresses issue #18: adding BytesReplacer interface to allow BytesReplacingReader to have a customized sizing/search strategy (#19)
Originally `BytesReplacingReader` is designed for a single pair of `search/replace` token replacement in a stream. Issue #18 reveals the need for multi-token replacement in a memory efficient way. Alternative to this solution can we ask user to nest `BytesReplacingReader` for each pair of `search/replace`. However that approach would allocate `r.buf` for each and every pair - if the # of pairs is large, which is evidently so in issue #8, the memory consumption is huge. Another alternative is to ask user to use `BytesReplacingReader` to finish one pair of `search/replace` replacement in a stream and repeat the process multiple times. That's equally undesirable, given high memory/disk demand for this approach. Instead, we now introduce a new interfacer `BytesReplacer` that allows `BytesReplacingReader` to do buf allocation sizing estimate customization as well as search customization. API is strictly backward compatible: `NewBytesReplacingReader` simply creates a simple single `search/replace` replacer and then uses the new `NewBytesReplacingReaderEx` underneath. We demonstrated the multi-token replacement strategy demanded in issue #18x in a test. There is no performance degradation[1]: Before change: ``` BenchmarkBytesReplacingReader_70MBLength_500Targets-8 62 18566817 ns/op 423499484 B/op 49 allocs/op BenchmarkRegularReader_70MBLength_500Targets-8 74 16334800 ns/op 423499325 B/op 49 allocs/op BenchmarkBytesReplacingReader_1KBLength_20Targets-8 1583468 756.7 ns/op 2864 B/op 4 allocs/op BenchmarkRegularReader_1KBLength_20Targets-8 3863762 309.0 ns/op 2864 B/op 4 allocs/op BenchmarkBytesReplacingReader_50KBLength_1000Targets-8 33760 35490 ns/op 210480 B/op 17 allocs/op BenchmarkRegularReader_50KBLength_1000Targets-8 95044 12714 ns/op 210480 B/op 17 allocs/op ``` After: ``` BenchmarkBytesReplacingReader_70MBLength_500Targets-8 61 18214781 ns/op 423499484 B/op 49 allocs/op BenchmarkRegularReader_70MBLength_500Targets-8 74 16589935 ns/op 423499329 B/op 49 allocs/op BenchmarkBytesReplacingReader_1KBLength_20Targets-8 1552221 772.6 ns/op 2864 B/op 4 allocs/op BenchmarkRegularReader_1KBLength_20Targets-8 3879327 308.9 ns/op 2864 B/op 4 allocs/op BenchmarkBytesReplacingReader_50KBLength_1000Targets-8 32160 37192 ns/op 210480 B/op 17 allocs/op BenchmarkRegularReader_50KBLength_1000Targets-8 95293 12419 ns/op 210480 B/op 17 allocs/op ``` [1] strictly speaking there is one extra allocation (for creating a single `search/replace` replace) if the existing `r.Reset` is used, thus if we really want to be pedantic, yes there is a minor perf degradation, if user of the API choose to now modify their code at all.
1 parent 5e34763 commit 6f1a001

File tree

4 files changed

+391
-231
lines changed

4 files changed

+391
-231
lines changed

ios/bytesReplacingReader.go

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package ios
2+
3+
import (
4+
"bytes"
5+
"io"
6+
)
7+
8+
// BytesReplacer allows customization on how BytesReplacingReader does sizing estimate during
9+
// initialization/reset and does search and replacement during the execution.
10+
type BytesReplacer interface {
11+
// GetSizingHints returns hints for BytesReplacingReader to do sizing estimate and allocation.
12+
// Return values:
13+
// - 1st: max search token len
14+
// - 2nd: max replace token len
15+
// - 3rd: max (search_len / replace_len) ratio that is less than 1,
16+
// if none of the search/replace ratio is less than 1, then return a negative number.
17+
// will only be called once during BytesReplacingReader initialization/reset.
18+
GetSizingHints() (int, int, float64)
19+
// Index does token search for BytesReplacingReader.
20+
// Return values:
21+
// - 1st: index of the first found search token; -1, if not found;
22+
// - 2nd: the found search token; ignored if not found;
23+
// - 3rd: the matching replace token; ignored if not found;
24+
Index(buf []byte) (int, []byte, []byte)
25+
}
26+
27+
// BytesReplacingReader allows transparent replacement of a given token during read operation.
28+
type BytesReplacingReader struct {
29+
replacer BytesReplacer
30+
maxSearchTokenLen int
31+
r io.Reader
32+
err error
33+
buf []byte
34+
// buf[0:buf0]: bytes already processed; buf[buf0:buf1] bytes read in but not yet processed.
35+
buf0, buf1 int
36+
// because we need to replace 'search' with 'replace', this marks the max bytes we can read into buf
37+
max int
38+
}
39+
40+
const defaultBufSize = int(4096)
41+
42+
func max(a, b int) int {
43+
if a > b {
44+
return a
45+
}
46+
return b
47+
}
48+
49+
// ResetEx allows reuse of a previous allocated `*BytesReplacingReader` for buf allocation optimization.
50+
func (r *BytesReplacingReader) ResetEx(r1 io.Reader, replacer BytesReplacer) *BytesReplacingReader {
51+
if r1 == nil {
52+
panic("io.Reader cannot be nil")
53+
}
54+
r.replacer = replacer
55+
maxSearchTokenLen, maxReplaceTokenLen, maxSearchOverReplaceLenRatio := r.replacer.GetSizingHints()
56+
if maxSearchTokenLen == 0 {
57+
panic("search token cannot be nil/empty")
58+
}
59+
r.maxSearchTokenLen = maxSearchTokenLen
60+
r.r = r1
61+
r.err = nil
62+
bufSize := max(defaultBufSize, max(maxSearchTokenLen, maxReplaceTokenLen))
63+
if r.buf == nil || len(r.buf) < bufSize {
64+
r.buf = make([]byte, bufSize)
65+
}
66+
r.buf0 = 0
67+
r.buf1 = 0
68+
r.max = len(r.buf)
69+
if maxSearchOverReplaceLenRatio > 0 {
70+
// If len(search) < len(replace), then we have to assume the worst case:
71+
// what's the max bound value such that if we have consecutive 'search' filling up
72+
// the buf up to buf[:max], and all of them are placed with 'replace', and the final
73+
// result won't end up exceed the len(buf)?
74+
r.max = int(maxSearchOverReplaceLenRatio * float64(len(r.buf)))
75+
}
76+
return r
77+
}
78+
79+
// Reset allows reuse of a previous allocated `*BytesReplacingReader` for buf allocation optimization.
80+
// `search` cannot be nil/empty. `replace` can.
81+
func (r *BytesReplacingReader) Reset(r1 io.Reader, search1, replace1 []byte) *BytesReplacingReader {
82+
return r.ResetEx(r1, &singleSearchReplaceReplacer{search: search1, replace: replace1})
83+
}
84+
85+
// Read implements the `io.Reader` interface.
86+
func (r *BytesReplacingReader) Read(p []byte) (int, error) {
87+
n := 0
88+
for {
89+
if r.buf0 > 0 {
90+
n = copy(p, r.buf[0:r.buf0])
91+
r.buf0 -= n
92+
r.buf1 -= n
93+
if r.buf1 == 0 && r.err != nil {
94+
return n, r.err
95+
}
96+
copy(r.buf, r.buf[n:r.buf1+n])
97+
return n, nil
98+
} else if r.err != nil {
99+
return 0, r.err
100+
}
101+
102+
n, r.err = r.r.Read(r.buf[r.buf1:r.max])
103+
if n > 0 {
104+
r.buf1 += n
105+
for {
106+
index, search, replace := r.replacer.Index(r.buf[r.buf0:r.buf1])
107+
if index < 0 {
108+
r.buf0 = max(r.buf0, r.buf1-r.maxSearchTokenLen+1)
109+
break
110+
}
111+
searchTokenLen := len(search)
112+
if searchTokenLen == 0 {
113+
panic("search token cannot be nil/empty")
114+
}
115+
replaceTokenLen := len(replace)
116+
lenDelta := replaceTokenLen - searchTokenLen
117+
index += r.buf0
118+
copy(r.buf[index+replaceTokenLen:r.buf1+lenDelta], r.buf[index+searchTokenLen:r.buf1])
119+
copy(r.buf[index:index+replaceTokenLen], replace)
120+
r.buf0 = index + replaceTokenLen
121+
r.buf1 += lenDelta
122+
}
123+
}
124+
if r.err != nil {
125+
r.buf0 = r.buf1
126+
}
127+
}
128+
}
129+
130+
type singleSearchReplaceReplacer struct {
131+
search []byte
132+
replace []byte
133+
}
134+
135+
func (r *singleSearchReplaceReplacer) GetSizingHints() (int, int, float64) {
136+
searchLen := len(r.search)
137+
replaceLen := len(r.replace)
138+
ratio := float64(-1)
139+
if searchLen < replaceLen {
140+
ratio = float64(searchLen) / float64(replaceLen)
141+
}
142+
return searchLen, replaceLen, ratio
143+
}
144+
145+
func (r *singleSearchReplaceReplacer) Index(buf []byte) (int, []byte, []byte) {
146+
return bytes.Index(buf, r.search), r.search, r.replace
147+
}
148+
149+
// NewBytesReplacingReader creates a new `*BytesReplacingReader` for a single pair of search:replace token replacement.
150+
// `search` cannot be nil/empty. `replace` can.
151+
func NewBytesReplacingReader(r io.Reader, search, replace []byte) *BytesReplacingReader {
152+
return (&BytesReplacingReader{}).ResetEx(r, &singleSearchReplaceReplacer{search: search, replace: replace})
153+
}
154+
155+
// NewBytesReplacingReaderEx creates a new `*BytesReplacingReader` for a given BytesReplacer customization.
156+
func NewBytesReplacingReaderEx(r io.Reader, replacer BytesReplacer) *BytesReplacingReader {
157+
return (&BytesReplacingReader{}).ResetEx(r, replacer)
158+
}

ios/bytesReplacingReader_test.go

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
package ios
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
"io/ioutil"
7+
"math/rand"
8+
"strings"
9+
"testing"
10+
11+
"github.com/stretchr/testify/assert"
12+
)
13+
14+
func TestBytesReplacingReader(t *testing.T) {
15+
for _, test := range []struct {
16+
name string
17+
input []byte
18+
search []byte
19+
replace []byte
20+
expected []byte
21+
}{
22+
{
23+
name: "len(replace) > len(search)",
24+
input: []byte{1, 2, 3, 2, 2, 3, 4, 5},
25+
search: []byte{2, 3},
26+
replace: []byte{4, 5, 6},
27+
expected: []byte{1, 4, 5, 6, 2, 4, 5, 6, 4, 5},
28+
},
29+
{
30+
name: "len(replace) < len(search)",
31+
input: []byte{1, 2, 3, 2, 2, 3, 4, 5, 6, 7, 8},
32+
search: []byte{2, 3, 2},
33+
replace: []byte{9},
34+
expected: []byte{1, 9, 2, 3, 4, 5, 6, 7, 8},
35+
},
36+
{
37+
name: "strip out search, no replace",
38+
input: []byte{1, 2, 3, 2, 2, 3, 4, 2, 3, 2, 8},
39+
search: []byte{2, 3, 2},
40+
replace: []byte{},
41+
expected: []byte{1, 2, 3, 4, 8},
42+
},
43+
{
44+
name: "len(replace) == len(search)",
45+
input: []byte{1, 2, 3, 4, 5, 5, 5, 5, 5, 5, 5, 5, 5},
46+
search: []byte{5, 5},
47+
replace: []byte{6, 6},
48+
expected: []byte{1, 2, 3, 4, 6, 6, 6, 6, 6, 6, 6, 6, 5},
49+
},
50+
{
51+
name: "double quote -> single quote",
52+
input: []byte(`r = NewLineNumReportingCsvReader(strings.NewReader("a,b,c"))`),
53+
search: []byte(`"`),
54+
replace: []byte(`'`),
55+
expected: []byte(`r = NewLineNumReportingCsvReader(strings.NewReader('a,b,c'))`),
56+
},
57+
} {
58+
t.Run(test.name, func(t *testing.T) {
59+
r := NewBytesReplacingReader(bytes.NewReader(test.input), test.search, test.replace)
60+
result, err := ioutil.ReadAll(r)
61+
assert.NoError(t, err)
62+
assert.Equal(t, test.expected, result)
63+
64+
})
65+
}
66+
67+
assert.PanicsWithValue(t, "io.Reader cannot be nil", func() {
68+
NewBytesReplacingReader(nil, []byte{1}, []byte{2})
69+
})
70+
assert.PanicsWithValue(t, "search token cannot be nil/empty", func() {
71+
(&BytesReplacingReader{}).Reset(strings.NewReader("test"), nil, []byte("est"))
72+
})
73+
}
74+
75+
func createTestInput(length int, numTarget int) []byte {
76+
rand.Seed(1234) // fixed rand seed to ensure bench stability
77+
b := make([]byte, length)
78+
for i := 0; i < length; i++ {
79+
b[i] = byte(rand.Intn(100) + 10) // all regular numbers >= 10
80+
}
81+
for i := 0; i < numTarget; i++ {
82+
for {
83+
index := rand.Intn(length)
84+
if b[index] == 7 {
85+
continue
86+
}
87+
b[index] = 7 // special number 7 we will search for and replace with 8.
88+
break
89+
}
90+
}
91+
return b
92+
}
93+
94+
var testInput70MBLength500Targets = createTestInput(70*1024*1024, 500)
95+
var testInput1KBLength20Targets = createTestInput(1024, 20)
96+
var testInput50KBLength1000Targets = createTestInput(50*1024, 1000)
97+
var testSearchFor = []byte{7}
98+
var testReplaceWith = []byte{8}
99+
var testReplacer = &singleSearchReplaceReplacer{search: testSearchFor, replace: testReplaceWith}
100+
101+
func BenchmarkBytesReplacingReader_70MBLength_500Targets(b *testing.B) {
102+
r := &BytesReplacingReader{}
103+
for i := 0; i < b.N; i++ {
104+
r.ResetEx(bytes.NewReader(testInput70MBLength500Targets), testReplacer)
105+
_, _ = ioutil.ReadAll(r)
106+
}
107+
}
108+
109+
func BenchmarkRegularReader_70MBLength_500Targets(b *testing.B) {
110+
for i := 0; i < b.N; i++ {
111+
_, _ = ioutil.ReadAll(bytes.NewReader(testInput70MBLength500Targets))
112+
}
113+
}
114+
115+
func BenchmarkBytesReplacingReader_1KBLength_20Targets(b *testing.B) {
116+
r := &BytesReplacingReader{}
117+
for i := 0; i < b.N; i++ {
118+
r.ResetEx(bytes.NewReader(testInput1KBLength20Targets), testReplacer)
119+
_, _ = ioutil.ReadAll(r)
120+
}
121+
}
122+
123+
func BenchmarkRegularReader_1KBLength_20Targets(b *testing.B) {
124+
for i := 0; i < b.N; i++ {
125+
_, _ = ioutil.ReadAll(bytes.NewReader(testInput1KBLength20Targets))
126+
}
127+
}
128+
129+
func BenchmarkBytesReplacingReader_50KBLength_1000Targets(b *testing.B) {
130+
r := &BytesReplacingReader{}
131+
for i := 0; i < b.N; i++ {
132+
r.ResetEx(bytes.NewReader(testInput50KBLength1000Targets), testReplacer)
133+
_, _ = ioutil.ReadAll(r)
134+
}
135+
}
136+
137+
func BenchmarkRegularReader_50KBLength_1000Targets(b *testing.B) {
138+
for i := 0; i < b.N; i++ {
139+
_, _ = ioutil.ReadAll(bytes.NewReader(testInput50KBLength1000Targets))
140+
}
141+
}
142+
143+
// The follow struct/test is to demonstrate how to do a different customization of BytesReplacer.
144+
type multiTokenReplacer struct {
145+
searches [][]byte
146+
replaces [][]byte
147+
}
148+
149+
func (r *multiTokenReplacer) GetSizingHints() (int, int, float64) {
150+
if len(r.searches) != len(r.replaces) {
151+
panic(fmt.Sprintf("len(searches) (%d) != len(replaces) (%d)", len(r.searches), len(r.replaces)))
152+
}
153+
if len(r.searches) == 0 {
154+
panic("searches must have at least one token")
155+
}
156+
maxSearchLen := 0
157+
maxReplaceLen := 0
158+
maxRatio := float64(-1)
159+
for i, _ := range r.searches {
160+
searchLen := len(r.searches[i])
161+
replaceLen := len(r.replaces[i])
162+
if searchLen > maxSearchLen {
163+
maxSearchLen = searchLen
164+
}
165+
if replaceLen > maxReplaceLen {
166+
maxReplaceLen = replaceLen
167+
}
168+
if searchLen < replaceLen {
169+
ratio := float64(searchLen) / float64(replaceLen)
170+
if ratio > maxRatio {
171+
maxRatio = ratio
172+
}
173+
}
174+
}
175+
return maxSearchLen, maxReplaceLen, maxRatio
176+
}
177+
178+
func (r *multiTokenReplacer) Index(buf []byte) (int, []byte, []byte) {
179+
for i, _ := range r.searches {
180+
index := bytes.Index(buf, r.searches[i])
181+
if index >= 0 {
182+
return index, r.searches[i], r.replaces[i]
183+
}
184+
}
185+
return -1, nil, nil
186+
}
187+
188+
func TestMultiTokenBytesReplacingReader(t *testing.T) {
189+
for _, test := range []struct {
190+
name string
191+
input []byte
192+
searches [][]byte
193+
replaces [][]byte
194+
expected []byte
195+
}{
196+
{
197+
name: "multi tokens; len(search) < len(replace); len(search) > len(replace); replace = nil",
198+
input: []byte("abcdefgop01234qrstuvwxyz"),
199+
searches: [][]byte{
200+
[]byte("abc"),
201+
[]byte("12"),
202+
[]byte("st"),
203+
[]byte("xyz"),
204+
},
205+
replaces: [][]byte{
206+
[]byte("one two three"),
207+
[]byte("twelve is an int"),
208+
nil,
209+
[]byte("uv"),
210+
},
211+
expected: []byte("one two threedefgop0twelve is an int34qruvwuv"),
212+
},
213+
} {
214+
replacer := &multiTokenReplacer{
215+
searches: test.searches,
216+
replaces: test.replaces,
217+
}
218+
r := NewBytesReplacingReaderEx(bytes.NewReader(test.input), replacer)
219+
result, err := ioutil.ReadAll(r)
220+
assert.NoError(t, err)
221+
assert.Equal(t, string(test.expected), string(result))
222+
}
223+
224+
r := (&BytesReplacingReader{}).ResetEx(
225+
strings.NewReader("test"),
226+
&multiTokenReplacer{
227+
searches: [][]byte{[]byte("abc"), []byte("")},
228+
replaces: [][]byte{[]byte("xyz"), []byte("wrong")},
229+
})
230+
assert.PanicsWithValue(t, "search token cannot be nil/empty", func() {
231+
_, _ = ioutil.ReadAll(r)
232+
})
233+
}

0 commit comments

Comments
 (0)