Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
### Main (unreleased)

- [367](https://github.com/Shopify/job-iteration/pull/367) - Iteration can use multiple Active Job backends simultaneously by inferring the interruption adapter from the job's `queue_adapter_name`. Iteration will now raise `JobIteration::Integrations::LoadError` if no interruption adapter is found for the job's queue adapter, instead of never interrupting the job. `JobIteration.interruption_adapter` and `.load_integrations` have been removed. `JobIteration::Integrations.register` has been added.

## v1.4.1 (Sep 5, 2023)

### Bug fixes

- [427](https://github.com/Shopify/job-iteration/pull/427) - Use the Rails application logger. Changes from [338](https://github.com/Shopify/job-iteration/pull/338) resulted in logging to the original value of ActiveJob.logger, not the one configured by the Rails application.

## v1.4.0 (Aug 23, 2023)

### Changes
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ end
```

Iteration hooks into Sidekiq and Resque out of the box to support graceful interruption. No extra configuration is required.
Adapters for other Active Job backends can be registered with `JobIteration::Integrations.register("my_queue_adapter_name", object)`, where `object` must implement the `call` method returning `true` if the job must be interrupted and `false` otherwise.

## Guides

Expand Down Expand Up @@ -183,7 +184,7 @@ There a few configuration assumptions that are required for Iteration to work wi

**Why can't I just iterate in `#perform` method and do whatever I want?** You can, but then your job has to comply with a long list of requirements, such as the ones above. This creates leaky abstractions more easily, when instead we can expose a more powerful abstraction for developers--without exposing the underlying infrastructure.

**What happens when my job is interrupted?** A checkpoint will be persisted to Redis after the current `each_iteration`, and the job will be re-enqueued. Once it's popped off the queue, the worker will work off from the next iteration.
**What happens when my job is interrupted?** A checkpoint will be persisted after the current `each_iteration`, and the job will be re-enqueued. Once it's popped off the queue, the worker will work off from the next iteration.

**What happens with retries?** An interruption of a job does not count as a retry. The iteration of job that caused the job to fail will be retried and progress will continue from there on.

Expand Down
35 changes: 1 addition & 34 deletions lib/job-iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@

require "active_job"
require_relative "./job-iteration/version"
require_relative "./job-iteration/integrations"
require_relative "./job-iteration/enumerator_builder"
require_relative "./job-iteration/iteration"
require_relative "./job-iteration/log_subscriber"

module JobIteration
IntegrationLoadError = Class.new(StandardError)

INTEGRATIONS = [:resque, :sidekiq]

extend self

attr_writer :logger
Expand Down Expand Up @@ -49,11 +46,6 @@ def logger
# where the throttle backoff value will take precedence over this setting.
attr_accessor :default_retry_backoff

# Used internally for hooking into job processing frameworks like Sidekiq and Resque.
attr_accessor :interruption_adapter

self.interruption_adapter = -> { false }

# Set if you want to use your own enumerator builder instead of default EnumeratorBuilder.
# @example
#
Expand All @@ -65,29 +57,4 @@ def logger
attr_accessor :enumerator_builder

self.enumerator_builder = JobIteration::EnumeratorBuilder

def load_integrations
loaded = nil
INTEGRATIONS.each do |integration|
load_integration(integration)
if loaded
raise IntegrationLoadError,
"#{loaded} integration has already been loaded, but #{integration} is also available. " \
"Iteration will only work with one integration."
end
loaded = integration
rescue LoadError
end
end

def load_integration(integration)
unless INTEGRATIONS.include?(integration)
raise IntegrationLoadError,
"#{integration} integration is not supported. Available integrations: #{INTEGRATIONS.join(", ")}"
end

require_relative "./job-iteration/integrations/#{integration}"
end
end

JobIteration.load_integrations unless ENV["ITERATION_DISABLE_AUTOCONFIGURE"]
8 changes: 7 additions & 1 deletion lib/job-iteration/active_record_enumerator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def records
def batches
cursor = finder_cursor
Enumerator.new(method(:size)) do |yielder|
while (records = cursor.next_batch(@batch_size))
while (records = instrument_next_batch(cursor))
yielder.yield(records, cursor_value(records.last)) if records.any?
end
end
Expand All @@ -43,6 +43,12 @@ def size

private

def instrument_next_batch(cursor)
ActiveSupport::Notifications.instrument("active_record_cursor.iteration") do
cursor.next_batch(@batch_size)
end
end

def cursor_value(record)
positions = @columns.map do |column|
attribute_name = column.to_s.split(".").last
Expand Down
37 changes: 37 additions & 0 deletions lib/job-iteration/integrations.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# frozen_string_literal: true

module JobIteration
# @api private
module Integrations
LoadError = Class.new(StandardError)

extend self

attr_accessor :registered_integrations

self.registered_integrations = {}

autoload :Sidekiq, "job-iteration/integrations/sidekiq"
autoload :Resque, "job-iteration/integrations/resque"

# @api public
def register(name, callable)
raise ArgumentError, "Interruption adapter must respond to #call" unless callable.respond_to?(:call)

registered_integrations[name] = callable
end

def load(name)
if (callable = registered_integrations[name])
callable
else
begin
klass = "#{self}::#{name.camelize}".constantize
register(name, klass)
rescue NameError
raise LoadError, "Could not find integration for '#{name}'"
end
end
end
end
end
30 changes: 18 additions & 12 deletions lib/job-iteration/integrations/resque.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,27 @@

module JobIteration
module Integrations
module ResqueIterationExtension # @private
def initialize(*) # @private
$resque_worker = self
super
module Resque
module IterationExtension
def initialize(*)
$resque_worker = self
super
end
end
end

# @private
module ::Resque
class Worker
# The patch is required in order to call shutdown? on a Resque::Worker instance
prepend(ResqueIterationExtension)
# @private
module ::Resque
class Worker
# The patch is required in order to call shutdown? on a Resque::Worker instance
prepend(IterationExtension)
end
end
end

JobIteration.interruption_adapter = -> { $resque_worker.try!(:shutdown?) }
class << self
def call
$resque_worker.try!(:shutdown?)
end
end
end
end
end
18 changes: 10 additions & 8 deletions lib/job-iteration/integrations/sidekiq.rb
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
# frozen_string_literal: true

require "sidekiq"

module JobIteration
module Integrations # @private
JobIteration.interruption_adapter = -> do
if defined?(Sidekiq::CLI) && Sidekiq::CLI.instance
Sidekiq::CLI.instance.launcher.stopping?
else
false
module Integrations
module Sidekiq
class << self
def call
if defined?(::Sidekiq::CLI) && (instance = ::Sidekiq::CLI.instance)
instance.launcher.stopping?
else
false
end
end
end
end
end
Expand Down
6 changes: 5 additions & 1 deletion lib/job-iteration/iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ def retry_job(*, **)

private

def interruption_adapter
@interruption_adapter ||= JobIteration::Integrations.load(self.class.queue_adapter_name)
end

def enumerator_builder
JobIteration.enumerator_builder.new(self)
end
Expand Down Expand Up @@ -295,7 +299,7 @@ def job_should_exit?
return true
end

JobIteration.interruption_adapter.call || (defined?(super) && super)
interruption_adapter.call || (defined?(super) && super)
end

def handle_completed(completed)
Expand Down
4 changes: 2 additions & 2 deletions lib/job-iteration/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def call
# MyJob.perform_now
# end
def iterate_exact_times(n_times)
JobIteration.stubs(:interruption_adapter).returns(StoppingSupervisor.new(n_times.size))
JobIteration::Integrations.stubs(:load).returns(StoppingSupervisor.new(n_times.size))
end

# Stubs interruption adapter to interrupt the job after every sing iteration.
Expand All @@ -47,7 +47,7 @@ def mark_job_worker_as_interrupted
def stub_shutdown_adapter_to_return(value)
adapter = mock
adapter.stubs(:call).returns(value)
JobIteration.stubs(:interruption_adapter).returns(adapter)
JobIteration::Integrations.stubs(:load).returns(adapter)
end
end
end
77 changes: 37 additions & 40 deletions test/integration/integrations_test.rb
Original file line number Diff line number Diff line change
@@ -1,55 +1,52 @@
# frozen_string_literal: true

require "test_helper"
require "open3"

class IntegrationsTest < ActiveSupport::TestCase
test "will prevent loading two integrations" do
with_env("ITERATION_DISABLE_AUTOCONFIGURE", nil) do
rubby = <<~RUBBY
require 'bundler/setup'
require 'job-iteration'
RUBBY
_stdout, stderr, status = run_ruby(rubby)

assert_equal false, status.success?
assert_match(/resque integration has already been loaded, but sidekiq is also available/, stderr)

class IntegrationsTest < IterationUnitTest
class IterationJob < ActiveJob::Base
include JobIteration::Iteration

def build_enumerator(cursor:)
enumerator_builder.build_once_enumerator(cursor: cursor)
end
end

test "successfully loads one (resque) integration" do
with_env("ITERATION_DISABLE_AUTOCONFIGURE", nil) do
rubby = <<~RUBBY
require 'bundler/setup'
# Remove sidekiq, only resque will be left
$LOAD_PATH.delete_if { |p| p =~ /sidekiq/ }
require 'job-iteration'
RUBBY
_stdout, _stderr, status = run_ruby(rubby)

assert_equal true, status.success?
def each_iteration(*)
end
end

private
class ResqueJob < IterationJob
self.queue_adapter = :resque
end

class SidekiqJob < IterationJob
self.queue_adapter = :sidekiq
end

test "loads multiple integrations" do
resque_job = ResqueJob.new.serialize
ActiveJob::Base.execute(resque_job)

sidekiq_job = SidekiqJob.new.serialize
ActiveJob::Base.execute(sidekiq_job)
end

test ".register accepts an object does implementing #call" do
JobIteration::Integrations.register(:registration_test, -> { true })

def run_ruby(body)
stdout, stderr, status = nil
Tempfile.open do |f|
f.write(body)
f.close
assert(JobIteration::Integrations.registered_integrations[:registration_test].call)
end

command = "ruby #{f.path}"
stdout, stderr, status = Open3.capture3(command)
test ".register raises when the callable object does not implement #call" do
error = assert_raises(ArgumentError) do
JobIteration::Integrations.register("foo", "bar")
end
[stdout, stderr, status]
assert_equal("Interruption adapter must respond to #call", error.message)
end

def with_env(variable, value)
original = ENV[variable]
ENV[variable] = value
yield
ensure
ENV[variable] = original
test "raises for unknown Active Job queue adapter names" do
error = assert_raises(JobIteration::Integrations::LoadError) do
JobIteration::Integrations.load("unknown")
end
assert_equal("Could not find integration for 'unknown'", error.message)
end
end
1 change: 0 additions & 1 deletion test/support/sidekiq/init.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# frozen_string_literal: true

require "job-iteration"
require "job-iteration/integrations/sidekiq"

require "active_job"
require "i18n"
Expand Down
3 changes: 1 addition & 2 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
require "minitest/autorun"

ENV["ITERATION_DISABLE_AUTOCONFIGURE"] = "true"

require "job-iteration"
require "job-iteration/test_helper"

Expand Down Expand Up @@ -40,6 +38,7 @@ def enqueue_at(job, timestamp)
end

ActiveJob::Base.queue_adapter = :iteration_test
JobIteration::Integrations.register("iteration_test", -> { false })

class Product < ActiveRecord::Base
has_many :comments
Expand Down
10 changes: 0 additions & 10 deletions test/unit/active_job_iteration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -724,16 +724,6 @@ def test_on_shutdown_called_before_reenqueue
assert_jobs_in_queue(1)
end

def test_mark_job_worker_as_interrupted
mark_job_worker_as_interrupted

assert_equal(true, JobIteration.interruption_adapter.call)

continue_iterating

assert_equal(false, JobIteration.interruption_adapter.call)
end

def test_reenqueue_self
iterate_exact_times(2.times)

Expand Down
Loading