Skip to content

Commit e7cd220

Browse files
authored
Use Fiber.schedule in Streamable. (#68)
1 parent 0bb3b0a commit e7cd220

File tree

7 files changed

+158
-132
lines changed

7 files changed

+158
-132
lines changed

examples/streaming/bidirectional.rb

+15
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,16 @@
2020
output = Protocol::HTTP::Body::Streamable.response(request) do |stream|
2121
# Simple echo server:
2222
while chunk = stream.readpartial(1024)
23+
$stderr.puts "Server chunk: #{chunk.inspect}"
2324
stream.write(chunk)
25+
$stderr.puts "Server waiting for next chunk..."
2426
end
27+
$stderr.puts "Server done reading request."
2528
rescue EOFError
29+
$stderr.puts "Server EOF."
2630
# Ignore EOF errors.
2731
ensure
32+
$stderr.puts "Server closing stream."
2833
stream.close
2934
end
3035

@@ -38,18 +43,28 @@
3843
streamable = Protocol::HTTP::Body::Streamable.request do |stream|
3944
stream.write("Hello, ")
4045
stream.write("World!")
46+
47+
$stderr.puts "Client closing write..."
4148
stream.close_write
4249

50+
$stderr.puts "Client reading response..."
51+
4352
while chunk = stream.readpartial(1024)
53+
$stderr.puts "Client chunk: #{chunk.inspect}"
4454
puts chunk
4555
end
56+
$stderr.puts "Client done reading response."
4657
rescue EOFError
58+
$stderr.puts "Client EOF."
4759
# Ignore EOF errors.
4860
ensure
61+
$stderr.puts "Client closing stream: #{$!}"
4962
stream.close
5063
end
5164

65+
$stderr.puts "Client sending request..."
5266
response = client.get("/", body: streamable)
67+
$stderr.puts "Client received response and streaming it..."
5368
streamable.stream(response.body)
5469
ensure
5570
server_task.stop

examples/streaming/gems.locked

+29-10
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,16 @@
1+
PATH
2+
remote: ../../../async-http
3+
specs:
4+
async-http (0.75.0)
5+
async (>= 2.10.2)
6+
async-pool (~> 0.7)
7+
io-endpoint (~> 0.11)
8+
io-stream (~> 0.4)
9+
protocol-http (~> 0.33)
10+
protocol-http1 (~> 0.20)
11+
protocol-http2 (~> 0.18)
12+
traces (>= 0.10)
13+
114
PATH
215
remote: ../..
316
specs:
@@ -10,15 +23,6 @@ GEM
1023
console (~> 1.26)
1124
fiber-annotation
1225
io-event (~> 1.6, >= 1.6.5)
13-
async-http (0.75.0)
14-
async (>= 2.10.2)
15-
async-pool (~> 0.7)
16-
io-endpoint (~> 0.11)
17-
io-stream (~> 0.4)
18-
protocol-http (~> 0.30)
19-
protocol-http1 (~> 0.20)
20-
protocol-http2 (~> 0.18)
21-
traces (>= 0.10)
2226
async-pool (0.8.1)
2327
async (>= 1.25)
2428
metrics
@@ -27,13 +31,20 @@ GEM
2731
fiber-annotation
2832
fiber-local (~> 1.1)
2933
json
34+
debug (1.9.2)
35+
irb (~> 1.10)
36+
reline (>= 0.3.8)
3037
fiber-annotation (0.2.0)
3138
fiber-local (1.1.0)
3239
fiber-storage
3340
fiber-storage (1.0.0)
41+
io-console (0.7.2)
3442
io-endpoint (0.13.1)
3543
io-event (1.6.5)
3644
io-stream (0.4.0)
45+
irb (1.14.0)
46+
rdoc (>= 4.0.0)
47+
reline (>= 0.4.2)
3748
json (2.7.2)
3849
metrics (0.10.2)
3950
protocol-hpack (1.5.0)
@@ -42,6 +53,13 @@ GEM
4253
protocol-http2 (0.18.0)
4354
protocol-hpack (~> 1.4)
4455
protocol-http (~> 0.18)
56+
psych (5.1.2)
57+
stringio
58+
rdoc (6.7.0)
59+
psych (>= 4.0.0)
60+
reline (0.5.10)
61+
io-console (~> 0.5)
62+
stringio (3.1.1)
4563
traces (0.13.1)
4664

4765
PLATFORMS
@@ -50,7 +68,8 @@ PLATFORMS
5068

5169
DEPENDENCIES
5270
async
53-
async-http
71+
async-http!
72+
debug
5473
protocol-http!
5574

5675
BUNDLED WITH

examples/streaming/gems.rb

+3-1
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,7 @@
66
source "https://rubygems.org"
77

88
gem "async"
9-
gem "async-http"
9+
gem "async-http", path: "../../../async-http"
1010
gem "protocol-http", path: "../../"
11+
12+
gem "debug"

examples/streaming/simple.rb

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
#!/usr/bin/env ruby
2+
# frozen_string_literal: true
3+
4+
# Released under the MIT License.
5+
# Copyright, 2024, by Samuel Williams.
6+
7+
require 'async'
8+
require 'async/http/client'
9+
require 'async/http/server'
10+
require 'async/http/endpoint'
11+
12+
require 'protocol/http/body/streamable'
13+
require 'protocol/http/body/writable'
14+
require 'protocol/http/body/stream'
15+
16+
endpoint = Async::HTTP::Endpoint.parse('http://localhost:3000')
17+
18+
Async do
19+
server = Async::HTTP::Server.for(endpoint) do |request|
20+
output = Protocol::HTTP::Body::Streamable.response(request) do |stream|
21+
$stderr.puts "Server sending text..."
22+
stream.write("Hello from server!")
23+
rescue EOFError
24+
$stderr.puts "Server EOF."
25+
# Ignore EOF errors.
26+
ensure
27+
$stderr.puts "Server closing stream."
28+
stream.close
29+
end
30+
31+
Protocol::HTTP::Response[200, {}, output]
32+
end
33+
34+
server_task = Async{server.run}
35+
36+
client = Async::HTTP::Client.new(endpoint)
37+
38+
streamable = Protocol::HTTP::Body::Streamable.request do |stream|
39+
while chunk = stream.readpartial(1024)
40+
$stderr.puts "Client chunk: #{chunk.inspect}"
41+
end
42+
rescue EOFError
43+
$stderr.puts "Client EOF."
44+
# Ignore EOF errors.
45+
ensure
46+
$stderr.puts "Client closing stream."
47+
stream.close
48+
end
49+
50+
$stderr.puts "Client sending request..."
51+
response = client.get("/", body: streamable)
52+
$stderr.puts "Client received response and streaming it..."
53+
streamable.stream(response.body)
54+
ensure
55+
server_task.stop
56+
end

gems.rb

+4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222
gem "decode"
2323
gem "rubocop"
2424

25+
gem "sus-fixtures-async"
26+
2527
gem "bake-test"
2628
gem "bake-test-external"
2729
end
30+
31+
# gem "async-http", path: "../async-http"

lib/protocol/http/body/streamable.rb

+47-80
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,6 @@ module Body
1717
#
1818
# When invoking `call(stream)`, the stream can be read from and written to, and closed. However, the stream is only guaranteed to be open for the duration of the `call(stream)` call. Once the method returns, the stream **should** be closed by the server.
1919
module Streamable
20-
# Raised when an operation is attempted on a closed stream.
21-
class ClosedError < StandardError
22-
end
23-
24-
# Raised when a streaming body is consumed more than once.
25-
class ConsumedError < StandardError
26-
end
27-
2820
def self.new(*arguments)
2921
if arguments.size == 1
3022
DeferredBody.new(*arguments)
@@ -33,100 +25,64 @@ def self.new(*arguments)
3325
end
3426
end
3527

36-
# Represents an output wrapper around a stream, that can invoke a fiber when `#read`` is called.
37-
#
38-
# This behaves a little bit like a generator or lazy enumerator, in that it can be used to generate chunks of data on demand.
39-
#
40-
# When closing the the output, the block is invoked one last time with `nil` to indicate the end of the stream.
28+
def self.request(&block)
29+
DeferredBody.new(block)
30+
end
31+
32+
def self.response(request, &block)
33+
Body.new(block, request.body)
34+
end
35+
4136
class Output
37+
def self.schedule(input, block)
38+
self.new(input, block).tap(&:schedule)
39+
end
40+
4241
def initialize(input, block)
43-
stream = Stream.new(input, self)
44-
45-
@from = nil
46-
47-
@fiber = Fiber.new do |from|
48-
@from = from
49-
block.call(stream)
50-
rescue => error
51-
# Ignore.
52-
ensure
53-
@fiber = nil
54-
self.close(error)
55-
end
42+
@output = Writable.new
43+
@stream = Stream.new(input, @output)
44+
@block = block
5645
end
5746

58-
# Can be invoked by the block to write to the stream.
59-
def write(chunk)
60-
if from = @from
61-
@from = nil
62-
@from = from.transfer(chunk)
63-
else
64-
raise ClosedError, "Stream is not being read!"
47+
def schedule
48+
@fiber ||= Fiber.schedule do
49+
@block.call(@stream)
6550
end
6651
end
6752

68-
# Indicates that no further output will be generated.
69-
def close_write(error = nil)
70-
# We might want to specialize the implementation later...
71-
close(error)
53+
def read
54+
@output.read
7255
end
7356

74-
# Can be invoked by the block to close the stream. Closing the output means that no more chunks will be generated.
7557
def close(error = nil)
76-
if from = @from
77-
# We are closing from within the output fiber, so we need to transfer back to `@from`:
78-
@from = nil
79-
if error
80-
from.raise(error)
81-
else
82-
from.transfer(nil)
83-
end
84-
elsif @fiber
85-
# We are closing from outside the output fiber, so we need to resume the fiber appropriately:
86-
@from = Fiber.current
87-
88-
if error
89-
# The fiber will be resumed from where it last called write, and we will raise the error there:
90-
@fiber.raise(error)
91-
else
92-
begin
93-
# If we get here, it means we are closing the fiber from the outside, so we need to transfer control back to the fiber:
94-
@fiber.transfer(nil)
95-
rescue Protocol::HTTP::Body::Streamable::ClosedError
96-
# If the fiber then tries to write to the stream, it will raise a ClosedError, and we will end up here. We can ignore it, as we are already closing the stream and don't care about further writes.
97-
end
98-
end
99-
end
100-
end
101-
102-
def read
103-
raise RuntimeError, "Stream is already being read!" if @from
104-
105-
@fiber&.transfer(Fiber.current)
58+
@output.close_write(error)
10659
end
10760
end
10861

62+
# Raised when a streaming body is consumed more than once.
63+
class ConsumedError < StandardError
64+
end
65+
10966
class Body < Readable
11067
def initialize(block, input = nil)
11168
@block = block
11269
@input = input
11370
@output = nil
11471
end
11572

116-
attr :block
117-
11873
def stream?
11974
true
12075
end
12176

12277
# Invokes the block in a fiber which yields chunks when they are available.
12378
def read
79+
# We are reading chunk by chunk, allocate an output stream and execute the block to generate the chunks:
12480
if @output.nil?
12581
if @block.nil?
12682
raise ConsumedError, "Streaming body has already been consumed!"
12783
end
12884

129-
@output = Output.new(@input, @block)
85+
@output = Output.schedule(@input, @block)
13086
@block = nil
13187
end
13288

@@ -155,15 +111,16 @@ def call(stream)
155111

156112
# Closing a stream indicates we are no longer interested in reading from it.
157113
def close(error = nil)
158-
if input = @input
159-
@input = nil
160-
input.close(error)
161-
end
162-
163114
if output = @output
164115
@output = nil
116+
# Closing the output here may take some time, as it may need to finish handling the stream:
165117
output.close(error)
166118
end
119+
120+
if input = @input
121+
@input = nil
122+
input.close(error)
123+
end
167124
end
168125
end
169126

@@ -173,12 +130,22 @@ def initialize(block)
173130
super(block, Writable.new)
174131
end
175132

133+
# Closing a stream indicates we are no longer interested in reading from it, but in this case that does not mean that the output block is finished generating data.
134+
def close(error = nil)
135+
if error
136+
super
137+
end
138+
end
139+
176140
# Stream the response body into the block's input.
177-
def stream(input)
178-
input&.each do |chunk|
179-
@input&.write(chunk)
141+
def stream(body)
142+
body&.each do |chunk|
143+
@input.write(chunk)
180144
end
181-
@input&.close_write
145+
rescue => error
146+
raise
147+
ensure
148+
@input.close_write(error)
182149
end
183150
end
184151
end

0 commit comments

Comments
 (0)