Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Have all processes yield self to lifecycle hooks #516

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,8 @@ And into two different points in the worker's, dispatcher's and scheduler's life
- `(worker|dispatcher|scheduler)_start`: after the worker/dispatcher/scheduler has finished booting and right before it starts the polling loop or loading the recurring schedule.
- `(worker|dispatcher|scheduler)_stop`: after receiving a signal (`TERM`, `INT` or `QUIT`) and right before starting graceful or immediate shutdown (which is just `exit!`).

Each of these hooks has an instance of the supervisor/worker/dispatcher/scheduler yielded to the block so that you may read its configuration for logging or metrics reporting purposes.

You can use the following methods with a block to do this:
```ruby
SolidQueue.on_start
Expand All @@ -396,8 +398,20 @@ SolidQueue.on_scheduler_stop

For example:
```ruby
SolidQueue.on_start { start_metrics_server }
SolidQueue.on_stop { stop_metrics_server }
SolidQueue.on_start do |supervisor|
MyMetricsReporter.process_name = supervisor.name

start_metrics_server
end

SolidQueue.on_stop do |_supervisor|
stop_metrics_server
end

SolidQueue.on_worker_start do |worker|
MyMetricsReporter.process_name = worker.name
MyMetricsReporter.queues = worker.queues.join(',')
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is just because these are examples, but Solid Queue's log subscriber already logs all this when processes start and stop 🤔 I'm just not sure if it's ok to add examples that would be discouraged because you'd end up with duplicated/redundant logs.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to instead set a couple values in a somewhat contrived MyMetricsReporter class, which hopefully does a better job of showing what someone might want to use it for.

