Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
6ecfc4e
[C++] Add ORC stripe statistics extraction foundation
cbb330 Jan 27, 2026
aeb48bb
[C++] Add Arrow expression builder for ORC statistics
cbb330 Jan 27, 2026
dfc1235
[C++] Add lazy evaluation infrastructure for ORC predicate pushdown
cbb330 Jan 27, 2026
75ee051
[C++] Add basic ORC stripe filtering API with predicate pushdown
cbb330 Jan 27, 2026
237419a
[C++] Integrate ORC stripe filtering with dataset scanner
cbb330 Jan 27, 2026
70019a9
[C++][Dataset] Apply stripe-selected ORC scanning with simplified con…
cbb330 Feb 15, 2026
df15093
[C++][Dataset] Add ORC predicate pushdown tests for all-null, filesys…
cbb330 Feb 15, 2026
79cdd3e
[Python][Dataset] Add filters support to orc.read_table with path/fil…
cbb330 Feb 15, 2026
07ac686
[Python][Tests] Add ORC read_table(filters=...) smoke and correctness…
cbb330 Feb 15, 2026
5953dda
[Python][Tests] Add all-null stripe semantics tests for IS NULL and I…
cbb330 Feb 15, 2026
4c2afaf
[Python][Tests] Add BufferReader fallback coverage for read_table(fil…
cbb330 Feb 15, 2026
5924525
[Python][Tests] Add Parquet-vs-ORC predicate parity integration test
cbb330 Feb 15, 2026
deed1be
[Python][Dataset] Fix file-like ORC filter fallback with output proje…
cbb330 Feb 15, 2026
11e120e
[Python][Tests] Add BufferReader projection and empty-column fallback…
cbb330 Feb 15, 2026
32fbcda
[Python][Tests] Verify projected path-based ORC filters without filte…
cbb330 Feb 15, 2026
745b051
[Python][Tests] Verify projected path-based ORC filters with empty ou…
cbb330 Feb 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 146 additions & 0 deletions cpp/src/arrow/adapters/orc/adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <algorithm>
#include <list>
#include <memory>
#include <optional>
#include <sstream>
#include <string>
#include <vector>
Expand All @@ -30,9 +31,11 @@

#include "arrow/adapters/orc/util.h"
#include "arrow/builder.h"
#include "arrow/compute/expression.h"
#include "arrow/io/interfaces.h"
#include "arrow/memory_pool.h"
#include "arrow/record_batch.h"
#include "arrow/scalar.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/table_builder.h"
Expand Down Expand Up @@ -100,6 +103,119 @@ constexpr uint64_t kOrcNaturalWriteSize = 128 * 1024;

using internal::checked_cast;

// Statistics container for min/max values from ORC stripe statistics
struct MinMaxStats {
int64_t min;
int64_t max;
bool has_null;

MinMaxStats(int64_t min_val, int64_t max_val, bool null_flag)
: min(min_val), max(max_val), has_null(null_flag) {}
};

// Extract stripe-level statistics for a specific column
// Returns nullopt if statistics are missing or invalid
std::optional<MinMaxStats> ExtractStripeStatistics(
const std::unique_ptr<liborc::StripeStatistics>& stripe_stats,
uint32_t orc_column_id,
const std::shared_ptr<DataType>& field_type) {

if (!stripe_stats) {
return std::nullopt; // No statistics available
}

// Get column statistics
const liborc::ColumnStatistics* col_stats =
stripe_stats->getColumnStatistics(orc_column_id);

if (!col_stats) {
return std::nullopt; // Column statistics missing
}

// Only INT64 support in this initial implementation
if (field_type->id() != Type::INT64) {
return std::nullopt; // Unsupported type
}

// Dynamic cast to get integer-specific statistics
const auto* int_stats =
dynamic_cast<const liborc::IntegerColumnStatistics*>(col_stats);

if (!int_stats) {
return std::nullopt; // Wrong statistics type
}

// Check if min/max are available
if (!int_stats->hasMinimum() || !int_stats->hasMaximum()) {
return std::nullopt; // Statistics incomplete
}

// Extract raw values
int64_t min_value = int_stats->getMinimum();
int64_t max_value = int_stats->getMaximum();
bool has_null = col_stats->hasNull();

// Sanity check: min should be <= max
if (min_value > max_value) {
return std::nullopt; // Invalid statistics
}

return MinMaxStats(min_value, max_value, has_null);
}

// Build Arrow Expression representing stripe statistics guarantee
// Returns expression: (field >= min AND field <= max) OR is_null(field)
//
// This expression describes what values COULD exist in the stripe.
// Arrow's SimplifyWithGuarantee() will use this to determine if
// a predicate could be satisfied by this stripe.
//
// Example: If stripe has min=0, max=100, the guarantee is:
// (field >= 0 AND field <= 100) OR is_null(field)
//
// Then for predicate "field > 200", SimplifyWithGuarantee returns literal(false),
// indicating the stripe can be skipped.
compute::Expression BuildMinMaxExpression(
const FieldRef& field_ref,
const std::shared_ptr<DataType>& field_type,
const Scalar& min_value,
const Scalar& max_value,
bool has_null) {

// Create field reference expression
auto field_expr = compute::field_ref(field_ref);

// Build range expression: field >= min AND field <= max
auto min_expr = compute::greater_equal(field_expr, compute::literal(min_value));
auto max_expr = compute::less_equal(field_expr, compute::literal(max_value));
auto range_expr = compute::and_(std::move(min_expr), std::move(max_expr));

// If stripe contains nulls, add null handling
// This ensures we don't skip stripes with nulls when predicate
// could match null values
if (has_null) {
auto null_expr = compute::is_null(field_expr);
return compute::or_(std::move(range_expr), std::move(null_expr));
}

return range_expr;
}

// Convenience overload that takes MinMaxStats directly
compute::Expression BuildMinMaxExpression(
const FieldRef& field_ref,
const std::shared_ptr<DataType>& field_type,
const MinMaxStats& stats) {

// Convert int64 to Arrow scalar
auto min_scalar = std::make_shared<Int64Scalar>(stats.min);
auto max_scalar = std::make_shared<Int64Scalar>(stats.max);

return BuildMinMaxExpression(field_ref, field_type,
*min_scalar, *max_scalar,
stats.has_null);
}

class ArrowInputFile : public liborc::InputStream {
public:
explicit ArrowInputFile(const std::shared_ptr<io::RandomAccessFile>& file)
Expand Down Expand Up @@ -526,6 +642,31 @@ class ORCFileReader::Impl {
pool_);
}

Result<std::shared_ptr<RecordBatchReader>> NextStripeReader(
int64_t batch_size, const std::vector<std::string>& include_names) {
if (current_row_ >= NumberOfRows()) {
return nullptr;
}

liborc::RowReaderOptions opts = DefaultRowReaderOptions();
if (!include_names.empty()) {
RETURN_NOT_OK(SelectNames(&opts, include_names));
}
StripeInformation stripe_info{0, 0, 0, 0};
RETURN_NOT_OK(SelectStripeWithRowNumber(&opts, current_row_, &stripe_info));
ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema(opts));
std::unique_ptr<liborc::RowReader> row_reader;

ORC_BEGIN_CATCH_NOT_OK
row_reader = reader_->createRowReader(opts);
row_reader->seekToRow(current_row_);
current_row_ = stripe_info.first_row_id + stripe_info.num_rows;
ORC_END_CATCH_NOT_OK

return std::make_shared<OrcStripeReader>(std::move(row_reader), schema, batch_size,
pool_);
}

