diff --git a/src/partition/CMakeLists.txt b/src/partition/CMakeLists.txt
index 6bf3150821..731ce933df 100644
--- a/src/partition/CMakeLists.txt
+++ b/src/partition/CMakeLists.txt
@@ -35,8 +35,8 @@ FUNCTION(partition_apps)
             partition
             boost_filesystem
             sphgeom
-            arrow
-            parquet
+            arrow
+            parquet
         )
         install(TARGETS ${APP})
     ENDFOREACH()
@@ -48,6 +48,7 @@ partition_apps(
     sph-estimate-stats
     sph-htm-index
     sph-layout
+    sph-parq2csv
     sph-partition-matches
     sph-partition
 )
diff --git a/src/partition/CmdLineUtils.cc b/src/partition/CmdLineUtils.cc
index 078532c686..78bc7e567e 100644
--- a/src/partition/CmdLineUtils.cc
+++ b/src/partition/CmdLineUtils.cc
@@ -25,7 +25,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include "boost/algorithm/string/predicate.hpp"
@@ -125,6 +124,12 @@ void defineInputOptions(po::options_description& opts) {
             "directory, then all the files and symbolic links to files in "
             "the directory are treated as inputs. This option must be "
             "specified at least once.");
+    input.add_options()("in.is-parquet", po::bool_switch()->default_value(false),
+                        "If true, input files are assumed to be parquet files.");
+    input.add_options()("in.parq2csv-quote", po::bool_switch()->default_value(false),
+                        "If true, put double quotes around valid fields when translating "
+                        "the parquet files to CSV. This option is only valid when --in.is-parquet "
+                        "is specified. Note this flag requires --out.csv.no-quote=false.");
     opts.add(input);
 }
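For context, a hypothetical invocation of the partitioner exercising the two new switches; the input path and column names are illustrative, and the remaining required partitioning options are elided:

    sph-partition --in.path=/data/Object.parquet --in.is-parquet --in.parq2csv-quote \
                  --in.csv.field=objectId --in.csv.field=ra --in.csv.field=dec \
                  --out.csv.no-quote=false ...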
@@ -141,7 +146,6 @@ InputLines const makeInputLines(ConfigStore const& config) {
                 "using --in.path.");
     }
     std::vector<fs::path> paths;
-    bool bIsParquetFile = false;
     for (auto&& s : config.get<std::vector<std::string>>("in.path")) {
         fs::path p(s);
         fs::file_status stat = fs::status(p);
@@ -154,32 +158,38 @@ InputLines const makeInputLines(ConfigStore const& config) {
             }
         }
-        if (!bIsParquetFile)
-            bIsParquetFile = (boost::algorithm::ends_with(s.c_str(), ".parquet") ||
-                              boost::algorithm::ends_with(s.c_str(), ".parq"));
     }
     if (paths.empty()) {
         throw std::runtime_error(
                 "No non-empty input files found among the "
                 "files and directories specified via --in.path.");
     }
-
-    // return InputLines(paths, blockSize * MiB, false, names);
-    if (!bIsParquetFile) return InputLines(paths, blockSize * MiB, false);
+    if (!config.flag("in.is-parquet")) return InputLines(paths, blockSize * MiB, false);

     // In case input files are parquet files, data from the config file have to be transferred to the
     // parquet reading class Arrow : collect the list of column names to be read from the parquet file
-    std::vector<std::string> names;
-    std::string st_null = "";
-    std::string st_delimiter = "";
-    std::string st_escape = "";
-
-    if (config.has("in.csv.field")) names = config.get<std::vector<std::string>>("in.csv.field");
-    if (config.has("in.csv.null")) st_null = config.get<std::string>("in.csv.null");
-    if (config.has("in.csv.delimiter")) st_delimiter = config.get<std::string>("in.csv.delimiter");
-    if (config.has("in.csv.escape")) st_escape = config.get<std::string>("in.csv.escape");
-
-    ConfigParamArrow const configParamArrow{names, st_null, st_delimiter, st_escape};
+    std::vector<std::string> columns;
+    if (config.has("in.csv.field")) {
+        columns = config.get<std::vector<std::string>>("in.csv.field");
+    }
+    std::set<std::string> optional;
+    if (config.has("in.csv.optional")) {
+        for (auto const& column : config.get<std::vector<std::string>>("in.csv.optional")) {
+            optional.insert(column);
+        }
+    }
+    std::string const st_null =
+            config.has("in.csv.null") ? config.get<std::string>("in.csv.null") : std::string();
+    std::string const st_delimiter =
+            config.has("in.csv.delimiter") ? config.get<std::string>("in.csv.delimiter") : std::string();
+    std::string const st_escape =
+            config.has("in.csv.escape") ? config.get<std::string>("in.csv.escape") : std::string();
+    bool const in_quote = config.flag("in.parq2csv-quote");
+    bool const out_no_quote = config.has("out.csv.no-quote") ? config.get<bool>("out.csv.no-quote") : false;
+    if (in_quote && out_no_quote) {
+        throw std::runtime_error("Option --in.parq2csv-quote=true requires --out.csv.no-quote=false");
+    }
+    ConfigParamArrow const configParamArrow{columns, optional, st_null, st_delimiter, st_escape, in_quote};

     // Direct parquet file reading is not possible using MT - March 2023
     if (config.has("mr.num-workers") && config.get<int>("mr.num-workers") > 1)
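For illustration, assuming the usual JSON nesting of the dotted configuration keys, a config fragment feeding the branch above could look like this (the column names are hypothetical; "optional" lists the subset of "field" entries that may be absent from the parquet files):

    "in" : {
        "csv" : {
            "field" : ["objectId", "ra", "dec", "flags"],
            "optional" : ["flags"],
            "null" : "\\N",
            "delimiter" : "\t"
        }
    }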
@@ -231,21 +241,21 @@ void ensureOutputFieldExists(ConfigStore& config, std::string const& opt) {
     if (!config.has(opt)) {
         return;
     }
-    std::vector<std::string> names;
+    std::vector<std::string> columns;
     if (!config.has("out.csv.field")) {
         if (!config.has("in.csv.field")) {
-            std::cerr << "Input CSV field names not specified." << std::endl;
+            std::cerr << "Input CSV column names not specified." << std::endl;
             std::exit(EXIT_FAILURE);
         }
-        names = config.get<std::vector<std::string>>("in.csv.field");
+        columns = config.get<std::vector<std::string>>("in.csv.field");
     } else {
-        names = config.get<std::vector<std::string>>("out.csv.field");
+        columns = config.get<std::vector<std::string>>("out.csv.field");
     }
-    std::string const name = config.get<std::string>(opt);
-    if (std::find(names.begin(), names.end(), name) == names.end()) {
-        names.push_back(name);
+    std::string const column = config.get<std::string>(opt);
+    if (std::find(columns.begin(), columns.end(), column) == columns.end()) {
+        columns.push_back(column);
     }
-    config.set("out.csv.field", names);
+    config.set("out.csv.field", columns);
 }

 std::vector<int32_t> const chunksToDuplicate(Chunker const& chunker, ConfigStore const& config) {
diff --git a/src/partition/FileUtils.cc b/src/partition/FileUtils.cc
index dd5b4f6068..37f3b9e357 100644
--- a/src/partition/FileUtils.cc
+++ b/src/partition/FileUtils.cc
@@ -101,10 +101,7 @@ InputFileArrow::InputFileArrow(fs::path const &path, off_t blockSize)
     struct ::stat st;

     _batchReader = std::make_unique<ParquetFile>(path.string());
-    arrow::Status status = _batchReader->setupBatchReader(blockSize);
-    if (!status.ok())
-        throw std::runtime_error("InputArrowFile::" + std::string(__func__) +
-                                 ": Could not setup Arrow recordbatchreader");
+    _batchReader->setupBatchReader(blockSize);

     int fd = ::open(path.string().c_str(), O_RDONLY);
@@ -138,10 +135,10 @@ void InputFileArrow::read(void *buf, off_t off, size_t sz, int &csvBufferSize,
                           ConfigParamArrow const &params) const {
     uint8_t *cur = static_cast<uint8_t *>(buf);
-    arrow::Status status = _batchReader->readNextBatch_Table2CSV(cur, csvBufferSize, params.paramNames,
-                                                                 params.str_null, params.str_delimiter);
-
-    if (status.ok()) {
+    auto const success =
+            _batchReader->readNextBatch_Table2CSV(cur, csvBufferSize, params.columns, params.optionalColumns,
+                                                  params.str_null, params.str_delimiter, params.quote);
+    if (success) {
         ssize_t n = csvBufferSize;
         if (n == 0) {
             throw std::runtime_error("InputFileArrow::" + std::string(__func__) + ": received EOF [" +
diff --git a/src/partition/FileUtils.h b/src/partition/FileUtils.h
index 2a27bd664b..68c49c2a1c 100644
--- a/src/partition/FileUtils.h
+++ b/src/partition/FileUtils.h
@@ -26,9 +26,15 @@
 #ifndef LSST_PARTITION_FILEUTILS_H
 #define LSST_PARTITION_FILEUTILS_H

+// System headers
+#include
+#include
+#include
+#include
 #include
 #include

+// Third-party headers
 #include "boost/filesystem.hpp"
 #include "boost/static_assert.hpp"
@@ -37,22 +43,25 @@ namespace lsst::partition {

 class ParquetFile;

 struct ConfigParamArrow {
-    std::vector<std::string> const paramNames;
+    std::vector<std::string> const columns;
+    std::set<std::string> optionalColumns;
     std::string str_null;
     std::string str_delimiter;
     std::string str_escape;
-
-    ConfigParamArrow()
-            : paramNames(std::vector<std::string>()), str_null(""), str_delimiter(""), str_escape("") {}
-    ConfigParamArrow(std::vector<std::string> const &paramNames, std::string const &vnull,
-                     std::string const &vdelimiter, std::string const &vescape)
-            : paramNames(paramNames), str_null(vnull), str_delimiter(vdelimiter), str_escape(vescape) {}
-
-    ConfigParamArrow(const ConfigParamArrow &v)
-            : paramNames(v.paramNames),
-              str_null(v.str_null),
-              str_delimiter(v.str_delimiter),
-              str_escape(v.str_escape) {}
+    bool quote = false;
+
+    ConfigParamArrow(std::vector<std::string> const &columns, std::set<std::string> const &optionalColumns,
+                     std::string const &vnull, std::string const &vdelimiter, std::string const &vescape,
+                     bool vquote)
+            : columns(columns),
+              optionalColumns(optionalColumns),
+              str_null(vnull),
+              str_delimiter(vdelimiter),
+              str_escape(vescape),
+              quote(vquote) {}
+
+    ConfigParamArrow() = default;
+    ConfigParamArrow(const ConfigParamArrow &v) = default;
     ConfigParamArrow &operator=(const ConfigParamArrow &) = delete;
 };
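A minimal sketch of how the extended aggregate is now populated; the values are illustrative and mirror what makeInputLines does above:

    std::vector<std::string> const columns = {"objectId", "ra", "dec", "flags"};
    std::set<std::string> const optionalColumns = {"flags"};
    // null string, delimiter, escape, and the new quoting switch
    ConfigParamArrow const params{columns, optionalColumns, "\\N", "\t", "", true};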
diff --git a/src/partition/ParquetInterface.cc b/src/partition/ParquetInterface.cc
index 5309c257e4..e09daf02fc 100644
--- a/src/partition/ParquetInterface.cc
+++ b/src/partition/ParquetInterface.cc
@@ -18,6 +18,11 @@
 // Class header
 #include "partition/ParquetInterface.h"

+// System headers
+#include
+#include
+#include
+
 // Third party headers
 #include
 #include
@@ -27,6 +32,7 @@ namespace {

 LOG_LOGGER _log = LOG_GET("lsst.qserv.partitioner");
+char const* prefix = "Parquet : ";

 }  // namespace

 namespace lsst::partition {

 std::map<std::shared_ptr<arrow::DataType>, int> typeBufSize{
         {arrow::boolean(), 1},  {arrow::float16(), 20}, {arrow::float32(), 20}, {arrow::float64(), 20},
         {arrow::float16(), 20}, {arrow::date32(), 20},  {arrow::date64(), 20}};

-ParquetFile::ParquetFile(std::string fileName, int maxMemAllocated)
-        : _path_to_file(fileName),
-          _maxMemory(maxMemAllocated),
-          _vmRSS_init(0),
-          _batchNumber(0),
-          _batchSize(0) {
-    LOGS(_log, LOG_LVL_DEBUG, "Partitioner parquet interface...");
+ParquetFile::ParquetFile(std::string fileName, int maxMemAllocatedMB)
+        : _path_to_file(fileName), _maxMemoryMB(maxMemAllocatedMB), _vmRSS_init(0), _batchSize(0) {
+    LOGS(_log, LOG_LVL_DEBUG, ::prefix << "Created");
 }

+ParquetFile::~ParquetFile() { LOGS(_log, LOG_LVL_DEBUG, ::prefix << "Destroyed"); }
+
 int ParquetFile::_dumpProcessMemory(std::string idValue, bool bVerbose) const {
     int tSize = 0, resident = 0, share = 0;
     std::ifstream buffer("/proc/self/statm");
@@ -59,10 +63,10 @@ int ParquetFile::_dumpProcessMemory(std::string idValue, bool bVerbose) const {
     double shared_mem = (share * page_size_kb) / 1024.0;

     if (bVerbose) {
-        LOGS(_log, LOG_LVL_DEBUG, "VmSize - " << vmSize << " MB ");
-        LOGS(_log, LOG_LVL_DEBUG, "VmRSS - " << rss << " MB ");
-        LOGS(_log, LOG_LVL_DEBUG, "Shared Memory - " << shared_mem << " MB ");
-        LOGS(_log, LOG_LVL_DEBUG, "Private Memory - " << rss - shared_mem << "MB");
+        LOGS(_log, LOG_LVL_DEBUG, ::prefix << "VmSize [MB] : " << vmSize);
+        LOGS(_log, LOG_LVL_DEBUG, ::prefix << "VmRSS [MB] : " << rss);
+        LOGS(_log, LOG_LVL_DEBUG, ::prefix << "Shared Memory [MB] : " << shared_mem);
+        LOGS(_log, LOG_LVL_DEBUG, ::prefix << "Private Memory [MB] : " << rss - shared_mem);
     }

     if (!idValue.empty()) {
@@ -81,7 +85,7 @@ int ParquetFile::_getRecordSize(std::shared_ptr<arrow::Schema> schema, int strin
         if (fieldSize < 0) fieldSize = stringDefaultSize;
         recordSize += fieldSize;
     }
-    LOGS(_log, LOG_LVL_DEBUG, "Record size (Bytes) " << recordSize);
+    LOGS(_log, LOG_LVL_DEBUG, ::prefix << "Record size [Bytes] : " << recordSize);
     return recordSize;
 }
@@ -97,14 +101,15 @@ int ParquetFile::_getStringRecordSize(std::shared_ptr<arrow::Schema> schema, int
         recordSize += fieldSize;
         recordSize++;
     }
-    LOGS(_log, LOG_LVL_DEBUG, "Record size (approx. CSV string length) " << recordSize);
+    LOGS(_log, LOG_LVL_DEBUG,
+         ::prefix << "Record size (approx. CSV string length) [Bytes] : " << recordSize);
     return recordSize;
 }

-arrow::Status ParquetFile::setupBatchReader(int maxBufferSize) {
+void ParquetFile::setupBatchReader(int maxBufferSize) {
     _vmRSS_init = _dumpProcessMemory("VmRSS", true);

-    int fileRowNumber = _getTotalRowNumber(_path_to_file);
+    _getTotals();

     arrow::MemoryPool* pool = arrow::default_memory_pool();
@@ -119,12 +124,12 @@
     arrow_reader_props.set_batch_size(_batchSize);  // default 64 * 1024

     parquet::arrow::FileReaderBuilder reader_builder;
-    ARROW_RETURN_NOT_OK(reader_builder.OpenFile(_path_to_file, /*memory_map=*/false, reader_properties));
+    PARQUET_THROW_NOT_OK(reader_builder.OpenFile(_path_to_file, /*memory_map=*/false, reader_properties));
     reader_builder.memory_pool(pool);
     reader_builder.properties(arrow_reader_props);

-    ARROW_ASSIGN_OR_RAISE(_arrow_reader_gbl, reader_builder.Build());
-    ARROW_ASSIGN_OR_RAISE(_rb_reader_gbl, _arrow_reader_gbl->GetRecordBatchReader());
+    PARQUET_ASSIGN_OR_THROW(_arrow_reader_gbl, reader_builder.Build());
+    PARQUET_ASSIGN_OR_THROW(_rb_reader_gbl, _arrow_reader_gbl->GetRecordBatchReader());

     // Compute the number of lines read by each batch as a function of the maximum memory
     // allocated to the process
@@ -132,163 +137,165 @@
     std::shared_ptr<arrow::Schema> schema;
     arrow::Status st = _arrow_reader_gbl->GetSchema(&schema);
     _recordSize = _getRecordSize(schema);

-    double tmp = double(_maxMemory) * 1024 * 1024 * 0.85;
-    LOGS(_log, LOG_LVL_DEBUG, "Batch size mem " << tmp);
+    double tmp = double(_maxMemoryMB) * 1024 * 1024 * 0.85;
+    LOGS(_log, LOG_LVL_DEBUG, ::prefix << "Batch size mem [Bytes] : " << tmp);
     int64_t batchSize_mem = int64_t(tmp / _recordSize);  // 0.85 is a rough, "by eye" safety factor
-    LOGS(_log, LOG_LVL_DEBUG,
-         "Max RAM (MB): " << _maxMemory << " // record size : " << _recordSize
-                          << " -> batch size : " << batchSize_mem);
+    LOGS(_log, LOG_LVL_DEBUG, ::prefix << "Max RAM [MB] : " << _maxMemoryMB);
+    LOGS(_log, LOG_LVL_DEBUG, ::prefix << "Record size [Bytes] : " << _recordSize);
+    LOGS(_log, LOG_LVL_DEBUG, ::prefix << "Batch size [rows] : " << batchSize_mem);

     int64_t batchSize_buf = -1;
     _maxBufferSize = maxBufferSize;
     if (maxBufferSize > 0) {
         _recordBufferSize = _getStringRecordSize(schema);
-        // batchSize_buf = int((maxBufferSize*1024*1024)/_recordBufferSize);
         batchSize_buf = int(maxBufferSize / _recordBufferSize);
-        LOGS(_log, LOG_LVL_DEBUG,
-             "\nMax buffer size : " << maxBufferSize << " vs " << _recordBufferSize
-                                    << " -> batch size : " << batchSize_buf);
+        LOGS(_log, LOG_LVL_DEBUG, ::prefix << "Max buffer size [Bytes] : " << maxBufferSize);
+        LOGS(_log, LOG_LVL_DEBUG, ::prefix << "Record buffer size [Bytes] : " << _recordBufferSize);
+        LOGS(_log, LOG_LVL_DEBUG, ::prefix << "Batch buffer size [rows] : " << batchSize_buf);
     }
     _batchSize = std::min(batchSize_mem, batchSize_buf);
     _arrow_reader_gbl->set_batch_size(_batchSize);

-    _totalBatchNumber = int(fileRowNumber / _batchSize);
-    if (_totalBatchNumber * _batchSize < fileRowNumber) _totalBatchNumber++;
+    _totalBatchNumber = int(_numRowsTotal / _batchSize);
+    if (_totalBatchNumber * _batchSize < _numRowsTotal) _totalBatchNumber++;

-    LOGS(_log, LOG_LVL_DEBUG, "Number of rows : " << fileRowNumber << " batchSize " << _batchSize);
-    LOGS(_log, LOG_LVL_DEBUG, "RecordBatchReader : batch number " << _totalBatchNumber);
-    return arrow::Status::OK();
+    LOGS(_log, LOG_LVL_DEBUG, ::prefix << "RecordBatchReader : batchSize [rows] : " << _batchSize);
+    LOGS(_log, LOG_LVL_DEBUG, ::prefix << "RecordBatchReader : batch number : " << _totalBatchNumber);
 }
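To make the sizing rule concrete, an illustrative computation with assumed numbers (a 3000 MB memory budget, 360-byte in-memory records, a 16 MiB output buffer, and 520-byte CSV string records):

    int64_t batchSize_mem = int64_t(3000.0 * 1024 * 1024 * 0.85 / 360);  // ~7.4M rows fit in RAM
    int64_t batchSize_buf = int64_t((16 * 1024 * 1024) / 520);           // ~32K rows fit in the buffer
    int64_t batchSize = std::min(batchSize_mem, batchSize_buf);          // the buffer is the binding limit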
-int ParquetFile::_getTotalRowNumber(std::string fileName) const {
+void ParquetFile::_getTotals() {
     std::shared_ptr<arrow::io::ReadableFile> infile;
-    PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open(fileName, arrow::default_memory_pool()));
+    PARQUET_ASSIGN_OR_THROW(infile,
+                            arrow::io::ReadableFile::Open(_path_to_file, arrow::default_memory_pool()));
+    _fileSize = infile->GetSize().ValueOrDie();

     std::unique_ptr<parquet::arrow::FileReader> reader;
     PARQUET_ASSIGN_OR_THROW(reader, parquet::arrow::OpenFile(infile, arrow::default_memory_pool()));
+    _numRowGroups = reader->num_row_groups();

     std::shared_ptr<parquet::FileMetaData> metadata = reader->parquet_reader()->metadata();
+    _numRowsTotal = metadata->num_rows();

-    return metadata->num_rows();
+    LOGS(_log, LOG_LVL_DEBUG, ::prefix << "Total file size [Bytes] : " << _fileSize);
+    LOGS(_log, LOG_LVL_DEBUG, ::prefix << "Number of row groups : " << _numRowGroups);
+    LOGS(_log, LOG_LVL_DEBUG, ::prefix << "Number of rows : " << _numRowsTotal);
 }

-arrow::Status ParquetFile::readNextBatch_Table2CSV(void* buf, int& buffSize,
-                                                   std::vector<std::string> const& params,
-                                                   std::string const& nullStr, std::string const& delimStr) {
+bool ParquetFile::readNextBatch_Table2CSV(void* buf, int& buffSize, std::vector<std::string> const& columns,
+                                          std::set<std::string> const& optionalColumns,
+                                          std::string const& nullStr, std::string const& delimStr,
+                                          bool quote) {
     std::shared_ptr<arrow::Table> table_loc;

-    _parameterNames = params;
+    _columns = columns;
+    _optionalColumns = optionalColumns;

-    // Get the next data batch, data are formatted
-    arrow::Status batchStatus = _readNextBatchTable_Formatted(table_loc);
-
-    if (!batchStatus.ok()) return arrow::Status::ExecutionError("Error while reading and formatting batch");
-
-    arrow::Status status = _table2CSVBuffer(table_loc, buffSize, buf, nullStr, delimStr);
-
-    if (status.ok()) return arrow::Status::OK();
-
-    return arrow::Status::ExecutionError("Error while writing table to CSV buffer");
+    // Get the next data batch (if any is still left); data are formatted
+    if (_readNextBatchTable_Formatted(table_loc)) {
+        _table2CSVBuffer(table_loc, buffSize, buf, nullStr, delimStr, quote);
+        return true;
+    }
+    return false;
 }

-arrow::Status ParquetFile::_table2CSVBuffer(std::shared_ptr<arrow::Table> const& table, int& buffSize,
-                                            void* buf, std::string const& nullStr,
-                                            std::string const& delimStr) {
-    ARROW_ASSIGN_OR_RAISE(auto outstream, arrow::io::BufferOutputStream::Create(1 << 10));
+void ParquetFile::_table2CSVBuffer(std::shared_ptr<arrow::Table> const& table, int& buffSize, void* buf,
+                                   std::string const& nullStr, std::string const& delimStr, bool quote) {
+    PARQUET_ASSIGN_OR_THROW(auto outstream, arrow::io::BufferOutputStream::Create(1 << 10));

     // Options : null string, no header, quoting style as requested
     arrow::csv::WriteOptions writeOpt = arrow::csv::WriteOptions::Defaults();
     writeOpt.null_string = nullStr;
     writeOpt.delimiter = delimStr[0];
     writeOpt.include_header = false;
-    writeOpt.quoting_style = arrow::csv::QuotingStyle::None;
+    writeOpt.quoting_style = quote ? arrow::csv::QuotingStyle::AllValid : arrow::csv::QuotingStyle::None;

-    ARROW_RETURN_NOT_OK(arrow::csv::WriteCSV(*table, writeOpt, outstream.get()));
-    ARROW_ASSIGN_OR_RAISE(auto buffer, outstream->Finish());
+    arrow::Status status = arrow::csv::WriteCSV(*table, writeOpt, outstream.get());
+    if (!status.ok()) {
+        std::string const msg = "Error while writing table to CSV buffer";
+        LOGS(_log, LOG_LVL_ERROR, ::prefix << msg);
+        throw std::runtime_error(msg);
+    }
+    PARQUET_ASSIGN_OR_THROW(auto buffer, outstream->Finish());

-    // auto buffer_ptr = buffer.get()->data();
     buffSize = buffer->size();
-    LOGS(_log, LOG_LVL_DEBUG,
-         "ParquetFile::Table2CSVBuffer - buffer length : " << buffSize << " // " << _maxBufferSize);
+    LOGS(_log, LOG_LVL_DEBUG, ::prefix << "Buffer size [Bytes] : " << buffSize << " of " << _maxBufferSize);
     memcpy(buf, (void*)buffer.get()->data(), buffer->size());
-    return arrow::Status::OK();
 }
-arrow::Status ParquetFile::_readNextBatchTable_Formatted(std::shared_ptr<arrow::Table>& outputTable) {
+bool ParquetFile::_readNextBatchTable_Formatted(std::shared_ptr<arrow::Table>& outputTable) {
     auto const maybe_batch = _rb_reader_gbl->Next();
+    if (maybe_batch == nullptr) {
+        LOGS(_log, LOG_LVL_DEBUG, ::prefix << "End of file reached");
+        return false;
+    }
+    PARQUET_ASSIGN_OR_THROW(auto batch, maybe_batch);
+    std::shared_ptr<arrow::Table> initTable;
+    PARQUET_ASSIGN_OR_THROW(initTable, arrow::Table::FromRecordBatches(batch->schema(), {batch}));

-    std::vector<std::string> paramNotFound;
     std::map<std::string, std::shared_ptr<arrow::Field>> fieldConfig;
+    const arrow::FieldVector fields = initTable->schema()->fields();
+    for (auto fd : fields) {
+        fieldConfig[fd->name()] = fd;
+    }

-    if (maybe_batch != nullptr) {
-        ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
-        std::shared_ptr<arrow::Table> initTable;
-        ARROW_ASSIGN_OR_RAISE(initTable, arrow::Table::FromRecordBatches(batch->schema(), {batch}));
-
-        // Increment the batch number
-        _batchNumber++;
-
-        const arrow::FieldVector fields = initTable->schema()->fields();
-        for (auto fd : fields) {
-            fieldConfig[fd->name()] = fd;
-        }
-
-        arrow::FieldVector formatedTable_fields;
-        std::vector<std::shared_ptr<arrow::ChunkedArray>> formatedTable_columns;
-
-        // Loop over the column names as defined in the partition config file
-        for (std::string paramName : _parameterNames) {
-            std::shared_ptr<arrow::ChunkedArray> chunkedArray = initTable->GetColumnByName(paramName);
+    arrow::FieldVector formatedTable_fields;
+    std::vector<std::shared_ptr<arrow::ChunkedArray>> formatedTable_columns;
+    std::shared_ptr<arrow::ChunkedArray> null_array;
+    // Loop over the column names as defined in the partition config file
+    for (auto const& column : _columns) {
+        std::shared_ptr<arrow::ChunkedArray> chunkedArray = initTable->GetColumnByName(column);
+        if (chunkedArray == nullptr) {
+            LOGS(_log, LOG_LVL_DEBUG, ::prefix << "Column name : " << column << " not found in the table");
             // Column not found in the arrow table...
-            if (chunkedArray == nullptr) {
-                paramNotFound.push_back(paramName);
-            } else {
+            if (_optionalColumns.count(column) == 0) {
+                std::string const msg = "Column '" + column + "' not found in the input file";
+                LOGS(_log, LOG_LVL_ERROR, ::prefix << msg);
+                throw std::runtime_error(msg);
+            }
+            // Insert a column with all nulls
+            if (null_array == nullptr) {
+                null_array = std::make_shared<arrow::ChunkedArray>(
+                        std::make_shared<arrow::NullArray>(initTable->num_rows()));
+            }
+            formatedTable_columns.push_back(null_array);
+            formatedTable_fields.push_back(std::make_shared<arrow::Field>(column, arrow::null()));
+        } else {
+            LOGS(_log, LOG_LVL_DEBUG, ::prefix << "Column name : " << column);
+            if (fieldConfig[column]->type() == arrow::boolean()) {
                 // Column type is boolean -> switch to 0/1 representation
-                if (fieldConfig[paramName]->type() == arrow::boolean()) {
-                    auto newChunkedArray = _chunkArrayReformatBoolean(chunkedArray, true);
-                    if (newChunkedArray == nullptr) {
-                        return arrow::Status::ExecutionError("Error while formatting boolean chunk array");
-                    }
-                    formatedTable_columns.push_back(newChunkedArray);
-
-                    std::shared_ptr<arrow::Field> newField =
-                            std::make_shared<arrow::Field>(fieldConfig[paramName]->name(), arrow::int8());
-                    formatedTable_fields.push_back(newField);
-                }
-                // Simply keep the chunk as it is defined in the arrow table
-                else {
-                    formatedTable_columns.push_back(chunkedArray);
-                    formatedTable_fields.push_back(fieldConfig[paramName]);
+                auto newChunkedArray = _chunkArrayReformatBoolean(chunkedArray, true);
+                if (newChunkedArray == nullptr) {
+                    std::string const msg = "Error while formatting boolean chunk array";
+                    LOGS(_log, LOG_LVL_ERROR, ::prefix << msg);
+                    throw std::runtime_error(msg);
                 }
+                formatedTable_columns.push_back(newChunkedArray);
+                std::shared_ptr<arrow::Field> newField =
+                        std::make_shared<arrow::Field>(fieldConfig[column]->name(), arrow::int8());
+                formatedTable_fields.push_back(newField);
+            } else {
+                // Simply keep the chunk as it is defined in the arrow table
+                formatedTable_columns.push_back(chunkedArray);
+                formatedTable_fields.push_back(fieldConfig[column]);
             }
-        }  // end of loop over parameters
-        // If a column is not found (i.e. a parameter defined in partition.json does not exist in the
-        // parquet file), throw an error and stop
-        if (paramNotFound.size() > 0) {
-            for (auto name : paramNotFound)
-                LOGS(_log, LOG_LVL_DEBUG, "ERROR : param name " << name << " not found in table columns");
-            return arrow::Status::ExecutionError("Configuration file : missing parameter in table");
-        }
-
-        // Create the arrow::schema of the new table
-        std::shared_ptr<arrow::Schema> formatedSchema = std::make_shared<arrow::Schema>(
-                arrow::Schema(formatedTable_fields, initTable->schema()->endianness()));
-
-        // and finally create the arrow::Table that matches the partition config file
-        outputTable = arrow::Table::Make(formatedSchema, formatedTable_columns);
-        arrow::Status resTable = outputTable->ValidateFull();
-        if (!resTable.ok()) {
-            LOGS(_log, LOG_LVL_DEBUG, "ERROR : formatted table full validation not OK");
-            return arrow::Status::ExecutionError("CSV output table not valid");
-        }
-
-        return arrow::Status::OK();
+        }
     }

-    // The end of the parquet file has been reached
-    return arrow::Status::ExecutionError("End of RecordBatchReader iterator");
+    // Create the arrow::schema of the new table
+    std::shared_ptr<arrow::Schema> formatedSchema = std::make_shared<arrow::Schema>(
+            arrow::Schema(formatedTable_fields, initTable->schema()->endianness()));
+
+    // and finally create the arrow::Table that matches the partition config file
+    outputTable = arrow::Table::Make(formatedSchema, formatedTable_columns);
+    arrow::Status resTable = outputTable->ValidateFull();
+    if (!resTable.ok()) {
+        std::string const msg = "Formatted table not valid";
+        LOGS(_log, LOG_LVL_ERROR, ::prefix << msg);
+        throw std::runtime_error(msg);
+    }
+    return true;
 }

 std::shared_ptr<arrow::ChunkedArray> ParquetFile::_chunkArrayReformatBoolean(
@@ -313,9 +320,9 @@ std::shared_ptr<arrow::ChunkedArray> ParquetFile::_chunkArrayReformatBoolean(
     }

     if (!builder.Finish(&array).ok()) {
-        std::string errorMsg = "ERROR while finalizing " + inputArray->ToString() + " new chunked array";
-        LOGS(_log, LOG_LVL_DEBUG, errorMsg);
-        return nullptr;
+        std::string const msg = "Failed to finalize '" + inputArray->ToString() + "' new chunked array";
+        LOGS(_log, LOG_LVL_ERROR, ::prefix << msg);
+        throw std::runtime_error(msg);
     }

     if (bCheck) {
@@ -338,11 +345,10 @@ std::shared_ptr<arrow::ChunkedArray> ParquetFile::_chunkArrayReformatBoolean(
     // arrow validation of the new chunkedArray
     auto status = newChunkedArray->ValidateFull();
     if (!status.ok()) {
-        std::string errorMsg = "Invalid new chunkedArray : " + status.ToString();
-        LOGS(_log, LOG_LVL_DEBUG, errorMsg);
-        return nullptr;
+        std::string const msg = "Invalid new chunkedArray : '" + status.ToString() + "'";
+        LOGS(_log, LOG_LVL_ERROR, ::prefix << msg);
+        throw std::runtime_error(msg);
     }
-
     return newChunkedArray;
 }
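The loop body of the conversion is largely elided by the hunks above; as a point of reference, a simplified sketch of the underlying technique (not the method's actual implementation, which additionally runs the optional verification pass shown above) might read:

    // Convert a chunked boolean column into an int8 (0/1) column, preserving
    // nulls. Returns nullptr on a builder failure.
    std::shared_ptr<arrow::ChunkedArray> boolsToInt8(arrow::ChunkedArray const& input) {
        arrow::ArrayVector chunks;
        for (auto const& chunk : input.chunks()) {
            auto const& bools = static_cast<arrow::BooleanArray const&>(*chunk);
            arrow::Int8Builder builder;
            for (int64_t i = 0; i < bools.length(); ++i) {
                arrow::Status st = bools.IsNull(i) ? builder.AppendNull()
                                                   : builder.Append(bools.Value(i) ? 1 : 0);
                if (!st.ok()) return nullptr;
            }
            std::shared_ptr<arrow::Array> array;
            if (!builder.Finish(&array).ok()) return nullptr;
            chunks.push_back(array);
        }
        return std::make_shared<arrow::ChunkedArray>(chunks);
    }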
diff --git a/src/partition/ParquetInterface.h b/src/partition/ParquetInterface.h
index 19724f6196..7893054f4f 100644
--- a/src/partition/ParquetInterface.h
+++ b/src/partition/ParquetInterface.h
@@ -23,9 +23,11 @@
 /// \brief read parquet file using libarrow library (RecordBatchReader interface)

 // System headers
+#include
+#include
+#include
+#include
 #include
-#include
-#include

 // Third party headers
 #include "arrow/api.h"
@@ -59,40 +61,49 @@ class ParquetFile {
     /**
      * Parquet file constructor
      * @param fileName - parquet file name
-     * @param maxMemAllocated - max RAM allocated to the process
+     * @param maxMemAllocatedMB - max RAM allocated to the process
      */
-    ParquetFile(std::string fileName, int maxMemAllocated = 3000 /*MB*/);
+    ParquetFile(std::string fileName, int maxMemAllocatedMB = 3000);

     // Disable copy construction and assignment.
     ParquetFile(ParquetFile const&) = delete;
     ParquetFile& operator=(ParquetFile const&) = delete;

+    ~ParquetFile();
+
     /**
      * This method initializes the arrow batch reader. The number of data rows read by each batch is defined
      * to match the constraints defined by the maximum RAM allocated to the reading process and the maximum
      * buffer size as defined in the partitioner configuration file.
      * @param maxBufferSize - maximum buffer size as defined in the partitioner configuration file
-     * @returns The completion status, where arrow::Status::Success is for success, arrow::raise_error stops
-     * the process otherwise
      * @throws arrow::raise_error if the arrow parquet interface or the batch reader cannot be set up
      */
-    arrow::Status setupBatchReader(int maxBufferSize = -1);
+    void setupBatchReader(int maxBufferSize = -1);
+
+    // These methods return general parameters of the file. Their values are
+    // available after calling method setupBatchReader().
+
+    std::int64_t getFileSize() const { return _fileSize; }
+    int getNumRowGroups() const { return _numRowGroups; }
+    int getNumRowsTotal() const { return _numRowsTotal; }

     /**
      * This method reads an arrow batch, formats the table according to the partition configuration file and
      * saves it in CSV format.
      * @param buf - character buffer containing the content of the arrow table dumped in CSV format
      * @param buffSize - CSV buffer size returned by the function
-     * @param params - names of the data columns to be retrieved as defined in the partitioner configuration
-     * file
+     * @param columns - names of the data columns to be retrieved as defined in the partitioner configuration
+     * file
+     * @param optionalColumns - optional columns as defined in the partitioner configuration file
      * @param nullStr - string that replaces a null value in the csv output buffer
-     * @param delimStr - delimiter used betweenn values in csv buffer
-     * @returns The completion status, where arrow::Status::Success is for success, arrow::raise_error stops
-     * the process otherwise
+     * @param delimStr - delimiter used between values in csv buffer
+     * @param quote - if true, double quote valid fields in the CSV buffer
+     * @returns true if the batch has been read and formatted, false if the end of the file has been reached
      * @throws arrow::raise_error if batch could not be read or if the table formatting process goes wrong
      */
-    arrow::Status readNextBatch_Table2CSV(void* buf, int& buffSize, std::vector<std::string> const& params,
-                                          std::string const& nullStr, std::string const& delimStr);
+    bool readNextBatch_Table2CSV(void* buf, int& buffSize, std::vector<std::string> const& columns,
+                                 std::set<std::string> const& optionalColumns, std::string const& nullStr,
+                                 std::string const& delimStr, bool quote);

     int getBatchSize() const { return _batchSize; }
     int getTotalBatchNumber() const { return _totalBatchNumber; }
@@ -126,13 +137,12 @@ class ParquetFile {
     /**
      * This method reads the next arrow batch and proceeds to some data formatting (column reordering as
      * defined by the partitioner, true/false -> 0/1).
      * @param outputTable - arrow table containing the data read by the arrow::batch
-     * @returns The completion status, where arrow::Status::Success is for success, arrow::raise_error stops
-     * the process otherwise
+     * @returns true if the batch has been read, false if the end of the file has been reached
      * @throws arrow::raise_error if batch could not be read, if the data table could not be read from the
      * batch, if a data column needed by the partitioner is not found in the table or if the outputTable is
      * not valid as defined
      */
-    arrow::Status _readNextBatchTable_Formatted(std::shared_ptr<arrow::Table>& table);
+    bool _readNextBatchTable_Formatted(std::shared_ptr<arrow::Table>& table);

     /**
      * This method creates a character buffer containing the input arrow::Table data. CSV conversion is done
@@ -142,20 +152,17 @@ class ParquetFile {
      * @param buf - character buffer containing the content of the arrow table dumped in CSV format
      * @param nullStr - string that replaces a null value in the csv output buffer
-     * @param delimStr - delimiter used betweenn values in csv buffer
-     * @returns The completion status, where arrow::Status::Success is for success, arrow::raise_error stops
-     * the process otherwise
+     * @param delimStr - delimiter used between values in csv buffer
+     * @param quote - if true, double quote valid fields in the CSV buffer
      * @throws arrow::raise_error if CSV conversion could not be done
      */
-    arrow::Status _table2CSVBuffer(std::shared_ptr<arrow::Table> const& table, int& buffSize, void* buf,
-                                   std::string const& nullStr, std::string const& delimStr);
+    void _table2CSVBuffer(std::shared_ptr<arrow::Table> const& table, int& buffSize, void* buf,
+                          std::string const& nullStr, std::string const& delimStr, bool quote);

     /**
-     * This method returns the number of data rows stored in the parquet file
-     * @param filename - the parquet file name
-     * @returns the number of data rows
+     * This method extracts counters from the metadata section of the input file.
      * @throws parquet::throw_error if file could not be opened or the parquet reader could not be defined
      */
-    int _getTotalRowNumber(std::string fileName) const;
+    void _getTotals();

     /**
      * This method reformats a boolean chunk array : a true/false boolean array becomes a 1/0 int8 array
      */
     std::shared_ptr<arrow::ChunkedArray> _chunkArrayReformatBoolean(
             std::shared_ptr<arrow::ChunkedArray>& inputArray, bool bCheck = false);

     std::string _path_to_file;
-    std::string _part_config_file;
-    int _maxMemory, _recordSize, _recordBufferSize;
+
+    int _maxMemoryMB;
+    int _recordSize;
+    int _recordBufferSize;
     int _vmRSS_init;
-    int _batchNumber, _batchSize;
+    int _batchSize;
     int _totalBatchNumber;
     int _maxBufferSize;
+    int64_t _fileSize = 0;
+    int _numRowGroups = 0;
+    int _numRowsTotal = 0;

-    std::vector<std::string> _parameterNames;
+    std::vector<std::string> _columns;
+    std::set<std::string> _optionalColumns;

     std::unique_ptr<parquet::arrow::FileReader> _arrow_reader_gbl;
     std::unique_ptr<::arrow::RecordBatchReader> _rb_reader_gbl;
 };
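Downstream code can query the new counters once the reader has been set up; a minimal usage sketch, assuming an input file data.parq (the name is illustrative):

    lsst::partition::ParquetFile parqFile("data.parq", /*maxMemAllocatedMB=*/1000);
    parqFile.setupBatchReader(16 * 1024 * 1024);  // throws on Arrow/Parquet errors
    std::cout << parqFile.getFileSize() << " bytes, " << parqFile.getNumRowGroups()
              << " row groups, " << parqFile.getNumRowsTotal() << " rows" << std::endl;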
diff --git a/src/partition/sph-parq2csv.cc b/src/partition/sph-parq2csv.cc
new file mode 100644
index 0000000000..e7f17720c2
--- /dev/null
+++ b/src/partition/sph-parq2csv.cc
@@ -0,0 +1,226 @@
+/*
+ * LSST Data Management System
+ * Copyright 2017 LSST Corporation.
+ *
+ * This product includes software developed by the
+ * LSST Project (http://www.lsst.org/).
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the LSST License Statement and
+ * the GNU General Public License along with this program. If not,
+ * see .
+ */
+#include <fstream>
+#include <iomanip>
+#include <iostream>
+#include <memory>
+#include <set>
+#include <stdexcept>
+#include <string>
+#include <vector>
+
+// Third party headers
+#include "boost/program_options.hpp"
+#include "nlohmann/json.hpp"
+
+#include "partition/ParquetInterface.h"
+
+namespace po = boost::program_options;
+namespace part = lsst::partition;
+
+using namespace std;
+using json = nlohmann::json;
+
+namespace {
+
+/**
+ * This helper class is used to parse the command line parameters.
+ */
+class CommandLineParams {
+public:
+    /**
+     * Parse the command line parameters.
+     * @param argc - number of parameters
+     * @param argv - list of parameters
+     * @returns true if the parameters are parsed successfully, false otherwise.
+     */
+    bool parse(int argc, char const* const* argv) {
+        static char const* help =
+                "The tool for translating Parquet files into CSV.\n\n"
+                "Usage:\n"
+                "  sph-parq2csv [options] <parq> <config> <csv>\n\n"
+                "Options";
+
+        po::options_description desc(help, 80);
+        desc.add_options()("help,h", "Produce this help");
+        desc.add_options()("verbose,v", "Produce verbose output.");
+        desc.add_options()("csv-quote-fields", "Double quote fields as needed in the generated CSV.");
+        desc.add_options()("max-proc-mem-mb", po::value<int>()->default_value(maxMemAllocatedMB),
+                           "Max size (MB) of RAM allocated to the process.");
+        desc.add_options()("buf-size-mb", po::value<int>()->default_value(maxBuffSizeMB),
+                           "Buffer size (MB) for translating batches.");
+        desc.add_options()("parq", po::value<vector<string>>(), "Input file to be translated.");
+        desc.add_options()("config", po::value<vector<string>>(),
+                           "Input JSON file with definition of columns to be extracted.");
+        desc.add_options()("csv", po::value<vector<string>>(), "Output file to be written.");
+
+        po::positional_options_description p;
+        p.add("parq", 1);
+        p.add("config", 1);
+        p.add("csv", 1);
+
+        po::variables_map vm;
+        po::store(po::command_line_parser(argc, argv).options(desc).positional(p).run(), vm);
+        po::notify(vm);
+
+        if (vm.count("help")) {
+            cout << desc << "\n";
+            return false;
+        }
+        parqFileName = vm.count("parq") ? vm["parq"].as<vector<string>>().front() : string();
+        configFileName = vm.count("config") ? vm["config"].as<vector<string>>().front() : string();
+        csvFileName = vm.count("csv") ? vm["csv"].as<vector<string>>().front() : string();
+
+        if (parqFileName.empty() || configFileName.empty() || csvFileName.empty()) {
+            throw runtime_error("The names of all required files must be provided.");
+        }
+        if (csvFileName == parqFileName) {
+            throw runtime_error("Input and output file names must be different.");
+        }
+        _parseConfigFile();
+
+        if (vm.count("max-proc-mem-mb")) {
+            maxMemAllocatedMB = vm["max-proc-mem-mb"].as<int>();
+        }
+        if (maxMemAllocatedMB < 1) {
+            throw runtime_error("Memory allocation must be equal to 1 or greater.");
+        }
+        if (vm.count("buf-size-mb")) {
+            maxBuffSizeMB = vm["buf-size-mb"].as<int>();
+        }
+        if ((maxBuffSizeMB < 1) || (maxBuffSizeMB > 1024)) {
+            throw runtime_error("Buffer size (MB) must be in a range of [1,1024].");
+        }
+        verbose = vm.count("verbose") != 0;
+        quote = vm.count("csv-quote-fields") != 0;
+
+        return true;
+    }
+
+    // Values of the parsed parameters are stored in the data members defined below.
+
+    string parqFileName;
+    string configFileName;
+    string csvFileName;
+
+    vector<string> columns;
+    set<string> optionalColumns;
+
+    int maxMemAllocatedMB = 3000;
+    int maxBuffSizeMB = 16;
+
+    string const nullStr = "\\N";
+    string const delimStr = "\t";
+
+    bool verbose = false;
+    bool quote = false;
+
+private:
+    void _parseConfigFile() {
+        ifstream file(configFileName, ios_base::in);
+        if (!file.good()) throw invalid_argument("Failed to open file: '" + configFileName + "'");
+        json config;
+        try {
+            file >> config;
+        } catch (...) {
+            throw runtime_error("Config file: '" + configFileName + "' doesn't have a valid JSON payload");
+        }
+        if (!config.is_object()) {
+            throw invalid_argument("Config file: '" + configFileName + "' is not a valid JSON object");
+        }
+        if (!config.contains("columns")) {
+            throw runtime_error("The JSON file must contain a 'columns' key.");
+        }
+        if (!config["columns"].is_array()) {
+            throw runtime_error("The 'columns' key must contain an array.");
+        }
+        columns = config["columns"].get<vector<string>>();
+        if (columns.empty()) {
+            throw runtime_error("No columns to be extracted.");
+        }
+        optionalColumns.clear();
+        if (config.contains("optional")) {
+            if (!config["optional"].is_array()) {
+                throw runtime_error("The 'optional' key must contain an array.");
+            }
+            for (auto const& column : config["optional"].get<vector<string>>()) {
+                optionalColumns.insert(column);
+            }
+        }
+        // All optional columns must be defined in the 'columns' array.
+        for (auto const& name : optionalColumns) {
+            if (find(columns.begin(), columns.end(), name) == columns.end()) {
+                throw runtime_error("The optional column '" + name +
+                                    "' is not defined in the 'columns' array.");
+            }
+        }
+    }
+};
+
+}  // namespace
+
+int main(int argc, char const* const* argv) {
+    int buffSize;
+    size_t numBytesWritten = 0;
+
+    try {
+        ::CommandLineParams params;
+        if (!params.parse(argc, argv)) {
+            return 1;
+        }
+        int const maxBuffSizeBytes = params.maxBuffSizeMB * 1024 * 1024;
+        unique_ptr<char[]> buf(new char[maxBuffSizeBytes]);
+
+        if (params.verbose) {
+            cout << "Translating '" << params.parqFileName << "' into '" << params.csvFileName << "'"
+                 << endl;
+        }
+        part::ParquetFile parqFile(params.parqFileName, params.maxMemAllocatedMB);
+        parqFile.setupBatchReader(maxBuffSizeBytes);
+
+        ofstream csvFile(params.csvFileName, ios::out | ios::binary);
+        if (!csvFile) {
+            throw runtime_error("Error while opening the output file.");
+        }
+        while (true) {
+            bool const success = parqFile.readNextBatch_Table2CSV(buf.get(), buffSize, params.columns,
+                                                                  params.optionalColumns, params.nullStr,
+                                                                  params.delimStr, params.quote);
+            if (!success) break;
+            if (buffSize == 0) {
+                throw runtime_error("Received EOF while reading the file.");
+            }
+            if (params.verbose) {
+                cout << "Writing " << setw(9) << buffSize << " bytes" << endl;
+            }
+            csvFile.write((char*)(buf.get()), buffSize);
+            numBytesWritten += buffSize;
+        }
+        csvFile.close();
+        if (params.verbose) {
+            cout << "Wrote " << setw(9) << numBytesWritten << " bytes" << endl;
+        }
+    } catch (exception const& e) {
+        cerr << "Error: " << e.what() << endl;
+        return 1;
+    }
+    return 0;
+}
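For reference, a hypothetical end-to-end use of the new tool. A config file (say columns.json; the column names are illustrative) matching the schema enforced by _parseConfigFile:

    {
        "columns" : ["objectId", "ra", "dec", "flags"],
        "optional" : ["flags"]
    }

and a matching invocation, with the three positional arguments in the expected order:

    sph-parq2csv --verbose --csv-quote-fields --buf-size-mb=64 objects.parq columns.json objects.csv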