Skip to content

Commit 59ec9b1

Browse files
committed
[C++][Python] Add public APIs for reading and serializing IPC dictionary messages
This adds low-level APIs for working with IPC dictionary messages outside of the stream/file reader/writer context, enabling message-at-a-time IPC workflows with dictionary-encoded data. C++ changes: - Add public ReadDictionary(Message, DictionaryMemo*, IpcReadOptions) to read a single dictionary message into a memo - Add CollectAndSerializeDictionaries(RecordBatch, DictionaryMemo*, IpcWriteOptions) to serialize dictionary messages with pointer-based deduplication - Expose dictionary_memo() accessor on RecordBatchStreamReader and RecordBatchFileReader - Refactor internal ReadDictionary to ReadDictionaryMessage in StreamDecoderInternal; make dictionary_memo_ protected Python changes: - Add ipc.read_dictionary_message() to populate a DictionaryMemo from a dictionary Message or Buffer - Add RecordBatch.serialize_dictionaries() to serialize dictionary IPC messages with memo-based deduplication - Add dictionary_memo property on RecordBatchStreamReader and RecordBatchFileReader - Add DictionaryMemo.wrap() for non-owning references to reader memos - Add read_dictionary_message to API docs - Comprehensive test coverage for all new APIs
1 parent 68d1368 commit 59ec9b1

12 files changed

Lines changed: 459 additions & 17 deletions

File tree

cpp/src/arrow/ipc/reader.cc

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -900,15 +900,6 @@ Status ReadDictionary(const Buffer& metadata, const IpcReadContext& context,
900900
return Status::OK();
901901
}
902902

903-
Status ReadDictionary(const Message& message, const IpcReadContext& context,
904-
DictionaryKind* kind) {
905-
// Only invoke this method if we already know we have a dictionary message
906-
DCHECK_EQ(message.type(), MessageType::DICTIONARY_BATCH);
907-
CHECK_HAS_BODY(message);
908-
ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message.body()));
909-
return ReadDictionary(*message.metadata(), context, kind, reader.get());
910-
}
911-
912903
} // namespace
913904

914905
Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
@@ -948,6 +939,15 @@ Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
948939
reader.get());
949940
}
950941

