Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: support system transaction #127

Merged
merged 1 commit into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmarks/micro-benchmarks/InsertUpdateBench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ static void BenchUpdateInsert(benchmark::State& state) {
std::filesystem::remove_all(dirPath);
std::filesystem::create_directories(dirPath);

StoreOption* option = CreateStoreOption("/tmp/InsertUpdateBench");
StoreOption* option = CreateStoreOption("/tmp/leanstore/InsertUpdateBench");
option->mCreateFromScratch = true;
option->mWorkerThreads = 4;
auto sLeanStore = std::make_unique<leanstore::LeanStore>(option);
Expand Down
1 change: 1 addition & 0 deletions benchmarks/ycsb/YcsbLeanStore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class YcsbLeanStore : public YcsbExecutor {
auto dataDirStr = FLAGS_ycsb_data_dir + std::string("/leanstore");
StoreOption* option = CreateStoreOption(dataDirStr.c_str());
option->mCreateFromScratch = createFromScratch;
option->mEnableEagerGc = true;
option->mWorkerThreads = FLAGS_ycsb_threads;
option->mBufferPoolSize = FLAGS_ycsb_mem_kb * 1024;
option->mEnableMetrics = true;
Expand Down
10 changes: 5 additions & 5 deletions benchmarks/ycsb/ycsb-config.flags
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
--ycsb_val_size=200
--ycsb_record_count=100000
--ycsb_mem_kb=1048576
--ycsb_run_for_seconds=5
--ycsb_target=basickv
--ycsb_workload=c
--ycsb_threads=2
--ycsb_cmd=load
--ycsb_run_for_seconds=600
--ycsb_target=transactionkv
--ycsb_workload=b
--ycsb_threads=8
--ycsb_cmd=run
27 changes: 19 additions & 8 deletions include/leanstore/LeanStore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,13 @@
//! NOTE: Ownerd by LeanStore instance, should be destroyed together with it
cr::CRManager* mCRManager;

//! The global timestamp oracle, used to generate start and commit timestamps
//! for all transactions in the store. Start from a positive number, 0
//! indicates invalid timestamp
std::atomic<uint64_t> mTimestampOracle = 1;
//! The global timestamp oracle for user transactions. Used to generate start and commit
//! timestamps for user transactions. Start from a positive number, 0 indicates invalid timestamp
std::atomic<uint64_t> mUsrTso = 1;

//! The global timestamp oracle for system transactions. Used to generate timestamps for system
//! transactions. Start from a positive number, 0 indicates invalid timestamp
std::atomic<uint64_t> mSysTso = 1;

//! The metrics manager
std::unique_ptr<leanstore::telemetry::MetricsManager> mMetricsManager;
Expand Down Expand Up @@ -118,13 +121,21 @@
//! Unregister a TransactionKV
void DropTransactionKV(const std::string& name);

uint64_t GetTs() {
return mTimestampOracle.load();
uint64_t GetUsrTxTs() {
return mUsrTso.load();

Check warning on line 125 in include/leanstore/LeanStore.hpp

View check run for this annotation

Codecov / codecov/patch

include/leanstore/LeanStore.hpp#L124-L125

Added lines #L124 - L125 were not covered by tests
}

uint64_t GetSysTxTs() {
return mSysTso.load();
}

//! Alloc a new timestamp from the timestamp oracle
uint64_t AllocTs() {
return mTimestampOracle.fetch_add(1);
uint64_t AllocUsrTxTs() {
return mUsrTso.fetch_add(1);
}

uint64_t AllocSysTxTs() {
return mSysTso.fetch_add(1);
}

//! Execute a custom user function on a worker thread.
Expand Down
18 changes: 10 additions & 8 deletions include/leanstore/btree/core/BTreeGeneric.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include "leanstore/sync/HybridLatch.hpp"
#include "leanstore/utils/Log.hpp"

#include <rapidjson/document.h>

#include <atomic>
#include <limits>

Expand Down Expand Up @@ -55,9 +57,9 @@ class BTreeGeneric : public leanstore::storage::BufferManagedTree {

//! Try to merge the current node with its left or right sibling, reclaim the merged left or right
//! sibling if successful.
bool TryMergeMayJump(BufferFrame& toMerge, bool swizzleSibling = true);
bool TryMergeMayJump(TXID sysTxId, BufferFrame& toMerge, bool swizzleSibling = true);

void TrySplitMayJump(BufferFrame& toSplit, int16_t pos = -1);
void TrySplitMayJump(TXID sysTxId, BufferFrame& toSplit, int16_t pos = -1);

XMergeReturnCode XMerge(GuardedBufferFrame<BTreeNode>& guardedParent,
GuardedBufferFrame<BTreeNode>& guardedChild,
Expand Down Expand Up @@ -141,7 +143,7 @@ class BTreeGeneric : public leanstore::storage::BufferManagedTree {
//! | |
//! newLeft toSplit
///
void splitRootMayJump(GuardedBufferFrame<BTreeNode>& guardedParent,
void splitRootMayJump(TXID sysTxId, GuardedBufferFrame<BTreeNode>& guardedParent,
GuardedBufferFrame<BTreeNode>& guardedChild,
const BTreeNode::SeparatorInfo& sepInfo);

Expand All @@ -152,7 +154,7 @@ class BTreeGeneric : public leanstore::storage::BufferManagedTree {
//! | | |
//! toSplit newLeft toSplit
///
void splitNonRootMayJump(GuardedBufferFrame<BTreeNode>& guardedParent,
void splitNonRootMayJump(TXID sysTxId, GuardedBufferFrame<BTreeNode>& guardedParent,
GuardedBufferFrame<BTreeNode>& guardedChild,
const BTreeNode::SeparatorInfo& sepInfo,
uint16_t spaceNeededForSeparator);
Expand Down Expand Up @@ -198,15 +200,15 @@ class BTreeGeneric : public leanstore::storage::BufferManagedTree {
xGuardedMeta.Reclaim();
}

// static void ToJson(BTreeGeneric& btree, rapidjson::Document* resultDoc);
static void ToJson(BTreeGeneric& btree, rapidjson::Document* resultDoc);

private:
static void freeBTreeNodesRecursive(BTreeGeneric& btree,
GuardedBufferFrame<BTreeNode>& guardedNode);

// static void toJsonRecursive(BTreeGeneric& btree, GuardedBufferFrame<BTreeNode>& guardedNode,
// rapidjson::Value* resultObj,
// rapidjson::Value::AllocatorType& allocator);
static void toJsonRecursive(BTreeGeneric& btree, GuardedBufferFrame<BTreeNode>& guardedNode,
rapidjson::Value* resultObj,
rapidjson::Value::AllocatorType& allocator);

static ParentSwipHandler findParentMayJump(BTreeGeneric& btree, BufferFrame& bfToFind) {
return FindParent<true>(btree, bfToFind);
Expand Down
15 changes: 9 additions & 6 deletions include/leanstore/btree/core/PessimisticExclusiveIterator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,23 @@
virtual void InsertToCurrentNode(Slice key, Slice val) {
LS_DCHECK(KeyInCurrentNode(key));
LS_DCHECK(HasEnoughSpaceFor(key.size(), val.size()));
LS_DCHECK(mSlotId != -1);
LS_DCHECK(Valid());
mSlotId = mGuardedLeaf->InsertDoNotCopyPayload(key, val.size(), mSlotId);
std::memcpy(mGuardedLeaf->ValData(mSlotId), val.data(), val.size());
}

void SplitForKey(Slice key) {
auto sysTxId = mBTree.mStore->AllocSysTxTs();
while (true) {
JUMPMU_TRY() {
if (mSlotId == -1 || !KeyInCurrentNode(key)) {
if (!Valid() || !KeyInCurrentNode(key)) {
mBTree.FindLeafCanJump(key, mGuardedLeaf);
}
BufferFrame* bf = mGuardedLeaf.mBf;
mGuardedLeaf.unlock();
mSlotId = -1;
SetToInvalid();

mBTree.TrySplitMayJump(*bf);
mBTree.TrySplitMayJump(sysTxId, *bf);
COUNTERS_BLOCK() {
WorkerCounters::MyCounters().dt_split[mBTree.mTreeId]++;
}
Expand Down Expand Up @@ -162,7 +163,8 @@

mSlotId = -1;
JUMPMU_TRY() {
mBTree.TrySplitMayJump(*mGuardedLeaf.mBf, splitSlot);
TXID sysTxId = mBTree.mStore->AllocSysTxTs();

Check warning on line 166 in include/leanstore/btree/core/PessimisticExclusiveIterator.hpp

View check run for this annotation

Codecov / codecov/patch

include/leanstore/btree/core/PessimisticExclusiveIterator.hpp#L166

Added line #L166 was not covered by tests
mBTree.TrySplitMayJump(sysTxId, *mGuardedLeaf.mBf, splitSlot);

LS_DLOG("[Contention Split] succeed, pageId={}, contention pct={}, split "
"slot={}",
Expand Down Expand Up @@ -201,7 +203,8 @@
mGuardedLeaf.unlock();
mSlotId = -1;
JUMPMU_TRY() {
mBTree.TryMergeMayJump(*mGuardedLeaf.mBf);
TXID sysTxId = mBTree.mStore->AllocSysTxTs();
mBTree.TryMergeMayJump(sysTxId, *mGuardedLeaf.mBf);
}
JUMPMU_CATCH() {
LS_DLOG("TryMergeIfNeeded failed, pageId={}", mGuardedLeaf.mBf->mHeader.mPageId);
Expand Down
5 changes: 4 additions & 1 deletion include/leanstore/btree/core/PessimisticIterator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

#include <functional>

#include <sys/syscall.h>

namespace leanstore::storage::btree {

using LeafCallback = std::function<void(GuardedBufferFrame<BTreeNode>& guardedLeaf)>;
Expand Down Expand Up @@ -401,7 +403,8 @@
if (mGuardedLeaf->mNumSlots == 0) {
SetCleanUpCallback([&, toMerge = mGuardedLeaf.mBf]() {
JUMPMU_TRY() {
mBTree.TryMergeMayJump(*toMerge, true);
TXID sysTxId = mBTree.mStore->AllocSysTxTs();

Check warning on line 406 in include/leanstore/btree/core/PessimisticIterator.hpp

View check run for this annotation

Codecov / codecov/patch

include/leanstore/btree/core/PessimisticIterator.hpp#L406

Added line #L406 was not covered by tests
mBTree.TryMergeMayJump(sysTxId, *toMerge, true);
}
JUMPMU_CATCH() {
}
Expand Down
4 changes: 2 additions & 2 deletions include/leanstore/buffer-manager/AsyncWriteBuffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace leanstore::storage {
//! writeBuffer.SubmitAll();
//! writeBuffer.WaitAll();
//! writeBuffer.IterateFlushedBfs([](BufferFrame& flushedBf, uint64_t
//! flushedGsn) {
//! flushedPsn) {
//! // do something with flushedBf
//! }, numFlushedBfs);
///
Expand Down Expand Up @@ -72,7 +72,7 @@ class AsyncWriteBuffer {
return mAIo.GetNumRequests();
}

void IterateFlushedBfs(std::function<void(BufferFrame& flushedBf, uint64_t flushedGsn)> callback,
void IterateFlushedBfs(std::function<void(BufferFrame& flushedBf, uint64_t flushedPsn)> callback,
uint64_t numFlushedBfs);

private:
Expand Down
29 changes: 19 additions & 10 deletions include/leanstore/buffer-manager/BufferFrame.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "leanstore/utils/UserThread.hpp"

#include <atomic>
#include <cstdint>
#include <limits>

namespace leanstore::storage {
Expand Down Expand Up @@ -66,14 +67,14 @@ class BufferFrameHeader {
//! ID of page resides in this buffer frame.
PID mPageId = std::numeric_limits<PID>::max();

//! ID of the last worker who has modified the containing page. For remote
//! flush avoidance (RFA), see "Rethinking Logging, Checkpoints, and Recovery
//! for High-Performance Storage Engines, SIGMOD 2020" for details.
//! ID of the last worker who has modified the containing page. For remote flush avoidance (RFA),
//! see "Rethinking Logging, Checkpoints, and Recovery for High-Performance Storage Engines,
//! SIGMOD 2020" for details.
WORKERID mLastWriterWorker = std::numeric_limits<uint8_t>::max();

//! The flushed global sequence number of the containing page. Initialized to
//! the page GSN when loaded from disk.
uint64_t mFlushedGsn = 0;
//! The flushed page sequence number of the containing page. Initialized when the containing page
//! is loaded from disk.
uint64_t mFlushedPsn = 0;

//! Whether the containing page is being written back to disk.
std::atomic<bool> mIsBeingWrittenBack = false;
Expand All @@ -97,7 +98,7 @@ class BufferFrameHeader {

mPageId = std::numeric_limits<PID>::max();
mLastWriterWorker = std::numeric_limits<uint8_t>::max();
mFlushedGsn = 0;
mFlushedPsn = 0;
mIsBeingWrittenBack.store(false, std::memory_order_release);
mContentionStats.Reset();
mCrc = 0;
Expand Down Expand Up @@ -130,9 +131,15 @@ class Page {
//! Short for "global sequence number", increased when a page is modified.
//! It's used to check whether the page has been read or written by
//! transactions in other workers.
//! A page is "dirty" when mPage.mGSN > mHeader.mFlushedGsn.
uint64_t mGSN = 0;

//! Short for "system transaction id", increased when a system transaction modifies the page.
uint64_t mSysTxId = 0;

//! Short for "page sequence number", increased when a page is modified by any user or system
//! transaction. A page is "dirty" when mPage.mPsn > mHeader.mFlushedPsn.
uint64_t mPsn = 0;

//! The btree ID it belongs to.
TREEID mBTreeId = std::numeric_limits<TREEID>::max();

Expand Down Expand Up @@ -176,7 +183,7 @@ class BufferFrame {
}

bool IsDirty() const {
return mPage.mGSN != mHeader.mFlushedGsn;
return mPage.mPsn != mHeader.mFlushedPsn;
}

bool IsFree() const {
Expand All @@ -192,9 +199,11 @@ class BufferFrame {
LS_DCHECK(mHeader.mState == State::kFree);
mHeader.mPageId = pageId;
mHeader.mState = State::kHot;
mHeader.mFlushedGsn = 0;
mHeader.mFlushedPsn = 0;

mPage.mGSN = 0;
mPage.mSysTxId = 0;
mPage.mPsn = 0;
}

// Pre: bf is exclusively locked
Expand Down
Loading
Loading