Skip to content

Commit dd250ba

Browse files
committed
Improved streamable error handling.
1 parent ef5d49d commit dd250ba

File tree

6 files changed

+106
-39
lines changed

6 files changed

+106
-39
lines changed

lib/protocol/http/body/buffered.rb

+1-2
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ def finish
5656
self
5757
end
5858

59+
# Ensure that future reads return nil, but allow for rewinding.
5960
def close(error = nil)
6061
@index = @chunks.length
6162
end
@@ -112,8 +113,6 @@ def rewind
112113
def inspect
113114
if @chunks
114115
"\#<#{self.class} #{@chunks.size} chunks, #{self.length} bytes>"
115-
else
116-
"\#<#{self.class} closed>"
117116
end
118117
end
119118
end

lib/protocol/http/body/streamable.rb

+26-13
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,14 @@ module Body
1717
#
1818
# When invoking `call(stream)`, the stream can be read from and written to, and closed. However, the stream is only guaranteed to be open for the duration of the `call(stream)` call. Once the method returns, the stream **should** be closed by the server.
1919
module Streamable
20+
# Raised when an operation is attempted on a closed stream.
2021
class ClosedError < StandardError
2122
end
2223

24+
# Raised when a streaming body is consumed more than once.
25+
class ConsumedError < StandardError
26+
end
27+
2328
def self.new(*arguments)
2429
if arguments.size == 1
2530
DeferredBody.new(*arguments)
@@ -63,20 +68,36 @@ def write(chunk)
6368
end
6469
end
6570

71+
# Indicates that no further output will be generated.
72+
def close_write(error = nil)
73+
# We might want to specialize the implementation later...
74+
close(error)
75+
end
76+
6677
# Can be invoked by the block to close the stream. Closing the output means that no more chunks will be generated.
6778
def close(error = nil)
6879
if from = @from
6980
# We are closing from within the output fiber, so we need to transfer back to `@from`:
7081
@from = nil
71-
from.transfer(nil)
82+
if error
83+
from.raise(error)
84+
else
85+
from.transfer(nil)
86+
end
7287
elsif @fiber
7388
# We are closing from outside the output fiber, so we need to resume the fiber appropriately:
7489
@from = Fiber.current
7590

7691
if error
92+
# The fiber will be resumed from where it last called write, and we will raise the error there:
7793
@fiber.raise(error)
7894
else
79-
@fiber.transfer(nil)
95+
begin
96+
# If we get here, it means we are closing the fiber from the outside, so we need to transfer control back to the fiber:
97+
@fiber.transfer(nil)
98+
rescue Protocol::HTTP::Body::Streamable::ClosedError
99+
# If the fiber then tries to write to the stream, it will raise a ClosedError, and we will end up here. We can ignore it, as we are already closing the stream and don't care about further writes.
100+
end
80101
end
81102
end
82103
end
@@ -105,7 +126,7 @@ def stream?
105126
def read
106127
if @output.nil?
107128
if @block.nil?
108-
raise "Streaming body has already been consumed!"
129+
raise ConsumedError, "Streaming body has already been consumed!"
109130
end
110131

111132
@output = Output.new(@input, @block)
@@ -120,7 +141,7 @@ def read
120141
# The block can read and write to the stream, and must close the stream when finishing.
121142
def call(stream)
122143
if @block.nil?
123-
raise "Streaming block has already been consumed!"
144+
raise ConsumedError, "Streaming block has already been consumed!"
124145
end
125146

126147
block = @block
@@ -153,25 +174,17 @@ def close(error = nil)
153174
class DeferredBody < Body
154175
def initialize(block)
155176
super(block, Writable.new)
156-
@finishing = false
157-
end
158-
159-
def close(error = nil)
160-
return unless @finishing
161-
super
162177
end
163178

164179
# Stream the response body into the block's input.
165180
def stream(input)
166181
input&.each do |chunk|
167182
@input&.write(chunk)
168183
end
184+
@input&.close_write
169185
rescue => error
170186
raise
171187
ensure
172-
@finishing = true
173-
@input&.close
174-
175188
self.close(error)
176189
end
177190
end

lib/protocol/http/body/writable.rb

+13-19
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,9 @@ class Closed < StandardError
1616
# @param [Integer] length The length of the response body if known.
1717
# @param [Async::Queue] queue Specify a different queue implementation, e.g. `Async::LimitedQueue.new(8)` to enable back-pressure streaming.
1818
def initialize(length = nil, queue: Thread::Queue.new)
19-
@queue = queue
20-
2119
@length = length
22-
20+
@queue = queue
2321
@count = 0
24-
25-
@finished = false
26-
27-
@closed = false
2822
@error = nil
2923
end
3024

@@ -34,18 +28,16 @@ def length
3428

