Skip to content

Commit 2ecccea

Browse files
committed
CLDSRV-783: Replace GCRA with token consumption in request path
Switch from optimistic GCRA with reconciliation to token reservation. Workers consume tokens from local buffer instead of evaluating GCRA per request. This keeps Redis out of the hot path while enforcing strict quotas. Changes: - helpers.js: Use token consumption instead of GCRA evaluation - server.js: Start token refill job instead of sync job - cleanup.js: Add request-timestamp cleanup (expireRequestTimestamps) - gcra.js: Compute interval per node instead of per worker (workers parameter now unused, kept for compatibility)
1 parent 091d54a commit 2ecccea

File tree

4 files changed

+50
-50
lines changed

4 files changed

+50
-50
lines changed

lib/api/apiUtils/rateLimit/cleanup.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
const { expireCounters, expireCachedConfigs } = require('./cache');
1+
const { expireCounters, expireCachedConfigs, expireRequestTimestamps } = require('./cache');
22
const { rateLimitCleanupInterval } = require('../../../../constants');
33

44
let cleanupInterval = null;
@@ -26,6 +26,7 @@ function startCleanupJob(log, options = {}) {
2626
const now = Date.now();
2727
const expiredCounters = expireCounters(now);
2828
const expiredConfigs = expireCachedConfigs(now);
29+
expireRequestTimestamps(now);
2930

3031
if (expiredCounters > 0 || expiredConfigs > 0) {
3132
log.debug('Rate limit cleanup completed', {

lib/api/apiUtils/rateLimit/gcra.js

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,34 +52,42 @@ function evaluate(emptyAt, arrivedAt, interval, burstCapacity) {
5252
*
5353
* In a distributed setup with N nodes and W workers per node:
5454
* - Global limit: R requests per second
55-
 * - Per-worker limit: R / N / W
5655
 * - Per-node limit: R / N
5756
 * - Interval = 1000ms / (R / N)
*
5958
* The interval represents milliseconds between requests. We divide 1000 (milliseconds
6059
* in a second) by the rate to convert "requests per second" to "milliseconds per request".
6160
*
6261
* Examples:
63-
* - 10 req/s → interval = 1000/10 = 100ms (one request every 100ms)
64-
* - 1 req/s → interval = 1000/1 = 1000ms (one request every second)
65-
* - 0.5 req/s → interval = 1000/0.5 = 2000ms (one request every 2 seconds)
62+
 * - 100 req/s ÷ 1 node = 100 req/s per node → interval = 10ms
63+
 * - 600 req/s ÷ 6 nodes = 100 req/s per node → interval = 10ms
64+
*
65+
* Dynamic work-stealing is achieved through Redis sync reconciliation:
66+
* - Each worker evaluates locally at its fixed per-worker quota
67+
* - Workers report consumed / workers to Redis
68+
* - Redis sums all workers' shares
69+
* - Workers overwrite local counters with Redis values
70+
* - Idle workers' unused capacity accumulates in Redis
71+
* - Busy workers pull back higher emptyAt values and throttle proportionally
6672
*
6773
 * IMPORTANT: Limit must be >= N (nodes), otherwise per-node rate < 1 req/s
6874
* which results in intervals > 1000ms and effectively blocks traffic.
6975
*
7076
* @param {number} limit - Global requests per second
7177
* @param {number} nodes - Total number of nodes
72-
* @param {number} workers - Number of workers per node
78+
* @param {number} _workers - Number of workers per node (unused in token reservation)
7379
* @returns {number} Interval in milliseconds between requests
7480
*/
75-
function calculateInterval(limit, nodes, workers) {
76-
// Per-worker rate = limit / nodes / workers
77-
const perWorkerRate = limit / nodes / workers;
81+
// eslint-disable-next-line no-unused-vars
82+
function calculateInterval(limit, nodes, _workers) {
83+
// Per-node rate = limit / nodes (workers NOT divided)
84+
// This allows dynamic work-stealing - workers evaluate at node quota
85+
const perNodeRate = limit / nodes;
7886

7987
// Interval = 1000ms / rate
8088
// Dividing 1000 (ms in a second) by rate converts "requests per second"
8189
// to "milliseconds between requests". Higher rate = smaller interval = more requests.
82-
return 1000 / perWorkerRate;
90+
return 1000 / perNodeRate;
8391
}
8492

8593
module.exports = {

lib/api/apiUtils/rateLimit/helpers.js

Lines changed: 19 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
const { config } = require('../../../Config');
22
const cache = require('./cache');
3-
const { evaluate, calculateInterval } = require('./gcra');
43
const constants = require('../../../../constants');
4+
const { getTokenBucket } = require('./tokenBucket');
55

66
/**
77
* Get rate limit configuration from cache only (no metadata fetch)
@@ -75,10 +75,10 @@ function extractAndCacheRateLimitConfig(bucket, bucketName, log) {
7575
}
7676

7777
/**
78-
* Check rate limit with pre-resolved configuration
78+
* Check rate limit with pre-resolved configuration using token reservation
7979
*
80-
* Uses GCRA algorithm to determine if request should be rate limited.
81-
* Updates counter if request is allowed.
80+
* Uses token bucket: Workers maintain local tokens granted by Redis.
81+
* Token consumption is pure in-memory (fast). Refills happen async in background.
8282
*
8383
* @param {string} bucketName - Bucket name
8484
* @param {object|null} limitConfig - Pre-resolved rate limit config
@@ -92,50 +92,29 @@ function checkRateLimitWithConfig(bucketName, limitConfig, log, callback) {
9292
return callback(null, false);
9393
}
9494

95-
// Calculate interval for this limit
96-
const nodes = config.rateLimiting.nodes || 1;
97-
const workers = config.clusters || 1;
98-
const interval = calculateInterval(limitConfig.limit, nodes, workers);
99-
100-
// Get burst capacity (default to 1 if not configured)
101-
const burstCapacity = config.rateLimiting.bucket?.defaultBurstCapacity ||
102-
constants.rateLimitDefaultBurstCapacity;
103-
const bucketSize = burstCapacity * 1000;
104-
105-
// Get counter (in-memory only, no sync with other workers)
106-
const counterKey = `bucket:${bucketName}:rps`;
107-
const emptyAt = cache.getCounter(counterKey) || 0;
108-
const arrivedAt = Date.now();
109-
110-
log.debug('Checking rate limit with GCRA', {
111-
bucketName,
112-
limit: limitConfig.limit,
113-
source: limitConfig.source,
114-
interval,
115-
emptyAt,
116-
arrivedAt,
117-
});
118-
119-
// Evaluate GCRA
120-
const result = evaluate(emptyAt, arrivedAt, interval, bucketSize);
121-
122-
// Update counter if allowed
123-
if (result.allowed) {
124-
cache.setCounter(counterKey, result.newEmptyAt);
125-
log.debug('Rate limit check: allowed', {
95+
// Get or create token bucket for this bucket
96+
const tokenBucket = getTokenBucket(bucketName, limitConfig, log);
97+
98+
// Try to consume a token (in-memory, no Redis)
99+
const allowed = tokenBucket.tryConsume();
100+
101+
if (allowed) {
102+
log.trace('Rate limit check: allowed (token consumed)', {
126103
bucketName,
127-
newEmptyAt: result.newEmptyAt,
104+
tokensRemaining: tokenBucket.tokens,
128105
});
129106
} else {
130-
log.debug('Rate limit check: denied', {
107+
log.debug('Rate limit check: denied (no tokens available)', {
131108
bucketName,
132109
limit: limitConfig.limit,
133-
allowAt: result.newEmptyAt,
134-
retryAfterMs: result.newEmptyAt - arrivedAt,
110+
source: limitConfig.source,
135111
});
136112
}
137113

138-
return callback(null, !result.allowed);
114+
// Return inverse: callback expects "rateLimited" boolean
115+
// allowed=true → rateLimited=false
116+
// allowed=false → rateLimited=true
117+
return callback(null, !allowed);
139118
}
140119

141120
module.exports = {

lib/server.js

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ const {
2424
isManagementAgentUsed,
2525
} = require('./management/agentClient');
2626
const { startCleanupJob } = require('./api/apiUtils/rateLimit/cleanup');
27+
const { startRefillJob, stopRefillJob } = require('./api/apiUtils/rateLimit/refillJob');
2728

2829
const HttpAgent = require('agentkeepalive');
2930
const QuotaService = require('./utilization/instance');
@@ -293,6 +294,10 @@ class S3Server {
293294
*/
294295
cleanUp() {
295296
logger.info('server shutting down');
297+
// Stop token refill job if running
298+
if (this.config.rateLimiting?.enabled) {
299+
stopRefillJob();
300+
}
296301
Promise.all(this.servers.map(server =>
297302
new Promise(resolve => server.close(resolve))
298303
)).then(() => process.exit(0));
@@ -360,6 +365,13 @@ class S3Server {
360365
// Start rate limit cleanup job
361366
if (this.config.rateLimiting?.enabled) {
362367
startCleanupJob(log);
368+
// Start token refill job for token reservation system
369+
startRefillJob().catch(err => {
370+
log.error('Failed to start token refill job', {
371+
error: err.message,
372+
stack: err.stack,
373+
});
374+
});
363375
}
364376

365377
// TODO this should wait for metadata healthcheck to be ok

0 commit comments

Comments
 (0)