Skip to content

Commit e07b737

Browse files
committed
Improved and streamlined implementation of the Parquet-to-CSV translator
- Fixed indentation - Fixed an ambiguity in the file type selector - Added an explicit command-line flag to tell the partitioner that all input files are the Parquet files - Added support for optional columns in the translated Parquet files - Improved error reporting to log error messages for errors and raise exceptions
1 parent 9a7fb3f commit e07b737

File tree

7 files changed

+435
-207
lines changed

7 files changed

+435
-207
lines changed

src/partition/CMakeLists.txt

+2-2
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ FUNCTION(partition_apps)
3535
partition
3636
boost_filesystem
3737
sphgeom
38-
arrow
39-
parquet
38+
arrow
39+
parquet
4040
)
4141
install(TARGETS ${APP})
4242
ENDFOREACH()

src/partition/CmdLineUtils.cc

+37-27
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
#include <cstdlib>
2626
#include <algorithm>
2727
#include <iostream>
28-
#include <map>
2928
#include <set>
3029
#include <vector>
3130
#include "boost/algorithm/string/predicate.hpp"
@@ -125,6 +124,12 @@ void defineInputOptions(po::options_description& opts) {
125124
"directory, then all the files and symbolic links to files in "
126125
"the directory are treated as inputs. This option must be "
127126
"specified at least once.");
127+
input.add_options()("in.is-parquet", po::bool_switch()->default_value(false),
128+
"If true, input files are assumed to be parquet files.");
129+
input.add_options()("in.parq2csv-quote", po::bool_switch()->default_value(false),
130+
"If true then put double quotes around valid fields when translating "
131+
"the parquet files to CSV. This option is only valid when --in.is-parquet "
132+
"is specified. Note this flag requires --out.csv.no-quote=false.");
128133
opts.add(input);
129134
}
130135

@@ -141,7 +146,6 @@ InputLines const makeInputLines(ConfigStore const& config) {
141146
"using --in.path.");
142147
}
143148
std::vector<fs::path> paths;
144-
bool bIsParquetFile = false;
145149
for (auto&& s : config.get<std::vector<std::string>>("in.path")) {
146150
fs::path p(s);
147151
fs::file_status stat = fs::status(p);
@@ -154,32 +158,38 @@ InputLines const makeInputLines(ConfigStore const& config) {
154158
}
155159
}
156160
}
157-
if (!bIsParquetFile)
158-
bIsParquetFile = (boost::algorithm::ends_with(s.c_str(), ".parquet") ||
159-
boost::algorithm::ends_with(s.c_str(), ".parq"));
160161
}
161162
if (paths.empty()) {
162163
throw std::runtime_error(
163164
"No non-empty input files found among the "
164165
"files and directories specified via --in.path.");
165166
}
166-
167-
// return InputLines(paths, blockSize * MiB, false, names);
168-
if (!bIsParquetFile) return InputLines(paths, blockSize * MiB, false);
167+
if (!config.flag("in.is-parquet")) return InputLines(paths, blockSize * MiB, false);
169168

170169
// In case input files are parquet files, data from config file have to be transfered to the parquet
171170
// reading class Arrow : collect parameter name list to be read from parquet file
172-
std::vector<std::string> names;
173-
std::string st_null = "";
174-
std::string st_delimiter = "";
175-
std::string st_escape = "";
176-
177-
if (config.has("in.csv.field")) names = config.get<std::vector<std::string>>("in.csv.field");
178-
if (config.has("in.csv.null")) st_null = config.get<std::string>("in.csv.null");
179-
if (config.has("in.csv.delimiter")) st_delimiter = config.get<std::string>("in.csv.delimiter");
180-
if (config.has("in.csv.escape")) st_escape = config.get<std::string>("in.csv.escape");
181-
182-
ConfigParamArrow const configParamArrow{names, st_null, st_delimiter, st_escape};
171+
std::vector<std::string> columns;
172+
if (config.has("in.csv.field")) {
173+
columns = config.get<std::vector<std::string>>("in.csv.field");
174+
}
175+
std::set<std::string> optional;
176+
if (config.has("in.csv.optional")) {
177+
for (auto const& column : config.get<std::vector<std::string>>("in.csv.optional")) {
178+
optional.insert(column);
179+
}
180+
}
181+
std::string const st_null =
182+
config.has("in.csv.null") ? config.get<std::string>("in.csv.null") : std::string();
183+
std::string const st_delimiter =
184+
config.has("in.csv.delimiter") ? config.get<std::string>("in.csv.delimiter") : std::string();
185+
std::string const st_escape =
186+
config.has("in.csv.escape") ? config.get<std::string>("in.csv.escape") : std::string();
187+
bool const in_quote = config.flag("in.parq2csv-quote");
188+
bool const out_no_quote = config.has("out.csv.no-quote") ? config.get<bool>("out.csv.no-quote") : false;
189+
if (in_quote && out_no_quote) {
190+
throw std::runtime_error("Option --in.parq2csv-quote=true requires --out.csv.no-quote=false");
191+
}
192+
ConfigParamArrow const configParamArrow{columns, optional, st_null, st_delimiter, st_escape, in_quote};
183193

