Skip to content

Commit d97c911

Browse files
committed
Standalone tool for translating Parquet files into CSV
1 parent 4e0d95f commit d97c911

File tree

2 files changed

+64
-30
lines changed

2 files changed

+64
-30
lines changed

src/partition/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ partition_apps(
4848
sph-estimate-stats
4949
sph-htm-index
5050
sph-layout
51+
sph-parq2csv
5152
sph-partition-matches
5253
sph-partition
5354
)

src/partition/sph-parq2csv.cc

+63-30
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,23 @@
2222
#include <iomanip>
2323
#include <iostream>
2424
#include <fstream>
25-
#include <map>
2625
#include <memory>
26+
#include <set>
2727
#include <stdexcept>
2828
#include <string>
2929
#include <vector>
3030

31+
// Third party headers
3132
#include "boost/program_options.hpp"
33+
#include "nlohmann/json.hpp"
3234

3335
#include "partition/ParquetInterface.h"
3436

3537
namespace po = boost::program_options;
3638
namespace part = lsst::partition;
3739

3840
using namespace std;
41+
using json = nlohmann::json;
3942

4043
namespace {
4144

@@ -60,19 +63,20 @@ class CommandLineParams {
6063
po::options_description desc(help, 80);
6164
desc.add_options()("help,h", "Produce this help");
6265
desc.add_options()("verbose,v", "Produce verbose output.");
66+
desc.add_options()("csv-quote-fields", "Double quote fields as needed in the generated CSV.");
6367
desc.add_options()("max-proc-mem-mb", po::value<int>()->default_value(maxMemAllocatedMB),
6468
"Max size (MB) of RAM allocated to the process.");
6569
desc.add_options()("buf-size-mb", po::value<int>()->default_value(maxBuffSizeMB),
6670
"Buffers size (MB) for translating batches.");
67-
desc.add_options()("parq-file", po::value<vector<string>>(), "Input file to be translated.");
68-
desc.add_options()("coldef-file", po::value<vector<string>>(),
69-
"Input file with the names of columns to be extracted.");
70-
desc.add_options()("csv-file", po::value<vector<string>>(), "Output file to be written.");
71+
desc.add_options()("parq", po::value<vector<string>>(), "Input file to be translated.");
72+
desc.add_options()("config", po::value<vector<string>>(),
73+
"Input JSON file with definition of columns to be extracted.");
74+
desc.add_options()("csv", po::value<vector<string>>(), "Output file to be written.");
7175

7276
po::positional_options_description p;
73-
p.add("parq-file", 1);
74-
p.add("coldef-file", 1);
75-
p.add("csv-file", 1);
77+
p.add("parq", 1);
78+
p.add("config", 1);
79+
p.add("csv", 1);
7680

7781
po::variables_map vm;
7882
po::store(po::command_line_parser(argc, argv).options(desc).positional(p).run(), vm);
@@ -82,17 +86,17 @@ class CommandLineParams {
8286
cout << desc << "\n";
8387
return false;
8488
}
85-
parqFileName = vm.count("parq-file") ? vm["parq-file"].as<vector<string>>().front() : string();
86-
coldefFileName = vm.count("coldef-file") ? vm["coldef-file"].as<vector<string>>().front() : string();
87-
csvFileName = vm.count("csv-file") ? vm["csv-file"].as<vector<string>>().front() : string();
89+
parqFileName = vm.count("parq") ? vm["parq"].as<vector<string>>().front() : string();
90+
configFileName = vm.count("config") ? vm["config"].as<vector<string>>().front() : string();
91+
csvFileName = vm.count("csv") ? vm["csv"].as<vector<string>>().front() : string();
8892

89-
if (parqFileName.empty() || coldefFileName.empty() || csvFileName.empty()) {
93+
if (parqFileName.empty() || configFileName.empty() || csvFileName.empty()) {
9094
throw runtime_error("The names of all required files must be provided.");
9195
}
9296
if (csvFileName == parqFileName) {
9397
throw runtime_error("Input and output file names must be different.");
9498
}
95-
_parseColdDefFile();
99+
_parseConfigFile();
96100

97101
if (vm.count("max-mem-alloc-mb")) {
98102
maxMemAllocatedMB = vm["max-mem-alloc-mb"].as<int>();
@@ -107,18 +111,19 @@ class CommandLineParams {
107111
throw runtime_error("Buffer size (MB) must be in a range of [1,1024].");
108112
}
109113
verbose = vm.count("verbose") != 0;
114+
quote = vm.count("csv-quote-fields") != 0;
110115

111116
return true;
112117
}
113118

114119
// Values of the parsed parameters aere stored in the data members defined below.
115120

116121
string parqFileName;
117-
string coldefFileName;
122+
string configFileName;
118123
string csvFileName;
119124

120125
vector<string> columns;
121-
map<string, string> optionalColumnDefs;
126+
set<string> optionalColumns;
122127

123128
int maxMemAllocatedMB = 3000;
124129
int maxBuffSizeMB = 16;
@@ -127,21 +132,47 @@ class CommandLineParams {
127132
string const delimStr = "\t";
128133

129134
bool verbose = false;
135+
bool quote = false;
130136

131137
private:
132-
void _parseColdDefFile() {
133-
columns.clear();
134-
ifstream columnsFile(coldefFileName);
135-
if (!columnsFile) {
136-
throw runtime_error("Error while opening the columns file.");
138+
void _parseConfigFile() {
139+
ifstream file(configFileName, ios_base::in);
140+
if (!file.good()) throw invalid_argument("Failed to open file: '" + configFileName + "'");
141+
json config;
142+
try {
143+
file >> config;
144+
} catch (...) {
145+
throw runtime_error("Config file: '" + configFileName + "' doesn't have a valid JSON payload");
137146
}
138-
string column;
139-
while (columnsFile >> column) {
140-
columns.push_back(column);
147+
if (!config.is_object()) {
148+
throw invalid_argument("Config file: '" + configFileName + "' is not a valid JSON object");
141149
}
150+
if (!config.contains("columns")) {
151+
throw runtime_error("The JSON file must contain a 'columns' key.");
152+
}
153+
if (!config["columns"].is_array()) {
154+
throw runtime_error("The 'columns' key must contain an array.");
155+
}
156+
columns = config["columns"].get<vector<string>>();
142157
if (columns.empty()) {
143158
throw runtime_error("No columns to be extracted.");
144159
}
160+
optionalColumns.clear();
161+
if (config.contains("optional")) {
162+
if (!config["optional"].is_array()) {
163+
throw runtime_error("The 'optional' key must contain an object.");
164+
}
165+
for (auto const& column : config["optional"].get<vector<string>>()) {
166+
optionalColumns.insert(column);
167+
}
168+
}
169+
// All optional columns must be defined in the 'columns' array.
170+
for (auto const& name : optionalColumns) {
171+
if (find(columns.begin(), columns.end(), name) == columns.end()) {
172+
throw runtime_error("The optional column '" + name +
173+
"' is not defined in the 'columns' array.");
174+
}
175+
}
145176
}
146177
};
147178

@@ -163,18 +194,20 @@ int main(int argc, char const* const* argv) {
163194
cout << "Translating '" << params.parqFileName << "' into '" << params.csvFileName << "'" << endl;
164195
}
165196
part::ParquetFile parqFile(params.parqFileName, params.maxMemAllocatedMB);
166-
if (parqFile.setupBatchReader(maxBuffSizeBytes) != arrow::Status::OK()) {
167-
throw runtime_error("Error while setting up the batch reader.");
168-
}
197+
parqFile.setupBatchReader(maxBuffSizeBytes);
198+
169199
ofstream csvFile(params.csvFileName, ios::out | ios::binary);
170200
if (!csvFile) {
171201
throw runtime_error("Error while opening the output file.");
172202
}
173203
while (true) {
174-
auto status = parqFile.readNextBatch_Table2CSV(buf.get(), buffSize, params.columns,
175-
params.optionalColumnDefs, params.nullStr,
176-
params.delimStr);
177-
if ((status != arrow::Status::OK()) || (buffSize == 0)) break;
204+
bool const success = parqFile.readNextBatch_Table2CSV(buf.get(), buffSize, params.columns,
205+
params.optionalColumns, params.nullStr,
206+
params.delimStr, params.quote);
207+
if (!success) break;
208+
if (buffSize == 0) {
209+
throw runtime_error("Received EOF while reading the file.");
210+
}
178211
if (params.verbose) {
179212
cout << "Writing " << setw(9) << buffSize << " bytes" << endl;
180213
}

0 commit comments

Comments
 (0)