942+
Status ReadDictionary(const Message& message, DictionaryMemo* dictionary_memo,
943+
const IpcReadOptions& options) {
944+
CHECK_MESSAGE_TYPE(MessageType::DICTIONARY_BATCH, message.type());
945+
CHECK_HAS_BODY(message);
946+
IpcReadContext context(dictionary_memo, options, /*swap=*/false);
947+
ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message.body()));
948+
return ReadDictionary(*message.metadata(), context, /*kind=*/nullptr, reader.get());
949+
}
950+
951951
// Streaming format decoder
952952
class StreamDecoderInternal : public MessageDecoderListener {
953953
public:
@@ -966,11 +966,11 @@ class StreamDecoderInternal : public MessageDecoderListener {
966966
field_inclusion_mask_(),
967967
num_required_initial_dictionaries_(0),
968968
num_read_initial_dictionaries_(0),
969-
dictionary_memo_(),
970969
schema_(nullptr),
971970
filtered_schema_(nullptr),
972971
stats_(),
973-
swap_endian_(false) {}
972+
swap_endian_(false),
973+
dictionary_memo_() {}
974974

975975
Status OnMessageDecoded(std::unique_ptr<Message> message) override {
976976
++stats_.num_messages;
@@ -1036,7 +1036,7 @@ class StreamDecoderInternal : public MessageDecoderListener {
10361036
num_required_initial_dictionaries_,
10371037
") of dictionaries at the start of the stream");
10381038
}
1039-
RETURN_NOT_OK(ReadDictionary(*message));
1039+
RETURN_NOT_OK(ReadDictionaryMessage(*message));
10401040
num_read_initial_dictionaries_++;
10411041
if (num_read_initial_dictionaries_ == num_required_initial_dictionaries_) {
10421042
state_ = State::RECORD_BATCHES;
@@ -1047,7 +1047,7 @@ class StreamDecoderInternal : public MessageDecoderListener {
10471047

10481048
Status OnRecordBatchMessageDecoded(std::unique_ptr<Message> message) {
10491049
if (message->type() == MessageType::DICTIONARY_BATCH) {
1050-
return ReadDictionary(*message);
1050+
return ReadDictionaryMessage(*message);
10511051
} else {
10521052
CHECK_HAS_BODY(*message);
10531053
ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
@@ -1062,10 +1062,13 @@ class StreamDecoderInternal : public MessageDecoderListener {
10621062
}
10631063

10641064
// Read dictionary from dictionary batch
1065-
Status ReadDictionary(const Message& message) {
1065+
Status ReadDictionaryMessage(const Message& message) {
10661066
DictionaryKind kind;
10671067
IpcReadContext context(&dictionary_memo_, options_, swap_endian_);
1068-
RETURN_NOT_OK(::arrow::ipc::ReadDictionary(message, context, &kind));
1068+
DCHECK_EQ(message.type(), MessageType::DICTIONARY_BATCH);
1069+
CHECK_HAS_BODY(message);
1070+
ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message.body()));
1071+
RETURN_NOT_OK(ReadDictionary(*message.metadata(), context, &kind, reader.get()));
10691072
++stats_.num_dictionary_batches;
10701073
switch (kind) {
10711074
case DictionaryKind::New:
@@ -1086,11 +1089,13 @@ class StreamDecoderInternal : public MessageDecoderListener {
10861089
std::vector<bool> field_inclusion_mask_;
10871090
int num_required_initial_dictionaries_;
10881091
int num_read_initial_dictionaries_;
1089-
DictionaryMemo dictionary_memo_;
10901092
std::shared_ptr<Schema> schema_;
10911093
std::shared_ptr<Schema> filtered_schema_;
10921094
ReadStats stats_;
10931095
bool swap_endian_;
1096+
1097+
protected:
1098+
DictionaryMemo dictionary_memo_;
10941099
};
10951100

10961101
// ----------------------------------------------------------------------
@@ -1158,6 +1163,8 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader,
11581163

11591164
ReadStats stats() const override { return StreamDecoderInternal::stats(); }
11601165

1166+
DictionaryMemo* dictionary_memo() override { return &dictionary_memo_; }
1167+
11611168
private:
11621169
std::unique_ptr<MessageReader> message_reader_;
11631170
};
@@ -1490,6 +1497,8 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
14901497

14911498
ReadStats stats() const override { return stats_.poll(); }
14921499

1500+
DictionaryMemo* dictionary_memo() override { return &dictionary_memo_; }
1501+
14931502
Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> GetRecordBatchGenerator(
14941503
const bool coalesce, const io::IOContext& io_context,
14951504
const io::CacheOptions cache_options,

cpp/src/arrow/ipc/reader.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,9 @@ class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader {
9898

9999
/// \brief Return current read statistics
100100
virtual ReadStats stats() const = 0;
101+
102+
/// \brief Return the DictionaryMemo used by this reader
103+
virtual DictionaryMemo* dictionary_memo() = 0;
101104
};
102105

103106
/// \brief Reads the record batch file format
@@ -200,6 +203,9 @@ class ARROW_EXPORT RecordBatchFileReader
200203
/// \brief Return current read statistics
201204
virtual ReadStats stats() const = 0;
202205

206+
/// \brief Return the DictionaryMemo used by this reader
207+
virtual DictionaryMemo* dictionary_memo() = 0;
208+
203209
/// \brief Computes the total number of rows in the file.
204210
virtual Result<int64_t> CountRows() = 0;
205211

@@ -580,6 +586,19 @@ Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
580586
const DictionaryMemo* dictionary_memo, const IpcReadOptions& options,
581587
io::RandomAccessFile* file);
582588

589+
/// \brief Read a dictionary message and add its contents to a DictionaryMemo
590+
///
591+
/// If the memo already contains a dictionary with the same id, it is replaced.
592+
/// Does not perform endian swapping; intended for use with native-endian data.
593+
/// For cross-endian support, use RecordBatchStreamReader or RecordBatchFileReader.
594+
///
595+
/// \param[in] message a Message of type DICTIONARY_BATCH
596+
/// \param[in,out] dictionary_memo DictionaryMemo to populate with the dictionary data
597+
/// \param[in] options IPC options for reading
598+
ARROW_EXPORT
599+
Status ReadDictionary(const Message& message, DictionaryMemo* dictionary_memo,
600+
const IpcReadOptions& options = IpcReadOptions::Defaults());
601+
583602
/// \brief Read arrow::Tensor as encapsulated IPC message in file
584603
///
585604
/// \param[in] file an InputStream pointed at the start of the message

cpp/src/arrow/ipc/writer.cc

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -821,6 +821,41 @@ Status GetDictionaryPayload(int64_t id, bool is_delta,
821821
return assembler.Assemble(dictionary);
822822
}
823823

824+
Result<std::vector<std::shared_ptr<Buffer>>> CollectAndSerializeDictionaries(
825+
const RecordBatch& batch, DictionaryMemo* dictionary_memo,
826+
const IpcWriteOptions& options) {
827+
DictionaryFieldMapper mapper(*batch.schema());
828+
ARROW_ASSIGN_OR_RAISE(auto dictionaries, CollectDictionaries(batch, mapper));
829+
830+
std::vector<std::shared_ptr<Buffer>> result;
831+
for (const auto& pair : dictionaries) {
832+
int64_t id = pair.first;
833+
const auto& dictionary = pair.second;
834+
835+
if (dictionary_memo->HasDictionary(id)) {
836+
ARROW_ASSIGN_OR_RAISE(auto existing,
837+
dictionary_memo->GetDictionary(id, options.memory_pool));
838+
if (existing.get() == dictionary->data().get()) {
839+
continue;
840+
}
841+
}
842+
843+
IpcPayload payload;
844+
RETURN_NOT_OK(GetDictionaryPayload(id, dictionary, options, &payload));
845+
846+
ARROW_ASSIGN_OR_RAISE(auto stream,
847+
io::BufferOutputStream::Create(1024, options.memory_pool));
848+
int32_t metadata_length = 0;
849+
RETURN_NOT_OK(WriteIpcPayload(payload, options, stream.get(), &metadata_length));
850+
ARROW_ASSIGN_OR_RAISE(auto buffer, stream->Finish());
851+
result.push_back(std::move(buffer));
852+
853+
ARROW_ASSIGN_OR_RAISE(
854+
std::ignore, dictionary_memo->AddOrReplaceDictionary(id, dictionary->data()));
855+
}
856+
return result;
857+
}
858+
824859
Status GetRecordBatchPayload(const RecordBatch& batch, const IpcWriteOptions& options,
825860
IpcPayload* out) {
826861
return GetRecordBatchPayload(batch, NULLPTR, options, out);

cpp/src/arrow/ipc/writer.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,24 @@ Status GetDictionaryPayload(int64_t id, bool is_delta,
377377
const std::shared_ptr<Array>& dictionary,
378378
const IpcWriteOptions& options, IpcPayload* payload);
379379

380+
/// \brief Collect and serialize dictionary messages for a RecordBatch
381+
///
382+
/// For each dictionary-encoded field in the batch, checks the memo to determine
383+
/// whether serialization is needed. If the memo has no dictionary for a given id,
384+
/// the dictionary is serialized and added. If the memo already has a dictionary
385+
/// for that id, the ArrayData pointers are compared: if they are the same object
386+
/// the dictionary is skipped (deduplicated), otherwise a replacement dictionary
387+
/// message is serialized and the memo is updated.
388+
///
389+
/// \param[in] batch the RecordBatch to collect dictionaries from
390+
/// \param[in,out] dictionary_memo tracks which dictionaries have been serialized
391+
/// \param[in] options IPC write options
392+
/// \return vector of serialized dictionary IPC message buffers
393+
ARROW_EXPORT
394+
Result<std::vector<std::shared_ptr<Buffer>>> CollectAndSerializeDictionaries(
395+
const RecordBatch& batch, DictionaryMemo* dictionary_memo,
396+
const IpcWriteOptions& options = IpcWriteOptions::Defaults());
397+
380398
/// \brief Compute IpcPayload for the given record batch
381399
/// \param[in] batch the RecordBatch that is being serialized
382400
/// \param[in] options options for serialization

docs/source/python/api/ipc.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ Inter-Process Communication
3434
ipc.open_stream
3535
ipc.read_message
3636
ipc.read_record_batch
37+
ipc.read_dictionary_message
3738
ipc.get_record_batch_size
3839
ipc.read_tensor
3940
ipc.write_tensor

python/pyarrow/includes/libarrow.pxd

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1916,7 +1916,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
19161916
int64_t num_replaced_dictionaries
19171917

19181918
cdef cppclass CDictionaryMemo" arrow::ipc::DictionaryMemo":
1919-
pass
1919+
c_bool HasDictionary(int64_t id) const
19201920

19211921
cdef cppclass CIpcPayload" arrow::ipc::IpcPayload":
19221922
MessageType type
@@ -1970,6 +1970,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
19701970
const CIpcReadOptions& options)
19711971

19721972
CIpcReadStats stats()
1973+
CDictionaryMemo* dictionary_memo()
19731974

19741975
cdef cppclass CRecordBatchFileReader \
19751976
" arrow::ipc::RecordBatchFileReader":
@@ -1992,6 +1993,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
19921993
CResult[CRecordBatchWithMetadata] ReadRecordBatchWithCustomMetadata(int i)
19931994

19941995
CIpcReadStats stats()
1996+
CDictionaryMemo* dictionary_memo()
19951997

19961998
shared_ptr[const CKeyValueMetadata] metadata()
19971999

@@ -2020,12 +2022,21 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
20202022
CDictionaryMemo* dictionary_memo,
20212023
const CIpcReadOptions& options)
20222024

2025+
CStatus ReadDictionary(const CMessage& message,
2026+
CDictionaryMemo* dictionary_memo,
2027+
const CIpcReadOptions& options)
2028+
20232029
CResult[shared_ptr[CBuffer]] SerializeSchema(
20242030
const CSchema& schema, CMemoryPool* pool)
20252031

20262032
CResult[shared_ptr[CBuffer]] SerializeRecordBatch(
20272033
const CRecordBatch& schema, const CIpcWriteOptions& options)
20282034

2035+
CResult[vector[shared_ptr[CBuffer]]] CollectAndSerializeDictionaries(
2036+
const CRecordBatch& batch,
2037+
CDictionaryMemo* dictionary_memo,
2038+
const CIpcWriteOptions& options)
2039+
20292040
CResult[shared_ptr[CSchema]] ReadSchema(const CMessage& message,
20302041
CDictionaryMemo* dictionary_memo)
20312042

python/pyarrow/ipc.pxi

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1102,6 +1102,15 @@ cdef class _RecordBatchStreamReader(RecordBatchReader):
11021102
raise ValueError("Operation on closed reader")
11031103
return _wrap_read_stats(self.stream_reader.stats())
11041104

1105+
@property
1106+
def dictionary_memo(self):
1107+
"""
1108+
The DictionaryMemo associated with this reader.
1109+
"""
1110+
if not self.reader:
1111+
raise ValueError("Operation on closed reader")
1112+
return DictionaryMemo.wrap(self.stream_reader.dictionary_memo(), self)
1113+
11051114

11061115
cdef class _RecordBatchFileWriter(_RecordBatchStreamWriter):
11071116

@@ -1293,6 +1302,15 @@ cdef class _RecordBatchFileReader(_Weakrefable):
12931302
wrapped = pyarrow_wrap_metadata(self.reader.get().metadata())
12941303
return wrapped.to_dict() if wrapped is not None else None
12951304

1305+
@property
1306+
def dictionary_memo(self):
1307+
"""
1308+
The DictionaryMemo associated with this reader.
1309+
"""
1310+
if not self.reader:
1311+
raise ValueError("Operation on closed reader")
1312+
return DictionaryMemo.wrap(self.reader.get().dictionary_memo(), self)
1313+
12961314

12971315
def get_tensor_size(Tensor tensor):
12981316
"""
@@ -1502,3 +1520,31 @@ def read_record_batch(obj, Schema schema,
15021520
CIpcReadOptions.Defaults()))
15031521

15041522
return pyarrow_wrap_batch(result)
1523+
1524+
1525+
def read_dictionary_message(obj, DictionaryMemo dictionary_memo):
1526+
"""
1527+
Read a dictionary message into a DictionaryMemo.
1528+
1529+
If the memo already contains a dictionary with the same id, it is
1530+
replaced. The memo must already have dictionary types registered,
1531+
typically from a prior read_schema call with the same memo.
1532+
1533+
Parameters
1534+
----------
1535+
obj : Message or Buffer-like
1536+
A message of type DICTIONARY_BATCH.
1537+
dictionary_memo : DictionaryMemo
1538+
Memo to populate with the dictionary data.
1539+
"""
1540+
cdef Message message
1541+
1542+
if isinstance(obj, Message):
1543+
message = obj
1544+
else:
1545+
message = read_message(obj)
1546+
1547+
with nogil:
1548+
check_status(ReadDictionary(deref(message.message.get()),
1549+
dictionary_memo.memo,
1550+
CIpcReadOptions.Defaults()))

python/pyarrow/ipc.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
RecordBatchReader, _ReadPandasMixin,
2727
MetadataVersion, Alignment,
2828
read_message, read_record_batch, read_schema,
29+
read_dictionary_message,
2930
read_tensor, write_tensor,
3031
get_record_batch_size, get_tensor_size)
3132
import pyarrow.lib as lib

python/pyarrow/lib.pxd

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ cdef class DictionaryMemo(_Weakrefable):
128128
# it on the heap so as to avoid C++ ABI issues with Python wheels.
129129
shared_ptr[CDictionaryMemo] sp_memo
130130
CDictionaryMemo* memo
131+
object _parent
132+
133+
@staticmethod
134+
cdef DictionaryMemo wrap(CDictionaryMemo* memo, object parent)
131135

132136

133137
cdef class DictionaryType(DataType):

0 commit comments

Comments
 (0)