diff --git a/lib/async/scheduler.rb b/lib/async/scheduler.rb index fc11eb9..f2333b3 100644 --- a/lib/async/scheduler.rb +++ b/lib/async/scheduler.rb @@ -256,7 +256,7 @@ def block(blocker, timeout) # @parameter blocker [Object] The object that was blocking the fiber. # @parameter fiber [Fiber] The fiber to unblock. def unblock(blocker, fiber) - # $stderr.puts "unblock(#{blocker}, #{fiber})" + # Fiber.blocking{$stderr.puts "unblock(#{blocker}, #{fiber})"} # This operation is protected by the GVL: if selector = @selector @@ -272,6 +272,8 @@ def unblock(blocker, fiber) # # @parameter duration [Numeric | Nil] The time in seconds to sleep, or if nil, indefinitely. def kernel_sleep(duration = nil) + # Fiber.blocking{$stderr.puts "kernel_sleep(#{duration}, #{Fiber.current})"} + if duration self.block(nil, duration) else @@ -370,6 +372,34 @@ def io_write(io, buffer, length, offset = 0) end end + # Used to defer stopping the current task until later. + class FiberInterrupt + # Create a new stop later operation. + # + # @parameter task [Task] The task to stop later. + def initialize(fiber, exception) + @fiber = fiber + @exception = exception + end + + # @returns [Boolean] Whether the task is alive. + def alive? + @fiber.alive? + end + + # Transfer control to the operation - this will stop the task. + def transfer + # Fiber.blocking{$stderr.puts "FiberInterrupt#transfer(#{@fiber}, #{@exception})"} + @fiber.raise(@exception) + end + end + + # Raise an exception on the specified fiber, waking up the event loop if necessary. + def fiber_interrupt(fiber, exception) + # Fiber.blocking{$stderr.puts "fiber_interrupt(#{fiber}, #{exception})"} + unblock(nil, FiberInterrupt.new(fiber, exception)) + end + # Wait for the specified process ID to exit. # # @public Since *Async v2*. diff --git a/releases.md b/releases.md index 53d2b44..a778772 100644 --- a/releases.md +++ b/releases.md @@ -8,6 +8,23 @@ The `Async::WorkerPool` implementation has been removed in favor of using `IO::E To enable the worker pool, you can set the `ASYNC_SCHEDULER_WORKER_POOL` environment variable to `true`. This will allow the scheduler to use a worker pool for blocking operations, which can help improve performance in applications that perform a lot of CPU-bound operations (e.g. `rb_nogvl`). +### Better handling of `IO#close` using `fiber_interrupt` + +`IO#close` interrupts fibers that are waiting on the IO using the new `fiber_interrupt` hook introduced in Ruby 3.5/4.0. This means that if you close an IO while a fiber is waiting on it, the fiber will be interrupted and will raise an `IOError`. This is a change from previous versions of Ruby, where closing an IO would not interrupt fibers waiting on it, and would instead interrupt the entire event loop (essentially a bug). + +```ruby +r, w = IO.pipe + +Async do + child = Async do + r.gets + end + + r.close # This will interrupt the child fiber. + child.wait # This will raise an `IOError` because the IO was closed. +end +``` + ## v2.24.0 - Ruby v3.1 support is dropped. diff --git a/test/io.rb b/test/io.rb index 846bb84..738ba8a 100644 --- a/test/io.rb +++ b/test/io.rb @@ -90,4 +90,135 @@ out.close end end + + with "#close" do + it "can interrupt reading fiber when closing" do + skip_unless_minimum_ruby_version("3.5") + + r, w = IO.pipe + + read_task = Async do + expect do + r.read(5) + end.to raise_exception(IOError, message: be =~ /stream closed/) + end + + r.close + read_task.wait + end + + it "can interrupt reading fiber when closing from another fiber" do + skip_unless_minimum_ruby_version("3.5") + + r, w = IO.pipe + + read_task = Async do + expect do + r.read(5) + end.to raise_exception(IOError, message: be =~ /stream closed/) + ensure + puts "Exiting read task" + end + + close_task = Async do + r.close + ensure + puts "Exiting close task" + end + + close_task.wait + read_task.wait + end + + it "can interrupt reading fiber when closing from a new thread" do + skip_unless_minimum_ruby_version("3.5") + + r, w = IO.pipe + + read_task = Async do + expect do + r.read(5) + end.to raise_exception(IOError, message: be =~ /stream closed/) + end + + close_thread = Thread.new do + r.close + end + + close_thread.value + read_task.wait + end + + it "can interrupt reading fiber when closing from a fiber in a new thread" do + skip_unless_minimum_ruby_version("3.5") + + r, w = IO.pipe + + read_task = Async do + expect do + r.read(5) + end.to raise_exception(IOError, message: be =~ /stream closed/) + end + + close_thread = Thread.new do + close_task = Async do + r.close + end + close_task.wait + end + + close_thread.value + read_task.wait + end + + it "can interrupt reading thread when closing from a fiber" do + skip_unless_minimum_ruby_version("3.5") + + r, w = IO.pipe + + read_thread = Thread.new do + Thread.current.report_on_exception = false + r.read(5) + end + + # Wait until read_thread blocks on I/O + Thread.pass until read_thread.status == "sleep" + + close_task = Async do + r.close + end + + close_task.wait + + expect do + read_thread.join + end.to raise_exception(IOError, message: be =~ /closed/) + end + + it "can interrupt reading fiber in a new thread when closing from a fiber" do + skip_unless_minimum_ruby_version("3.5") + + r, w = IO.pipe + + read_thread = Thread.new do + Thread.current.report_on_exception = false + read_task = Async do + expect do + r.read(5) + end.to raise_exception(IOError, message: be =~ /closed/) + end + read_task.wait + end + + # Wait until read_thread blocks on I/O + Thread.pass until read_thread.status == "sleep" + + close_task = Async do + r.close + end + close_task.wait + + read_thread.value + end + end end