|
18 | 18 | #include "arrow/adapters/orc/adapter.h" |
19 | 19 |
|
20 | 20 | #include <algorithm> |
| 21 | +#include <cmath> |
21 | 22 | #include <list> |
22 | 23 | #include <memory> |
23 | 24 | #include <sstream> |
|
33 | 34 | #include "arrow/io/interfaces.h" |
34 | 35 | #include "arrow/memory_pool.h" |
35 | 36 | #include "arrow/record_batch.h" |
| 37 | +#include "arrow/scalar.h" |
36 | 38 | #include "arrow/status.h" |
37 | 39 | #include "arrow/table.h" |
38 | 40 | #include "arrow/table_builder.h" |
|
43 | 45 | #include "arrow/util/key_value_metadata.h" |
44 | 46 | #include "arrow/util/macros.h" |
45 | 47 | #include "orc/Exceptions.hh" |
| 48 | +#include "orc/Statistics.hh" |
| 49 | +#include "orc/Type.hh" |
46 | 50 |
|
47 | 51 | // alias to not interfere with nested orc namespace |
48 | 52 | namespace liborc = orc; |
@@ -98,6 +102,131 @@ namespace { |
98 | 102 | // The following is required by ORC to be uint64_t |
99 | 103 | constexpr uint64_t kOrcNaturalWriteSize = 128 * 1024; |
100 | 104 |
|
| 105 | +// Convert ORC column statistics to Arrow OrcColumnStatistics |
| 106 | +// Returns a Result with the converted statistics, or an error if conversion fails |
| 107 | +Result<OrcColumnStatistics> ConvertColumnStatistics( |
| 108 | + const liborc::ColumnStatistics* orc_stats) { |
| 109 | + OrcColumnStatistics stats; |
| 110 | + |
| 111 | + stats.has_null = orc_stats->hasNull(); |
| 112 | + stats.num_values = static_cast<int64_t>(orc_stats->getNumberOfValues()); |
| 113 | + stats.has_min_max = false; |
| 114 | + stats.min = nullptr; |
| 115 | + stats.max = nullptr; |
| 116 | + |
| 117 | + // Try to extract min/max based on the ORC column statistics type |
| 118 | + const auto* int_stats = |
| 119 | + dynamic_cast<const liborc::IntegerColumnStatistics*>( |
| 120 | + orc_stats); |
| 121 | + const auto* double_stats = |
| 122 | + dynamic_cast<const liborc::DoubleColumnStatistics*>( |
| 123 | + orc_stats); |
| 124 | + const auto* string_stats = |
| 125 | + dynamic_cast<const liborc::StringColumnStatistics*>( |
| 126 | + orc_stats); |
| 127 | + const auto* date_stats = |
| 128 | + dynamic_cast<const liborc::DateColumnStatistics*>( |
| 129 | + orc_stats); |
| 130 | + const auto* ts_stats = |
| 131 | + dynamic_cast<const liborc::TimestampColumnStatistics*>( |
| 132 | + orc_stats); |
| 133 | + const auto* decimal_stats = |
| 134 | + dynamic_cast<const liborc::DecimalColumnStatistics*>( |
| 135 | + orc_stats); |
| 136 | + |
| 137 | + if (int_stats != nullptr) { |
| 138 | + if (int_stats->hasMinimum() && int_stats->hasMaximum()) { |
| 139 | + stats.has_min_max = true; |
| 140 | + stats.min = std::make_shared<Int64Scalar>( |
| 141 | + int_stats->getMinimum()); |
| 142 | + stats.max = std::make_shared<Int64Scalar>( |
| 143 | + int_stats->getMaximum()); |
| 144 | + } |
| 145 | + } else if (double_stats != nullptr) { |
| 146 | + if (double_stats->hasMinimum() && |
| 147 | + double_stats->hasMaximum()) { |
| 148 | + double min_val = double_stats->getMinimum(); |
| 149 | + double max_val = double_stats->getMaximum(); |
| 150 | + if (!std::isnan(min_val) && !std::isnan(max_val)) { |
| 151 | + stats.has_min_max = true; |
| 152 | + stats.min = std::make_shared<DoubleScalar>(min_val); |
| 153 | + stats.max = std::make_shared<DoubleScalar>(max_val); |
| 154 | + } |
| 155 | + } |
| 156 | + } else if (string_stats != nullptr) { |
| 157 | + if (string_stats->hasMinimum() && |
| 158 | + string_stats->hasMaximum()) { |
| 159 | + stats.has_min_max = true; |
| 160 | + stats.min = std::make_shared<StringScalar>( |
| 161 | + string_stats->getMinimum()); |
| 162 | + stats.max = std::make_shared<StringScalar>( |
| 163 | + string_stats->getMaximum()); |
| 164 | + } |
| 165 | + } else if (date_stats != nullptr) { |
| 166 | + if (date_stats->hasMinimum() && |
| 167 | + date_stats->hasMaximum()) { |
| 168 | + stats.has_min_max = true; |
| 169 | + stats.min = std::make_shared<Date32Scalar>( |
| 170 | + date_stats->getMinimum(), date32()); |
| 171 | + stats.max = std::make_shared<Date32Scalar>( |
| 172 | + date_stats->getMaximum(), date32()); |
| 173 | + } |
| 174 | + } else if (ts_stats != nullptr) { |
| 175 | + if (ts_stats->hasMinimum() && ts_stats->hasMaximum()) { |
| 176 | + stats.has_min_max = true; |
| 177 | + // getMinimum/getMaximum return milliseconds. |
| 178 | + // getMinimumNanos/getMaximumNanos return the |
| 179 | + // last 6 digits of nanoseconds. |
| 180 | + int64_t min_millis = ts_stats->getMinimum(); |
| 181 | + int64_t max_millis = ts_stats->getMaximum(); |
| 182 | + int32_t min_nanos = ts_stats->getMinimumNanos(); |
| 183 | + int32_t max_nanos = ts_stats->getMaximumNanos(); |
| 184 | + |
| 185 | + // millis * 1,000,000 + sub-millisecond nanos |
| 186 | + int64_t min_ns = min_millis * 1000000LL + min_nanos; |
| 187 | + int64_t max_ns = max_millis * 1000000LL + max_nanos; |
| 188 | + |
| 189 | + auto ts_type = timestamp(TimeUnit::NANO); |
| 190 | + stats.min = |
| 191 | + std::make_shared<TimestampScalar>(min_ns, ts_type); |
| 192 | + stats.max = |
| 193 | + std::make_shared<TimestampScalar>(max_ns, ts_type); |
| 194 | + } |
| 195 | + } else if (decimal_stats != nullptr) { |
| 196 | + if (decimal_stats->hasMinimum() && |
| 197 | + decimal_stats->hasMaximum()) { |
| 198 | + liborc::Decimal min_dec = decimal_stats->getMinimum(); |
| 199 | + liborc::Decimal max_dec = decimal_stats->getMaximum(); |
| 200 | + |
| 201 | + if (min_dec.scale != max_dec.scale) { |
| 202 | + // Corrupted stats: scales should always match within |
| 203 | + // a column. has_min_max remains false (conservative). |
| 204 | + } else { |
| 205 | + stats.has_min_max = true; |
| 206 | + |
| 207 | + Decimal128 min_d128(min_dec.value.getHighBits(), |
| 208 | + min_dec.value.getLowBits()); |
| 209 | + Decimal128 max_d128(max_dec.value.getHighBits(), |
| 210 | + max_dec.value.getLowBits()); |
| 211 | + |
| 212 | + // Precision 38 is max for Decimal128; the dataset |
| 213 | + // layer will Cast() to the actual column type. |
| 214 | + auto dec_type = decimal128(38, min_dec.scale); |
| 215 | + |
| 216 | + stats.min = |
| 217 | + std::make_shared<Decimal128Scalar>(min_d128, |
| 218 | + dec_type); |
| 219 | + stats.max = |
| 220 | + std::make_shared<Decimal128Scalar>(max_d128, |
| 221 | + dec_type); |
| 222 | + } |
| 223 | + } |
| 224 | + } |
| 225 | + // Other types (Boolean, Binary, Collection, etc.) don't have min/max statistics |
| 226 | + |
| 227 | + return stats; |
| 228 | +} |
| 229 | + |
101 | 230 | using internal::checked_cast; |
102 | 231 |
|
103 | 232 | class ArrowInputFile : public liborc::InputStream { |
@@ -204,6 +333,52 @@ Status CheckTimeZoneDatabaseAvailability() { |
204 | 333 | } |
205 | 334 | #endif |
206 | 335 |
|
| 336 | +// Recursively build OrcSchemaField tree by walking paired ORC and Arrow types |
| 337 | +Status BuildOrcSchemaFieldsRecursive(const liborc::Type* orc_type, |
| 338 | + const std::shared_ptr<Field>& arrow_field, |
| 339 | + OrcSchemaField* out_field) { |
| 340 | + out_field->field = arrow_field; |
| 341 | + out_field->orc_column_id = static_cast<int>(orc_type->getColumnId()); |
| 342 | + |
| 343 | + // For struct types, recursively build children |
| 344 | + if (arrow_field->type()->id() == Type::STRUCT && |
| 345 | + orc_type->getKind() == liborc::STRUCT) { |
| 346 | + const auto& struct_type = checked_cast<const StructType&>(*arrow_field->type()); |
| 347 | + size_t num_children = struct_type.num_fields(); |
| 348 | + out_field->children.resize(num_children); |
| 349 | + |
| 350 | + for (size_t i = 0; i < num_children; ++i) { |
| 351 | + const liborc::Type* orc_subtype = orc_type->getSubtype(i); |
| 352 | + RETURN_NOT_OK(BuildOrcSchemaFieldsRecursive( |
| 353 | + orc_subtype, struct_type.field(static_cast<int>(i)), |
| 354 | + &out_field->children[i])); |
| 355 | + } |
| 356 | + } |
| 357 | + // For list types, handle the child |
| 358 | + else if (arrow_field->type()->id() == Type::LIST && |
| 359 | + orc_type->getKind() == liborc::LIST) { |
| 360 | + const auto& list_type = checked_cast<const ListType&>(*arrow_field->type()); |
| 361 | + out_field->children.resize(1); |
| 362 | + const liborc::Type* orc_subtype = orc_type->getSubtype(0); |
| 363 | + RETURN_NOT_OK(BuildOrcSchemaFieldsRecursive(orc_subtype, list_type.value_field(), |
| 364 | + &out_field->children[0])); |
| 365 | + } |
| 366 | + // For map types, handle key and value children |
| 367 | + else if (arrow_field->type()->id() == Type::MAP && |
| 368 | + orc_type->getKind() == liborc::MAP) { |
| 369 | + const auto& map_type = checked_cast<const MapType&>(*arrow_field->type()); |
| 370 | + out_field->children.resize(2); |
| 371 | + const liborc::Type* key_type = orc_type->getSubtype(0); |
| 372 | + const liborc::Type* value_type = orc_type->getSubtype(1); |
| 373 | + RETURN_NOT_OK(BuildOrcSchemaFieldsRecursive(key_type, map_type.key_field(), |
| 374 | + &out_field->children[0])); |
| 375 | + RETURN_NOT_OK(BuildOrcSchemaFieldsRecursive(value_type, map_type.item_field(), |
| 376 | + &out_field->children[1])); |
| 377 | + } |
| 378 | + |
| 379 | + return Status::OK(); |
| 380 | +} |
| 381 | + |
207 | 382 | } // namespace |
208 | 383 |
|
209 | 384 | class ORCFileReader::Impl { |
@@ -409,6 +584,42 @@ class ORCFileReader::Impl { |
409 | 584 | return ReadBatch(opts, schema, stripes_[static_cast<size_t>(stripe)].num_rows); |
410 | 585 | } |
411 | 586 |
|
| 587 | + Result<std::shared_ptr<Table>> ReadStripes( |
| 588 | + const std::vector<int64_t>& stripe_indices) { |
| 589 | + if (stripe_indices.empty()) { |
| 590 | + return Status::Invalid("stripe_indices cannot be empty"); |
| 591 | + } |
| 592 | + |
| 593 | + std::vector<std::shared_ptr<Table>> tables; |
| 594 | + tables.reserve(stripe_indices.size()); |
| 595 | + |
| 596 | + for (int64_t stripe_index : stripe_indices) { |
| 597 | + ARROW_ASSIGN_OR_RAISE(auto batch, ReadStripe(stripe_index)); |
| 598 | + ARROW_ASSIGN_OR_RAISE(auto table, Table::FromRecordBatches({batch})); |
| 599 | + tables.push_back(table); |
| 600 | + } |
| 601 | + |
| 602 | + return ConcatenateTables(tables); |
| 603 | + } |
| 604 | + |
| 605 | + Result<std::shared_ptr<Table>> ReadStripes(const std::vector<int64_t>& stripe_indices, |
| 606 | + const std::vector<int>& include_indices) { |
| 607 | + if (stripe_indices.empty()) { |
| 608 | + return Status::Invalid("stripe_indices cannot be empty"); |
| 609 | + } |
| 610 | + |
| 611 | + std::vector<std::shared_ptr<Table>> tables; |
| 612 | + tables.reserve(stripe_indices.size()); |
| 613 | + |
| 614 | + for (int64_t stripe_index : stripe_indices) { |
| 615 | + ARROW_ASSIGN_OR_RAISE(auto batch, ReadStripe(stripe_index, include_indices)); |
| 616 | + ARROW_ASSIGN_OR_RAISE(auto table, Table::FromRecordBatches({batch})); |
| 617 | + tables.push_back(table); |
| 618 | + } |
| 619 | + |
| 620 | + return ConcatenateTables(tables); |
| 621 | + } |
| 622 | + |
412 | 623 | Status SelectStripe(liborc::RowReaderOptions* opts, int64_t stripe) { |
413 | 624 | ARROW_RETURN_IF(stripe < 0 || stripe >= NumberOfStripes(), |
414 | 625 | Status::Invalid("Out of bounds stripe: ", stripe)); |
@@ -548,6 +759,102 @@ class ORCFileReader::Impl { |
548 | 759 | return NextStripeReader(batch_size, empty_vec); |
549 | 760 | } |
550 | 761 |
|
| 762 | + Result<OrcColumnStatistics> GetColumnStatistics(int column_index) { |
| 763 | + ORC_BEGIN_CATCH_NOT_OK |
| 764 | + std::unique_ptr<liborc::Statistics> file_stats = reader_->getStatistics(); |
| 765 | + if (column_index < 0 || |
| 766 | + static_cast<uint32_t>(column_index) >= file_stats->getNumberOfColumns()) { |
| 767 | + return Status::Invalid("Column index ", column_index, " out of range [0, ", |
| 768 | + file_stats->getNumberOfColumns(), ")"); |
| 769 | + } |
| 770 | + // NOTE: col_stats is a non-owning pointer into file_stats. |
| 771 | + // ConvertColumnStatistics copies all values into Arrow scalars synchronously, |
| 772 | + // so the pointer remains valid for the duration of this call. |
| 773 | + const liborc::ColumnStatistics* col_stats = |
| 774 | + file_stats->getColumnStatistics(static_cast<uint32_t>(column_index)); |
| 775 | + return ConvertColumnStatistics(col_stats); |
| 776 | + ORC_END_CATCH_NOT_OK |
| 777 | + } |
| 778 | + |
| 779 | + Result<OrcColumnStatistics> GetStripeColumnStatistics(int64_t stripe_index, |
| 780 | + int column_index) { |
| 781 | + ORC_BEGIN_CATCH_NOT_OK |
| 782 | + if (stripe_index < 0 || stripe_index >= static_cast<int64_t>(stripes_.size())) { |
| 783 | + return Status::Invalid("Stripe index ", stripe_index, " out of range"); |
| 784 | + } |
| 785 | + std::unique_ptr<liborc::Statistics> stripe_stats = |
| 786 | + reader_->getStripeStatistics(static_cast<uint64_t>(stripe_index)); |
| 787 | + if (column_index < 0 || |
| 788 | + static_cast<uint32_t>(column_index) >= stripe_stats->getNumberOfColumns()) { |
| 789 | + return Status::Invalid("Column index ", column_index, " out of range [0, ", |
| 790 | + stripe_stats->getNumberOfColumns(), ")"); |
| 791 | + } |
| 792 | + // NOTE: col_stats is a non-owning pointer into stripe_stats. |
| 793 | + // ConvertColumnStatistics copies all values into Arrow scalars synchronously, |
| 794 | + // so the pointer remains valid for the duration of this call. |
| 795 | + const liborc::ColumnStatistics* col_stats = |
| 796 | + stripe_stats->getColumnStatistics(static_cast<uint32_t>(column_index)); |
| 797 | + return ConvertColumnStatistics(col_stats); |
| 798 | + ORC_END_CATCH_NOT_OK |
| 799 | + } |
| 800 | + |
| 801 | + Result<std::vector<OrcColumnStatistics>> GetStripeStatistics( |
| 802 | + int64_t stripe_index, const std::vector<int>& column_indices) { |
| 803 | + ORC_BEGIN_CATCH_NOT_OK |
| 804 | + if (stripe_index < 0 || stripe_index >= static_cast<int64_t>(stripes_.size())) { |
| 805 | + return Status::Invalid("Stripe index ", stripe_index, " out of range"); |
| 806 | + } |
| 807 | + std::unique_ptr<liborc::Statistics> stripe_stats = |
| 808 | + reader_->getStripeStatistics(static_cast<uint64_t>(stripe_index)); |
| 809 | + std::vector<OrcColumnStatistics> results; |
| 810 | + results.reserve(column_indices.size()); |
| 811 | + for (int col_idx : column_indices) { |
| 812 | + if (col_idx < 0 || |
| 813 | + static_cast<uint32_t>(col_idx) >= stripe_stats->getNumberOfColumns()) { |
| 814 | + return Status::Invalid("Column index ", col_idx, " out of range [0, ", |
| 815 | + stripe_stats->getNumberOfColumns(), ")"); |
| 816 | + } |
| 817 | + // NOTE: col_stats is a non-owning pointer into stripe_stats. |
| 818 | + // ConvertColumnStatistics copies all values into Arrow scalars synchronously, |
| 819 | + // so the pointer remains valid for the duration of this call. |
| 820 | + const liborc::ColumnStatistics* col_stats = |
| 821 | + stripe_stats->getColumnStatistics(static_cast<uint32_t>(col_idx)); |
| 822 | + ARROW_ASSIGN_OR_RAISE(auto converted, ConvertColumnStatistics(col_stats)); |
| 823 | + results.push_back(std::move(converted)); |
| 824 | + } |
| 825 | + return results; |
| 826 | + ORC_END_CATCH_NOT_OK |
| 827 | + } |
| 828 | + |
| 829 | + Result<std::shared_ptr<OrcSchemaManifest>> BuildSchemaManifest( |
| 830 | + const std::shared_ptr<Schema>& arrow_schema) { |
| 831 | + auto manifest = std::make_shared<OrcSchemaManifest>(); |
| 832 | + |
| 833 | + const liborc::Type& orc_root_type = reader_->getType(); |
| 834 | + |
| 835 | + if (orc_root_type.getKind() != liborc::STRUCT) { |
| 836 | + return Status::Invalid("ORC root type must be STRUCT"); |
| 837 | + } |
| 838 | + |
| 839 | + size_t num_fields = arrow_schema->num_fields(); |
| 840 | + if (num_fields != orc_root_type.getSubtypeCount()) { |
| 841 | + return Status::Invalid("Arrow schema field count (", num_fields, |
| 842 | + ") does not match ORC type subtype count (", |
| 843 | + orc_root_type.getSubtypeCount(), ")"); |
| 844 | + } |
| 845 | + |
| 846 | + manifest->schema_fields.resize(num_fields); |
| 847 | + |
| 848 | + for (size_t i = 0; i < num_fields; ++i) { |
| 849 | + const liborc::Type* orc_subtype = orc_root_type.getSubtype(i); |
| 850 | + RETURN_NOT_OK(BuildOrcSchemaFieldsRecursive( |
| 851 | + orc_subtype, arrow_schema->field(static_cast<int>(i)), |
| 852 | + &manifest->schema_fields[i])); |
| 853 | + } |
| 854 | + |
| 855 | + return manifest; |
| 856 | + } |
| 857 | + |
551 | 858 | private: |
552 | 859 | MemoryPool* pool_; |
553 | 860 | std::unique_ptr<liborc::Reader> reader_; |
@@ -613,6 +920,17 @@ Result<std::shared_ptr<RecordBatch>> ORCFileReader::ReadStripe( |
613 | 920 | return impl_->ReadStripe(stripe, include_names); |
614 | 921 | } |
615 | 922 |
|
| 923 | +Result<std::shared_ptr<Table>> ORCFileReader::ReadStripes( |
| 924 | + const std::vector<int64_t>& stripe_indices) { |
| 925 | + return impl_->ReadStripes(stripe_indices); |
| 926 | +} |
| 927 | + |
| 928 | +Result<std::shared_ptr<Table>> ORCFileReader::ReadStripes( |
| 929 | + const std::vector<int64_t>& stripe_indices, |
| 930 | + const std::vector<int>& include_indices) { |
| 931 | + return impl_->ReadStripes(stripe_indices, include_indices); |
| 932 | +} |
| 933 | + |
616 | 934 | Status ORCFileReader::Seek(int64_t row_number) { return impl_->Seek(row_number); } |
617 | 935 |
|
618 | 936 | Result<std::shared_ptr<RecordBatchReader>> ORCFileReader::NextStripeReader( |
@@ -678,6 +996,25 @@ std::string ORCFileReader::GetSerializedFileTail() { |
678 | 996 | return impl_->GetSerializedFileTail(); |
679 | 997 | } |
680 | 998 |
|
| 999 | +Result<OrcColumnStatistics> ORCFileReader::GetColumnStatistics(int column_index) { |
| 1000 | + return impl_->GetColumnStatistics(column_index); |
| 1001 | +} |
| 1002 | + |
| 1003 | +Result<OrcColumnStatistics> ORCFileReader::GetStripeColumnStatistics( |
| 1004 | + int64_t stripe_index, int column_index) { |
| 1005 | + return impl_->GetStripeColumnStatistics(stripe_index, column_index); |
| 1006 | +} |
| 1007 | + |
| 1008 | +Result<std::vector<OrcColumnStatistics>> ORCFileReader::GetStripeStatistics( |
| 1009 | + int64_t stripe_index, const std::vector<int>& column_indices) { |
| 1010 | + return impl_->GetStripeStatistics(stripe_index, column_indices); |
| 1011 | +} |
| 1012 | + |
| 1013 | +Result<std::shared_ptr<OrcSchemaManifest>> ORCFileReader::BuildSchemaManifest( |
| 1014 | + const std::shared_ptr<Schema>& arrow_schema) const { |
| 1015 | + return impl_->BuildSchemaManifest(arrow_schema); |
| 1016 | +} |
| 1017 | + |
681 | 1018 | namespace { |
682 | 1019 |
|
683 | 1020 | class ArrowOutputStream : public liborc::OutputStream { |
|
0 commit comments