Skip to content

Commit 091d54a

Browse files
committed
CLDSRV-783: Add token reservation system for rate limiting
Implement a token reservation architecture in which workers request capacity in advance from Redis. Workers maintain local token buffers and consume tokens in-memory (no Redis call in the hot path). A background refill job requests tokens every 100ms asynchronously. Redis enforces GCRA atomically at token-grant time, ensuring strict quota enforcement across distributed workers.

Components:
- grantTokens.lua: Redis Lua script for atomic token granting
- tokenBucket.js: WorkerTokenBucket class for local token management
- refillJob.js: Background job for async token replenishment
- client.js: Add grantTokens() method to Redis client
1 parent fef2c04 commit 091d54a

File tree

4 files changed

+530
-4
lines changed

4 files changed

+530
-4
lines changed

lib/api/apiUtils/rateLimit/client.js

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,22 @@ const Redis = require('ioredis');
55
const { config } = require('../../../Config');
66

77
const updateCounterScript = fs.readFileSync(`${__dirname }/updateCounter.lua`).toString();
8+
const reconcileCounterScript = fs.readFileSync(`${__dirname }/reconcileCounter.lua`).toString();
9+
const grantTokensScript = fs.readFileSync(`${__dirname }/grantTokens.lua`).toString();
810

911
// Registry of the Lua scripts shipped with this module. Each entry pairs the
// script source with the number of Redis keys it receives (KEYS[] vs ARGV[]).
// NOTE(review): presumably these are registered on the ioredis client via
// defineCommand (this.redis.grantTokens(...) is called below) — confirm in
// the constructor, which is outside this diff.
const SCRIPTS = {
    updateCounter: {
        numberOfKeys: 1,
        lua: updateCounterScript,
    },
    reconcileCounter: {
        numberOfKeys: 1,
        lua: reconcileCounterScript,
    },
    grantTokens: {
        numberOfKeys: 1,
        lua: grantTokensScript,
    },
};
1525

