Skip to content

DM-48759: Improved and extended implementation of the Parquet translator in Qserv #897

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/partition/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ FUNCTION(partition_apps)
partition
boost_filesystem
sphgeom
arrow
parquet
arrow
parquet
)
install(TARGETS ${APP})
ENDFOREACH()
Expand All @@ -48,6 +48,7 @@ partition_apps(
sph-estimate-stats
sph-htm-index
sph-layout
sph-parq2csv
sph-partition-matches
sph-partition
)
Expand Down
64 changes: 37 additions & 27 deletions src/partition/CmdLineUtils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <cstdlib>
#include <algorithm>
#include <iostream>
#include <map>
#include <set>
#include <vector>
#include "boost/algorithm/string/predicate.hpp"
Expand Down Expand Up @@ -125,6 +124,12 @@ void defineInputOptions(po::options_description& opts) {
"directory, then all the files and symbolic links to files in "
"the directory are treated as inputs. This option must be "
"specified at least once.");
input.add_options()("in.is-parquet", po::bool_switch()->default_value(false),
"If true, input files are assumed to be parquet files.");
input.add_options()("in.parq2csv-quote", po::bool_switch()->default_value(false),
"If true then put double quotes around valid fields when translating "
"the parquet files to CSV. This option is only valid when --in.is-parquet "
"is specified. Note this flag requires --out.csv.no-quote=false.");
opts.add(input);
}

Expand All @@ -141,7 +146,6 @@ InputLines const makeInputLines(ConfigStore const& config) {
"using --in.path.");
}
std::vector<fs::path> paths;
bool bIsParquetFile = false;
for (auto&& s : config.get<std::vector<std::string>>("in.path")) {
fs::path p(s);
fs::file_status stat = fs::status(p);
Expand All @@ -154,32 +158,38 @@ InputLines const makeInputLines(ConfigStore const& config) {
}
}
}
if (!bIsParquetFile)
bIsParquetFile = (boost::algorithm::ends_with(s.c_str(), ".parquet") ||
boost::algorithm::ends_with(s.c_str(), ".parq"));
}
if (paths.empty()) {
throw std::runtime_error(
"No non-empty input files found among the "
"files and directories specified via --in.path.");
}

// return InputLines(paths, blockSize * MiB, false, names);
if (!bIsParquetFile) return InputLines(paths, blockSize * MiB, false);
if (!config.flag("in.is-parquet")) return InputLines(paths, blockSize * MiB, false);

// If the input files are parquet files, data from the config file has to be transferred to the parquet
// reading class (Arrow): collect the list of parameter names to be read from the parquet file
std::vector<std::string> names;
std::string st_null = "";
std::string st_delimiter = "";
std::string st_escape = "";

if (config.has("in.csv.field")) names = config.get<std::vector<std::string>>("in.csv.field");
if (config.has("in.csv.null")) st_null = config.get<std::string>("in.csv.null");
if (config.has("in.csv.delimiter")) st_delimiter = config.get<std::string>("in.csv.delimiter");
if (config.has("in.csv.escape")) st_escape = config.get<std::string>("in.csv.escape");

ConfigParamArrow const configParamArrow{names, st_null, st_delimiter, st_escape};
std::vector<std::string> columns;
if (config.has("in.csv.field")) {
columns = config.get<std::vector<std::string>>("in.csv.field");
}
std::set<std::string> optional;
if (config.has("in.csv.optional")) {
for (auto const& column : config.get<std::vector<std::string>>("in.csv.optional")) {
optional.insert(column);
}
}
std::string const st_null =
config.has("in.csv.null") ? config.get<std::string>("in.csv.null") : std::string();
std::string const st_delimiter =
config.has("in.csv.delimiter") ? config.get<std::string>("in.csv.delimiter") : std::string();
std::string const st_escape =
config.has("in.csv.escape") ? config.get<std::string>("in.csv.escape") : std::string();
bool const in_quote = config.flag("in.parq2csv-quote");
bool const out_no_quote = config.has("out.csv.no-quote") ? config.get<bool>("out.csv.no-quote") : false;
if (in_quote && out_no_quote) {
throw std::runtime_error("Option --in.parq2csv-quote=true requires --out.csv.no-quote=false");
}
ConfigParamArrow const configParamArrow{columns, optional, st_null, st_delimiter, st_escape, in_quote};

