Enable multisharding for solid queue #505

Status: Open. 5 commits to be merged into base branch `main`.
67 changes: 67 additions & 0 deletions README.md
@@ -618,6 +618,73 @@ my_periodic_resque_job:

and the job will be enqueued via `perform_later` so it'll run in Resque. However, in this case we won't track any `solid_queue_recurring_execution` record for it and there won't be any guarantees that the job is enqueued only once each time.

## Multisharding

If the load on the database used for jobs grows to the point where you need to spread it over multiple databases, you can shard the Solid Queue database as described in this section.

You can extend the Solid Queue database configuration to use different shards:

```ruby
config.solid_queue.connects_to = {
  shards: {
    queue_shard_one: { writing: :queue_shard_one },
    queue_shard_two: { writing: :queue_shard_two }
  }
}
```

Each queue shard must be defined in `config/database.yml`, as shown in the installation section. All shards must share the same schema and, going forward, the same migration configuration:

```yaml
production:
  primary:
    <<: *default
    database: storage/production.sqlite3
  queue_shard_one:
    <<: *default
    database: storage/production_queue_shard_one.sqlite3
    migrations_paths: db/queue_migrate
  queue_shard_two:
    <<: *default
    database: storage/production_queue_shard_two.sqlite3
    migrations_paths: db/queue_migrate
```
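
Because every shard has to run the same migrations, it can be worth checking that the shard entries agree on `migrations_paths`. The following is a rough sketch in plain Ruby, not something Solid Queue provides: `shards_with_mismatched_migrations` is a hypothetical helper, and the inline YAML is a trimmed copy of the example above without the `*default` alias so that it parses on its own.

```ruby
require "yaml"

# Trimmed copy of the configuration above (no anchors, so it parses standalone).
DATABASE_YML = <<~YAML
  production:
    primary:
      database: storage/production.sqlite3
    queue_shard_one:
      database: storage/production_queue_shard_one.sqlite3
      migrations_paths: db/queue_migrate
    queue_shard_two:
      database: storage/production_queue_shard_two.sqlite3
      migrations_paths: db/queue_migrate
YAML

# Hypothetical helper: returns the shard names whose migrations_paths differ
# from the value of the first listed shard.
def shards_with_mismatched_migrations(config, env:, shard_names:)
  entries = config.fetch(env)
  expected = entries.fetch(shard_names.first)["migrations_paths"]
  shard_names.reject { |name| entries.fetch(name)["migrations_paths"] == expected }
end

config = YAML.safe_load(DATABASE_YML)
mismatched = shards_with_mismatched_migrations(
  config, env: "production", shard_names: %w[queue_shard_one queue_shard_two]
)
puts mismatched.empty? ? "shards agree on migrations_paths" : "mismatch: #{mismatched.join(', ')}"
```

A check like this could run in a test or a boot-time initializer; where to put it is a design choice left to the application.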

Simply converting a single-database configuration such as `config.solid_queue.connects_to = { database: { writing: :queue } }` to `config.solid_queue.connects_to = { shards: { queue: { writing: :queue } } }` has no effect on the behavior of Solid Queue.

### Configuration

In `config/environments/production.rb`, or in the configuration file of whichever environment you want to enable Solid Queue in, you can set the following options:

```ruby
config.solid_queue.primary_shard = :queue_shard_one # optional
config.solid_queue.active_shard = ENV["SOLID_QUEUE_ACTIVE_SHARD"]&.to_sym
config.solid_queue.shard_selection_lambda = ->(active_job:, active_jobs:) { nil }
```

- `config.solid_queue.primary_shard` is the shard used to enqueue or schedule jobs that have no shard-specific adapter configuration. It defaults to the first shard listed in `config.solid_queue.connects_to` (the Active Record default).
- `config.solid_queue.active_shard` is the shard that workers, dispatchers and schedulers use to manage and process jobs. It defaults to the `primary_shard`.
  With a basic Solid Queue configuration and the options above, you can start a worker and dispatcher on the `queue_shard_two` shard by running `SOLID_QUEUE_ACTIVE_SHARD=queue_shard_two bin/jobs start`.
- `config.solid_queue.shard_selection_lambda` lets you define a custom strategy for choosing the shard a job is enqueued in. It is called with the keyword arguments `active_job` (set when a single job is enqueued or scheduled) and `active_jobs` (set when jobs are bulk enqueued). If the lambda returns `nil`, Solid Queue falls back to the shard of the adapter defined for the job, and then to the `primary_shard`.
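
The enqueue-time precedence between these options can be sketched in plain Ruby. This is only an illustration that mirrors the `select_shard` logic in this PR's adapter; `resolve_shard` and its keyword arguments are hypothetical names used for this example and are not part of Solid Queue's API.

```ruby
# Hypothetical helper mirroring the precedence used by the adapter in this PR:
# the lambda's result first, then the adapter's db_shard, then the primary shard.
def resolve_shard(selection_lambda:, adapter_shard:, primary_shard:, active_job: nil, active_jobs: nil)
  selection_lambda&.call(active_job: active_job, active_jobs: active_jobs) ||
    adapter_shard ||
    primary_shard
end

# Assumed configuration, using the shard names from this section.
shards = {
  queue_shard_one: { writing: :queue_shard_one },
  queue_shard_two: { writing: :queue_shard_two }
}
primary = shards.keys.first # the default when primary_shard is not set

# No lambda and no adapter shard: falls back to the primary shard.
default_choice = resolve_shard(selection_lambda: nil, adapter_shard: nil, primary_shard: primary)

# An adapter configured with db_shard wins over the primary shard.
adapter_choice = resolve_shard(selection_lambda: nil, adapter_shard: :queue_shard_two, primary_shard: primary)

# A lambda returning nil defers to the adapter's shard.
deferred_choice = resolve_shard(
  selection_lambda: ->(active_job:, active_jobs:) { nil },
  adapter_shard: :queue_shard_two, primary_shard: primary
)

# A lambda returning a shard name overrides everything else.
lambda_choice = resolve_shard(
  selection_lambda: ->(active_job:, active_jobs:) { :queue_shard_one },
  adapter_shard: :queue_shard_two, primary_shard: primary
)

puts [default_choice, adapter_choice, deferred_choice, lambda_choice].inspect
```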

### Enqueueing jobs in different shards

Individual jobs can be assigned to shards by leveraging their `queue_adapter` property:

```ruby
class SomeJob < ApplicationJob
  self.queue_adapter = ActiveJob::QueueAdapters::SolidQueueAdapter.new(db_shard: :queue_shard_two)
end
```

This job will be enqueued in the shard named `queue_shard_two`.

Alternatively, you can define a lambda that implements a custom strategy for choosing the shard a job is enqueued in:

```ruby
config.solid_queue.shard_selection_lambda = ->(active_job:, active_jobs:) { SolidQueue.connects_to[:shards].keys.sample } # pick a shard randomly
```
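
A random pick spreads load evenly but scatters jobs from the same queue across shards. As an alternative sketch, the lambda below hashes the queue name so that a given queue always maps to the same shard. `SHARD_NAMES`, `STABLE_SHARD_SELECTOR` and the `FakeJob` struct are hypothetical stand-ins so the example runs without Rails; in an application you would read the shard names from your own configuration and receive real Active Job instances.

```ruby
require "zlib"

# Hypothetical shard list; in an app this would come from your connects_to config.
SHARD_NAMES = %i[queue_shard_one queue_shard_two].freeze

# Stand-in for an Active Job instance; only queue_name is used here.
FakeJob = Struct.new(:queue_name)

# Routes a job by hashing its queue name, so the same queue always maps to the
# same shard. Returns nil for bulk enqueues, which defers to the adapter's
# shard or the primary shard.
STABLE_SHARD_SELECTOR = lambda do |active_job: nil, active_jobs: nil|
  next nil if active_job.nil?

  SHARD_NAMES[Zlib.crc32(active_job.queue_name.to_s) % SHARD_NAMES.size]
end

job = FakeJob.new("mailers")
puts STABLE_SHARD_SELECTOR.call(active_job: job, active_jobs: nil)
```

A lambda of this shape would be assigned to `config.solid_queue.shard_selection_lambda`. Keep in mind that workers only process the shard selected by `active_shard`, so every shard the lambda can return needs its own running processes.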


## Inspiration

Solid Queue has been inspired by [resque](https://github.com/resque/resque) and [GoodJob](https://github.com/bensheldon/good_job). We recommend checking out these projects as they're great examples from which we've learnt a lot.
11 changes: 10 additions & 1 deletion app/models/solid_queue/record.rb
@@ -4,7 +4,16 @@ module SolidQueue
   class Record < ActiveRecord::Base
     self.abstract_class = true

-    connects_to(**SolidQueue.connects_to) if SolidQueue.connects_to
+    def self.connects_to_and_set_active_shard
+      connects_to(**SolidQueue.connects_to)
+
+      if SolidQueue.connects_to.key?(:shards) &&
+          SolidQueue.connects_to[:shards].key?(SolidQueue.active_shard)
+        self.default_shard = SolidQueue.active_shard
+      end
+    end
+
+    connects_to_and_set_active_shard if SolidQueue.connects_to

     def self.non_blocking_lock
       if SolidQueue.use_skip_locked
27 changes: 24 additions & 3 deletions lib/active_job/queue_adapters/solid_queue_adapter.rb
@@ -8,20 +8,41 @@ module QueueAdapters
     #
     #   Rails.application.config.active_job.queue_adapter = :solid_queue
     class SolidQueueAdapter
+      def initialize(db_shard: nil)
+        @db_shard = db_shard
+      end
+
       def enqueue_after_transaction_commit?
         true
       end

       def enqueue(active_job) # :nodoc:
-        SolidQueue::Job.enqueue(active_job)
+        select_shard(active_job:) { SolidQueue::Job.enqueue(active_job) }
       end

       def enqueue_at(active_job, timestamp) # :nodoc:
-        SolidQueue::Job.enqueue(active_job, scheduled_at: Time.at(timestamp))
+        select_shard(active_job:) do
+          SolidQueue::Job.enqueue(active_job, scheduled_at: Time.at(timestamp))
+        end
       end

       def enqueue_all(active_jobs) # :nodoc:
-        SolidQueue::Job.enqueue_all(active_jobs)
+        select_shard(active_jobs:) { SolidQueue::Job.enqueue_all(active_jobs) }
       end
+
+      private
+
+        def select_shard(active_job: nil, active_jobs: nil, &block)
+          shard =
+            SolidQueue.shard_selection_lambda&.call(active_job:, active_jobs:) ||
+            @db_shard ||
+            SolidQueue.primary_shard
+
+          if shard
+            ActiveRecord::Base.connected_to(shard: shard) { block.call }
+          else
+            block.call
+          end
+        end
     end
   end
2 changes: 2 additions & 0 deletions lib/solid_queue.rb
@@ -41,6 +41,8 @@ module SolidQueue
  mattr_accessor :clear_finished_jobs_after, default: 1.day
  mattr_accessor :default_concurrency_control_period, default: 3.minutes

  mattr_accessor :primary_shard, :active_shard, :shard_selection_lambda

  delegate :on_start, :on_stop, to: Supervisor

  def on_worker_start(...)
10 changes: 10 additions & 0 deletions lib/solid_queue/engine.rb
@@ -45,5 +45,15 @@ class Engine < ::Rails::Engine
        SolidQueue::Processes::Base.include SolidQueue::Processes::OgInterruptible
      end
    end

    initializer "solid_queue.shard_configuration" do
      ActiveSupport.on_load(:solid_queue) do
        # Record the name of the primary shard, which should be used for
        # adapter-less jobs
        if SolidQueue.connects_to.key?(:shards) && SolidQueue.primary_shard.nil?
          SolidQueue.primary_shard = SolidQueue.connects_to[:shards].keys.first
        end
      end
    end
  end
end
8 changes: 8 additions & 0 deletions test/dummy/app/jobs/shard_two_job.rb
@@ -0,0 +1,8 @@
class ShardTwoJob < ApplicationJob
  self.queue_adapter = ActiveJob::QueueAdapters::SolidQueueAdapter.new(db_shard: :queue_shard_two)
  queue_as :background

  def perform(arg)
    JobBuffer.add(arg)
  end
end
8 changes: 8 additions & 0 deletions test/dummy/config/database.yml
@@ -48,6 +48,10 @@ development:
    <<: *default
    database: <%= database_name_from("development_queue") %>
    migrations_paths: db/queue_migrate
  queue_shard_two:
    <<: *default
    database: <%= database_name_from("development_queue_shard_two") %>
    migrations_paths: db/queue_migrate

test:
  primary:
@@ -65,3 +69,7 @@ test:
    <<: *default
    database: <%= database_name_from("test_queue") %>
    migrations_paths: db/queue_migrate
  queue_shard_two:
    <<: *default
    database: <%= database_name_from("test_queue_shard_two") %>
    migrations_paths: db/queue_migrate
7 changes: 6 additions & 1 deletion test/dummy/config/environments/test.rb
@@ -49,7 +49,12 @@

   # Replace the default in-process and non-durable queuing backend for Active Job.
   config.active_job.queue_adapter = :solid_queue
-  config.solid_queue.connects_to = { database: { writing: :queue } }
+  config.solid_queue.connects_to = {
+    shards: {
+      queue: { writing: :queue },
+      queue_shard_two: { writing: :queue_shard_two }
+    }
+  }

   # Annotate rendered view with file names.
   # config.action_view.annotate_rendered_view_with_filenames = true
141 changes: 141 additions & 0 deletions test/dummy/db/queue_shard_two_schema.rb
@@ -0,0 +1,141 @@
# This file is auto-generated from the current state of the database. Instead
# of editing this file, please use the migrations feature of Active Record to
# incrementally modify your database, and then regenerate this schema definition.
#
# This file is the source Rails uses to define your schema when running `bin/rails
# db:schema:load`. When creating a new database, `bin/rails db:schema:load` tends to
# be faster and is potentially less error prone than running all of your
# migrations from scratch. Old migrations may fail to apply correctly if those
# migrations use external dependencies or application code.
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema[7.1].define(version: 1) do
create_table "solid_queue_blocked_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.bigint "job_id", null: false
t.string "queue_name", null: false
t.integer "priority", default: 0, null: false
t.string "concurrency_key", null: false
t.datetime "expires_at", null: false
t.datetime "created_at", null: false
t.index ["concurrency_key", "priority", "job_id"], name: "index_solid_queue_blocked_executions_for_release"
t.index ["expires_at", "concurrency_key"], name: "index_solid_queue_blocked_executions_for_maintenance"
t.index ["job_id"], name: "index_solid_queue_blocked_executions_on_job_id", unique: true
end

create_table "solid_queue_claimed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.bigint "job_id", null: false
t.bigint "process_id"
t.datetime "created_at", null: false
t.index ["job_id"], name: "index_solid_queue_claimed_executions_on_job_id", unique: true
t.index ["process_id", "job_id"], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id"
end

create_table "solid_queue_failed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.bigint "job_id", null: false
t.text "error"
t.datetime "created_at", null: false
t.index ["job_id"], name: "index_solid_queue_failed_executions_on_job_id", unique: true
end

create_table "solid_queue_jobs", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.string "queue_name", null: false
t.string "class_name", null: false
t.text "arguments"
t.integer "priority", default: 0, null: false
t.string "active_job_id"
t.datetime "scheduled_at"
t.datetime "finished_at"
t.string "concurrency_key"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["active_job_id"], name: "index_solid_queue_jobs_on_active_job_id"
t.index ["class_name"], name: "index_solid_queue_jobs_on_class_name"
t.index ["finished_at"], name: "index_solid_queue_jobs_on_finished_at"
t.index ["queue_name", "finished_at"], name: "index_solid_queue_jobs_for_filtering"
t.index ["scheduled_at", "finished_at"], name: "index_solid_queue_jobs_for_alerting"
end

create_table "solid_queue_pauses", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.string "queue_name", null: false
t.datetime "created_at", null: false
t.index ["queue_name"], name: "index_solid_queue_pauses_on_queue_name", unique: true
end

create_table "solid_queue_processes", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.string "kind", null: false
t.datetime "last_heartbeat_at", null: false
t.bigint "supervisor_id"
t.integer "pid", null: false
t.string "hostname"
t.text "metadata"
t.datetime "created_at", null: false
t.string "name", null: false
t.index ["last_heartbeat_at"], name: "index_solid_queue_processes_on_last_heartbeat_at"
t.index ["name", "supervisor_id"], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true
t.index ["supervisor_id"], name: "index_solid_queue_processes_on_supervisor_id"
end

create_table "solid_queue_ready_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.bigint "job_id", null: false
t.string "queue_name", null: false
t.integer "priority", default: 0, null: false
t.datetime "created_at", null: false
t.index ["job_id"], name: "index_solid_queue_ready_executions_on_job_id", unique: true
t.index ["priority", "job_id"], name: "index_solid_queue_poll_all"
t.index ["queue_name", "priority", "job_id"], name: "index_solid_queue_poll_by_queue"
end

create_table "solid_queue_recurring_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.bigint "job_id", null: false
t.string "task_key", null: false
t.datetime "run_at", null: false
t.datetime "created_at", null: false
t.index ["job_id"], name: "index_solid_queue_recurring_executions_on_job_id", unique: true
t.index ["task_key", "run_at"], name: "index_solid_queue_recurring_executions_on_task_key_and_run_at", unique: true
end

create_table "solid_queue_recurring_tasks", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.string "key", null: false
t.string "schedule", null: false
t.string "command", limit: 2048
t.string "class_name"
t.text "arguments"
t.string "queue_name"
t.integer "priority", default: 0
t.boolean "static", default: true, null: false
t.text "description"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["key"], name: "index_solid_queue_recurring_tasks_on_key", unique: true
t.index ["static"], name: "index_solid_queue_recurring_tasks_on_static"
end

create_table "solid_queue_scheduled_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.bigint "job_id", null: false
t.string "queue_name", null: false
t.integer "priority", default: 0, null: false
t.datetime "scheduled_at", null: false
t.datetime "created_at", null: false
t.index ["job_id"], name: "index_solid_queue_scheduled_executions_on_job_id", unique: true
t.index ["scheduled_at", "priority", "job_id"], name: "index_solid_queue_dispatch_all"
end

create_table "solid_queue_semaphores", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.string "key", null: false
t.integer "value", default: 1, null: false
t.datetime "expires_at", null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["expires_at"], name: "index_solid_queue_semaphores_on_expires_at"
t.index ["key", "value"], name: "index_solid_queue_semaphores_on_key_and_value"
t.index ["key"], name: "index_solid_queue_semaphores_on_key", unique: true
end

add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
add_foreign_key "solid_queue_scheduled_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
end
16 changes: 16 additions & 0 deletions test/integration/jobs_lifecycle_test.rb
@@ -31,6 +31,22 @@ class JobsLifecycleTest < ActiveSupport::TestCase
    assert_equal 2, SolidQueue::Job.finished.count
  end

  test "enqueue and run jobs from different shards" do
    AddToBufferJob.perform_later "hey"
    ShardTwoJob.perform_later "ho"

    change_active_shard_to(:queue_shard_two) do
      @dispatcher.start
      @worker.start

      wait_for_jobs_to_finish_for(2.seconds)
    end

    assert_equal [ "ho" ], JobBuffer.values.sort
    assert_equal 1, SolidQueue::ReadyExecution.count
    assert_equal 1, ActiveRecord::Base.connected_to(shard: :queue_shard_two) { SolidQueue::Job.finished.count }
  end

  test "enqueue and run jobs that fail without retries" do
    RaisingJob.perform_later(ExpectedTestError, "A")
    RaisingJob.perform_later(ExpectedTestError, "B")
2 changes: 1 addition & 1 deletion test/test_helper.rb
@@ -28,7 +28,7 @@ class ExpectedTestError < RuntimeError; end


 class ActiveSupport::TestCase
-  include ConfigurationTestHelper, ProcessesTestHelper, JobsTestHelper
+  include ConfigurationTestHelper, ProcessesTestHelper, JobsTestHelper, MultishardingTestHelper

   setup do
     @_on_thread_error = SolidQueue.on_thread_error
17 changes: 17 additions & 0 deletions test/test_helpers/multisharding_test_helper.rb
@@ -0,0 +1,17 @@
module MultishardingTestHelper
  private

    def connected_to_shard_two(&block)
      ActiveRecord::Base.connected_to(shard: :queue_shard_two) { block.call }
    end

    def change_active_shard_to(new_shard_name, &block)
      old_shard_name = SolidQueue.active_shard
      SolidQueue.active_shard = new_shard_name
      SolidQueue::Record.connects_to_and_set_active_shard
      block.call
    ensure
      SolidQueue.active_shard = old_shard_name
      SolidQueue::Record.connects_to_and_set_active_shard
    end
end