Skip to content

Commit

Permalink
Added: RateLimiterMongo, tests, add attrs to RateLimiterRes
Browse files Browse the repository at this point in the history
  • Loading branch information
animir committed Jun 1, 2018
1 parent cf01c1b commit 47c3417
Show file tree
Hide file tree
Showing 15 changed files with 557 additions and 52 deletions.
3 changes: 2 additions & 1 deletion .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"max-len": ["error", { "code": 140 }],
"node/no-unpublished-require": ["error", {
"allowModules": ["mocha", "chai", "redis-mock"]
}]
}],
"node/no-unsupported-features": "off"
}
}
4 changes: 3 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
const RateLimiterRedis = require('./lib/RateLimiterRedis');
const RateLimiterMongo = require('./lib/RateLimiterMongo');
const { RateLimiterClusterMaster, RateLimiterCluster } = require('./lib/RateLimiterCluster');
const RateLimiterMemory = require('./lib/RateLimiterMemory');

module.exports = {
RateLimiterRedis,
RateLimiterMongo,
RateLimiterMemory,
RateLimiterClusterMaster,
RateLimiterCluster,
};
};
12 changes: 10 additions & 2 deletions lib/RateLimiterCluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const masterSendToWorker = function (worker, msg, type, res) {
data: {
remainingPoints: res.remainingPoints,
msBeforeNext: res.msBeforeNext,
consumedPoints: res.consumedPoints,
isFirstInDuration: res.isFirstInDuration,
},
});
};
Expand Down Expand Up @@ -111,13 +113,19 @@ const workerProcessMsg = function (msg) {

if (this._promises[msg.id]) {
clearTimeout(this._promises[msg.id].timeoutId);
const res = new RateLimiterRes(
msg.data.remainingPoints,
msg.data.msBeforeNext,
msg.data.consumedPoints,
msg.data.isFirstInDuration // eslint-disable-line comma-dangle
);

switch (msg.type) {
case 'resolve':
this._promises[msg.id].resolve(new RateLimiterRes(msg.data.remainingPoints, msg.data.msBeforeNext));
this._promises[msg.id].resolve(res);
break;
case 'reject':
this._promises[msg.id].reject(new RateLimiterRes(msg.data.remainingPoints, msg.data.msBeforeNext));
this._promises[msg.id].reject(res);
break;
default:
throw new Error(`RateLimiterCluster: no such message type '${msg.type}'`);
Expand Down
19 changes: 10 additions & 9 deletions lib/RateLimiterMemory.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ class RateLimiterMemory extends RateLimiterAbstract {
consume(key, pointsToConsume = 1) {
return new Promise((resolve, reject) => {
const rlKey = this.getKey(key);
const isFirstInDuration = this._memoryStorage.get(rlKey) === null;
const storageRes = this._memoryStorage.incrby(rlKey, pointsToConsume, this.duration);
const res = new RateLimiterRes(this.points - storageRes.consumedPoints, storageRes.msBeforeNext);
const res = this._memoryStorage.incrby(rlKey, pointsToConsume, this.duration);
res.remainingPoints = this.points - res.consumedPoints;

if (storageRes.consumedPoints > this.points) {
reject(new RateLimiterRes(0, storageRes.msBeforeNext));
} else if (this.execEvenly && storageRes.msBeforeNext > 0 && !isFirstInDuration) {
const delay = Math.ceil(storageRes.msBeforeNext / ((this.points - storageRes.consumedPoints) + 2));
if (res.consumedPoints > this.points) {
reject(new RateLimiterRes(0, res.msBeforeNext));
} else if (this.execEvenly && res.msBeforeNext > 0 && !res.isFirstInDuration) {
const delay = Math.ceil(res.msBeforeNext / ((this.points - res.consumedPoints) + 2));

setTimeout(resolve, delay, res);
} else {
Expand All @@ -37,15 +36,17 @@ class RateLimiterMemory extends RateLimiterAbstract {
const rlKey = this.getKey(key);
return new Promise((resolve) => {
const res = this._memoryStorage.incrby(rlKey, points, this.duration);
resolve(new RateLimiterRes(this.points - res.consumedPoints, res.msBeforeNext));
res.remainingPoints = this.points - res.consumedPoints;
resolve(res);
});
}

reward(key, points = 1) {
const rlKey = this.getKey(key);
return new Promise((resolve) => {
const res = this._memoryStorage.incrby(rlKey, -points, this.duration);
resolve(new RateLimiterRes(this.points - res.consumedPoints, res.msBeforeNext));
res.remainingPoints = this.points - res.consumedPoints;
resolve(res);
});
}
}
Expand Down
135 changes: 135 additions & 0 deletions lib/RateLimiterMongo.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
const RateLimiterStoreAbstract = require('./RateLimiterStoreAbstract');
const RateLimiterRes = require('./RateLimiterRes');

const getRateLimiterRes = function (points, result) {
const res = new RateLimiterRes();

res.isFirstInDuration = result.value === null;
res.consumedPoints = res.isFirstInDuration ? points : result.value.points;

res.remainingPoints = Math.max(this.points - res.consumedPoints, 0);
res.msBeforeNext = res.isFirstInDuration
? this.duration * 1000
: Math.max(new Date(result.value.expire).getTime() - Date.now(), 0);

return res;
};

const afterConsume = function (resolve, reject, rlKey, points, result) {
const res = getRateLimiterRes.call(this, points, result);

if (res.consumedPoints > this.points) {
// Block key for this.blockDuration seconds
if (this.blockOnPointsConsumed > 0 && res.consumedPoints >= this.blockOnPointsConsumed) {
this._blockedKeys.add(rlKey, this.blockDuration);
res.msBeforeNext = this.msBlockDuration;
}

reject(res);
} else if (this.execEvenly && res.msBeforeNext > 0 && !res.isFirstInDuration) {
const delay = Math.ceil(res.msBeforeNext / (res.remainingPoints + 2));
setTimeout(resolve, delay, res);
} else {
resolve(res);
}
};

const update = function (key, points) {
return this._collection.findOneAndUpdate(
{
expire: { $gt: new Date() },
key,
},
{
$inc: { points },
$setOnInsert: { expire: new Date(Date.now() + (this.duration * 1000)) },
},
{
upsert: true,
returnNewDocument: true,
} // eslint-disable-line comma-dangle
);
};

class RateLimiterMongo extends RateLimiterStoreAbstract {
/**
*
* @param {Object} opts
* Defaults {
* ... see other in RateLimiterStoreAbstract
*
* mongo: MongoClient
* }
*/
constructor(opts) {
super(opts);

this.mongo = opts.mongo;
this._collection = this.mongo.db('node-rate-limiter-flexible').collection(this.keyPrefix);
this._collection.ensureIndex({ expire: -1 }, { expireAfterSeconds: 0 });
}

get mongo() {
return this._mongo;
}

set mongo(value) {
if (typeof value === 'undefined') {
throw new Error('mongo is not set');
}
this._mongo = value;
}

/**
*
* @param key
* @param pointsToConsume
* @returns {Promise<any>}
*/
consume(key, pointsToConsume = 1) {
return new Promise((resolve, reject) => {
const rlKey = this.getKey(key);

const blockMsBeforeExpire = this.getBlockMsBeforeExpire(rlKey);
if (blockMsBeforeExpire > 0) {
return reject(new RateLimiterRes(0, blockMsBeforeExpire));
}

update.call(this, rlKey, pointsToConsume)
.then((res) => {
afterConsume.call(this, resolve, reject, rlKey, pointsToConsume, res);
})
.catch((err) => {
this.handleError(err, 'consume', resolve, reject, key, pointsToConsume);
});
});
}

penalty(key, points = 1) {
const rlKey = this.getKey(key);
return new Promise((resolve, reject) => {
update.call(this, rlKey, points)
.then((res) => {
resolve(getRateLimiterRes.call(this, points, res));
})
.catch((err) => {
this.handleError(err, 'penalty', resolve, reject, key, points);
});
});
}

reward(key, points = 1) {
const rlKey = this.getKey(key);
return new Promise((resolve, reject) => {
update.call(this, rlKey, -points)
.then((res) => {
resolve(getRateLimiterRes.call(this, points, res));
})
.catch((err) => {
this.handleError(err, 'reward', resolve, reject, key, points);
});
});
}
}

module.exports = RateLimiterMongo;
14 changes: 7 additions & 7 deletions lib/RateLimiterRedis.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ const afterConsume = function (resolve, reject, rlKey, results) {
[, resTtlMs] = resTtlMs;
}
const res = new RateLimiterRes();
let isFirstInDuration = resSet === 'OK';
res.isFirstInDuration = resSet === 'OK';

res.remainingPoints = Math.max(this.points - consumed, 0);
if (resTtlMs === -1) { // If rlKey created by incrby() not by set()
isFirstInDuration = true;
res.isFirstInDuration = true;
res.msBeforeNext = this.duration;
this.redis.expire(rlKey, this.duration);
} else {
Expand All @@ -44,7 +44,7 @@ const afterConsume = function (resolve, reject, rlKey, results) {
}

reject(res);
} else if (this.execEvenly && res.msBeforeNext > 0 && !isFirstInDuration) {
} else if (this.execEvenly && res.msBeforeNext > 0 && !res.isFirstInDuration) {
const delay = Math.ceil(res.msBeforeNext / (res.remainingPoints + 2));
setTimeout(resolve, delay, res);
} else {
Expand Down Expand Up @@ -155,11 +155,11 @@ class RateLimiterRedis extends RateLimiterAbstract {
penalty(key, points = 1) {
const rlKey = this.getKey(key);
return new Promise((resolve, reject) => {
this.redis.incrby(rlKey, points, (err, value) => {
this.redis.incrby(rlKey, points, (err, consumedPoints) => {
if (err) {
handleRedisError.call(this, 'penalty', resolve, reject, key, points);
} else {
resolve(value);
resolve(new RateLimiterRes(this.points - consumedPoints, 0, consumedPoints));
}
});
});
Expand All @@ -168,11 +168,11 @@ class RateLimiterRedis extends RateLimiterAbstract {
reward(key, points = 1) {
const rlKey = this.getKey(key);
return new Promise((resolve, reject) => {
this.redis.incrby(rlKey, -points, (err, value) => {
this.redis.incrby(rlKey, -points, (err, consumedPoints) => {
if (err) {
handleRedisError.call(this, 'reward', resolve, reject, key, points);
} else {
resolve(value);
resolve(new RateLimiterRes(this.points - consumedPoints, 0, consumedPoints));
}
});
});
Expand Down
21 changes: 20 additions & 1 deletion lib/RateLimiterRes.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
module.exports = class RateLimiterRes {
constructor(remainingPoints, msBeforeNext) {
constructor(remainingPoints, msBeforeNext, consumedPoints, isFirstInDuration) {
this.remainingPoints = typeof remainingPoints === 'undefined' ? 0 : remainingPoints; // Remaining points in current duration
this.msBeforeNext = typeof msBeforeNext === 'undefined' ? 0 : msBeforeNext; // Milliseconds before next action
this.consumedPoints = typeof consumedPoints === 'undefined' ? 0 : consumedPoints; // Consumed points in current duration
this.isFirstInDuration = typeof isFirstInDuration === 'undefined' ? false : isFirstInDuration;
}

get msBeforeNext() {
Expand All @@ -21,4 +23,21 @@ module.exports = class RateLimiterRes {
this._remainingPoints = p;
return this;
}

get consumedPoints() {
return this._consumedPoints;
}

set consumedPoints(p) {
this._consumedPoints = p;
return this;
}

get isFirstInDuration() {
return this._isFirstInDuration;
}

set isFirstInDuration(value) {
this._isFirstInDuration = Boolean(value);
}
};
82 changes: 82 additions & 0 deletions lib/RateLimiterStoreAbstract.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
const RateLimiterAbstract = require('./RateLimiterAbstract');
const BlockedKeys = require('./component/BlockedKeys');

module.exports = class RateLimiterStoreAbstract extends RateLimiterAbstract {
/**
*
* @param opts Object Defaults {
* ... see other in RateLimiterAbstract
*
* blockOnPointsConsumed: 40, // Number of points when key is blocked
* blockDuration: 10, // Block duration in seconds
* insuranceLimiter: RateLimiterAbstract
* }
*/
constructor(opts = {}) {
super(opts);

this.blockOnPointsConsumed = opts.blockOnPointsConsumed;
this.blockDuration = opts.blockDuration;
this.insuranceLimiter = opts.insuranceLimiter;
this._blockedKeys = new BlockedKeys();
}

getBlockMsBeforeExpire(rlKey) {
if (this.blockOnPointsConsumed > 0) {
return this._blockedKeys.msBeforeExpire(rlKey);
}

return 0;
}

handleError(err, funcName, resolve, reject, key, points) {
if (!(this.insuranceLimiter instanceof RateLimiterAbstract)) {
reject(err);
} else {
this.insuranceLimiter[funcName](key, points)
.then((res) => {
resolve(res);
})
.catch((res) => {
reject(res);
});
}
}

get blockOnPointsConsumed() {
return this._blockOnPointsConsumed;
}

set blockOnPointsConsumed(value) {
this._blockOnPointsConsumed = value ? parseInt(value) : 0;
if (this.blockOnPointsConsumed > 0 && this.points >= this.blockOnPointsConsumed) {
throw new Error('blockOnPointsConsumed option must be more than points option');
}
}

get blockDuration() {
return this._blockDuration;
}

get msBlockDuration() {
return this._blockDuration * 1000;
}

set blockDuration(value) {
this._blockDuration = value ? parseInt(value) : 0;
if (this.blockDuration > 0 && this.blockOnPointsConsumed === 0) {
throw new Error('blockOnPointsConsumed option must be set up');
}
}

get insuranceLimiter() {
return this._insuranceLimiter;
}

set insuranceLimiter(value) {
if (typeof value !== 'undefined' && !(value instanceof RateLimiterAbstract)) {
throw new Error('insuranceLimiter must be instance of RateLimiterAbstract');
}
this._insuranceLimiter = value;
}
};
Loading

0 comments on commit 47c3417

Please sign in to comment.