Skip to content

Commit 011ac51

Browse files
author
Petr Chalupa
committed
Merge pull request #71 from pitr-ch/actor
Support `send` and `send-off` methods for Agent
2 parents 49980ed + bf55719 commit 011ac51

File tree

5 files changed

+99
-47
lines changed

5 files changed

+99
-47
lines changed

lib/concurrent/actor/simple_actor_ref.rb

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,21 @@ class SimpleActorRef
99
include ActorRef
1010

1111
def initialize(actor, opts = {})
12-
@actor = actor
13-
@mutex = Mutex.new
14-
@executor = OneByOne.new OptionsParser::get_executor_from(opts)
15-
@stop_event = Event.new
16-
@reset_on_error = opts.fetch(:reset_on_error, true)
12+
@actor = actor
13+
@mutex = Mutex.new
14+
@one_by_one = OneByOne.new
15+
@executor = OptionsParser::get_executor_from(opts)
16+
@stop_event = Event.new
17+
@reset_on_error = opts.fetch(:reset_on_error, true)
1718
@exception_class = opts.fetch(:rescue_exception, false) ? Exception : StandardError
18-
@args = opts.fetch(:args, []) if @reset_on_error
19+
@args = opts.fetch(:args, []) if @reset_on_error
1920

2021
@actor.define_singleton_method(:shutdown, &method(:set_stop_event))
2122
@actor.on_start
2223
end
2324

2425
def running?
25-
! @stop_event.set?
26+
not @stop_event.set?
2627
end
2728

2829
def shutdown?
@@ -32,7 +33,7 @@ def shutdown?
3233
def post(*msg, &block)
3334
raise ArgumentError.new('message cannot be empty') if msg.empty?
3435
ivar = IVar.new
35-
@executor.post(Message.new(msg, ivar, block), &method(:process_message))
36+
@one_by_one.post(@executor, Message.new(msg, ivar, block), &method(:process_message))
3637
ivar
3738
end
3839

lib/concurrent/agent.rb

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class Agent
4040
# is given at initialization
4141
TIMEOUT = 5
4242

43-
attr_reader :timeout, :executor
43+
attr_reader :timeout, :task_executor, :operation_executor
4444

4545
# Initialize a new Agent with the given initial value and provided options.
4646
#
@@ -60,12 +60,14 @@ class Agent
6060
# @option opts [String] :copy_on_deref (nil) call the given `Proc` passing the internal value and
6161
# returning the value returned from the proc
6262
def initialize(initial, opts = {})
63-
@value = initial
64-
@rescuers = []
65-
@validator = Proc.new { |result| true }
66-
@timeout = opts.fetch(:timeout, TIMEOUT).freeze
67-
self.observers = CopyOnWriteObserverSet.new
68-
@executor = OneByOne.new OptionsParser::get_executor_from(opts)
63+
@value = initial
64+
@rescuers = []
65+
@validator = Proc.new { |result| true }
66+
@timeout = opts.fetch(:timeout, TIMEOUT).freeze
67+
self.observers = CopyOnWriteObserverSet.new
68+
@one_by_one = OneByOne.new
69+
@task_executor = OptionsParser.get_task_executor_from(opts)
70+
@operation_executor = OptionsParser.get_operation_executor_from(opts)
6971
init_mutex
7072
set_deref_options(opts)
7173
end
@@ -122,27 +124,39 @@ def validate(&block)
122124
alias_method :validate_with, :validate
123125
alias_method :validates_with, :validate
124126

125-
# Update the current value with the result of the given block operation
127+
# Update the current value with the result of the given block operation,
128+
# block should not do blocking calls, use #post_off for blocking calls
126129
#
127130
# @yield the operation to be performed with the current value in order to calculate
128131
# the new value
129132
# @yieldparam [Object] value the current value
130133
# @yieldreturn [Object] the new value
131134
# @return [true, nil] nil when no block is given
132135
def post(&block)
133-
return nil if block.nil?
134-
@executor.post { work(&block) }
135-
true
136+
post_on(@task_executor, &block)
137+
end
138+
139+
# Update the current value with the result of the given block operation,
140+
# block can do blocking calls
141+
#
142+
# @yield the operation to be performed with the current value in order to calculate
143+
# the new value
144+
# @yieldparam [Object] value the current value
145+
# @yieldreturn [Object] the new value
146+
# @return [true, nil] nil when no block is given
147+
def post_off(&block)
148+
post_on(@operation_executor, &block)
136149
end
137150

138-
# Update the current value with the result of the given block operation
151+
# Update the current value with the result of the given block operation,
152+
# block should not do blocking calls, use #post_off for blocking calls
139153
#
140154
# @yield the operation to be performed with the current value in order to calculate
141155
# the new value
142156
# @yieldparam [Object] value the current value
143157
# @yieldreturn [Object] the new value
144158
def <<(block)
145-
self.post(&block)
159+
post(&block)
146160
self
147161
end
148162

@@ -152,12 +166,18 @@ def <<(block)
152166
# @return [Boolean] false on timeout, true otherwise
153167
def await(timeout = nil)
154168
done = Event.new
155-
post { done.set }
169+
post { |val| done.set; val }
156170
done.wait timeout
157171
end
158172

159173
private
160174

175+
def post_on(executor, &block)
176+
return nil if block.nil?
177+
@one_by_one.post(executor) { work(&block) }
178+
true
179+
end
180+
161181
# @!visibility private
162182
Rescuer = Struct.new(:clazz, :block) # :nodoc:
163183

