diff --git a/cpp/src/arrow/adapters/orc/adapter.cc b/cpp/src/arrow/adapters/orc/adapter.cc index 51cca497485..e72afbaf4b0 100644 --- a/cpp/src/arrow/adapters/orc/adapter.cc +++ b/cpp/src/arrow/adapters/orc/adapter.cc @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -30,9 +31,11 @@ #include "arrow/adapters/orc/util.h" #include "arrow/builder.h" +#include "arrow/compute/expression.h" #include "arrow/io/interfaces.h" #include "arrow/memory_pool.h" #include "arrow/record_batch.h" +#include "arrow/scalar.h" #include "arrow/status.h" #include "arrow/table.h" #include "arrow/table_builder.h" @@ -100,6 +103,119 @@ constexpr uint64_t kOrcNaturalWriteSize = 128 * 1024; using internal::checked_cast; +// Statistics container for min/max values from ORC stripe statistics +struct MinMaxStats { + int64_t min; + int64_t max; + bool has_null; + + MinMaxStats(int64_t min_val, int64_t max_val, bool null_flag) + : min(min_val), max(max_val), has_null(null_flag) {} +}; + +// Extract stripe-level statistics for a specific column +// Returns nullopt if statistics are missing or invalid +std::optional ExtractStripeStatistics( + const std::unique_ptr& stripe_stats, + uint32_t orc_column_id, + const std::shared_ptr& field_type) { + + if (!stripe_stats) { + return std::nullopt; // No statistics available + } + + // Get column statistics + const liborc::ColumnStatistics* col_stats = + stripe_stats->getColumnStatistics(orc_column_id); + + if (!col_stats) { + return std::nullopt; // Column statistics missing + } + + // Only INT64 support in this initial implementation + if (field_type->id() != Type::INT64) { + return std::nullopt; // Unsupported type + } + + // Dynamic cast to get integer-specific statistics + const auto* int_stats = + dynamic_cast(col_stats); + + if (!int_stats) { + return std::nullopt; // Wrong statistics type + } + + // Check if min/max are available + if (!int_stats->hasMinimum() || !int_stats->hasMaximum()) { + return std::nullopt; // Statistics incomplete + } + + // Extract raw values + int64_t min_value = int_stats->getMinimum(); + int64_t max_value = int_stats->getMaximum(); + bool has_null = col_stats->hasNull(); + + // Sanity check: min should be <= max + if (min_value > max_value) { + return std::nullopt; // Invalid statistics + } + + return MinMaxStats(min_value, max_value, has_null); +} + +// Build Arrow Expression representing stripe statistics guarantee +// Returns expression: (field >= min AND field <= max) OR is_null(field) +// +// This expression describes what values COULD exist in the stripe. +// Arrow's SimplifyWithGuarantee() will use this to determine if +// a predicate could be satisfied by this stripe. +// +// Example: If stripe has min=0, max=100, the guarantee is: +// (field >= 0 AND field <= 100) OR is_null(field) +// +// Then for predicate "field > 200", SimplifyWithGuarantee returns literal(false), +// indicating the stripe can be skipped. +compute::Expression BuildMinMaxExpression( + const FieldRef& field_ref, + const std::shared_ptr& field_type, + const Scalar& min_value, + const Scalar& max_value, + bool has_null) { + + // Create field reference expression + auto field_expr = compute::field_ref(field_ref); + + // Build range expression: field >= min AND field <= max + auto min_expr = compute::greater_equal(field_expr, compute::literal(min_value)); + auto max_expr = compute::less_equal(field_expr, compute::literal(max_value)); + auto range_expr = compute::and_(std::move(min_expr), std::move(max_expr)); + + // If stripe contains nulls, add null handling + // This ensures we don't skip stripes with nulls when predicate + // could match null values + if (has_null) { + auto null_expr = compute::is_null(field_expr); + return compute::or_(std::move(range_expr), std::move(null_expr)); + } + + return range_expr; +} + +// Convenience overload that takes MinMaxStats directly +compute::Expression BuildMinMaxExpression( + const FieldRef& field_ref, + const std::shared_ptr& field_type, + const MinMaxStats& stats) { + + // Convert int64 to Arrow scalar + auto min_scalar = std::make_shared(stats.min); + auto max_scalar = std::make_shared(stats.max); + + return BuildMinMaxExpression(field_ref, field_type, + *min_scalar, *max_scalar, + stats.has_null); +} + class ArrowInputFile : public liborc::InputStream { public: explicit ArrowInputFile(const std::shared_ptr& file) @@ -526,6 +642,31 @@ class ORCFileReader::Impl { pool_); } + Result> NextStripeReader( + int64_t batch_size, const std::vector& include_names) { + if (current_row_ >= NumberOfRows()) { + return nullptr; + } + + liborc::RowReaderOptions opts = DefaultRowReaderOptions(); + if (!include_names.empty()) { + RETURN_NOT_OK(SelectNames(&opts, include_names)); + } + StripeInformation stripe_info{0, 0, 0, 0}; + RETURN_NOT_OK(SelectStripeWithRowNumber(&opts, current_row_, &stripe_info)); + ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchema(opts)); + std::unique_ptr row_reader; + + ORC_BEGIN_CATCH_NOT_OK + row_reader = reader_->createRowReader(opts); + row_reader->seekToRow(current_row_); + current_row_ = stripe_info.first_row_id + stripe_info.num_rows; + ORC_END_CATCH_NOT_OK + + return std::make_shared(std::move(row_reader), schema, batch_size, + pool_); + } + Result> GetRecordBatchReader( int64_t batch_size, const std::vector& include_names) { liborc::RowReaderOptions opts = DefaultRowReaderOptions(); @@ -630,6 +771,11 @@ Result> ORCFileReader::NextStripeReader( return impl_->NextStripeReader(batch_size, include_indices); } +Result> ORCFileReader::NextStripeReader( + int64_t batch_size, const std::vector& include_names) { + return impl_->NextStripeReader(batch_size, include_names); +} + int64_t ORCFileReader::NumberOfStripes() { return impl_->NumberOfStripes(); } int64_t ORCFileReader::NumberOfRows() { return impl_->NumberOfRows(); } diff --git a/cpp/src/arrow/adapters/orc/adapter.h b/cpp/src/arrow/adapters/orc/adapter.h index 4ffff81f355..2faf43ddc9c 100644 --- a/cpp/src/arrow/adapters/orc/adapter.h +++ b/cpp/src/arrow/adapters/orc/adapter.h @@ -163,6 +163,22 @@ class ARROW_EXPORT ORCFileReader { Result> NextStripeReader( int64_t batch_size, const std::vector& include_indices); + /// \brief Get a stripe level record batch iterator. + /// + /// Each record batch will have up to `batch_size` rows. + /// NextStripeReader serves as a fine-grained alternative to ReadStripe + /// which may cause OOM issues by loading the whole stripe into memory. + /// + /// Note this will only read rows for the current stripe, not the entire + /// file. + /// + /// \param[in] batch_size the maximum number of rows in each record batch + /// \param[in] include_names the selected field names to read, if not empty + /// (otherwise all fields are read) + /// \return the stripe reader + Result> NextStripeReader( + int64_t batch_size, const std::vector& include_names); + /// \brief Get a record batch iterator for the entire file. /// /// Each record batch will have up to `batch_size` rows. diff --git a/cpp/src/arrow/dataset/file_orc.cc b/cpp/src/arrow/dataset/file_orc.cc index 1393df57f9d..e34b858210a 100644 --- a/cpp/src/arrow/dataset/file_orc.cc +++ b/cpp/src/arrow/dataset/file_orc.cc @@ -20,14 +20,19 @@ #include #include "arrow/adapters/orc/adapter.h" +#include "arrow/compute/expression.h" #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/file_base.h" #include "arrow/dataset/scanner.h" +#include "arrow/io/file.h" #include "arrow/util/checked_cast.h" #include "arrow/util/future.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" +#include "arrow/util/string.h" #include "arrow/util/thread_pool.h" +#include +#include namespace arrow { @@ -58,6 +63,62 @@ Result> OpenORCReader( return reader; } +struct ResolvedFieldRef { + FieldRef field_ref; + std::shared_ptr field; + uint32_t orc_column_id; +}; + +compute::Expression DeriveFieldGuarantee( + const ResolvedFieldRef& resolved_field, + const ::orc::ColumnStatistics* col_stats) { + if (!col_stats) { + return compute::literal(true); + } + + auto field_expr = compute::field_ref(resolved_field.field_ref); + const bool has_null = col_stats->hasNull(); + const bool is_all_null = has_null && col_stats->getNumberOfValues() == 0; + if (is_all_null) { + return compute::is_null(field_expr); + } + + const auto* int_stats = dynamic_cast(col_stats); + if (!int_stats || !int_stats->hasMinimum() || !int_stats->hasMaximum()) { + return compute::literal(true); + } + + int64_t min_value = int_stats->getMinimum(); + int64_t max_value = int_stats->getMaximum(); + if (min_value > max_value) { + return compute::literal(true); + } + + std::shared_ptr min_scalar; + std::shared_ptr max_scalar; + if (resolved_field.field->type()->id() == Type::INT32) { + if (min_value < std::numeric_limits::min() || + max_value > std::numeric_limits::max()) { + // Keep evaluation conservative when ORC integer stats exceed INT32 bounds. + return compute::literal(true); + } + min_scalar = std::make_shared(static_cast(min_value)); + max_scalar = std::make_shared(static_cast(max_value)); + } else { + min_scalar = std::make_shared(min_value); + max_scalar = std::make_shared(max_value); + } + + auto min_expr = compute::greater_equal(field_expr, compute::literal(*min_scalar)); + auto max_expr = compute::less_equal(field_expr, compute::literal(*max_scalar)); + auto range_expr = compute::and_(std::move(min_expr), std::move(max_expr)); + + if (has_null) { + return compute::or_(std::move(range_expr), compute::is_null(field_expr)); + } + return range_expr; +} + /// \brief A ScanTask backed by an ORC file. class OrcScanTask { public: @@ -69,7 +130,9 @@ class OrcScanTask { struct Impl { static Result Make(const FileSource& source, const FileFormat& format, - const ScanOptions& scan_options) { + const ScanOptions& scan_options, + const std::shared_ptr& fragment) { + ARROW_UNUSED(format); ARROW_ASSIGN_OR_RAISE( auto reader, OpenORCReader(source, std::make_shared(scan_options))); @@ -85,26 +148,74 @@ class OrcScanTask { included_fields.push_back(schema->field(match.indices()[0])->name()); } - std::shared_ptr record_batch_reader; - ARROW_ASSIGN_OR_RAISE( - record_batch_reader, - reader->GetRecordBatchReader(scan_options.batch_size, included_fields)); + // NEW: Apply stripe filtering if OrcFileFragment and filter present + std::vector stripe_indices; + int num_stripes = reader->NumberOfStripes(); + + auto orc_fragment = std::dynamic_pointer_cast(fragment); + if (orc_fragment && scan_options.filter != compute::literal(true)) { + // Use predicate pushdown + ARROW_ASSIGN_OR_RAISE(stripe_indices, + orc_fragment->FilterStripes(scan_options.filter)); + + int skipped = num_stripes - static_cast(stripe_indices.size()); + if (skipped > 0) { + ARROW_LOG(DEBUG) << "ORC predicate pushdown: skipped " << skipped + << " of " << num_stripes << " stripes"; + } + } else { + // No filtering - read all stripes + stripe_indices.resize(num_stripes); + std::iota(stripe_indices.begin(), stripe_indices.end(), 0); + } - return RecordBatchIterator(Impl{std::move(record_batch_reader)}); + if (stripe_indices.empty()) { + return MakeEmptyIterator>(); + } + + return RecordBatchIterator(Impl{std::move(reader), std::move(included_fields), + std::move(stripe_indices), scan_options.batch_size, + 0, nullptr}); } Result> Next() { - std::shared_ptr batch; - RETURN_NOT_OK(record_batch_reader_->ReadNext(&batch)); - return batch; + while (true) { + if (current_stripe_reader_) { + std::shared_ptr batch; + RETURN_NOT_OK(current_stripe_reader_->ReadNext(&batch)); + if (batch) { + return batch; + } + current_stripe_reader_.reset(); + ++next_stripe_index_; + continue; + } + + if (next_stripe_index_ >= stripe_indices_.size()) { + return IterationEnd>(); + } + + const auto stripe = stripe_indices_[next_stripe_index_]; + const auto stripe_info = reader_->GetStripeInformation(stripe); + RETURN_NOT_OK(reader_->Seek(stripe_info.first_row_id)); + ARROW_ASSIGN_OR_RAISE( + current_stripe_reader_, + reader_->NextStripeReader(batch_size_, included_fields_)); + } } - std::shared_ptr record_batch_reader_; + std::unique_ptr reader_; + std::vector included_fields_; + std::vector stripe_indices_; + int64_t batch_size_; + size_t next_stripe_index_; + std::shared_ptr current_stripe_reader_; }; return Impl::Make(fragment_->source(), *checked_pointer_cast(fragment_)->format(), - *options_); + *options_, + fragment_); } private: @@ -154,6 +265,14 @@ Result> OrcFileFormat::Inspect(const FileSource& source) return reader->ReadSchema(); } +Result> OrcFileFormat::MakeFragment( + FileSource source, compute::Expression partition_expression, + std::shared_ptr physical_schema) { + return std::shared_ptr(new OrcFileFragment( + std::move(source), shared_from_this(), std::move(partition_expression), + std::move(physical_schema))); +} + Result OrcFileFormat::ScanBatchesAsync( const std::shared_ptr& options, const std::shared_ptr& file) const { @@ -212,6 +331,141 @@ Future> OrcFileFormat::CountRows( })); } +// // +// // OrcFileFragment +// // + +OrcFileFragment::OrcFileFragment(FileSource source, + std::shared_ptr format, + compute::Expression partition_expression, + std::shared_ptr physical_schema) + : FileFragment(std::move(source), std::move(format), + std::move(partition_expression), std::move(physical_schema)) {} + +Status OrcFileFragment::EnsureMetadataCached() { + auto lock = metadata_mutex_.Lock(); + + if (metadata_cached_) { + return Status::OK(); + } + + // Open reader to get schema and stripe information + ARROW_ASSIGN_OR_RAISE(auto reader, OpenORCReader(source())); + ARROW_ASSIGN_OR_RAISE(cached_schema_, reader->ReadSchema()); + + // Get number of stripes and cache stripe info + int num_stripes = reader->NumberOfStripes(); + + // Cache stripe row counts for later use + stripe_num_rows_.resize(num_stripes); + for (int i = 0; i < num_stripes; i++) { + ARROW_ASSIGN_OR_RAISE(auto stripe_metadata, reader->GetStripeMetadata(i)); + stripe_num_rows_[i] = stripe_metadata->num_rows; + } + + metadata_cached_ = true; + return Status::OK(); +} + +Result> OrcFileFragment::TestStripes( + const compute::Expression& predicate) { + + // Ensure metadata is loaded + RETURN_NOT_OK(EnsureMetadataCached()); + + // Resolve and de-duplicate top-level INT32/INT64 fields referenced in predicate. + std::vector resolved_fields; + std::vector field_seen(cached_schema_->num_fields(), false); + for (const FieldRef& field_ref : compute::FieldsInExpression(predicate)) { + ARROW_ASSIGN_OR_RAISE(auto match, field_ref.FindOne(*cached_schema_)); + if (!match.has_value()) { + continue; + } + const auto& [field_indices, field] = *match; + if (field_indices.size() != 1) { + continue; + } + int field_index = field_indices[0]; + if (field_seen[field_index]) { + continue; + } + if (field->type()->id() != Type::INT32 && field->type()->id() != Type::INT64) { + continue; + } + field_seen[field_index] = true; + resolved_fields.push_back( + ResolvedFieldRef{field_ref, field, static_cast(field_index + 1)}); + } + + // Open reader if not already cached + if (!cached_reader_) { + auto lock = metadata_mutex_.Lock(); + if (!cached_reader_) { + ARROW_ASSIGN_OR_RAISE(auto input, source().Open()); + ARROW_ASSIGN_OR_RAISE( + cached_reader_, + adapters::orc::ORCFileReader::Open(std::move(input), arrow::default_memory_pool())); + } + } + + // Build a stripe-level guarantee expression and simplify predicate for each stripe. + std::vector simplified_expressions; + simplified_expressions.reserve(stripe_num_rows_.size()); + for (size_t stripe_idx = 0; stripe_idx < stripe_num_rows_.size(); stripe_idx++) { + ARROW_ASSIGN_OR_RAISE(auto stripe_stats, cached_reader_->GetStripeStatistics(stripe_idx)); + compute::Expression stripe_guarantee = compute::literal(true); + for (const auto& resolved_field : resolved_fields) { + const auto* col_stats = + stripe_stats->getColumnStatistics(resolved_field.orc_column_id); + auto field_guarantee = DeriveFieldGuarantee(resolved_field, col_stats); + if (field_guarantee.Equals(compute::literal(true))) { + continue; + } + if (stripe_guarantee.Equals(compute::literal(true))) { + stripe_guarantee = std::move(field_guarantee); + } else { + stripe_guarantee = + compute::and_(std::move(stripe_guarantee), std::move(field_guarantee)); + } + } + ARROW_ASSIGN_OR_RAISE(auto simplified, + compute::SimplifyWithGuarantee(predicate, stripe_guarantee)); + simplified_expressions.push_back(std::move(simplified)); + } + + return simplified_expressions; +} + +Result> OrcFileFragment::FilterStripes( + const compute::Expression& predicate) { + RETURN_NOT_OK(EnsureMetadataCached()); + + // Feature flag for disabling predicate pushdown + if (auto env_var = arrow::internal::GetEnvVar("ARROW_ORC_DISABLE_PREDICATE_PUSHDOWN")) { + if (env_var.ok() && *env_var == "1") { + // Return all stripe indices + std::vector all_stripes(stripe_num_rows_.size()); + std::iota(all_stripes.begin(), all_stripes.end(), 0); + return all_stripes; + } + } + + // Test each stripe + ARROW_ASSIGN_OR_RAISE(auto tested_expressions, TestStripes(predicate)); + + // Select stripes where predicate is satisfiable + std::vector selected_stripes; + selected_stripes.reserve(stripe_num_rows_.size()); + + for (size_t i = 0; i < tested_expressions.size(); i++) { + if (compute::IsSatisfiable(tested_expressions[i])) { + selected_stripes.push_back(static_cast(i)); + } + } + + return selected_stripes; +} + // // // // OrcFileWriter, OrcFileWriteOptions // // diff --git a/cpp/src/arrow/dataset/file_orc.h b/cpp/src/arrow/dataset/file_orc.h index 5bfefd1e02b..857c6ea8312 100644 --- a/cpp/src/arrow/dataset/file_orc.h +++ b/cpp/src/arrow/dataset/file_orc.h @@ -22,11 +22,14 @@ #include #include +#include "arrow/adapters/orc/adapter.h" +#include "arrow/compute/type_fwd.h" #include "arrow/dataset/file_base.h" #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" #include "arrow/io/type_fwd.h" #include "arrow/result.h" +#include "arrow/util/mutex.h" namespace arrow { namespace dataset { @@ -53,6 +56,10 @@ class ARROW_DS_EXPORT OrcFileFormat : public FileFormat { /// \brief Return the schema of the file if possible. Result> Inspect(const FileSource& source) const override; + Result> MakeFragment( + FileSource source, compute::Expression partition_expression, + std::shared_ptr physical_schema) override; + Result ScanBatchesAsync( const std::shared_ptr& options, const std::shared_ptr& file) const override; @@ -69,6 +76,49 @@ class ARROW_DS_EXPORT OrcFileFormat : public FileFormat { std::shared_ptr DefaultWriteOptions() override; }; +/// \brief A FileFragment implementation for ORC files with predicate pushdown +class ARROW_DS_EXPORT OrcFileFragment : public FileFragment { + public: + /// \brief Filter stripes based on predicate using stripe statistics + /// + /// Returns indices of stripes where the predicate may be satisfied. + /// Supports INT32/INT64 columns and conservative handling of missing or + /// unsupported statistics. + /// + /// \param predicate Arrow compute expression to evaluate + /// \return Vector of stripe indices to read (0-based) + Result> FilterStripes(const compute::Expression& predicate); + + /// \brief Ensure metadata is cached + Status EnsureMetadataCached(); + + private: + OrcFileFragment(FileSource source, std::shared_ptr format, + compute::Expression partition_expression, + std::shared_ptr physical_schema); + + /// \brief Test each stripe against predicate + /// + /// Returns simplified expressions (one per stripe) after applying + /// stripe statistics as guarantees. + /// + /// \param predicate Arrow compute expression to test + /// \return Vector of simplified expressions + Result> TestStripes( + const compute::Expression& predicate); + + // Cached metadata to avoid repeated I/O + mutable util::Mutex metadata_mutex_; + mutable std::shared_ptr cached_schema_; + mutable std::vector stripe_num_rows_; + mutable bool metadata_cached_ = false; + + // Cached ORC reader for stripe statistics. + mutable std::unique_ptr cached_reader_; + + friend class OrcFileFormat; +}; + /// @} } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_orc_test.cc b/cpp/src/arrow/dataset/file_orc_test.cc index 17be015de51..1595b0f0633 100644 --- a/cpp/src/arrow/dataset/file_orc_test.cc +++ b/cpp/src/arrow/dataset/file_orc_test.cc @@ -17,14 +17,20 @@ #include "arrow/dataset/file_orc.h" +#include #include #include +#include #include "arrow/adapters/orc/adapter.h" +#include "arrow/array/array_primitive.h" +#include "arrow/array/builder_primitive.h" +#include "arrow/compute/api.h" #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/discovery.h" #include "arrow/dataset/file_base.h" #include "arrow/dataset/partition.h" +#include "arrow/filesystem/mockfs.h" #include "arrow/dataset/test_util_internal.h" #include "arrow/io/memory.h" #include "arrow/record_batch.h" @@ -88,6 +94,182 @@ TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderWithDuplicateColumnError) { TestScanWithDuplicateColumnError(); } TEST_P(TestOrcFileFormatScan, ScanWithPushdownNulls) { TestScanWithPushdownNulls(); } + +TEST_P(TestOrcFileFormatScan, PredicatePushdownAllNullStripes) { + auto value_field = field("i64", int64()); + const auto test_schema = schema({value_field}); + + const int64_t nrows = 2048; + Int64Builder null_builder; + ASSERT_OK(null_builder.AppendNulls(nrows)); + ASSERT_OK_AND_ASSIGN(auto all_null_values, null_builder.Finish()); + + Int64Builder value_builder; + for (int64_t i = 0; i < nrows; ++i) { + ASSERT_OK(value_builder.Append(i)); + } + ASSERT_OK_AND_ASSIGN(auto non_null_values, value_builder.Finish()); + + auto all_null_batch = RecordBatch::Make(test_schema, nrows, {all_null_values}); + auto non_null_batch = RecordBatch::Make(test_schema, nrows, {non_null_values}); + + ASSERT_OK_AND_ASSIGN(auto sink, io::BufferOutputStream::Create()); + adapters::orc::WriteOptions write_options; + write_options.stripe_size = 4096; + ASSERT_OK_AND_ASSIGN(auto writer, + adapters::orc::ORCFileWriter::Open(sink.get(), write_options)); + ASSERT_OK(writer->Write(*all_null_batch)); + ASSERT_OK(writer->Write(*non_null_batch)); + ASSERT_OK(writer->Close()); + ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish()); + + FileSource source(buffer); + ASSERT_OK_AND_ASSIGN(auto fragment_base, format_->MakeFragment(source, literal(true))); + auto orc_fragment = checked_pointer_cast(fragment_base); + + ASSERT_OK_AND_ASSIGN(auto input, source.Open()); + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(std::move(input), + default_memory_pool())); + + std::vector all_null_stripes; + std::vector non_all_null_stripes; + const int64_t num_stripes = reader->NumberOfStripes(); + for (int64_t stripe = 0; stripe < num_stripes; ++stripe) { + ASSERT_OK_AND_ASSIGN(auto stripe_stats, reader->GetStripeStatistics(stripe)); + const auto* col_stats = stripe_stats->getColumnStatistics(1); + ASSERT_NE(col_stats, nullptr); + + if (col_stats->hasNull() && col_stats->getNumberOfValues() == 0) { + all_null_stripes.push_back(static_cast(stripe)); + } else { + non_all_null_stripes.push_back(static_cast(stripe)); + } + } + ASSERT_FALSE(all_null_stripes.empty()); + ASSERT_FALSE(non_all_null_stripes.empty()); + + ASSERT_OK_AND_ASSIGN( + auto is_null_selected, + orc_fragment->FilterStripes(compute::is_null(compute::field_ref("i64")))); + for (int stripe : all_null_stripes) { + EXPECT_NE(std::find(is_null_selected.begin(), is_null_selected.end(), stripe), + is_null_selected.end()); + } + + ASSERT_OK_AND_ASSIGN( + auto is_not_null_selected, + orc_fragment->FilterStripes( + compute::not_(compute::is_null(compute::field_ref("i64"))))); + for (int stripe : all_null_stripes) { + EXPECT_EQ(std::find(is_not_null_selected.begin(), is_not_null_selected.end(), stripe), + is_not_null_selected.end()); + } +} + +TEST_P(TestOrcFileFormatScan, PredicatePushdownWithFileSystemSource) { + auto mock_fs = std::make_shared(fs::kNoTime); + std::shared_ptr test_schema = schema({field("x", int64())}); + std::shared_ptr batch = RecordBatchFromJSON(test_schema, "[[0], [1], [2]]"); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr out_stream, + mock_fs->OpenOutputStream("/foo.orc")); + ASSERT_OK_AND_ASSIGN(auto writer, adapters::orc::ORCFileWriter::Open(out_stream.get())); + ASSERT_OK(writer->Write(*batch)); + ASSERT_OK(writer->Close()); + + FileSource source("/foo.orc", mock_fs); + ASSERT_OK_AND_ASSIGN(auto fragment_base, format_->MakeFragment(source, literal(true))); + auto orc_fragment = checked_pointer_cast(fragment_base); + + ASSERT_OK_AND_ASSIGN( + auto stripes, + orc_fragment->FilterStripes(compute::greater(compute::field_ref("x"), + compute::literal(int64_t{1})))); + ASSERT_FALSE(stripes.empty()); +} + +TEST_P(TestOrcFileFormatScan, PredicatePushdownRepeatedFilterCallsAreStable) { + auto test_schema = schema({field("a", int64()), field("b", int64())}); + const int64_t nrows = 2048; + + Int64Builder a_first_builder; + Int64Builder b_first_builder; + Int64Builder a_second_builder; + Int64Builder b_second_builder; + for (int64_t i = 0; i < nrows; ++i) { + ASSERT_OK(a_first_builder.Append(i)); + ASSERT_OK(b_first_builder.Append(10000 + i)); + ASSERT_OK(a_second_builder.Append(10000 + i)); + ASSERT_OK(b_second_builder.Append(i)); + } + ASSERT_OK_AND_ASSIGN(auto a_first, a_first_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto b_first, b_first_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto a_second, a_second_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto b_second, b_second_builder.Finish()); + + auto first_batch = RecordBatch::Make(test_schema, nrows, {a_first, b_first}); + auto second_batch = RecordBatch::Make(test_schema, nrows, {a_second, b_second}); + + ASSERT_OK_AND_ASSIGN(auto sink, io::BufferOutputStream::Create()); + adapters::orc::WriteOptions write_options; + write_options.stripe_size = 4096; + ASSERT_OK_AND_ASSIGN(auto writer, + adapters::orc::ORCFileWriter::Open(sink.get(), write_options)); + ASSERT_OK(writer->Write(*first_batch)); + ASSERT_OK(writer->Write(*second_batch)); + ASSERT_OK(writer->Close()); + ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish()); + + FileSource source(buffer); + ASSERT_OK_AND_ASSIGN(auto fragment_base, format_->MakeFragment(source, literal(true))); + auto orc_fragment = checked_pointer_cast(fragment_base); + + ASSERT_OK_AND_ASSIGN(auto input, source.Open()); + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open(std::move(input), + default_memory_pool())); + + std::vector expected_a; + std::vector expected_b; + for (int64_t stripe = 0; stripe < reader->NumberOfStripes(); ++stripe) { + ASSERT_OK_AND_ASSIGN(auto stripe_batch, reader->ReadStripe(stripe)); + auto a_array = checked_pointer_cast(stripe_batch->column(0)); + auto b_array = checked_pointer_cast(stripe_batch->column(1)); + + bool a_has_match = false; + bool b_has_match = false; + for (int64_t i = 0; i < stripe_batch->num_rows(); ++i) { + if (!a_array->IsNull(i) && a_array->Value(i) < 10) { + a_has_match = true; + } + if (!b_array->IsNull(i) && b_array->Value(i) < 10) { + b_has_match = true; + } + if (a_has_match && b_has_match) { + break; + } + } + if (a_has_match) { + expected_a.push_back(static_cast(stripe)); + } + if (b_has_match) { + expected_b.push_back(static_cast(stripe)); + } + } + + auto pred_a = compute::less(compute::field_ref("a"), compute::literal(int64_t{10})); + auto pred_b = compute::less(compute::field_ref("b"), compute::literal(int64_t{10})); + + ASSERT_OK_AND_ASSIGN(auto selected_a_first, orc_fragment->FilterStripes(pred_a)); + ASSERT_OK_AND_ASSIGN(auto selected_b, orc_fragment->FilterStripes(pred_b)); + ASSERT_OK_AND_ASSIGN(auto selected_a_second, orc_fragment->FilterStripes(pred_a)); + + EXPECT_EQ(selected_a_first, expected_a); + EXPECT_EQ(selected_b, expected_b); + EXPECT_EQ(selected_a_second, expected_a); +} + INSTANTIATE_TEST_SUITE_P(TestScan, TestOrcFileFormatScan, ::testing::ValuesIn(TestFormatParams::Values()), TestFormatParams::ToTestNameString); diff --git a/python/pyarrow/orc.py b/python/pyarrow/orc.py index 4e0d66ec665..2bfca20737f 100644 --- a/python/pyarrow/orc.py +++ b/python/pyarrow/orc.py @@ -15,8 +15,26 @@ # specific language governing permissions and limitations # under the License. +""" +Apache ORC file format with predicate pushdown support. + +ORC supports stripe-level filtering using column statistics for INT32/INT64 columns. + +**Dataset API** (recommended for multiple files):: + + >>> import pyarrow.dataset as ds + >>> dataset = ds.dataset('data.orc', format='orc') + >>> table = dataset.to_table(filter=ds.field('value') > 100) + +**Convenience API** (single file):: + + >>> import pyarrow.orc as orc + >>> table = orc.read_table('data.orc', filters=ds.field('value') > 100) + >>> table = orc.read_table('data.orc', filters=[('value', '>', 100)]) # DNF tuples +""" from numbers import Integral +import os import warnings from pyarrow.lib import Table @@ -297,17 +315,35 @@ def close(self): self.is_open = False -def read_table(source, columns=None, filesystem=None): +def read_table(source, columns=None, filesystem=None, filters=None): filesystem, path = _resolve_filesystem_and_path(source, filesystem) + + if filters is not None: + import pyarrow.dataset as ds + from pyarrow.parquet.core import filters_to_expression + + # filters_to_expression handles both Expression and DNF tuple formats + filter_expr = filters_to_expression(filters) + + # Dataset API requires path-like inputs. For file-like/NativeFile inputs + # fall back to direct ORC read + in-memory filtering for compatibility. + if filesystem is None and not isinstance(path, (str, bytes, os.PathLike)): + if columns is not None and len(columns) == 0: + result = ORCFile(source).read().select(columns) + else: + result = ORCFile(source).read(columns=columns) + return result.filter(filter_expr) + + dataset_source = path if filesystem is not None else source + dataset = ds.dataset(dataset_source, format='orc', filesystem=filesystem) + return dataset.to_table(columns=columns, filter=filter_expr) + if filesystem is not None: source = filesystem.open_input_file(path) if columns is not None and len(columns) == 0: - result = ORCFile(source).read().select(columns) - else: - result = ORCFile(source).read(columns=columns) - - return result + return ORCFile(source).read().select(columns) + return ORCFile(source).read(columns=columns) read_table.__doc__ = """ @@ -330,6 +366,45 @@ def read_table(source, columns=None, filesystem=None): If nothing passed, will be inferred based on path. Path will try to be found in the local on-disk filesystem otherwise it will be parsed as an URI to determine the filesystem. +filters : pyarrow.compute.Expression or List[Tuple] or List[List[Tuple]], default None + Predicate expression to filter rows. Uses ORC stripe-level statistics for + optimization when possible. + + Accepts Expression objects or DNF (Disjunctive Normal Form) tuples:: + + # Expression format + filters=ds.field('id') > 100 + + # DNF tuples: list of conditions (AND), or list of lists (OR of ANDs) + filters=[('id', '>', 100)] # single condition + filters=[('id', '>', 100), ('id', '<', 200)] # AND + filters=[[('x', '==', 1)], [('x', '==', 2)]] # OR + + Supported operators: ==, !=, <, >, <=, >=, in, not in + + Note: For path-like inputs, filters are evaluated through the dataset API. + For file-like inputs, read_table falls back to in-memory filtering. + +Returns +------- +pyarrow.Table + Content of the file as a Table. + +Examples +-------- +Read entire file: + +>>> import pyarrow.orc as orc +>>> table = orc.read_table('data.orc') + +Read with predicate pushdown: + +>>> import pyarrow.dataset as ds +>>> table = orc.read_table('data.orc', filters=ds.field('id') > 1000) + +Read with column selection and filtering: + +>>> table = orc.read_table('data.orc', columns=['id', 'value'], filters=[('id', '>', 1000)]) """ diff --git a/python/pyarrow/tests/test_orc.py b/python/pyarrow/tests/test_orc.py index 27154a6f34f..2b24f331d5c 100644 --- a/python/pyarrow/tests/test_orc.py +++ b/python/pyarrow/tests/test_orc.py @@ -697,3 +697,137 @@ def test_orc_writer_with_null_arrays(tempdir): table = pa.table({"int64": a, "utf8": b}) with pytest.raises(pa.ArrowNotImplementedError): orc.write_table(table, path) + + +def test_read_table_with_filters_expression(tempdir): + """Smoke test: filters parameter with Expression format.""" + from pyarrow import orc + import pyarrow.dataset as ds + + path = str(tempdir / 'test.orc') + table = pa.table({'id': range(1000), 'value': range(1000)}) + orc.write_table(table, path) + + result = orc.read_table(path, filters=ds.field('id') > 500) + assert result.num_rows == 499 + assert result['id'].to_pylist()[0] == 501 + + +def test_read_table_with_filters_dnf(tempdir): + """Smoke test: filters parameter with DNF tuple format.""" + from pyarrow import orc + + path = str(tempdir / 'test.orc') + table = pa.table({'id': range(1000), 'value': range(1000)}) + orc.write_table(table, path) + + # Single condition + result = orc.read_table(path, filters=[('id', '>', 500)]) + assert result.num_rows == 499 + + # Multiple conditions (AND) + result = orc.read_table(path, filters=[('id', '>', 100), ('id', '<', 200)]) + assert result.num_rows == 99 + + +def test_read_table_filters_with_columns(tempdir): + """Integration: filters with column projection.""" + from pyarrow import orc + import pyarrow.dataset as ds + + path = str(tempdir / 'test.orc') + table = pa.table({'id': range(1000), 'value': range(1000), 'extra': ['x'] * 1000}) + orc.write_table(table, path) + + result = orc.read_table(path, columns=['id', 'value'], filters=ds.field('id') < 100) + assert result.num_rows == 100 + assert result.num_columns == 2 + assert set(result.column_names) == {'id', 'value'} + + +def test_read_table_filters_correctness(tempdir): + """Correctness: filtered results match unfiltered + post-filter.""" + from pyarrow import orc + import pyarrow.dataset as ds + + path = str(tempdir / 'test.orc') + table = pa.table({'id': range(1000), 'value': range(1000)}) + orc.write_table(table, path) + + filter_expr = ds.field('id') > 500 + filtered = orc.read_table(path, filters=filter_expr) + unfiltered = orc.read_table(path) + expected = unfiltered.filter(filter_expr) + + assert filtered.equals(expected) + + +def test_read_table_filters_none(tempdir): + """Edge case: filters=None behaves as no filter.""" + from pyarrow import orc + + path = str(tempdir / 'test.orc') + table = pa.table({'id': range(100)}) + orc.write_table(table, path) + + result = orc.read_table(path, filters=None) + assert result.num_rows == 100 + + +def test_read_table_filters_all_null_semantics(tempdir): + """IS NULL/IS NOT NULL semantics with all-null stripes.""" + from pyarrow import orc + import pyarrow.dataset as ds + + path = str(tempdir / 'all_null.orc') + n = 2048 + with orc.ORCWriter(path, stripe_size=4096) as writer: + writer.write(pa.table({'id': pa.array([None] * n, type=pa.int64())})) + writer.write(pa.table({'id': pa.array(range(n), type=pa.int64())})) + + is_null = orc.read_table(path, filters=ds.field('id').is_null()) + assert is_null.num_rows == n + assert is_null['id'].null_count == n + + is_not_null = orc.read_table(path, filters=ds.field('id').is_valid()) + assert is_not_null.num_rows == n + assert is_not_null['id'].null_count == 0 + + +def test_read_table_filters_buffer_reader_fallback(): + """filters=... works with BufferReader via in-memory filter fallback.""" + from pyarrow import orc + import pyarrow.dataset as ds + + table = pa.table({'id': range(10), 'value': range(10)}) + sink = pa.BufferOutputStream() + orc.write_table(table, sink) + source = pa.BufferReader(sink.getvalue()) + + result = orc.read_table(source, filters=ds.field('id') > 5) + assert result.num_rows == 4 + assert result['id'].to_pylist() == [6, 7, 8, 9] + + +def test_parquet_orc_predicate_pushdown_parity(tempdir): + """Equivalent ORC and Parquet predicates should produce equal results.""" + from pyarrow import orc + import pyarrow.dataset as ds + import pyarrow.parquet as pq + + data = pa.table({ + 'id': range(1000), + 'group': [i % 7 for i in range(1000)], + 'value': [i * 3 for i in range(1000)], + }) + + orc_path = str(tempdir / 'parity.orc') + parquet_path = str(tempdir / 'parity.parquet') + orc.write_table(data, orc_path, stripe_size=4096) + pq.write_table(data, parquet_path, row_group_size=128) + + filt = (ds.field('id') >= 123) & (ds.field('id') < 876) & (ds.field('group') == 3) + orc_result = orc.read_table(orc_path, filters=filt) + parquet_result = pq.read_table(parquet_path, filters=filt) + + assert orc_result.equals(parquet_result)