Skip to content

Commit

Permalink
entity cache initial size and increment made configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
mrbald committed Dec 9, 2024
1 parent 98d9e5d commit 15d3ac8
Showing 1 changed file with 115 additions and 123 deletions.
238 changes: 115 additions & 123 deletions rtb/datacache/entity_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
*
*/

#ifndef __DATACACHE_ENTITY_CACHE_HPP__
#define __DATACACHE_ENTITY_CACHE_HPP__
#pragma once

#include <algorithm>
#include <type_traits>
#include <concepts>
#include <boost/interprocess/exceptions.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/string.hpp>
Expand All @@ -34,6 +34,7 @@

#include <boost/version.hpp>
#include <boost/core/demangle.hpp>
#include <utility>
#include "rtb/core/core.hpp"
#include "rtb/common/concepts.hpp"

Expand Down Expand Up @@ -92,8 +93,10 @@ struct retriever<Tag,std::vector<std::shared_ptr<Serializable>>> {
return !entries.empty();
}
};

template<typename Memory, template <class,class...> class Container, size_t MEMORY_SIZE = 67108864, typename ...T>

inline constexpr size_t ENTITY_CACHE_DEFAULT_MEMORY_SIZE = 67108864 /* 64 MB*/;
inline constexpr size_t ENTITY_CACHE_DEFAULT_MEMORY_GROW_INCREMENT = 67108864 /* 64 MB*/;
template<typename Memory, template <class,class...> class Container, typename ...T>
class entity_cache
{
public:
Expand All @@ -105,135 +108,113 @@ class entity_cache
using char_string = boost::interprocess::basic_string<char, std::char_traits<char>, char_allocator> ;
using Container_t = Container<char_allocator,T...> ;
using Data_t = typename Container_t::value_type;

entity_cache(const std::string &name) :
_segment_ptr(), _container_ptr(), _store_name(), _cache_name(name), _named_mutex(bip::open_or_create,
(_cache_name + "_mutex").c_str()) {
//TODO: add to ctor to switch between mmap and shm
//TODO: maybe needs bip::scoped_lock to lock for other processes calling grow_memory
std::string data_base_dir = "/tmp/CACHE" ;
_store_name = Memory::convert_base_dir(data_base_dir) + _cache_name;
_segment_ptr.reset(Memory::open_or_create_segment(_store_name.c_str(), MEMORY_SIZE) ) ;
_container_ptr = _segment_ptr->template find_or_construct<Container_t>( _cache_name.c_str() )
(typename Container_t::ctor_args_list() , typename Container_t::allocator_type(_segment_ptr->get_segment_manager()));

explicit entity_cache(std::string name, size_t const memory_size = ENTITY_CACHE_DEFAULT_MEMORY_SIZE,
size_t const memory_grow_increment = ENTITY_CACHE_DEFAULT_MEMORY_GROW_INCREMENT)
: _segment_ptr(), _container_ptr(), _store_name(), _cache_name(std::move(name)),
_named_mutex(bip::open_or_create, (_cache_name + "_mutex").c_str()),
_memory_grow_increment{memory_grow_increment} {
// TODO: add to ctor to switch between mmap and shm
// TODO: maybe needs bip::scoped_lock to lock for other processes calling grow_memory
std::string data_base_dir = "/tmp/CACHE";
_store_name = Memory::convert_base_dir(data_base_dir) + _cache_name;
_segment_ptr.reset(Memory::open_or_create_segment(_store_name.c_str(), memory_size));
_container_ptr = _segment_ptr->template find_or_construct<Container_t>(_cache_name.c_str())(
typename Container_t::ctor_args_list(),
typename Container_t::allocator_type(_segment_ptr->get_segment_manager()));
}


template <std::invocable F>
auto read(F&& f) {
auto guard = bip::sharable_lock{_named_mutex};
return f();
}

template <std::invocable F>
auto modify(F&& f) {
auto guard = bip::scoped_lock{_named_mutex};
return f();
}

void clear() {
bip::scoped_lock<bip::named_upgradable_mutex> guard(_named_mutex) ;
bip::scoped_lock guard(_named_mutex) ;
_container_ptr->clear();
}

template<typename Tag, typename Key, typename Serializable, typename Arg>
bool update( Key && key, Serializable && data, Arg&& arg) {
bip::scoped_lock<bip::named_upgradable_mutex> guard(_named_mutex) ;
bip::scoped_lock guard(_named_mutex) ;
bool is_success {false};
auto &index = _container_ptr->template get<Tag>();
auto p = index.equal_range(std::forward<Arg>(arg));
while ( p.first != p.second ) {
try {
is_success |= update_data(std::forward<Key>(key),std::forward<Serializable>(data),index,p.first++);
} catch (const bad_alloc_exception_t &e) {
LOG(debug) << boost::core::demangle(typeid(*this).name())
<< " data was not updated , MEMORY AVAILABLE="
<< _segment_ptr->get_free_memory() ;
grow_memory(MEMORY_SIZE);
is_success |= update_data(std::forward<Key>(key),std::forward<Serializable>(data),index,p.first++);
}
while (p.first != p.second) {
is_success |= retried_with_grow("update_data", [&, this] {
return update_data(std::forward<Key>(key),std::forward<Serializable>(data),index,p.first++);
});
}
return is_success;
}

template<typename Tag, typename Key, typename Serializable, typename ...Args>
bool update( Key && key, Serializable && data, Args&& ...args) {
bip::scoped_lock<bip::named_upgradable_mutex> guard(_named_mutex) ;
bip::scoped_lock guard(_named_mutex) ;
bool is_success {false};
//Memory::attach([this](){attach();}); // reattach to newly created
auto &index = _container_ptr->template get<Tag>();
auto p = index.equal_range(boost::make_tuple(std::forward<Args>(args)...));
while ( p.first != p.second ) {
try {
is_success |= update_data(std::forward<Key>(key),std::forward<Serializable>(data),index,p.first++);
} catch (const bad_alloc_exception_t &e) {
LOG(debug) << boost::core::demangle(typeid(*this).name())
<< " data was not updated , MEMORY AVAILABLE="
<< _segment_ptr->get_free_memory() ;
grow_memory(MEMORY_SIZE);
is_success |= update_data(std::forward<Key>(key),std::forward<Serializable>(data),index,p.first++);
}
while ( p.first != p.second ) {
is_success |= retried_with_grow("update_data", [&, this] {
return update_data(std::forward<Key>(key),std::forward<Serializable>(data),index,p.first++);
});
}
return is_success;
}

template<typename Tag, typename Functor, typename Arg>
bool update( Functor && func, Arg&& arg) {
bip::scoped_lock<bip::named_upgradable_mutex> guard(_named_mutex) ;
bip::scoped_lock guard(_named_mutex) ;
auto &index = _container_ptr->template get<Tag>();
auto p = index.equal_range(std::forward<Arg>(arg));
return modify(index, p.first, p.second, std::forward<Functor>(func));
}

template<typename Tag, typename Functor, typename ...Args>
bool update( Functor && func, Args&& ...args) {
bip::scoped_lock<bip::named_upgradable_mutex> guard(_named_mutex) ;
bip::scoped_lock guard(_named_mutex) ;
auto &index = _container_ptr->template get<Tag>();
auto p = index.equal_range(boost::make_tuple(std::forward<Args>(args)...));
return modify(index, p.first, p.second, std::forward<Functor>(func));
}

template<typename Key, typename Serializable>
auto insert( Key && key, Serializable &&data) {
bip::scoped_lock<bip::named_upgradable_mutex> guard(_named_mutex) ;
decltype(insert_data(key, data)) is_success ;
try {
is_success = insert_data(std::forward<Key>(key), std::forward<Serializable>(data));
} catch (const bad_alloc_exception_t &e) {
LOG(debug) << boost::core::demangle(typeid(*this).name())
<< " data was not inserted , MEMORY AVAILABLE="
<< _segment_ptr->get_free_memory();
grow_memory(MEMORY_SIZE);
is_success = insert_data(std::forward<Key>(key), std::forward<Serializable>(data));
}

return is_success;
bip::scoped_lock guard(_named_mutex) ;
return insert_unsafe(std::forward<Key>(key), std::forward<Serializable>(data));
}

/*****************
template<typename Serializable>
bool insert( const std::vector<Serializable> &data) {
bip::scoped_lock<bip::named_upgradable_mutex> guard(_named_mutex) ;
bool is_success {false};
std::size_t n {data.size()} ;
for ( const auto &item : data) {
try {
if ( insert_data(data) ) { --n; }
} catch (const bad_alloc_exception_t &e) {
LOG(debug) << boost::core::demangle(typeid(*this).name())
<< " data was not inserted , MEMORY AVAILABLE="
<< _segment_ptr->get_free_memory();
grow_memory(MEMORY_SIZE);
if ( insert_data(data) ) { --n; }
}
}
return !n;

template<typename Key, typename Serializable>
auto insert_unsafe( Key && key, Serializable &&data) {
return retried_with_grow("insert_data", [&, this] {
return insert_data(std::forward<Key>(key), std::forward<Serializable>(data));
});
}
*******************/


template<typename Tag, typename Serializable, typename ...Args>
bool retrieve(Serializable &entry, Args&& ...args) {
bip::sharable_lock<bip::named_upgradable_mutex> guard(_named_mutex);
bip::sharable_lock guard(_named_mutex);
return retriever<Tag,Serializable>()(*_container_ptr,entry,std::forward<Args>(args)...);
}

template<typename Tag, typename ...Args>
auto retrieve_raw(Args&& ...args) {
bip::sharable_lock<bip::named_upgradable_mutex> guard(_named_mutex);
bip::sharable_lock guard(_named_mutex);
auto &idx = _container_ptr->template get<Tag>();
return equal_range(idx, std::forward<Args>(args)...);
}

template<typename Serializable>
bool retrieve(std::vector<std::shared_ptr<Serializable>> &entries) {
bip::sharable_lock<bip::named_upgradable_mutex> guard(_named_mutex);
bip::sharable_lock guard(_named_mutex);
auto p = std::make_pair(_container_ptr->begin(), _container_ptr->end());
std::transform ( p.first, p.second, std::back_inserter(entries), [] ( const Data_t &data ) {
std::shared_ptr<Serializable> impl_ptr { std::make_shared<Serializable>() } ;
Expand All @@ -244,38 +225,55 @@ class entity_cache
}

template<typename Tag, typename ...Args>
void remove(Args&& ...args) {
bip::scoped_lock<bip::named_upgradable_mutex> guard(_named_mutex);
auto remove(Args&& ...args) {
bip::scoped_lock guard(_named_mutex);
return remove_unsafe<Tag>(std::forward<Args>(args)...);
}

template<typename Tag, typename ...Args>
void remove_unsafe(Args&& ...args) {
auto p = _container_ptr->template get<Tag>().equal_range(boost::make_tuple(std::forward<Args>(args)...));
_container_ptr->erase(p.first, p.second);
}


template<typename Tag, typename Arg>
auto remove(Arg && arg) {
bip::scoped_lock guard(_named_mutex);
return remove_unsafe<Tag>(std::forward<Arg>(arg));
}

template<typename Tag, typename Arg>
void remove(Arg && arg) {
bip::scoped_lock<bip::named_upgradable_mutex> guard(_named_mutex);
void remove_unsafe(Arg && arg) {
auto p = _container_ptr->template get<Tag>().equal_range(std::forward<Arg>(arg));
_container_ptr->erase(p.first, p.second);
}

char_string create_ipc_key(const std::string &key) const {
try {
char_string tmp(key.data(), key.size(), _segment_ptr->get_segment_manager()) ;
return tmp;
} catch ( const bad_alloc_exception_t &e ) {
LOG(debug) << boost::core::demangle(typeid(*this).name())
<< " create_ipc_key failed , MEMORY AVAILABLE="
<< _segment_ptr->get_free_memory();
grow_memory(MEMORY_SIZE) ;
char_string tmp(key.data(), key.size(), _segment_ptr->get_segment_manager()) ;
return tmp;
}
}
char_string create_ipc_key(const std::string &key) const {
return retried_with_grow("create_ipc_key", [&, this] {
return char_string{key.data(), key.size(), _segment_ptr->get_segment_manager()};
});
}

size_t get_size() const {
bip::sharable_lock<bip::named_upgradable_mutex> guard(_named_mutex) ;
return _container_ptr->size();
}
size_t get_size() const {
bip::sharable_lock guard(_named_mutex) ;
return _container_ptr->size();
}
private:
auto retried_with_grow([[maybe_unused]] std::string_view op_name, auto&& op) {
try {
return op();
} catch ([[maybe_unused]] bad_alloc_exception_t const& e) {
#ifndef NDEBUG
LOG(debug) << boost::core::demangle(typeid(*this).name()) << " " << op_name
<< " failed , MEMORY AVAILABLE=" << _segment_ptr->get_free_memory();
#endif
if (grow_memory(_memory_grow_increment)) [[likely]] {
return op();
}
throw;
}
}

void attach() const {
if constexpr ( !std::is_same_v<segment_t, boost::interprocess::managed_heap_memory> ) {
_segment_ptr.reset(new segment_t(bip::open_only,_store_name.c_str()) ) ;
Expand All @@ -284,17 +282,20 @@ class entity_cache
(typename Container_t::ctor_args_list(), typename Container_t::allocator_type(_segment_ptr->get_segment_manager()));
}

void grow_memory(size_t size) const {
bool grow_memory(size_t size) const {
bool success = false;
try {
if constexpr ( !std::is_same_v<segment_t, boost::interprocess::managed_heap_memory> ) {
_segment_ptr.reset() ;
}
Memory::grow(_segment_ptr, _store_name.c_str(), size) ;
} catch ( const bad_alloc_exception_t &e ) {
LOG(debug) << boost::core::demangle(typeid(*this).name())
<< " failed to grow " << e.what() << ":free mem=" << _segment_ptr->get_free_memory() ;
if constexpr (!std::is_same_v<segment_t, boost::interprocess::managed_heap_memory>) {
_segment_ptr.reset();
}
Memory::grow(_segment_ptr, _store_name.c_str(), size);
success = true;
} catch (bad_alloc_exception_t const& e) {
LOG(debug) << boost::core::demangle(typeid(*this).name()) << " failed to grow " << e.what()
<< ":free mem=" << _segment_ptr->get_free_memory();
}
Memory::attach([this](){attach();}); // reattach to newly created
Memory::attach([this]() { attach(); }); // reattach to newly created
return success;
}

template<typename Key, typename Serializable>
Expand All @@ -316,29 +317,20 @@ class entity_cache
auto modify ( Index & index, Iterator first , Iterator last, Functor && func) {
bool is_success {false};
while ( first != last ) {
try {
is_success |= index.modify(first++, std::forward<Functor>(func));
} catch (const bad_alloc_exception_t &e) {
LOG(debug) << boost::core::demangle(typeid(*this).name())
<< " data by functor was not updated , MEMORY AVAILABLE="
<< _segment_ptr->get_free_memory() ;
grow_memory(MEMORY_SIZE);
is_success |= index.modify(first++, func);
}
is_success |= retried_with_grow("modify", [&] {
return index.modify(first++, std::forward<Functor>(func));
});
}
return is_success;
}

mutable boost::scoped_ptr<segment_t> _segment_ptr;
mutable std::unique_ptr<segment_t> _segment_ptr;
mutable Container_t *_container_ptr ;
std::string _store_name ;
std::string _cache_name ;
mutable boost::interprocess::named_upgradable_mutex _named_mutex;
size_t _memory_grow_increment;
};

}


#endif /* __DATACACHE_ENTITY_CACHE_HPP__ */


0 comments on commit 15d3ac8

Please sign in to comment.