diff --git a/packages/libp2p/package.json b/packages/libp2p/package.json index 9117ed57d7..65736e75ca 100644 --- a/packages/libp2p/package.json +++ b/packages/libp2p/package.json @@ -103,7 +103,6 @@ "merge-options": "^3.0.4", "multiformats": "^13.0.0", "private-ip": "^3.0.1", - "rate-limiter-flexible": "^4.0.0", "uint8arrays": "^5.0.0" }, "devDependencies": { diff --git a/packages/libp2p/src/connection-manager/index.ts b/packages/libp2p/src/connection-manager/index.ts index 3ca821f939..c48ca0e5aa 100644 --- a/packages/libp2p/src/connection-manager/index.ts +++ b/packages/libp2p/src/connection-manager/index.ts @@ -1,9 +1,9 @@ import { CodeError, KEEP_ALIVE } from '@libp2p/interface' import { PeerMap } from '@libp2p/peer-collections' import { defaultAddressSort } from '@libp2p/utils/address-sort' +import { RateLimiter } from '@libp2p/utils/rate-limiter' import { type Multiaddr, type Resolver, multiaddr } from '@multiformats/multiaddr' import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers' -import { RateLimiterMemory } from 'rate-limiter-flexible' import { codes } from '../errors.js' import { getPeerAddress } from '../get-peer.js' import { AutoDial } from './auto-dial.js' @@ -167,7 +167,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { public readonly dialQueue: DialQueue public readonly autoDial: AutoDial public readonly connectionPruner: ConnectionPruner - private readonly inboundConnectionRateLimiter: RateLimiterMemory + private readonly inboundConnectionRateLimiter: RateLimiter private readonly peerStore: PeerStore private readonly metrics?: Metrics @@ -206,7 +206,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { this.maxIncomingPendingConnections = init.maxIncomingPendingConnections ?? defaultOptions.maxIncomingPendingConnections // controls individual peers trying to dial us too quickly - this.inboundConnectionRateLimiter = new RateLimiterMemory({ + this.inboundConnectionRateLimiter = new RateLimiter({ points: init.inboundConnectionThreshold ?? defaultOptions.inboundConnectionThreshold, duration: 1 }) diff --git a/packages/stream-multiplexer-mplex/package.json b/packages/stream-multiplexer-mplex/package.json index 897d9bd1d4..fcd88fb6c2 100644 --- a/packages/stream-multiplexer-mplex/package.json +++ b/packages/stream-multiplexer-mplex/package.json @@ -66,7 +66,6 @@ "it-pipe": "^3.0.1", "it-pushable": "^3.2.1", "it-stream-types": "^2.0.1", - "rate-limiter-flexible": "^4.0.0", "uint8-varint": "^2.0.0", "uint8arraylist": "^2.4.3", "uint8arrays": "^5.0.0" diff --git a/packages/stream-multiplexer-mplex/src/mplex.ts b/packages/stream-multiplexer-mplex/src/mplex.ts index 04b351463a..23746b9660 100644 --- a/packages/stream-multiplexer-mplex/src/mplex.ts +++ b/packages/stream-multiplexer-mplex/src/mplex.ts @@ -1,8 +1,8 @@ import { CodeError } from '@libp2p/interface' import { closeSource } from '@libp2p/utils/close-source' +import { RateLimiter } from '@libp2p/utils/rate-limiter' import { pipe } from 'it-pipe' import { type Pushable, pushable } from 'it-pushable' -import { RateLimiterMemory } from 'rate-limiter-flexible' import { toString as uint8ArrayToString } from 'uint8arrays' import { Decoder } from './decode.js' import { encode } from './encode.js' @@ -59,7 +59,7 @@ export class MplexStreamMuxer implements StreamMuxer { private readonly _init: MplexStreamMuxerInit private readonly _source: Pushable private readonly closeController: AbortController - private readonly rateLimiter: RateLimiterMemory + private readonly rateLimiter: RateLimiter private readonly closeTimeout: number private readonly logger: ComponentLogger @@ -114,7 +114,7 @@ export class MplexStreamMuxer implements StreamMuxer { */ this.closeController = new AbortController() - this.rateLimiter = new RateLimiterMemory({ + this.rateLimiter = new RateLimiter({ points: init.disconnectThreshold ?? DISCONNECT_THRESHOLD, duration: 1 }) diff --git a/packages/utils/package.json b/packages/utils/package.json index c6db92b33e..635eacefaa 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -84,6 +84,10 @@ "types": "./dist/src/peer-queue.d.ts", "import": "./dist/src/peer-queue.js" }, + "./rate-limiter": { + "types": "./dist/src/rate-limiter.d.ts", + "import": "./dist/src/rate-limiter.js" + }, "./stream-to-ma-conn": { "types": "./dist/src/stream-to-ma-conn.d.ts", "import": "./dist/src/stream-to-ma-conn.js" diff --git a/packages/utils/src/rate-limiter.ts b/packages/utils/src/rate-limiter.ts new file mode 100644 index 0000000000..508bd43027 --- /dev/null +++ b/packages/utils/src/rate-limiter.ts @@ -0,0 +1,287 @@ +import { CodeError } from '@libp2p/interface' +import delay from 'delay' + +export interface RateLimiterInit { + /** + * Number of points + * + * @default 4 + */ + points?: number + + /** + * Per seconds + * + * @default 1 + */ + duration?: number + + /** + * Block if consumed more than points in current duration for blockDuration seconds + * + * @default 0 + */ + blockDuration?: number + + /** + * Execute allowed actions evenly over duration + * + * @default false + */ + execEvenly?: boolean + + /** + * ms, works with execEvenly=true option + * + * @default duration * 1000 / points + */ + execEvenlyMinDelayMs?: number + + /** + * @default rlflx + */ + keyPrefix?: string +} + +export interface GetKeySecDurationOptions { + customDuration?: number +} + +export interface RateLimiterResult { + remainingPoints: number + msBeforeNext: number + consumedPoints: number + isFirstInDuration: boolean +} + +export interface RateRecord { + value: number + expiresAt?: Date + timeoutId?: ReturnType +} + +export class RateLimiter { + public readonly memoryStorage: MemoryStorage + protected points: number + protected duration: number + protected blockDuration: number + protected execEvenly: boolean + protected execEvenlyMinDelayMs: number + protected keyPrefix: string + + constructor (opts: RateLimiterInit = {}) { + this.points = opts.points ?? 4 + this.duration = opts.duration ?? 1 + this.blockDuration = opts.blockDuration ?? 0 + this.execEvenly = opts.execEvenly ?? false + this.execEvenlyMinDelayMs = opts.execEvenlyMinDelayMs ?? (this.duration * 1000 / this.points) + this.keyPrefix = opts.keyPrefix ?? 'rlflx' + this.memoryStorage = new MemoryStorage() + } + + async consume (key: string, pointsToConsume: number = 1, options: GetKeySecDurationOptions = {}): Promise { + const rlKey = this.getKey(key) + const secDuration = this._getKeySecDuration(options) + let res = this.memoryStorage.incrby(rlKey, pointsToConsume, secDuration) + res.remainingPoints = Math.max(this.points - res.consumedPoints, 0) + + if (res.consumedPoints > this.points) { + // Block only first time when consumed more than points + if (this.blockDuration > 0 && res.consumedPoints <= (this.points + pointsToConsume)) { + // Block key + res = this.memoryStorage.set(rlKey, res.consumedPoints, this.blockDuration) + } + + throw new CodeError('Rate limit exceeded', 'ERR_RATE_LIMIT_EXCEEDED', res) + } else if (this.execEvenly && res.msBeforeNext > 0 && !res.isFirstInDuration) { + // Execute evenly + let delayMs = Math.ceil(res.msBeforeNext / (res.remainingPoints + 2)) + if (delayMs < this.execEvenlyMinDelayMs) { + delayMs = res.consumedPoints * this.execEvenlyMinDelayMs + } + + await delay(delayMs) + } + + return res + } + + penalty (key: string, points: number = 1, options: GetKeySecDurationOptions = {}): RateLimiterResult { + const rlKey = this.getKey(key) + const secDuration = this._getKeySecDuration(options) + const res = this.memoryStorage.incrby(rlKey, points, secDuration) + res.remainingPoints = Math.max(this.points - res.consumedPoints, 0) + + return res + } + + reward (key: string, points: number = 1, options: GetKeySecDurationOptions = {}): RateLimiterResult { + const rlKey = this.getKey(key) + const secDuration = this._getKeySecDuration(options) + const res = this.memoryStorage.incrby(rlKey, -points, secDuration) + res.remainingPoints = Math.max(this.points - res.consumedPoints, 0) + + return res + } + + /** + * Block any key for secDuration seconds + * + * @param key + * @param secDuration + */ + block (key: string, secDuration: number): RateLimiterResult { + const msDuration = secDuration * 1000 + const initPoints = this.points + 1 + + this.memoryStorage.set(this.getKey(key), initPoints, secDuration) + + return { + remainingPoints: 0, + msBeforeNext: msDuration === 0 ? -1 : msDuration, + consumedPoints: initPoints, + isFirstInDuration: false + } + } + + set (key: string, points: number, secDuration: number = 0): RateLimiterResult { + const msDuration = (secDuration >= 0 ? secDuration : this.duration) * 1000 + + this.memoryStorage.set(this.getKey(key), points, secDuration) + + return { + remainingPoints: 0, + msBeforeNext: msDuration === 0 ? -1 : msDuration, + consumedPoints: points, + isFirstInDuration: false + } + } + + get (key: string): RateLimiterResult | undefined { + const res = this.memoryStorage.get(this.getKey(key)) + + if (res != null) { + res.remainingPoints = Math.max(this.points - res.consumedPoints, 0) + } + + return res + } + + delete (key: string): void { + this.memoryStorage.delete(this.getKey(key)) + } + + private _getKeySecDuration (options?: GetKeySecDurationOptions): number { + if (options?.customDuration != null && options.customDuration >= 0) { + return options.customDuration + } + + return this.duration + } + + getKey (key: string): string { + return this.keyPrefix.length > 0 ? `${this.keyPrefix}:${key}` : key + } + + parseKey (rlKey: string): string { + return rlKey.substring(this.keyPrefix.length) + } +} + +class MemoryStorage { + public readonly storage: Map + + constructor () { + this.storage = new Map() + } + + incrby (key: string, value: number, durationSec: number): RateLimiterResult { + const existing = this.storage.get(key) + + if (existing != null) { + const msBeforeExpires = existing.expiresAt != null + ? existing.expiresAt.getTime() - new Date().getTime() + : -1 + + if (existing.expiresAt == null || msBeforeExpires > 0) { + // Change value + existing.value += value + + return { + remainingPoints: 0, + msBeforeNext: msBeforeExpires, + consumedPoints: existing.value, + isFirstInDuration: false + } + } + + return this.set(key, value, durationSec) + } + + return this.set(key, value, durationSec) + } + + set (key: string, value: number, durationSec: number): RateLimiterResult { + const durationMs = durationSec * 1000 + const existing = this.storage.get(key) + + if (existing != null) { + clearTimeout(existing.timeoutId) + } + + const record: RateRecord = { + value, + expiresAt: durationMs > 0 ? new Date(Date.now() + durationMs) : undefined + } + + this.storage.set(key, record) + + if (durationMs > 0) { + record.timeoutId = setTimeout(() => { + this.storage.delete(key) + }, durationMs) + + if (record.timeoutId.unref != null) { + record.timeoutId.unref() + } + } + + return { + remainingPoints: 0, + msBeforeNext: durationMs === 0 ? -1 : durationMs, + consumedPoints: record.value, + isFirstInDuration: true + } + } + + get (key: string): RateLimiterResult | undefined { + const existing = this.storage.get(key) + + if (existing != null) { + const msBeforeExpires = existing.expiresAt != null + ? existing.expiresAt.getTime() - new Date().getTime() + : -1 + return { + remainingPoints: 0, + msBeforeNext: msBeforeExpires, + consumedPoints: existing.value, + isFirstInDuration: false + } + } + } + + delete (key: string): boolean { + const record = this.storage.get(key) + + if (record != null) { + if (record.timeoutId != null) { + clearTimeout(record.timeoutId) + } + + this.storage.delete(key) + + return true + } + return false + } +} diff --git a/packages/utils/test/rate-limiter.spec.ts b/packages/utils/test/rate-limiter.spec.ts new file mode 100644 index 0000000000..c76c3e0765 --- /dev/null +++ b/packages/utils/test/rate-limiter.spec.ts @@ -0,0 +1,271 @@ +/* eslint-disable no-unused-expressions */ +import { expect } from 'aegir/chai' +import delay from 'delay' +import { RateLimiter } from '../src/rate-limiter.js' + +describe('RateLimiter with fixed window', function () { + this.timeout(5000) + + it('consume 1 point', async () => { + const testKey = 'consume1' + const rateLimiterMemory = new RateLimiter({ points: 2, duration: 5 }) + await rateLimiterMemory.consume(testKey) + const res = rateLimiterMemory.memoryStorage.get(rateLimiterMemory.getKey(testKey)) + + expect(res).to.have.property('consumedPoints', 1) + }) + + it('can not consume more than maximum points', async () => { + const testKey = 'consume2' + const rateLimiterMemory = new RateLimiter({ points: 1, duration: 5 }) + + await expect(rateLimiterMemory.consume(testKey, 2)).to.eventually.be.rejected + .with.nested.property('props.msBeforeNext').that.is.gte(0) + }) + + it('execute evenly over duration with minimum delay 20 ms', async () => { + const testKey = 'consumeEvenlyMinDelay' + const rateLimiterMemory = new RateLimiter({ + points: 100, duration: 1, execEvenly: true, execEvenlyMinDelayMs: 20 + }) + + await rateLimiterMemory.consume(testKey) + + const timeFirstConsume = Date.now() + + await rateLimiterMemory.consume(testKey) + + expect(Date.now() - timeFirstConsume >= 20).to.equal(true) + }) + + it('execute evenly over duration', async () => { + const testKey = 'consumeEvenly' + const rateLimiterMemory = new RateLimiter({ + points: 2, duration: 5, execEvenly: true, execEvenlyMinDelayMs: 1 + }) + await rateLimiterMemory.consume(testKey) + + const timeFirstConsume = Date.now() + + await rateLimiterMemory.consume(testKey) + + // Second consume should be delayed more than 2 seconds + // Explanation: + // 1) consume at 0ms, remaining duration = 5000ms + // 2) delayed consume for (4999 / (0 + 2)) ~= 2500ms, where 2 is a fixed value + // , because it mustn't delay in the beginning and in the end of duration + // 3) consume after 2500ms by timeout + + const diff = Date.now() - timeFirstConsume + expect(diff > 2400 && diff < 2600).to.equal(true) + }) + + it('makes penalty', async () => { + const testKey = 'penalty1' + const rateLimiterMemory = new RateLimiter({ points: 3, duration: 5 }) + await rateLimiterMemory.consume(testKey) + + rateLimiterMemory.penalty(testKey) + + const res = rateLimiterMemory.memoryStorage.get(rateLimiterMemory.getKey(testKey)) + + expect(res).to.have.property('consumedPoints', 2) + }) + + it('reward points', async () => { + const testKey = 'reward1' + const rateLimiterMemory = new RateLimiter({ points: 1, duration: 5 }) + + await rateLimiterMemory.consume(testKey) + + rateLimiterMemory.reward(testKey) + + const res = rateLimiterMemory.memoryStorage.get(rateLimiterMemory.getKey(testKey)) + + expect(res).to.have.property('consumedPoints', 0) + }) + + it('use keyPrefix from options', () => { + const testKey = 'key' + const keyPrefix = 'test' + const rateLimiterMemory = new RateLimiter({ keyPrefix, points: 1, duration: 5 }) + + expect(rateLimiterMemory.getKey(testKey)).to.equal('test:key') + }) + + it('blocks key for block duration when consumed more than points', async () => { + const testKey = 'block' + const rateLimiterMemory = new RateLimiter({ points: 1, duration: 1, blockDuration: 2 }) + + await expect(rateLimiterMemory.consume(testKey, 2)).to.eventually.be.rejected + .with.nested.property('props.msBeforeNext').that.is.greaterThan(1000) + }) + + it('do not block key second time until block expires no matter how many points consumed', async () => { + const testKey = 'donotblocktwice' + const rateLimiterMemory = new RateLimiter({ points: 1, duration: 1, blockDuration: 2 }) + + await expect(rateLimiterMemory.consume(testKey, 2)).to.eventually.be.rejected() + + await delay(1201) + + await expect(rateLimiterMemory.consume(testKey)).to.eventually.be.rejected() + .with.nested.property('props.msBeforeNext').that.is.lessThan(1000) + }) + + it('block expires in blockDuration seconds', async () => { + const testKey = 'blockexpires' + const rateLimiterMemory = new RateLimiter({ points: 1, duration: 1, blockDuration: 2 }) + + await expect(rateLimiterMemory.consume(testKey, 2)).to.eventually.be.rejected() + + await delay(2000) + + const res = await rateLimiterMemory.consume(testKey) + + expect(res).to.have.property('consumedPoints', 1) + }) + + it('block custom key', async () => { + const testKey = 'blockcustom' + const rateLimiterMemory = new RateLimiter({ points: 1, duration: 1 }) + rateLimiterMemory.block(testKey, 2) + + await expect(rateLimiterMemory.consume(testKey)).to.eventually.be.rejected() + .with.nested.property('props.msBeforeNext').that.is.within(1000, 2000) + }) + + it('get by key', async () => { + const testKey = 'get' + const rateLimiterMemory = new RateLimiter({ points: 2, duration: 5 }) + + await rateLimiterMemory.consume(testKey) + + const res = rateLimiterMemory.get(testKey) + + expect(res).to.have.property('remainingPoints', 1) + }) + + it('get resolves null if key is not set', () => { + const testKey = 'getbynotexistingkey' + const rateLimiterMemory = new RateLimiter({ points: 2, duration: 5 }) + + expect(rateLimiterMemory.get(testKey)).to.be.undefined() + }) + + it('delete resolves true if key is set', async () => { + const testKey = 'deletekey' + const rateLimiterMemory = new RateLimiter({ points: 2, duration: 5 }) + await rateLimiterMemory.consume(testKey) + + rateLimiterMemory.delete(testKey) + + expect(rateLimiterMemory.get(testKey)).to.be.undefined() + }) + + it('delete resolves false if key is not set', () => { + const testKey = 'deletekey2' + const rateLimiterMemory = new RateLimiter({ points: 2, duration: 5 }) + rateLimiterMemory.delete(testKey) + + expect(rateLimiterMemory.get(testKey)).to.be.undefined() + }) + + it('consume applies options.customDuration to set expire', async () => { + const testKey = 'options.customDuration' + const rateLimiterMemory = new RateLimiter({ points: 1, duration: 5 }) + + const res = await rateLimiterMemory.consume(testKey, 1, { customDuration: 1 }) + expect(res.msBeforeNext).to.be.lte(1000) + }) + + it('consume applies options.customDuration to set not expiring key', async () => { + const testKey = 'options.customDuration.forever' + const rateLimiterMemory = new RateLimiter({ points: 1, duration: 5 }) + + const res = await rateLimiterMemory.consume(testKey, 1, { customDuration: 0 }) + expect(res).to.have.property('msBeforeNext', -1) + }) + + it('penalty applies options.customDuration to set expire', () => { + const testKey = 'options.customDuration' + const rateLimiterMemory = new RateLimiter({ points: 1, duration: 5 }) + + const res = rateLimiterMemory.penalty(testKey, 1, { customDuration: 1 }) + expect(res).to.have.property('msBeforeNext').that.is.lte(1000) + }) + + it('reward applies options.customDuration to set expire', () => { + const testKey = 'options.customDuration' + const rateLimiterMemory = new RateLimiter({ points: 1, duration: 5 }) + + const res = rateLimiterMemory.reward(testKey, 1, { customDuration: 1 }) + expect(res).to.have.property('msBeforeNext').that.is.lte(1000) + }) + + it('does not expire key if duration set to 0', async () => { + const testKey = 'neverexpire' + const rateLimiterMemory = new RateLimiter({ points: 2, duration: 0 }) + await rateLimiterMemory.consume(testKey, 1) + await rateLimiterMemory.consume(testKey, 1) + + const res = rateLimiterMemory.get(testKey) + expect(res).to.have.property('consumedPoints', 2) + expect(res).to.have.property('msBeforeNext', -1) + }) + + it('block key forever, if secDuration is 0', async () => { + const testKey = 'neverexpire' + const rateLimiter = new RateLimiter({ points: 1, duration: 1 }) + rateLimiter.block(testKey, 0) + + await delay(1000) + + const res = rateLimiter.get(testKey) + expect(res).to.have.property('consumedPoints', 2) + expect(res).to.have.property('msBeforeNext', -1) + }) + + it('set points by key', () => { + const testKey = 'set' + const rateLimiter = new RateLimiter({ points: 10, duration: 1 }) + rateLimiter.set(testKey, 12) + + const res = rateLimiter.get(testKey) + expect(res).to.have.property('consumedPoints', 12) + expect(res).to.have.property('remainingPoints', 0) + }) + + it('set points by key forever', async () => { + const testKey = 'setforever' + const rateLimiter = new RateLimiter({ points: 10, duration: 1 }) + rateLimiter.set(testKey, 12, 0) + + await delay(1100) + + const res = rateLimiter.get(testKey) + expect(res).to.have.property('consumedPoints', 12) + expect(res).to.have.property('msBeforeNext', -1) + }) + + it('consume should start new time window if previous already expired (msBeforeNext is negative)', async () => { + const keyPrefix = 'test' + const testKey = 'consume-negative-before-next' + const rateLimiterMemory = new RateLimiter({ points: 2, duration: 5, keyPrefix }) + await rateLimiterMemory.consume(testKey) + + const rec = rateLimiterMemory.memoryStorage.storage.get(`${keyPrefix}:${testKey}`) + expect(rec).to.be.ok() + + if (rec == null) { + throw new Error('No record for key') + } + + rec.expiresAt = new Date(Date.now() - 1000) + + const res = await rateLimiterMemory.consume(testKey) + expect(res).to.have.property('consumedPoints', 1) + expect(res).to.have.property('remainingPoints', 1) + expect(res).to.have.property('msBeforeNext', 5000) + }) +}) diff --git a/packages/utils/typedoc.json b/packages/utils/typedoc.json index 5b7c9f2d16..dfc56cb77f 100644 --- a/packages/utils/typedoc.json +++ b/packages/utils/typedoc.json @@ -11,8 +11,9 @@ "./src/multiaddr/is-private.ts", "./src/peer-queue.ts", "./src/queue/index.ts", + "./src/rate-limiter.ts", "./src/stream-to-ma-conn.ts", "./src/tracked-list.ts", - "./src/tracked-map.ts", + "./src/tracked-map.ts" ] }