// Direct reading of parquet files is not possible with multiple threads (MT), as of March 2023
if (config.has("mr.num-workers") && config.get<int>("mr.num-workers") > 1)
Expand Down Expand Up @@ -231,21 +241,21 @@ void ensureOutputFieldExists(ConfigStore& config, std::string const& opt) {
if (!config.has(opt)) {
return;
}
std::vector<std::string> names;
std::vector<std::string> columns;
if (!config.has("out.csv.field")) {
if (!config.has("in.csv.field")) {
std::cerr << "Input CSV field names not specified." << std::endl;
std::cerr << "Input CSV column names not specified." << std::endl;
std::exit(EXIT_FAILURE);
}
names = config.get<std::vector<std::string>>("in.csv.field");
columns = config.get<std::vector<std::string>>("in.csv.field");
} else {
names = config.get<std::vector<std::string>>("out.csv.field");
columns = config.get<std::vector<std::string>>("out.csv.field");
}
std::string const name = config.get<std::string>(opt);
if (std::find(names.begin(), names.end(), name) == names.end()) {
names.push_back(name);
std::string const column = config.get<std::string>(opt);
if (std::find(columns.begin(), columns.end(), column) == columns.end()) {
columns.push_back(column);
}
config.set("out.csv.field", names);
config.set("out.csv.field", columns);
}

std::vector<int32_t> const chunksToDuplicate(Chunker const& chunker, ConfigStore const& config) {
Expand Down
13 changes: 5 additions & 8 deletions src/partition/FileUtils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,7 @@ InputFileArrow::InputFileArrow(fs::path const &path, off_t blockSize)
struct ::stat st;

_batchReader = std::make_unique<lsst::partition::ParquetFile>(path.string());
arrow::Status status = _batchReader->setupBatchReader(blockSize);
if (!status.ok())
throw std::runtime_error("InputArrowFile::" + std::string(__func__) +
": Could not setup Arrow recordbatchreader");
_batchReader->setupBatchReader(blockSize);

int fd = ::open(path.string().c_str(), O_RDONLY);

Expand Down Expand Up @@ -138,10 +135,10 @@ void InputFileArrow::read(void *buf, off_t off, size_t sz, int &csvBufferSize,
ConfigParamArrow const &params) const {
uint8_t *cur = static_cast<uint8_t *>(buf);

arrow::Status status = _batchReader->readNextBatch_Table2CSV(cur, csvBufferSize, params.paramNames,
params.str_null, params.str_delimiter);

if (status.ok()) {
auto const success =
_batchReader->readNextBatch_Table2CSV(cur, csvBufferSize, params.columns, params.optionalColumns,
params.str_null, params.str_delimiter, params.quote);
if (success) {
ssize_t n = csvBufferSize;
if (n == 0) {
throw std::runtime_error("InputFileArrow::" + std::string(__func__) + ": received EOF [" +
Expand Down
35 changes: 22 additions & 13 deletions src/partition/FileUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,15 @@
#ifndef LSST_PARTITION_FILEUTILS_H
#define LSST_PARTITION_FILEUTILS_H

// System headers
#include <memory>
#include <set>
#include <string>
#include <vector>
#include <sys/types.h>
#include <stdint.h>

// Third-party headers
#include "boost/filesystem.hpp"
#include "boost/static_assert.hpp"

Expand All @@ -37,22 +43,25 @@ namespace lsst::partition {
class ParquetFile;

struct ConfigParamArrow {
std::vector<std::string> const paramNames;
std::vector<std::string> const columns;
std::set<std::string> optionalColumns;
std::string str_null;
std::string str_delimiter;
std::string str_escape;

ConfigParamArrow()
: paramNames(std::vector<std::string>()), str_null(""), str_delimiter(""), str_escape("") {}
ConfigParamArrow(std::vector<std::string> const &paramNames, std::string const &vnull,
std::string const &vdelimiter, std::string const &vescape)
: paramNames(paramNames), str_null(vnull), str_delimiter(vdelimiter), str_escape(vescape) {}

ConfigParamArrow(const ConfigParamArrow &v)
: paramNames(v.paramNames),
str_null(v.str_null),
str_delimiter(v.str_delimiter),
str_escape(v.str_escape) {}
bool quote = false;

ConfigParamArrow(std::vector<std::string> const &columns, std::set<std::string> const &optionalColumns,
std::string const &vnull, std::string const &vdelimiter, std::string const &vescape,
bool vquote)
: columns(columns),
optionalColumns(optionalColumns),
str_null(vnull),
str_delimiter(vdelimiter),
str_escape(vescape),
quote(vquote) {}

ConfigParamArrow() = default;
ConfigParamArrow(const ConfigParamArrow &v) = default;
ConfigParamArrow &operator=(const ConfigParamArrow &) = delete;
};

Expand Down
Loading