@@ -168,7 +188,6 @@ def try_rescue(ex) # :nodoc:
168188
end
169189
rescuer.block.call(ex) if rescuer
170190
rescue Exception => ex
171-
# puts "#{ex} (#{ex.class})\n#{ex.backtrace.join("\n")}"
172191
# supress
173192
end
174193

@@ -179,8 +198,8 @@ def work(&handler) # :nodoc:
179198
begin
180199
# FIXME creates second thread
181200
result, valid = Concurrent::timeout(@timeout) do
182-
[result = handler.call(value),
183-
validator.call(result)]
201+
result = handler.call(value)
202+
[result, validator.call(result)]
184203
end
185204
rescue Exception => ex
186205
exception = ex

lib/concurrent/executor/one_by_one.rb

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,22 @@ module Concurrent
44
# never running at the same time.
55
class OneByOne
66

7-
attr_reader :executor
8-
9-
Job = Struct.new(:args, :block) do
7+
Job = Struct.new(:executor, :args, :block) do
108
def call
119
block.call *args
1210
end
1311
end
1412

15-
# @param [Executor] executor
16-
def initialize(executor)
17-
@executor = executor
13+
def initialize
1814
@being_executed = false
1915
@stash = []
2016
@mutex = Mutex.new
2117
end
2218

2319
# Submit a task to the executor for asynchronous processing.
2420
#
21+
# @param [Executor] executor to be used for this job
22+
#
2523
# @param [Array] args zero or more arguments to be passed to the task
2624
#
2725
# @yield the asynchronous task to perform
@@ -30,9 +28,10 @@ def initialize(executor)
3028
# is not running
3129
#
3230
# @raise [ArgumentError] if no task is given
33-
def post(*args, &task)
31+
def post(executor, *args, &task)
3432
return nil if task.nil?
35-
job = Job.new args, task
33+
job = Job.new executor, args, task
34+
3635
@mutex.lock
3736
post = if @being_executed
3837
@stash << job
@@ -41,30 +40,25 @@ def post(*args, &task)
4140
@being_executed = true
4241
end
4342
@mutex.unlock
44-
@executor.post { work(job) } if post
45-
true
46-
end
4743

48-
# Submit a task to the executor for asynchronous processing.
49-
#
50-
# @param [Proc] task the asynchronous task to perform
51-
#
52-
# @return [self] returns itself
53-
def <<(task)
54-
post(&task)
55-
self
44+
call_job job if post
45+
true
5646
end
5747

5848
private
5949

50+
def call_job(job)
51+
job.executor.post { work(job) }
52+
end
53+
6054
# ensures next job is executed if any is stashed
6155
def work(job)
6256
job.call
6357
ensure
6458
@mutex.lock
6559
job = @stash.shift || (@being_executed = false)
6660
@mutex.unlock
67-
@executor.post { work(job) } if job
61+
call_job job if job
6862
end
6963

7064
end

lib/concurrent/options_parser.rb

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,27 @@ def get_executor_from(opts = {})
2020
Concurrent.configuration.global_task_pool
2121
end
2222
end
23-
module_function :get_executor_from
23+
24+
# Get the requested `Executor` based on the values set in the options hash.
25+
#
26+
# @param [Hash] opts the options defining the requested executor
27+
# @option opts [Executor] :task_executor (`nil`) when set use the given `Executor` instance
28+
#
29+
# @return [Executor] the requested thread pool (default: global task pool)
30+
def get_task_executor_from(opts = {})
31+
opts[:task_executor] || opts[:executor] || Concurrent.configuration.global_task_pool
32+
end
33+
34+
# Get the requested `Executor` based on the values set in the options hash.
35+
#
36+
# @param [Hash] opts the options defining the requested executor
37+
# @option opts [Executor] :task_executor (`nil`) when set use the given `Executor` instance
38+
#
39+
# @return [Executor] the requested thread pool (default: global operation pool)
40+
def get_operation_executor_from(opts = {})
41+
opts[:operation_executor] || opts[:executor] || Concurrent.configuration.global_operation_pool
42+
end
43+
44+
extend self
2445
end
2546
end

spec/concurrent/agent_spec.rb

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,17 @@ module Concurrent
1919
end.new
2020
end
2121

22+
context '#send_off' do
23+
subject { Agent.new 2 }
24+
25+
it 'executes post and post-off in order' do
26+
subject.post { |v| v + 2 }
27+
subject.post_off { |v| v * 3 }
28+
subject.await
29+
subject.value.should eq 12
30+
end
31+
end
32+
2233
context 'behavior' do
2334

2435
# dereferenceable
@@ -153,7 +164,7 @@ def trigger_observable(observable)
153164
subject.post { nil }
154165
sleep(0.1)
155166
subject.
156-
executor.
167+
instance_variable_get(:@one_by_one).
157168
instance_variable_get(:@stash).
158169
size.should eq 2
159170
end
@@ -189,6 +200,12 @@ def trigger_observable(observable)
189200
fn.should be_false
190201
end
191202

203+
it 'does not alter the value' do
204+
subject.post { |v| v + 1 }
205+
subject.await
206+
subject.value.should eq 1
207+
end
208+
192209
end
193210

194211
context 'fulfillment' do

0 commit comments

Comments
 (0)