Skip to content
24 changes: 22 additions & 2 deletions include/boost/beast2/impl/write.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ class write_some_op

BOOST_ASIO_CORO_REENTER(*this)
{
self.reset_cancellation_state(
asio::enable_total_cancellation());

rv = sr_.prepare();
if(! rv)
{
Expand Down Expand Up @@ -117,8 +120,24 @@ class write_op
{
BOOST_ASIO_CORO_REENTER(*this)
{
self.reset_cancellation_state(asio::enable_total_cancellation());

do
{
if(!!self.cancelled())
{
ec = asio::error::operation_aborted;
BOOST_ASIO_CORO_YIELD
{
BOOST_ASIO_HANDLER_LOCATION(
(__FILE__, __LINE__, "immediate"));
auto io_ex = self.get_io_executor();
asio::async_immediate(
io_ex, asio::append(std::move(self), ec));
}
break; // goto upcall
}

BOOST_ASIO_CORO_YIELD
{
BOOST_ASIO_HANDLER_LOCATION((
Expand All @@ -128,12 +147,13 @@ class write_op
dest_, sr_, std::move(self));
}
n_ += bytes_transferred;

if(ec.failed())
break;
break; // goto upcall
}
while(! sr_.is_done());

// upcall
// upcall:
self.complete(ec, n_ );
}
}
Expand Down
42 changes: 23 additions & 19 deletions include/boost/beast2/test/detail/stream_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ class stream_service

//------------------------------------------------------------------------------

struct stream_read_op_base
struct stream_op_base
{
virtual ~stream_read_op_base() = default;
virtual ~stream_op_base() = default;
virtual void operator()(system::error_code ec) = 0;
};

Expand All @@ -89,8 +89,9 @@ struct stream_state
std::mutex m;
std::string storage;
buffers::string_buffer b;
std::condition_variable cv;
std::unique_ptr<stream_read_op_base> op;
//std::condition_variable cv;
std::unique_ptr<stream_op_base> rop;
std::unique_ptr<stream_op_base> wop;
stream_status code = stream_status::ok;
fail_count* fc = nullptr;
std::size_t nread = 0;
Expand Down Expand Up @@ -133,14 +134,17 @@ void
stream_service::
shutdown()
{
std::vector<std::unique_ptr<detail::stream_read_op_base>> v;
std::lock_guard<std::mutex> g1(sp_->m_);
v.reserve(sp_->v_.size());
for(auto p : sp_->v_)
std::vector<std::unique_ptr<detail::stream_op_base>> v;
{
std::lock_guard<std::mutex> g2(p->m);
v.emplace_back(std::move(p->op));
p->code = detail::stream_status::eof;
std::lock_guard<std::mutex> g1(sp_->m_);
v.reserve(2 * sp_->v_.size());
for(auto p : sp_->v_)
{
std::lock_guard<std::mutex> g2(p->m);
v.emplace_back(std::move(p->rop));
v.emplace_back(std::move(p->wop));
p->code = detail::stream_status::eof;
}
}
}

Expand Down Expand Up @@ -200,8 +204,11 @@ stream_state::
~stream_state()
{
// cancel outstanding read
if(op != nullptr)
(*op)(asio::error::operation_aborted);
if(rop != nullptr)
(*rop)(asio::error::operation_aborted);
// cancel outstanding write
if(wop != nullptr)
(*wop)(asio::error::operation_aborted);
}

inline
Expand All @@ -223,16 +230,13 @@ void
stream_state::
notify_read()
{
if(op)
if(rop)
{
auto op_ = std::move(op);
auto op_ = std::move(rop);
op_->operator()(system::error_code{});
}
else
{
cv.notify_all();
}
}

} // detail
} // test
} // beast2
Expand Down
Loading
Loading