Skip to content

Commit 0c75e79

Browse files
committed
Merge pull request #432 from ruby-concurrency/channel-refactor
Channel refactor.
2 parents 1b24e0b + d1b843f commit 0c75e79

File tree

10 files changed

+125
-113
lines changed

10 files changed

+125
-113
lines changed

lib/concurrent/channel.rb

Lines changed: 33 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ module Concurrent
88

99
# {include:file:doc/channel.md}
1010
class Channel
11+
extend Forwardable
1112
include Enumerable
1213

14+
# NOTE: Move to global IO pool once stable
1315
GOROUTINES = Concurrent::CachedThreadPool.new
1416
private_constant :GOROUTINES
1517

@@ -32,10 +34,17 @@ def initialize(message = nil)
3234
end
3335
end
3436

37+
def_delegators :buffer,
38+
:size, :capacity, :close, :closed?,
39+
:blocking?, :empty?, :full?
40+
41+
alias_method :length, :size
42+
alias_method :stop, :close
43+
3544
def initialize(opts = {})
3645
# undocumented -- for internal use only
3746
if opts.is_a? Buffer::Base
38-
@buffer = opts
47+
self.buffer = opts
3948
return
4049
end
4150

@@ -45,25 +54,20 @@ def initialize(opts = {})
4554
if size && buffer == :unbuffered
4655
raise ArgumentError.new('unbuffered channels cannot have a size')
4756
elsif size.nil? && buffer.nil?
48-
@buffer = BUFFER_TYPES[:unbuffered].new
57+
self.buffer = BUFFER_TYPES[:unbuffered].new
4958
elsif size == 0 && buffer == :buffered
50-
@buffer = BUFFER_TYPES[:unbuffered].new
59+
self.buffer = BUFFER_TYPES[:unbuffered].new
5160
elsif buffer == :unbuffered
52-
@buffer = BUFFER_TYPES[:unbuffered].new
61+
self.buffer = BUFFER_TYPES[:unbuffered].new
5362
elsif size.nil? || size < 1
5463
raise ArgumentError.new('size must be at least 1 for this buffer type')
5564
else
5665
buffer ||= :buffered
57-
@buffer = BUFFER_TYPES[buffer].new(size)
66+
self.buffer = BUFFER_TYPES[buffer].new(size)
5867
end
5968

60-
@validator = opts.fetch(:validator, DEFAULT_VALIDATOR)
61-
end
62-
63-
def size
64-
@buffer.size
69+
self.validator = opts.fetch(:validator, DEFAULT_VALIDATOR)
6570
end
66-
alias_method :capacity, :size
6771

6872
def put(item)
6973
return false unless validate(item, false, false)
@@ -129,22 +133,21 @@ def take?
129133
item
130134
end
131135

136+
# @example
132137
#
133-
# @example
138+
# jobs = Channel.new
134139
#
135-
# jobs = Channel.new
136-
#
137-
# Channel.go do
138-
# loop do
139-
# j, more = jobs.next
140-
# if more
141-
# print "received job #{j}\n"
142-
# else
143-
# print "received all jobs\n"
144-
# break
145-
# end
140+
# Channel.go do
141+
# loop do
142+
# j, more = jobs.next
143+
# if more
144+
# print "received job #{j}\n"
145+
# else
146+
# print "received all jobs\n"
147+
# break
146148
# end
147149
# end
150+
# end
148151
def next
149152
item, more = do_next
150153
item = nil if item == Buffer::NO_VALUE
@@ -191,11 +194,6 @@ def each
191194
end
192195
end
193196

194-
def close
195-
@buffer.close
196-
end
197-
alias_method :stop, :close
198-
199197
class << self
200198
def timer(seconds)
201199
Channel.new(Buffer::Timer.new(seconds))
@@ -240,10 +238,12 @@ def go_loop_via(executor, *args, &block)
240238

241239
private
242240

241+
attr_accessor :buffer, :validator
242+
243243
def validate(value, allow_nil, raise_error)
244244
if !allow_nil && value.nil?
245245
raise_error ? raise(ValidationError.new('nil is not a valid value')) : false
246-
elsif !@validator.call(value)
246+
elsif !validator.call(value)
247247
raise_error ? raise(ValidationError) : false
248248
else
249249
true
@@ -254,19 +254,19 @@ def validate(value, allow_nil, raise_error)
254254
end
255255

256256
def do_put(item)
257-
@buffer.put(item)
257+
buffer.put(item)
258258
end
259259

260260
def do_offer(item)
261-
@buffer.offer(item)
261+
buffer.offer(item)
262262
end
263263

264264
def do_next
265-
@buffer.next
265+
buffer.next
266266
end
267267

268268
def do_poll
269-
@buffer.poll
269+
buffer.poll
270270
end
271271
end
272272
end

lib/concurrent/channel/buffer/base.rb

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,26 @@ class Base < Synchronization::LockableObject
2020

2121
# @!macro [attach] channel_buffer_size_reader
2222
#
23+
# The number of items currently in the buffer.
24+
attr_reader :size
25+
26+
# @!macro [attach] channel_buffer_capacity_reader
27+
#
2328
# The maximum number of values which can be {#put} onto the buffer
2429
# it becomes full.
25-
attr_reader :size
26-
alias_method :capacity, :size
30+
attr_reader :capacity
2731

