Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 143 additions & 10 deletions examples/proxygen_cache/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <folly/String.h>
#include <folly/init/Init.h>
#include <glog/logging.h>
#include <gflags/gflags.h>
#include <proxygen/httpserver/Filters.h>
#include <proxygen/httpserver/HTTPServer.h>
#include <proxygen/httpserver/HTTPServerOptions.h>
Expand All @@ -18,6 +19,11 @@
#include <memory>
#include <string>
#include <utility>
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <cstring>
#include <stdexcept>

using proxygen::HTTPMessage;
using proxygen::HTTPServer;
Expand All @@ -32,8 +38,101 @@ using CacheConfig = facebook::cachelib::LruAllocator::Config;
using WriteHandle = Cache::WriteHandle;
using ReadHandle = Cache::ReadHandle;

struct SampleDataView {
const uint8_t* data;
size_t len;
size_t offset;
};

class SampleData {
public:
static constexpr size_t kDefaultSize = 64ull * 1024 * 1024; // 64 MiB
static constexpr size_t kAlign = 64;

explicit SampleData(size_t bytes = kDefaultSize)
: size_(bytes) {
if (bytes == 0) throw std::invalid_argument("data size must be > 0");
#if defined(_ISOC11_SOURCE) || (__STDC_VERSION__ >= 201112L) || defined(_GNU_SOURCE)
data_ = static_cast<uint8_t*>(aligned_alloc(kAlign, roundUp(bytes, kAlign)));
if (!data_) throw std::bad_alloc();
#else
if (posix_memalign(reinterpret_cast<void**>(&data_), kAlign, roundUp(bytes, kAlign)))
throw std::bad_alloc();
#endif
fillWithPattern();
}

SampleData(const SampleData&) = delete;
SampleData& operator=(const SampleData&) = delete;
SampleData(SampleData&& o) noexcept : data_(o.data_), size_(o.size_) {
o.data_ = nullptr; o.size_ = 0;
}
SampleData& operator=(SampleData&& o) noexcept {
if (this != &o) {
free(data_);
data_ = o.data_;
size_ = o.size_;
o.data_ = nullptr;
o.size_ = 0;
}
return *this;
}
~SampleData() { free(data_); }

size_t size() const noexcept { return size_; }
const uint8_t* raw() const noexcept { return data_; }

// Get a random aligned slice [ptr, ptr+len)
SampleDataView getData(size_t len) const {
if (len == 0) return {data_, 0, 0};
if (len > size_) throw std::invalid_argument("len exceeds data size");

thread_local uint64_t seed = initSeed();
uint64_t r = rngNext(seed);

size_t maxOff = size_ - len;
maxOff &= ~(kAlign - 1);
size_t off = static_cast<size_t>(r % (maxOff + 1));
off &= ~(kAlign - 1);

return {data_ + off, len, off};
}

private:
static size_t roundUp(size_t n, size_t a) { return (n + (a - 1)) & ~(a - 1); }

static inline uint64_t initSeed() {
uint64_t t = static_cast<uint64_t>(
std::chrono::high_resolution_clock::now().time_since_epoch().count());
uint64_t x = t ^ (0x9E3779B97F4A7C15ull * reinterpret_cast<uintptr_t>(&t));
return x ? x : 0xA5A5A5A5A5A5A5A5ull;
}

static inline uint64_t rngNext(uint64_t& s) {
s ^= s >> 12; s ^= s << 25; s ^= s >> 27;
return s * 0x2545F4914F6CDD1Dull;
}

void fillWithPattern() {
static constexpr char kPattern[] = "the quick brown fox jumps over the lazy dog";
const size_t plen = sizeof(kPattern) - 1;
for (size_t off = 0; off < size_;) {
const size_t n = std::min(plen, size_ - off);
std::memcpy(data_ + off, kPattern, n);
off += n;
}
}

uint8_t* data_{nullptr};
size_t size_{0};
};


struct CacheCtx {
std::unique_ptr<Cache> cache;
SampleData sampleData_;
bool enableLookaside{false};
std::function<size_t()> getSampleDataSize;
facebook::cachelib::PoolId poolId{};
};

