Skip to content

Commit

Permalink
add worker_ids, and have workers yield self to hooks
Browse files Browse the repository at this point in the history
  • Loading branch information
Th3-M4jor committed Feb 15, 2025
1 parent 9cd6bc3 commit 2a25d7f
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 22 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,9 @@ And into two different points in the worker's, dispatcher's and scheduler's life
- `(worker|dispatcher|scheduler)_start`: after the worker/dispatcher/scheduler has finished booting and right before it starts the polling loop or loading the recurring schedule.
- `(worker|dispatcher|scheduler)_stop`: after receiving a signal (`TERM`, `INT` or `QUIT`) and right before starting graceful or immediate shutdown (which is just `exit!`).

The hooks for workers will have the worker instance yielded to the block so that you may read its configuration
for logging or other metrics reporting purposes.

You can use the following methods with a block to do this:
```ruby
SolidQueue.on_start
Expand All @@ -398,6 +401,14 @@ For example:
```ruby
SolidQueue.on_start { start_metrics_server }
SolidQueue.on_stop { stop_metrics_server }
SolidQueue.on_worker_start do |worker|
Rails.logger.info "Worker #{worker.worker_id} started with queues: #{worker.queues.join(',')}"
end
SolidQueue.on_worker_stop do |worker|
Rails.logger.info "Worker #{worker.worker_id} stopped with queues: #{worker.queues.join(',')}"
end
```

These can be called several times to add multiple hooks, but it needs to happen before Solid Queue is started. An initializer would be a good place to do this.
Expand Down
6 changes: 3 additions & 3 deletions lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ module SolidQueue

[ Dispatcher, Scheduler, Worker ].each do |process|
define_singleton_method(:"on_#{process.name.demodulize.downcase}_start") do |&block|
process.on_start { block.call }
process.on_start(&block)
end

define_singleton_method(:"on_#{process.name.demodulize.downcase}_stop") do |&block|
process.on_stop { block.call }
process.on_stop(&block)
end

define_singleton_method(:"on_#{process.name.demodulize.downcase}_exit") do |&block|
process.on_exit { block.call }
process.on_exit(&block)
end
end

Expand Down
27 changes: 21 additions & 6 deletions lib/solid_queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,15 @@ def instantiate

def initialize(**options)
@options = options.with_defaults(default_options)
@worker_id_counter = 0
end

def configured_processes
if only_work? then workers
else
dispatchers + workers + schedulers
@configured_processes ||= begin
if only_work? then workers
else
dispatchers + workers + schedulers
end
end
end

Expand Down Expand Up @@ -109,12 +112,24 @@ def skip_recurring_tasks?
end

def workers
workers_options.flat_map do |worker_options|
processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
processes.times.map { Process.new(:worker, worker_options.with_defaults(WORKER_DEFAULTS)) }
workers_options.flat_map { |worker_options| generate_workers(worker_options) }
end

def generate_workers(worker_options)
processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
processes.times.map do
worker_id = next_worker_id
options_with_defaults = worker_options.with_defaults(WORKER_DEFAULTS).merge(worker_id:)
Process.new(:worker, options_with_defaults)
end
end

def next_worker_id
id = @worker_id_counter
@worker_id_counter += 1
id
end

def dispatchers
dispatchers_options.map do |dispatcher_options|
Process.new :dispatcher, dispatcher_options.with_defaults(DISPATCHER_DEFAULTS)
Expand Down
10 changes: 9 additions & 1 deletion lib/solid_queue/lifecycle_hooks.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,18 @@ def run_exit_hooks

def run_hooks_for(event)
self.class.lifecycle_hooks.fetch(event, []).each do |block|
block.call
if yield_self_to_hooks?
block.call(self)
else
block.call
end
rescue Exception => exception
handle_thread_error(exception)
end
end

def yield_self_to_hooks?
false
end
end
end
15 changes: 12 additions & 3 deletions lib/solid_queue/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,24 @@ class Worker < Processes::Poller
before_shutdown :run_stop_hooks
after_shutdown :run_exit_hooks

attr_accessor :queues, :pool
attr_accessor :pool

attr_reader :worker_id, :queues

def initialize(**options)
options = options.dup.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS)

@queues = Array(options[:queues])
# Ensure that the queues array is deep frozen to prevent accidental modification
@queues = Array(options[:queues]).map(&:freeze).freeze

@pool = Pool.new(options[:threads], on_idle: -> { wake_up })
@worker_id = options[:worker_id]

super(**options)
end

def metadata
super.merge(queues: queues.join(","), thread_pool_size: pool.size)
super.merge(queues: queues.join(","), thread_pool_size: pool.size, worker_id:)
end

private
Expand Down Expand Up @@ -54,5 +59,9 @@ def all_work_completed?
def set_procline
procline "waiting for jobs in #{queues.join(",")}"
end

def yield_self_to_hooks?
true
end
end
end
2 changes: 1 addition & 1 deletion test/integration/jobs_lifecycle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class JobsLifecycleTest < ActiveSupport::TestCase
setup do
@_on_thread_error = SolidQueue.on_thread_error
SolidQueue.on_thread_error = silent_on_thread_error_for([ ExpectedTestError, RaisingJob::DefaultError ], @_on_thread_error)
@worker = SolidQueue::Worker.new(queues: "background", threads: 3)
@worker = SolidQueue::Worker.new(queues: "background", threads: 3, worker_id: 1)
@dispatcher = SolidQueue::Dispatcher.new(batch_size: 10, polling_interval: 0.2)
end

Expand Down
31 changes: 23 additions & 8 deletions test/integration/lifecycle_hooks_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,17 @@ class LifecycleHooksTest < ActiveSupport::TestCase
SolidQueue.on_stop { JobResult.create!(status: :hook_called, value: :stop) }
SolidQueue.on_exit { JobResult.create!(status: :hook_called, value: :exit) }

SolidQueue.on_worker_start { JobResult.create!(status: :hook_called, value: :worker_start) }
SolidQueue.on_worker_stop { JobResult.create!(status: :hook_called, value: :worker_stop) }
SolidQueue.on_worker_exit { JobResult.create!(status: :hook_called, value: :worker_exit) }
SolidQueue.on_worker_start do |w|
JobResult.create!(status: :hook_called, value: "worker_#{w.queues.join}_#{w.worker_id}_start")
end

SolidQueue.on_worker_stop do |w|
JobResult.create!(status: :hook_called, value: "worker_#{w.queues.join}_#{w.worker_id}_stop")
end

SolidQueue.on_worker_exit do |w|
JobResult.create!(status: :hook_called, value: "worker_#{w.queues.join}_#{w.worker_id}_exit")
end

SolidQueue.on_dispatcher_start { JobResult.create!(status: :hook_called, value: :dispatcher_start) }
SolidQueue.on_dispatcher_stop { JobResult.create!(status: :hook_called, value: :dispatcher_stop) }
Expand All @@ -22,23 +30,30 @@ class LifecycleHooksTest < ActiveSupport::TestCase
SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_stop) }
SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_exit) }

pid = run_supervisor_as_fork(workers: [ { queues: "*" } ], dispatchers: [ { batch_size: 100 } ], skip_recurring: false)
wait_for_registered_processes(4)
pid = run_supervisor_as_fork(
workers: [ { queues: "first_queue" }, { queues: "second_queue", processes: 2 } ],
dispatchers: [ { batch_size: 100 } ],
skip_recurring: false
)

wait_for_registered_processes(6)

terminate_process(pid)
wait_for_registered_processes(0)


results = skip_active_record_query_cache do
job_results = JobResult.where(status: :hook_called)
assert_equal 12, job_results.count
assert_equal 18, job_results.count
job_results
end

assert_equal({ "hook_called" => 12 }, results.map(&:status).tally)
assert_equal({ "hook_called" => 18 }, results.map(&:status).tally)
assert_equal %w[
start stop exit
worker_start worker_stop worker_exit
worker_first_queue_0_start worker_first_queue_0_stop worker_first_queue_0_exit
worker_second_queue_1_start worker_second_queue_1_stop worker_second_queue_1_exit
worker_second_queue_2_start worker_second_queue_2_stop worker_second_queue_2_exit
dispatcher_start dispatcher_stop dispatcher_exit
scheduler_start scheduler_stop scheduler_exit
].sort, results.map(&:value).sort
Expand Down

0 comments on commit 2a25d7f

Please sign in to comment.