2832
# @!macro [attach] channel_buffer_initialize
2933
#
3034
# Creates a new buffer.
31-
def initialize
35+
def initialize(*args)
3236
super()
3337
synchronize do
3438
@closed = false
3539
@size = 0
40+
@capacity = 0
41+
@buffer = nil
42+
ns_initialize(*args)
3643
end
3744
end
3845

@@ -187,6 +194,12 @@ def closed?
187194

188195
private
189196

197+
attr_accessor :buffer
198+
attr_writer :closed, :capacity, :size
199+
200+
def ns_initialize(*args)
201+
end
202+
190203
# @!macro channel_buffer_closed_question
191204
def ns_closed?
192205
@closed

lib/concurrent/channel/buffer/buffered.rb

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,6 @@ module Buffer
1010
# an item is removed from the buffer, creating spare capacity.
1111
class Buffered < Base
1212

13-
# @!macro channel_buffer_initialize
14-
#
15-
# @param [Integer] size the maximum capacity of the buffer; must be
16-
# greater than zero.
17-
# @raise [ArgumentError] when the size is zero (0) or less.
18-
def initialize(size)
19-
raise ArgumentError.new('size must be greater than 0') if size.to_i <= 0
20-
super()
21-
synchronize do
22-
@size = size.to_i
23-
@buffer = []
24-
end
25-
end
26-
2713
# @!macro channel_buffer_empty_question
2814
def empty?
2915
synchronize { ns_empty? }
@@ -85,7 +71,7 @@ def next
8571
if ns_closed? && ns_empty?
8672
return NO_VALUE, false
8773
elsif !ns_empty?
88-
item = @buffer.shift
74+
item = buffer.shift
8975
more = !ns_empty? || !ns_closed?
9076
return item, more
9177
end
@@ -100,26 +86,37 @@ def poll
10086
if ns_empty?
10187
NO_VALUE
10288
else
103-
@buffer.shift
89+
buffer.shift
10490
end
10591
end
10692
end
10793

10894
private
10995

96+
# @!macro channel_buffer_initialize
97+
#
98+
# @param [Integer] size the maximum capacity of the buffer; must be
99+
# greater than zero.
100+
# @raise [ArgumentError] when the size is zero (0) or less.
101+
def ns_initialize(size)
102+
raise ArgumentError.new('size must be greater than 0') if size.to_i <= 0
103+
self.capacity = size.to_i
104+
self.buffer = []
105+
end
106+
110107
# @!macro channel_buffer_empty_question
111108
def ns_empty?
112-
@buffer.length == 0
109+
buffer.length == 0
113110
end
114111

115112
# @!macro channel_buffer_full_question
116113
def ns_full?
117-
@buffer.length == @size
114+
buffer.length == capacity
118115
end
119116

120117
# @!macro channel_buffer_put
121118
def ns_put_onto_buffer(item)
122-
@buffer.push(item)
119+
buffer.push(item)
123120
end
124121
end
125122
end

lib/concurrent/channel/buffer/dropping.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def ns_full?
4545

4646
# @!macro channel_buffer_put
4747
def ns_put_onto_buffer(item)
48-
@buffer.push(item) unless @buffer.size == size
48+
buffer.push(item) unless buffer.size == capacity
4949
end
5050
end
5151
end

lib/concurrent/channel/buffer/sliding.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ def ns_full?
4545

4646
# @!macro channel_buffer_put
4747
def ns_put_onto_buffer(item)
48-
@buffer.shift if @buffer.size == size
49-
@buffer.push(item)
48+
buffer.shift if buffer.size == capacity
49+
buffer.push(item)
5050
end
5151
end
5252
end

lib/concurrent/channel/buffer/ticker.rb

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,6 @@ module Buffer
88

99
class Ticker < Base
1010

11-
def initialize(interval)
12-
super()
13-
synchronize do
14-
@interval = interval.to_f
15-
@next_tick = Concurrent.monotonic_time + interval
16-
end
17-
end
18-
1911
def size() 1; end
2012

2113
def empty?() false; end
@@ -63,6 +55,11 @@ def poll
6355

6456
private
6557

58+
def ns_initialize(interval)
59+
@interval = interval.to_f
60+
@next_tick = Concurrent.monotonic_time + interval
61+
end
62+
6663
def do_poll
6764
if ns_closed?
6865
return nil, false

lib/concurrent/channel/buffer/timer.rb

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,6 @@ module Buffer
88

99
class Timer < Base
1010

11-
def initialize(delay)
12-
super()
13-
synchronize do
14-
@tick = Concurrent.monotonic_time + delay.to_f
15-
@closed = false
16-
@empty = false
17-
end
18-
end
19-
2011
def size() 1; end
2112

2213
def empty?
@@ -59,13 +50,19 @@ def poll
5950

6051
private
6152

53+
def ns_initialize(delay)
54+
@tick = Concurrent.monotonic_time + delay.to_f
55+
@closed = false
56+
@empty = false
57+
end
58+
6259
def do_poll
6360
synchronize do
6461
return :closed, false if ns_closed?
6562

6663
if Concurrent.monotonic_time > @tick
6764
# only one listener gets notified
68-
@closed = @empty = true
65+
closed = empty = true
6966
return :tick, Concurrent::Channel::Tick.new(@tick)
7067
else
7168
return :wait, true

0 commit comments

Comments
 (0)