Skip to content

Commit e3a38f7

Browse files
Read uncompressed data directly into the page buffer
1 parent 8edb901 commit e3a38f7

File tree

1 file changed

+34
-2
lines changed

1 file changed

+34
-2
lines changed

parquet/file/page_reader.go

Lines changed: 34 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -374,6 +374,8 @@ type serializedPageReader struct {
374374
dataPageBuffer *memory.Buffer
375375
dictPageBuffer *memory.Buffer
376376
err error
377+
378+
isCompressed bool
377379
}
378380

379381
func (p *serializedPageReader) Close() error {
@@ -402,6 +404,7 @@ func (p *serializedPageReader) init(compressType compress.Compression, ctx *Cryp
402404
return err
403405
}
404406
p.codec = codec
407+
p.isCompressed = compressType != compress.Codecs.Uncompressed
405408

406409
if ctx != nil {
407410
p.cryptoCtx = *ctx
@@ -444,6 +447,7 @@ func NewPageReader(r parquet.BufferedReader, nrows int64, compressType compress.
444447
dictPageBuffer: memory.NewResizableBuffer(mem),
445448
}
446449
rdr.decompressBuffer.ResizeNoShrink(defaultPageHeaderSize)
450+
rdr.isCompressed = compressType != compress.Codecs.Uncompressed
447451
if ctx != nil {
448452
rdr.cryptoCtx = *ctx
449453
rdr.initDecryption()
@@ -460,6 +464,8 @@ func (p *serializedPageReader) Reset(r parquet.BufferedReader, nrows int64, comp
460464
if p.err != nil {
461465
return
462466
}
467+
p.isCompressed = compressType != compress.Codecs.Uncompressed
468+
463469
if ctx != nil {
464470
p.cryptoCtx = *ctx
465471
p.initDecryption()
@@ -502,6 +508,20 @@ func (p *serializedPageReader) Page() Page {
502508
return p.curPage
503509
}
504510

511+
func (p *serializedPageReader) readUncompressed(rd io.Reader, lenUncompressed int, buf []byte) ([]byte, error) {
512+
n, err := io.ReadFull(rd, buf[:lenUncompressed])
513+
if err != nil {
514+
return nil, err
515+
}
516+
if n != lenUncompressed {
517+
return nil, fmt.Errorf("parquet: expected to read %d bytes but only read %d", lenUncompressed, n)
518+
}
519+
if p.cryptoCtx.DataDecryptor != nil {
520+
buf = p.cryptoCtx.DataDecryptor.Decrypt(buf)
521+
}
522+
return buf[:lenUncompressed], nil
523+
}
524+
505525
func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf []byte) ([]byte, error) {
506526
p.decompressBuffer.ResizeNoShrink(lenCompressed)
507527
data := p.decompressBuffer.Bytes()
@@ -735,7 +755,13 @@ func (p *serializedPageReader) Next() bool {
735755
p.dictPageBuffer.ResizeNoShrink(lenUncompressed)
736756
buf := memory.NewBufferBytes(p.dictPageBuffer.Bytes())
737757

738-
data, err := p.decompress(p.r, lenCompressed, buf.Bytes())
758+
var data []byte
759+
var err error
760+
if p.isCompressed {
761+
data, err = p.decompress(p.r, lenCompressed, buf.Bytes())
762+
} else {
763+
data, err = p.readUncompressed(p.r, lenCompressed, buf.Bytes())
764+
}
739765
if err != nil {
740766
p.err = err
741767
return false
@@ -770,7 +796,13 @@ func (p *serializedPageReader) Next() bool {
770796
firstRowIdx := p.rowsSeen
771797
p.rowsSeen += int64(dataHeader.GetNumValues())
772798

773-
data, err := p.decompress(p.r, lenCompressed, buf.Bytes())
799+
var data []byte
800+
var err error
801+
if p.isCompressed {
802+
data, err = p.decompress(p.r, lenCompressed, buf.Bytes())
803+
} else {
804+
data, err = p.readUncompressed(p.r, lenCompressed, buf.Bytes())
805+
}
774806
if err != nil {
775807
p.err = err
776808
return false

0 commit comments

Comments
 (0)