Expand Down Expand Up @@ -67,7 +166,8 @@ class CacheHandler : public RequestHandler {

if (method == proxygen::HTTPMethod::GET) {
handleGet(key);
} else if (method == proxygen::HTTPMethod::PUT) {
} else if (method == proxygen::HTTPMethod::PUT ||
method == proxygen::HTTPMethod::POST) {
handlePut(key);
} else if (method == proxygen::HTTPMethod::DELETE) {
handleDelete(key);
Expand Down Expand Up @@ -106,6 +206,18 @@ class CacheHandler : public RequestHandler {
void handleGet(const std::string& key) {
ReadHandle h = ctx_->cache->find(key);
if (!h) {
if (ctx_->enableLookaside) {
WriteHandle wh = ctx_->cache->allocate(ctx_->poolId, key, ctx_->getSampleDataSize());
if (wh) {
auto sample = ctx_->sampleData_.getData(ctx_->getSampleDataSize());
std::memcpy(wh->getMemory(), sample.data, sample.len);
ctx_->cache->insertOrReplace(std::move(wh));
VLOG(1) << "Lookaside: populated key " << key << " with "
<< sample.len << " bytes from offset " << sample.offset;
} else {
VLOG(1) << "Lookaside: allocation failed for key " << key;
}
}
ResponseBuilder(downstream_)
.status(404, "Not Found")
.body("Key not found\n")
Expand Down Expand Up @@ -206,24 +318,45 @@ std::unique_ptr<Cache> buildCache(size_t cacheBytes,

} // anonymous namespace

// ---- Flags ----
DEFINE_uint32(port, 8111, "Port to listen on");
DEFINE_uint64(cache_size_mb, 1ULL * 1024, "Cache size in MB");
DEFINE_int32(item_size, 65536, "Default item size (bytes) for lookaside fills");
DEFINE_bool(enable_lookaside, false, "Enable lookaside population on GET miss");

int main(int argc, char* argv[]) {
folly::Init init(&argc, &argv);
FLAGS_logtostderr = 1; // show logs on stderr
FLAGS_minloglevel = 0; // include INFO
FLAGS_stderrthreshold = 0;
FLAGS_v = 2; // enable VLOG(1..2)
// Flags (basic): port and cache size
uint16_t port = 8111;
size_t cacheBytes = 1024 * 1024 * 1024; // 1 GB
if (argc >= 2) {
port = static_cast<uint16_t>(std::stoi(argv[1]));
FLAGS_v = 0; // enable VLOG(1..2)

// All argument parsing is handled by gflags via folly::Init above.
// Read the values from FLAGS_*.
const uint16_t port = static_cast<uint16_t>(FLAGS_port);
const size_t cacheBytes = static_cast<size_t>(FLAGS_cache_size_mb) * 1024 * 1024;
const bool enableLookaside = FLAGS_enable_lookaside;
const int itemSize = FLAGS_item_size;

if (itemSize <= 0) {
LOG(ERROR) << "--item_size must be > 0 (got " << itemSize << ")";
return 2;
}
if (argc >= 3) {
cacheBytes = folly::to<size_t>(argv[2]); // bytes
if (cacheBytes < static_cast<size_t>(itemSize)) {
LOG(WARNING) << "--cache_size_mb (" << cacheBytes
<< ") is smaller than --item_size (" << itemSize
<< "); allocations may fail.";
}

LOG(INFO) << "Effective config: port=" << port
<< " cache_size=" << cacheBytes
<< " enable_lookaside=" << (enableLookaside ? "true" : "false")
<< " item_size=" << itemSize;

auto ctx = std::make_shared<CacheCtx>();
ctx->cache = buildCache(cacheBytes, ctx->poolId);
ctx->enableLookaside = enableLookaside;
ctx->getSampleDataSize = [itemSize]() { return static_cast<size_t>(itemSize); };

proxygen::HTTPServerOptions options;
options.threads = static_cast<size_t>(std::thread::hardware_concurrency());
Expand All @@ -234,7 +367,7 @@ int main(int argc, char* argv[]) {

HTTPServer server(std::move(options));
std::vector<HTTPServer::IPConfig> ips = {
{folly::SocketAddress("0.0.0.0", port, true),
{folly::SocketAddress("0.0.0.0", static_cast<uint16_t>(port), true),
proxygen::HTTPServer::Protocol::HTTP}};
server.bind(ips);

Expand Down
Loading