3529
# Stop generating output; cause the next call to write to fail with the given error. Does not prevent existing chunks from being read. In other words, this indicates both that no more data will be or should be written to the body.
3630
def close(error = nil)
37-
unless @closed
38-
@queue.close
39-
40-
@closed = true
41-
@error = error
42-
end
31+
@error ||= error
32+
33+
@queue.clear
34+
@queue.close
4335

4436
super
4537
end
4638

4739
def closed?
48-
@closed
40+
@queue.closed?
4941
end
5042

5143
def ready?
@@ -59,23 +51,23 @@ def empty?
5951

6052
# Read the next available chunk.
6153
def read
54+
if @error
55+
raise @error
56+
end
57+
6258
@queue.pop
6359
end
6460

6561
# Write a single chunk to the body. Signal completion by calling `#finish`.
6662
def write(chunk)
67-
# If the reader breaks, the writer will break.
68-
if @closed
63+
if @queue.closed?
6964
raise(@error || Closed)
7065
end
7166

7267
@queue.push(chunk)
7368
@count += 1
7469
end
7570

76-
# This alias is provided for compatibility with template generation.
77-
alias << write
78-
7971
def close_write(error = nil)
8072
@error ||= error
8173
@queue.close
@@ -95,6 +87,8 @@ def write(chunk)
9587
@writable.write(chunk)
9688
end
9789

90+
alias << write
91+
9892
def close(error = nil)
9993
@closed = true
10094

test/protocol/http/body/buffered.rb

+9
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,15 @@
161161
end
162162
end
163163

164+
with "#clear" do
165+
it "clears all chunks and resets length" do
166+
body.clear
167+
expect(body.chunks).to be(:empty?)
168+
expect(body.read).to be == nil
169+
expect(body.length).to be == 0
170+
end
171+
end
172+
164173
with '#inspect' do
165174
it "can be inspected" do
166175
expect(body.inspect).to be =~ /\d+ chunks, \d+ bytes/

test/protocol/http/body/streamable.rb

+24
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,24 @@
9696
expect(stream.string).to be == "HelloWorld"
9797
end
9898

99+
it "will fail if invoked twice" do
100+
stream = StringIO.new
101+
body.call(stream)
102+
103+
expect do
104+
body.call(stream)
105+
end.to raise_exception(Protocol::HTTP::Body::Streamable::ConsumedError)
106+
end
107+
108+
it "will fail if trying to read after streaming" do
109+
stream = StringIO.new
110+
body.call(stream)
111+
112+
expect do
113+
body.read
114+
end.to raise_exception(Protocol::HTTP::Body::Streamable::ConsumedError)
115+
end
116+
99117
with "a block that raises an error" do
100118
let(:block) do
101119
proc do |stream|
@@ -125,8 +143,14 @@
125143
with '#close' do
126144
it "can close the body" do
127145
expect(body.read).to be == "Hello"
146+
128147
body.close
129148
end
149+
150+
it "can raise an error on the block" do
151+
expect(body.read).to be == "Hello"
152+
body.close(RuntimeError.new("Oh no!"))
153+
end
130154
end
131155

132156
with "nested fiber" do

test/protocol/http/body/writable.rb

+33-5
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,15 @@
4444
it "should be empty if closed with no pending chunks" do
4545
expect(body).not.to be(:empty?)
4646

47-
body.close
47+
body.close_write
4848

4949
expect(body).to be(:empty?)
5050
end
5151

5252
it "should become empty when pending chunks are read" do
5353
body.write("Hello")
54-
body.close
54+
55+
body.close_write
5556

5657
expect(body).not.to be(:empty?)
5758
body.read
@@ -105,7 +106,7 @@
105106
body.write("#{i}")
106107
end
107108

108-
body.close
109+
body.close_write
109110

110111
expect(body.join).to be == "012"
111112
end
@@ -117,7 +118,7 @@
117118
body.write("Hello World #{i}")
118119
end
119120

120-
body.close
121+
body.close_write
121122

122123
3.times do |i|
123124
chunk = body.read
@@ -157,7 +158,7 @@
157158

158159
it "will stop after finishing" do
159160
body.write("Hello World!")
160-
body.close
161+
body.close_write
161162

162163
expect(body).not.to be(:empty?)
163164

@@ -168,4 +169,31 @@
168169
expect(body).to be(:empty?)
169170
end
170171
end
172+
173+
with "#output" do
174+
it "can be used to write data" do
175+
body.output do |output|
176+
output.write("Hello World!")
177+
end
178+
179+
expect(body.output).to be(:closed?)
180+
181+
expect(body.read).to be == "Hello World!"
182+
expect(body.read).to be_nil
183+
end
184+
185+
it "can propagate errors" do
186+
expect do
187+
body.output do |output|
188+
raise "Oops!"
189+
end
190+
end.to raise_exception(RuntimeError, message: be =~ /Oops/)
191+
192+
expect(body).to be(:closed?)
193+
194+
expect do
195+
body.read
196+
end.to raise_exception(RuntimeError, message: be =~ /Oops/)
197+
end
198+
end
171199
end

0 commit comments

Comments
 (0)