Skip to content

Commit 641ab8c

Browse files
committed
Implement basic parquet file rewriter
1 parent a82edf9 commit 641ab8c

12 files changed

Lines changed: 1094 additions & 8 deletions

cpp/src/parquet/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ set(PARQUET_SRCS
172172
encryption/internal_file_encryptor.cc
173173
exception.cc
174174
file_reader.cc
175+
file_rewriter.cc
175176
file_writer.cc
176177
geospatial/statistics.cc
177178
geospatial/util_internal.cc
@@ -406,6 +407,8 @@ add_parquet_test(arrow-reader-writer-test
406407

407408
add_parquet_test(arrow-index-test SOURCES arrow/index_test.cc)
408409

410+
add_parquet_test(arrow-rewriter-test SOURCES arrow/arrow_rewriter_test.cc)
411+
409412
add_parquet_test(arrow-internals-test SOURCES arrow/path_internal_test.cc
410413
arrow/reconstruct_internal_test.cc)
411414

Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include <gmock/gmock.h>
19+
#include <gtest/gtest.h>
20+
21+
#include <memory>
22+
23+
#include "arrow/io/memory.h"
24+
#include "arrow/testing/gtest_util.h"
25+
#include "gmock/gmock.h"
26+
#include "parquet/arrow/reader.h"
27+
#include "parquet/arrow/test_util.h"
28+
#include "parquet/file_reader.h"
29+
#include "parquet/file_rewriter.h"
30+
#include "parquet/platform.h"
31+
#include "parquet/properties.h"
32+
#include "parquet/test_util.h"
33+
34+
using arrow::Table;
35+
using arrow::io::BufferReader;
36+
37+
namespace parquet::arrow {
38+
39+
// Rewriting a single input file (no concat, no join) must reproduce its
// contents exactly in the output file.
TEST(ParquetRewriterTest, SimpleRoundTrip) {
  auto props = RewriterProperties::Builder()
                   .writer_properties(
                       WriterProperties::Builder().enable_write_page_index()->build())
                   ->build();

  auto schema = ::arrow::schema(
      {::arrow::field("a", ::arrow::int32()), ::arrow::field("b", ::arrow::utf8())});

  // Prepare one parquet file in memory to act as the rewriter's sole input.
  std::shared_ptr<Buffer> input;
  WriteFile(props->writer_properties(),
            ::arrow::TableFromJSON(schema, {R"([[1, "a"], [2, "b"]])"}), input);

  // Rewrite the single input into an in-memory sink.
  auto sink = CreateOutputStream();
  auto rewriter = ParquetFileRewriter::Open({{std::make_shared<BufferReader>(input)}},
                                            sink, {{NULLPTR}}, NULLPTR, props);
  rewriter->Rewrite();
  rewriter->Close();

  // Read the rewritten bytes back through the arrow reader.
  ASSERT_OK_AND_ASSIGN(auto out_buffer, sink->Finish());
  auto file_reader = ParquetFileReader::Open(std::make_shared<BufferReader>(out_buffer));
  ASSERT_OK_AND_ASSIGN(auto reader, FileReader::Make(::arrow::default_memory_pool(),
                                                     std::move(file_reader)));

  ASSERT_OK_AND_ASSIGN(auto table, reader->ReadTable());
  ASSERT_OK(table->ValidateFull());

  // The round-tripped table must equal the data originally written.
  auto expected = ::arrow::TableFromJSON(schema, {R"([[1, "a"], [2, "b"]])"});
  AssertTablesEqual(*expected, *table);
}
72+
73+
// Two files listed in the same source group are concatenated vertically:
// the output contains the rows of the first file followed by the second.
TEST(ParquetRewriterTest, ConcatRoundTrip) {
  auto props = RewriterProperties::Builder()
                   .writer_properties(
                       WriterProperties::Builder().enable_write_page_index()->build())
                   ->build();

  auto schema = ::arrow::schema(
      {::arrow::field("a", ::arrow::int32()), ::arrow::field("b", ::arrow::utf8())});

  // Two inputs sharing one schema; they will be stacked row-wise.
  std::shared_ptr<Buffer> first_input;
  std::shared_ptr<Buffer> second_input;
  WriteFile(props->writer_properties(),
            ::arrow::TableFromJSON(schema, {R"([[1, "a"], [2, "b"]])"}), first_input);
  WriteFile(props->writer_properties(),
            ::arrow::TableFromJSON(schema, {R"([[3, "c"]])"}), second_input);

  auto sink = CreateOutputStream();
  auto rewriter =
      ParquetFileRewriter::Open({{std::make_shared<BufferReader>(first_input),
                                  std::make_shared<BufferReader>(second_input)}},
                                sink, {{NULLPTR, NULLPTR}}, NULLPTR, props);
  rewriter->Rewrite();
  rewriter->Close();

  // Read back the concatenated output.
  ASSERT_OK_AND_ASSIGN(auto out_buffer, sink->Finish());
  auto file_reader = ParquetFileReader::Open(std::make_shared<BufferReader>(out_buffer));
  ASSERT_OK_AND_ASSIGN(auto reader, FileReader::Make(::arrow::default_memory_pool(),
                                                     std::move(file_reader)));

  ASSERT_OK_AND_ASSIGN(auto table, reader->ReadTable());
  ASSERT_OK(table->ValidateFull());

  // Rows from the second input follow the rows from the first.
  auto expected =
      ::arrow::TableFromJSON(schema, {R"([[1, "a"], [2, "b"], [3, "c"]])"});
  AssertTablesEqual(*expected, *table);
}
111+
112+
// Files passed in distinct source groups are joined horizontally: the output
// schema is the concatenation of the per-group schemas, row counts matching.
TEST(ParquetRewriterTest, JoinRoundTrip) {
  auto props = RewriterProperties::Builder()
                   .writer_properties(
                       WriterProperties::Builder().enable_write_page_index()->build())
                   ->build();

  auto left_schema = ::arrow::schema(
      {::arrow::field("a", ::arrow::int32()), ::arrow::field("b", ::arrow::utf8())});
  auto right_schema = ::arrow::schema({::arrow::field("c", ::arrow::int64())});

  // Two inputs with equal row counts but disjoint columns.
  std::shared_ptr<Buffer> left_buf;
  std::shared_ptr<Buffer> right_buf;
  WriteFile(props->writer_properties(),
            ::arrow::TableFromJSON(left_schema, {R"([[1, "a"], [2, "b"], [3, "c"]])"}),
            left_buf);
  WriteFile(props->writer_properties(),
            ::arrow::TableFromJSON(right_schema, {R"([[10], [20], [30]])"}),
            right_buf);

  auto sink = CreateOutputStream();
  auto rewriter = ParquetFileRewriter::Open(
      {{std::make_shared<BufferReader>(left_buf)},
       {std::make_shared<BufferReader>(right_buf)}},
      sink, {{NULLPTR}, {NULLPTR}}, NULLPTR, props);
  rewriter->Rewrite();
  rewriter->Close();

  // Read back the joined output.
  ASSERT_OK_AND_ASSIGN(auto out_buffer, sink->Finish());
  auto file_reader = ParquetFileReader::Open(std::make_shared<BufferReader>(out_buffer));
  ASSERT_OK_AND_ASSIGN(auto reader, FileReader::Make(::arrow::default_memory_pool(),
                                                     std::move(file_reader)));

  ASSERT_OK_AND_ASSIGN(auto table, reader->ReadTable());
  ASSERT_OK(table->ValidateFull());

  // The joined table carries the left columns followed by the right column.
  auto expected_schema = ::arrow::schema({::arrow::field("a", ::arrow::int32()),
                                          ::arrow::field("b", ::arrow::utf8()),
                                          ::arrow::field("c", ::arrow::int64())});
  auto expected = ::arrow::TableFromJSON(
      expected_schema, {R"([[1, "a", 10], [2, "b", 20], [3, "c", 30]])"});
  AssertTablesEqual(*expected, *table);
}
156+
157+
// Combined concat + join: each source group concatenates its files vertically,
// then the groups are joined horizontally. max_row_group_length(1) forces the
// rewriter to exercise per-row-group handling.
TEST(ParquetRewriterTest, ConcatJoinRoundTrip) {
  auto props = RewriterProperties::Builder()
                   .writer_properties(WriterProperties::Builder()
                                          .enable_write_page_index()
                                          ->max_row_group_length(1)
                                          ->build())
                   ->build();

  auto left_schema = ::arrow::schema(
      {::arrow::field("a", ::arrow::int32()), ::arrow::field("b", ::arrow::utf8())});
  auto right_schema = ::arrow::schema({::arrow::field("c", ::arrow::int64())});

  // Four inputs: two per group, with matching total row counts across groups.
  std::shared_ptr<Buffer> left_first;
  std::shared_ptr<Buffer> left_second;
  std::shared_ptr<Buffer> right_first;
  std::shared_ptr<Buffer> right_second;

  WriteFile(props->writer_properties(),
            ::arrow::TableFromJSON(left_schema, {R"([[1, "a"], [2, "b"]])"}),
            left_first);
  WriteFile(props->writer_properties(),
            ::arrow::TableFromJSON(left_schema, {R"([[3, "c"]])"}), left_second);
  WriteFile(props->writer_properties(),
            ::arrow::TableFromJSON(right_schema, {R"([[10]])"}), right_first);
  WriteFile(props->writer_properties(),
            ::arrow::TableFromJSON(right_schema, {R"([[20], [30]])"}), right_second);

  auto sink = CreateOutputStream();
  auto rewriter = ParquetFileRewriter::Open(
      {{std::make_shared<BufferReader>(left_first),
        std::make_shared<BufferReader>(left_second)},
       {std::make_shared<BufferReader>(right_first),
        std::make_shared<BufferReader>(right_second)}},
      sink, {{NULLPTR, NULLPTR}, {NULLPTR, NULLPTR}}, NULLPTR, props);
  rewriter->Rewrite();
  rewriter->Close();

  // Read back the rewritten output.
  ASSERT_OK_AND_ASSIGN(std::shared_ptr<Buffer> out_buffer, sink->Finish());
  auto file_reader = ParquetFileReader::Open(std::make_shared<BufferReader>(out_buffer));
  ASSERT_OK_AND_ASSIGN(auto reader, FileReader::Make(::arrow::default_memory_pool(),
                                                     std::move(file_reader)));

  ASSERT_OK_AND_ASSIGN(std::shared_ptr<Table> table, reader->ReadTable());
  ASSERT_OK(table->ValidateFull());

  // Concatenation within each group, then a column-wise join across groups.
  auto expected_schema = ::arrow::schema({::arrow::field("a", ::arrow::int32()),
                                          ::arrow::field("b", ::arrow::utf8()),
                                          ::arrow::field("c", ::arrow::int64())});
  auto expected = ::arrow::TableFromJSON(
      expected_schema, {R"([[1, "a", 10], [2, "b", 20], [3, "c", 30]])"});
  AssertTablesEqual(*expected, *table);
}
209+
210+
// Joining source groups whose total row counts disagree must be rejected at
// Open() time with a descriptive ParquetException.
TEST(ParquetRewriterTest, JoinRowCountsMismatch) {
  auto props = RewriterProperties::Builder()
                   .writer_properties(
                       WriterProperties::Builder().enable_write_page_index()->build())
                   ->build();

  auto schema1 = ::arrow::schema({::arrow::field("a", ::arrow::int32())});
  auto schema2 = ::arrow::schema({::arrow::field("b", ::arrow::int32())});

  // Two rows on one side, three on the other: not joinable.
  std::shared_ptr<Buffer> buffer1;
  std::shared_ptr<Buffer> buffer2;
  WriteFile(props->writer_properties(),
            ::arrow::TableFromJSON(schema1, {R"([[1], [2]])"}), buffer1);
  WriteFile(props->writer_properties(),
            ::arrow::TableFromJSON(schema2, {R"([[3], [4], [5]])"}), buffer2);

  auto sink = CreateOutputStream();

  EXPECT_THROW_THAT(
      [&]() {
        ParquetFileRewriter::Open({{std::make_shared<BufferReader>(buffer1)},
                                   {std::make_shared<BufferReader>(buffer2)}},
                                  sink, {{NULLPTR}, {NULLPTR}}, NULLPTR, props);
      },
      ParquetException,
      ::testing::Property(
          &ParquetException::what,
          ::testing::HasSubstr("The number of rows in each block must match")));
}
242+
243+
// Open() must reject mismatched shapes between `sources` and
// `sources_metadata`: both an empty metadata list and a metadata group of the
// wrong inner size raise a ParquetException.
TEST(ParquetRewriterTest, InvalidInputDimensions) {
  auto props = RewriterProperties::Builder()
                   .writer_properties(
                       WriterProperties::Builder().enable_write_page_index()->build())
                   ->build();

  // One valid single-column input to feed into the invalid Open() calls.
  auto schema = ::arrow::schema({::arrow::field("a", ::arrow::int32())});
  std::shared_ptr<Buffer> input;
  WriteFile(props->writer_properties(),
            ::arrow::TableFromJSON(schema, {R"([[1]])"}), input);

  auto sink = CreateOutputStream();

  // Outer dimension mismatch: one source group but no metadata groups.
  EXPECT_THROW_THAT(
      [&]() {
        ParquetFileRewriter::Open({{std::make_shared<BufferReader>(input)}}, sink, {},
                                  NULLPTR, props);
      },
      ParquetException,
      ::testing::Property(
          &ParquetException::what,
          ::testing::HasSubstr(
              "The number of sources and sources_metadata must be the same")));

  // Inner dimension mismatch: one source in the group but an empty metadata group.
  EXPECT_THROW_THAT(
      [&]() {
        ParquetFileRewriter::Open({{std::make_shared<BufferReader>(input)}}, sink, {{}},
                                  NULLPTR, props);
      },
      ParquetException,
      ::testing::Property(
          &ParquetException::what,
          ::testing::HasSubstr(
              "The number of sources and sources_metadata must be the same")));
}
279+
280+
} // namespace parquet::arrow

cpp/src/parquet/arrow/test_util.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,23 @@
2828
#include "arrow/array/builder_binary.h"
2929
#include "arrow/array/builder_decimal.h"
3030
#include "arrow/array/builder_primitive.h"
31+
#include "arrow/table.h"
3132
#include "arrow/testing/gtest_util.h"
3233
#include "arrow/testing/random.h"
3334
#include "arrow/type_fwd.h"
3435
#include "arrow/type_traits.h"
3536
#include "arrow/util/decimal.h"
3637
#include "arrow/util/float16.h"
38+
#include "parquet/arrow/schema.h"
39+
#include "parquet/arrow/writer.h"
3740
#include "parquet/column_reader.h"
41+
#include "parquet/file_writer.h"
3842
#include "parquet/test_util.h"
3943

4044
namespace parquet {
4145

4246
using internal::RecordReader;
47+
using schema::GroupNode;
4348

4449
namespace arrow {
4550

@@ -482,6 +487,29 @@ void ExpectArrayT<::arrow::BooleanType>(void* expected, Array* result) {
482487
EXPECT_TRUE(result->Equals(*expected_array));
483488
}
484489

490+
void WriteFile(const std::shared_ptr<WriterProperties>& writer_properties,
491+
const std::shared_ptr<::arrow::Table>& table,
492+
std::shared_ptr<Buffer>& buffer) {
493+
// Get schema from table.
494+
auto schema = table->schema();
495+
std::shared_ptr<SchemaDescriptor> parquet_schema;
496+
auto arrow_writer_properties = default_arrow_writer_properties();
497+
ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
498+
*arrow_writer_properties, &parquet_schema));
499+
auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
500+
501+
// Write table to buffer.
502+
auto sink = CreateOutputStream();
503+
auto pool = ::arrow::default_memory_pool();
504+
auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
505+
std::unique_ptr<FileWriter> arrow_writer;
506+
ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
507+
&arrow_writer));
508+
ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table));
509+
ASSERT_OK_NO_THROW(arrow_writer->Close());
510+
ASSERT_OK_AND_ASSIGN(buffer, sink->Finish());
511+
}
512+
485513
} // namespace arrow
486514

487515
} // namespace parquet

0 commit comments

Comments
 (0)