184194
// Direct parquet file reading is not possible using MT - March 2023
185195
if (config.has("mr.num-workers") && config.get<int>("mr.num-workers") > 1)
@@ -231,21 +241,21 @@ void ensureOutputFieldExists(ConfigStore& config, std::string const& opt) {
231241
if (!config.has(opt)) {
232242
return;
233243
}
234-
std::vector<std::string> names;
244+
std::vector<std::string> columns;
235245
if (!config.has("out.csv.field")) {
236246
if (!config.has("in.csv.field")) {
237-
std::cerr << "Input CSV field names not specified." << std::endl;
247+
std::cerr << "Input CSV column names not specified." << std::endl;
238248
std::exit(EXIT_FAILURE);
239249
}
240-
names = config.get<std::vector<std::string>>("in.csv.field");
250+
columns = config.get<std::vector<std::string>>("in.csv.field");
241251
} else {
242-
names = config.get<std::vector<std::string>>("out.csv.field");
252+
columns = config.get<std::vector<std::string>>("out.csv.field");
243253
}
244-
std::string const name = config.get<std::string>(opt);
245-
if (std::find(names.begin(), names.end(), name) == names.end()) {
246-
names.push_back(name);
254+
std::string const column = config.get<std::string>(opt);
255+
if (std::find(columns.begin(), columns.end(), column) == columns.end()) {
256+
columns.push_back(column);
247257
}
248-
config.set("out.csv.field", names);
258+
config.set("out.csv.field", columns);
249259
}
250260

251261
std::vector<int32_t> const chunksToDuplicate(Chunker const& chunker, ConfigStore const& config) {

src/partition/FileUtils.cc

+5-8
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,7 @@ InputFileArrow::InputFileArrow(fs::path const &path, off_t blockSize)
101101
struct ::stat st;
102102

103103
_batchReader = std::make_unique<lsst::partition::ParquetFile>(path.string());
104-
arrow::Status status = _batchReader->setupBatchReader(blockSize);
105-
if (!status.ok())
106-
throw std::runtime_error("InputArrowFile::" + std::string(__func__) +
107-
": Could not setup Arrow recordbatchreader");
104+
_batchReader->setupBatchReader(blockSize);
108105

109106
int fd = ::open(path.string().c_str(), O_RDONLY);
110107

@@ -138,10 +135,10 @@ void InputFileArrow::read(void *buf, off_t off, size_t sz, int &csvBufferSize,
138135
ConfigParamArrow const &params) const {
139136
uint8_t *cur = static_cast<uint8_t *>(buf);
140137

141-
arrow::Status status = _batchReader->readNextBatch_Table2CSV(cur, csvBufferSize, params.paramNames,
142-
params.str_null, params.str_delimiter);
143-
144-
if (status.ok()) {
138+
auto const success =
139+
_batchReader->readNextBatch_Table2CSV(cur, csvBufferSize, params.columns, params.optionalColumns,
140+
params.str_null, params.str_delimiter, params.quote);
141+
if (success) {
145142
ssize_t n = csvBufferSize;
146143
if (n == 0) {
147144
throw std::runtime_error("InputFileArrow::" + std::string(__func__) + ": received EOF [" +

src/partition/FileUtils.h

+22-13
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,15 @@
2626
#ifndef LSST_PARTITION_FILEUTILS_H
2727
#define LSST_PARTITION_FILEUTILS_H
2828

29+
// System headers
30+
#include <memory>
31+
#include <set>
32+
#include <string>
33+
#include <vector>
2934
#include <sys/types.h>
3035
#include <stdint.h>
3136

37+
// Third-party headers
3238
#include "boost/filesystem.hpp"
3339
#include "boost/static_assert.hpp"
3440

@@ -37,22 +43,25 @@ namespace lsst::partition {
3743
class ParquetFile;
3844

3945
struct ConfigParamArrow {
40-
std::vector<std::string> const paramNames;
46+
std::vector<std::string> const columns;
47+
std::set<std::string> optionalColumns;
4148
std::string str_null;
4249
std::string str_delimiter;
4350
std::string str_escape;
44-
45-
ConfigParamArrow()
46-
: paramNames(std::vector<std::string>()), str_null(""), str_delimiter(""), str_escape("") {}
47-
ConfigParamArrow(std::vector<std::string> const &paramNames, std::string const &vnull,
48-
std::string const &vdelimiter, std::string const &vescape)
49-
: paramNames(paramNames), str_null(vnull), str_delimiter(vdelimiter), str_escape(vescape) {}
50-
51-
ConfigParamArrow(const ConfigParamArrow &v)
52-
: paramNames(v.paramNames),
53-
str_null(v.str_null),
54-
str_delimiter(v.str_delimiter),
55-
str_escape(v.str_escape) {}
51+
bool quote = false;
52+
53+
ConfigParamArrow(std::vector<std::string> const &columns, std::set<std::string> const &optionalColumns,
54+
std::string const &vnull, std::string const &vdelimiter, std::string const &vescape,
55+
bool vquote)
56+
: columns(columns),
57+
optionalColumns(optionalColumns),
58+
str_null(vnull),
59+
str_delimiter(vdelimiter),
60+
str_escape(vescape),
61+
quote(vquote) {}
62+
63+
ConfigParamArrow() = default;
64+
ConfigParamArrow(const ConfigParamArrow &v) = default;
5665
ConfigParamArrow &operator=(const ConfigParamArrow &) = delete;
5766
};
5867

0 commit comments

Comments
 (0)