feat: Flush row group by buffered bytes in parquet writer #15751
wecharyu wants to merge 7 commits into facebookincubator:main
Conversation
@xiaoxmeng has imported this pull request. If you are a Meta employee, you can view this in D92526575.
@xiaoxmeng merged this pull request in 6e01ab2.
The test passes 0 as bytesInRowGroup to parquet::DefaultFlushPolicy, which flows into Arrow's WriterProperties::Builder::maxRowGroupBytes(). Since PR facebookincubator#15751 added ARROW_CHECK_GT(maxRowGroupBytes, 0) validation, passing 0 causes a SIGABRT crash. Use 128MB (the default value) instead, so the test controls flushing by row count only while satisfying the positive-value constraint.
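The constraint described above can be sketched as follows. This is a minimal illustration with hypothetical names (`validateMaxRowGroupBytes` is not an Arrow API); it throws instead of aborting so the positive-value rule is easy to demonstrate, and uses the 128MB default mentioned in the fix.

```cpp
#include <cstdint>
#include <stdexcept>

// Default used when flushing should be driven by row count only
// (the value the fixed test passes instead of 0).
constexpr int64_t kDefaultRowGroupBytes = 128LL << 20;  // 128MB

// Stand-in for the ARROW_CHECK_GT(maxRowGroupBytes, 0) validation:
// a non-positive limit is rejected up front rather than aborting later.
inline int64_t validateMaxRowGroupBytes(int64_t bytes) {
  if (bytes <= 0) {
    throw std::invalid_argument("maxRowGroupBytes must be > 0");
  }
  return bytes;
}
```

With this rule in place, a caller that only wants row-count-based flushing still has to pass a positive byte limit, which is why the test uses the 128MB default rather than 0.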
This pull request has been reverted by 62c4a06.
@xiaoxmeng nvm, I'll align the code with arrow-48467.
…cubator#15751)

Summary: facebookincubator#5442 checks `bytesInRowGroup` against uncompressed bytes, which causes the final compressed row group to be much smaller than the configured `bytesInRowGroup`. In this patch we flush the row group based on Arrow's buffered size, which can reduce the number of row groups and improve read performance.

Pull Request resolved: facebookincubator#15751
Reviewed By: tanjialiang
Differential Revision: D92526575
Pulled By: xiaoxmeng
fbshipit-source-id: b9285e585ed631b75bac2d8c580efbd1f5de9587
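A small arithmetic sketch of the problem the summary describes: if the flush decision compares the uncompressed buffered size against the configured `bytesInRowGroup`, the row group that actually lands on disk is smaller by roughly the compression ratio. The helper name and the ratios below are illustrative assumptions, not values from the patch.

```cpp
#include <cstdint>

// Approximate on-disk row group size when flushing is triggered by
// uncompressed bytes and the data compresses at `ratio`:1.
inline int64_t approxCompressedGroupBytes(int64_t uncompressedLimit, double ratio) {
  return static_cast<int64_t>(uncompressedLimit / ratio);
}

// E.g. with a 128MB uncompressed target and 4:1 compression, each
// flushed row group ends up around 32MB on disk, a quarter of the
// configured size; flushing on Arrow's buffered (compressed) size
// instead keeps groups near the configured target.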
PingLiuPing left a comment:
@wecharyu Thanks for the code. Do you plan to rework this further?
```cpp
if (batch_size > 0) {
  RETURN_NOT_OK(WriteBatch(offset, batch_size));
  offset += batch_size;
} else if (offset < batch.num_rows()) {
```
This is hard to read compared to the original code, since it implies that when `batch_size <= 0 && offset < batch.num_rows()` it writes a new group.
Can we add a variable such as `int64_t available_rows = max_row_group_length - group_rows;` and then adjust this value based on your new code:
```cpp
while (...) {
  ...
  if (group_rows > 0) {
    if (buffered_bytes >= max_row_group_bytes) {
      available_rows = 0;
    } else if (buffered_bytes > 0) {
      double avg_row_size = buffered_bytes * 1.0 / group_rows;
      int64_t rows_by_bytes = static_cast<int64_t>(
          (max_row_group_bytes - buffered_bytes) / avg_row_size);
      available_rows = std::min(available_rows, rows_by_bytes);
    }
    if (available_rows <= 0) {
      RETURN_NOT_OK(NewBufferedRowGroup());
    }
  }
  int64_t batch_size = std::min(available_rows, batch.num_rows() - offset);
  RETURN_NOT_OK(WriteBatch(offset, batch_size));
  offset += batch_size;
}
```
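The row-budget computation in that suggestion can be pulled out into a standalone, compilable sketch. Names here (`availableRows` and its parameters) are assumptions for illustration, not identifiers from the Arrow or Velox sources; the function caps the rows admitted into the current row group both by the remaining row budget and by an estimate derived from the remaining byte budget.

```cpp
#include <algorithm>
#include <cstdint>

// Returns how many more rows the current row group can accept before a
// flush; 0 means a new buffered row group should be started first.
inline int64_t availableRows(
    int64_t max_row_group_length,
    int64_t max_row_group_bytes,
    int64_t group_rows,
    int64_t buffered_bytes) {
  // Remaining row budget for the current group.
  int64_t available = max_row_group_length - group_rows;
  if (group_rows > 0) {
    if (buffered_bytes >= max_row_group_bytes) {
      // Byte limit already reached: force a flush.
      return 0;
    }
    if (buffered_bytes > 0) {
      // Estimate how many more rows fit in the remaining byte budget,
      // assuming rows keep their average size so far.
      double avg_row_size = static_cast<double>(buffered_bytes) / group_rows;
      int64_t rows_by_bytes = static_cast<int64_t>(
          (max_row_group_bytes - buffered_bytes) / avg_row_size);
      available = std::min(available, rows_by_bytes);
    }
  }
  return std::max<int64_t>(available, 0);
}
```

For example, with a 1000-byte limit, 10 buffered rows, and 500 buffered bytes, the average row is 50 bytes, so roughly 10 more rows fit before the byte limit is reached.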
Hi @PingLiuPing, I want to first make apache/arrow#48468 ready, then we can cherry-pick it and make a small additional change in Velox.