Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,5 @@ build/

# unless supporting rvm < 1.11.0 or doing something fancy, ignore this:
.rvmrc

Gemfile.lock
66 changes: 0 additions & 66 deletions Gemfile.lock

This file was deleted.

28 changes: 25 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require 'resque/throttler'
Or in a Gemfile:

```ruby
gem 'resque-throttler', :require => 'resque/throttler'
gem 'resque-throttler', require: 'resque/throttler'
```

Usage
Expand All @@ -27,8 +27,30 @@ Usage
require 'resque'
require 'resque/throttler'

# Rate limit at 10 jobs from `my_queue` per minute
Resque.rate_limit(:my_queue, :at => 10, :per => 60)
# Rate limit at 10 jobs from `my_queue` per minute, ignores jobs that have taken more than an hour
Resque.rate_limit(:my_queue, at: 10, per: 60, job_timeout: 3600)
```

Centralized Garbage Collection
-----

With many resque proceses and large rate limits the amount of work maintaining the rate limit data
in Redis can grow outside what should be performed by resque processes. In this case, a centralized
rake task should be used to prevent resque worker slowdown and Redis CPU pressure.

In your Rakefile, add:
```
require 'resque/throttler/tasks'
```

And execute the long-running rake task, similar to a resque worker:
```bash
bundle exec rake resque:rate_limit_gc
```

And in your initializer, shut off the inline gc
```ruby
Resque.perform_inline_rate_limit_gc = false
```

Similar Resque Plugins
Expand Down
77 changes: 56 additions & 21 deletions lib/resque/throttler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,75 +4,110 @@
module Resque::Plugins
module Throttler
extend self


attr_writer :perform_inline_rate_limit_gc

def self.extended(other)
other.instance_variable_set(:@rate_limits, {})
end


def perform_inline_rate_limit_gc
if defined?(@perform_inline_rate_limit_gc)
@perform_inline_rate_limit_gc
else
true
end
end

def pop(queue)
if queue_at_or_over_rate_limit?(queue)
gc_rate_limit_data_for_queue(queue)
gc_rate_limit_data_for_queue(queue) if perform_inline_rate_limit_gc
nil
else
super
end
end

def rate_limit(queue, options={})
if options.keys.sort != [:at, :per]
raise ArgumentError.new("Mising either :at or :per in options")
if [:at, :per].any? { |key| !options.keys.include? key }
raise ArgumentError.new("Mising either :at or :per in options")
elsif !(options.keys - [:at, :per, :job_timeout]).empty?
raise ArgumentError.new("Unknown rate limit keys #{(options.keys - [:at, :per, :job_timeout]).join(', ')}")
end
Comment thread
danielevans marked this conversation as resolved.

@rate_limits[queue.to_s] = options
end

def rate_limit_for(queue)
@rate_limits[queue.to_s]
end

def queue_rate_limited?(queue)
!!@rate_limits[queue.to_s]
end


def rate_limited_queues
@rate_limits.keys
end

def queue_at_or_over_rate_limit?(queue)
if queue_rate_limited?(queue)
redis.scard("throttler:#{queue}_uuids") >= rate_limit_for(queue)[:at]
else
false
end
end

def gc_rate_limit_data_for_queue(queue)
return unless queue_rate_limited?(queue)

limit = rate_limit_for(queue)
queue_key = "throttler:#{queue}_uuids"
uuids = redis.smembers(queue_key)

uuids.each do |uuid|
job_ended_at = redis.hmget("throttler:jobs:#{uuid}", "ended_at")[0]
if job_ended_at && Time.at(job_ended_at.to_i) < Time.now - limit[:per]
redis.srem(queue_key, uuid)
redis.del("throttler:jobs:#{uuid}")
job_hash = redis.hgetall("throttler:jobs:#{uuid}")

job_started_at = job_hash['started_at']
job_ended_at = job_hash['ended_at']

# Happy Path
gc_job = job_ended_at && Time.at(job_ended_at.to_i) < Time.now - limit[:per]

if job_hash.empty?
warn "[resque-throttler] job in #{queue} queue detected with empty rate limit hash"
gc_job = true
end

if limit[:job_timeout] && job_started_at && Time.at(job_started_at.to_i) < Time.now - limit[:job_timeout]
warn "[resque-throttler] job in #{queue} queue exceeded job timeout"
gc_job = true
end

if gc_job
redis.multi do
redis.srem(queue_key, uuid)
redis.del("throttler:jobs:#{uuid}")
end
end
end
end

end
end

Resque.extend(Resque::Plugins::Throttler)

class Resque::Job

def initialize_with_throttler(queue, payload)
@throttled = Resque.queue_rate_limited?(queue)
@throttler_uuid = SecureRandom.uuid
initialize_without_throttler(queue, payload)
end
alias_method :initialize_without_throttler, :initialize
alias_method :initialize, :initialize_with_throttler

def perform_with_throttler
if @throttled
begin
Expand All @@ -90,7 +125,7 @@ def perform_with_throttler
end
alias_method :perform_without_throttler, :perform
alias_method :perform, :perform_with_throttler

# This is added for when there is a dirty exit
# TODO: testme
def fail_with_throttler(exception)
Expand All @@ -101,5 +136,5 @@ def fail_with_throttler(exception)
end
alias_method :fail_without_throttler, :fail
alias_method :fail, :fail_with_throttler

end
12 changes: 12 additions & 0 deletions lib/resque/throttler/tasks.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace :resque do
desc 'Performs garbage collection on all rate limited queues'
task rate_limit_gc: [ :preload, :setup ] do
loop do
Resque.rate_limited_queues.each do |queue|
Resque.gc_rate_limit_data_for_queue(queue)
end

sleep (ENV['RESQUE_RATE_LIMIT_GC_SLEEP'] || 0.25).to_f
end
end
end
16 changes: 8 additions & 8 deletions resque-throttler.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = "resque-throttler"
s.version = '0.1.5'
s.version = '0.2.0'
s.licenses = ['MIT']
s.authors = ["Jon Bracy"]
s.email = ["jonbracy@gmail.com"]
Expand All @@ -14,18 +14,18 @@ Gem::Specification.new do |s|
s.extensions = []
s.require_paths = ["lib"]
#s.extra_rdoc_files = ["LICENSE", "README.md"]
# Developoment

# Developoment
s.add_development_dependency 'rake'
#s.add_development_dependency 'rdoc'
#s.add_development_dependency 'sdoc'
s.add_development_dependency 'bundler'
s.add_development_dependency 'activesupport'
s.add_development_dependency 'minitest'
s.add_development_dependency 'minitest-reporters'
s.add_development_dependency 'mocha'
s.add_development_dependency 'minitest', '~> 5.4'
s.add_development_dependency 'minitest-reporters', '~> 1.0'
s.add_development_dependency 'mocha', '~> 1.1.0'
#s.add_development_dependency 'sdoc-templates-42floors'

# Runtime
s.add_runtime_dependency 'resque', '~> 1.25'
end
s.add_runtime_dependency 'resque'
end
Loading