Skip to content
14 changes: 13 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ set(SPARROW_IPC_HEADERS
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/arrow_interface/arrow_array/private_data.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/arrow_interface/arrow_schema.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/arrow_interface/arrow_schema/private_data.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/chunk_memory_output_stream.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/chunk_memory_serializer.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/config.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/config/sparrow_ipc_version.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp
Expand All @@ -109,24 +111,34 @@ set(SPARROW_IPC_HEADERS
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize_variable_size_binary_array.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/deserialize.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/encapsulated_message.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/file_output_stream.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/flatbuffer_utils.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/magic_values.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/memory_output_stream.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/metadata.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/output_stream.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/serialize_utils.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/serialize.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/serializer.hpp
${SPARROW_IPC_INCLUDE_DIR}/sparrow_ipc/utils.hpp
)

# Translation units compiled into the sparrow-ipc library.
# Keep this list alphabetical and list every source exactly once.
set(SPARROW_IPC_SRC
    ${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_array.cpp
    ${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_array/private_data.cpp
    ${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_schema.cpp
    ${SPARROW_IPC_SOURCE_DIR}/arrow_interface/arrow_schema/private_data.cpp
    ${SPARROW_IPC_SOURCE_DIR}/chunk_memory_serializer.cpp
    ${SPARROW_IPC_SOURCE_DIR}/deserialize_fixedsizebinary_array.cpp
    ${SPARROW_IPC_SOURCE_DIR}/deserialize_utils.cpp
    ${SPARROW_IPC_SOURCE_DIR}/deserialize.cpp
    ${SPARROW_IPC_SOURCE_DIR}/encapsulated_message.cpp
    ${SPARROW_IPC_SOURCE_DIR}/file_output_stream.cpp
    ${SPARROW_IPC_SOURCE_DIR}/flatbuffer_utils.cpp
    ${SPARROW_IPC_SOURCE_DIR}/metadata.cpp
    ${SPARROW_IPC_SOURCE_DIR}/serialize_utils.cpp
    ${SPARROW_IPC_SOURCE_DIR}/serialize.cpp
    ${SPARROW_IPC_SOURCE_DIR}/serializer.cpp
    ${SPARROW_IPC_SOURCE_DIR}/utils.cpp
)

Expand Down
81 changes: 81 additions & 0 deletions include/sparrow_ipc/chunk_memory_output_stream.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#pragma once

#include <concepts>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <numeric>
#include <ranges>
#include <span>
#include <utility>
#include <vector>

#include "sparrow_ipc/output_stream.hpp"

namespace sparrow_ipc
{
/**
 * @brief Output stream that stores every write as a separate chunk in a
 *        caller-owned container of byte buffers.
 *
 * Each call to write() appends exactly one new element to the underlying
 * container; existing chunks are never extended or merged.
 *
 * The stream does not own the container: it keeps a pointer to it, so the
 * container passed to the constructor must outlive the stream.
 *
 * @tparam R Random-access range of random-access ranges of bytes
 *           (e.g. std::vector<std::vector<uint8_t>>). The member functions
 *           additionally call emplace_back(), reserve() and back() on it.
 */
template <typename R>
    requires std::ranges::random_access_range<R>
             && std::ranges::random_access_range<std::ranges::range_value_t<R>>
             && std::same_as<typename std::ranges::range_value_t<R>::value_type, uint8_t>
class chunked_memory_output_stream final : public output_stream
{
public:

    /**
     * @brief Binds the stream to an existing chunk container.
     * @param chunks Container that receives the chunks; only its address is stored.
     */
    explicit chunked_memory_output_stream(R& chunks)
        : m_chunks(&chunks)
    {
    }

    /**
     * @brief Copies the bytes of @p span into a new chunk.
     * @return Number of bytes written, i.e. span.size().
     */
    std::size_t write(std::span<const std::uint8_t> span) override
    {
        m_chunks->emplace_back(span.begin(), span.end());
        return span.size();
    }

    /**
     * @brief Moves @p buffer into the container as a new chunk (no byte copy).
     * @return Size in bytes of the chunk that was appended.
     */
    std::size_t write(std::vector<uint8_t>&& buffer)
    {
        m_chunks->emplace_back(std::move(buffer));
        return m_chunks->back().size();
    }

    /**
     * @brief Appends a new chunk made of @p count copies of @p value.
     * @return Number of bytes written, i.e. @p count.
     */
    std::size_t write(uint8_t value, std::size_t count) override
    {
        m_chunks->emplace_back(count, value);
        return count;
    }

    /**
     * @brief Reserves capacity in the outer container.
     * @param size Number of CHUNKS to reserve room for. This is not a byte
     *             count: the value is forwarded to the chunk container's reserve().
     */
    void reserve(std::size_t size) override
    {
        m_chunks->reserve(size);
    }

    /**
     * @brief Same as reserve(std::size_t), with the chunk count obtained by
     *        invoking @p calculate_reserve_size once.
     */
    void reserve(const std::function<std::size_t()>& calculate_reserve_size) override
    {
        m_chunks->reserve(calculate_reserve_size());
    }

    /**
     * @brief Total number of bytes stored across all chunks.
     *
     * Note the unit mismatch with reserve(): size() counts bytes, reserve()
     * counts chunks.
     */
    std::size_t size() const override
    {
        // The accumulator type of std::accumulate is the type of the initial
        // value, so it must be std::size_t: a plain `0` would sum in int and
        // truncate/overflow for large streams.
        return std::accumulate(
            m_chunks->begin(),
            m_chunks->end(),
            std::size_t{0},
            [](std::size_t acc, const auto& chunk)
            {
                return acc + chunk.size();
            }
        );
    }

    /// No-op: the data already lives in the chunk container.
    void flush() override
    {
    }

    /// No-op: there is nothing to release; the stream stays usable afterwards.
    void close() override
    {
    }

    /// Always true for this stream type.
    bool is_open() const override
    {
        return true;
    }

private:

    // Non-owning pointer to the caller's chunk container.
    R* m_chunks;
};
}
74 changes: 74 additions & 0 deletions include/sparrow_ipc/chunk_memory_serializer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#pragma once

#include <sparrow/record_batch.hpp>

#include "sparrow_ipc/chunk_memory_output_stream.hpp"
#include "sparrow_ipc/memory_output_stream.hpp"
#include "sparrow_ipc/serialize.hpp"
#include "sparrow_ipc/serialize_utils.hpp"
#include "sparrow_ipc/config/config.hpp"

namespace sparrow_ipc
{
/**
 * @brief Serializes record batches into a chunked in-memory stream, one
 *        chunk per serialized message.
 *
 * The templated constructor writes the schema message (built from the first
 * record batch) as one chunk and then each record batch as its own chunk.
 *
 * The serializer keeps a non-owning pointer to the stream; the stream (and
 * the chunk container behind it) must outlive the serializer.
 *
 * The single-batch constructor, append(const sparrow::record_batch&) and
 * end() are declared here and defined out of line.
 */
class SPARROW_IPC_API chunk_serializer
{
public:

    /**
     * @brief Creates a serializer from a single record batch.
     * @param rb     Record batch to serialize.
     * @param stream Destination stream; only its address is stored.
     */
    chunk_serializer(
        const sparrow::record_batch& rb,
        chunked_memory_output_stream<std::vector<std::vector<uint8_t>>>& stream
    );

    /**
     * @brief Creates a serializer from a range of record batches.
     *
     * Writes the schema message derived from the first batch, then appends
     * every batch of the range.
     *
     * The range is inspected (first element) and then traversed again by
     * append(), so a multi-pass (forward) range is required. Only
     * range-generic operations are used, so the range does not need
     * operator[] or a size() member.
     *
     * @param record_batches Non-empty range of record batches.
     * @param stream         Destination stream; only its address is stored.
     * @throws std::invalid_argument if @p record_batches is empty.
     */
    template <std::ranges::forward_range R>
        requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
    chunk_serializer(
        const R& record_batches,
        chunked_memory_output_stream<std::vector<std::vector<uint8_t>>>& stream
    )
        : m_pstream(&stream)
    {
        if (std::ranges::empty(record_batches))
        {
            throw std::invalid_argument("Record batches collection is empty");
        }

        const sparrow::record_batch& first_batch = *std::ranges::begin(record_batches);
        m_dtypes = get_column_dtypes(first_batch);

        // The stream's reserve() counts chunks: one for the schema message
        // plus one per record batch. Only possible when the size is known
        // without consuming the range.
        if constexpr (std::ranges::sized_range<const R>)
        {
            m_pstream->reserve(std::ranges::size(record_batches) + 1);
        }

        std::vector<uint8_t> schema_buffer;
        memory_output_stream schema_stream(schema_buffer);
        serialize_schema_message(first_batch, schema_stream);
        m_pstream->write(std::move(schema_buffer));

        append(record_batches);
    }

    /**
     * @brief Appends a single record batch to the stream.
     * @param rb Record batch to serialize.
     */
    void append(const sparrow::record_batch& rb);

    /**
     * @brief Serializes every record batch of the range and writes each one
     *        to the stream as its own chunk.
     *
     * @param record_batches Range of record batches to append.
     * @throws std::runtime_error if the serializer has already been ended.
     */
    template <std::ranges::input_range R>
        requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
    void append(const R& record_batches)
    {
        if (m_ended)
        {
            throw std::runtime_error("Cannot append to a serializer that has been ended");
        }

        // No reserve() here on purpose: the stream's size() is a BYTE total
        // while its reserve() takes a CHUNK count, so reserving
        // `size() + batch count` would request one chunk slot per byte already
        // written. The stream does not expose its chunk count; the outer
        // container's amortized growth is relied upon instead.
        for (const auto& rb : record_batches)
        {
            std::vector<uint8_t> batch_buffer;
            memory_output_stream batch_stream(batch_buffer);
            serialize_record_batch(rb, batch_stream);
            m_pstream->write(std::move(batch_buffer));
        }
    }

    /**
     * @brief Ends the serialization.
     *
     * NOTE(review): defined out of line; presumably sets m_ended so that
     * later append() calls throw - confirm in chunk_memory_serializer.cpp.
     */
    void end();

private:

    // Result of get_column_dtypes() on the first record batch.
    std::vector<sparrow::data_type> m_dtypes;
    // Non-owning pointer to the destination stream.
    chunked_memory_output_stream<std::vector<std::vector<uint8_t>>>* m_pstream;
    // Once true, the templated append() refuses further batches.
    bool m_ended{false};
};
}
1 change: 1 addition & 0 deletions include/sparrow_ipc/encapsulated_message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <cstdint>
#include <variant>
#include <span>

#include "Message_generated.h"

Expand Down
36 changes: 36 additions & 0 deletions include/sparrow_ipc/file_output_stream.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once

#include <cstddef>
#include <cstdint>
#include <fstream>
#include <functional>
#include <span>

#include "sparrow_ipc/output_stream.hpp"


namespace sparrow_ipc
{
    /**
     * @brief output_stream implementation that writes to a std::ofstream.
     *
     * The stream holds a reference to the std::ofstream and does not own it,
     * so the std::ofstream must outlive this object. Because of the reference
     * member the class is not assignable.
     *
     * All member functions are defined out of line.
     *
     * NOTE(review): SPARROW_IPC_API is not defined by any header included
     * directly here; presumably it arrives through output_stream.hpp
     * (sparrow_ipc/config/config.hpp) - confirm.
     */
    class SPARROW_IPC_API file_output_stream final : public output_stream
    {
    public:

        /**
         * @brief Binds the stream to an existing file stream.
         * @param file File stream to write to; stored by reference.
         */
        explicit file_output_stream(std::ofstream& file);

        /**
         * @brief Writes the bytes of @p span.
         * @return Number of bytes written.
         */
        std::size_t write(std::span<const std::uint8_t> span) override;

        /**
         * @brief Writes @p value repeated @p count times.
         * @return Number of bytes written.
         */
        std::size_t write(uint8_t value, std::size_t count = 1) override;

        /// Size of the stream; presumably the number of bytes written so far
        /// (m_written_bytes) - confirm in file_output_stream.cpp.
        size_t size() const override;

        /// Capacity hint required by the output_stream interface.
        void reserve(std::size_t size) override;

        /// Capacity hint whose value is produced by @p calculate_reserve_size.
        void reserve(const std::function<std::size_t()>& calculate_reserve_size) override;

        void flush() override;

        void close() override;

        bool is_open() const override;

    private:

        // Non-owning reference to the caller's file stream.
        std::ofstream& m_file;
        // Starts at zero; presumably incremented by the write() overloads.
        size_t m_written_bytes = 0;
    };
}
Loading
Loading