Utilize memory allocator in ReadProperties.GetStream #547
Conversation
parquet/file/page_reader.go (outdated)

```go
// underlying reader, even if there is less data available than that. So even if there are no more bytes,
// the buffer must have at least bytes.MinRead capacity remaining to avoid a reallocation.
allocSize := lenCompressed
if p.decompressBuffer.Cap() < lenCompressed+bytes.MinRead {
```
This is dependent on the combined behavior of io.LimitReader and bytes.Buffer, which seems fragile to me, but I don't have any other ideas for how to deal with it. I'll at least add unit tests showing that the reallocation happens when I don't add bytes.MinRead to the allocation size and doesn't happen when I do.
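For reference, a standalone demonstration of the combined behavior in question, using only the standard library: `bytes.Buffer.ReadFrom` grows the buffer by `bytes.MinRead` before every read, so a buffer with capacity for exactly the payload still gets reallocated on the final (zero-byte, EOF-detecting) read.

```go
package main

import (
	"bytes"
	"fmt"
	"strings"
)

func main() {
	data := strings.Repeat("x", 1024)

	// Capacity == payload size: ReadFrom still asks for bytes.MinRead more
	// space before the final zero-byte read, forcing a reallocation.
	tight := bytes.NewBuffer(make([]byte, 0, len(data)))
	tight.ReadFrom(strings.NewReader(data))
	fmt.Println(tight.Cap() > len(data)) // true: the buffer was regrown

	// Capacity == payload size + bytes.MinRead: no reallocation happens.
	padded := bytes.NewBuffer(make([]byte, 0, len(data)+bytes.MinRead))
	padded.ReadFrom(strings.NewReader(data))
	fmt.Println(padded.Cap() == len(data)+bytes.MinRead) // true: unchanged
}
```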
I agree that this seems really fragile. Maybe `io.ReadFull` directly into `p.decompressBuffer.Bytes()[:lenCompressed]` instead of using the intermediate `bytes.Buffer`?
Yes, let's go with `ReadFull` and we can skip `bytes.Buffer` altogether.
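A minimal sketch of that suggestion (the `pageReader` and `scratch` names are hypothetical, not the PR's code): fill a plain, pre-sized byte slice with `io.ReadFull`, so `bytes.Buffer` and its growth heuristics drop out entirely.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// pageReader stands in for serializedPageReader; scratch is a hypothetical
// reusable buffer that, in the PR, would come from the custom allocator.
type pageReader struct {
	r       io.Reader
	scratch []byte
}

// readCompressed fills exactly lenCompressed bytes via io.ReadFull, reusing
// scratch when it is large enough. No bytes.Buffer is involved, so there is
// no bytes.MinRead over-allocation to reason about.
func (p *pageReader) readCompressed(lenCompressed int) ([]byte, error) {
	if cap(p.scratch) < lenCompressed {
		p.scratch = make([]byte, lenCompressed)
	}
	data := p.scratch[:lenCompressed]
	if _, err := io.ReadFull(p.r, data); err != nil {
		return nil, fmt.Errorf("parquet: reading compressed page: %w", err)
	}
	return data, nil
}

func main() {
	p := &pageReader{r: strings.NewReader("compressed page bytes")}
	data, err := p.readCompressed(10)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", data) // compressed
}
```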
```go
if n != lenUncompressed {
	return nil, fmt.Errorf("parquet: expected to read %d bytes but only read %d", lenUncompressed, n)
}
if p.cryptoCtx.DataDecryptor != nil {
```
I'm not sure if this is needed or not, but for data page v2 the data is just read by ReadFull and Decrypt is not called:
arrow-go/parquet/file/page_reader.go, lines 815 to 824 in 95b3f76:

```go
if compressed {
	if levelsBytelen > 0 {
		io.ReadFull(p.r, buf.Bytes()[:levelsBytelen])
	}
	if _, p.err = p.decompress(p.r, lenCompressed-levelsBytelen, buf.Bytes()[levelsBytelen:]); p.err != nil {
		return false
	}
} else {
	io.ReadFull(p.r, buf.Bytes())
}
```
So maybe the Decrypt call is not needed for data page v1 or the dictionary page either?
The reverse, actually. Looks like this is a bug we just never came across; I'm guessing no one using this library was reading DataPageV2 data that was uncompressed but still encrypted.
Alright, I'll fix it for DataPageV2 then. I'll add a unit test with encryption but without compression that should fail on the current main, if I have the time.
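A hedged sketch of the shape of that fix: read the raw page bytes first, then decrypt when a decryptor is configured, independent of whether the page is compressed. The `Decryptor` interface and every name here are stand-ins, not the actual arrow-go types.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// Decryptor is a stand-in for the library's page decryptor interface;
// the real one lives in the parquet encryption internals.
type Decryptor interface {
	Decrypt(ciphertext []byte) []byte
}

// readPage reads the raw page bytes and then decrypts them when a decryptor
// is configured, regardless of compression. Names and sizing are assumptions.
func readPage(r io.Reader, pageLen int, dec Decryptor) ([]byte, error) {
	raw := make([]byte, pageLen) // ciphertext length when encrypted
	if _, err := io.ReadFull(r, raw); err != nil {
		return nil, fmt.Errorf("parquet: reading page: %w", err)
	}
	if dec != nil {
		raw = dec.Decrypt(raw) // plaintext is shorter than the ciphertext
	}
	return raw, nil
}

func main() {
	data, err := readPage(strings.NewReader("plain page bytes"), 16, nil)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", data) // plain page bytes
}
```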
I reran the profiler with the current commit in this PR, with a 2.8GB parquet file stored in S3, uncompressed. The CPU profiler shows that more time is spent in runtime.memmove (copying memory) than in syscall.Syscall6 (read), which is annoying me. :-D
I think it should still be possible to eliminate at least one copy for the uncompressed case.
So this is my scenario:

1. `ReaderProperties.GetStream` reads the column chunk from a TLS connection and stores it in a buffer (or just allocates the buffer if `BufferedStreamEnabled`, but let's go with the unbuffered case for now)
2. `serializedPageReader` is created with the buffer returned from `ReaderProperties.GetStream`
3. `serializedPageReader.Next` gets the page header and calls `serializedPageReader.readUncompressed`/`serializedPageReader.decompress`, which reads data from the `GetStream` buffer into dictPageBuffer/dataPageBuffer (3a: for the uncompressed case this is just a copy)
4. the `page` struct is created from the bytes written to dictPageBuffer/dataPageBuffer
I think I could avoid the copy in 3a and create the page directly from the bytes in the buffer returned by `ReaderProperties.GetStream`, using a combination of `Peek` (to get the bytes) and `Discard` (to move the internal position inside the buffer). This should hold when `BufferedStreamEnabled` is false; I have to check what happens when it is true.
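For reference, a minimal sketch of the `Peek`+`Discard` trick on a plain `bufio.Reader` (standard library names, not the PR's `BufferedReader`): the returned slice aliases the reader's internal buffer, so no copy is made, but it is only valid until the next buffer refill and only works when the requested length fits in the reader's buffer.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
)

// peekN returns n bytes from the buffered reader without copying them out,
// then advances the read position past them. The returned slice aliases the
// reader's internal buffer, so callers must consume it before the next refill.
func peekN(r *bufio.Reader, n int) ([]byte, error) {
	b, err := r.Peek(n)
	if err != nil {
		return nil, err
	}
	if _, err := r.Discard(n); err != nil {
		return nil, err
	}
	return b, nil
}

func main() {
	r := bufio.NewReaderSize(bytes.NewReader([]byte("page header|page body")), 64)
	header, _ := peekN(r, 12) // "page header|"
	body, _ := peekN(r, 9)    // "page body"
	fmt.Printf("%s %s\n", header, body)
}
```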
Awesome! Thanks for diving into this!
Alright, so I "steal" the buffer by using Peek/Discard if the data has been read previously and it is available of the BufferedReader. So in the uncompressed and unencrypted case -> data is read and stored into a buffer in ReaderProperties.GetStream and copied to the user provided buffer to Float32ColumnChunkReader.ReadBatch.
Now, if we have a plain encoder and no compression, it should be possible to write the data directly to the user-provided buffer, which would eliminate even that copy, but that one is more complicated and I need to start doing other stuff. :D
Also, the decryption types allocate buffers for the decrypted data. We could pass them an already-allocated buffer to use, or maybe do an in-place decryption (if possible), or give them the custom allocator if one is set.
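On the in-place idea: Go's AES-GCM (the cipher Parquet modular encryption is built on) can decrypt into the ciphertext's own backing array. A standard-library sketch, not the library's decryptor code:

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"fmt"
)

func main() {
	key := make([]byte, 16)
	rand.Read(key)

	block, _ := aes.NewCipher(key)
	gcm, _ := cipher.NewGCM(block)

	nonce := make([]byte, gcm.NonceSize())
	rand.Read(nonce)

	ct := gcm.Seal(nil, nonce, []byte("page bytes"), nil)

	// In-place decryption: per the cipher.AEAD docs, passing ct[:0] as dst
	// reuses the ciphertext's storage, so no plaintext buffer is allocated.
	pt, err := gcm.Open(ct[:0], nonce, ct, nil)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s, same backing array: %v\n", pt, &pt[0] == &ct[0])
}
```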
Anyway, I'll fix the decryption for DataPageV2 next and I'll consider this one done.
Force-pushed from e3a38f7 to c7bee5e.
```go
require.NoError(t, err)

icr := col0.(*file.Int64ColumnChunkReader)
// require.NoError(t, icr.SeekToRow(3)) // TODO: this causes a panic currently
```
If I uncomment this, it causes a panic.
That shouldn't be panicking, as SeekToRow works correctly based on my last tests... I'll see if I can debug this.
```go
arrWriterProps := pqarrow.NewArrowWriterProperties()

var buf bytes.Buffer
wr, err := pqarrow.NewFileWriter(schema, &buf, writerProps, arrWriterProps)
```
@zeroshade I think there is a bug in the pqarrow writer. For the data page v2 buffer, it first writes the levels (definition and repetition) and then the values, and only the values are compressed. However, this whole buffer is then encrypted. Unless ChatGPT is hallucinating on me, only the compressed values should be encrypted; the levels should stay unencrypted and uncompressed.
I'll check with some encrypted parquet created in a different way on Monday and see what happens.
@zeroshade OK, I should have some time this week to finish this. In fact, I think the memory allocation is done, but the decryption needs fixing before the tests pass.
I've been trying to use PyArrow to encrypt/decrypt files, but there seems to be some discrepancy between the implementations. I cannot get files encrypted by PyArrow to decrypt using arrow-go, and vice versa. I'll open an issue for the encryption/decryption.
Please link the issue here when you file it. PyArrow and arrow-go should agree on encrypting/decrypting in both directions, since PyArrow binds to Arrow C++ and the tests for Parquet C++ and arrow-go should be the same tests, which are passing. I'd be interested to reproduce the failure and debug it.
Rationale for this change
Optimizes memory usage and enables the use of custom allocators when reading column data with both buffered and unbuffered readers.
What changes are included in this PR?
Changes to the bufferedReader type, a new bytesBufferReader type, and a modification of ReadProperties.GetStream to propagate the custom memory allocator to the readers.
Are these changes tested?
TODO: add unit tests
Are there any user-facing changes?
The allocator, if provided via the reader properties, will be used to allocate the underlying buffers for the buffered/unbuffered readers.
The BufferedReader interface was extended with a Free method to allow returning the memory to the allocator.
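For illustration, the extended interface might look roughly like this; the method set is an assumption based on the Peek/Discard usage discussed above and may differ from the actual PR:

```go
package utils // hypothetical placement; the real type lives in arrow-go

import "io"

// BufferedReader sketches the interface extended with Free. Peek and
// Discard are included because the zero-copy discussion above relies on
// them; the actual signatures in the PR may differ.
type BufferedReader interface {
	io.Reader
	Peek(n int) ([]byte, error)
	Discard(n int) (int, error)
	// Free returns the reader's internal buffer to the configured
	// memory.Allocator. The reader must not be used after calling Free.
	Free()
}
```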