Result<std::shared_ptr<RecordBatchReader>> GetRecordBatchReader(
int64_t batch_size, const std::vector<std::string>& include_names) {
liborc::RowReaderOptions opts = DefaultRowReaderOptions();
Expand Down Expand Up @@ -630,6 +771,11 @@ Result<std::shared_ptr<RecordBatchReader>> ORCFileReader::NextStripeReader(
return impl_->NextStripeReader(batch_size, include_indices);
}

Result<std::shared_ptr<RecordBatchReader>> ORCFileReader::NextStripeReader(
int64_t batch_size, const std::vector<std::string>& include_names) {
return impl_->NextStripeReader(batch_size, include_names);
}

int64_t ORCFileReader::NumberOfStripes() { return impl_->NumberOfStripes(); }

int64_t ORCFileReader::NumberOfRows() { return impl_->NumberOfRows(); }
Expand Down
16 changes: 16 additions & 0 deletions cpp/src/arrow/adapters/orc/adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,22 @@ class ARROW_EXPORT ORCFileReader {
Result<std::shared_ptr<RecordBatchReader>> NextStripeReader(
int64_t batch_size, const std::vector<int>& include_indices);

/// \brief Get a stripe level record batch iterator.
///
/// Each record batch will have up to `batch_size` rows.
/// NextStripeReader serves as a fine-grained alternative to ReadStripe
/// which may cause OOM issues by loading the whole stripe into memory.
///
/// Note this will only read rows for the current stripe, not the entire
/// file.
///
/// \param[in] batch_size the maximum number of rows in each record batch
/// \param[in] include_names the selected field names to read, if not empty
/// (otherwise all fields are read)
/// \return the stripe reader
Result<std::shared_ptr<RecordBatchReader>> NextStripeReader(
int64_t batch_size, const std::vector<std::string>& include_names);

/// \brief Get a record batch iterator for the entire file.
///
/// Each record batch will have up to `batch_size` rows.
Expand Down
Loading