diff --git a/examples/streaming/bidirectional.rb b/examples/streaming/bidirectional.rb new file mode 100755 index 0000000..100a8a1 --- /dev/null +++ b/examples/streaming/bidirectional.rb @@ -0,0 +1,56 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2024, by Samuel Williams. + +require 'async' +require 'async/http/client' +require 'async/http/server' +require 'async/http/endpoint' + +require 'protocol/http/body/streamable' +require 'protocol/http/body/writable' +require 'protocol/http/body/stream' + +endpoint = Async::HTTP::Endpoint.parse('http://localhost:3000') + +Async do + server = Async::HTTP::Server.for(endpoint) do |request| + output = Protocol::HTTP::Body::Streamable.response(request) do |stream| + # Simple echo server: + while chunk = stream.readpartial(1024) + stream.write(chunk) + end + rescue EOFError + # Ignore EOF errors. + ensure + stream.close + end + + Protocol::HTTP::Response[200, {}, output] + end + + server_task = Async{server.run} + + client = Async::HTTP::Client.new(endpoint) + + streamable = Protocol::HTTP::Body::Streamable.request do |stream| + stream.write("Hello, ") + stream.write("World!") + stream.close_write + + while chunk = stream.readpartial(1024) + puts chunk + end + rescue EOFError + # Ignore EOF errors. + ensure + stream.close + end + + response = client.get("/", body: streamable) + streamable.stream(response.body) +ensure + server_task.stop +end diff --git a/examples/streaming/gems.locked b/examples/streaming/gems.locked new file mode 100644 index 0000000..4af7498 --- /dev/null +++ b/examples/streaming/gems.locked @@ -0,0 +1,57 @@ +PATH + remote: ../.. + specs: + protocol-http (0.33.0) + +GEM + remote: https://rubygems.org/ + specs: + async (2.17.0) + console (~> 1.26) + fiber-annotation + io-event (~> 1.6, >= 1.6.5) + async-http (0.75.0) + async (>= 2.10.2) + async-pool (~> 0.7) + io-endpoint (~> 0.11) + io-stream (~> 0.4) + protocol-http (~> 0.30) + protocol-http1 (~> 0.20) + protocol-http2 (~> 0.18) + traces (>= 0.10) + async-pool (0.8.1) + async (>= 1.25) + metrics + traces + console (1.27.0) + fiber-annotation + fiber-local (~> 1.1) + json + fiber-annotation (0.2.0) + fiber-local (1.1.0) + fiber-storage + fiber-storage (1.0.0) + io-endpoint (0.13.1) + io-event (1.6.5) + io-stream (0.4.0) + json (2.7.2) + metrics (0.10.2) + protocol-hpack (1.5.0) + protocol-http1 (0.22.0) + protocol-http (~> 0.22) + protocol-http2 (0.18.0) + protocol-hpack (~> 1.4) + protocol-http (~> 0.18) + traces (0.13.1) + +PLATFORMS + ruby + x86_64-linux + +DEPENDENCIES + async + async-http + protocol-http! + +BUNDLED WITH + 2.5.16 diff --git a/examples/streaming/gems.rb b/examples/streaming/gems.rb new file mode 100644 index 0000000..8ff743d --- /dev/null +++ b/examples/streaming/gems.rb @@ -0,0 +1,10 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2024, by Samuel Williams. + +source "https://rubygems.org" + +gem "async" +gem "async-http" +gem "protocol-http", path: "../../" diff --git a/examples/streaming/unidirectional.rb b/examples/streaming/unidirectional.rb new file mode 100755 index 0000000..73991cb --- /dev/null +++ b/examples/streaming/unidirectional.rb @@ -0,0 +1,60 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2024, by Samuel Williams. + +require 'async' +require 'async/http/client' +require 'async/http/server' +require 'async/http/endpoint' + +require 'protocol/http/body/stream' +require 'protocol/http/body/writable' + +endpoint = Async::HTTP::Endpoint.parse('http://localhost:3000') + +Async do + server = Async::HTTP::Server.for(endpoint) do |request| + output = Protocol::HTTP::Body::Writable.new + stream = Protocol::HTTP::Body::Stream.new(request.body, output) + + Async do + # Simple echo server: + while chunk = stream.readpartial(1024) + stream.write(chunk) + end + rescue EOFError + # Ignore EOF errors. + ensure + stream.close + end + + Protocol::HTTP::Response[200, {}, output] + end + + server_task = Async{server.run} + + client = Async::HTTP::Client.new(endpoint) + + input = Protocol::HTTP::Body::Writable.new + response = client.get("/", body: input) + + begin + stream = Protocol::HTTP::Body::Stream.new(response.body, input) + + stream.write("Hello, ") + stream.write("World!") + stream.close_write + + while chunk = stream.readpartial(1024) + puts chunk + end + rescue EOFError + # Ignore EOF errors. + ensure + stream.close + end +ensure + server_task.stop +end diff --git a/fixtures/protocol/http/body/a_readable_body.rb b/fixtures/protocol/http/body/a_readable_body.rb new file mode 100644 index 0000000..1d7218b --- /dev/null +++ b/fixtures/protocol/http/body/a_readable_body.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2024, by Samuel Williams. + +module Protocol + module HTTP + module Body + AReadableBody = Sus::Shared("a readable body") do + with "#read" do + it "after closing, returns nil" do + body.close + + expect(body.read).to be_nil + end + end + + with "empty?" do + it "returns true after closing" do + body.close + + expect(body).to be(:empty?) + end + end + end + end + end +end diff --git a/fixtures/protocol/http/body/a_writable_body.rb b/fixtures/protocol/http/body/a_writable_body.rb new file mode 100644 index 0000000..f56ab71 --- /dev/null +++ b/fixtures/protocol/http/body/a_writable_body.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2024, by Samuel Williams. + +module Protocol + module HTTP + module Body + AWritableBody = Sus::Shared("a readable body") do + with "#read" do + it "after closing the write end, returns all chunks" do + body.write("Hello ") + body.write("World!") + body.close_write + + expect(body.read).to be == "Hello " + expect(body.read).to be == "World!" + expect(body.read).to be_nil + end + end + + with "empty?" do + it "returns false before writing" do + expect(body).not.to be(:empty?) + end + + it "returns true after all chunks are consumed" do + body.write("Hello") + body.close_write + + expect(body).not.to be(:empty?) + expect(body.read).to be == "Hello" + expect(body.read).to be_nil + + expect(body).to be(:empty?) + end + end + end + end + end +end diff --git a/guides/links.yaml b/guides/links.yaml index e240013..23c0f9f 100644 --- a/guides/links.yaml +++ b/guides/links.yaml @@ -1,4 +1,4 @@ getting-started: order: 1 design-overview: - order: 2 + order: 10 diff --git a/guides/streaming/readme.md b/guides/streaming/readme.md new file mode 100644 index 0000000..ae24ee6 --- /dev/null +++ b/guides/streaming/readme.md @@ -0,0 +1,131 @@ +# Streaming + +This guide gives an overview of how to implement streaming requests and responses. + +## Independent Uni-directional Streaming + +The request and response body work independently of each other can stream data in both directions. {ruby Protocol::HTTP::Body::Stream} provides an interface to merge these independent streams into an IO-like interface. + +```ruby +#!/usr/bin/env ruby + +require 'async' +require 'async/http/client' +require 'async/http/server' +require 'async/http/endpoint' + +require 'protocol/http/body/stream' +require 'protocol/http/body/writable' + +endpoint = Async::HTTP::Endpoint.parse('http://localhost:3000') + +Async do + server = Async::HTTP::Server.for(endpoint) do |request| + output = Protocol::HTTP::Body::Writable.new + stream = Protocol::HTTP::Body::Stream.new(request.body, output) + + Async do + # Simple echo server: + while chunk = stream.readpartial(1024) + stream.write(chunk) + end + rescue EOFError + # Ignore EOF errors. + ensure + stream.close + end + + Protocol::HTTP::Response[200, {}, output] + end + + server_task = Async{server.run} + + client = Async::HTTP::Client.new(endpoint) + + input = Protocol::HTTP::Body::Writable.new + response = client.get("/", body: input) + + begin + stream = Protocol::HTTP::Body::Stream.new(response.body, input) + + stream.write("Hello, ") + stream.write("World!") + stream.close_write + + while chunk = stream.readpartial(1024) + puts chunk + end + rescue EOFError + # Ignore EOF errors. + ensure + stream.close + end +ensure + server_task.stop +end +``` + +This approach works quite well, especially when the input and output bodies are independently compressed, decompressed, or chunked. However, some protocols, notably, WebSockets operate on the raw connection and don't require this level of abstraction. + +## Bi-directional Streaming + +While WebSockets can work on the above streaming interface, it's a bit more convenient to use the streaming interface directly, which gives raw access to the underlying stream where possible. + +```ruby +#!/usr/bin/env ruby + +require 'async' +require 'async/http/client' +require 'async/http/server' +require 'async/http/endpoint' + +require 'protocol/http/body/stream' +require 'protocol/http/body/writable' + +endpoint = Async::HTTP::Endpoint.parse('http://localhost:3000') + +Async do + server = Async::HTTP::Server.for(endpoint) do |request| + streamable = Protocol::HTTP::Body::Streamable. + output = Protocol::HTTP::Body::Writable.new + stream = Protocol::HTTP::Body::Stream.new(request.body, output) + + Async do + # Simple echo server: + while chunk = stream.readpartial(1024) + stream.write(chunk) + end + rescue EOFError + # Ignore EOF errors. + ensure + stream.close + end + + Protocol::HTTP::Response[200, {}, output] + end + + server_task = Async{server.run} + + client = Async::HTTP::Client.new(endpoint) + + input = Protocol::HTTP::Body::Writable.new + response = client.get("/", body: input) + + begin + stream = Protocol::HTTP::Body::Stream.new(response.body, input) + + stream.write("Hello, ") + stream.write("World!") + stream.close_write + + while chunk = stream.readpartial(1024) + puts chunk + end + rescue EOFError + # Ignore EOF errors. + ensure + stream.close + end +ensure + server_task.stop +end \ No newline at end of file diff --git a/lib/protocol/http/body/buffered.rb b/lib/protocol/http/body/buffered.rb index eada2f3..088252b 100644 --- a/lib/protocol/http/body/buffered.rb +++ b/lib/protocol/http/body/buffered.rb @@ -56,6 +56,17 @@ def finish self end + # Ensure that future reads return nil, but allow for rewinding. + def close(error = nil) + @index = @chunks.length + end + + def clear + @chunks.clear + @length = 0 + @index = 0 + end + def length @length ||= @chunks.inject(0) {|sum, chunk| sum + chunk.bytesize} end @@ -70,6 +81,8 @@ def ready? end def read + return nil unless @chunks + if chunk = @chunks[@index] @index += 1 @@ -81,18 +94,26 @@ def write(chunk) @chunks << chunk end + def close_write(error) + # Nothing to do. + end + def rewindable? - true + @chunks != nil end def rewind + return false unless @chunks + @index = 0 return true end def inspect - "\#<#{self.class} #{@chunks.size} chunks, #{self.length} bytes>" + if @chunks + "\#<#{self.class} #{@chunks.size} chunks, #{self.length} bytes>" + end end end end diff --git a/lib/protocol/http/body/deflate.rb b/lib/protocol/http/body/deflate.rb index 9c68472..5e98316 100644 --- a/lib/protocol/http/body/deflate.rb +++ b/lib/protocol/http/body/deflate.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true # Released under the MIT License. -# Copyright, 2019-2023, by Samuel Williams. +# Copyright, 2019-2024, by Samuel Williams. require_relative 'wrapper' diff --git a/lib/protocol/http/body/digestable.rb b/lib/protocol/http/body/digestable.rb index f4ff966..ea0c583 100644 --- a/lib/protocol/http/body/digestable.rb +++ b/lib/protocol/http/body/digestable.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true # Released under the MIT License. -# Copyright, 2020-2023, by Samuel Williams. +# Copyright, 2020-2024, by Samuel Williams. require_relative 'wrapper' diff --git a/lib/protocol/http/body/file.rb b/lib/protocol/http/body/file.rb index 996463b..954cdf6 100644 --- a/lib/protocol/http/body/file.rb +++ b/lib/protocol/http/body/file.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true # Released under the MIT License. -# Copyright, 2019-2023, by Samuel Williams. +# Copyright, 2019-2024, by Samuel Williams. require_relative 'readable' @@ -68,13 +68,15 @@ def read end end - def stream? - true - end + # def stream? + # true + # end - def call(stream) - IO.copy_stream(@file, stream, @remaining) - end + # def call(stream) + # IO.copy_stream(@file, stream, @remaining) + # ensure + # stream.close + # end def join return "" if @remaining == 0 diff --git a/lib/protocol/http/body/inflate.rb b/lib/protocol/http/body/inflate.rb index 12d940d..2693f0e 100644 --- a/lib/protocol/http/body/inflate.rb +++ b/lib/protocol/http/body/inflate.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true # Released under the MIT License. -# Copyright, 2019-2023, by Samuel Williams. +# Copyright, 2019-2024, by Samuel Williams. require 'zlib' diff --git a/lib/protocol/http/body/readable.rb b/lib/protocol/http/body/readable.rb index 226c897..68e075c 100644 --- a/lib/protocol/http/body/readable.rb +++ b/lib/protocol/http/body/readable.rb @@ -17,7 +17,11 @@ module Body # # If you don't want to read from a stream, and instead want to close it immediately, you can call `close` on the body. If the body is already completely consumed, `close` will do nothing, but if there is still data to be read, it will cause the underlying stream to be reset (and possibly closed). class Readable - # Close the stream immediately. + # Close the stream immediately. After invoking this method, the stream should be considered closed, and all internal resources should be released. + # + # If an error occured while handling the output, it can be passed as an argument. This may be propagated to the client, for example the client may be informed that the stream was not fully read correctly. + # + # Invoking `#read` after `#close` will return `nil`. def close(error = nil) end @@ -68,13 +72,15 @@ def read def each return to_enum unless block_given? - while chunk = self.read - yield chunk + begin + while chunk = self.read + yield chunk + end + rescue => error + raise + ensure + self.close(error) end - rescue => error - raise - ensure - self.close(error) end # Read all remaining chunks into a single binary string using `#each`. @@ -98,12 +104,14 @@ def stream? false end - # Write the body to the given stream. + # Invoke the body with the given stream. # - # In some cases, the stream may also be readable, such as when hijacking an HTTP/1 connection. In that case, it may be acceptable to read and write to the stream directly. + # The default implementation simply writes each chunk to the stream. If the body is not ready, it will be flushed after each chunk. Closes the stream when finished or if an error occurs. # - # If the stream is not ready, it will be flushed after each chunk. Closes the stream when finished or if an error occurs. + # Write the body to the given stream. # + # @parameter stream [IO | Object] An `IO`-like object that responds to `#read`, `#write` and `#flush`. + # @returns [Boolean] Whether the ownership of the stream was transferred. def call(stream) self.each do |chunk| stream.write(chunk) @@ -113,6 +121,8 @@ def call(stream) stream.flush end end + ensure + stream.close end # Read all remaining chunks into a buffered body and close the underlying input. diff --git a/lib/protocol/http/body/stream.rb b/lib/protocol/http/body/stream.rb index f1209ea..0a13ec4 100644 --- a/lib/protocol/http/body/stream.rb +++ b/lib/protocol/http/body/stream.rb @@ -21,6 +21,7 @@ def initialize(input = nil, output = Buffered.new) # Will hold remaining data in `#read`. @buffer = nil + @closed = false @closed_read = false end @@ -251,21 +252,22 @@ def flush end # Close the input body. - def close_read - if @input + def close_read(error = nil) + if input = @input + @input = nil @closed_read = true @buffer = nil - @input&.close - @input = nil + input.close(error) end end # Close the output body. - def close_write - if @output - @output&.close + def close_write(error = nil) + if output = @output @output = nil + + output.close_write(error) end end diff --git a/lib/protocol/http/body/streamable.rb b/lib/protocol/http/body/streamable.rb index 9e8f641..2c2d990 100644 --- a/lib/protocol/http/body/streamable.rb +++ b/lib/protocol/http/body/streamable.rb @@ -1,9 +1,11 @@ # frozen_string_literal: true # Released under the MIT License. -# Copyright, 2022, by Samuel Williams. +# Copyright, 2019-2024, by Samuel Williams. require_relative 'readable' +require_relative 'writable' + require_relative 'stream' module Protocol @@ -14,30 +16,28 @@ module Body # In some cases, it's advantageous to directly read and write to the underlying stream if possible. For example, HTTP/1 upgrade requests, WebSockets, and similar. To handle that case, response bodies can implement `stream?` and return `true`. When `stream?` returns true, the body **should** be consumed by calling `call(stream)`. Server implementations may choose to always invoke `call(stream)` if it's efficient to do so. Bodies that don't support it will fall back to using `#each`. # # When invoking `call(stream)`, the stream can be read from and written to, and closed. However, the stream is only guaranteed to be open for the duration of the `call(stream)` call. Once the method returns, the stream **should** be closed by the server. - class Streamable < Readable - class Closed < StandardError + module Streamable + # Raised when an operation is attempted on a closed stream. + class ClosedError < StandardError end - def initialize(block, input = nil) - @block = block - @input = input - @output = nil + # Raised when a streaming body is consumed more than once. + class ConsumedError < StandardError end - # Closing a stream indicates we are no longer interested in reading from it. - def close(error = nil) - if @input - @input.close - @input = nil - end - - if @output - @output.close(error) + def self.new(*arguments) + if arguments.size == 1 + DeferredBody.new(*arguments) + else + Body.new(*arguments) end end - attr :block - + # Represents an output wrapper around a stream, that can invoke a fiber when `#read`` is called. + # + # This behaves a little bit like a generator or lazy enumerator, in that it can be used to generate chunks of data on demand. + # + # When closing the the output, the block is invoked one last time with `nil` to indicate the end of the stream. class Output def initialize(input, block) stream = Stream.new(input, self) @@ -47,16 +47,11 @@ def initialize(input, block) @fiber = Fiber.new do |from| @from = from block.call(stream) - rescue Closed + rescue => error # Ignore. ensure @fiber = nil - - # No more chunks will be generated: - if from = @from - @from = nil - from.transfer(nil) - end + self.close(error) end end @@ -66,17 +61,41 @@ def write(chunk) @from = nil @from = from.transfer(chunk) else - raise RuntimeError, "Stream is not being read!" + raise ClosedError, "Stream is not being read!" end end - # Can be invoked by the block to close the stream. + # Indicates that no further output will be generated. + def close_write(error = nil) + # We might want to specialize the implementation later... + close(error) + end + + # Can be invoked by the block to close the stream. Closing the output means that no more chunks will be generated. def close(error = nil) if from = @from + # We are closing from within the output fiber, so we need to transfer back to `@from`: @from = nil - from.transfer(nil) + if error + from.raise(error) + else + from.transfer(nil) + end elsif @fiber - @fiber.raise(error || Closed) + # We are closing from outside the output fiber, so we need to resume the fiber appropriately: + @from = Fiber.current + + if error + # The fiber will be resumed from where it last called write, and we will raise the error there: + @fiber.raise(error) + else + begin + # If we get here, it means we are closing the fiber from the outside, so we need to transfer control back to the fiber: + @fiber.transfer(nil) + rescue Protocol::HTTP::Body::Streamable::ClosedError + # If the fiber then tries to write to the stream, it will raise a ClosedError, and we will end up here. We can ignore it, as we are already closing the stream and don't care about further writes. + end + end end end @@ -87,25 +106,80 @@ def read end end - # Invokes the block in a fiber which yields chunks when they are available. - def read - @output ||= Output.new(@input, @block) + class Body < Readable + def initialize(block, input = nil) + @block = block + @input = input + @output = nil + end - return @output.read - end - - def stream? - true + attr :block + + def stream? + true + end + + # Invokes the block in a fiber which yields chunks when they are available. + def read + if @output.nil? + if @block.nil? + raise ConsumedError, "Streaming body has already been consumed!" + end + + @output = Output.new(@input, @block) + @block = nil + end + + @output.read + end + + # Invoke the block with the given stream. + # + # The block can read and write to the stream, and must close the stream when finishing. + def call(stream) + if @block.nil? + raise ConsumedError, "Streaming block has already been consumed!" + end + + block = @block + + @input = @output = @block = nil + + # Ownership of the stream is passed into the block, in other words, the block is responsible for closing the stream. + block.call(stream) + rescue => error + # If, for some reason, the block raises an error, we assume it may not have closed the stream, so we close it here: + stream.close + raise + end + + # Closing a stream indicates we are no longer interested in reading from it. + def close(error = nil) + if input = @input + @input = nil + input.close(error) + end + + if output = @output + @output = nil + output.close(error) + end + end end - def call(stream) - raise "Streaming body has already been read!" if @output + # A deferred body has an extra `stream` method which can be used to stream data into the body, as the response body won't be available until the request has been sent. + class DeferredBody < Body + def initialize(block) + super(block, Writable.new) + end - @block.call(stream) - rescue => error - raise - ensure - self.close(error) + # Stream the response body into the block's input. + def stream(input) + input&.each do |chunk| + @input&.write(chunk) + end + @input&.close_write + end end end end diff --git a/lib/protocol/http/body/writable.rb b/lib/protocol/http/body/writable.rb index c9bf874..eff874b 100644 --- a/lib/protocol/http/body/writable.rb +++ b/lib/protocol/http/body/writable.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true # Released under the MIT License. -# Copyright, 2018-2023, by Samuel Williams. +# Copyright, 2024, by Samuel Williams. require_relative 'readable' @@ -16,15 +16,9 @@ class Closed < StandardError # @param [Integer] length The length of the response body if known. # @param [Async::Queue] queue Specify a different queue implementation, e.g. `Async::LimitedQueue.new(8)` to enable back-pressure streaming. def initialize(length = nil, queue: Thread::Queue.new) - @queue = queue - @length = length - + @queue = queue @count = 0 - - @finished = false - - @closed = false @error = nil end @@ -34,18 +28,16 @@ def length # Stop generating output; cause the next call to write to fail with the given error. Does not prevent existing chunks from being read. In other words, this indicates both that no more data will be or should be written to the body. def close(error = nil) - unless @closed - @queue.close - - @closed = true - @error = error - end + @error ||= error + + @queue.clear + @queue.close super end def closed? - @closed + @queue.closed? end def ready? @@ -59,22 +51,71 @@ def empty? # Read the next available chunk. def read + if @error + raise @error + end + @queue.pop end # Write a single chunk to the body. Signal completion by calling `#finish`. def write(chunk) - # If the reader breaks, the writer will break. - # The inverse of this is less obvious (*) - if @closed + if @queue.closed? raise(@error || Closed) end - @count += 1 @queue.push(chunk) + @count += 1 end - alias << write + def close_write(error = nil) + @error ||= error + @queue.close + end + + class Output + def initialize(writable) + @writable = writable + @closed = false + end + + def closed? + @closed || @writable.closed? + end + + def write(chunk) + @writable.write(chunk) + end + + alias << write + + def close(error = nil) + @closed = true + + if error + @writable.close(error) + else + @writable.close_write + end + end + end + + # Create an output wrapper which can be used to write chunks to the body. + def output + output = Output.new(self) + + unless block_given? + return output + end + + begin + yield output + rescue => error + raise error + ensure + output.close(error) + end + end def inspect "\#<#{self.class} #{@count} chunks written, #{status}>" diff --git a/test/protocol/http/body/buffered.rb b/test/protocol/http/body/buffered.rb index d6167df..be72de7 100644 --- a/test/protocol/http/body/buffered.rb +++ b/test/protocol/http/body/buffered.rb @@ -5,11 +5,14 @@ # Copyright, 2020-2023, by Bruno Sutic. require 'protocol/http/body/buffered' +require "protocol/http/body/a_readable_body" describe Protocol::HTTP::Body::Buffered do let(:source) {["Hello", "World"]} let(:body) {subject.wrap(source)} + it_behaves_like Protocol::HTTP::Body::AReadableBody + with ".wrap" do with "an instance of Protocol::HTTP::Body::Readable as a source" do let(:source) {Protocol::HTTP::Body::Readable.new} @@ -158,6 +161,15 @@ end end + with "#clear" do + it "clears all chunks and resets length" do + body.clear + expect(body.chunks).to be(:empty?) + expect(body.read).to be == nil + expect(body.length).to be == 0 + end + end + with '#inspect' do it "can be inspected" do expect(body.inspect).to be =~ /\d+ chunks, \d+ bytes/ diff --git a/test/protocol/http/body/deflate.rb b/test/protocol/http/body/deflate.rb index 75e93f3..7d3e8a5 100644 --- a/test/protocol/http/body/deflate.rb +++ b/test/protocol/http/body/deflate.rb @@ -2,7 +2,7 @@ # frozen_string_literal: true # Released under the MIT License. -# Copyright, 2019-2023, by Samuel Williams. +# Copyright, 2019-2024, by Samuel Williams. require 'protocol/http/body/buffered' require 'protocol/http/body/deflate' @@ -17,7 +17,6 @@ it "should round-trip data" do body.write("Hello World!") - body.close expect(decompressed_body.join).to be == "Hello World!" end @@ -26,7 +25,6 @@ it "should round-trip data" do body.write(data) - body.close expect(decompressed_body.read).to be == data expect(decompressed_body.read).to be == nil @@ -39,7 +37,6 @@ 10.times do body.write("Hello World!") end - body.close 10.times do expect(decompressed_body.read).to be == "Hello World!" diff --git a/test/protocol/http/body/digestable.rb b/test/protocol/http/body/digestable.rb index 046b3b6..0dbb5ad 100644 --- a/test/protocol/http/body/digestable.rb +++ b/test/protocol/http/body/digestable.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true # Released under the MIT License. -# Copyright, 2020-2023, by Samuel Williams. +# Copyright, 2020-2024, by Samuel Williams. require 'protocol/http/body/digestable' require 'protocol/http/body/buffered' diff --git a/test/protocol/http/body/file.rb b/test/protocol/http/body/file.rb index e54d090..dac8727 100644 --- a/test/protocol/http/body/file.rb +++ b/test/protocol/http/body/file.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true # Released under the MIT License. -# Copyright, 2019-2023, by Samuel Williams. +# Copyright, 2019-2024, by Samuel Williams. require 'protocol/http/body/file' @@ -9,11 +9,11 @@ let(:path) {File.expand_path('file_spec.txt', __dir__)} let(:body) {subject.open(path)} - with '#stream?' do - it "should be streamable" do - expect(body).to be(:stream?) - end - end + # with '#stream?' do + # it "should be streamable" do + # expect(body).to be(:stream?) + # end + # end with '#join' do it "should read entire file" do diff --git a/test/protocol/http/body/streamable.rb b/test/protocol/http/body/streamable.rb index 2f4e91f..51a8885 100644 --- a/test/protocol/http/body/streamable.rb +++ b/test/protocol/http/body/streamable.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true # Released under the MIT License. -# Copyright, 2022, by Samuel Williams. +# Copyright, 2024, by Samuel Williams. require 'protocol/http/body/streamable' @@ -14,8 +14,7 @@ end end - let(:input) {nil} - let(:body) {subject.new(block, input)} + let(:body) {subject.new(block)} with "#stream?" do it "should be streamable" do @@ -63,11 +62,25 @@ expect do @stream.write("!") - end.to raise_exception(RuntimeError, message: be =~ /Stream is not being read!/) + end.to raise_exception(Protocol::HTTP::Body::Streamable::ClosedError, message: be =~ /Stream is not being read!/) end end end + with '#close_write' do + let(:block) do + proc do |stream| + stream.close_write + end + end + + let(:body) {subject.new(block)} + + it "can close the output body" do + expect(body.read).to be == nil + end + end + with '#each' do it "can read the body" do chunks = [] @@ -83,18 +96,37 @@ expect(stream.string).to be == "HelloWorld" end + it "will fail if invoked twice" do + stream = StringIO.new + body.call(stream) + + expect do + body.call(stream) + end.to raise_exception(Protocol::HTTP::Body::Streamable::ConsumedError) + end + + it "will fail if trying to read after streaming" do + stream = StringIO.new + body.call(stream) + + expect do + body.read + end.to raise_exception(Protocol::HTTP::Body::Streamable::ConsumedError) + end + with "a block that raises an error" do let(:block) do proc do |stream| stream.write("Hello") raise "Oh no... a wild error appeared!" + ensure + stream.close end end it "closes the stream if an error occurs" do stream = StringIO.new - expect(body).to receive(:close) expect do body.call(stream) @@ -108,8 +140,17 @@ with '#close' do it "can close the body" do expect(body.read).to be == "Hello" + body.close end + + it "can raise an error on the block" do + expect(body.read).to be == "Hello" + + expect do + body.close(RuntimeError.new("Oh no!")) + end.to raise_exception(RuntimeError, message: be =~ /Oh no!/) + end end with "nested fiber" do @@ -137,6 +178,8 @@ end end + let(:body) {subject.new(block, input)} + it "can read from input" do expect(body.read).to be == "Hello" expect(body.read).to be == " " @@ -151,5 +194,34 @@ expect(output.string).to be == "Hello World" end + + with '#close' do + it "can close the body" do + expect(body.read).to be == "Hello" + body.close + end + end + end + + with "#stream" do + let(:block) do + proc do |stream| + while chunk = stream.read_partial + stream.write(chunk) + end + end + end + + it "can stream to output" do + input = Protocol::HTTP::Body::Buffered.new(["Hello", " ", "World"]) + + body.stream(input) + + expect(body.read).to be == "Hello" + expect(body.read).to be == " " + expect(body.read).to be == "World" + + body.close + end end end diff --git a/test/protocol/http/body/writable.rb b/test/protocol/http/body/writable.rb index 3da5cf6..82ce4c6 100644 --- a/test/protocol/http/body/writable.rb +++ b/test/protocol/http/body/writable.rb @@ -1,14 +1,17 @@ # frozen_string_literal: true # Released under the MIT License. -# Copyright, 2018-2023, by Samuel Williams. +# Copyright, 2024, by Samuel Williams. require 'protocol/http/body/writable' require 'protocol/http/body/deflate' +require 'protocol/http/body/a_writable_body' describe Protocol::HTTP::Body::Writable do let(:body) {subject.new} + it_behaves_like Protocol::HTTP::Body::AWritableBody + with "#length" do it "should be unspecified by default" do expect(body.length).to be_nil @@ -41,14 +44,15 @@ it "should be empty if closed with no pending chunks" do expect(body).not.to be(:empty?) - body.close + body.close_write expect(body).to be(:empty?) end it "should become empty when pending chunks are read" do body.write("Hello") - body.close + + body.close_write expect(body).not.to be(:empty?) body.read @@ -102,7 +106,7 @@ body.write("#{i}") end - body.close + body.close_write expect(body.join).to be == "012" end @@ -114,7 +118,7 @@ body.write("Hello World #{i}") end - body.close + body.close_write 3.times do |i| chunk = body.read @@ -154,7 +158,7 @@ it "will stop after finishing" do body.write("Hello World!") - body.close + body.close_write expect(body).not.to be(:empty?) @@ -165,4 +169,31 @@ expect(body).to be(:empty?) end end + + with "#output" do + it "can be used to write data" do + body.output do |output| + output.write("Hello World!") + end + + expect(body.output).to be(:closed?) + + expect(body.read).to be == "Hello World!" + expect(body.read).to be_nil + end + + it "can propagate errors" do + expect do + body.output do |output| + raise "Oops!" + end + end.to raise_exception(RuntimeError, message: be =~ /Oops/) + + expect(body).to be(:closed?) + + expect do + body.read + end.to raise_exception(RuntimeError, message: be =~ /Oops/) + end + end end