-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
Copy pathbuffer.go
151 lines (127 loc) · 4.05 KB
/
buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// Go MySQL Driver - A MySQL-Driver for Go's database/sql package
//
// Copyright 2013 The Go-MySQL-Driver Authors. All rights reserved.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package mysql
import (
"io"
)
const defaultBufSize = 4096
const maxCachedBufSize = 256 * 1024
// readerFunc is a function that compatible with io.Reader.
// We use this function type instead of io.Reader because we want to
// just pass mc.readWithTimeout.
type readerFunc func([]byte) (int, error)
// A buffer which is used for both reading and writing.
// This is possible since communication on each connection is synchronous.
// In other words, we can't write and read simultaneously on the same connection.
// The buffer is similar to bufio.Reader / Writer but zero-copy-ish
// Also highly optimized for this particular use case.
type buffer struct {
buf []byte // read buffer.
cachedBuf []byte // buffer that will be reused. len(cachedBuf) <= maxCachedBufSize.
}
// newBuffer allocates and returns a new buffer.
func newBuffer() buffer {
return buffer{
cachedBuf: make([]byte, defaultBufSize),
}
}
// busy returns true if the read buffer is not empty.
func (b *buffer) busy() bool {
return len(b.buf) > 0
}
// fill reads into the read buffer until at least _need_ bytes are in it.
func (b *buffer) fill(need int, r readerFunc) error {
// we'll move the contents of the current buffer to dest before filling it.
dest := b.cachedBuf
// grow buffer if necessary to fit the whole packet.
if need > len(dest) {
// Round up to the next multiple of the default size
dest = make([]byte, ((need/defaultBufSize)+1)*defaultBufSize)
// if the allocated buffer is not too large, move it to backing storage
// to prevent extra allocations on applications that perform large reads
if len(dest) <= maxCachedBufSize {
b.cachedBuf = dest
}
}
// move the existing data to the start of the buffer.
n := len(b.buf)
copy(dest[:n], b.buf)
for {
nn, err := r(dest[n:])
n += nn
if err == nil && n < need {
continue
}
b.buf = dest[:n]
if err == io.EOF {
if n < need {
err = io.ErrUnexpectedEOF
} else {
err = nil
}
}
return err
}
}
// returns next N bytes from buffer.
// The returned slice is only guaranteed to be valid until the next read
func (b *buffer) readNext(need int, r readerFunc) ([]byte, error) {
if len(b.buf) < need {
// refill
if err := b.fill(need, r); err != nil {
return nil, err
}
}
data := b.buf[:need]
b.buf = b.buf[need:]
return data, nil
}
// takeBuffer returns a buffer with the requested size.
// If possible, a slice from the existing buffer is returned.
// Otherwise a bigger buffer is made.
// Only one buffer (total) can be used at a time.
func (b *buffer) takeBuffer(length int) ([]byte, error) {
if b.busy() {
return nil, ErrBusyBuffer
}
// test (cheap) general case first
if length <= len(b.cachedBuf) {
return b.cachedBuf[:length], nil
}
if length < maxCachedBufSize {
b.cachedBuf = make([]byte, length)
return b.cachedBuf, nil
}
// buffer is larger than we want to store.
return make([]byte, length), nil
}
// takeSmallBuffer is shortcut which can be used if length is
// known to be smaller than defaultBufSize.
// Only one buffer (total) can be used at a time.
func (b *buffer) takeSmallBuffer(length int) ([]byte, error) {
if b.busy() {
return nil, ErrBusyBuffer
}
return b.cachedBuf[:length], nil
}
// takeCompleteBuffer returns the complete existing buffer.
// This can be used if the necessary buffer size is unknown.
// cap and len of the returned buffer will be equal.
// Only one buffer (total) can be used at a time.
func (b *buffer) takeCompleteBuffer() ([]byte, error) {
if b.busy() {
return nil, ErrBusyBuffer
}
return b.cachedBuf, nil
}
// store stores buf, an updated buffer, if its suitable to do so.
func (b *buffer) store(buf []byte) {
if cap(buf) <= maxCachedBufSize && cap(buf) > cap(b.cachedBuf) {
b.cachedBuf = buf[:cap(buf)]
}
}