Skip to content

Commit d3ef924

Browse files
author
Sara McAllister
committed
Kangaroo module
1 parent 7db2c64 commit d3ef924

26 files changed

+4195
-0
lines changed

cachelib/navy/CMakeLists.txt

+10
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,16 @@ add_library (cachelib_navy
3636
common/Types.cpp
3737
driver/Driver.cpp
3838
Factory.cpp
39+
kangaroo/ChainedLogIndex.cpp
40+
kangaroo/Kangaroo.cpp
41+
kangaroo/KangarooLog.cpp
42+
kangaroo/KangarooLogSegment.cpp
43+
kangaroo/KangarooBucketStorage.cpp
44+
kangaroo/LogBucket.cpp
45+
kangaroo/RripBucket.cpp
46+
kangaroo/RripBitVector.cpp
47+
kangaroo/RripBucketStorage.cpp
48+
kangaroo/KangarooSizeDistribution.cpp
3949
scheduler/ThreadPoolJobScheduler.cpp
4050
scheduler/ThreadPoolJobQueue.cpp
4151
serialization/RecordIO.cpp
+254
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
#include <mutex>
2+
#include <shared_mutex>
3+
4+
#include "cachelib/navy/kangaroo/ChainedLogIndex.h"
5+
6+
namespace facebook {
7+
namespace cachelib {
8+
namespace navy {
9+
10+
void ChainedLogIndex::allocate() {
11+
{
12+
numAllocations_++;
13+
allocations.resize(numAllocations_);
14+
allocations[numAllocations_ - 1] = new ChainedLogIndexEntry[allocationSize_];
15+
}
16+
}
17+
18+
ChainedLogIndex::ChainedLogIndex(uint64_t numHashBuckets,
19+
uint16_t allocationSize, SetNumberCallback setNumberCb)
20+
: numHashBuckets_{numHashBuckets},
21+
allocationSize_{allocationSize},
22+
numMutexes_{numHashBuckets / 10 + 1},
23+
setNumberCb_{setNumberCb} {
24+
mutexes_ = std::make_unique<folly::SharedMutex[]>(numMutexes_);
25+
index_.resize(numHashBuckets_, -1);
26+
{
27+
std::unique_lock<folly::SharedMutex> lock{allocationMutex_};
28+
allocate();
29+
}
30+
}
31+
32+
ChainedLogIndex::~ChainedLogIndex() {
33+
{
34+
std::unique_lock<folly::SharedMutex> lock{allocationMutex_};
35+
/*for (uint64_t i = 0; i < numAllocations_; i++) {
36+
delete allocations[i];
37+
}*/
38+
}
39+
}
40+
41+
ChainedLogIndexEntry* ChainedLogIndex::findEntryNoLock(uint16_t offset) {
42+
uint16_t arrayOffset = offset % allocationSize_;
43+
uint16_t vectorOffset = offset / allocationSize_;
44+
if (vectorOffset > numAllocations_) {
45+
return nullptr;
46+
}
47+
return &allocations[vectorOffset][arrayOffset];
48+
}
49+
50+
ChainedLogIndexEntry* ChainedLogIndex::findEntry(uint16_t offset) {
51+
std::shared_lock<folly::SharedMutex> lock{allocationMutex_};
52+
return findEntryNoLock(offset);
53+
}
54+
55+
ChainedLogIndexEntry* ChainedLogIndex::allocateEntry(uint16_t& offset) {
56+
std::unique_lock<folly::SharedMutex> lock{allocationMutex_};
57+
if (nextEmpty_ >= numAllocations_ * allocationSize_) {
58+
allocate();
59+
}
60+
offset = nextEmpty_;
61+
ChainedLogIndexEntry* entry = findEntryNoLock(offset);
62+
if (nextEmpty_ == maxSlotUsed_) {
63+
nextEmpty_++;
64+
maxSlotUsed_++;
65+
} else {
66+
nextEmpty_ = entry->next_;
67+
}
68+
entry->next_ = -1;
69+
return entry;
70+
}
71+
72+
uint16_t ChainedLogIndex::releaseEntry(uint16_t offset) {
73+
std::unique_lock<folly::SharedMutex> lock{allocationMutex_};
74+
ChainedLogIndexEntry* entry = findEntryNoLock(offset);
75+
uint16_t ret = entry->next_;
76+
entry->invalidate();
77+
entry->next_ = nextEmpty_;
78+
nextEmpty_ = offset;
79+
return ret;
80+
}
81+
82+
PartitionOffset ChainedLogIndex::lookup(HashedKey hk, bool hit, uint32_t* hits) {
83+
const auto lib = getLogIndexBucket(hk);
84+
uint32_t tag = createTag(hk);
85+
{
86+
std::shared_lock<folly::SharedMutex> lock{getMutex(lib)};
87+
ChainedLogIndexEntry* currentHead = findEntry(index_[lib.index()]);
88+
while (currentHead) {
89+
if (currentHead->isValid() &&
90+
currentHead->tag() == tag) {
91+
if (hit) {
92+
currentHead->incrementHits();
93+
}
94+
if (hits != nullptr) {
95+
*hits = currentHead->hits();
96+
}
97+
return currentHead->offset();
98+
}
99+
currentHead = findEntry(currentHead->next());
100+
}
101+
}
102+
hits = 0;
103+
return PartitionOffset(0, false);
104+
}
105+
106+
Status ChainedLogIndex::insert(HashedKey hk, PartitionOffset po, uint8_t hits) {
107+
const auto lib = getLogIndexBucket(hk);
108+
uint32_t tag = createTag(hk);
109+
insert(tag, lib, po, hits);
110+
}
111+
112+
Status ChainedLogIndex::insert(uint32_t tag, KangarooBucketId bid,
113+
PartitionOffset po, uint8_t hits) {
114+
const auto lib = getLogIndexBucketFromSetBucket(bid);
115+
insert(tag, lib, po, hits);
116+
}
117+
118+
Status ChainedLogIndex::insert(uint32_t tag, LogIndexBucket lib,
119+
PartitionOffset po, uint8_t hits) {
120+
{
121+
std::unique_lock<folly::SharedMutex> lock{getMutex(lib)};
122+
123+
uint16_t* oldNext = &index_[lib.index()];
124+
ChainedLogIndexEntry* nextEntry = findEntry(index_[lib.index()]);
125+
while (nextEntry) {
126+
if (nextEntry->isValid() && nextEntry->tag() == tag) {
127+
nextEntry->populateEntry(po, tag, hits);
128+
return Status::Ok;
129+
}
130+
oldNext = &nextEntry->next_;
131+
nextEntry = findEntry(*oldNext);
132+
}
133+
134+
uint16_t entryOffset;
135+
ChainedLogIndexEntry* newEntry = allocateEntry(entryOffset);
136+
newEntry->populateEntry(po, tag, hits);
137+
(*oldNext) = entryOffset;
138+
}
139+
return Status::Ok;
140+
}
141+
142+
Status ChainedLogIndex::remove(HashedKey hk, PartitionOffset po) {
143+
uint64_t tag = createTag(hk);
144+
const auto lib = getLogIndexBucket(hk);
145+
return remove(tag, lib, po);
146+
}
147+
148+
Status ChainedLogIndex::remove(uint64_t tag, KangarooBucketId bid, PartitionOffset po) {
149+
auto lib = getLogIndexBucketFromSetBucket(bid);
150+
return remove(tag, lib, po);
151+
}
152+
153+
Status ChainedLogIndex::remove(uint64_t tag, LogIndexBucket lib, PartitionOffset po) {
154+
{
155+
std::unique_lock<folly::SharedMutex> lock{getMutex(lib)};
156+
ChainedLogIndexEntry* nextEntry = findEntry(index_[lib.index()]);
157+
uint16_t* oldNext = &index_[lib.index()];
158+
while (nextEntry) {
159+
if (nextEntry->isValid() && nextEntry->tag() == tag && nextEntry->offset() == po) {
160+
*oldNext = releaseEntry(*oldNext);
161+
return Status::Ok;
162+
}
163+
oldNext = &nextEntry->next_;
164+
nextEntry = findEntry(nextEntry->next_);
165+
}
166+
}
167+
return Status::NotFound;
168+
}
169+
170+
// Counts number of items in log corresponding to bucket
171+
uint64_t ChainedLogIndex::countBucket(HashedKey hk) {
172+
const auto lib = getLogIndexBucket(hk);
173+
uint64_t count = 0;
174+
{
175+
std::shared_lock<folly::SharedMutex> lock{getMutex(lib)};
176+
ChainedLogIndexEntry* nextEntry = findEntry(index_[lib.index()]);
177+
while (nextEntry) {
178+
if (nextEntry->isValid()) {
179+
count++;
180+
}
181+
nextEntry = findEntry(nextEntry->next_);
182+
}
183+
}
184+
return count;
185+
}
186+
187+
// Get iterator for all items in the same bucket
188+
ChainedLogIndex::BucketIterator ChainedLogIndex::getHashBucketIterator(HashedKey hk) {
189+
const auto lib = getLogIndexBucket(hk);
190+
auto idx = setNumberCb_(hk.keyHash());
191+
{
192+
std::shared_lock<folly::SharedMutex> lock{getMutex(lib)};
193+
auto currentHead = findEntry(index_[lib.index()]);
194+
while (currentHead) {
195+
if (currentHead->isValid()) {
196+
return BucketIterator(idx, currentHead);
197+
}
198+
currentHead = findEntry(currentHead->next_);
199+
}
200+
}
201+
return BucketIterator();
202+
}
203+
204+
ChainedLogIndex::BucketIterator ChainedLogIndex::getNext(ChainedLogIndex::BucketIterator bi) {
205+
if (bi.done()) {
206+
return bi;
207+
}
208+
auto lib = getLogIndexBucketFromSetBucket(bi.bucket_);
209+
{
210+
std::shared_lock<folly::SharedMutex> lock{getMutex(lib)};
211+
auto currentHead = findEntry(bi.nextEntry_);
212+
while (currentHead) {
213+
if (currentHead->isValid()) {
214+
return BucketIterator(bi.bucket_, currentHead);
215+
}
216+
currentHead = findEntry(currentHead->next_);
217+
}
218+
}
219+
return BucketIterator();
220+
}
221+
222+
PartitionOffset ChainedLogIndex::find(KangarooBucketId bid, uint64_t tag) {
223+
auto lib = getLogIndexBucketFromSetBucket(bid);
224+
{
225+
std::shared_lock<folly::SharedMutex> lock{getMutex(lib)};
226+
ChainedLogIndexEntry* nextEntry = findEntry(index_[lib.index()]);
227+
uint16_t* oldNext = &index_[lib.index()];
228+
while (nextEntry) {
229+
if (nextEntry->isValid() && nextEntry->tag() == tag) {
230+
PartitionOffset po = nextEntry->offset();
231+
return po;
232+
}
233+
oldNext = &nextEntry->next_;
234+
nextEntry = findEntry(nextEntry->next_);
235+
}
236+
}
237+
return PartitionOffset(0, false);
238+
}
239+
240+
ChainedLogIndex::LogIndexBucket ChainedLogIndex::getLogIndexBucket(HashedKey hk) {
241+
return getLogIndexBucketFromSetBucket(setNumberCb_(hk.keyHash()));
242+
}
243+
244+
ChainedLogIndex::LogIndexBucket ChainedLogIndex::getLogIndexBucket(uint64_t key) {
245+
return getLogIndexBucketFromSetBucket(setNumberCb_(key));
246+
}
247+
248+
ChainedLogIndex::LogIndexBucket ChainedLogIndex::getLogIndexBucketFromSetBucket(KangarooBucketId bid) {
249+
return LogIndexBucket(bid.index() % numHashBuckets_);
250+
}
251+
252+
} // namespace navy
253+
} // namespace cachelib
254+
} // namespace facebook

0 commit comments

Comments
 (0)