Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmake/PythonUtilities.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ function(upgrade_pip)
return()
endif()

message(STATUS "Upgrading pip in Python interpreter: ${Python3_EXECUTABLE}")
execute_process(
COMMAND ${Python3_EXECUTABLE} -m pip install --upgrade --no-user pip
RESULT_VARIABLE result
Expand All @@ -66,6 +67,8 @@ function(pip_install_python_package package_name)
return()
endif()

message(STATUS "Installing Python package: ${package_name}")
message(STATUS "Using Python executable: ${Python3_EXECUTABLE}")
execute_process(
COMMAND ${Python3_EXECUTABLE} -m pip install --no-user ${package_name}
RESULT_VARIABLE result
Expand Down
14 changes: 11 additions & 3 deletions include/ygm/io/parquet_parser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,12 @@ class parquet_parser {
size_t num_rows() const { return m_num_rows; }

private:
// Check if the column is flat (non-nested)
static bool is_flat_column(const parquet::ColumnDescriptor *col) {
return col->max_repetition_level() == 0 &&
col->path()->ToDotVector().size() == 1;
}

// Open Parquet files and read schema.
void init(const std::vector<std::string> &stringpaths,
const bool recursive = false) {
Expand Down Expand Up @@ -449,12 +455,14 @@ class parquet_parser {
auto ptype = detail::parquet_data_type{column->physical_type()};
// Check if the type is supported
bool unsupported = !detail::is_supported_parquet_type(ptype.type);

// Check if the column is flat
if (column->max_definition_level() != 1 ||
column->max_repetition_level() != 0) {
if (!is_flat_column(column)) {
m_comm.cerr0() << "Unsupported column (not flat): " << column->name()
<< ", " << column->path()->ToDotString() << std::endl;
// The column is not flat, which is not supported by this parser.
unsupported = true;
// Memo: for definition and repetition levels, see
// Memo: for repetition levels, see
// https://blog.x.com/engineering/en_us/a/2013/dremel-made-simple-with-parquet
}
m_col_schema[i] = {
Expand Down
Binary file added test/data/parquet_files/case2/non_flat.parquet
Binary file not shown.
Binary file added test/data/parquet_files/case3/none_value.parquet
Binary file not shown.
Binary file not shown.
120 changes: 114 additions & 6 deletions test/test_parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,27 @@
#include <ygm/comm.hpp>
#include <ygm/io/parquet_parser.hpp>

void test_case1(const std::string& dir_name, ygm::comm& world);
void test_case2(const std::string& dir_name, ygm::comm& world);
void test_case3(const std::string& dir_name, ygm::comm& world);
void test_case4(const std::string& dir_name, ygm::comm& world);

int main(int argc, char** argv) {
ygm::comm world(&argc, &argv);

// assuming the build directory is inside the YGM root directory
const std::string dir_name = "data/parquet_files/";
const auto test_bin_dir = std::filesystem::path(argv[0]).parent_path();
test_case1(test_bin_dir / "data/parquet_files/case1", world);
test_case2(test_bin_dir / "data/parquet_files/case2", world);
test_case3(test_bin_dir / "data/parquet_files/case3", world);
test_case4(test_bin_dir / "data/parquet_files/case4", world);

return 0;
}

//
void test_case1(const std::string& dir_name, ygm::comm& world) {
// Test number of columns and rows in files
{
// parquet_parser assumes files have identical scehma
// parquet_parser assumes files have identical schema
ygm::io::parquet_parser parquetp(world, {dir_name});

YGM_ASSERT_RELEASE(parquetp.num_files() == 3);
Expand Down Expand Up @@ -101,7 +112,7 @@ int main(int argc, char** argv) {
}
world.cf_barrier();

// Make sure every processes read diffrent row or nothing
// Make sure every processes read different row or nothing
{
static std::vector<std::string> buf;
const auto& row = *row_opt;
Expand All @@ -118,5 +129,102 @@ int main(int argc, char** argv) {
}
}
}
return 0;
}

// Test case file contains multiple non-flat column patterns
void test_case2(const std::string& dir_name, ygm::comm& world) {
ygm::io::parquet_parser parquetp(world, {dir_name});

YGM_ASSERT_RELEASE(parquetp.num_files() == 1);
YGM_ASSERT_RELEASE(parquetp.num_rows() == 2);
YGM_ASSERT_RELEASE(parquetp.get_schema().size() == 8);

parquetp.for_all([](const auto& row) {
for (int col_idx = 0; col_idx < static_cast<int>(row.size()); ++col_idx) {
std::visit(
[col_idx](const auto& value) {
using T = std::decay_t<decltype(value)>;
// Only the first column is valid (flat)
// Non-flat or unsupported columns is treated as std::monostate
if constexpr (std::is_same_v<T, std::monostate>) {
YGM_ASSERT_RELEASE(col_idx != 0);
} else {
YGM_ASSERT_RELEASE(col_idx == 0);
}
},
row[col_idx]);
}
});
}

// Some values are NONE
void test_case3(const std::string& dir_name, ygm::comm& world) {
ygm::io::parquet_parser parquetp(world, {dir_name});

YGM_ASSERT_RELEASE(parquetp.num_files() == 1);
YGM_ASSERT_RELEASE(parquetp.num_rows() == 2);
YGM_ASSERT_RELEASE(parquetp.get_schema().size() == 2);

parquetp.for_all([](const auto& row) {
for (int col_idx = 0; col_idx < static_cast<int>(row.size()); ++col_idx) {
std::visit(
[row, col_idx](const auto& value) {
using T = std::decay_t<decltype(value)>;
// column 0: [10, NONE]
// column 1: [NONE, 20]
if constexpr (std::is_same_v<T, std::monostate>) {
YGM_ASSERT_RELEASE(
(col_idx == 0 && std::get<int32_t>(row[1]) == 20) ||
(col_idx == 1 && std::get<int32_t>(row[0]) == 10));
} else if constexpr (std::is_same_v<T, int32_t>) {
YGM_ASSERT_RELEASE((col_idx == 0 && value == 10) ||
(col_idx == 1 && value == 20));
} else {
// Unexpected type
YGM_ASSERT_RELEASE(false);
}
},
row[col_idx]);
}
});
}

// Required and optional columns with NONE values
void test_case4(const std::string& dir_name, ygm::comm& world) {
ygm::io::parquet_parser parquetp(world, {dir_name});

YGM_ASSERT_RELEASE(parquetp.num_files() == 1);
YGM_ASSERT_RELEASE(parquetp.num_rows() == 2);
YGM_ASSERT_RELEASE(parquetp.get_schema().size() == 2);

parquetp.for_all([](const auto& row) {
for (int col_idx = 0; col_idx < static_cast<int>(row.size()); ++col_idx) {
std::visit(
[row, col_idx](const auto& value) {
using T = std::decay_t<decltype(value)>;
// 1st column is required, 2nd column is optional
// column 0 (required): [1, 2]
// column 1 (optional): [10, NONE]
if constexpr (std::is_same_v<T, std::monostate>) {
// Column 1, 2nd row is NONE
// Also checks that the other column value is read correctly
YGM_ASSERT_RELEASE(col_idx == 1 &&
std::get<int32_t>(row[0]) == 2);
} else if constexpr (std::is_same_v<T, int32_t>) {
if (col_idx == 0) {
YGM_ASSERT_RELEASE(value == 1 || value == 2);
} else if (col_idx == 1) {
YGM_ASSERT_RELEASE(value == 10);
} else {
// Unexpected column index
YGM_ASSERT_RELEASE(false);
}
} else {
// Unexpected type
YGM_ASSERT_RELEASE(false);
}
},
row[col_idx]);
}
});
}