Skip to content

Commit

Permalink
Ensure read/readavailable for BufferStream are threadsafe (JuliaLang#…
Browse files Browse the repository at this point in the history
…57211)

It looks like these methods were just missed while overloading for
BufferStream.

There's also `readbytes!` where the current implementation will fallback
to the `LibuvStream` implementation that is currently not threadsafe.
What's the best approach there since the implementation is quite a bit
more involved? Just duplicate the code but for BufferStream? Should we
take the BufferStream lock and invoke the LibuvStream method? Open to
ideas there.

Also open to suggestions for having tests here? Not easy to simulate the
data race of writing and calling readavailable.

The fix here will unblock JuliaWeb/HTTP.jl#1213
(I'll probably do some compat shim there until this is fully released).

Thanks to @oscardssmith for rubber ducking this issue with me.

Probably most helpfully reviewed by @vtjnash.

---------

Co-authored-by: Jameson Nash <[email protected]>
  • Loading branch information
quinnj and vtjnash authored Feb 4, 2025
1 parent 2f0a523 commit ffc96bc
Showing 1 changed file with 58 additions and 0 deletions.
58 changes: 58 additions & 0 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1559,6 +1559,64 @@ function wait_readnb(s::BufferStream, nb::Int)
end
end

function readavailable(this::BufferStream)
bytes = lock(this.cond) do
wait_readnb(this, 1)
buf = this.buffer
@assert buf.seekable == false
take!(buf)
end
return bytes
end

function read(stream::BufferStream)
bytes = lock(stream.cond) do
wait_close(stream)
take!(stream.buffer)
end
return bytes
end

function readbytes!(s::BufferStream, a::Vector{UInt8}, nb::Int)
sbuf = s.buffer
@assert sbuf.seekable == false
@assert sbuf.maxsize >= nb

function wait_locked(s, buf, nb)
while bytesavailable(buf) < nb
s.readerror === nothing || throw(s.readerror)
isopen(s) || break
s.status != StatusEOF || break
wait_readnb(s, nb)
end
end

bytes = lock(s.cond) do
if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
wait_locked(s, sbuf, nb)
end
if bytesavailable(sbuf) >= nb
nread = readbytes!(sbuf, a, nb)
else
initsize = length(a)
newbuf = PipeBuffer(a, maxsize=nb)
newbuf.size = newbuf.offset # reset the write pointer to the beginning
nread = try
s.buffer = newbuf
write(newbuf, sbuf)
wait_locked(s, newbuf, nb)
bytesavailable(newbuf)
finally
s.buffer = sbuf
end
_take!(a, _unsafe_take!(newbuf))
length(a) >= initsize || resize!(a, initsize)
end
return nread
end
return bytes
end

show(io::IO, s::BufferStream) = print(io, "BufferStream(bytes waiting=", bytesavailable(s.buffer), ", isopen=", isopen(s), ")")

function readuntil(s::BufferStream, c::UInt8; keep::Bool=false)
Expand Down

0 comments on commit ffc96bc

Please sign in to comment.