Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions include/metalldata/impl/metall_graph_priv_for_all.ipp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once
#include <metalldata/metall_graph.hpp>
#include <ygm/utility/assert.hpp>

namespace metalldata {
// The following for_all functions take a function that
Expand Down Expand Up @@ -127,17 +128,10 @@ void metall_graph::priv_for_all_nodes(
},
where);

std::unordered_map<std::string, record_id_type> node_to_id;
auto node_col_idx = m_pnodes->find_series(NODE_COL.unqualified());
m_pnodes->for_all_rows([&](record_id_type rid) {
auto name = m_pnodes->get<std::string_view>(node_col_idx, rid);

node_to_id[std::string(name)] = rid;
});

for (const auto& node : nodeset) {
// throw an exception if the node is not in our node dataframe.
func(node_to_id.at(node));
auto opsa = priv_local_node_find(node);
YGM_ASSERT_RELEASE(opsa.has_value());
func(opsa.value());
}
}
}
Expand Down
18 changes: 5 additions & 13 deletions include/metalldata/impl/metall_graph_set_node_column.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,17 @@ metall_graph::return_code metall_graph::set_node_column(
using record_id_type = record_store_type::record_id_type;
using val_type = typename T::mapped_type;

// create a node_local map of record id to node value.
std::map<std::string, record_id_type> node_to_id{};
m_pnodes->for_all_rows([&](record_id_type id) {
std::string_view node =
m_pnodes->get<std::string_view>(NODE_COL.unqualified(), id);
node_to_id[std::string(node)] = id;
});

// create series and store index so we don't have to keep looking it up.
// create series
auto nodecol_idx = m_pnodes->add_series<val_type>(nodecol_name.unqualified());

size_t invalid_nodes = 0;
for (const auto& [k, v] : collection) {
if (!node_to_id.contains(k)) {
for (const auto& [node_name, value] : collection) {
auto opsv = priv_local_node_find(node_name);
if (!opsv.has_value()) {
++invalid_nodes;
continue;
}
auto node_idx = node_to_id.at(k);
m_pnodes->set(nodecol_idx, node_idx, v);
m_pnodes->set(nodecol_idx, opsv.value(), value);
}

if (invalid_nodes > 0) {
Expand Down
55 changes: 53 additions & 2 deletions include/metalldata/metall_graph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
#include <metall/metall.hpp>
#include <multiseries/multiseries_record.hpp>
#include <ygm/comm.hpp>
#include <ygm/container/detail/hash_partitioner.hpp>
#include <metall/utility/metall_mpi_adaptor.hpp>
#include <boost/json.hpp>
#include <boost/unordered/unordered_flat_set.hpp>
#include <metall/container/unordered_map.hpp>
#include <ygm/container/set.hpp>
#include <expected>
#include <optional>
#include <ygm/utility/assert.hpp>

namespace bjsn = boost::json;

Expand All @@ -42,11 +45,23 @@ namespace metalldata {

class metall_graph {
private:
/// multiseries record store are the dataframes
using record_store_type =
multiseries::basic_record_store<metall::manager::allocator_type<std::byte>>;
using string_store_type = record_store_type::string_store_type;
using record_id_type = record_store_type::record_id_type;
using series_index_type = record_store_type::series_index_type;

using record_id_type = record_store_type::record_id_type;
/// string table deduplicates strings
using string_store_type = record_store_type::string_store_type;
using string_table_accessor = compact_string::string_accessor;

/// hash table to index local node's record ids
using local_vertex_map_type = metall::container::unordered_map<
string_table_accessor, record_id_type,
compact_string::string_accessor_hasher,
std::equal_to<compact_string::string_accessor>,
metall::manager::allocator_type<
std::pair<const compact_string::string_accessor, record_id_type>>>;

public:
// TODO: Rationalize these data types to correspond better with JSONLogic and
Expand Down Expand Up @@ -503,6 +518,15 @@ class metall_graph {
record_store_type* m_pnodes = nullptr;
/// Dataframe for directed edges
record_store_type* m_pedges = nullptr;
/// Map from vertex string to local record index
local_vertex_map_type* m_pnode_to_idx = nullptr;
/// String store
string_store_type* m_pstring_store = nullptr;

series_index_type m_u_col_idx;
series_index_type m_v_col_idx;
series_index_type m_dir_col_idx;
series_index_type m_node_col_idx;

/// \brief Number of records currently held in the local node record store
/// (m_pnodes).
size_t local_num_nodes() const { return m_pnodes->num_records(); }
/// \brief Number of records currently held in the local edge record store
/// (m_pedges).
size_t local_num_edges() const { return m_pedges->num_records(); }
Expand Down Expand Up @@ -532,6 +556,33 @@ class metall_graph {
template <typename T>
return_code set_node_column(series_name nodecol_name, const T& collection);

/// \brief Returns the local record id of node `id`, creating the node record
/// if it is not yet present in the local node index.
///
/// Asserts (release-mode) that this rank is the owner of `id` according to
/// m_partitioner.
///
/// \param id Name of the node.
/// \return Record id of the (possibly newly created) node in m_pnodes.
record_id_type priv_local_node_find_or_insert(std::string_view id) {
  YGM_ASSERT_RELEASE(m_partitioner.owner(id) == m_comm.rank());

  // Intern the name so it can be used as a key of the node index.
  const auto v_in_ss = compact_string::add_string(id, *m_pstring_store);

  // Single lookup: hit returns directly instead of contains() + at().
  if (const auto it = m_pnode_to_idx->find(v_in_ss);
      it != m_pnode_to_idx->end()) {
    return it->second;
  }

  // Unknown node: create its record, store its name, then index it.
  const auto ridx = m_pnodes->add_record();
  m_pnodes->set(m_node_col_idx, ridx, id);
  m_pnode_to_idx->insert_or_assign(v_in_ss, ridx);
  return ridx;
}

std::optional<record_id_type> priv_local_node_find(
std::string_view id) const {
YGM_ASSERT_RELEASE(m_partitioner.owner(id) == m_comm.rank());
auto ret = compact_string::find_string(id, *m_pstring_store);
if (ret) {
return m_pnode_to_idx->at(ret.value());
}
return {};
}

// Using YGM's default partitioner to assign node owner
ygm::container::detail::hash_partitioner<
ygm::container::detail::hash<std::string_view>>
m_partitioner;

}; // class metall_graph

} // namespace metalldata
Expand Down
3 changes: 1 addition & 2 deletions include/multiseries/multiseries_record.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -659,8 +659,7 @@ class basic_record_store {
const record_id_type record_id,
const series_type &value) {
if constexpr (std::is_same_v<series_type, std::string_view>) {
auto accessor =
cstr::add_string(value.data(), value.size(), *m_string_store);
auto accessor = cstr::add_string(value, *m_string_store);
priv_get_series_container<series_type>(series.container)[record_id] =
accessor;
} else {
Expand Down
52 changes: 38 additions & 14 deletions include/string_table/string_accessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,28 @@
#include <string>
#include <string_view>
#include <utility>
#include <boost/container_hash/hash.hpp>

namespace compact_string {

/// \brief Provides a way to access a string stored in a string store.
/// If a string is short, it stores the string in the object itself.
/// If a string is long, it stores the pointer to the string in the object.
/// Can take only strings allocated by allocate_string_embedding_length(),
/// however, w/o the length prefix.
class string_accessor {
public:
public:
using size_type = std::size_t;
using char_type = char;
using offset_t = std::ptrdiff_t;
using offset_t = std::ptrdiff_t;

private:
private:
using self_type = string_accessor;

static constexpr size_t k_num_blocks = sizeof(offset_t);
static constexpr size_t k_short_str_max_length =
k_num_blocks - 2; // -1 for '\0' and -1 for metadata
k_num_blocks - 2; // -1 for '\0' and -1 for metadata

public:
public:
string_accessor() = default;

/// \brief Construct a string accessor from a pointer to string.
Expand Down Expand Up @@ -62,7 +62,7 @@ class string_accessor {
// as offset must be recalculated.
priv_set_long_str_pointer(other.priv_to_long_str_pointer());
}
other.m_entier_block = 0; // clear the data
other.m_entier_block = 0; // clear the data
}

string_accessor &operator=(const string_accessor &other) {
Expand All @@ -83,7 +83,7 @@ class string_accessor {
} else {
priv_set_long_str_pointer(other.priv_to_long_str_pointer());
}
other.m_entier_block = 0; // clear the data
other.m_entier_block = 0; // clear the data
return *this;
}

Expand Down Expand Up @@ -129,7 +129,24 @@ class string_accessor {
}
}

private:
/// \brief Equality of two string accessors.
/// Strings of different length are never equal. Short strings are compared
/// character by character. Long strings are compared by address, relying on
/// the string store keeping a single copy of each distinct long string.
friend bool operator==(const string_accessor &lhs,
                       const string_accessor &rhs) {
  const auto len = lhs.length();
  if (len != rhs.length()) {
    return false;
  }

  if (!lhs.is_short()) {
    // Long strings are stored only once in the string store, so identical
    // contents imply identical addresses.
    return lhs.c_str() == rhs.c_str();
  }

  const int diff =
      std::char_traits<char>::compare(lhs.c_str(), rhs.c_str(), len);
  return diff == 0;
}

private:
/// \brief Returns true if the long-string bit (bit 0 of the last block,
/// which holds the metadata) is set.
bool priv_get_long_flag() const {
  const auto metadata = m_blocks[k_num_blocks - 1];
  return (metadata & 0x1) != 0;
}

void priv_set_long_str_pointer(const char_type *const str) {
Expand All @@ -138,7 +155,7 @@ class string_accessor {

bool is_negative = false;
if (off < 0) {
off = -off;
off = -off;
is_negative = true;
}
if (uint64_t(off) > (1ULL << 55)) {
Expand All @@ -156,9 +173,9 @@ class string_accessor {
// Finally set the metadata
uint8_t metadata = 0;
if (is_negative) {
metadata |= 0x2; // set the negative bit
metadata |= 0x2; // set the negative bit
}
metadata |= 0x1; // set the long string bit
metadata |= 0x1; // set the long string bit

m_blocks[k_num_blocks - 1] = metadata;
}
Expand All @@ -174,7 +191,7 @@ class string_accessor {
off = -off;
}
auto addr =
reinterpret_cast<std::ptrdiff_t>(const_cast<self_type *>(this)) + off;
reinterpret_cast<std::ptrdiff_t>(const_cast<self_type *>(this)) + off;

return reinterpret_cast<char_type *>(addr);
}
Expand Down Expand Up @@ -217,4 +234,11 @@ class string_accessor {
"sizeof(offset_ptr_t) != sizeof(uint64_t)");
};
};
} // namespace compact_string

/// \brief Hash functor for string_accessor.
/// Hashes the characters of the referenced string (c_str() up to length()),
/// so the hash depends on the content rather than on where it is stored.
struct string_accessor_hasher {
  std::size_t operator()(const string_accessor &str) const {
    const auto *const first = str.c_str();
    const auto *const last  = first + str.length();
    return boost::hash_range(first, last);
  }
};

} // namespace compact_string
Loading