```

These can be called several times to add multiple hooks, but it needs to happen before Solid Queue is started. An initializer would be a good place to do this.
Expand Down
6 changes: 3 additions & 3 deletions lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ module SolidQueue

[ Dispatcher, Scheduler, Worker ].each do |process|
define_singleton_method(:"on_#{process.name.demodulize.downcase}_start") do |&block|
process.on_start { block.call }
process.on_start(&block)
end

define_singleton_method(:"on_#{process.name.demodulize.downcase}_stop") do |&block|
process.on_stop { block.call }
process.on_stop(&block)
end

define_singleton_method(:"on_#{process.name.demodulize.downcase}_exit") do |&block|
process.on_exit { block.call }
process.on_exit(&block)
end
end

Expand Down
4 changes: 3 additions & 1 deletion lib/solid_queue/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
module SolidQueue
class Dispatcher < Processes::Poller
include LifecycleHooks
attr_accessor :batch_size, :concurrency_maintenance
attr_reader :batch_size

after_boot :run_start_hooks
after_boot :start_concurrency_maintenance
Expand All @@ -26,6 +26,8 @@ def metadata
end

private
attr_reader :concurrency_maintenance

def poll
batch = dispatch_next_batch

Expand Down
2 changes: 1 addition & 1 deletion lib/solid_queue/lifecycle_hooks.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def run_exit_hooks

def run_hooks_for(event)
self.class.lifecycle_hooks.fetch(event, []).each do |block|
block.call
block.call(self)
rescue Exception => exception
handle_thread_error(exception)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/solid_queue/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ class Scheduler < Processes::Base
include Processes::Runnable
include LifecycleHooks

attr_accessor :recurring_schedule
attr_reader :recurring_schedule

after_boot :run_start_hooks
after_boot :schedule_recurring_tasks
Expand Down
6 changes: 4 additions & 2 deletions lib/solid_queue/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ class Worker < Processes::Poller
before_shutdown :run_stop_hooks
after_shutdown :run_exit_hooks

attr_accessor :queues, :pool
attr_reader :queues, :pool

def initialize(**options)
options = options.dup.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS)

@queues = Array(options[:queues])
# Ensure that the queues array is deep frozen to prevent accidental modification
@queues = Array(options[:queues]).map(&:freeze).freeze

@pool = Pool.new(options[:threads], on_idle: -> { wake_up })

super(**options)
Expand Down
78 changes: 58 additions & 20 deletions test/integration/lifecycle_hooks_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,41 +6,79 @@ class LifecycleHooksTest < ActiveSupport::TestCase
self.use_transactional_tests = false

test "run lifecycle hooks" do
SolidQueue.on_start { JobResult.create!(status: :hook_called, value: :start) }
SolidQueue.on_stop { JobResult.create!(status: :hook_called, value: :stop) }
SolidQueue.on_exit { JobResult.create!(status: :hook_called, value: :exit) }
SolidQueue.on_start do |s|
JobResult.create!(status: :hook_called, value: "#{s.class.name.demodulize}_start")
end

SolidQueue.on_worker_start { JobResult.create!(status: :hook_called, value: :worker_start) }
SolidQueue.on_worker_stop { JobResult.create!(status: :hook_called, value: :worker_stop) }
SolidQueue.on_worker_exit { JobResult.create!(status: :hook_called, value: :worker_exit) }
SolidQueue.on_stop do |s|
JobResult.create!(status: :hook_called, value: "#{s.class.name.demodulize}_stop")
end

SolidQueue.on_dispatcher_start { JobResult.create!(status: :hook_called, value: :dispatcher_start) }
SolidQueue.on_dispatcher_stop { JobResult.create!(status: :hook_called, value: :dispatcher_stop) }
SolidQueue.on_dispatcher_exit { JobResult.create!(status: :hook_called, value: :dispatcher_exit) }
SolidQueue.on_exit do |s|
JobResult.create!(status: :hook_called, value: "#{s.class.name.demodulize}_exit")
end

SolidQueue.on_scheduler_start { JobResult.create!(status: :hook_called, value: :scheduler_start) }
SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_stop) }
SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_exit) }
SolidQueue.on_worker_start do |w|
JobResult.create!(status: :hook_called, value: "worker_#{w.queues.join}_start")
end

pid = run_supervisor_as_fork(workers: [ { queues: "*" } ], dispatchers: [ { batch_size: 100 } ], skip_recurring: false)
wait_for_registered_processes(4)
SolidQueue.on_worker_stop do |w|
JobResult.create!(status: :hook_called, value: "worker_#{w.queues.join}_stop")
end

SolidQueue.on_worker_exit do |w|
JobResult.create!(status: :hook_called, value: "worker_#{w.queues.join}_exit")
end

SolidQueue.on_dispatcher_start do |d|
JobResult.create!(status: :hook_called, value: "dispatcher_#{d.batch_size}_start")
end

SolidQueue.on_dispatcher_stop do |d|
JobResult.create!(status: :hook_called, value: "dispatcher_#{d.batch_size}_stop")
end

SolidQueue.on_dispatcher_exit do |d|
JobResult.create!(status: :hook_called, value: "dispatcher_#{d.batch_size}_exit")
end

SolidQueue.on_scheduler_start do |s|
JobResult.create!(status: :hook_called, value: "#{s.class.name.demodulize}_start")
end

SolidQueue.on_scheduler_stop do |s|
JobResult.create!(status: :hook_called, value: "#{s.class.name.demodulize}_stop")
end

SolidQueue.on_scheduler_exit do |s|
JobResult.create!(status: :hook_called, value: "#{s.class.name.demodulize}_exit")
end

pid = run_supervisor_as_fork(
workers: [ { queues: "first_queue" }, { queues: "second_queue", processes: 1 } ],
dispatchers: [ { batch_size: 100 } ],
skip_recurring: false
)

wait_for_registered_processes(5)

terminate_process(pid)
wait_for_registered_processes(0)


results = skip_active_record_query_cache do
job_results = JobResult.where(status: :hook_called)
assert_equal 12, job_results.count
assert_equal 15, job_results.count
job_results
end

assert_equal({ "hook_called" => 12 }, results.map(&:status).tally)
assert_equal({ "hook_called" => 15 }, results.map(&:status).tally)
assert_equal %w[
start stop exit
worker_start worker_stop worker_exit
dispatcher_start dispatcher_stop dispatcher_exit
scheduler_start scheduler_stop scheduler_exit
Supervisor_start Supervisor_stop Supervisor_exit
worker_first_queue_start worker_first_queue_stop worker_first_queue_exit
worker_second_queue_start worker_second_queue_stop worker_second_queue_exit
dispatcher_100_start dispatcher_100_stop dispatcher_100_exit
Scheduler_start Scheduler_stop Scheduler_exit
].sort, results.map(&:value).sort
ensure
SolidQueue::Supervisor.clear_hooks
Expand Down
Loading