Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: improve ycsb performance #128

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions include/leanstore/concurrency/ConcurrencyControl.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "leanstore/LeanStore.hpp"
#include "leanstore/Slice.hpp"
#include "leanstore/Units.hpp"
#include "leanstore/concurrency/HistoryStorage.hpp"
#include "leanstore/profiling/counters/CRCounters.hpp"
Expand Down Expand Up @@ -193,11 +194,12 @@ class ConcurrencyControl {
//! @param getCallback: the callback function to be called when the version is found.
//! @return: true if the version is found, false otherwise.
inline bool GetVersion(WORKERID newerWorkerId, TXID newerTxId, COMMANDID newerCommandId,
std::function<void(const uint8_t*, uint64_t versionSize)> getCallback) {
std::function<void(Slice)> versionCallback) {
utils::Timer timer(CRCounters::MyCounters().cc_ms_history_tree_retrieve);
auto isRemoveCommand = newerCommandId & kRemoveCommandMark;
return Other(newerWorkerId)
.mHistoryStorage.GetVersion(newerTxId, newerCommandId, isRemoveCommand, getCallback);
.mHistoryStorage.GetVersion(newerTxId, newerCommandId, isRemoveCommand,
std::move(versionCallback));
}

//! Put a version to the version storage. The callback function is called with the version data
Expand Down
3 changes: 2 additions & 1 deletion include/leanstore/concurrency/HistoryStorage.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "leanstore/Slice.hpp"
#include "leanstore/Units.hpp"

#include <cstdint>
Expand Down Expand Up @@ -88,7 +89,7 @@ class HistoryStorage {
uint64_t payloadLength, std::function<void(uint8_t*)> cb, bool sameThread = true);

bool GetVersion(TXID newerTxId, COMMANDID newerCommandId, const bool isRemoveCommand,
std::function<void(const uint8_t*, uint64_t)> cb);
std::function<void(Slice)> cb);

void PurgeVersions(TXID fromTxId, TXID toTxId, RemoveVersionCallback cb, const uint64_t limit);

