Skip to content

Commit

Permalink
perf: support system transaction
Browse files Browse the repository at this point in the history
Signed-off-by: Jian Zhang <[email protected]>
  • Loading branch information
zz-jason committed Sep 4, 2024
1 parent b273d12 commit cbd4e95
Show file tree
Hide file tree
Showing 48 changed files with 514 additions and 429 deletions.
2 changes: 1 addition & 1 deletion benchmarks/micro-benchmarks/InsertUpdateBench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ static void BenchUpdateInsert(benchmark::State& state) {
std::filesystem::remove_all(dirPath);
std::filesystem::create_directories(dirPath);

StoreOption* option = CreateStoreOption("/tmp/InsertUpdateBench");
StoreOption* option = CreateStoreOption("/tmp/leanstore/InsertUpdateBench");
option->mCreateFromScratch = true;
option->mWorkerThreads = 4;
auto sLeanStore = std::make_unique<leanstore::LeanStore>(option);
Expand Down
1 change: 1 addition & 0 deletions benchmarks/ycsb/YcsbLeanStore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class YcsbLeanStore : public YcsbExecutor {
auto dataDirStr = FLAGS_ycsb_data_dir + std::string("/leanstore");
StoreOption* option = CreateStoreOption(dataDirStr.c_str());
option->mCreateFromScratch = createFromScratch;
option->mEnableEagerGc = true;
option->mWorkerThreads = FLAGS_ycsb_threads;
option->mBufferPoolSize = FLAGS_ycsb_mem_kb * 1024;
option->mEnableMetrics = true;
Expand Down
10 changes: 5 additions & 5 deletions benchmarks/ycsb/ycsb-config.flags
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
--ycsb_val_size=200
--ycsb_record_count=100000
--ycsb_mem_kb=1048576
--ycsb_run_for_seconds=5
--ycsb_target=basickv
--ycsb_workload=c
--ycsb_threads=2
--ycsb_cmd=load
--ycsb_run_for_seconds=600
--ycsb_target=transactionkv
--ycsb_workload=b
--ycsb_threads=8
--ycsb_cmd=run
27 changes: 19 additions & 8 deletions include/leanstore/LeanStore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,13 @@ class LeanStore {
//! NOTE: Ownerd by LeanStore instance, should be destroyed together with it
cr::CRManager* mCRManager;

//! The global timestamp oracle, used to generate start and commit timestamps
//! for all transactions in the store. Start from a positive number, 0
//! indicates invalid timestamp
std::atomic<uint64_t> mTimestampOracle = 1;
//! The global timestamp oracle for user transactions. Used to generate start and commit
//! timestamps for user transactions. Start from a positive number, 0 indicates invalid timestamp
std::atomic<uint64_t> mUsrTso = 1;

//! The global timestamp oracle for system transactions. Used to generate timestamps for system
//! transactions. Start from a positive number, 0 indicates invalid timestamp
std::atomic<uint64_t> mSysTso = 1;

//! The metrics manager
std::unique_ptr<leanstore::telemetry::MetricsManager> mMetricsManager;
Expand Down Expand Up @@ -118,13 +121,21 @@ class LeanStore {
//! Unregister a TransactionKV
void DropTransactionKV(const std::string& name);

uint64_t GetTs() {
return mTimestampOracle.load();
uint64_t GetUsrTxTs() {
return mUsrTso.load();

Check warning on line 125 in include/leanstore/LeanStore.hpp

View check run for this annotation

Codecov / codecov/patch

include/leanstore/LeanStore.hpp#L124-L125

Added lines #L124 - L125 were not covered by tests
}

uint64_t GetSysTxTs() {
return mSysTso.load();
}

//! Alloc a new timestamp from the timestamp oracle
uint64_t AllocTs() {
return mTimestampOracle.fetch_add(1);
uint64_t AllocUsrTxTs() {
return mUsrTso.fetch_add(1);
}

uint64_t AllocSysTxTs() {
return mSysTso.fetch_add(1);
}

//! Execute a custom user function on a worker thread.
Expand Down
18 changes: 10 additions & 8 deletions include/leanstore/btree/core/BTreeGeneric.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include "leanstore/sync/HybridLatch.hpp"
#include "leanstore/utils/Log.hpp"

#include <rapidjson/document.h>

#include <atomic>
#include <limits>

Expand Down Expand Up @@ -55,9 +57,9 @@ class BTreeGeneric : public leanstore::storage::BufferManagedTree {

//! Try to merge the current node with its left or right sibling, reclaim the merged left or right
//! sibling if successful.
bool TryMergeMayJump(BufferFrame& toMerge, bool swizzleSibling = true);
bool TryMergeMayJump(TXID sysTxId, BufferFrame& toMerge, bool swizzleSibling = true);

void TrySplitMayJump(BufferFrame& toSplit, int16_t pos = -1);
void TrySplitMayJump(TXID sysTxId, BufferFrame& toSplit, int16_t pos = -1);

XMergeReturnCode XMerge(GuardedBufferFrame<BTreeNode>& guardedParent,
GuardedBufferFrame<BTreeNode>& guardedChild,
Expand Down Expand Up @@ -141,7 +143,7 @@ class BTreeGeneric : public leanstore::storage::BufferManagedTree {
//! | |
//! newLeft toSplit
///
void splitRootMayJump(GuardedBufferFrame<BTreeNode>& guardedParent,
void splitRootMayJump(TXID sysTxId, GuardedBufferFrame<BTreeNode>& guardedParent,
GuardedBufferFrame<BTreeNode>& guardedChild,
const BTreeNode::SeparatorInfo& sepInfo);

Expand All @@ -152,7 +154,7 @@ class BTreeGeneric : public leanstore::storage::BufferManagedTree {
//! | | |
//! toSplit newLeft toSplit
///
void splitNonRootMayJump(GuardedBufferFrame<BTreeNode>& guardedParent,
void splitNonRootMayJump(TXID sysTxId, GuardedBufferFrame<BTreeNode>& guardedParent,
GuardedBufferFrame<BTreeNode>& guardedChild,
const BTreeNode::SeparatorInfo& sepInfo,
uint16_t spaceNeededForSeparator);
Expand Down Expand Up @@ -198,15 +200,15 @@ class BTreeGeneric : public leanstore::storage::BufferManagedTree {
xGuardedMeta.Reclaim();
}

// static void ToJson(BTreeGeneric& btree, rapidjson::Document* resultDoc);
static void ToJson(BTreeGeneric& btree, rapidjson::Document* resultDoc);

private:
static void freeBTreeNodesRecursive(BTreeGeneric& btree,
GuardedBufferFrame<BTreeNode>& guardedNode);

// static void toJsonRecursive(BTreeGeneric& btree, GuardedBufferFrame<BTreeNode>& guardedNode,
// rapidjson::Value* resultObj,
// rapidjson::Value::AllocatorType& allocator);
static void toJsonRecursive(BTreeGeneric& btree, GuardedBufferFrame<BTreeNode>& guardedNode,
rapidjson::Value* resultObj,
rapidjson::Value::AllocatorType& allocator);

static ParentSwipHandler findParentMayJump(BTreeGeneric& btree, BufferFrame& bfToFind) {
return FindParent<true>(btree, bfToFind);
Expand Down
15 changes: 9 additions & 6 deletions include/leanstore/btree/core/PessimisticExclusiveIterator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,23 @@ class PessimisticExclusiveIterator : public PessimisticIterator {
virtual void InsertToCurrentNode(Slice key, Slice val) {
LS_DCHECK(KeyInCurrentNode(key));
LS_DCHECK(HasEnoughSpaceFor(key.size(), val.size()));
LS_DCHECK(mSlotId != -1);
LS_DCHECK(Valid());
mSlotId = mGuardedLeaf->InsertDoNotCopyPayload(key, val.size(), mSlotId);
std::memcpy(mGuardedLeaf->ValData(mSlotId), val.data(), val.size());
}

void SplitForKey(Slice key) {
auto sysTxId = mBTree.mStore->AllocSysTxTs();
while (true) {
JUMPMU_TRY() {
if (mSlotId == -1 || !KeyInCurrentNode(key)) {
if (!Valid() || !KeyInCurrentNode(key)) {
mBTree.FindLeafCanJump(key, mGuardedLeaf);
}
BufferFrame* bf = mGuardedLeaf.mBf;
mGuardedLeaf.unlock();
mSlotId = -1;
SetToInvalid();

mBTree.TrySplitMayJump(*bf);
mBTree.TrySplitMayJump(sysTxId, *bf);
COUNTERS_BLOCK() {
WorkerCounters::MyCounters().dt_split[mBTree.mTreeId]++;
}
Expand Down Expand Up @@ -162,7 +163,8 @@ class PessimisticExclusiveIterator : public PessimisticIterator {

mSlotId = -1;
JUMPMU_TRY() {
mBTree.TrySplitMayJump(*mGuardedLeaf.mBf, splitSlot);
TXID sysTxId = mBTree.mStore->AllocSysTxTs();

Check warning on line 166 in include/leanstore/btree/core/PessimisticExclusiveIterator.hpp

View check run for this annotation

Codecov / codecov/patch

include/leanstore/btree/core/PessimisticExclusiveIterator.hpp#L166

Added line #L166 was not covered by tests
mBTree.TrySplitMayJump(sysTxId, *mGuardedLeaf.mBf, splitSlot);

LS_DLOG("[Contention Split] succeed, pageId={}, contention pct={}, split "
"slot={}",
Expand Down Expand Up @@ -201,7 +203,8 @@ class PessimisticExclusiveIterator : public PessimisticIterator {
mGuardedLeaf.unlock();
mSlotId = -1;
JUMPMU_TRY() {
mBTree.TryMergeMayJump(*mGuardedLeaf.mBf);
TXID sysTxId = mBTree.mStore->AllocSysTxTs();
mBTree.TryMergeMayJump(sysTxId, *mGuardedLeaf.mBf);
}
JUMPMU_CATCH() {
LS_DLOG("TryMergeIfNeeded failed, pageId={}", mGuardedLeaf.mBf->mHeader.mPageId);
Expand Down
5 changes: 4 additions & 1 deletion include/leanstore/btree/core/PessimisticIterator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

#include <functional>

#include <sys/syscall.h>

namespace leanstore::storage::btree {

using LeafCallback = std::function<void(GuardedBufferFrame<BTreeNode>& guardedLeaf)>;
Expand Down Expand Up @@ -401,7 +403,8 @@ inline void PessimisticIterator::Next() {
if (mGuardedLeaf->mNumSlots == 0) {
SetCleanUpCallback([&, toMerge = mGuardedLeaf.mBf]() {
JUMPMU_TRY() {
mBTree.TryMergeMayJump(*toMerge, true);
TXID sysTxId = mBTree.mStore->AllocSysTxTs();

Check warning on line 406 in include/leanstore/btree/core/PessimisticIterator.hpp

View check run for this annotation

Codecov / codecov/patch

include/leanstore/btree/core/PessimisticIterator.hpp#L406

Added line #L406 was not covered by tests
mBTree.TryMergeMayJump(sysTxId, *toMerge, true);
}
JUMPMU_CATCH() {
}
Expand Down
4 changes: 2 additions & 2 deletions include/leanstore/buffer-manager/AsyncWriteBuffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace leanstore::storage {
//! writeBuffer.SubmitAll();
//! writeBuffer.WaitAll();
//! writeBuffer.IterateFlushedBfs([](BufferFrame& flushedBf, uint64_t
//! flushedGsn) {
//! flushedPsn) {
//! // do something with flushedBf
//! }, numFlushedBfs);
///
Expand Down Expand Up @@ -72,7 +72,7 @@ class AsyncWriteBuffer {
return mAIo.GetNumRequests();
}

void IterateFlushedBfs(std::function<void(BufferFrame& flushedBf, uint64_t flushedGsn)> callback,
void IterateFlushedBfs(std::function<void(BufferFrame& flushedBf, uint64_t flushedPsn)> callback,
uint64_t numFlushedBfs);

private:
Expand Down
29 changes: 19 additions & 10 deletions include/leanstore/buffer-manager/BufferFrame.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "leanstore/utils/UserThread.hpp"

#include <atomic>
#include <cstdint>
#include <limits>

namespace leanstore::storage {
Expand Down Expand Up @@ -66,14 +67,14 @@ class BufferFrameHeader {
//! ID of page resides in this buffer frame.
PID mPageId = std::numeric_limits<PID>::max();

//! ID of the last worker who has modified the containing page. For remote
//! flush avoidance (RFA), see "Rethinking Logging, Checkpoints, and Recovery
//! for High-Performance Storage Engines, SIGMOD 2020" for details.
//! ID of the last worker who has modified the containing page. For remote flush avoidance (RFA),
//! see "Rethinking Logging, Checkpoints, and Recovery for High-Performance Storage Engines,
//! SIGMOD 2020" for details.
WORKERID mLastWriterWorker = std::numeric_limits<uint8_t>::max();

//! The flushed global sequence number of the containing page. Initialized to
//! the page GSN when loaded from disk.
uint64_t mFlushedGsn = 0;
//! The flushed page sequence number of the containing page. Initialized when the containing page
//! is loaded from disk.
uint64_t mFlushedPsn = 0;

//! Whether the containing page is being written back to disk.
std::atomic<bool> mIsBeingWrittenBack = false;
Expand All @@ -97,7 +98,7 @@ class BufferFrameHeader {

mPageId = std::numeric_limits<PID>::max();
mLastWriterWorker = std::numeric_limits<uint8_t>::max();
mFlushedGsn = 0;
mFlushedPsn = 0;
mIsBeingWrittenBack.store(false, std::memory_order_release);
mContentionStats.Reset();
mCrc = 0;
Expand Down Expand Up @@ -130,9 +131,15 @@ class Page {
//! Short for "global sequence number", increased when a page is modified.
//! It's used to check whether the page has been read or written by
//! transactions in other workers.
//! A page is "dirty" when mPage.mGSN > mHeader.mFlushedGsn.
uint64_t mGSN = 0;

//! Short for "system transaction id", increased when a system transaction modifies the page.
uint64_t mSysTxId = 0;

//! Short for "page sequence number", increased when a page is modified by any user or system
//! transaction. A page is "dirty" when mPage.mPsn > mHeader.mFlushedPsn.
uint64_t mPsn = 0;

//! The btree ID it belongs to.
TREEID mBTreeId = std::numeric_limits<TREEID>::max();

Expand Down Expand Up @@ -176,7 +183,7 @@ class BufferFrame {
}

bool IsDirty() const {
return mPage.mGSN != mHeader.mFlushedGsn;
return mPage.mPsn != mHeader.mFlushedPsn;
}

bool IsFree() const {
Expand All @@ -192,9 +199,11 @@ class BufferFrame {
LS_DCHECK(mHeader.mState == State::kFree);
mHeader.mPageId = pageId;
mHeader.mState = State::kHot;
mHeader.mFlushedGsn = 0;
mHeader.mFlushedPsn = 0;

mPage.mGSN = 0;
mPage.mSysTxId = 0;
mPage.mPsn = 0;
}

// Pre: bf is exclusively locked
Expand Down
Loading

0 comments on commit cbd4e95

Please sign in to comment.