Skip to content

Commit 4c6654a

Browse files
committed
Introduce ReplicationCoordinator to support multiple AZs
It's common for applications that are deployed across multiple availability zones (using a replicated database) to create an ad-hoc method for indicating which zone is "active", meaning the zone primarily responsible for writing to the database. For example, a team may choose to use a MySQL system variable to indicate the data center where the primary database sits. In which case, they need to write code to make sure all Rails processes in all zones query this efficiently (it may be slow to access in non-primary zones) and are notified if the primary zone changes, as in the case of a data center failover. ReplicationCoordinator::Base is introduced to allow developers to write code that determines whether a process is in an active zone, and then: - monitor and cache that value, with configurable polling interval - fire callbacks when the state changes from active -> passive or vice versa
1 parent c44bde6 commit 4c6654a

File tree

4 files changed

+416
-0
lines changed

4 files changed

+416
-0
lines changed

activesupport/lib/active_support.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ module ActiveSupport
5454
autoload :IsolatedExecutionState
5555
autoload :Notifications
5656
autoload :Reloader
57+
autoload :ReplicationCoordinator
5758
autoload :SecureCompareRotator
5859

5960
eager_autoload do
Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
# frozen_string_literal: true
2+
3+
module ActiveSupport
4+
# = ActiveSupport::ReplicationCoordinator
5+
#
6+
# The \ReplicationCoordinator module supports applications that run in multiple availability zones.
7+
#
8+
# == Replication, Availability Zones, and Active-Passive State
9+
#
10+
# A common deployment topology for Rails applications is to have application servers running in
11+
# multiple availability zones, with a single database that is replicated across these zones.
12+
#
13+
# In such deployment, application code may need to determine whether it is running an "active"
14+
# zone and is responsible for writing to the database, or in a "passive" or "standby" zone that
15+
# primarily reads from the zone-local database replica. And, in case of a zone failure, the
16+
# application may need to be able to dynamically switch a passive zone to an active zone (or vice
17+
# versa).
18+
#
19+
# The term "Passive" here is intended to include deployments in which the non-active zones are
20+
# handling read requests, and potentially even performing occasional writes back to the active
21+
# zone over an inter-AZ network link. The exact interpretation depends on the nature of the
22+
# replication strategy and your deployment topology.
23+
#
24+
# Some example scenarios where knowing the replication state is important:
25+
#
26+
# - Custom database selector middleware
27+
# - Controlling background jobs that should only run in an active zone
28+
# - Deciding whether to preheat fragment caches for "next page" paginated results (which may not
29+
# be cached in time if relying on an inter-AZ network link and replication lag).
30+
#
31+
# The two classes in this module are:
32+
#
33+
# - ReplicationCoordinator::Base: An abstract base class that provides a monitoring
34+
# mechanism to fetch and cache the replication state on a configurable time interval and notify
35+
# when that state changes.
36+
# - ReplicationCoordinator::SingleZone: A concrete implementation that always
37+
# indicates an active zone, and so it represents the default behavior for a single-zone
38+
# deployment that does not use database replication.
39+
#
40+
module ReplicationCoordinator
41+
# = Replication Coordinator Abstract Base Class
42+
#
43+
# An abstract base class that provides a monitoring mechanism to fetch and cache the replication
44+
# state on a configurable time interval and notify when that state changes.
45+
#
46+
# Subclasses must only implement #fetch_active_zone, which returns a boolean indicating whether
47+
# the caller is in an active zone. This method may be expensive, so the class uses a
48+
# Concurrent::TimerTask to periodically check (and cache) this value. The current cached status
49+
# can cheaply be inspected with #active_zone?. The refresh interval can be set by passing a
50+
# +polling_interval+ option to the constructor.
51+
#
52+
# The timer task must be explicitly started by calling #start_monitoring. Once started,
53+
# registered callbacks are invoked when an active zone change is detected.
54+
#
55+
# == Basic usage
56+
#
57+
# class CustomReplicationCoordinator < ActiveSupport::ReplicationCoordinator::Base
58+
# def fetch_active_zone
59+
# # Custom logic to determine if the local zone is active
60+
# end
61+
# end
62+
#
63+
# coordinator = CustomReplicationCoordinator.new(polling_interval: 10.seconds)
64+
#
65+
# coordinator.active_zone? # Immediately returns the cached value
66+
#
67+
# coordinator.on_active_zone do |coordinator|
68+
# puts "This zone is now active"
69+
# # Start processes or threads that should only run in the active zone
70+
# end
71+
#
72+
# coordinator.on_passive_zone do |coordinator|
73+
# puts "This zone is now passive"
74+
# # Stop processes or threads that should only run in the active zone
75+
# end
76+
#
77+
# # Start a background thread to monitor the active zone status and invoke the callbacks on changes
78+
# coordinator.start_monitoring
79+
#
80+
# coordinator.updated_at # Returns the last time the active zone status was checked
81+
#
82+
# Subclasses must implement #fetch_active_zone
83+
class Base
84+
attr_reader :state_change_hooks, :polling_interval, :executor, :logger, :updated_at
85+
86+
# Initialize a new coordinator instance.
87+
#
88+
# [+polling_interval+] How often to refresh active zone status (default: 5 seconds)
89+
def initialize(polling_interval: 5, executor: ActiveSupport::Executor, logger: nil)
90+
@state_change_hooks = { active: [], passive: [] }
91+
@polling_interval = polling_interval
92+
@executor = executor
93+
@logger = logger || (defined?(Rails.logger) && Rails.logger)
94+
95+
@last_active_zone = nil
96+
@updated_at = nil
97+
@active_zone_watcher = nil
98+
99+
check_active_zone
100+
end
101+
102+
# Determine if the local zone is active.
103+
#
104+
# This method must be implemented by subclasses to define the logic for determining if the
105+
# local zone is active. The return value is used to trigger state change hooks when the active
106+
# zone changes.
107+
#
108+
# It's assumed that this method may be slow, so ReplicationCoordinator has a background thread
109+
# that calls this method every +polling_interval+ seconds, and caches the result which is
110+
# returned by #active_zone?
111+
#
112+
# Returns +true+ if the local zone is active, +false+ otherwise.
113+
def fetch_active_zone
114+
raise NotImplementedError
115+
end
116+
117+
# Returns +true+ if the local zone is active, +false+ otherwise.
118+
#
119+
# This always returns a cached value.
120+
def active_zone?
121+
@last_active_zone
122+
end
123+
124+
# Start monitoring for active zone changes.
125+
#
126+
# This starts a Concurrent::TimerTask to periodically refresh the active zone status. If a
127+
# change is detected, then the appropriate state change callbacks will be invoked.
128+
def start_monitoring
129+
active_zone_watcher.execute
130+
end
131+
132+
# Stop monitoring for active zone changes.
133+
#
134+
# This stops the Concurrent::TimerTask, if it is running.
135+
def stop_monitoring
136+
@active_zone_watcher&.shutdown
137+
end
138+
139+
# Register a callback to be executed when the local zone becomes active.
140+
#
141+
# The callback will be immediately executed if this zone is currently active.
142+
#
143+
# [+block+] callback to execute when zone becomes active
144+
#
145+
# Yields the coordinator instance to the block.
146+
def on_active_zone(&block)
147+
state_change_hooks[:active] << block
148+
block.call(self) if active_zone?
149+
end
150+
151+
# Register a callback to be executed when the local zone becomes passive.
152+
#
153+
# The callback will be immediately executed if this zone is not currently active.
154+
#
155+
# [+block+] callback to execute when zone becomes passive
156+
#
157+
# Yields the coordinator instance to the block.
158+
def on_passive_zone(&block)
159+
state_change_hooks[:passive] << block
160+
block.call(self) if !active_zone?
161+
end
162+
163+
# Clear all registered state_change hooks.
164+
def clear_hooks
165+
state_change_hooks[:active] = []
166+
state_change_hooks[:passive] = []
167+
end
168+
169+
private
170+
def active_zone_watcher
171+
@active_zone_watcher ||= begin
172+
task = Concurrent::TimerTask.new(execution_interval: polling_interval) do
173+
check_active_zone
174+
end
175+
176+
task.add_observer do |_, _, error|
177+
if error
178+
executor.error_reporter&.report(error, handled: false, source: "replication_coordinator.active_support")
179+
logger&.error("#{error.detailed_message}: could not check #{self.class} active zone")
180+
end
181+
end
182+
183+
task
184+
end
185+
end
186+
187+
def check_active_zone
188+
new_active_zone = executor_wrap { fetch_active_zone }
189+
@updated_at = Time.now
190+
191+
if @last_active_zone.nil? || new_active_zone != @last_active_zone
192+
@last_active_zone = new_active_zone
193+
194+
if @last_active_zone
195+
logger&.info "#{self.class}: pid #{$$}: switching to active"
196+
run_active_zone_hooks
197+
else
198+
logger&.info "#{self.class}: pid #{$$}: switching to passive"
199+
run_passive_zone_hooks
200+
end
201+
end
202+
end
203+
204+
def run_active_zone_hooks
205+
run_hooks_for(:active)
206+
end
207+
208+
def run_passive_zone_hooks
209+
run_hooks_for(:passive)
210+
end
211+
212+
def run_hooks_for(event)
213+
state_change_hooks.fetch(event, []).each do |block|
214+
block.call(self)
215+
rescue Exception => exception
216+
handle_thread_error(exception)
217+
end
218+
end
219+
220+
def executor_wrap(&block)
221+
if @executor
222+
@executor.wrap(&block)
223+
else
224+
yield
225+
end
226+
end
227+
end
228+
229+
# = "Single Zone" Replication Coordinator
230+
#
231+
# A concrete implementation that always indicates an active zone, and so it represents the
232+
# default behavior for a single-zone deployment that does not use database replication.
233+
#
234+
# This is a simple implementation that always returns +true+ from #active_zone?
235+
#
236+
# == Basic usage
237+
#
238+
# cluster = ActiveSupport::ReplicationCoordinator::SingleZone.new
239+
# cluster.active_zone? #=> true
240+
# cluster.on_active_zone { puts "Will always be called" }
241+
# cluster.on_passize_zone { puts "Will never be called" }
242+
# cluster.start_monitoring # Does nothing, since there is no monitoring needed.
243+
class SingleZone < Base
244+
# Always returns true, indicating this zone is active.
245+
#
246+
# Returns true.
247+
def fetch_active_zone
248+
true
249+
end
250+
251+
def start_monitoring
252+
# No-op implementation since no monitoring is needed.
253+
end
254+
end
255+
end
256+
end
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# frozen_string_literal: true
2+
3+
module ActiveSupport
4+
module Testing
5+
# ReplicationCoordinator is a concrete implementation of a
6+
# ActiveSupport::ReplicationCoordinator::Base that can be used to test the behavior of objects
7+
# that depend on replication state.
8+
class ReplicationCoordinator < ActiveSupport::ReplicationCoordinator::Base
9+
attr_reader :fetch_count
10+
11+
# Initializes the replication coordinator with an initial active zone state.
12+
#
13+
# The replication coordinator can be initialized with an initial active zone state using the
14+
# optional +active_zone+ parameter, which defaults to +true+.
15+
def initialize(active_zone = true, **options)
16+
@next_active_zone = active_zone
17+
@fetch_count = 0
18+
super(**options)
19+
end
20+
21+
# Sets the value that will next be returned by #fetch_active_zone, simulating an external
22+
# replication state change.
23+
def set_next_active_zone(active_zone)
24+
@next_active_zone = active_zone
25+
end
26+
27+
def fetch_active_zone # :nodoc:
28+
@fetch_count += 1
29+
@next_active_zone
30+
end
31+
end
32+
end
33+
end

0 commit comments

Comments
 (0)