Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit c23b2ff

Browse files
author
Petr Chalupa
committedAug 10, 2014
Merge pull request #143 from ruby-concurrency/actress
Adding pools to Actor utils and other improvements
2 parents 05e5e93 + 8b67f27 commit c23b2ff

File tree

11 files changed

+191
-57
lines changed

11 files changed

+191
-57
lines changed
 

‎doc/actor/celluloid_benchmark.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def counting(count, ivar)
4848
b.report(format('%5d %4d %s', ADD_TO*counts_size, adders_size, 'concurrent')) do
4949
counts = Array.new(counts_size) { [0, Concurrent::IVar.new] }
5050
adders = Array.new(adders_size) do |i|
51-
Concurrent::Actor::AdHoc.spawn("adder#{i}") do
51+
Concurrent::Actor::Utils::AdHoc.spawn("adder#{i}") do
5252
lambda do |(count, ivar)|
5353
if count < ADD_TO
5454
adders[(i+1) % adders_size].tell [count+1, ivar]

‎lib/concurrent/actor.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ module Actor
2626

2727
require 'concurrent/actor/default_dead_letter_handler'
2828
require 'concurrent/actor/root'
29-
require 'concurrent/actor/ad_hoc'
29+
require 'concurrent/actor/utils'
3030

3131
# @return [Reference, nil] current executing actor if any
3232
def self.current

‎lib/concurrent/actor/ad_hoc.rb

Lines changed: 0 additions & 19 deletions
This file was deleted.

‎lib/concurrent/actor/utills.rb

Lines changed: 0 additions & 7 deletions
This file was deleted.

‎lib/concurrent/actor/utils.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
module Concurrent
2+
module Actor
3+
module Utils
4+
require 'concurrent/actor/utils/ad_hoc'
5+
require 'concurrent/actor/utils/broadcast'
6+
require 'concurrent/actor/utils/balancer'
7+
require 'concurrent/actor/utils/pool'
8+
end
9+
end
10+
end

‎lib/concurrent/actor/utils/ad_hoc.rb

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
module Concurrent
2+
module Actor
3+
module Utils
4+
# Allows quick creation of actors with behaviour defined by blocks.
5+
# @example ping
6+
# AdHoc.spawn :forward, an_actor do |where|
7+
# # this block has to return proc defining #on_message behaviour
8+
# -> message { where.tell message }
9+
# end
10+
class AdHoc < Context
11+
def initialize(*args, &initializer)
12+
@on_message = Type! initializer.call(*args), Proc
13+
end
14+
15+
def on_message(message)
16+
instance_exec message, &@on_message
17+
end
18+
end
19+
end
20+
end
21+
end
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
module Concurrent
2+
module Actor
3+
module Utils
4+
5+
# Distributes messages between subscribed actors. Each actor'll get only one message then
6+
# it's unsubscribed. The actor needs to resubscribe when it's ready to receive next message.
7+
# @see Pool
8+
class Balancer < RestartingContext
9+
10+
def initialize
11+
@receivers = []
12+
@buffer = []
13+
end
14+
15+
def on_message(message)
16+
case message
17+
when :subscribe
18+
@receivers << envelope.sender
19+
distribute
20+
true
21+
when :unsubscribe
22+
@receivers.delete envelope.sender
23+
true
24+
when :subscribed?
25+
@receivers.include? envelope.sender
26+
else
27+
@buffer << message
28+
distribute
29+
end
30+
end
31+
32+
def distribute
33+
while !@receivers.empty? && !@buffer.empty?
34+
@receivers.shift << @buffer.shift
35+
end
36+
end
37+
end
38+
end
39+
end
40+
end

‎lib/concurrent/actor/utils/broadcast.rb

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,20 @@ module Concurrent
44
module Actor
55
module Utils
66

7-
# TODO doc
8-
class Broadcast < Context
7+
# Allows to build pub/sub easily.
8+
# @example news
9+
# news_channel = Concurrent::Actor::Utils::Broadcast.spawn :news
10+
#
11+
# 2.times do |i|
12+
# Concurrent::Actor::Utils::AdHoc.spawn "listener-#{i}" do
13+
# news_channel << :subscribe
14+
# -> message { puts message }
15+
# end
16+
# end
17+
#
18+
# news_channel << 'Ruby rocks!'
19+
# # prints: 'Ruby rocks!' twice
20+
class Broadcast < RestartingContext
921

1022
def initialize
1123
@receivers = Set.new
@@ -14,11 +26,14 @@ def initialize
1426
def on_message(message)
1527
case message
1628
when :subscribe
17-
@receivers.add envelope.sender
18-
true
29+
if envelope.sender.is_a? Reference
30+
@receivers.add envelope.sender
31+
true
32+
else
33+
false
34+
end
1935
when :unsubscribe
20-
@receivers.delete envelope.sender
21-
true
36+
!!@receivers.delete(envelope.sender)
2237
when :subscribed?
2338
@receivers.include? envelope.sender
2439
else
@@ -31,6 +46,7 @@ def filtered_receivers
3146
@receivers
3247
end
3348
end
49+
3450
end
3551
end
3652
end

