Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 1 addition & 85 deletions include/multiseries/multiseries_record.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,22 +142,6 @@ class basic_record_store {
return m_series.size() - 1;
}

/// \brief Looks up a series by name and returns the data held for one record.
/// \tparam series_type The value type of the series; it is validated with
/// priv_series_type_check before the lookup happens.
/// \param series_name The name of the series to read from.
/// \param record_id The ID of the record whose value is requested.
/// \return Whatever priv_get_series_data yields for the located series
/// container and the given record ID.
/// \throws std::runtime_error if no series with the given name is found.
template <typename series_type>
const auto get(const std::string_view series_name,
               const record_id_type record_id) const {
  priv_series_type_check<series_type>();

  const auto series_pos = priv_find_series(series_name);
  if (series_pos != m_series.end()) {
    // The per-record lookup itself is delegated to the private helper.
    return priv_get_series_data<series_type>(series_pos->container, record_id);
  }

  throw std::runtime_error("Series not found: " + std::string(series_name));
}

/// \brief Returns the series data of a record
/// \param series_index The index of the series
/// \param record_id The record ID
Expand Down Expand Up @@ -229,21 +213,6 @@ class basic_record_store {
return to_return_record;
}

/// \brief Tells whether a record has no value (None) in the named series.
/// \param series_name The name of the series to inspect.
/// \param record_id The ID of the record to test.
/// \return true if the series is not found, or if the series container does
/// not contain the record ID; false otherwise.
bool is_none(const std::string_view series_name,
             const record_id_type record_id) const {
  const auto series_pos = priv_find_series(series_name);

  if (series_pos != m_series.end()) {
    // The container is a variant; ask whichever alternative is active.
    return !std::visit(
        [&record_id](const auto &values) { return values.contains(record_id); },
        series_pos->container);
  }

  // An unknown series is reported as None rather than as an error.
  return true;
}

