Skip to content

Commit e62d5df

Browse files
committed
tmp
Signed-off-by: Kostas Kyrimis <[email protected]>
1 parent 00bcdfe commit e62d5df

File tree

3 files changed

+100
-38
lines changed

3 files changed

+100
-38
lines changed

src/facade/dragonfly_connection.cc

Lines changed: 79 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@
1010
#include <absl/strings/str_cat.h>
1111
#include <absl/time/time.h>
1212

13+
#include <condition_variable>
1314
#include <numeric>
1415
#include <utility>
1516
#include <variant>
1617

1718
#include "absl/cleanup/cleanup.h"
19+
#include "absl/types/span.h"
1820
#include "base/cycle_clock.h"
1921
#include "base/flag_utils.h"
2022
#include "base/flags.h"
@@ -699,6 +701,8 @@ void Connection::OnShutdown() {
699701
VLOG(1) << "Connection::OnShutdown";
700702

701703
BreakOnce(POLLHUP);
704+
io_ec_ = make_error_code(errc::connection_aborted);
705+
io_event_.notify();
702706
}
703707

704708
void Connection::OnPreMigrateThread() {
@@ -1237,10 +1241,9 @@ Connection::ParserStatus Connection::ParseRedis(unsigned max_busy_cycles) {
12371241
read_buffer.slice = io_buf_.InputBuffer();
12381242
read_buffer.available_bytes = io_buf_.InputLen();
12391243

1244+
size_t total = 0;
1245+
12401246
do {
1241-
if (read_buffer.ShouldAdvance()) { // can happen only with io_uring/bundles
1242-
read_buffer.slice = NextBundleBuffer(read_buffer.available_bytes);
1243-
}
12441247
result = redis_parser_->Parse(read_buffer.slice, &consumed, &tmp_parse_args_);
12451248
request_consumed_bytes_ += consumed;
12461249
if (result == RedisParser::OK && !tmp_parse_args_.empty()) {
@@ -1269,9 +1272,11 @@ Connection::ParserStatus Connection::ParseRedis(unsigned max_busy_cycles) {
12691272
}
12701273
if (result != RedisParser::OK && result != RedisParser::INPUT_PENDING) {
12711274
// We do not expect that a replica sends an invalid command so we log if it happens.
1272-
LOG_IF(WARNING, cntx()->replica_conn)
1273-
<< "Redis parser error: " << result << " during parse: " << ToSV(read_buffer.slice);
1275+
LOG(WARNING) << "Redis parser error: " << result
1276+
<< " during parse: " << ToSV(read_buffer.slice);
12741277
}
1278+
1279+
total += consumed;
12751280
read_buffer.Consume(consumed);
12761281

12771282
// We must yield from time to time to allow other fibers to run.
@@ -1284,7 +1289,7 @@ Connection::ParserStatus Connection::ParseRedis(unsigned max_busy_cycles) {
12841289
} while (RedisParser::OK == result && read_buffer.available_bytes > 0 &&
12851290
!reply_builder_->GetError());
12861291

1287-
io_buf_.ConsumeInput(io_buf_.InputLen());
1292+
io_buf_.ConsumeInput(total);
12881293

12891294
parser_error_ = result;
12901295
if (result == RedisParser::OK)
@@ -2189,43 +2194,52 @@ bool Connection::WeakRef::operator==(const WeakRef& other) const {
21892194
}
21902195

21912196
void Connection::DoReadOnRecv(const util::FiberSocketBase::RecvNotification& n) {
2192-
absl::Cleanup on_exit([this]() { io_event_.notify(); });
2193-
21942197
if (std::holds_alternative<std::error_code>(n.read_result)) {
21952198
io_ec_ = std::get<std::error_code>(n.read_result);
21962199
return;
21972200
}
21982201

2199-
// EnableRecvMultishot path
2200-
// if (std::holds_alternative<io::MutableBytes>(n.read_result)) {
2201-
// auto buf = std::get<io::MutableBytes>(n.read_result);
2202-
// // TODO get rid of the copy
2203-
// io_buf_.WriteAndCommit(buf.data(), buf.size());
2204-
// return;
2205-
// }
2202+
// TODO non epoll API via EnableRecvMultishot
2203+
// if (std::holds_alternative<io::MutableBytes>(n.read_result))
22062204

22072205
if (std::holds_alternative<monostate>(n.read_result)) {
2208-
while (true) {
2209-
io::MutableBytes buf = io_buf_.AppendBuffer();
2210-
size_t buf_sz = buf.size();
2211-
int res = recv(socket_->native_handle(), buf.data(), buf.size(), 0);
2212-
if (res < 0) {
2213-
if (errno == EAGAIN || errno == EWOULDBLOCK) {
2214-
// we are done, there is no more socket data available to be read
2215-
return;
2216-
}
2217-
LOG(FATAL) << "Recv error: " << strerror(-res) << " errno " << errno;
2206+
if (io_buf_.AppendLen() == 0) {
2207+
// We will regrow in IoLoop
2208+
return;
2209+
}
2210+
2211+
io::MutableBytes buf = io_buf_.AppendBuffer();
2212+
int res = recv(socket_->native_handle(), buf.data(), buf.size(), 0);
2213+
2214+
// error path
2215+
if (res < 0) {
2216+
// LOG(INFO) << "ERROR";
2217+
if (errno == EAGAIN || errno == EWOULDBLOCK) {
2218+
// we are done, there is no more socket data available to be read.
2219+
LOG(INFO) << "EAGAIN on: " << this;
22182220
return;
22192221
}
2220-
if (res == 0) {
2222+
2223+
if (errno == ECONNRESET) {
2224+
// The peer can shutdown the connection abruptly.
22212225
io_ec_ = make_error_code(errc::connection_aborted);
2222-
return;
2223-
}
2224-
io_buf_.CommitWrite(res);
2225-
if (buf_sz == size_t(res)) {
2226-
return;
22272226
}
2227+
2228+
LOG_IF(FATAL, !io_ec_) << "Recv error: " << strerror(-res) << " errno " << errno;
2229+
return;
2230+
}
2231+
2232+
if (res == 0) {
2233+
LOG(INFO) << "ZERO";
2234+
io_ec_ = make_error_code(errc::connection_aborted);
2235+
return;
22282236
}
2237+
// A recv call can return fewer bytes than requested even if the
2238+
// socket buffer actually contains enough data to satisfy the full request.
2239+
// TODO: maybe worth looping here and trying another recv call until it fails
2240+
// with EAGAIN or EWOULDBLOCK.
2241+
io_buf_.CommitWrite(res);
2242+
return;
22292243
}
22302244

22312245
DCHECK(false) << "Sould not reach here";
@@ -2240,18 +2254,47 @@ variant<error_code, Connection::ParserStatus> Connection::IoLoopV2() {
22402254
auto* peer = socket_.get();
22412255
recv_buf_.res_len = 0;
22422256

2243-
// TODO EnableRecvMultishot on iourin ?
2257+
// TODO EnableRecvMultishot
2258+
size_t j = 0;
22442259

22452260
// Breaks with TLS. RegisterOnRecv is unimplemented.
2246-
peer->RegisterOnRecv([this](const FiberSocketBase::RecvNotification& n) {
2261+
peer->RegisterOnRecv([this, &j](const FiberSocketBase::RecvNotification& n) {
2262+
LOG(INFO) << "Called from hook " << this;
2263+
++j;
22472264
DoReadOnRecv(n);
22482265
io_event_.notify();
22492266
});
22502267

2268+
size_t i = 0;
2269+
22512270
do {
2271+
++i;
2272+
if (i == 1) {
2273+
LOG(INFO) << "Hit " << i << " with j " << j;
2274+
}
22522275
HandleMigrateRequest();
22532276

2254-
io_event_.await([this]() { return io_buf_.InputLen() > 0 || io_ec_; });
2277+
// We *must* poll again for readiness. The event handler we registered above
2278+
// with RegisterOnRecv() will get called *once* for each socket readiness event.
2279+
// So, when we get notified below in io_event_.await_until() we might read less data
2280+
// than is available because io_buf_ does not have enough capacity. If we loop,
2281+
// and do not attempt to read from the socket again we can deadlock. To avoid this,
2282+
// we poll once for readiness before preempting.
2283+
DoReadOnRecv(FiberSocketBase::RecvNotification{std::monostate()});
2284+
LOG(INFO) << "Before block " << this;
2285+
std::cv_status res = io_event_.await_until(
2286+
[this]() { return io_buf_.InputLen() > 0 || io_ec_ || io_buf_.AppendLen() == 0; },
2287+
chrono::steady_clock::now() + 10000ms);
2288+
LOG(INFO) << "After block " << this;
2289+
2290+
if (res == std::cv_status::timeout) {
2291+
LOG(INFO) << "TIMEOUT DETECTED FOR CONNECTION " << this << "\nTotal iterations " << i
2292+
<< "\nTotal registered on recv calls " << j;
2293+
LOG(INFO) << "^ Before " << io_buf_.InputLen() << " " << this;
2294+
DoReadOnRecv(FiberSocketBase::RecvNotification{std::monostate()});
2295+
LOG(INFO) << "^ After " << io_buf_.InputLen() << " " << this;
2296+
// DCHECK(io_buf_.InputLen() != 0);
2297+
}
22552298

22562299
if (io_ec_) {
22572300
LOG_IF(WARNING, cntx()->replica_conn) << "async io error: " << ec;
@@ -2268,6 +2311,8 @@ variant<error_code, Connection::ParserStatus> Connection::IoLoopV2() {
22682311
parse_status = ParseMemcache();
22692312
}
22702313

2314+
// LOG(INFO) << "parse status is " << size_t(parse_status);
2315+
22712316
if (reply_builder_->GetError()) {
22722317
return reply_builder_->GetError();
22732318
}

tests/dragonfly/connection_test.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -408,23 +408,33 @@ async def pub_task():
408408

409409

410410
@pytest.mark.slow
411-
@dfly_args({"proactor_threads": "4"})
411+
@dfly_args({"proactor_threads": "4", "migrate_connections": False})
412412
async def test_pubsub_busy_connections(df_server: DflyInstance):
413-
sleep = 60
413+
sleep = 10
414+
415+
idd = 0
414416

415417
async def sub_thread():
416418
i = 0
417419

418420
async def sub_task():
419421
nonlocal i
422+
nonlocal idd
420423
sleep_task = asyncio.create_task(asyncio.sleep(sleep))
424+
j = idd
425+
idd = idd + 1
421426
while not sleep_task.done():
422427
client = df_server.client()
423428
pubsub = client.pubsub()
424-
await pubsub.subscribe("channel")
429+
try:
430+
await pubsub.subscribe("channel")
431+
except Exception as e:
432+
logging.info(f"ERRRRRRRROR {j}")
433+
pass
425434
# await pubsub.unsubscribe("channel")
426435
i = i + 1
427436
await client.close()
437+
logging.info(f"SUB DONE {j}")
428438

429439
subs = [asyncio.create_task(sub_task()) for _ in range(10)]
430440
for s in subs:
@@ -436,24 +446,29 @@ async def pub_task():
436446
i = 0
437447
sleep_task = asyncio.create_task(asyncio.sleep(sleep))
438448
while not sleep_task.done():
449+
# logging.info("before")
439450
await pub.publish("channel", f"message-{i}")
440451
i = i + 1
452+
# logging.info("after")
453+
logging.info("DONE")
441454

442455
def run_in_thread():
443456
loop = asyncio.new_event_loop()
444457
asyncio.set_event_loop(loop)
445458
loop.run_until_complete(sub_thread())
446459

447460
threads = []
448-
for _ in range(10):
461+
for _ in range(1):
449462
thread = Thread(target=run_in_thread)
450463
thread.start()
451464
threads.append(thread)
452465

453466
await pub_task()
454467

468+
logging.info("==================")
455469
for thread in threads:
456470
thread.join()
471+
logging.info("==================")
457472

458473

459474
async def test_subscribers_with_active_publisher(df_server: DflyInstance, max_connections=100):

tests/dragonfly/instance.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,8 @@ def create(self, existing_port=None, path=None, version=100, **kwargs) -> DflyIn
436436
if version >= 1.26:
437437
args.setdefault("fiber_safety_margin=4096")
438438

439+
args.setdefault("expiremental_io_loop_v2=true")
440+
439441
for k, v in args.items():
440442
args[k] = v.format(**self.params.env) if isinstance(v, str) else v
441443

0 commit comments

Comments
 (0)