
Commit 8d28c55

chore: asynchronous IO for connection fiber
Signed-off-by: Kostas Kyrimis <[email protected]>
1 parent eaedc9a commit 8d28c55

5 files changed: +199 −8 lines changed

src/facade/dragonfly_connection.cc

Lines changed: 169 additions & 3 deletions

@@ -10,9 +10,13 @@
 #include <absl/strings/str_cat.h>
 #include <absl/time/time.h>
 
+#include <condition_variable>
 #include <numeric>
+#include <utility>
 #include <variant>
 
+#include "absl/cleanup/cleanup.h"
+#include "absl/types/span.h"
 #include "base/cycle_clock.h"
 #include "base/flag_utils.h"
 #include "base/flags.h"
@@ -112,6 +116,8 @@ ABSL_FLAG(uint32_t, pipeline_wait_batch_usec, 0,
           "If non-zero, waits for this time for more I/O "
           " events to come for the connection in case there is only one command in the pipeline. ");
 
+ABSL_FLAG(bool, expiremental_io_loop_v2, false, "new io loop");
+
 using namespace util;
 using namespace std;
 using absl::GetFlag;
@@ -695,6 +701,8 @@ void Connection::OnShutdown() {
   VLOG(1) << "Connection::OnShutdown";
 
   BreakOnce(POLLHUP);
+  io_ec_ = make_error_code(errc::connection_aborted);
+  io_event_.notify();
 }
 
 void Connection::OnPreMigrateThread() {
@@ -1096,7 +1104,12 @@ void Connection::ConnectionFlow() {
   // Main loop.
   if (parse_status != ERROR && !ec) {
     UpdateIoBufCapacity(io_buf_, stats_, [&]() { io_buf_.EnsureCapacity(64); });
-    auto res = IoLoop();
+    variant<error_code, Connection::ParserStatus> res;
+    if (GetFlag(FLAGS_expiremental_io_loop_v2)) {
+      res = IoLoopV2();
+    } else {
+      res = IoLoop();
+    }
 
     if (holds_alternative<error_code>(res)) {
       ec = get<error_code>(res);
@@ -1154,6 +1167,10 @@ void Connection::ConnectionFlow() {
     }
   }
 
+  if (GetFlag(FLAGS_expiremental_io_loop_v2)) {
+    socket_->ResetOnRecvHook();
+  }
+
   if (ec && !FiberSocketBase::IsConnClosed(ec)) {
     string conn_info = service_->GetContextInfo(cc_.get()).Format();
     LOG_EVERY_T(WARNING, 1) << "Socket error for connection " << conn_info << " " << GetName()
@@ -1225,6 +1242,7 @@ Connection::ParserStatus Connection::ParseRedis(unsigned max_busy_cycles) {
   auto dispatch_async = [this]() -> MessageHandle { return {FromArgs(tmp_parse_args_)}; };
 
   io::Bytes read_buffer = io_buf_.InputBuffer();
+  size_t total = 0;
   do {
     result = redis_parser_->Parse(read_buffer, &consumed, &tmp_parse_args_);
     request_consumed_bytes_ += consumed;
@@ -1258,6 +1276,7 @@ Connection::ParserStatus Connection::ParseRedis(unsigned max_busy_cycles) {
           << "Redis parser error: " << result << " during parse: " << ToSV(read_buffer);
     }
     read_buffer.remove_prefix(consumed);
+    total += consumed;
 
     // We must yield from time to time to allow other fibers to run.
     // Specifically, if a client sends a huge chunk of data resulting in a very long pipeline,
@@ -1268,7 +1287,7 @@ Connection::ParserStatus Connection::ParseRedis(unsigned max_busy_cycles) {
     }
   } while (RedisParser::OK == result && read_buffer.size() > 0 && !reply_builder_->GetError());
 
-  io_buf_.ConsumeInput(io_buf_.InputLen());
+  io_buf_.ConsumeInput(total);
 
   parser_error_ = result;
   if (result == RedisParser::OK)
@@ -1430,7 +1449,7 @@ io::Result<size_t> Connection::HandleRecvSocket() {
   return recv_sz;
 }
 
-auto Connection::IoLoop() -> variant<error_code, ParserStatus> {
+variant<error_code, Connection::ParserStatus> Connection::IoLoop() {
   error_code ec;
   ParserStatus parse_status = OK;
 
@@ -2161,6 +2180,153 @@ bool Connection::WeakRef::operator==(const WeakRef& other) const {
   return client_id_ == other.client_id_;
 }
 
+void Connection::DoReadOnRecv(const util::FiberSocketBase::RecvNotification& n) {
+  if (std::holds_alternative<std::error_code>(n.read_result)) {
+    io_ec_ = std::get<std::error_code>(n.read_result);
+    return;
+  }
+
+  // TODO non epoll API via EnableRecvMultishot
+  // if (std::holds_alternative<io::MutableBytes>(n.read_result))
+
+  if (std::holds_alternative<bool>(n.read_result)) {
+    if (!std::get<bool>(n.read_result)) {
+      io_ec_ = make_error_code(errc::connection_aborted);
+      return;
+    }
+
+    if (io_buf_.AppendLen() == 0) {
+      // We will regrow in IoLoop
+      return;
+    }
+
+    io::MutableBytes buf = io_buf_.AppendBuffer();
+    int res = recv(socket_->native_handle(), buf.data(), buf.size(), 0);
+
+    // error path
+    if (res < 0) {
+      // LOG(INFO) << "ERROR";
+      if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        return;
+      }
+
+      if (errno == ECONNRESET) {
+        // The peer can shutdown the connection abruptly.
+        io_ec_ = make_error_code(errc::connection_aborted);
+      }
+
+      LOG_IF(FATAL, !io_ec_) << "Recv error: " << strerror(-res) << " errno " << errno;
+      return;
+    }
+
+    if (res == 0) {
+      io_ec_ = make_error_code(errc::connection_aborted);
+      return;
+    }
+    // A recv call can return fewer bytes than requested even if the
+    // socket buffer actually contains enough data to satisfy the full request.
+    // TODO maybe worth looping here and try another recv call until it fails
+    // with EAGAIN or EWOULDBLOCK. The problem there is that we need to handle
+    // resizing if AppendBuffer is zero.
+    io_buf_.CommitWrite(res);
+    return;
+  }
+
+  DCHECK(false) << "Sould not reach here";
+}
+
+variant<error_code, Connection::ParserStatus> Connection::IoLoopV2() {
+  error_code ec;
+  ParserStatus parse_status = OK;
+
+  size_t max_iobfuf_len = GetFlag(FLAGS_max_client_iobuf_len);
+
+  auto* peer = socket_.get();
+  recv_buf_.res_len = 0;
+
+  // TODO EnableRecvMultishot
+
+  // Breaks with TLS. RegisterOnRecv is unimplemented.
+  peer->RegisterOnRecv([this](const FiberSocketBase::RecvNotification& n) {
+    DoReadOnRecv(n);
+    io_event_.notify();
+  });
+
+  do {
+    HandleMigrateRequest();
+
+    // We *must* poll again for readiness. The event handler we registered above
+    // with RegisterOnRecv() will get called *once* for each socket readiness event.
+    // So, when we get notified below in io_event_.wait() we might read less data
+    // than it is available because io_buf_ does not have enough capacity. If we loop,
+    // and do not attempt to read from the socket again we can deadlock. To avoid this,
+    // we poll once for readiness before preempting.
+    DoReadOnRecv(FiberSocketBase::RecvNotification{true});
+    io_event_.await(
+        [this]() { return io_buf_.InputLen() > 0 || io_ec_ || io_buf_.AppendLen() == 0; });
+
+    if (io_ec_) {
+      LOG_IF(WARNING, cntx()->replica_conn) << "async io error: " << ec;
+      return std::exchange(io_ec_, {});
+    }
+
+    phase_ = PROCESS;
+    bool is_iobuf_full = io_buf_.AppendLen() == 0;
+
+    if (redis_parser_) {
+      parse_status = ParseRedis(max_busy_read_cycles_cached);
+    } else {
+      DCHECK(memcache_parser_);
+      parse_status = ParseMemcache();
+    }
+
+    if (reply_builder_->GetError()) {
+      return reply_builder_->GetError();
+    }
+
+    if (parse_status == NEED_MORE) {
+      parse_status = OK;
+
+      size_t capacity = io_buf_.Capacity();
+      if (capacity < max_iobfuf_len) {
+        size_t parser_hint = 0;
+        if (redis_parser_)
+          parser_hint = redis_parser_->parselen_hint();  // Could be done for MC as well.
+
+        // If we got a partial request and we managed to parse its
+        // length, make sure we have space to store it instead of
+        // increasing space incrementally.
+        // (Note: The buffer object is only working in power-of-2 sizes,
+        // so there's no danger of accidental O(n^2) behavior.)
+        if (parser_hint > capacity) {
+          UpdateIoBufCapacity(io_buf_, stats_,
+                              [&]() { io_buf_.Reserve(std::min(max_iobfuf_len, parser_hint)); });
+        }
+
+        // If we got a partial request because iobuf was full, grow it up to
+        // a reasonable limit to save on Recv() calls.
+        if (is_iobuf_full && capacity < max_iobfuf_len / 2) {
+          // Last io used most of the io_buf to the end.
+          UpdateIoBufCapacity(io_buf_, stats_, [&]() {
+            io_buf_.Reserve(capacity * 2);  // Valid growth range.
+          });
+        }
+
+        if (io_buf_.AppendLen() == 0U) {
+          // it can happen with memcached but not for RedisParser, because RedisParser fully
+          // consumes the passed buffer
+          LOG_EVERY_T(WARNING, 10)
+              << "Maximum io_buf length reached, consider to increase max_client_iobuf_len flag";
+        }
+      }
+    } else if (parse_status != OK) {
+      break;
+    }
+  } while (peer->IsOpen());
+
+  return parse_status;
+}
+
 void ResetStats() {
   auto& cstats = tl_facade_stats->conn_stats;
   cstats.pipelined_cmd_cnt = 0;
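
The core of this change is the handshake between the socket's on-recv hook and the connection fiber: the hook (DoReadOnRecv) drains whatever fits into io_buf_ and calls io_event_.notify(), while IoLoopV2 awaits on io_event_ until there is parsable input, an error, or a full buffer, and re-polls the socket once per iteration so a buffer-full wakeup cannot be lost. The standalone sketch below illustrates only the shape of that handshake: it uses std::thread and std::condition_variable in place of helio fibers and util::fb2::EventCount, and every name in it (Conn, OnRecv, IoLoop, io_buf, io_event) is an illustrative stand-in, not Dragonfly's API.

// Standalone sketch (not Dragonfly source): models the notify/await handshake used by IoLoopV2.
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

struct Conn {
  std::mutex mu;                     // only needed because this sketch uses OS threads
  std::condition_variable io_event;  // plays the role of io_event_
  std::string io_buf;                // plays the role of io_buf_
  bool closed = false;               // plays the role of io_ec_ being set

  // "on-recv hook": invoked whenever the socket reports readable data.
  void OnRecv(const std::string& bytes) {
    {
      std::lock_guard<std::mutex> lk(mu);
      io_buf += bytes;  // like DoReadOnRecv: drain the socket into the input buffer
    }
    io_event.notify_one();  // like io_event_.notify()
  }

  void Shutdown() {
    {
      std::lock_guard<std::mutex> lk(mu);
      closed = true;  // like OnShutdown setting io_ec_ before notifying
    }
    io_event.notify_one();
  }

  // Connection-fiber loop: sleep until there is input or an error, then parse it.
  void IoLoop() {
    for (;;) {
      std::unique_lock<std::mutex> lk(mu);
      io_event.wait(lk, [&] { return !io_buf.empty() || closed; });  // like io_event_.await(...)
      if (io_buf.empty()) return;  // closed with nothing left to parse
      std::cout << "parsed: " << io_buf;
      io_buf.clear();  // like io_buf_.ConsumeInput(total)
      if (closed) return;
    }
  }
};

int main() {
  Conn c;
  std::thread fiber([&] { c.IoLoop(); });
  c.OnRecv("PING\r\n");  // each call simulates one socket-readiness notification
  c.OnRecv("ECHO hi\r\n");
  c.Shutdown();
  fiber.join();
  return 0;
}

The mutex exists here only because the sketch uses OS threads; in the actual change the hook and the connection fiber appear to run on the connection's proactor thread, which is why io_buf_ and io_ec_ are accessed without locks.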

src/facade/dragonfly_connection.h

Lines changed: 8 additions & 0 deletions

@@ -20,6 +20,7 @@
 #include "io/io_buf.h"
 #include "util/connection.h"
 #include "util/fibers/fibers.h"
+#include "util/fibers/synchronization.h"
 #include "util/http/http_handler.h"
 
 typedef struct ssl_ctx_st SSL_CTX;
@@ -349,6 +350,10 @@ class Connection : public util::Connection {
   // Main loop reading client messages and passing requests to dispatch queue.
   std::variant<std::error_code, ParserStatus> IoLoop();
 
+  void DoReadOnRecv(const util::FiberSocketBase::RecvNotification& n);
+  // Main loop reading client messages and passing requests to dispatch queue.
+  std::variant<std::error_code, ParserStatus> IoLoopV2();
+
   // Returns true if HTTP header is detected.
   io::Result<bool> CheckForHttpProto();
 
@@ -421,6 +426,9 @@ class Connection : public util::Connection {
   util::fb2::CondVarAny cnd_;  // dispatch queue waker
   util::fb2::Fiber async_fb_;  // async fiber (if started)
 
+  std::error_code io_ec_;
+  util::fb2::EventCount io_event_;
+
   uint64_t pending_pipeline_cmd_cnt_ = 0;  // how many queued Redis async commands in dispatch_q
   size_t pending_pipeline_bytes_ = 0;      // how many bytes of the queued Redis async commands
 
tests/dragonfly/connection_test.py

Lines changed: 19 additions & 4 deletions

@@ -408,23 +408,33 @@ async def pub_task():
 
 
 @pytest.mark.slow
-@dfly_args({"proactor_threads": "4"})
+@dfly_args({"proactor_threads": "4", "migrate_connections": False})
 async def test_pubsub_busy_connections(df_server: DflyInstance):
-    sleep = 60
+    sleep = 10
+
+    idd = 0
 
     async def sub_thread():
        i = 0
 
        async def sub_task():
            nonlocal i
+           nonlocal idd
            sleep_task = asyncio.create_task(asyncio.sleep(sleep))
+           j = idd
+           idd = idd + 1
            while not sleep_task.done():
                client = df_server.client()
                pubsub = client.pubsub()
-               await pubsub.subscribe("channel")
+               try:
+                   await pubsub.subscribe("channel")
+               except Exception as e:
+                   logging.info(f"ERRRRRRRROR {j}")
+                   pass
                # await pubsub.unsubscribe("channel")
                i = i + 1
                await client.close()
+           logging.info(f"SUB DONE {j}")
 
        subs = [asyncio.create_task(sub_task()) for _ in range(10)]
        for s in subs:
@@ -436,24 +446,29 @@ async def pub_task():
        i = 0
        sleep_task = asyncio.create_task(asyncio.sleep(sleep))
        while not sleep_task.done():
+           # logging.info("before")
            await pub.publish("channel", f"message-{i}")
            i = i + 1
+           # logging.info("after")
+       logging.info("DONE")
 
    def run_in_thread():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(sub_thread())
 
    threads = []
-   for _ in range(10):
+   for _ in range(1):
        thread = Thread(target=run_in_thread)
        thread.start()
        threads.append(thread)
 
    await pub_task()
 
+   logging.info("==================")
    for thread in threads:
        thread.join()
+   logging.info("==================")
 
 
 async def test_subscribers_with_active_publisher(df_server: DflyInstance, max_connections=100):

tests/dragonfly/instance.py

Lines changed: 2 additions & 0 deletions

@@ -436,6 +436,8 @@ def create(self, existing_port=None, path=None, version=100, **kwargs) -> DflyIn
        if version >= 1.26:
            args.setdefault("fiber_safety_margin=4096")
 
+       args.setdefault("expiremental_io_loop_v2=true")
+
        for k, v in args.items():
            args[k] = v.format(**self.params.env) if isinstance(v, str) else v
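
The test harness now enables the new loop by default for spawned instances. For a manual run, the same ABSL flag can be passed on the command line; a hypothetical invocation (assuming a locally built dragonfly binary) would be:

./dragonfly --expiremental_io_loop_v2=true --proactor_threads=4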