Expand Down
19 changes: 5 additions & 14 deletions include/leanstore/concurrency/Logging.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,11 @@ class Logging {
//! processing.
WalEntryComplex* mActiveWALEntryComplex;

//! Protects mTxToCommit
std::mutex mTxToCommitMutex;
//! The pending-to-commit transactions which have remote dependencies.
std::atomic<Transaction*> mActiveTxToCommit;

//! The queue for each worker thread to store pending-to-commit transactions which have remote
//! dependencies.
std::vector<Transaction> mTxToCommit;

//! Protects mTxToCommit
std::mutex mRfaTxToCommitMutex;

//! The queue for each worker thread to store pending-to-commit transactions which doesn't have
//! any remote dependencies.
std::vector<Transaction> mRfaTxToCommit;
//! The pending-to-commit transactions which doesn't have any remote dependencies.
std::atomic<Transaction*> mActiveRfaTxToCommit;

//! Represents the maximum commit timestamp in the worker. Transactions in the worker are
//! committed if their commit timestamps are smaller than it.
Expand Down Expand Up @@ -137,8 +129,7 @@ class Logging {

void publishWalFlushReq();

//! Calculate the continuous free space left in the wal ring buffer. Return
//! size of the contiguous free space.
//! Continuous free space left in the wal ring buffer.
uint32_t walContiguousFreeSpace();
};

Expand Down
5 changes: 3 additions & 2 deletions src/btree/ChainedTuple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ std::tuple<OpCode, uint16_t> ChainedTuple::GetVisibleTuple(Slice payload,
uint16_t versionsRead = 1;
while (true) {
bool found = cr::WorkerContext::My().mCc.GetVersion(
newerWorkerId, newerTxId, newerCommandId,
[&](const uint8_t* versionBuf, uint64_t versionSize) {
newerWorkerId, newerTxId, newerCommandId, [&](Slice versionSlice) {
auto* versionBuf = versionSlice.data();
auto versionSize = versionSlice.size();
auto& version = *reinterpret_cast<const Version*>(versionBuf);
switch (version.mType) {
case VersionType::kUpdate: {
Expand Down
4 changes: 2 additions & 2 deletions src/btree/TransactionKV.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,12 @@ std::tuple<OpCode, uint16_t> TransactionKV::getVisibleTuple(Slice payload, ValCa
switch (tuple->mFormat) {
case TupleFormat::kChained: {
const auto* const chainedTuple = ChainedTuple::From(payload.data());
ret = chainedTuple->GetVisibleTuple(payload, callback);
ret = chainedTuple->GetVisibleTuple(payload, std::move(callback));
JUMPMU_RETURN ret;
}
case TupleFormat::kFat: {
const auto* const fatTuple = FatTuple::From(payload.data());
ret = fatTuple->GetVisibleTuple(callback);
ret = fatTuple->GetVisibleTuple(std::move(callback));
JUMPMU_RETURN ret;
}
default: {
Expand Down
4 changes: 2 additions & 2 deletions src/btree/Tuple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ bool Tuple::ToFat(PessimisticExclusiveIterator& xIter) {
}

if (!cr::WorkerContext::My().mCc.GetVersion(
newerWorkerId, newerTxId, newerCommandId, [&](const uint8_t* version, uint64_t) {
newerWorkerId, newerTxId, newerCommandId, [&](Slice version) {
numDeltasToReplace++;
const auto& chainedDelta = *UpdateVersion::From(version);
const auto& chainedDelta = *UpdateVersion::From(version.data());
LS_DCHECK(chainedDelta.mType == VersionType::kUpdate);
LS_DCHECK(chainedDelta.mIsDelta);

Expand Down
51 changes: 16 additions & 35 deletions src/concurrency/GroupCommitter.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "leanstore/concurrency/GroupCommitter.hpp"

#include "leanstore/concurrency/CRManager.hpp"
#include "leanstore/concurrency/Transaction.hpp"
#include "leanstore/concurrency/WorkerContext.hpp"
#include "leanstore/profiling/counters/CPUCounters.hpp"
#include "leanstore/telemetry/MetricOnlyTimer.hpp"
Expand Down Expand Up @@ -41,7 +42,7 @@ void GroupCommitter::runImpl() {
}

void GroupCommitter::collectWalRecords(TXID& minFlushedSysTx, TXID& minFlushedUsrTx,
std::vector<uint64_t>& numRfaTxs,
std::vector<uint64_t>& numRfaTxs [[maybe_unused]],
std::vector<WalFlushReq>& walFlushReqCopies) {
leanstore::telemetry::MetricOnlyTimer timer;
SCOPED_DEFER({
Expand All @@ -54,11 +55,6 @@ void GroupCommitter::collectWalRecords(TXID& minFlushedSysTx, TXID& minFlushedUs
for (auto workerId = 0u; workerId < mWorkerCtxs.size(); workerId++) {
auto& logging = mWorkerCtxs[workerId]->mLogging;

// collect logging info
std::unique_lock<std::mutex> guard(logging.mRfaTxToCommitMutex);
numRfaTxs[workerId] = logging.mRfaTxToCommit.size();
guard.unlock();

auto lastReqVersion = walFlushReqCopies[workerId].mVersion;
auto version = logging.mWalFlushReq.Get(walFlushReqCopies[workerId]);
walFlushReqCopies[workerId].mVersion = version;
Expand Down Expand Up @@ -120,7 +116,7 @@ void GroupCommitter::flushWalRecords() {
}

void GroupCommitter::determineCommitableTx(TXID minFlushedSysTx, TXID minFlushedUsrTx,
const std::vector<uint64_t>& numRfaTxs,
const std::vector<uint64_t>& numRfaTxs [[maybe_unused]],
const std::vector<WalFlushReq>& walFlushReqCopies) {
leanstore::telemetry::MetricOnlyTimer timer;
SCOPED_DEFER({
Expand All @@ -137,41 +133,26 @@ void GroupCommitter::determineCommitableTx(TXID minFlushedSysTx, TXID minFlushed
// commit transactions with remote dependency
TXID maxCommitTs = 0;
{
std::unique_lock<std::mutex> g(logging.mTxToCommitMutex);
uint64_t i = 0;
for (; i < logging.mTxToCommit.size(); ++i) {
auto& tx = logging.mTxToCommit[i];
if (!tx.CanCommit(minFlushedSysTx, minFlushedUsrTx)) {
break;
}
maxCommitTs = std::max<TXID>(maxCommitTs, tx.mCommitTs);
tx.mState = TxState::kCommitted;
if (auto* tx = logging.mActiveTxToCommit.load(std::memory_order_relaxed);
tx != nullptr && tx->CanCommit(minFlushedSysTx, minFlushedUsrTx)) {
maxCommitTs = tx->mCommitTs;
tx->mState = TxState::kCommitted;
logging.mActiveTxToCommit.store(nullptr);
LS_DLOG("Transaction with remote dependency committed, workerId={}, startTs={}, "
"commitTs={}, minFlushedSysTx={}, minFlushedUsrTx={}",
workerId, tx.mStartTs, tx.mCommitTs, minFlushedSysTx, minFlushedUsrTx);
}
if (i > 0) {
logging.mTxToCommit.erase(logging.mTxToCommit.begin(), logging.mTxToCommit.begin() + i);
workerId, tx->mStartTs, tx->mCommitTs, minFlushedSysTx, minFlushedUsrTx);
}
}

// commit transactions without remote dependency
TXID maxCommitTsRfa = 0;
{
std::unique_lock<std::mutex> g(logging.mRfaTxToCommitMutex);
uint64_t i = 0;
for (; i < numRfaTxs[workerId]; ++i) {
auto& tx = logging.mRfaTxToCommit[i];
maxCommitTsRfa = std::max<TXID>(maxCommitTsRfa, tx.mCommitTs);
tx.mState = TxState::kCommitted;
LS_DLOG("Transaction without remote dependency committed, workerId={}, startTs={}, "
"commitTs={}",
workerId, tx.mStartTs, tx.mCommitTs);
}
if (i > 0) {
logging.mRfaTxToCommit.erase(logging.mRfaTxToCommit.begin(),
logging.mRfaTxToCommit.begin() + i);
}
if (auto* tx = logging.mActiveRfaTxToCommit.load(std::memory_order_relaxed); tx != nullptr) {
maxCommitTsRfa = tx->mCommitTs;
tx->mState = TxState::kCommitted;
logging.mActiveRfaTxToCommit.store(nullptr);
LS_DLOG("Transaction without remote dependency committed, workerId={}, "
"startTs={}, commitTs={}",
workerId, tx->mStartTs, tx->mCommitTs);
}

// Has committed transaction
Expand Down
5 changes: 2 additions & 3 deletions src/concurrency/HistoryStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ void HistoryStorage::PutVersion(TXID txId, COMMANDID commandId, TREEID treeId, b
}

bool HistoryStorage::GetVersion(TXID newerTxId, COMMANDID newerCommandId,
const bool isRemoveCommand,
std::function<void(const uint8_t*, uint64_t)> cb) {
const bool isRemoveCommand, std::function<void(Slice)> cb) {
volatile BasicKV* btree = (isRemoveCommand) ? mRemoveIndex : mUpdateIndex;
const uint64_t keySize = sizeof(newerTxId) + sizeof(newerCommandId);
uint8_t keyBuffer[keySize];
Expand All @@ -114,7 +113,7 @@ bool HistoryStorage::GetVersion(TXID newerTxId, COMMANDID newerCommandId,
BasicKV* kv = const_cast<BasicKV*>(btree);
auto ret = kv->Lookup(key, [&](const Slice& payload) {
const auto& versionContainer = *VersionMeta::From(payload.data());
cb(versionContainer.mPayload, payload.length() - sizeof(VersionMeta));
cb({versionContainer.mPayload, payload.length() - sizeof(VersionMeta)});
});

if (ret == OpCode::kNotFound) {
Expand Down
6 changes: 2 additions & 4 deletions src/concurrency/WorkerContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,9 @@ void WorkerContext::CommitTx() {

// for group commit
if (mActiveTx.mHasRemoteDependency) {
std::unique_lock<std::mutex> g(mLogging.mTxToCommitMutex);
mLogging.mTxToCommit.push_back(mActiveTx);
mLogging.mActiveTxToCommit.store(&mActiveTx);
} else {
std::unique_lock<std::mutex> g(mLogging.mRfaTxToCommitMutex);
mLogging.mRfaTxToCommit.push_back(mActiveTx);
mLogging.mActiveRfaTxToCommit.store(&mActiveTx);
}

// Cleanup versions in history tree
Expand Down
Loading