‎lib/concurrent/actor/utils/pool.rb

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
require 'concurrent/actor/utils/balancer'
2+
3+
module Concurrent
4+
module Actor
5+
module Utils
6+
7+
# Allows to create a pool of workers and distribute work between them
8+
# @param [Integer] size number of workers
9+
# @yield [balancer, index] a block spawning an worker instance. called +size+ times.
10+
# The worker should be descendant of AbstractWorker and supervised, see example.
11+
# @yieldparam [Balancer] balancer to pass to the worker
12+
# @yieldparam [Integer] index of the worker, usually used in its name
13+
# @yieldreturn [Reference] the reference of newly created worker
14+
# @example
15+
# class Worker < Concurrent::Actor::Utils::AbstractWorker
16+
# def work(message)
17+
# p message * 5
18+
# end
19+
# end
20+
#
21+
# pool = Concurrent::Actor::Utils::Pool.spawn! 'pool', 5 do |balancer, index|
22+
# Worker.spawn name: "worker-#{index}", supervise: true, args: [balancer]
23+
# end
24+
#
25+
# pool << 'asd' << 2
26+
# # prints:
27+
# # "asdasdasdasdasd"
28+
# # 10
29+
class Pool < RestartingContext
30+
def initialize(size, &worker_initializer)
31+
@balancer = Balancer.spawn name: :balancer, supervise: true
32+
@workers = Array.new(size, &worker_initializer.curry[@balancer])
33+
@workers.each { |w| Type! w, Reference }
34+
end
35+
36+
def on_message(message)
37+
@balancer << message
38+
end
39+
end
40+
41+
class AbstractWorker < RestartingContext
42+
def initialize(balancer)
43+
@balancer = balancer
44+
@balancer << :subscribe
45+
end
46+
47+
def on_message(message)
48+
work message
49+
ensure
50+
@balancer << :subscribe
51+
end
52+
53+
def work(message)
54+
raise NotImplementedError
55+
end
56+
end
57+
end
58+
end
59+
end

‎lib/concurrent/executor/serialized_execution.rb

Lines changed: 36 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
require 'delegate'
22
require 'concurrent/executor/executor'
33
require 'concurrent/logging'
4+
require 'concurrent/atomic/synchronization'
45

56
module Concurrent
67

78
# Ensures passed jobs in a serialized order never running at the same time.
89
class SerializedExecution
910
include Logging
11+
include Synchronization
1012

1113
Job = Struct.new(:executor, :args, :block) do
1214
def call
@@ -15,9 +17,10 @@ def call
1517
end
1618

1719
def initialize
18-
@being_executed = false
19-
@stash = []
20-
@mutex = Mutex.new
20+
synchronize do
21+
@being_executed = false
22+
@stash = []
23+
end
2124
end
2225

2326
# Submit a task to the executor for asynchronous processing.
@@ -33,23 +36,36 @@ def initialize
3336
#
3437
# @raise [ArgumentError] if no task is given
3538
def post(executor, *args, &task)
36-
return nil if task.nil?
37-
38-
job = Job.new executor, args, task
39-
40-
begin
41-
@mutex.lock
42-
post = if @being_executed
43-
@stash << job
44-
false
45-
else
46-
@being_executed = true
47-
end
48-
ensure
49-
@mutex.unlock
39+
posts [[executor, args, task]]
40+
true
41+
end
42+
43+
# As {#post} but allows to submit multiple tasks at once, it's guaranteed that they will not
44+
# be interleaved by other tasks.
45+
#
46+
# @param [Array<Array(Executor, Array<Object>, Proc)>] posts array of triplets where
47+
# first is a {Executor}, second is array of args for task, third is a task (Proc)
48+
def posts(posts)
49+
# if can_overflow?
50+
# raise ArgumentError, 'SerializedExecution does not support thread-pools which can overflow'
51+
# end
52+
53+
return nil if posts.empty?
54+
55+
jobs = posts.map { |executor, args, task| Job.new executor, args, task }
56+
57+
job_to_post = synchronize do
58+
if @being_executed
59+
@stash.push(*jobs)
60+
nil
61+
else
62+
@being_executed = true
63+
@stash.push(*jobs[1..-1])
64+
jobs.first
65+
end
5066
end
5167

52-
call_job job if post
68+
call_job job_to_post if job_to_post
5369
true
5470
end
5571

@@ -78,11 +94,8 @@ def call_job(job)
7894
def work(job)
7995
job.call
8096
ensure
81-
begin
82-
@mutex.lock
97+
synchronize do
8398
job = @stash.shift || (@being_executed = false)
84-
ensure
85-
@mutex.unlock
8699
end
87100

88101
call_job job if job
@@ -98,7 +111,7 @@ class SerializedExecutionDelegator < SimpleDelegator
98111
include SerialExecutor
99112

100113
def initialize(executor)
101-
@executor = executor
114+
@executor = executor
102115
@serializer = SerializedExecution.new
103116
super(executor)
104117
end

‎spec/concurrent/actor_spec.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
module Concurrent
55
module Actor
66
i_know_it_is_experimental!
7+
AdHoc = Utils::AdHoc
78

89
# FIXME better tests!
910

0 commit comments

Comments
 (0)
Please sign in to comment.