Skip to content

Commit 40bc738

Browse files
committed
Binary KVReplayGenerator
------------------------ - binary request generation and replay - fast forwarding of a trace - preloading requests into memory
1 parent ff44c3c commit 40bc738

File tree

11 files changed

+359
-72
lines changed

11 files changed

+359
-72
lines changed

cachelib/cachebench/cache/Cache.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ class Cache {
312312
// return true if the key was previously detected to be inconsistent. This
313313
// is useful only when consistency checking is enabled by calling
314314
// enableConsistencyCheck()
315-
bool isInvalidKey(const std::string& key) {
315+
bool isInvalidKey(const std::string_view key) {
316316
return invalidKeys_[key].load(std::memory_order_relaxed);
317317
}
318318

@@ -430,7 +430,7 @@ class Cache {
430430
// Since this can be accessed from multiple threads, the map is initialized
431431
// during start up and only the value is updated by flipping the bit
432432
// atomically.
433-
std::unordered_map<std::string, std::atomic<bool>> invalidKeys_;
433+
std::unordered_map<std::string_view, std::atomic<bool>> invalidKeys_;
434434

435435
// number of inconsistency detected so far with the operations
436436
std::atomic<unsigned int> inconsistencyCount_{0};

cachelib/cachebench/runner/AsyncCacheStressor.h

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -222,9 +222,9 @@ class AsyncCacheStressor : public Stressor {
222222
ThroughputStats& stats,
223223
const Request* req,
224224
folly::EventBase* evb,
225-
const std::string* key) {
225+
const std::string_view& key) {
226226
++stats.get;
227-
auto lock = chainedItemAcquireSharedLock(*key);
227+
auto lock = chainedItemAcquireSharedLock(key);
228228

229229
if (ticker_) {
230230
ticker_->updateTimeStamp(req->timestamp);
@@ -233,8 +233,7 @@ class AsyncCacheStressor : public Stressor {
233233
// add a distribution over sequences of requests/access patterns
234234
// e.g. get-no-set and set-no-get
235235

236-
auto onReadyFn = [&, req, key = *key,
237-
l = std::move(lock)](auto hdl) mutable {
236+
auto onReadyFn = [&, req, key, l = std::move(lock)](auto hdl) mutable {
238237
auto result = OpResultType::kGetMiss;
239238

240239
if (hdl == nullptr) {
@@ -247,7 +246,7 @@ class AsyncCacheStressor : public Stressor {
247246
// appropriate here)
248247
l.unlock();
249248
auto xlock = chainedItemAcquireUniqueLock(key);
250-
setKey(pid, stats, &key, *(req->sizeBegin), req->ttlSecs,
249+
setKey(pid, stats, key, *(req->sizeBegin), req->ttlSecs,
251250
req->admFeatureMap);
252251
}
253252
} else {
@@ -260,8 +259,8 @@ class AsyncCacheStressor : public Stressor {
260259
}
261260
};
262261

263-
cache_->recordAccess(*key);
264-
auto sf = cache_->asyncFind(*key);
262+
cache_->recordAccess(key);
263+
auto sf = cache_->asyncFind(key);
265264
if (sf.isReady()) {
266265
// If the handle is ready, call onReadyFn directly to process the handle
267266
onReadyFn(std::move(sf).value());
@@ -283,9 +282,9 @@ class AsyncCacheStressor : public Stressor {
283282
ThroughputStats& stats,
284283
const Request* req,
285284
folly::EventBase* evb,
286-
const std::string* key) {
285+
const std::string_view& key) {
287286
++stats.get;
288-
auto lock = chainedItemAcquireUniqueLock(*key);
287+
auto lock = chainedItemAcquireUniqueLock(key);
289288

290289
// This was moved outside the lambda, as otherwise gcc-8.x crashes with an
291290
// internal compiler error here (suspected regression in folly).
@@ -297,7 +296,7 @@ class AsyncCacheStressor : public Stressor {
297296
++stats.getMiss;
298297

299298
++stats.set;
300-
wHdl = cache_->allocate(pid, *key, *(req->sizeBegin), req->ttlSecs);
299+
wHdl = cache_->allocate(pid, key, *(req->sizeBegin), req->ttlSecs);
301300
if (!wHdl) {
302301
++stats.setFailure;
303302
return;
@@ -327,7 +326,7 @@ class AsyncCacheStressor : public Stressor {
327326
};
328327

329328
// Always use asyncFind as findToWrite is sync when using HybridCache
330-
auto sf = cache_->asyncFind(*key);
329+
auto sf = cache_->asyncFind(key);
331330
if (sf.isReady()) {
332331
onReadyFn(std::move(sf).value());
333332
return;
@@ -345,10 +344,10 @@ class AsyncCacheStressor : public Stressor {
345344
void asyncUpdate(ThroughputStats& stats,
346345
const Request* req,
347346
folly::EventBase* evb,
348-
const std::string* key) {
347+
const std::string_view& key) {
349348
++stats.get;
350349
++stats.update;
351-
auto lock = chainedItemAcquireUniqueLock(*key);
350+
auto lock = chainedItemAcquireUniqueLock(key);
352351
if (ticker_) {
353352
ticker_->updateTimeStamp(req->timestamp);
354353
}
@@ -363,7 +362,7 @@ class AsyncCacheStressor : public Stressor {
363362
cache_->updateItemRecordVersion(wHdl);
364363
};
365364

366-
auto sf = cache_->asyncFind(*key);
365+
auto sf = cache_->asyncFind(key);
367366
if (sf.isReady()) {
368367
onReadyFn(std::move(sf).value());
369368
return;
@@ -457,18 +456,18 @@ class AsyncCacheStressor : public Stressor {
457456
const auto pid = static_cast<PoolId>(opPoolDist(gen));
458457
const Request& req(getReq(pid, gen, lastRequestId));
459458
OpType op = req.getOp();
460-
const std::string* key = &(req.key);
461-
std::string oneHitKey;
459+
std::string_view key = req.key;
460+
std::string_view oneHitKey;
462461
if (op == OpType::kLoneGet || op == OpType::kLoneSet) {
463462
oneHitKey = Request::getUniqueKey();
464-
key = &oneHitKey;
463+
key = oneHitKey;
465464
}
466465

467466
OpResultType result(OpResultType::kNop);
468467
switch (op) {
469468
case OpType::kLoneSet:
470469
case OpType::kSet: {
471-
auto lock = chainedItemAcquireUniqueLock(*key);
470+
auto lock = chainedItemAcquireUniqueLock(key);
472471
result = setKey(pid, stats, key, *(req.sizeBegin), req.ttlSecs,
473472
req.admFeatureMap);
474473

@@ -481,8 +480,8 @@ class AsyncCacheStressor : public Stressor {
481480
}
482481
case OpType::kDel: {
483482
++stats.del;
484-
auto lock = chainedItemAcquireUniqueLock(*key);
485-
auto res = cache_->remove(*key);
483+
auto lock = chainedItemAcquireUniqueLock(key);
484+
auto res = cache_->remove(key);
486485
if (res == CacheT::RemoveRes::kNotFoundInRam) {
487486
++stats.delNotFound;
488487
}
@@ -532,7 +531,7 @@ class AsyncCacheStressor : public Stressor {
532531
OpResultType setKey(
533532
PoolId pid,
534533
ThroughputStats& stats,
535-
const std::string* key,
534+
const std::string_view& key,
536535
size_t size,
537536
uint32_t ttlSecs,
538537
const std::unordered_map<std::string, std::string>& featureMap) {
@@ -543,7 +542,7 @@ class AsyncCacheStressor : public Stressor {
543542
}
544543

545544
++stats.set;
546-
auto it = cache_->allocate(pid, *key, size, ttlSecs);
545+
auto it = cache_->allocate(pid, key, size, ttlSecs);
547546
if (it == nullptr) {
548547
++stats.setFailure;
549548
return OpResultType::kSetFailure;

cachelib/cachebench/runner/CacheStressor.h

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -325,24 +325,24 @@ class CacheStressor : public Stressor {
325325
const auto pid = static_cast<PoolId>(opPoolDist(gen));
326326
const Request& req(getReq(pid, gen, lastRequestId));
327327
OpType op = req.getOp();
328-
const std::string* key = &(req.key);
329-
std::string oneHitKey;
328+
std::string_view key = req.key;
329+
std::string_view oneHitKey;
330330
if (op == OpType::kLoneGet || op == OpType::kLoneSet) {
331331
oneHitKey = Request::getUniqueKey();
332-
key = &oneHitKey;
332+
key = oneHitKey;
333333
}
334334

335335
OpResultType result(OpResultType::kNop);
336336
switch (op) {
337337
case OpType::kLoneSet:
338338
case OpType::kSet: {
339339
if (config_.onlySetIfMiss) {
340-
auto it = cache_->find(*key);
340+
auto it = cache_->find(key);
341341
if (it != nullptr) {
342342
continue;
343343
}
344344
}
345-
auto lock = chainedItemAcquireUniqueLock(*key);
345+
auto lock = chainedItemAcquireUniqueLock(key);
346346
result = setKey(pid, stats, key, *(req.sizeBegin), req.ttlSecs,
347347
req.admFeatureMap, req.itemValue);
348348

@@ -352,17 +352,17 @@ class CacheStressor : public Stressor {
352352
case OpType::kGet: {
353353
++stats.get;
354354

355-
auto slock = chainedItemAcquireSharedLock(*key);
356-
auto xlock = decltype(chainedItemAcquireUniqueLock(*key)){};
355+
auto slock = chainedItemAcquireSharedLock(key);
356+
auto xlock = decltype(chainedItemAcquireUniqueLock(key)){};
357357

358358
if (ticker_) {
359359
ticker_->updateTimeStamp(req.timestamp);
360360
}
361361
// TODO currently pure lookaside, we should
362362
// add a distribution over sequences of requests/access patterns
363363
// e.g. get-no-set and set-no-get
364-
cache_->recordAccess(*key);
365-
auto it = cache_->find(*key);
364+
cache_->recordAccess(key);
365+
auto it = cache_->find(key);
366366
if (it == nullptr) {
367367
++stats.getMiss;
368368
result = OpResultType::kGetMiss;
@@ -372,7 +372,7 @@ class CacheStressor : public Stressor {
372372
// upgrade access privledges, (lock_upgrade is not
373373
// appropriate here)
374374
slock = {};
375-
xlock = chainedItemAcquireUniqueLock(*key);
375+
xlock = chainedItemAcquireUniqueLock(key);
376376
setKey(pid, stats, key, *(req.sizeBegin), req.ttlSecs,
377377
req.admFeatureMap, req.itemValue);
378378
}
@@ -384,22 +384,22 @@ class CacheStressor : public Stressor {
384384
}
385385
case OpType::kDel: {
386386
++stats.del;
387-
auto lock = chainedItemAcquireUniqueLock(*key);
388-
auto res = cache_->remove(*key);
387+
auto lock = chainedItemAcquireUniqueLock(key);
388+
auto res = cache_->remove(key);
389389
if (res == CacheT::RemoveRes::kNotFoundInRam) {
390390
++stats.delNotFound;
391391
}
392392
break;
393393
}
394394
case OpType::kAddChained: {
395395
++stats.get;
396-
auto lock = chainedItemAcquireUniqueLock(*key);
397-
auto it = cache_->findToWrite(*key);
396+
auto lock = chainedItemAcquireUniqueLock(key);
397+
auto it = cache_->findToWrite(key);
398398
if (!it) {
399399
++stats.getMiss;
400400

401401
++stats.set;
402-
it = cache_->allocate(pid, *key, *(req.sizeBegin), req.ttlSecs);
402+
it = cache_->allocate(pid, key, *(req.sizeBegin), req.ttlSecs);
403403
if (!it) {
404404
++stats.setFailure;
405405
break;
@@ -430,11 +430,11 @@ class CacheStressor : public Stressor {
430430
case OpType::kUpdate: {
431431
++stats.get;
432432
++stats.update;
433-
auto lock = chainedItemAcquireUniqueLock(*key);
433+
auto lock = chainedItemAcquireUniqueLock(key);
434434
if (ticker_) {
435435
ticker_->updateTimeStamp(req.timestamp);
436436
}
437-
auto it = cache_->findToWrite(*key);
437+
auto it = cache_->findToWrite(key);
438438
if (it == nullptr) {
439439
++stats.getMiss;
440440
++stats.updateMiss;
@@ -445,7 +445,7 @@ class CacheStressor : public Stressor {
445445
}
446446
case OpType::kCouldExist: {
447447
++stats.couldExistOp;
448-
if (!cache_->couldExist(*key)) {
448+
if (!cache_->couldExist(key)) {
449449
++stats.couldExistOpFalse;
450450
}
451451
break;
@@ -480,7 +480,7 @@ class CacheStressor : public Stressor {
480480
OpResultType setKey(
481481
PoolId pid,
482482
ThroughputStats& stats,
483-
const std::string* key,
483+
const std::string_view& key,
484484
size_t size,
485485
uint32_t ttlSecs,
486486
const std::unordered_map<std::string, std::string>& featureMap,
@@ -492,7 +492,7 @@ class CacheStressor : public Stressor {
492492
}
493493

494494
++stats.set;
495-
auto it = cache_->allocate(pid, *key, size, ttlSecs);
495+
auto it = cache_->allocate(pid, key, size, ttlSecs);
496496
if (it == nullptr) {
497497
++stats.setFailure;
498498
return OpResultType::kSetFailure;

cachelib/cachebench/runner/Stressor.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "cachelib/cachebench/runner/CacheStressor.h"
2222
#include "cachelib/cachebench/runner/FastShutdown.h"
2323
#include "cachelib/cachebench/runner/IntegrationStressor.h"
24+
#include "cachelib/cachebench/workload/BinaryKVReplayGenerator.h"
2425
#include "cachelib/cachebench/workload/KVReplayGenerator.h"
2526
#include "cachelib/cachebench/workload/OnlineGenerator.h"
2627
#include "cachelib/cachebench/workload/PieceWiseReplayGenerator.h"
@@ -142,6 +143,8 @@ std::unique_ptr<GeneratorBase> makeGenerator(const StressorConfig& config) {
142143
return std::make_unique<PieceWiseReplayGenerator>(config);
143144
} else if (config.generator == "replay") {
144145
return std::make_unique<KVReplayGenerator>(config);
146+
} else if (config.generator == "binary-replay") {
147+
return std::make_unique<BinaryKVReplayGenerator>(config);
145148
} else if (config.generator.empty() || config.generator == "workload") {
146149
// TODO: Remove the empty() check once we label workload-based configs
147150
// properly

cachelib/cachebench/util/Config.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ StressorConfig::StressorConfig(const folly::dynamic& configJson) {
9090
// If you added new fields to the configuration, update the JSONSetVal
9191
// to make them available for the json configs and increment the size
9292
// below
93-
checkCorrectSize<StressorConfig, 504>();
93+
checkCorrectSize<StressorConfig, 552>();
9494
}
9595

9696
bool StressorConfig::usesChainedItems() const {
@@ -197,6 +197,9 @@ DistributionConfig::DistributionConfig(const folly::dynamic& jsonConfig,
197197

198198
ReplayGeneratorConfig::ReplayGeneratorConfig(const folly::dynamic& configJson) {
199199
JSONSetVal(configJson, ampFactor);
200+
JSONSetVal(configJson, binaryFileName);
201+
JSONSetVal(configJson, fastForwardCount);
202+
JSONSetVal(configJson, preLoadReqs);
200203
JSONSetVal(configJson, replaySerializationMode);
201204
JSONSetVal(configJson, relaxedSerialIntervalMs);
202205
JSONSetVal(configJson, numAggregationFields);
@@ -215,7 +218,7 @@ ReplayGeneratorConfig::ReplayGeneratorConfig(const folly::dynamic& configJson) {
215218
"Unsupported request serialization mode: {}", replaySerializationMode));
216219
}
217220

218-
checkCorrectSize<ReplayGeneratorConfig, 128>();
221+
checkCorrectSize<ReplayGeneratorConfig, 176>();
219222
}
220223

221224
ReplayGeneratorConfig::SerializeMode

cachelib/cachebench/util/Config.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,18 @@ struct ReplayGeneratorConfig : public JSONConfig {
126126

127127
uint32_t ampFactor{1};
128128

129+
// the path of the binary file to make
130+
std::string binaryFileName{};
131+
132+
// The number of requests (not including ampFactor) to skip
133+
// in the trace. This is so that after warming up the cache
134+
// with a certain number of requests, we can easily reattach
135+
// and resume execution with different cache configurations.
136+
uint64_t fastForwardCount{0};
137+
138+
// The number of requests to pre load into the request queues
139+
uint64_t preLoadReqs{0};
140+
129141
// The time interval threshold when replaySerializationMode is relaxed.
130142
uint64_t relaxedSerialIntervalMs{500};
131143

0 commit comments

Comments
 (0)