Skip to content

Commit cb36a83

Browse files
committed
address review
1 parent 06fb05c commit cb36a83

File tree

3 files changed

+49
-35
lines changed

3 files changed

+49
-35
lines changed

cpp/src/parquet/file_rewriter.cc

Lines changed: 39 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <unordered_set>
2626
#include <utility>
2727

28+
#include "arrow/util/checked_cast.h"
2829
#include "arrow/util/compression.h"
2930
#include "arrow/util/logging.h"
3031
#include "arrow/util/string.h"
@@ -99,30 +100,31 @@ class PagesRewriter {
99100
bool has_dictionary = false;
100101
bool fallback = false;
101102
std::shared_ptr<Page> page;
102-
size_t page_no = 0;
103+
size_t page_ordinal = 0;
103104
while ((page = page_reader_->NextPage()) != nullptr) {
104105
switch (page->type()) {
105106
case parquet::PageType::DICTIONARY_PAGE: {
106-
WriteDictionaryPage(*static_cast<const DictionaryPage*>(page.get()));
107+
WriteDictionaryPage(
108+
::arrow::internal::checked_cast<const DictionaryPage&>(*page));
107109
has_dictionary = true;
108110
break;
109111
}
110112
case parquet::PageType::DATA_PAGE: {
111-
auto& data_page = *static_cast<const DataPageV1*>(page.get());
112-
if (data_page.encoding() != Encoding::PLAIN_DICTIONARY) {
113+
auto& data_page = ::arrow::internal::checked_cast<const DataPageV1&>(*page);
114+
if (!IsDictionaryIndexEncoding(data_page.encoding())) {
113115
fallback = true;
114116
}
115-
WriteDataPageV1(data_page, page_no);
116-
page_no++;
117+
WriteDataPageV1(data_page, page_ordinal);
118+
page_ordinal++;
117119
break;
118120
}
119121
case parquet::PageType::DATA_PAGE_V2: {
120-
auto& data_page = *static_cast<const DataPageV2*>(page.get());
121-
if (data_page.encoding() != Encoding::PLAIN_DICTIONARY) {
122+
auto& data_page = ::arrow::internal::checked_cast<const DataPageV2&>(*page);
123+
if (!IsDictionaryIndexEncoding(data_page.encoding())) {
122124
fallback = true;
123125
}
124-
WriteDataPageV2(data_page, page_no);
125-
page_no++;
126+
WriteDataPageV2(data_page, page_ordinal);
127+
page_ordinal++;
126128
break;
127129
}
128130
default: {
@@ -141,7 +143,7 @@ class PagesRewriter {
141143
total_uncompressed_size_ += page_writer_->WriteDictionaryPage(dict_page);
142144
}
143145

144-
void WriteDataPageV1(const DataPageV1& data_page, const size_t page_no) {
146+
void WriteDataPageV1(const DataPageV1& data_page, const size_t page_ordinal) {
145147
std::shared_ptr<Buffer> compressed_data;
146148
if (page_writer_->has_compressor()) {
147149
auto buffer = std::static_pointer_cast<ResizableBuffer>(
@@ -153,15 +155,15 @@ class PagesRewriter {
153155
}
154156
auto first_row_index =
155157
original_offset_index_
156-
? std::optional{original_offset_index_->page_locations()[page_no]
158+
? std::optional{original_offset_index_->page_locations()[page_ordinal]
157159
.first_row_index}
158160
: std::nullopt;
159161
SizeStatistics size_statistics;
160162
size_statistics.unencoded_byte_array_data_bytes =
161163
original_offset_index_ &&
162164
!original_offset_index_->unencoded_byte_array_data_bytes().empty()
163165
? std::optional{original_offset_index_
164-
->unencoded_byte_array_data_bytes()[page_no]}
166+
->unencoded_byte_array_data_bytes()[page_ordinal]}
165167
: std::nullopt;
166168
DataPageV1 new_page(compressed_data, data_page.num_values(), data_page.encoding(),
167169
data_page.definition_level_encoding(),
@@ -171,7 +173,7 @@ class PagesRewriter {
171173
total_uncompressed_size_ += page_writer_->WriteDataPage(new_page);
172174
}
173175

174-
void WriteDataPageV2(const DataPageV2& data_page, const size_t page_no) {
176+
void WriteDataPageV2(const DataPageV2& data_page, const size_t page_ordinal) {
175177
int32_t levels_byte_len = data_page.repetition_levels_byte_length() +
176178
data_page.definition_levels_byte_length();
177179
bool page_is_compressed = false;
@@ -199,15 +201,15 @@ class PagesRewriter {
199201

200202
auto first_row_index =
201203
original_offset_index_
202-
? std::optional{original_offset_index_->page_locations()[page_no]
204+
? std::optional{original_offset_index_->page_locations()[page_ordinal]
203205
.first_row_index}
204206
: std::nullopt;
205207
SizeStatistics size_statistics;
206208
size_statistics.unencoded_byte_array_data_bytes =
207209
original_offset_index_ &&
208210
!original_offset_index_->unencoded_byte_array_data_bytes().empty()
209211
? std::optional{original_offset_index_
210-
->unencoded_byte_array_data_bytes()[page_no]}
212+
->unencoded_byte_array_data_bytes()[page_ordinal]}
211213
: std::nullopt;
212214
DataPageV2 new_page(output_buffer, data_page.num_values(), data_page.num_nulls(),
213215
data_page.num_rows(), data_page.encoding(),
@@ -493,7 +495,7 @@ class SingleFileRewriter {
493495

494496
const SchemaDescriptor& schema() const { return *metadata_->schema(); }
495497

496-
std::vector<int64_t> row_group_row_counts() const {
498+
std::vector<int64_t> RowGroupRowCounts() const {
497499
int num_row_groups = metadata_->num_row_groups();
498500
std::vector<int64_t> row_counts;
499501
row_counts.reserve(num_row_groups);
@@ -553,10 +555,10 @@ class ConcatRewriter {
553555

554556
const SchemaDescriptor& schema() const { return file_rewriters_[0]->schema(); }
555557

556-
std::vector<int64_t> row_group_row_counts() const {
558+
std::vector<int64_t> RowGroupRowCounts() const {
557559
std::vector<int64_t> row_counts;
558560
for (auto& rewriter : file_rewriters_) {
559-
auto count = rewriter->row_group_row_counts();
561+
auto count = rewriter->RowGroupRowCounts();
560562
row_counts.insert(row_counts.end(), count.begin(), count.end());
561563
}
562564
return row_counts;
@@ -574,9 +576,9 @@ class JoinRewriter {
574576
if (rewriters_.empty()) {
575577
throw ParquetException("At least one ConcatRewriter is required");
576578
}
577-
auto row_counts = rewriters_[0]->row_group_row_counts();
579+
auto row_counts = rewriters_[0]->RowGroupRowCounts();
578580
for (size_t i = 1; i < rewriters_.size(); ++i) {
579-
if (auto current_row_counts = rewriters_[i]->row_group_row_counts();
581+
if (auto current_row_counts = rewriters_[i]->RowGroupRowCounts();
580582
row_counts != current_row_counts) {
581583
// TODO(anyone): use `std::format("{}", row_counts)` instead when C++23 available
582584
auto vecToString = [](const std::vector<int64_t>& v) -> std::string {
@@ -657,8 +659,8 @@ class GeneratedFile : public ParquetFileRewriter::Contents {
657659
std::vector<std::vector<std::shared_ptr<ArrowInputFile>>> sources,
658660
std::shared_ptr<ArrowOutputStream> sink,
659661
std::vector<std::vector<std::shared_ptr<FileMetaData>>> sources_metadata,
660-
std::shared_ptr<const KeyValueMetadata> sink_metadata,
661-
std::shared_ptr<RewriterProperties> props) {
662+
std::shared_ptr<RewriterProperties> props,
663+
std::shared_ptr<const KeyValueMetadata> sink_metadata) {
662664
if (sources.size() != sources_metadata.size() ||
663665
// TODO(anyone): use std::views::zip when C++23 available
664666
std::ranges::any_of(std::views::iota(0u, sources.size()), [&](size_t i) {
@@ -669,7 +671,7 @@ class GeneratedFile : public ParquetFileRewriter::Contents {
669671
}
670672
std::unique_ptr<ParquetFileRewriter::Contents> result(new GeneratedFile(
671673
std::move(sources), std::move(sink), std::move(sources_metadata),
672-
std::move(sink_metadata), std::move(props)));
674+
std::move(props), std::move(sink_metadata)));
673675
return result;
674676
}
675677

@@ -712,16 +714,16 @@ class GeneratedFile : public ParquetFileRewriter::Contents {
712714
bloom_filter_locations);
713715
}
714716

715-
auto file_metadata = metadata_builder_->Finish(sink_metadata_);
716-
WriteFileMetaData(*file_metadata, sink_.get());
717+
file_metadata_ = metadata_builder_->Finish(sink_metadata_);
718+
WriteFileMetaData(*file_metadata_, sink_.get());
717719
}
718720

719721
private:
720722
GeneratedFile(std::vector<std::vector<std::shared_ptr<ArrowInputFile>>> sources,
721723
std::shared_ptr<ArrowOutputStream> sink,
722724
std::vector<std::vector<std::shared_ptr<FileMetaData>>> sources_metadata,
723-
std::shared_ptr<const KeyValueMetadata> sink_metadata,
724-
std::shared_ptr<RewriterProperties> props)
725+
std::shared_ptr<RewriterProperties> props,
726+
std::shared_ptr<const KeyValueMetadata> sink_metadata)
725727
: sink_(std::move(sink)),
726728
props_(std::move(props)),
727729
sink_metadata_(std::move(sink_metadata)) {
@@ -786,11 +788,11 @@ std::unique_ptr<ParquetFileRewriter> ParquetFileRewriter::Open(
786788
std::vector<std::vector<std::shared_ptr<ArrowInputFile>>> sources,
787789
std::shared_ptr<ArrowOutputStream> sink,
788790
std::vector<std::vector<std::shared_ptr<FileMetaData>>> sources_metadata,
789-
std::shared_ptr<const KeyValueMetadata> sink_metadata,
790-
std::shared_ptr<RewriterProperties> props) {
791+
std::shared_ptr<RewriterProperties> props,
792+
std::shared_ptr<const KeyValueMetadata> sink_metadata) {
791793
auto contents = GeneratedFile::Open(std::move(sources), std::move(sink),
792-
std::move(sources_metadata),
793-
std::move(sink_metadata), std::move(props));
794+
std::move(sources_metadata), std::move(props),
795+
std::move(sink_metadata));
794796
std::unique_ptr<ParquetFileRewriter> result(new ParquetFileRewriter());
795797
result->Open(std::move(contents));
796798
return result;
@@ -803,10 +805,15 @@ void ParquetFileRewriter::Open(std::unique_ptr<ParquetFileRewriter::Contents> co
803805
void ParquetFileRewriter::Close() {
804806
if (contents_) {
805807
contents_->Close();
808+
file_metadata_ = contents_->metadata();
806809
contents_.reset();
807810
}
808811
}
809812

810813
void ParquetFileRewriter::Rewrite() { contents_->Rewrite(); }
811814

815+
const std::shared_ptr<FileMetaData>& ParquetFileRewriter::metadata() const {
816+
return file_metadata_;
817+
}
818+
812819
} // namespace parquet

cpp/src/parquet/file_rewriter.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ class PARQUET_EXPORT ParquetFileRewriter {
3232
virtual ~Contents() = default;
3333
virtual void Close() = 0;
3434
virtual void Rewrite() = 0;
35+
36+
const std::shared_ptr<FileMetaData>& metadata() const { return file_metadata_; }
37+
std::shared_ptr<FileMetaData> file_metadata_;
3538
};
3639

3740
ParquetFileRewriter();
@@ -41,8 +44,8 @@ class PARQUET_EXPORT ParquetFileRewriter {
4144
std::vector<std::vector<std::shared_ptr<ArrowInputFile>>> sources,
4245
std::shared_ptr<ArrowOutputStream> sink,
4346
std::vector<std::vector<std::shared_ptr<FileMetaData>>> sources_metadata,
44-
std::shared_ptr<const ::arrow::KeyValueMetadata> sink_metadata = NULLPTR,
45-
std::shared_ptr<RewriterProperties> props = default_rewriter_properties());
47+
std::shared_ptr<RewriterProperties> props = default_rewriter_properties(),
48+
std::shared_ptr<const ::arrow::KeyValueMetadata> sink_metadata = NULLPTR);
4649

4750
void Open(std::unique_ptr<Contents> contents);
4851
void Close();
@@ -52,8 +55,12 @@ class PARQUET_EXPORT ParquetFileRewriter {
5255
/// This method may throw.
5356
void Rewrite();
5457

58+
/// Returns the file metadata. It is only available after Close() has been called.
59+
const std::shared_ptr<FileMetaData>& metadata() const;
60+
5561
private:
5662
std::unique_ptr<Contents> contents_;
63+
std::shared_ptr<FileMetaData> file_metadata_;
5764
};
5865

5966
} // namespace parquet

cpp/src/parquet/thrift_internal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,7 @@ static inline format::SizeStatistics ToThrift(const SizeStatistics& size_stats)
560560
return size_statistics;
561561
}
562562

563-
// Get KeyValueMetadata from parquet Thrift RowGroup or ColumnChunk metadata.
563+
// Get KeyValueMetadata from parquet Thrift File or ColumnChunk metadata.
564564
//
565565
// Returns nullptr if the metadata is not set.
566566
template <typename Metadata>

0 commit comments

Comments
 (0)