Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions internal/transport/controlbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,11 +665,10 @@ func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {

func (l *loopyWriter) registerStreamHandler(h *registerStream) {
str := &outStream{
id: h.streamID,
state: empty,
itl: &itemList{},
wq: h.wq,
reader: mem.BufferSlice{}.Reader(),
id: h.streamID,
state: empty,
itl: &itemList{},
wq: h.wq,
}
l.estdStreams[h.streamID] = str
}
Expand Down Expand Up @@ -701,11 +700,10 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {
}
// Case 2: Client wants to originate stream.
str := &outStream{
id: h.streamID,
state: empty,
itl: &itemList{},
wq: h.wq,
reader: mem.BufferSlice{}.Reader(),
id: h.streamID,
state: empty,
itl: &itemList{},
wq: h.wq,
}
return l.originateStream(str, h)
}
Expand Down Expand Up @@ -948,11 +946,11 @@ func (l *loopyWriter) processData() (bool, error) {
if str == nil {
return true, nil
}
reader := str.reader
reader := &str.reader
dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
if !dataItem.processing {
dataItem.processing = true
str.reader.Reset(dataItem.data)
reader.Reset(dataItem.data)
dataItem.data.Free()
}
// A data item is represented by a dataFrame, since it later translates into
Expand Down
39 changes: 17 additions & 22 deletions mem/buffer_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ func (s BufferSlice) MaterializeToBuffer(pool BufferPool) Buffer {

// Reader returns a new Reader for the input slice after taking references to
// each underlying buffer.
func (s BufferSlice) Reader() Reader {
func (s BufferSlice) Reader() *Reader {
s.Ref()
return &sliceReader{
return &Reader{
data: s,
len: s.Len(),
}
Expand All @@ -129,46 +129,40 @@ func (s BufferSlice) Reader() Reader {
// with other parts systems. It also provides an additional convenience method
// Remaining(), which returns the number of unread bytes remaining in the slice.
// Buffers will be freed as they are read.
type Reader interface {
io.Reader
io.ByteReader
// Close frees the underlying BufferSlice and never returns an error. Subsequent
// calls to Read will return (0, io.EOF).
Close() error
// Remaining returns the number of unread bytes remaining in the slice.
Remaining() int
// Reset frees the currently held buffer slice and starts reading from the
// provided slice. This allows reusing the reader object.
Reset(s BufferSlice)
}

type sliceReader struct {
// A Reader can be constructed from a BufferSlice; alternatively the zero value
// of a Reader may be used after calling Reset on it.
type Reader struct {
data BufferSlice
len int
// The index into data[0].ReadOnlyData().
bufferIdx int
}

func (r *sliceReader) Remaining() int {
// Remaining returns the number of unread bytes remaining in the slice.
func (r *Reader) Remaining() int {
return r.len
}

func (r *sliceReader) Reset(s BufferSlice) {
// Reset frees the currently held buffer slice and starts reading from the
// provided slice. This allows reusing the reader object.
func (r *Reader) Reset(s BufferSlice) {
r.data.Free()
s.Ref()
r.data = s
r.len = s.Len()
r.bufferIdx = 0
}

func (r *sliceReader) Close() error {
// Close frees the underlying BufferSlice and never returns an error. Subsequent
// calls to Read will return (0, io.EOF).
func (r *Reader) Close() error {
r.data.Free()
r.data = nil
r.len = 0
return nil
}

func (r *sliceReader) freeFirstBufferIfEmpty() bool {
func (r *Reader) freeFirstBufferIfEmpty() bool {
if len(r.data) == 0 || r.bufferIdx != len(r.data[0].ReadOnlyData()) {
return false
}
Expand All @@ -179,7 +173,7 @@ func (r *sliceReader) freeFirstBufferIfEmpty() bool {
return true
}

func (r *sliceReader) Read(buf []byte) (n int, _ error) {
func (r *Reader) Read(buf []byte) (n int, _ error) {
if r.len == 0 {
return 0, io.EOF
}
Expand All @@ -202,7 +196,8 @@ func (r *sliceReader) Read(buf []byte) (n int, _ error) {
return n, nil
}

func (r *sliceReader) ReadByte() (byte, error) {
// ReadByte reads a single byte.
func (r *Reader) ReadByte() (byte, error) {
if r.len == 0 {
return 0, io.EOF
}
Expand Down