Skip to content

Commit 0f8358f

Browse files
committed
Standalone tool for translating Parquet files into CSV
1 parent ea900f9 commit 0f8358f

File tree

2 files changed

+64
-34
lines changed

2 files changed

+64
-34
lines changed

src/partition/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ partition_apps(
4848
sph-estimate-stats
4949
sph-htm-index
5050
sph-layout
51+
sph-parq2csv
5152
sph-partition-matches
5253
sph-partition
5354
)

src/partition/sph-parq2csv.cc

+63-34
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,23 @@
2222
#include <iomanip>
2323
#include <iostream>
2424
#include <fstream>
25-
#include <map>
2625
#include <memory>
26+
#include <set>
2727
#include <stdexcept>
2828
#include <string>
2929
#include <vector>
3030

31+
// Third party headers
3132
#include "boost/program_options.hpp"
33+
#include "nlohmann/json.hpp"
3234

3335
#include "partition/ParquetInterface.h"
3436

3537
namespace po = boost::program_options;
3638
namespace part = lsst::partition;
3739

3840
using namespace std;
41+
using json = nlohmann::json;
3942

4043
namespace {
4144

@@ -64,15 +67,15 @@ class CommandLineParams {
6467
"Max size (MB) of RAM allocated to the process.");
6568
desc.add_options()("buf-size-mb", po::value<int>()->default_value(maxBuffSizeMB),
6669
"Buffers size (MB) for translating batches.");
67-
desc.add_options()("parq-file", po::value<vector<string>>(), "Input file to be translated.");
68-
desc.add_options()("coldef-file", po::value<vector<string>>(),
69-
"Input file with the names of columns to be extracted.");
70-
desc.add_options()("csv-file", po::value<vector<string>>(), "Output file to be written.");
70+
desc.add_options()("parq", po::value<vector<string>>(), "Input file to be translated.");
71+
desc.add_options()("config", po::value<vector<string>>(),
72+
"Input JSON file with definition of columns to be extracted.");
73+
desc.add_options()("csv", po::value<vector<string>>(), "Output file to be written.");
7174

7275
po::positional_options_description p;
73-
p.add("parq-file", 1);
74-
p.add("coldef-file", 1);
75-
p.add("csv-file", 1);
76+
p.add("parq", 1);
77+
p.add("config", 1);
78+
p.add("csv", 1);
7679

7780
po::variables_map vm;
7881
po::store(po::command_line_parser(argc, argv).options(desc).positional(p).run(), vm);
@@ -82,17 +85,17 @@ class CommandLineParams {
8285
cout << desc << "\n";
8386
return false;
8487
}
85-
parqFileName = vm.count("parq-file") ? vm["parq-file"].as<vector<string>>().front() : string();
86-
coldefFileName = vm.count("coldef-file") ? vm["coldef-file"].as<vector<string>>().front() : string();
87-
csvFileName = vm.count("csv-file") ? vm["csv-file"].as<vector<string>>().front() : string();
88+
parqFileName = vm.count("parq") ? vm["parq"].as<vector<string>>().front() : string();
89+
configFileName = vm.count("config") ? vm["config"].as<vector<string>>().front() : string();
90+
csvFileName = vm.count("csv") ? vm["csv"].as<vector<string>>().front() : string();
8891

89-
if (parqFileName.empty() || coldefFileName.empty() || csvFileName.empty()) {
92+
if (parqFileName.empty() || configFileName.empty() || csvFileName.empty()) {
9093
throw runtime_error("The names of all required files must be provided.");
9194
}
9295
if (csvFileName == parqFileName) {
9396
throw runtime_error("Input and output file names must be different.");
9497
}
95-
_parseColdDefFile();
98+
_parseConfigFile();
9699

97100
if (vm.count("max-mem-alloc-mb")) {
98101
maxMemAllocatedMB = vm["max-mem-alloc-mb"].as<int>();
@@ -114,11 +117,11 @@ class CommandLineParams {
114117
// Values of the parsed parameters aere stored in the data members defined below.
115118

116119
string parqFileName;
117-
string coldefFileName;
120+
string configFileName;
118121
string csvFileName;
119122

120123
vector<string> columns;
121-
map<string, string> optionalColumnDefs;
124+
set<string> optionalColumns;
122125

123126
int maxMemAllocatedMB = 3000;
124127
int maxBuffSizeMB = 16;
@@ -129,19 +132,44 @@ class CommandLineParams {
129132
bool verbose = false;
130133

131134
private:
132-
void _parseColdDefFile() {
133-
columns.clear();
134-
ifstream columnsFile(coldefFileName);
135-
if (!columnsFile) {
136-
throw runtime_error("Error while opening the columns file.");
135+
void _parseConfigFile() {
136+
ifstream file(configFileName, ios_base::in);
137+
if (!file.good()) throw invalid_argument("Failed to open file: '" + configFileName + "'");
138+
json config;
139+
try {
140+
file >> config;
141+
} catch (...) {
142+
throw runtime_error("Config file: '" + configFileName + "' doesn't have a valid JSON payload");
137143
}
138-
string column;
139-
while (columnsFile >> column) {
140-
columns.push_back(column);
144+
if (!config.is_object()) {
145+
throw invalid_argument("Config file: '" + configFileName + "' is not a valid JSON object");
141146
}
147+
if (!config.contains("columns")) {
148+
throw runtime_error("The JSON file must contain a 'columns' key.");
149+
}
150+
if (!config["columns"].is_array()) {
151+
throw runtime_error("The 'columns' key must contain an array.");
152+
}
153+
columns = config["columns"].get<vector<string>>();
142154
if (columns.empty()) {
143155
throw runtime_error("No columns to be extracted.");
144156
}
157+
optionalColumns.clear();
158+
if (config.contains("optional")) {
159+
if (!config["optional"].is_array()) {
160+
throw runtime_error("The 'optional' key must contain an object.");
161+
}
162+
for (auto const& column : config["optional"].get<vector<string>>()) {
163+
optionalColumns.insert(column);
164+
}
165+
}
166+
// All optional columns must be defined in the 'columns' array.
167+
for (auto const& name : optionalColumns) {
168+
if (find(columns.begin(), columns.end(), name) == columns.end()) {
169+
throw runtime_error("The optional column '" + name +
170+
"' is not defined in the 'columns' array.");
171+
}
172+
}
145173
}
146174
};
147175

@@ -163,23 +191,24 @@ int main(int argc, char const* const* argv) {
163191
cout << "Translating '" << params.parqFileName << "' into '" << params.csvFileName << "'" << endl;
164192
}
165193
part::ParquetFile parqFile(params.parqFileName, params.maxMemAllocatedMB);
166-
if (parqFile.setupBatchReader(maxBuffSizeBytes) != arrow::Status::OK()) {
167-
throw runtime_error("Error while setting up the batch reader.");
168-
}
194+
parqFile.setupBatchReader(maxBuffSizeBytes);
195+
169196
ofstream csvFile(params.csvFileName, ios::out | ios::binary);
170197
if (!csvFile) {
171198
throw runtime_error("Error while opening the output file.");
172199
}
173200
while (true) {
174-
auto status = parqFile.readNextBatch_Table2CSV(buf.get(), buffSize, params.columns,
175-
params.optionalColumnDefs, params.nullStr,
176-
params.delimStr);
177-
if ((status != arrow::Status::OK()) || (buffSize == 0)) break;
178-
if (params.verbose) {
179-
cout << "Writing " << setw(9) << buffSize << " bytes" << endl;
201+
bool const hasMore =
202+
parqFile.readNextBatch_Table2CSV(buf.get(), buffSize, params.columns,
203+
params.optionalColumns, params.nullStr, params.delimStr);
204+
if (buffSize > 0) {
205+
if (params.verbose) {
206+
cout << "Writing " << setw(9) << buffSize << " bytes" << endl;
207+
}
208+
csvFile.write((char*)(buf.get()), buffSize);
209+
numBytesWritten += buffSize;
180210
}
181-
csvFile.write((char*)(buf.get()), buffSize);
182-
numBytesWritten += buffSize;
211+
if (!hasMore || (buffSize == 0)) break;
183212
}
184213
csvFile.close();
185214
if (params.verbose) {

0 commit comments

Comments
 (0)