1626
class RateLimitClient {
@@ -25,13 +35,13 @@ class RateLimitClient {
2535
/**
2636
* @typedef {Object} CounterUpdateBatch
2737
* @property {string} key - counter key
28-
* @property {number} cost - cost to add to counter
38+
* @property {number} cost - per-worker cost to add to counter
2939
*/
3040

3141
/**
3242
* @typedef {Object} CounterUpdateBatchResult
3343
* @property {string} key - counter key
34-
* @property {number} value - current value of counter
44+
* @property {number} value - current value of counter after update
3545
*/
3646

3747
/**
@@ -41,8 +51,9 @@ class RateLimitClient {
4151
*/
4252

4353
/**
44-
* Add cost to the counter at key.
45-
* Returns the new value for the counter
54+
* Update local counter values in Redis by adding per-worker costs.
55+
* Each worker divides its consumption by worker count before syncing.
56+
* Redis sums all workers' costs to get total node consumption.
4657
*
4758
* @param {CounterUpdateBatch[]} batch - batch of counter updates
4859
* @param {RateLimitClient~batchUpdate} cb
@@ -65,6 +76,50 @@ class RateLimitClient {
6576
})));
6677
});
6778
}
79+
80+
/**
81+
* @callback RateLimitClient~grantTokens
82+
* @param {Error|null} err
83+
* @param {number|undefined} granted - Number of tokens granted (0 if denied)
84+
*/
85+
86+
/**
87+
* Request tokens from Redis with atomic GCRA enforcement
88+
*
89+
* This method atomically:
90+
* 1. Evaluates GCRA for N tokens
91+
* 2. Grants tokens if quota available
92+
* 3. Advances Redis counter by granted tokens
93+
*
94+
* Used by token reservation system to request capacity in advance.
95+
*
96+
* @param {string} bucketName - Bucket name
97+
* @param {number} requested - Number of tokens requested
98+
* @param {number} interval - Interval per request in ms
99+
* @param {number} burstCapacity - Burst capacity in ms
100+
* @param {RateLimitClient~grantTokens} cb - Callback
101+
*/
102+
grantTokens(bucketName, requested, interval, burstCapacity, cb) {
103+
const key = `throttling:bucket:${bucketName}:rps`;
104+
const now = Date.now();
105+
106+
this.redis.grantTokens(
107+
key,
108+
requested,
109+
interval,
110+
burstCapacity,
111+
now,
112+
(err, result) => {
113+
if (err) {
114+
return cb(err);
115+
}
116+
117+
// Result is number of tokens granted (0 if denied, partial if limited)
118+
const granted = parseInt(result, 10);
119+
return cb(null, granted);
120+
}
121+
);
122+
}
68123
}
69124

70125
let instance;
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
-- grantTokens.lua
-- Atomically evaluates GCRA and grants tokens for rate limiting.
--
-- This script implements token reservation: workers request capacity
-- in advance, and this script enforces the node-level quota using GCRA
-- (Generic Cell Rate Algorithm).
--
-- KEYS[1]: Counter key (e.g., "throttling:bucket:mybucket:rps")
-- ARGV[1]: Requested token count (number of requests)
-- ARGV[2]: Interval per request in milliseconds
-- ARGV[3]: Burst capacity in milliseconds (bucket size)
-- ARGV[4]: Current timestamp in milliseconds (arrivedAt)
--
-- Returns: Number of tokens granted (0 if quota exhausted, partial if limited)

local key = KEYS[1]
local requested = tonumber(ARGV[1])
local interval = tonumber(ARGV[2])
local burstCapacity = tonumber(ARGV[3])
local arrivedAt = tonumber(ARGV[4])

-- Guard against non-positive requests: the previous version could return
-- a negative "grant" and move the counter backwards for requested < 0,
-- and performed a pointless SET/PEXPIRE for requested == 0.
if requested <= 0 then
    return 0
end

-- The counter stores the "emptyAt" timestamp: the moment at which all
-- previously granted work has drained. A missing key means an empty bucket.
local emptyAt = tonumber(redis.call('GET', key) or 0)

-- GCRA: new work is scheduled from whichever is later — the drain time of
-- prior grants, or now.
local expectedTime = math.max(emptyAt, arrivedAt)

-- Capacity (in milliseconds of work) still available inside the burst
-- window ending at arrivedAt + burstCapacity.
local availableCapacity = (arrivedAt + burstCapacity) - expectedTime
if availableCapacity <= 0 then
    -- Quota exhausted: nothing fits within the burst window.
    return 0
end

-- Grant as many whole tokens as fit, capped at the request. This single
-- expression covers both the full-grant case (available >= requested cost)
-- and the partial-grant case of the previous two-branch version.
local granted = math.min(requested, math.floor(availableCapacity / interval))
if granted <= 0 then
    -- Some capacity remains, but less than one token's worth.
    return 0
end

-- Advance the counter by the granted work and keep the key alive until
-- the bucket would drain, plus a 10s safety buffer.
redis.call('SET', key, expectedTime + granted * interval)
redis.call('PEXPIRE', key, burstCapacity + 10000)

return granted
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
const werelogs = require('werelogs');
2+
3+
const { getAllTokenBuckets, cleanupTokenBuckets } = require('./tokenBucket');
4+
5+
const logger = new werelogs.Logger('S3');
6+
7+
// Timer handle for the periodic refill loop; null whenever the job is
// stopped (see startRefillJob/stopRefillJob).
let refillInterval = null;

// Refill interval in milliseconds (how often to check and refill buckets)
const REFILL_INTERVAL_MS = 100;

// Cleanup interval for expired buckets (every 10 seconds)
const CLEANUP_INTERVAL_MS = 10000;
// Counts refill ticks; a cleanup sweep runs once enough ticks have elapsed
// to cover CLEANUP_INTERVAL_MS, then the counter resets.
let cleanupCounter = 0;
16+
/**
17+
* Background refill job for token buckets
18+
*
19+
* This job runs periodically (default: every 100ms) and:
20+
* 1. Iterates through all active token buckets
21+
* 2. Triggers async refill for buckets below threshold
22+
* 3. Periodically cleans up expired/idle token buckets
23+
*
24+
* The refills are asynchronous and non-blocking, keeping Redis
25+
* out of the hot request path.
26+
*/
27+
async function refillTokenBuckets() {
28+
const tokenBuckets = getAllTokenBuckets();
29+
30+
if (tokenBuckets.size === 0) {
31+
return {
32+
checked: 0,
33+
refilled: 0,
34+
};
35+
}
36+
37+
let checked = 0;
38+
let refilled = 0;
39+
40+
// Trigger refill for all active buckets
41+
const refillPromises = [];
42+
43+
for (const [bucketName, bucket] of tokenBuckets.entries()) {
44+
checked++;
45+
46+
// Trigger async refill if needed (non-blocking)
47+
const promise = bucket.refillIfNeeded().then(() => {
48+
// Check if refill actually happened
49+
if (bucket.refillCount > 0) {
50+
refilled++;
51+
}
52+
}).catch(err => {
53+
logger.error('Token refill error', {
54+
bucketName,
55+
error: err.message,
56+
stack: err.stack,
57+
});
58+
});
59+
60+
refillPromises.push(promise);
61+
}
62+
63+
// Wait for all refills to complete
64+
await Promise.all(refillPromises);
65+
66+
return {
67+
checked,
68+
refilled,
69+
};
70+
}
71+
72+
/**
73+
* Main refill job loop
74+
* Runs periodically to proactively refill token buckets
75+
*/
76+
async function startRefillJob() {
77+
logger.info('Starting token refill job', {
78+
refillIntervalMs: REFILL_INTERVAL_MS,
79+
cleanupIntervalMs: CLEANUP_INTERVAL_MS,
80+
});
81+
82+
const tick = async () => {
83+
try {
84+
const stats = await refillTokenBuckets();
85+
86+
if (stats.refilled > 0) {
87+
logger.debug('Refill tick completed', stats);
88+
}
89+
90+
// Periodic cleanup (every CLEANUP_INTERVAL_MS)
91+
cleanupCounter++;
92+
if (cleanupCounter * REFILL_INTERVAL_MS >= CLEANUP_INTERVAL_MS) {
93+
const removed = cleanupTokenBuckets();
94+
if (removed > 0) {
95+
logger.debug('Cleaned up expired token buckets', {
96+
removed,
97+
});
98+
}
99+
cleanupCounter = 0;
100+
}
101+
102+
} catch (err) {
103+
logger.error('Refill job error', {
104+
error: err.message,
105+
stack: err.stack,
106+
});
107+
}
108+
};
109+
110+
// Start periodic ticks
111+
refillInterval = setInterval(tick, REFILL_INTERVAL_MS);
112+
}
113+
114+
/**
 * Stop the background refill job.
 * Used during graceful shutdown; safe to call when the job is not running.
 */
function stopRefillJob() {
    if (!refillInterval) {
        return;
    }
    clearInterval(refillInterval);
    refillInterval = null;
    logger.info('Stopped token refill job');
}
125+
126+
// Public API: job lifecycle controls, plus the single-pass refill function
// (exported so it can be invoked directly, e.g. from tests).
module.exports = {
    startRefillJob,
    stopRefillJob,
    refillTokenBuckets,
};

0 commit comments

Comments
 (0)