/// \brief Returns if a series data of a record is None (does not exist)
bool is_none(const series_index_type series_index,
const record_id_type record_id) const {
Expand All @@ -259,21 +228,6 @@ class basic_record_store {

/// \brief Stores a value for one record (row) in the series with the given
/// name.
/// \tparam T The value type of the series; it is validated with
/// priv_series_type_check before the lookup happens.
/// \param series_name The name of the series to write to.
/// \param record_id The ID of the record to update.
/// \param value The value to store.
/// \throws std::runtime_error if no series with the given name is found.
///
/// TODO: explore the use of templated variadic variants. (See Roger)
///   template <typename... Types>
///   void foo(std::variant<Types...>) {}
template <typename T>
void set(const std::string_view series_name, const record_id_type record_id,
         T value) {
  priv_series_type_check<T>();

  const auto series_pos = priv_find_series(series_name);
  const bool series_missing = (series_pos == m_series.end());
  if (series_missing) {
    throw std::runtime_error("Series not found: " + std::string(series_name));
  }

  // The whole series entry (not just its container) is handed to the helper.
  priv_set_series_data<T>(*series_pos, record_id, value);
}

template <typename T>
void set(const series_index_type series_index, const record_id_type record_id,
T value) {
Expand Down Expand Up @@ -331,25 +285,6 @@ class basic_record_store {
itr->container);
}

/// \brief Visits every record of the named series, skipping None values.
/// \tparam series_type The value type used to select the series container.
/// \tparam series_func_t A callable invoked as
/// series_func(record_id, single_series_value).
/// \param series_name The name of the series to iterate over.
/// \param series_func The callable applied to each visited record.
/// \throws std::runtime_error if no series with the given name is found.
template <typename series_type, typename series_func_t>
void for_all(const std::string_view series_name,
             series_func_t series_func) const {
  const auto series_pos = priv_find_series(series_name);
  if (series_pos == m_series.end()) {
    throw std::runtime_error("Series not found: " + std::string(series_name));
  }

  const auto &values =
      priv_get_series_container<series_type>(series_pos->container);

  for (size_t record_id = 0; record_id < m_record_status.size(); ++record_id) {
    // Skip records whose status flag is unset or that hold no value here.
    if (!(m_record_status[record_id] && values.contains(record_id))) {
      continue;
    }
    series_func(record_id, values.at(record_id));
  }
}

template <typename series_type, typename series_func_t>
void for_all(const series_index_type series_index,
series_func_t series_func) const {
Expand All @@ -358,7 +293,7 @@ class basic_record_store {
}

const auto &container =
priv_get_series_container<series_type>(m_series[series_index]);
priv_get_series_container<series_type>(m_series[series_index].container);
for (size_t i = 0; i < m_record_status.size(); ++i) {
if (m_record_status[i] && container.contains(i)) {
series_func(i, container.at(i));
Expand Down Expand Up @@ -475,25 +410,6 @@ class basic_record_store {
return series_names;
}

/// \brief Removes the value of a single record from the named series.
/// \param series_name The name of the series to modify.
/// \param record_id The ID of the record whose value should be removed.
/// \return false if the series is not found or the container does not
/// contain the record; otherwise the result of the container's erase call
/// converted to bool.
bool remove(const std::string_view series_name,
            const record_id_type record_id) {
  const auto series_pos = priv_find_series(series_name);
  if (series_pos == m_series.end()) {
    return false;
  }

  // Dispatch on the active container alternative and report its outcome.
  return std::visit(
      [&record_id](auto &values) -> bool {
        if (!values.contains(record_id)) {
          return false;
        }
        const bool erased = values.erase(record_id);
        return erased;
      },
      series_pos->container);
}

/// \brief Remove a single data
bool remove(const series_index_type series_index,
const record_id_type record_id) {
Expand Down
56 changes: 20 additions & 36 deletions src/examples/multiseries/demo_series_container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@

using namespace multiseries;

void parse_option(int argc,
char *argv[],
std::filesystem::path &metall_path,
void parse_option(int argc, char *argv[], std::filesystem::path &metall_path,
size_t &num_records) {
int opt_char;
while ((opt_char = getopt(argc, argv, "d:n:")) != -1) {
Expand All @@ -44,14 +42,13 @@ void parse_option(int argc,
}
}

using record_store_type = basic_record_store<metall::manager::allocator_type<
std::byte> >;
using record_store_type =
basic_record_store<metall::manager::allocator_type<std::byte> >;
using string_store_type = record_store_type::string_store_type;

template<typename data_type, typename generator_type>
template <typename data_type, typename generator_type>
void run_bench(const std::filesystem::path &metall_path,
const size_t num_records,
const container_kind kind,
const size_t num_records, const container_kind kind,
generator_type &&generator) {
metall::manager manager(metall::create_only, metall_path);

Expand All @@ -63,46 +60,39 @@ void run_bench(const std::filesystem::path &metall_path,
record_store->add_series<data_type>("data", kind);
for (int64_t i = 0; i < num_records; ++i) {
const auto record_id = record_store->add_record();
record_store->set<data_type>("data", record_id, generator());
const auto demo_id = record_store->find_series("data");
record_store->set<data_type>(demo_id, record_id, generator());
}

std::cout << "Total #of records: " << record_store->num_records() <<
std::endl;
std::cout << "Total #of records: " << record_store->num_records()
<< std::endl;
std::cout << "#of unique strings: " << string_store->size() << std::endl;
std::cout << get_dir_usage(metall_path) << std::endl;
}

int main(int argc, char **argv) {
std::filesystem::path metall_path{"./metall_data"};
size_t num_records = 1'000'000;
size_t num_records = 1'000'000;
parse_option(argc, argv, metall_path, num_records);

std::cout << "Ingest bool values" << std::endl;
std::cout << "Dense container" << std::endl;
run_bench<bool>(metall_path,
num_records,
container_kind::dense,
run_bench<bool>(metall_path, num_records, container_kind::dense,
[]() { return bool(std::rand() % 2); });

std::cout << "Sparse container" << std::endl;
run_bench<bool>(metall_path,
num_records,
container_kind::sparse,
run_bench<bool>(metall_path, num_records, container_kind::sparse,
[]() { return bool(std::rand() % 2); });

std::cout << "----------" << std::endl;

std::cout << "Ingest int64_t values" << std::endl;
std::cout << "Dense container" << std::endl;
run_bench<int64_t>(metall_path,
num_records,
container_kind::dense,
run_bench<int64_t>(metall_path, num_records, container_kind::dense,
[]() { return std::rand(); });

std::cout << "Sparse container" << std::endl;
run_bench<int64_t>(metall_path,
num_records,
container_kind::sparse,
run_bench<int64_t>(metall_path, num_records, container_kind::sparse,
[]() { return std::rand(); });

std::cout << "----------" << std::endl;
Expand All @@ -111,19 +101,13 @@ int main(int argc, char **argv) {
boost::uuids::random_generator gen;
std::cout << "Sample UUID: " << boost::uuids::to_string(gen()) << std::endl;
std::cout << "Dense container" << std::endl;
run_bench<std::string_view>(metall_path,
num_records,
container_kind::dense,
[&gen]() -> std::string {
return boost::uuids::to_string(gen());
});
run_bench<std::string_view>(
metall_path, num_records, container_kind::dense,
[&gen]() -> std::string { return boost::uuids::to_string(gen()); });
std::cout << "Sparse container" << std::endl;
run_bench<std::string_view>(metall_path,
num_records,
container_kind::sparse,
[&gen]() -> std::string {
return boost::uuids::to_string(gen());
});
run_bench<std::string_view>(
metall_path, num_records, container_kind::sparse,
[&gen]() -> std::string { return boost::uuids::to_string(gen()); });

return 0;
}
13 changes: 7 additions & 6 deletions src/examples/multiseries/find_max_single.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include <multiseries/multiseries_record.hpp>

using record_store_type =
multiseries::basic_record_store<metall::manager::allocator_type<std::byte>>;
multiseries::basic_record_store<metall::manager::allocator_type<std::byte>>;
using string_store_type = record_store_type::string_store_type;

struct option {
Expand Down Expand Up @@ -68,10 +68,10 @@ void show_usage(std::ostream &os) {
auto start_timer() { return std::chrono::high_resolution_clock::now(); }

auto get_elapsed_time_seconds(
const std::chrono::time_point<std::chrono::high_resolution_clock> &start) {
const std::chrono::time_point<std::chrono::high_resolution_clock> &start) {
const auto end = std::chrono::high_resolution_clock::now();
return std::chrono::duration_cast<std::chrono::duration<double>>(end - start)
.count();
.count();
}

int main(int argc, char *argv[]) {
Expand All @@ -91,7 +91,7 @@ int main(int argc, char *argv[]) {

metall::manager manager(metall::open_read_only, opt.metall_path);
auto *record_store =
manager.find<record_store_type>(metall::unique_instance).first;
manager.find<record_store_type>(metall::unique_instance).first;
if (!record_store) {
std::cerr << "Failed to find record store in " + opt.metall_path.string()
<< std::endl;
Expand All @@ -110,8 +110,9 @@ int main(int argc, char *argv[]) {

using value_type = int64_t;
std::cout << "Value type is: " << typeid(value_type).name() << std::endl;
value_type max_value = std::numeric_limits<value_type>::min();
record_store->for_all<value_type>(series_name,
value_type max_value = std::numeric_limits<value_type>::min();
auto series_idx = record_store->find_series(series_name);
record_store->for_all<value_type>(series_idx,
[&](const auto, const auto value) {
max_value = std::max(max_value, value);
});
Expand Down
66 changes: 33 additions & 33 deletions src/examples/multiseries/ingest_parquet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@

#include "utils.hpp"

using record_store_type = multiseries::basic_record_store<
metall::manager::allocator_type<std::byte> >;
using record_store_type =
multiseries::basic_record_store<metall::manager::allocator_type<std::byte> >;
using string_store_type = record_store_type::string_store_type;

struct option {
Expand Down Expand Up @@ -87,13 +87,13 @@ int main(int argc, char **argv) {

ygm::utility::timer setup_timer;
metall::utility::metall_mpi_adaptor mpi_adaptor(
metall::create_only, opt.metall_path, comm.get_mpi_comm());
metall::create_only, opt.metall_path, comm.get_mpi_comm());
auto &manager = mpi_adaptor.get_local_manager();

auto *string_store = manager.construct<string_store_type>(
metall::unique_instance)(manager.get_allocator());
metall::unique_instance)(manager.get_allocator());
auto *record_store = manager.construct<record_store_type>(
metall::unique_instance)(string_store, manager.get_allocator());
metall::unique_instance)(string_store, manager.get_allocator());

ygm::io::parquet_parser parquetp(comm, {opt.input_path});
const auto &schema = parquetp.get_schema();
Expand Down Expand Up @@ -129,35 +129,35 @@ int main(int argc, char **argv) {
continue; // Leave the field empty for None/NaN values
}

const auto &name = schema[i].name;
auto name_idx = record_store->find_series(schema[i].name);
std::visit(
[&record_store, &record_id, &name, &opt](auto &&field) {
using T = std::decay_t<decltype(field)>;
if constexpr (std::is_same_v<T, int32_t> ||
std::is_same_v<T, int64_t>) {
record_store->set<int64_t>(name, record_id, field);
if (opt.profile) {
total_ingested_bytes += sizeof(T);
}
} else if constexpr (std::is_same_v<T, float> ||
std::is_same_v<T, double>) {
record_store->set<double>(name, record_id, field);
if (opt.profile) {
total_ingested_bytes += sizeof(T);
}
} else if constexpr (std::is_same_v<T, std::string>) {
record_store->set<std::string_view>(name, record_id, field);

if (opt.profile) {
total_ingested_str_size += field.size();
total_ingested_bytes += field.size(); // Assume ASCII
++total_num_strs;
}
} else {
throw std::runtime_error("Unsupported type");
[&record_store, &record_id, name_idx, &opt](auto &&field) {
using T = std::decay_t<decltype(field)>;
if constexpr (std::is_same_v<T, int32_t> ||
std::is_same_v<T, int64_t>) {
record_store->set<int64_t>(name_idx, record_id, field);
if (opt.profile) {
total_ingested_bytes += sizeof(T);
}
},
std::move(field));
} else if constexpr (std::is_same_v<T, float> ||
std::is_same_v<T, double>) {
record_store->set<double>(name_idx, record_id, field);
if (opt.profile) {
total_ingested_bytes += sizeof(T);
}
} else if constexpr (std::is_same_v<T, std::string>) {
record_store->set<std::string_view>(name_idx, record_id, field);

if (opt.profile) {
total_ingested_str_size += field.size();
total_ingested_bytes += field.size(); // Assume ASCII
++total_num_strs;
}
} else {
throw std::runtime_error("Unsupported type");
}
},
std::move(field));
}
});
comm.barrier();
Expand All @@ -177,7 +177,7 @@ int main(int argc, char **argv) {
comm.cout0() << "Series name, Load factor" << std::endl;
for (const auto &s : schema) {
const auto ave_load_factor =
ygm::sum(record_store->load_factor(s.name), comm) / comm.size();
ygm::sum(record_store->load_factor(s.name), comm) / comm.size();
comm.cout0() << " " << s.name << ", " << ave_load_factor << std::endl;
}

Expand Down
Loading