diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 15d3ace4eda9..bd540992e73b 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -583,6 +583,17 @@ static TInterconnectSettings GetInterconnectSettings(const NKikimrConfig::TInter result.EventDelay = TDuration::MicroSeconds(config.GetEventDelayMicrosec()); } + if (config.HasSocketSendOptimization()) { + switch (config.GetSocketSendOptimization()) { + case NKikimrConfig::TInterconnectConfig::IC_SO_DISABLED: + result.SocketSendOptimization = ESocketSendOptimization::DISABLED; + break; + case NKikimrConfig::TInterconnectConfig::IC_SO_MSG_ZEROCOPY: + result.SocketSendOptimization = ESocketSendOptimization::IC_MSG_ZEROCOPY; + break; + } + } + return result; } diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 0c9ba41544b0..5eac49b79655 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -404,6 +404,11 @@ message TInterconnectConfig { REQUIRED = 2; }; + enum ESocketSendOptimization { + IC_SO_DISABLED = 0; + IC_SO_MSG_ZEROCOPY = 1; + }; + repeated TChannel Channel = 1; optional bool FirstTryBeforePoll = 2; // DEPRECATED optional bool StartTcp = 3 [default = false]; @@ -433,9 +438,10 @@ message TInterconnectConfig { optional bool SuppressConnectivityCheck = 39 [default = false]; optional uint32 PreallocatedBufferSize = 40; optional uint32 NumPreallocatedBuffers = 41; - optional bool EnableExternalDataChannel = 42; + optional bool EnableExternalDataChannel = 42 [default = false]; optional bool ValidateIncomingPeerViaDirectLookup = 44; optional uint32 SocketBacklogSize = 45; // SOMAXCONN if not set or zero + optional ESocketSendOptimization SocketSendOptimization = 51 [default = IC_SO_DISABLED]; // ballast is added to IC handshake frames to ensure correctness of jumbo frames transmission over network optional uint32 HandshakeBallastSize = 14; diff --git a/ydb/library/actors/interconnect/channel_scheduler.h b/ydb/library/actors/interconnect/channel_scheduler.h index b0eac9debcd3..541378ce145a 100644 --- a/ydb/library/actors/interconnect/channel_scheduler.h +++ b/ydb/library/actors/interconnect/channel_scheduler.h @@ -12,7 +12,6 @@ namespace NActors { std::array, 16> ChannelArray; THashMap ChannelMap; std::shared_ptr Metrics; - TEventHolderPool& Pool; const ui32 MaxSerializedEventSize; const TSessionParams Params; @@ -29,11 +28,10 @@ namespace NActors { public: TChannelScheduler(ui32 peerNodeId, const TChannelsConfig& predefinedChannels, - std::shared_ptr metrics, TEventHolderPool& pool, ui32 maxSerializedEventSize, + std::shared_ptr metrics, ui32 maxSerializedEventSize, TSessionParams params) : PeerNodeId(peerNodeId) , Metrics(std::move(metrics)) - , Pool(pool) , MaxSerializedEventSize(maxSerializedEventSize) , Params(std::move(params)) { @@ -72,7 +70,7 @@ namespace NActors { if (channel < ChannelArray.size()) { auto& res = ChannelArray[channel]; if (Y_UNLIKELY(!res)) { - res.emplace(Pool, channel, PeerNodeId, MaxSerializedEventSize, Metrics, + res.emplace(channel, PeerNodeId, MaxSerializedEventSize, Metrics, Params); } return *res; @@ -80,7 +78,7 @@ namespace NActors { auto it = ChannelMap.find(channel); if (Y_UNLIKELY(it == ChannelMap.end())) { it = ChannelMap.emplace(std::piecewise_construct, std::forward_as_tuple(channel), - std::forward_as_tuple(Pool, channel, PeerNodeId, MaxSerializedEventSize, + std::forward_as_tuple(channel, PeerNodeId, MaxSerializedEventSize, Metrics, Params)).first; } return it->second; diff --git a/ydb/library/actors/interconnect/interconnect_channel.cpp b/ydb/library/actors/interconnect/interconnect_channel.cpp index f82864e52c00..e7710220b841 100644 --- a/ydb/library/actors/interconnect/interconnect_channel.cpp +++ b/ydb/library/actors/interconnect/interconnect_channel.cpp @@ -1,4 +1,5 @@ #include "interconnect_channel.h" +#include "interconnect_zc_processor.h" #include #include @@ -56,10 +57,10 @@ namespace NActors { return true; } - void TEventOutputChannel::DropConfirmed(ui64 confirm) { + void TEventOutputChannel::DropConfirmed(ui64 confirm, TEventHolderPool& pool) { LOG_DEBUG_IC_SESSION("ICOCH98", "Dropping confirmed messages"); for (auto it = NotYetConfirmed.begin(); it != NotYetConfirmed.end() && it->Serial <= confirm; ) { - Pool.Release(NotYetConfirmed, it++); + pool.Release(NotYetConfirmed, it++); } } @@ -185,7 +186,7 @@ namespace NActors { if (allowCopy && (reinterpret_cast(data) & 63) + len <= 64) { task.Write(data, len); } else { - task.Append(data, len); + task.Append(data, len, &event.ZcTransferId); } *bytesSerialized += len; Y_DEBUG_ABORT_UNLESS(len <= PartLenRemain); @@ -314,7 +315,8 @@ namespace NActors { }; char *ptr = reinterpret_cast(part + 1); *ptr++ = static_cast(EXdcCommand::PUSH_DATA); - *reinterpret_cast(ptr) = bytesSerialized; + + WriteUnaligned(ptr, bytesSerialized); ptr += sizeof(ui16); if (task.ChecksummingXxhash()) { XXH3_state_t state; @@ -322,9 +324,10 @@ namespace NActors { task.XdcStream.ScanLastBytes(bytesSerialized, [&state](TContiguousSpan span) { XXH3_64bits_update(&state, span.data(), span.size()); }); - *reinterpret_cast(ptr) = XXH3_64bits_digest(&state); + const ui32 cs = XXH3_64bits_digest(&state); + WriteUnaligned(ptr, cs); } else if (task.ChecksummingCrc32c()) { - *reinterpret_cast(ptr) = task.ExternalChecksum; + WriteUnaligned(ptr, task.ExternalChecksum); } task.WriteBookmark(std::move(partBookmark), buffer, partSize); @@ -335,7 +338,7 @@ namespace NActors { return complete; } - void TEventOutputChannel::NotifyUndelivered() { + void TEventOutputChannel::ProcessUndelivered(TEventHolderPool& pool, NInterconnect::IZcGuard* zg) { LOG_DEBUG_IC_SESSION("ICOCH89", "Notyfying about Undelivered messages! NotYetConfirmed size: %zu, Queue size: %zu", NotYetConfirmed.size(), Queue.size()); if (State == EState::BODY && Queue.front().Event) { Y_ABORT_UNLESS(!Chunker.IsComplete()); // chunk must have an event being serialized @@ -350,11 +353,17 @@ namespace NActors { item.ForwardOnNondelivery(true); } } - Pool.Release(NotYetConfirmed); + + // Events in the NotYetConfirmed may be actualy not sended by kernel. + // In case of enabled ZC we need to wait kernel send task to be completed before reusing buffers + if (zg) { + zg->ExtractToSafeTermination(NotYetConfirmed); + } + pool.Release(NotYetConfirmed); for (auto& item : Queue) { item.ForwardOnNondelivery(false); } - Pool.Release(Queue); + pool.Release(Queue); } } diff --git a/ydb/library/actors/interconnect/interconnect_channel.h b/ydb/library/actors/interconnect/interconnect_channel.h index d038c727d903..72c71be49c0d 100644 --- a/ydb/library/actors/interconnect/interconnect_channel.h +++ b/ydb/library/actors/interconnect/interconnect_channel.h @@ -15,6 +15,10 @@ #include "packet.h" #include "event_holder_pool.h" +namespace NInterconnect { + class IZcGuard; +} + namespace NActors { #pragma pack(push, 1) @@ -59,10 +63,9 @@ namespace NActors { class TEventOutputChannel : public TInterconnectLoggingBase { public: - TEventOutputChannel(TEventHolderPool& pool, ui16 id, ui32 peerNodeId, ui32 maxSerializedEventSize, + TEventOutputChannel(ui16 id, ui32 peerNodeId, ui32 maxSerializedEventSize, std::shared_ptr metrics, TSessionParams params) : TInterconnectLoggingBase(Sprintf("OutputChannel %" PRIu16 " [node %" PRIu32 "]", id, peerNodeId)) - , Pool(pool) , PeerNodeId(peerNodeId) , ChannelId(id) , Metrics(std::move(metrics)) @@ -73,8 +76,8 @@ namespace NActors { ~TEventOutputChannel() { } - std::pair Push(IEventHandle& ev) { - TEventHolder& event = Pool.Allocate(Queue); + std::pair Push(IEventHandle& ev, TEventHolderPool& pool) { + TEventHolder& event = pool.Allocate(Queue); const ui32 bytes = event.Fill(ev) + sizeof(TEventDescr2); OutputQueueSize += bytes; if (event.Span = NWilson::TSpan(15 /*max verbosity*/, NWilson::TTraceId(ev.TraceId), "Interconnect.Queue")) { @@ -85,7 +88,7 @@ namespace NActors { return std::make_pair(bytes, &event); } - void DropConfirmed(ui64 confirm); + void DropConfirmed(ui64 confirm, TEventHolderPool& pool); bool FeedBuf(TTcpPacketOutTask& task, ui64 serial, ui64 *weightConsumed); @@ -105,9 +108,8 @@ namespace NActors { return OutputQueueSize; } - void NotifyUndelivered(); + void ProcessUndelivered(TEventHolderPool& pool, NInterconnect::IZcGuard* zg); - TEventHolderPool& Pool; const ui32 PeerNodeId; const ui16 ChannelId; std::shared_ptr Metrics; diff --git a/ydb/library/actors/interconnect/interconnect_common.h b/ydb/library/actors/interconnect/interconnect_common.h index 7757e7c4672b..11ae48a4e0d4 100644 --- a/ydb/library/actors/interconnect/interconnect_common.h +++ b/ydb/library/actors/interconnect/interconnect_common.h @@ -22,6 +22,11 @@ namespace NActors { REQUIRED, // encryption is mandatory }; + enum class ESocketSendOptimization { + DISABLED, + IC_MSG_ZEROCOPY, + }; + struct TInterconnectSettings { TDuration Handshake; TDuration DeadPeer; @@ -48,13 +53,14 @@ namespace NActors { ui32 MaxSerializedEventSize = NActors::EventMaxByteSize; ui32 PreallocatedBufferSize = 8 << 10; // 8 KB ui32 NumPreallocatedBuffers = 16; - bool EnableExternalDataChannel = false; + bool EnableExternalDataChannel = true; bool ValidateIncomingPeerViaDirectLookup = false; ui32 SocketBacklogSize = 0; // SOMAXCONN if zero TDuration FirstErrorSleep = TDuration::MilliSeconds(10); TDuration MaxErrorSleep = TDuration::Seconds(1); double ErrorSleepRetryMultiplier = 4.0; TDuration EventDelay = TDuration::Zero(); + ESocketSendOptimization SocketSendOptimization = ESocketSendOptimization::DISABLED; }; struct TWhiteboardSessionStatus { diff --git a/ydb/library/actors/interconnect/interconnect_mon.cpp b/ydb/library/actors/interconnect/interconnect_mon.cpp index f8245e1d7234..0b59c64a725f 100644 --- a/ydb/library/actors/interconnect/interconnect_mon.cpp +++ b/ydb/library/actors/interconnect/interconnect_mon.cpp @@ -130,7 +130,8 @@ namespace NInterconnect { } TABLED() { str << kv.second.TotalOutputQueueSize; } TABLED() { str << (kv.second.Connected ? "yes" : "no"); } - TABLED() { str << (kv.second.ExternalDataChannel ? "yes" : "no"); } + TABLED() { str << (kv.second.ExternalDataChannel ? "yes" : "no") + << " (" << (kv.second.XDCFlags & TInterconnectProxyTCP::TProxyStats::XDCFlags::MSG_ZERO_COPY_SEND ? "MSG_ZC_SEND" : "_") << ")"; } TABLED() { str << kv.second.Host; } TABLED() { str << kv.second.Port; } TABLED() { diff --git a/ydb/library/actors/interconnect/interconnect_stream.cpp b/ydb/library/actors/interconnect/interconnect_stream.cpp index f8db077fa406..d239872b98b8 100644 --- a/ydb/library/actors/interconnect/interconnect_stream.cpp +++ b/ydb/library/actors/interconnect/interconnect_stream.cpp @@ -112,13 +112,29 @@ namespace NInterconnect { ssize_t TStreamSocket::Send(const void* msg, size_t len, TString* /*err*/) const { - const auto ret = ::send(Descriptor, static_cast(msg), int(len), 0); + return SendWithFlags(msg, len, 0); + } + + ssize_t + TStreamSocket::SendWithFlags(const void* msg, size_t len, int flags) const { + const auto ret = ::send(Descriptor, static_cast(msg), int(len), flags); if (ret < 0) return -LastSocketError(); return ret; } +#if defined(__linux__) + ssize_t + TStreamSocket::RecvErrQueue(struct msghdr* msg) const { + const auto ret = ::recvmsg(Descriptor, msg, MSG_ERRQUEUE); + if (ret < 0) + return -LastSocketError(); + + return ret; + } +#endif + ssize_t TStreamSocket::Recv(void* buf, size_t len, TString* /*err*/) const { const auto ret = ::recv(Descriptor, static_cast(buf), int(len), 0); diff --git a/ydb/library/actors/interconnect/interconnect_stream.h b/ydb/library/actors/interconnect/interconnect_stream.h index b9ca804e0e5b..13b2f09e9993 100644 --- a/ydb/library/actors/interconnect/interconnect_stream.h +++ b/ydb/library/actors/interconnect/interconnect_stream.h @@ -58,6 +58,11 @@ namespace NInterconnect { virtual ssize_t WriteV(const struct iovec* iov, int iovcnt) const; virtual ssize_t ReadV(const struct iovec* iov, int iovcnt) const; + ssize_t SendWithFlags(const void* msg, size_t len, int flags) const; +#if defined(__linux__) + ssize_t RecvErrQueue(struct msghdr* msg) const; +#endif + int Connect(const TAddress& addr) const; int Connect(const NAddr::IRemoteAddr* addr) const; int Listen(int backlog) const; diff --git a/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp b/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp index 3fce38af6f99..75538f385774 100644 --- a/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/ydb/library/actors/interconnect/interconnect_tcp_input_session.cpp @@ -575,14 +575,14 @@ namespace NActors { throw TExDestroySession{TDisconnectReason::FormatError()}; } - auto size = *reinterpret_cast(ptr); + const ui16 size = ReadUnaligned(ptr); if (!size) { LOG_CRIT_IC_SESSION("ICIS03", "XDC empty payload"); throw TExDestroySession{TDisconnectReason::FormatError()}; } if (!Params.Encryption) { - const ui32 checksumExpected = *reinterpret_cast(ptr + sizeof(ui16)); + const ui32 checksumExpected = ReadUnaligned(ptr + sizeof(ui16)); XdcChecksumQ.emplace_back(size, checksumExpected); } diff --git a/ydb/library/actors/interconnect/interconnect_tcp_proxy.cpp b/ydb/library/actors/interconnect/interconnect_tcp_proxy.cpp index 0e1f95fd652d..8028eefb91f0 100644 --- a/ydb/library/actors/interconnect/interconnect_tcp_proxy.cpp +++ b/ydb/library/actors/interconnect/interconnect_tcp_proxy.cpp @@ -896,7 +896,15 @@ namespace NActors { stats.LastSessionDieTime = LastSessionDieTime; stats.TotalOutputQueueSize = Session ? Session->TotalOutputQueueSize : 0; stats.Connected = Session ? (bool)Session->Socket : false; - stats.ExternalDataChannel = Session && Session->XdcSocket; + if (Session) { + if (const auto xdcFlags = Session->GetXDCFlags()) { + stats.ExternalDataChannel = true; + stats.XDCFlags = *xdcFlags; + } else { + stats.ExternalDataChannel = false; + stats.XDCFlags = 0; + } + } stats.Host = TechnicalPeerHostName; stats.Port = 0; ui32 rep = 0; diff --git a/ydb/library/actors/interconnect/interconnect_tcp_proxy.h b/ydb/library/actors/interconnect/interconnect_tcp_proxy.h index 3fa9253a3f89..99abc3b45c77 100644 --- a/ydb/library/actors/interconnect/interconnect_tcp_proxy.h +++ b/ydb/library/actors/interconnect/interconnect_tcp_proxy.h @@ -50,6 +50,11 @@ namespace NActors { TDuration Ping; i64 ClockSkew; TString Encryption; + enum XDCFlags { + NONE = 0, + MSG_ZERO_COPY_SEND = 1, + }; + ui8 XDCFlags; }; struct TEvStats : TEventLocal { diff --git a/ydb/library/actors/interconnect/interconnect_tcp_session.cpp b/ydb/library/actors/interconnect/interconnect_tcp_session.cpp index 43580ed179b3..aaf9ed7b6ed6 100644 --- a/ydb/library/actors/interconnect/interconnect_tcp_session.cpp +++ b/ydb/library/actors/interconnect/interconnect_tcp_session.cpp @@ -1,6 +1,7 @@ #include "interconnect_tcp_proxy.h" #include "interconnect_tcp_session.h" #include "interconnect_handshake.h" +#include "interconnect_zc_processor.h" #include #include @@ -37,6 +38,7 @@ namespace NActors { , OutputStuckFlag(false) , OutputQueueUtilization(16) , OutputCounter(0ULL) + , ZcProcessor(proxy->Common->Settings.SocketSendOptimization == ESocketSendOptimization::IC_MSG_ZEROCOPY) { Proxy->Metrics->SetConnected(0); ReceiveContext.Reset(new TReceiveContext); @@ -56,8 +58,8 @@ namespace NActors { auto destroyCallback = [as = TlsActivationContext->ExecutorThread.ActorSystem, id = Proxy->Common->DestructorId](THolder event) { as->Send(id, event.Release()); }; - Pool.ConstructInPlace(Proxy->Common, std::move(destroyCallback)); - ChannelScheduler.ConstructInPlace(Proxy->PeerNodeId, Proxy->Common->ChannelsConfig, Proxy->Metrics, *Pool, + Pool = std::make_unique(Proxy->Common, std::move(destroyCallback)); + ChannelScheduler.ConstructInPlace(Proxy->PeerNodeId, Proxy->Common->ChannelsConfig, Proxy->Metrics, Proxy->Common->Settings.MaxSerializedEventSize, Params); LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] session created", Proxy->PeerNodeId); @@ -65,6 +67,18 @@ namespace NActors { SendUpdateToWhiteboard(); } + std::optional TInterconnectSessionTCP::GetXDCFlags() const { + if (XdcSocket) { + if (ZcProcessor.ZcStateIsOk()) { + return TInterconnectProxyTCP::TProxyStats::MSG_ZERO_COPY_SEND; + } else { + return TInterconnectProxyTCP::TProxyStats::NONE; + } + } else { + return {}; + } + } + void TInterconnectSessionTCP::CloseInputSession() { Send(ReceiverId, new TEvInterconnect::TEvCloseInputSession); } @@ -95,8 +109,10 @@ namespace NActors { } DelayedEvents.clear(); + std::unique_ptr guard = ZcProcessor.GetGuard(); + ChannelScheduler->ForEach([&](TEventOutputChannel& channel) { - channel.NotifyUndelivered(); + channel.ProcessUndelivered(*Pool, guard.get()); }); if (ReceiverId) { @@ -114,6 +130,8 @@ namespace NActors { Proxy->Metrics->SubSubscribersCount(Subscribers.size()); } + guard->Terminate(std::move(Pool), XdcSocket, TlsActivationContext->AsActorContext()); + TActor::PassAway(); } @@ -131,7 +149,7 @@ namespace NActors { auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel); const bool wasWorking = oChannel.IsWorking(); - const auto [dataSize, event] = oChannel.Push(*ev); + const auto [dataSize, event] = oChannel.Push(*ev, *Pool); LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize); TotalOutputQueueSize += dataSize; @@ -242,6 +260,10 @@ namespace NActors { Socket = std::move(ev->Get()->Socket); XdcSocket = std::move(ev->Get()->XdcSocket); + if (XdcSocket) { + ZcProcessor.ApplySocketOption(*XdcSocket); + } + // there may be a race const ui64 nextPacket = Max(LastConfirmed, ev->Get()->NextPacket); @@ -532,8 +554,8 @@ namespace NActors { LOG_INFO(*TlsActivationContext, NActorsServices::INTERCONNECT_STATUS, "[%u] disconnected", Proxy->PeerNodeId); } if (XdcSocket) { + // call shutdown but do not call close and do not free wrapper object - we need descriptor to finish ZC op XdcSocket->Shutdown(SHUT_RDWR); - XdcSocket.Reset(); } SendUpdateToWhiteboard(true, false); } @@ -674,6 +696,19 @@ namespace NActors { } } + if (XdcSocket) { + ZcProcessor.ProcessNotification(*XdcSocket); + if (!(ZcProcessor.ZcStateIsOk() || ZcProcessor.ZcStateIsDisabled())) { + TString err = ZcProcessor.ExtractErrText(); + if (err) { + LOG_WARN_IC_SESSION("ICS26", "ZeroCopy op was non success: %s", + err.data()); + + Proxy->UpdateErrorStateLog(TActivationContext::Now(), "zc_error", err.data()); + } + } + } + if (const size_t w = process(XdcStream, XdcSocket, XdcPollerToken, &ReceiveContext->XdcWriteBlocked, maxBytesAtOnce)) { XdcBytesSent += w; XdcOffset += w; @@ -695,6 +730,25 @@ namespace NActors { WriteBlockedByFullSendBuffer = writeBlockedByFullSendBuffer; } + ssize_t TInterconnectSessionTCP::HandleWriteResult(ssize_t r, const TString& err) { + if (r > 0) { + return r; + } else if (-r != EAGAIN && -r != EWOULDBLOCK) { + const TString message = r == 0 ? "connection closed by peer" + : err ? err + : Sprintf("socket: %s", strerror(-r)); + LOG_NOTICE_NET(Proxy->PeerNodeId, "%s", message.data()); + if (r == 0 && !NumEventsInQueue && LastConfirmed == OutputCounter) { + Terminate(TDisconnectReason::EndOfStream()); + } else { + ReestablishConnectionWithHandshake(r == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-r)); + } + return 0; // error indicator + } else { + return -1; // temporary error + } + } + ssize_t TInterconnectSessionTCP::Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket, size_t maxBytes) { LWPROBE_IF_TOO_LONG(SlowICWriteData, Proxy->PeerNodeId, ms) { @@ -713,9 +767,15 @@ namespace NActors { #endif } + // Note1: Zero copy socket write has noticiable overhead for memory managment inside kernel. + // So now we try to use ZC only for XDC + // Note2: Current socket encryption implementation does not allow to pass flags + const bool tryZc = (XdcSocket && !Params.Encryption) ? (int)socket == (int)*XdcSocket : false; + TStackVec wbuffers; + TStackVec zeroCtrl; - stream.ProduceIoVec(wbuffers, maxElementsInIOV, maxBytes); + stream.ProduceIoVec(wbuffers, maxElementsInIOV, maxBytes, tryZc ? &zeroCtrl : nullptr); Y_ABORT_UNLESS(!wbuffers.empty()); TString err; @@ -723,35 +783,27 @@ namespace NActors { { // issue syscall with timing const ui64 begin = GetCycleCountFast(); - do { - if (wbuffers.size() == 1) { - auto& front = wbuffers.front(); - r = socket.Send(front.Data, front.Size, &err); - } else { - r = socket.WriteV(reinterpret_cast(wbuffers.data()), wbuffers.size()); + if (zeroCtrl) { + r = ZcProcessor.ProcessSend(wbuffers, socket, zeroCtrl); + if (r < 0) { + err = ZcProcessor.GetErrReason(); } - } while (r == -EINTR); + } else { + do { + if (wbuffers.size() == 1) { + auto& front = wbuffers.front(); + r = socket.Send(front.Data, front.Size, &err); + } else { + r = socket.WriteV(reinterpret_cast(wbuffers.data()), wbuffers.size()); + } + } while (r == -EINTR); + } const ui64 end = GetCycleCountFast(); Proxy->Metrics->IncSendSyscalls((end - begin) * 1'000'000 / GetCyclesPerMillisecond()); } - if (r > 0) { - return r; - } else if (-r != EAGAIN && -r != EWOULDBLOCK) { - const TString message = r == 0 ? "connection closed by peer" - : err ? err - : Sprintf("socket: %s", strerror(-r)); - LOG_NOTICE_NET(Proxy->PeerNodeId, "%s", message.data()); - if (r == 0 && !NumEventsInQueue && LastConfirmed == OutputCounter) { - Terminate(TDisconnectReason::EndOfStream()); - } else { - ReestablishConnectionWithHandshake(r == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-r)); - } - return 0; // error indicator - } else { - return -1; // temporary error - } + return HandleWriteResult(r, err); } Y_UNREACHABLE(); @@ -926,7 +978,7 @@ namespace NActors { XdcStream.DropFront(bytesDroppedFromXdc); if (lastDroppedSerial) { ChannelScheduler->ForEach([&](TEventOutputChannel& channel) { - channel.DropConfirmed(*lastDroppedSerial); + channel.DropConfirmed(*lastDroppedSerial, *Pool); }); } @@ -1344,6 +1396,28 @@ namespace NActors { MON_VAR(GetTotalInflightAmountOfData()) MON_VAR(GetCloseOnIdleTimeout()) MON_VAR(Subscribers.size()) + TABLER() { + TABLED() { str << "ZeroCopy state"; } + TABLED() { str << ZcProcessor.GetCurrentStateName(); } + } + TABLER() { + TABLED() { str << "ZeroCopy confirmed send with copy / confirmed send total"; } + TABLED() { + str << Sprintf("%lu / %lu", ZcProcessor.GetConfirmedWithCopy(), ZcProcessor.GetConfirmed()); + } + } + TABLER() { + TABLED() { str << "ZeroCopy lag (in transfers)"; } + TABLED() { + str << ZcProcessor.GetZcLag(); + } + } + TABLER() { + TABLED() { str << "ZeroCopy send bytes (total)"; } + TABLED() { + str << ZcProcessor.GetSendAsZcBytes(); + } + } } } } diff --git a/ydb/library/actors/interconnect/interconnect_tcp_session.h b/ydb/library/actors/interconnect/interconnect_tcp_session.h index c1f218996e18..b68f4968bf3f 100644 --- a/ydb/library/actors/interconnect/interconnect_tcp_session.h +++ b/ydb/library/actors/interconnect/interconnect_tcp_session.h @@ -20,6 +20,7 @@ #include #include "interconnect_impl.h" +#include "interconnect_zc_processor.h" #include "poller_tcp.h" #include "poller_actor.h" #include "interconnect_channel.h" @@ -32,6 +33,10 @@ #include #include +namespace NInterconnect { + class TInterconnectZcProcessor; +} + namespace NActors { static constexpr ui64 StarvingInRowForNotEnoughCpu = 32; @@ -437,6 +442,8 @@ namespace NActors { return ReceiveContext->ClockSkew_us; } + std::optional GetXDCFlags() const; + private: friend class TInterconnectProxyTCP; @@ -506,6 +513,7 @@ namespace NActors { void Handle(TEvPollerReady::TPtr& ev); void Handle(TEvPollerRegisterResult::TPtr ev); void WriteData(); + ssize_t HandleWriteResult(ssize_t r, const TString& err); ssize_t Write(NInterconnect::TOutgoingStream& stream, NInterconnect::TStreamSocket& socket, size_t maxBytes); ui32 MakePacket(bool data, TMaybe pingMask = {}); @@ -565,7 +573,7 @@ namespace NActors { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// const TSessionParams Params; - TMaybe Pool; + std::unique_ptr Pool; TMaybe ChannelScheduler; ui64 TotalOutputQueueSize; bool OutputStuckFlag; @@ -650,6 +658,8 @@ namespace NActors { ui64 EqualizeCounter = 0; ui64 StarvingInRow = 0; + + NInterconnect::TInterconnectZcProcessor ZcProcessor; }; class TInterconnectSessionKiller diff --git a/ydb/library/actors/interconnect/interconnect_zc_processor.cpp b/ydb/library/actors/interconnect/interconnect_zc_processor.cpp new file mode 100644 index 000000000000..05719d39336e --- /dev/null +++ b/ydb/library/actors/interconnect/interconnect_zc_processor.cpp @@ -0,0 +1,448 @@ +#include "interconnect_zc_processor.h" +#include "logging.h" + +#include +#include +#include + +#include + +#if defined (__linux__) + +#define YDB_MSG_ZEROCOPY_SUPPORTED 1 + +#endif + +#ifdef YDB_MSG_ZEROCOPY_SUPPORTED + +#include +#include +#include + +#ifndef MSG_ZEROCOPY +#define MSG_ZEROCOPY 0x4000000 +#endif + +#ifndef SO_ZEROCOPY +#define SO_ZEROCOPY 60 +#endif + +static bool CmsgIsIpLevel(const cmsghdr& cmsg) { + return (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR) || + (cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR); +} + +static bool CmsgIsZeroCopy(const cmsghdr& cmsg) { + if (!CmsgIsIpLevel(cmsg)) { + return false; + } + auto serr = reinterpret_cast CMSG_DATA(&cmsg); + return serr->ee_errno == 0 && serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY; +} + +#endif + +namespace NInterconnect { +using NActors::TEvents; + +#ifdef YDB_MSG_ZEROCOPY_SUPPORTED + +struct TErr { + explicit TErr(const TString& err) + : Reason(err) + {} + TString Reason; +}; + +struct TAgain { +}; + +struct TZcSendResult { + ui64 SendNum = 0; + ui64 SendWithCopyNum = 0; +}; + +using TProcessErrQueueResult = std::variant; + +static TProcessErrQueueResult DoProcessErrQueue(NInterconnect::TStreamSocket& socket) { + // Mostly copy-paste from grpc ERRQUEUE handling + struct msghdr msg; + msg.msg_name = nullptr; + msg.msg_namelen = 0; + msg.msg_iov = nullptr; + msg.msg_iovlen = 0; + msg.msg_flags = 0; + + constexpr size_t cmsg_alloc_space = + CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in)) + + CMSG_SPACE(32 * NLA_ALIGN(NLA_HDRLEN + sizeof(uint64_t))); + + union { + char rbuf[cmsg_alloc_space]; + struct cmsghdr align; + } aligned_buf; + msg.msg_control = aligned_buf.rbuf; + + TZcSendResult result; + + while (true) { + ssize_t r; + msg.msg_controllen = sizeof(aligned_buf.rbuf); + + do { + r = socket.RecvErrQueue(&msg); + } while (r == -EINTR); + + if (r < 0) { + if (result.SendNum == 0) { + if (r == -EAGAIN || r == -EWOULDBLOCK) { + return TAgain(); + } else { + return TErr("unexpected error during errqueue read, err: " + ToString(r) + ", " + TString(strerror(r))); + } + } else { + break; + } + } + + // Unlikly, but grpc handle it + if ((msg.msg_flags & MSG_CTRUNC) != 0) { + if (result.SendNum == 0) { + return TErr("errqueue message was truncated"); + } else { + break; + } + } + + for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg && cmsg->cmsg_len; + cmsg = CMSG_NXTHDR(&msg, cmsg)) { + if (CmsgIsZeroCopy(*cmsg)) { + auto serr = reinterpret_cast(CMSG_DATA(cmsg)); + if (serr->ee_data < serr->ee_info) { + // Incorrect data inside kernel + continue; + } + ui64 sends = serr->ee_data - serr->ee_info + 1; + result.SendNum += sends; + if (serr->ee_code == SO_EE_CODE_ZEROCOPY_COPIED) { + result.SendWithCopyNum += sends; + } + } + } + } + return result; +} + +#endif + +// returns number of buffers which should be sent to find ZC ready buffer on the first place +size_t AdjustLen(std::span wbuf, std::span ctrl, ui64 threshold) +{ + size_t l = wbuf.size(); + for (size_t i = 0; i < wbuf.size(); i++) { + if (ctrl[i].ZcReady() && wbuf[i].Size > threshold) { + l = i; + break; + } + } + + return l; +} + +void TInterconnectZcProcessor::DoProcessNotification(NInterconnect::TStreamSocket& socket) { +#ifdef YDB_MSG_ZEROCOPY_SUPPORTED + const TProcessErrQueueResult res = DoProcessErrQueue(socket); + + std::visit(TOverloaded{ + [this](const TErr& err) { + ZcState = ZC_DISABLED_ERR; + AddErr(err.Reason); + }, + [](const TAgain&) { + // Noting here. Probably read during next iteration + }, + [this](const TZcSendResult& res) { + Confirmed += res.SendNum; + ConfirmedWithCopy += res.SendWithCopyNum; + }}, res); + + + if (ZcState == ZC_CONGESTED && Confirmed == SendAsZc) { + ZcState = ZC_OK; + } + + // There is no reliable way to check if both side of tcp connection + // are on the same host (consider different namespaces as the same host too). + // So we check that each transfer has hidden copy during some period. + if (ZcState == ZC_OK && ConfirmedWithCopy == Confirmed && Confirmed > 10) { + AddErr("Hidden copy during ZC operation"); + ZcState = ZC_DISABLED_HIDDEN_COPY; + } +#else + Y_UNUSED(socket); +#endif +} + +void TInterconnectZcProcessor::ApplySocketOption(NInterconnect::TStreamSocket& socket) +{ +#ifdef YDB_MSG_ZEROCOPY_SUPPORTED + if (ZcState == ZC_OK) { + const int enable = 1; + if (setsockopt((int)socket, SOL_SOCKET, SO_ZEROCOPY, &enable, sizeof(enable)) < 0) { + AddErr("Unable to set SO_ZEROCOPY option"); + ZcState = ZC_DISABLED_ERR; + } else { + ResetState(); + } + } +#else + Y_UNUSED(socket); +#endif +} + +void TInterconnectZcProcessor::ResetState() { + if (ZcState == ZC_DISABLED) { + return; + } + + ZcState = ZC_OK; + Confirmed = 0; + ConfirmedWithCopy = 0; + SendAsZc = 0; + SendAsZcBytes = 0; + LastErr.clear(); +} + +ssize_t TInterconnectZcProcessor::ProcessSend(std::span wbuf, TStreamSocket& socket, + std::span ctrl) +{ + Y_DEBUG_ABORT_UNLESS(wbuf.size() == ctrl.size()); + size_t len = wbuf.size(); + + if (ZcStateIsOk()) { + len = AdjustLen(wbuf, ctrl, ZcThreshold); + } + + ssize_t r = 0; + int flags = 0; + + do { + switch (len) { + case 0: +#ifdef YDB_MSG_ZEROCOPY_SUPPORTED + if (ZcStateIsOk()) { + flags |= MSG_ZEROCOPY; + } +#endif + case 1: + r = socket.SendWithFlags(wbuf.front().Data, wbuf.front().Size, flags); + break; + default: + r = socket.WriteV(reinterpret_cast(wbuf.data()), len); + } + } while (r == -EINTR); + +#ifdef YDB_MSG_ZEROCOPY_SUPPORTED + if (flags & MSG_ZEROCOPY) { + if (r > 0) { + // Successful enqueued in to the kernel - increment counter to track dequeue progress + SendAsZc++; + SendAsZcBytes += wbuf.front().Size; + ctrl.front().Update(SendAsZc); + } else if (r == -ENOBUFS) { + if (SendAsZc == Confirmed) { + // Got ENOBUFS just for first not completed zero copy transfer + // it looks like misconfiguration (unable to lock page or net.core.optmem_max extremely small) + // It is better just to stop trying using ZC + ZcState = ZC_DISABLED_ERR; + AddErr("Got ENOBUF just for first transfer"); + } else { + // Got ENOBUFS after some successful send calls. Probably net.core.optmem_max still is not enought + // Just disable temporary ZC until we dequeue notifications + ZcState = ZC_CONGESTED; + AddErr("Got ENOBUF after some transfers, consider to increase net.core.optmem_max"); + } + // The state changed. Trigger retry + r = -EAGAIN; + } + } +#endif + return r; +} + +TString TInterconnectZcProcessor::GetCurrentStateName() const { + switch (ZcState) { + case ZC_DISABLED: + return "Disabled"; + case ZC_DISABLED_ERR: + return "DisabledErr"; + case ZC_DISABLED_HIDDEN_COPY: + return "DisabledHiddenCopy"; + case ZC_OK: + return "Ok"; + case ZC_CONGESTED: + return "Congested"; + } +} + +TString TInterconnectZcProcessor::ExtractErrText() { + if (LastErr) { + TString err; + err.swap(LastErr); + return err; + } else { + return {}; + } +} + +void TInterconnectZcProcessor::AddErr(const TString& err) { + if (LastErr) { + LastErr.reserve(err.size() + 2); + LastErr += ", "; + LastErr += err; + } else { + LastErr = err; + } +} + +/////////////////////////////////////////////////////////////////////////////// + +#ifdef YDB_MSG_ZEROCOPY_SUPPORTED +TInterconnectZcProcessor::TInterconnectZcProcessor(bool enabled) + : ZcState(enabled ? ZC_OK : ZC_DISABLED) +{} + +// Guard part. +// We must guarantee liveness of buffers used for zc +// until enqueued zc operation completed by kernel + +class TGuardActor + : public NActors::TActorBootstrapped + , public NActors::TInterconnectLoggingBase { +public: + TGuardActor(ui64 uncompleted, ui64 confirmed, std::list&& queue, + TIntrusivePtr socket, + std::unique_ptr&& pool); + void Bootstrap(); + STATEFN(StateFunc); +private: + void DoGc(); + const ui64 Uncompleted; + ui64 Confirmed; + std::list Delayed; + TIntrusivePtr Socket; + std::unique_ptr Pool; +}; + +TGuardActor::TGuardActor(ui64 uncompleted, ui64 confirmed, std::list&& queue, + TIntrusivePtr socket, + std::unique_ptr&& pool) + : TInterconnectLoggingBase(Sprintf("TGuardActor, socket %lu", socket ? i64(*socket) : -1)) + , Uncompleted(uncompleted) + , Confirmed(confirmed) + , Delayed(std::move(queue)) + , Socket(socket) + , Pool(std::move(pool)) +{} + +void TGuardActor::Bootstrap() { + Become(&TThis::StateFunc); + DoGc(); +} + +void TGuardActor::DoGc() +{ + Y_DEBUG_ABORT_UNLESS(Pool); + Y_DEBUG_ABORT_UNLESS(Socket); + + const TProcessErrQueueResult res = DoProcessErrQueue(*Socket); + + std::visit(TOverloaded{ + [this](const TErr& err) { + // Nothing can do here (( VERIFY, or just drop buffer probably unsafe from network perspective + LOG_ERROR_IC_SESSION("ICZC01", "error during ERRQUEUE processing: %s", + err.Reason.data()); + Pool->Release(Delayed); + Pool->Trim(); + TActor::PassAway(); + }, + [this](const TAgain&) { + Schedule(TDuration::MilliSeconds(100), new TEvents::TEvWakeup()); + }, + [this](const TZcSendResult& res) { + Confirmed += res.SendNum; + if (Confirmed >= Uncompleted) { + Pool->Release(Delayed); + Pool->Trim(); + TActor::PassAway(); + } else { + Schedule(TDuration::MilliSeconds(100), new TEvents::TEvWakeup()); + } + }}, res); +} + +STFUNC(TGuardActor::StateFunc) { + STRICT_STFUNC_BODY( + cFunc(TEvents::TEvWakeup::EventType, DoGc) + ) +} + +class TGuardRunner : public IZcGuard { +public: + TGuardRunner(ui64 uncompleted, ui64 confirmed) + : Uncompleted(uncompleted) + , Confirmed(confirmed) + {} + + void ExtractToSafeTermination(std::list& queue) noexcept override { + for (std::list::iterator event = queue.begin(); event != queue.end();) { + if (event->ZcTransferId > Confirmed) { + Delayed.splice(Delayed.end(), queue, event++); + } else { + event++; + } + } + } + + void Terminate(std::unique_ptr&& pool, TIntrusivePtr socket, const NActors::TActorContext &ctx) override { + if (!Delayed.empty()) { + // must be registered on the same mailbox! + ctx.RegisterWithSameMailbox(new TGuardActor(Uncompleted, Confirmed, std::move(Delayed), socket, std::move(pool))); + } + } +private: + const ui64 Uncompleted; + const ui64 Confirmed; + std::list Delayed; +}; + +std::unique_ptr TInterconnectZcProcessor::GetGuard() +{ + return std::make_unique(SendAsZc, Confirmed); +} + +#else +TInterconnectZcProcessor::TInterconnectZcProcessor(bool) + : ZcState(ZC_DISABLED) +{} + +class TDummyGuardRunner : public IZcGuard { +public: + TDummyGuardRunner(ui64 uncompleted, ui64 confirmed) + { + Y_UNUSED(uncompleted); + Y_UNUSED(confirmed); + } + + void ExtractToSafeTermination(std::list&) noexcept override {} + void Terminate(std::unique_ptr&&, TIntrusivePtr, const NActors::TActorContext&) override {} +}; + +std::unique_ptr TInterconnectZcProcessor::GetGuard() +{ + return std::make_unique(SendAsZc, Confirmed); +} + +#endif + +} diff --git a/ydb/library/actors/interconnect/interconnect_zc_processor.h b/ydb/library/actors/interconnect/interconnect_zc_processor.h new file mode 100644 index 000000000000..ef1104bee958 --- /dev/null +++ b/ydb/library/actors/interconnect/interconnect_zc_processor.h @@ -0,0 +1,77 @@ +#pragma once + +#include "interconnect_common.h" + +#include "event_holder_pool.h" + +#include + +namespace NInterconnect { + +class IZcGuard { +public: + virtual ~IZcGuard() = default; + virtual void ExtractToSafeTermination(std::list& queue) noexcept = 0; + virtual void Terminate(std::unique_ptr&& pool, TIntrusivePtr socket, const NActors::TActorContext &ctx) = 0; +}; + +class TInterconnectZcProcessor { +public: + TInterconnectZcProcessor(bool enabled); + ~TInterconnectZcProcessor() = default; + + // Enables ability to use ZC for given socket + void ApplySocketOption(TStreamSocket& socket); + + // Perform ZC send if front message is suitable for ZC transfer. + // Otherwise, send usual number of messages to prepare for ZC send for next call + ssize_t ProcessSend(std::span wbuf, TStreamSocket& socket, std::span ctrl); + + // Process notification queue to track actual send position + void ProcessNotification(NInterconnect::TStreamSocket& socket) { + if (SendAsZc > Confirmed && (ZcState == ZC_OK || ZcState == ZC_CONGESTED || ZcState == ZC_DISABLED_HIDDEN_COPY)) { + DoProcessNotification(socket); + } + } + + // Return guerd to start termination handling + std::unique_ptr GetGuard(); + + // Mon parts + ui64 GetConfirmed() const { return Confirmed; } + ui64 GetConfirmedWithCopy() const { return ConfirmedWithCopy; } + ui64 GetZcLag() const { return SendAsZc - Confirmed; } + ui64 GetSendAsZcBytes() const { return SendAsZcBytes; } + TString GetCurrentStateName() const; + const TString& GetErrReason() const { return LastErr; } + + bool ZcStateIsOk() const { return ZcState == ZC_OK; } + bool ZcStateIsDisabled() const { return ZcState == ZC_DISABLED; } + TString ExtractErrText(); + + // Do not try to use ZC for small events + constexpr static ui32 ZcThreshold = 16384; +private: + ui64 SendAsZcBytes = 0; + ui64 SendAsZc = 0; + ui64 Confirmed = 0; + ui64 ConfirmedWithCopy = 0; + + TString LastErr; + + enum { + ZC_DISABLED, // ZeroCopy featute is disabled by used + ZC_DISABLED_ERR, // We got some errors and unable to use ZC for this connection + ZC_DISABLED_HIDDEN_COPY, // The socket associated with loopback, or unsupported nic + // real ZC send is not possible in this case and cause hiden copy inside kernel. + ZC_OK, // OK, data can be send using zero copy + ZC_CONGESTED, // We got ENUBUF and temporary disable ZC send + + } ZcState; + + void DoProcessNotification(NInterconnect::TStreamSocket& socket); + void ResetState(); + void AddErr(const TString& err); +}; + +} diff --git a/ydb/library/actors/interconnect/outgoing_stream.h b/ydb/library/actors/interconnect/outgoing_stream.h index 97f2d7fb61c4..af2ed5b6ce4b 100644 --- a/ydb/library/actors/interconnect/outgoing_stream.h +++ b/ydb/library/actors/interconnect/outgoing_stream.h @@ -28,6 +28,7 @@ namespace NInterconnect { struct TSendChunk { TContiguousSpan Span; TBuffer *Buffer; + ui32* ZcTransferId = nullptr; }; std::vector> Buffers; @@ -39,6 +40,34 @@ namespace NInterconnect { size_t UnsentBytes = 0; public: + /* + * Allow to share buffer between socket to produce safe zero copy operation + */ + class TBufController { + public: + explicit TBufController(ui32* b) + : ZcTransferId(b) + {} + + /* + * Set or update external handler id. For example sequence number of successful MSG_ZEROCOPY call + * Should not be called in period between MakeBuffersShared and before CompleteSharedBuffers call + */ + void Update(ui32 handler) { + if (!ZcTransferId) { + return; + } + *ZcTransferId = handler; + } + + bool ZcReady() const { + return ZcTransferId != nullptr; + } + + private: + ui32 * const ZcTransferId; + }; + operator bool() const { return SendQueuePos != SendQueue.size(); } @@ -91,7 +120,7 @@ namespace NInterconnect { } } - void Append(TContiguousSpan span) { + void Append(TContiguousSpan span, ui32* const zcHandle) { if (AppendBuffer && span.data() == AppendBuffer->Data + AppendOffset) { // the only valid case to use previously acquired span AppendAcquiredSpan(span); } else { @@ -109,6 +138,7 @@ namespace NInterconnect { #endif AppendSpanWithGlueing(span, nullptr); } + SendQueue.back().ZcTransferId = zcHandle; } void Write(TContiguousSpan in) { @@ -158,12 +188,15 @@ namespace NInterconnect { UnsentBytes = 0; } - template - void ProduceIoVec(T& container, size_t maxItems, size_t maxBytes) { + template> + void ProduceIoVec(T& container, size_t maxItems, size_t maxBytes, U* controllers = nullptr) { size_t offset = SendOffset; for (auto it = SendQueue.begin() + SendQueuePos; it != SendQueue.end() && std::size(container) < maxItems && maxBytes; ++it) { const TContiguousSpan span = it->Span.SubSpan(offset, maxBytes); container.push_back(NActors::TConstIoVec{span.data(), span.size()}); + if (controllers) { + controllers->push_back(TBufController(it->ZcTransferId)); + } offset = 0; maxBytes -= span.size(); } @@ -222,6 +255,13 @@ namespace NInterconnect { } } + void CompleteSharedBuffers() { + for (size_t i = 0; i < Buffers.size(); i++) { + DropBufferReference(Buffers[i]); + } + Buffers.clear(); + } + private: void AppendAcquiredSpan(TContiguousSpan span) { TBuffer *buffer = AppendBuffer; @@ -266,6 +306,7 @@ namespace NInterconnect { } } }; + using TOutgoingStream = TOutgoingStreamT<262144>; diff --git a/ydb/library/actors/interconnect/packet.cpp b/ydb/library/actors/interconnect/packet.cpp index eaddb7166a4e..16acb1f9bce1 100644 --- a/ydb/library/actors/interconnect/packet.cpp +++ b/ydb/library/actors/interconnect/packet.cpp @@ -16,6 +16,7 @@ ui32 TEventHolder::Fill(IEventHandle& ev) { ForwardRecipient = ev.GetForwardOnNondeliveryRecipient(); EventActuallySerialized = 0; Descr.Checksum = 0; + ZcTransferId = 0; if (ev.HasBuffer()) { Buffer = ev.ReleaseChainBuffer(); diff --git a/ydb/library/actors/interconnect/packet.h b/ydb/library/actors/interconnect/packet.h index 1cc28d663f89..ae85be308da2 100644 --- a/ydb/library/actors/interconnect/packet.h +++ b/ydb/library/actors/interconnect/packet.h @@ -99,6 +99,7 @@ struct TEventHolder : TNonCopyable { ui32 EventActuallySerialized; mutable NLWTrace::TOrbit Orbit; NWilson::TSpan Span; + ui32 ZcTransferId; //id of zero copy transfer. In case of RDMA it is a place where some internal handle can be stored to identify events ui32 Fill(IEventHandle& ev); @@ -195,10 +196,10 @@ struct TTcpPacketOutTask : TNonCopyable { // Append reference to some data (acquired previously or external pointer). template - void Append(const void *buffer, size_t len) { + void Append(const void *buffer, size_t len, ui32* const zcHandle) { Y_DEBUG_ABORT_UNLESS(len <= (External ? GetExternalFreeAmount() : GetInternalFreeAmount())); (External ? ExternalSize : InternalSize) += len; - (External ? XdcStream : OutgoingStream).Append({static_cast(buffer), len}); + (External ? XdcStream : OutgoingStream).Append({static_cast(buffer), len}, zcHandle); ProcessChecksum(buffer, len); } diff --git a/ydb/library/actors/interconnect/ut/channel_scheduler_ut.cpp b/ydb/library/actors/interconnect/ut/channel_scheduler_ut.cpp index 7d7cf53d8047..d953a53ff8d7 100644 --- a/ydb/library/actors/interconnect/ut/channel_scheduler_ut.cpp +++ b/ydb/library/actors/interconnect/ut/channel_scheduler_ut.cpp @@ -14,7 +14,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) { auto callback = [](THolder) {}; TEventHolderPool pool(common, callback); TSessionParams p; - TChannelScheduler scheduler(1, {}, ctr, pool, 64 << 20, p); + TChannelScheduler scheduler(1, {}, ctr, 64 << 20, p); ui32 numEvents = 0; @@ -23,7 +23,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) { auto ev = MakeHolder(1, 0, TActorId(), TActorId(), MakeIntrusive(payload, TEventSerializationInfo{}), 0); auto& ch = scheduler.GetOutputChannel(channel); const bool wasWorking = ch.IsWorking(); - ch.Push(*ev); + ch.Push(*ev, pool); if (!wasWorking) { scheduler.AddToHeap(ch, 0); } diff --git a/ydb/library/actors/interconnect/ut/lib/ic_test_cluster.h b/ydb/library/actors/interconnect/ut/lib/ic_test_cluster.h index 43f3b5a86664..2afea5c8a847 100644 --- a/ydb/library/actors/interconnect/ut/lib/ic_test_cluster.h +++ b/ydb/library/actors/interconnect/ut/lib/ic_test_cluster.h @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -17,6 +18,12 @@ class TTestICCluster: public TNonCopyable { bool Disconnect; }; + enum Flags : ui64 { + EMPTY = 0, + USE_ZC = 1, + USE_TLS = 1 << 1 + }; + private: const ui32 NumNodes; const TString Address = "::1"; @@ -30,7 +37,7 @@ class TTestICCluster: public TNonCopyable { public: TTestICCluster(ui32 numNodes = 1, NActors::TChannelsConfig channelsConfig = NActors::TChannelsConfig(), - TTrafficInterrupterSettings* tiSettings = nullptr, TIntrusivePtr loggerSettings = nullptr) + TTrafficInterrupterSettings* tiSettings = nullptr, TIntrusivePtr loggerSettings = nullptr, Flags flags = EMPTY) : NumNodes(numNodes) , Counters(new NMonitoring::TDynamicCounters) , ChannelsConfig(channelsConfig) @@ -62,7 +69,9 @@ class TTestICCluster: public TNonCopyable { for (ui32 i = 1; i <= NumNodes; ++i) { auto& portMap = tiSettings ? specificNodePortMap[i] : nodeToPortMap; Nodes.emplace(i, MakeHolder(i, NumNodes, portMap, Address, Counters, DeadPeerTimeout, ChannelsConfig, - /*numDynamicNodes=*/0, /*numThreads=*/1, LoggerSettings)); + /*numDynamicNodes=*/0, /*numThreads=*/1, LoggerSettings, TNode::DefaultInflight(), + flags & USE_ZC ? ESocketSendOptimization::IC_MSG_ZEROCOPY : ESocketSendOptimization::DISABLED, + flags & USE_TLS)); } } @@ -84,4 +93,40 @@ class TTestICCluster: public TNonCopyable { void KillActor(ui32 nodeId, const TActorId& id) { Nodes[nodeId]->Send(id, new NActors::TEvents::TEvPoisonPill); } + + NThreading::TFuture GetSessionDbg(ui32 me, ui32 peer) { + NThreading::TPromise promise = NThreading::NewPromise(); + + class TGetHttpInfoActor : public NActors::TActorBootstrapped { + public: + TGetHttpInfoActor(const TActorId& id, NThreading::TPromise promise) + : IcProxy(id) + , Promise(promise) + {} + + void Bootstrap() { + NMonitoring::TMonService2HttpRequest monReq(nullptr, nullptr, nullptr, nullptr, "", nullptr); + Send(IcProxy, new NMon::TEvHttpInfo(monReq)); + Become(&TGetHttpInfoActor::StateFunc); + } + + STRICT_STFUNC(StateFunc, + hFunc(NMon::TEvHttpInfoRes, Handle); + ) + private: + void Handle(NMon::TEvHttpInfoRes::TPtr& ev) { + TStringStream str; + ev->Get()->Output(str); + Promise.SetValue(str.Str()); + PassAway(); + } + const TActorId IcProxy; + NThreading::TPromise Promise; + }; + + IActor* actor = new TGetHttpInfoActor(Nodes[me]->InterconnectProxy(peer), promise); + Nodes[me]->RegisterActor(actor); + + return promise.GetFuture(); + } }; diff --git a/ydb/library/actors/interconnect/ut/lib/node.h b/ydb/library/actors/interconnect/ut/lib/node.h index 7a71ac568c5d..ab13b21add6d 100644 --- a/ydb/library/actors/interconnect/ut/lib/node.h +++ b/ydb/library/actors/interconnect/ut/lib/node.h @@ -12,17 +12,23 @@ #include #include +#include "tls/tls.h" + using namespace NActors; class TNode { THolder ActorSystem; + TString CaPath; public: + static constexpr ui32 DefaultInflight() { return 512 * 1024; } TNode(ui32 nodeId, ui32 numNodes, const THashMap& nodeToPort, const TString& address, NMonitoring::TDynamicCounterPtr counters, TDuration deadPeerTimeout, TChannelsConfig channelsSettings = TChannelsConfig(), ui32 numDynamicNodes = 0, ui32 numThreads = 1, - TIntrusivePtr loggerSettings = nullptr, ui32 inflight = 512 * 1024) { + TIntrusivePtr loggerSettings = nullptr, ui32 inflight = DefaultInflight(), + ESocketSendOptimization sendOpt = ESocketSendOptimization::DISABLED, + bool withTls = false) { TActorSystemSetup setup; setup.NodeId = nodeId; setup.ExecutorsCount = 2; @@ -45,8 +51,17 @@ class TNode { common->Settings.SendBufferDieLimitInMB = 512; common->Settings.TotalInflightAmountOfData = inflight; common->Settings.TCPSocketBufferSize = 2048 * 1024; + common->Settings.SocketSendOptimization = sendOpt; common->OutgoingHandshakeInflightLimit = 3; + if (withTls) { + common->Settings.Certificate = NInterconnect::GetCertificateForTest(); + common->Settings.PrivateKey = NInterconnect::GetPrivateKeyForTest(); + CaPath = NInterconnect::GetTempCaPathForTest(); + common->Settings.CaFilePath = CaPath; + common->Settings.EncryptionMode = EEncryptionMode::REQUIRED; + } + setup.Interconnect.ProxyActors.resize(numNodes + 1 - numDynamicNodes); setup.Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, interconnectPoolId); @@ -124,6 +139,7 @@ class TNode { ~TNode() { ActorSystem->Stop(); + unlink(CaPath.c_str()); } bool Send(const TActorId& recipient, IEventBase* ev) { diff --git a/ydb/library/actors/interconnect/ut/lib/test_events.h b/ydb/library/actors/interconnect/ut/lib/test_events.h index 55cddb71527e..c59b1f76fb7a 100644 --- a/ydb/library/actors/interconnect/ut/lib/test_events.h +++ b/ydb/library/actors/interconnect/ut/lib/test_events.h @@ -15,6 +15,10 @@ namespace NActors { struct TEvTest : TEventPB { TEvTest() = default; + explicit TEvTest(ui64 sequenceNumber) { + Record.SetSequenceNumber(sequenceNumber); + } + TEvTest(ui64 sequenceNumber, const TString& payload) { Record.SetSequenceNumber(sequenceNumber); Record.SetPayload(payload); diff --git a/ydb/library/actors/interconnect/ut/lib/tls/tls.cpp b/ydb/library/actors/interconnect/ut/lib/tls/tls.cpp new file mode 100644 index 000000000000..dfccca53acc1 --- /dev/null +++ b/ydb/library/actors/interconnect/ut/lib/tls/tls.cpp @@ -0,0 +1,148 @@ +#include "tls.h" +#include + +#include +#include + +namespace NInterconnect { + +// Fake CA and cert for IC tests + +TString GetPrivateKeyForTest() +{ + return TString(R"(-----BEGIN PRIVATE KEY----- +MIIJQwIBADANBgkqhkiG9w0BAQEFAASCCS0wggkpAgEAAoICAQDkgBAM97t0q/dD +lrgnO6AAU2wEMa9FHHxpZ0xKjbM4446X2ZLseAL+dahvavRyktoLfs0+1TUz+FTn +9De9+rgg8Y5HkHxOgt7NQseYhOOuFDlfR7c7fVkYqxcBcmXDWueyn+Qqvc7WcMF5 +HMTeurNr4kVT9zQ4YXA+VdKzZgEf3Bu1wnmvtD4hs51K1CVJir7zyHQPBm92mfvZ +pU1GkDJthqELLjhLriIDhj+dhAG/IYSZxhnWyP7gWMEs4fwPd4LSnLEsBrxO4z6z +WBv5EIOXRmeqIFlJ5GTt66FJY3gPjKoGckx4UKEz/VToEpj+EFyV8ZJ1fUZDlz2y +gt+uG+oXCOGwG/zif5WXWqW83M2knVnLo0rvinDGkwv2hNBlUY8WWg9VnZb8GNvl +VFo2wWcm8KdG/luAktql8h0NBGGyRVdYxo5uVbcMeAf5r3FR8b+zBrhGCKvjVwM0 +PcKsINlxzDjGmtERc7/h3BBEAQ/zsj0ZviQpUmrOW2+qaCloz6AkVEq97y7PhLof +g19MuiFB4PyAhHfwbw9R19+KaZpTPJesl80Ke8LO8bqFl2+6+ci6mckO13k59F2F +Kd4Da9oAE6kk4ntxq0/pxHg3QPqmBbh1HjWdvk26OL8DQDUWYEiW47UD1BoRumW6 +cIgOPe+9/EC5IvekEKxdlrGEAGKfKwIDAQABAoICAAG5dn8lweEDVaFjJzUJZFxV +3nA43V7DJ6xpkcM6RvD8XqlTHgjxoVKN5sO4f4T7cL1t3o5FcHPzWtV8giZjGJw/ +CaWmnhmL/H+seL8nUzFcIh0ckguj4++lhhq5Sn7rIum1/s3UkuJxa835lpuSHL28 +SzoMzCfkxfXieSDOrN5MFfVzaBTlLyOn+Soezo07u9P99+PcVsXPxOPQNVrj+bPa +Tg4JWKlrJ5yPmQLotrt18EwMzHy5FZtYPO6VLtObDksZBAlJOVPLFg0NpcbMBhvQ +XAK5R2AHljw2PAhgWzNnpCmnF5NzRrqSt3i5szSvp1M5khmXsXEPJVfa8jJpL+uq +7kqL8TpXhud7Kh0TVMttwhS17weFU7dgcR9skrGD5IXE5cd9tm0HYLAp4dJUidHS +/uQPKoVntzeStjHrgD09fgqHBNFKc1MdvjgpIN9NrzO1lzyt08ynlBwiZWmQErPe +zhy3TVKlC839MBzzWkthDtIn/hgQMK6cpOfIE3qdFX2nIbTfeXy2M7SYdD47Cc+5 +06UXqJdRA29Xuj9qN+JcZHMEh4cXfM+UodthR2spGktEkYsK6cREidCsTrmSroMn +z/Yi1OaWt292U2Izo/Tf7hgWuayVl4V2jP+JCjXmSXE+59/mBoER3DI9RQW9QOkr +RGTBmptWnJHMH0+c1nlxAoIBAQD5ZXSZMiw30NkCRpMgp+WI74O44A/wA19Vzb0L +Wkom7I7KzMvwjCUb/qb+nPbTG/LPYG9o4yweQi8+Pk4GHt6hSj6/bpVNfOWxTU/Q +q7SW2MUaq7VQNW708+8YAgxhvJV4feJwrUCUZRdIDYtaLq5d2Bv1k+x3MCLjJIhU +lPHjYtYtbaaMF6GR50hWzlkY2oalXPMVRWj1K56/OQYUGQcLkR1ITkMx6v+FsYT+ +XiCiufdJUbdGlgdafwgBXWYGyq34Mdcx4yiGjThGa9YgPtHfBSra/aNjDtzOa8eX +HFeniedOVd9HhPwdLfLKZ3nmyM33GKaEbNecBXmtBIeCb9hZAoIBAQDqjPZX8tVb +7kAEhmAph5d08NGWSdKvoRvL1uEfwQHqVKbqR2i1rWSzuFFlys3f7kOL1zinfgnM +yAk3qj7+zL4/fjySYnTlIGSKnll2uCQN7wHNH/2WklywVm6acd46KWpmyVIyZrdR +0ewfMIIQV2uNbtF4hJ1lJLkRpX2zEeHFx73WCX+JNLBwR0ywHhs8OJFkUimxZ9qZ +R9L/z6SzcrZcf3avLJMBBEmnieA63jzM77kInndRHZd6C9LpQQNbSPYD6CovzKfA +Bme0RKCSawZqhVF/sT6KZSNbLgaJPbt2UP3etCD6T14YXsEyXLgDy0/+Ivz8Pr4p +IahyKO5WJwMjAoIBAGW6CHFkkbzCp4HSH7k2Qt40NFp4qoeQGJb7MJ0s2wo9e5El +MBqST6C3oo5AoD1ELSqBf3AdGaXOAU82QsUkWlMX4bhb9vKAe4BytJe9MhBFo0BZ +wb9RzEyGI4R7cWl8bsuTLYYgZTMiePie7bR/TghhWiY1jEKhk9lq0WEO0AucCRjG +nSSPjwvgdxVRXe5RVJKm81A9264FeN8u91fDTaheLL+NjMMTw95YppLK+izmBgQW +HNfh4mX7YtyLqE4k5glS6yAiNCmN+OJgohrNBPYfOXfR9Y82RMK+G897dBWWno7J +YCXgDKYqU9pTktmcFscvetyROPEfGp6ENnHyBSECggEBAJjTjFe15Atoa9IG9HVa +4fbSSt3P8DV7li71Le6Qxfy3d6LDMJjgB/OKL49R218DUoO1kjagSyZhWJAqn61K +HtQkHreK63u35YrkropKZUOm7deH9qW7bCWBy8NaWmAvSCL9Hk+02dG4JFAWPUkE +jRG0mUwbrKqQiP3UhNi+2AsUoL7rpWvzJtuhuXgvxbMxcJqbZosvjiG9yN/hngFG +x0fxzZVKR+arsoo1riLtV1R5Bml1R21VCLP/LEfLkrJSEeptxb8rbEoUYlH1PWLp +1V5my7mV9ZgbWjQ5Aw09af4nu6L2X155hGgApYV5IHVobhC7H3gEMcd/JNBtlw4P +kV0CggEBANC3j6zUsFT2pEJCN7SlHxf8Du9TuqnhuKi2WnMA2qhgJTSgDc0UaUt2 +QiUTF+TIIg+RJoyd5uVmlJYIRQLSU5xqxeS15RTIunw67GsqrSOT5XGDTvid0UCR +TMb15uhe4kL7peiadpa/fMvMxSVNh2Y2QuZRJEuQBZfROUzUI61yUWDnR96elEk7 +J0+mZko0rdPV+uJ/bSgJY5VYde2ynL9cmUGQfg2bOva7/cyaLb6h4MdnBe7He0dn +UqItwtJ1GrwlJx1FeE6D9uPy/YfoKgpWL0KkL5LRD48LTWPCJsaoOqatW0R+qexU +rJ98v+ZO1xZec9kMwz0v9iUBkSYBXkg= +-----END PRIVATE KEY-----)"); + +} + +TString GetCertificateForTest() +{ + return TString(R"(-----BEGIN CERTIFICATE----- +MIIFPzCCAycCFF/7eZiR10RwN8JRGcAAyZC8Xu39MA0GCSqGSIb3DQEBCwUAMHIx +CzAJBgNVBAYTAlVWMREwDwYDVQQIDAhNaWxreVdheTERMA8GA1UEBwwIT3Jpb25B +cm0xDjAMBgNVBAoMBUVhcnRoMRkwFwYDVQQLDBBFYXJ0aERldmVsb3BtZW50MRIw +EAYDVQQDDAlsb2NhbGhvc3QwIBcNMjUwNTIyMTE0NjMzWhgPMjA1MjEwMDcxMTQ2 +MzNaMEQxCzAJBgNVBAYTAlVWMREwDwYDVQQIDAhNaWxreVdheTEOMAwGA1UECgwF +RWFydGgxEjAQBgNVBAMMCWxvY2FsaG9zdDCCAiIwDQYJKoZIhvcNAQEBBQADggIP +ADCCAgoCggIBAOSAEAz3u3Sr90OWuCc7oABTbAQxr0UcfGlnTEqNszjjjpfZkux4 +Av51qG9q9HKS2gt+zT7VNTP4VOf0N736uCDxjkeQfE6C3s1Cx5iE464UOV9Htzt9 +WRirFwFyZcNa57Kf5Cq9ztZwwXkcxN66s2viRVP3NDhhcD5V0rNmAR/cG7XCea+0 +PiGznUrUJUmKvvPIdA8Gb3aZ+9mlTUaQMm2GoQsuOEuuIgOGP52EAb8hhJnGGdbI +/uBYwSzh/A93gtKcsSwGvE7jPrNYG/kQg5dGZ6ogWUnkZO3roUljeA+MqgZyTHhQ +oTP9VOgSmP4QXJXxknV9RkOXPbKC364b6hcI4bAb/OJ/lZdapbzczaSdWcujSu+K +cMaTC/aE0GVRjxZaD1WdlvwY2+VUWjbBZybwp0b+W4CS2qXyHQ0EYbJFV1jGjm5V +twx4B/mvcVHxv7MGuEYIq+NXAzQ9wqwg2XHMOMaa0RFzv+HcEEQBD/OyPRm+JClS +as5bb6poKWjPoCRUSr3vLs+Euh+DX0y6IUHg/ICEd/BvD1HX34ppmlM8l6yXzQp7 +ws7xuoWXb7r5yLqZyQ7XeTn0XYUp3gNr2gATqSTie3GrT+nEeDdA+qYFuHUeNZ2+ +Tbo4vwNANRZgSJbjtQPUGhG6ZbpwiA497738QLki96QQrF2WsYQAYp8rAgMBAAEw +DQYJKoZIhvcNAQELBQADggIBAGkxQD3jWmdkmiefblrPtslDKdGpo3Mu18QiUrDk +vf7BrTwPbPCm4/zTYPKkxfWKbSZRJr6Fg0Glc0d5HczjDisqh519rqDif3NTGQGK +/eaVi7JQkKClIM7DUNb3nNmWwpoloDsgVbIQ9eJed1vTctcVMa0OowpL5JD6fhE+ +oRHSBo8f5VZkViSA7TRoFsqLfgmZZDHp+Hvv8Z47la0AGRynnwFybea7DIRoE+6J +lNA7LPYkwKUkfz88ayGapbqldmyj0rlX74DqyS3G5pS7JYqGPC8MWtpmaklJnP+Y +J8XnEtjIZFx1j6E4ocI/pfSFCXQU2v6ICNpEOLo3IPAtpgjpCGOPXX/ppSo9/N8+ +JOpT+3OlYPXKuXftCu57WD1f62nsTp/2jySN3I2Ej6xyzGSHh4Dejzvh0bNEOFMy +9Z2YFY90jgM6qLfNhrfbb/nENADusMsyjdIKX9sq/aCaAy8FKasGgukuAHFdlg2N +4fovR1GiOK7ALaUiDarytEutINLx/g9FK1S9AXL8Rs1ho3ARsvtuoiU8WILAhWcr +wCvCu8XZcCG7ZaOvTdsbxuqqBOf0+oT4g9kJmFjq4f5ogcZ41hyjvMjejmlAxJ7M +un1GLHp0TMSAhnTeWN1ImAJZDh81b0q1TbRArP9KU/DTaKTdeWwWbSiNw0zMoEN8 +kUJ9 +-----END CERTIFICATE-----)"); +} + +TString GetTempCaPathForTest() +{ + TString ca = R"(-----BEGIN CERTIFICATE----- +MIIFxzCCA6+gAwIBAgIUWl1eFkYtxF33bxXraEskr3Tw7OcwDQYJKoZIhvcNAQEL +BQAwcjELMAkGA1UEBhMCVVYxETAPBgNVBAgMCE1pbGt5V2F5MREwDwYDVQQHDAhP +cmlvbkFybTEOMAwGA1UECgwFRWFydGgxGTAXBgNVBAsMEEVhcnRoRGV2ZWxvcG1l +bnQxEjAQBgNVBAMMCWxvY2FsaG9zdDAgFw0yNTA1MjIxMTQxMTVaGA8yMDUyMTAw +NzExNDExNVowcjELMAkGA1UEBhMCVVYxETAPBgNVBAgMCE1pbGt5V2F5MREwDwYD +VQQHDAhPcmlvbkFybTEOMAwGA1UECgwFRWFydGgxGTAXBgNVBAsMEEVhcnRoRGV2 +ZWxvcG1lbnQxEjAQBgNVBAMMCWxvY2FsaG9zdDCCAiIwDQYJKoZIhvcNAQEBBQAD +ggIPADCCAgoCggIBAOK5cEOHW93Z0Al+E42QTtYW9RD11fUgTsU0FLoynBPfqMA3 +mfSWIkvNqeKH6dgQplgnc95ypCna+Dy6DkLzq2OrXVwiza0+v5ibblNxZ/QsTa/Q +ScRhqrBbc8quAXwODrBz98QNZIkLACAjKzzuhEyXpzOuCetGjf0BSrExSzXHKVXr +ghYAgR2ldkz3r2US3WcoJynHik1yy+htUzcrG+8MeBt4ZBAb9N+BytKJLBAqwt1h +5g6v5+t4n6PL6qa8BgievMVarhJkRs6i3BM9+skwFZNeykRd1vTGINa1lW0SdWAc +QIP5GXX4zk+1V1mtpc7GD+3hnQNMpw++NMY3shC3yNbmPFFTypE/eHv/wbojgLO/ +ECCJH3/1WPUdhlfsta07FMKKRDiPVRKpgwPcCPTxfh+C4Zg83Sfb/SuSAiDTFf55 +kV3mf3AyfJQ9qwPf78CWW2oGpx4LoALHMoE+UmLlC9roOHE2wR/hkGotUmTH+VVd +oU6mkvx82lOfRMYRz70x5PcFnRehKQ48yh4x5Nqww/LLgK04ADiEpO85ybKmBsQl +cC8gvshmvvMAjZKUuNtp/anjIS9HskX9zj1W8h6kBeEoZSd4Dv0oTn49Z+5tnuiu +ut1KKX9PTriW1IpJ8yFAYaUunjtklnMu3aobJUbYV/Kha7/7ZNoH/6av+bMLAgMB +AAGjUzBRMB0GA1UdDgQWBBR8SyxkHIc4yv/BCJ8UzJzwS9c4kjAfBgNVHSMEGDAW +gBR8SyxkHIc4yv/BCJ8UzJzwS9c4kjAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3 +DQEBCwUAA4ICAQAF/O+rq7Y3cAhxdPARyzVH5F5VBQwKc/bqwWhZcEblYIzErBX2 +uYqxM4lVE+Dssa8JTdp5c0QhBpC19fL5X0b95BOIDwsXiDaJzmVIjttUQk//VobF +aJFBkZbSOIT0Cpua57512xpZ+PBbrtBaKeKN4pns6UNJjGpOKpiCNX2uCSlPJfNP +zO7rMPj9yUnMSzUfQitm8fk4JNTzxrUsWdxieytSK1YDK26l0QQ9urAHlfRAyzpT +Pz4NdlR4tZeuNNvCvnmIwEK1x7gsuJBV7+TEZc/cUAQkaulWI7c3aZaPwrMqisiJ +NWQT5UlbIxT8jYxhQhbZervAUCSEzPxM/3jNcUKV69RXSD0fJMGWmVeP3LLIr2vc +P+MqbF7+k5fTzRm1U53vc5G4owh2sVgi+eRbyy1d5fQyAZHrh1bkoJu6ZTKZJsuw +OMvX9dCXtGMLoTHfy+Qqyb5SG2c1Ch+YHvo3Mf9bZroGRXF10NLwHBjb1Z6ai7ME +ZLQxlREW21uBua/gz9ONeKfHz4nUVdG6nCO+H2AeWTjQcpTzQo+iPi2SOZifSegp +tjs6dbR3NCCyhWAL6SUGNg9xRkU6PsudE++71IFsvK4J86ZoVv1SYk/jZDL9XIMk +KCo31N1GAW1wyHCibwsi8jKVLMwvenkpU7XwNiyz/kR2bWzG69Qve5bIzg== +-----END CERTIFICATE----- + )"; + char name[32] = {0}; + const char* namemask = "ydb-temp-ca-XXXXXX"; + strncpy(name, namemask, sizeof(name)); + int fd = mkstemp(name); + Y_ABORT_UNLESS(fd > 0); + Y_ABORT_UNLESS(write(fd, ca.data(), ca.size()) == (ssize_t)ca.size()); + return TString(name); +} + +} diff --git a/ydb/library/actors/interconnect/ut/lib/tls/tls.h b/ydb/library/actors/interconnect/ut/lib/tls/tls.h new file mode 100644 index 000000000000..3c762c6b0584 --- /dev/null +++ b/ydb/library/actors/interconnect/ut/lib/tls/tls.h @@ -0,0 +1,11 @@ +#pragma once + +#include + +namespace NInterconnect { + +TString GetPrivateKeyForTest(); +TString GetCertificateForTest(); +TString GetTempCaPathForTest(); + +} diff --git a/ydb/library/actors/interconnect/ut/lib/tls/ya.make b/ydb/library/actors/interconnect/ut/lib/tls/ya.make new file mode 100644 index 000000000000..f150054fac8d --- /dev/null +++ b/ydb/library/actors/interconnect/ut/lib/tls/ya.make @@ -0,0 +1,7 @@ +LIBRARY() + +SRCS( + tls.cpp +) + +END() diff --git a/ydb/library/actors/interconnect/ut/lib/ya.make b/ydb/library/actors/interconnect/ut/lib/ya.make index 615c6a0e5449..24530f778abe 100644 --- a/ydb/library/actors/interconnect/ut/lib/ya.make +++ b/ydb/library/actors/interconnect/ut/lib/ya.make @@ -7,4 +7,8 @@ SRCS( ic_test_cluster.h ) +PEERDIR( + ydb/library/actors/interconnect/ut/lib/tls +) + END() diff --git a/ydb/library/actors/interconnect/ut/outgoing_stream_ut.cpp b/ydb/library/actors/interconnect/ut/outgoing_stream_ut.cpp index 5066525a0eab..b9c0b743b1cc 100644 --- a/ydb/library/actors/interconnect/ut/outgoing_stream_ut.cpp +++ b/ydb/library/actors/interconnect/ut/outgoing_stream_ut.cpp @@ -6,28 +6,35 @@ #define Ctest Cnull Y_UNIT_TEST_SUITE(OutgoingStream) { - Y_UNIT_TEST(Basic) { - std::vector buffer; - buffer.resize(4 << 20); + void OutgoingTest(bool withExternal) { + struct { + ui32 ZcCounter = 0; // ZcCounter to handle zero copy async transfer from some event queue + std::vector Buffer; + } ev; + ev.Buffer.resize(4 << 20); TReallyFastRng32 rng(EntropyPool()); - for (char *p = buffer.data(); p != buffer.data() + buffer.size(); p += sizeof(ui32)) { + for (char *p = ev.Buffer.data(); p != ev.Buffer.data() + ev.Buffer.size(); p += sizeof(ui32)) { *reinterpret_cast(p) = rng(); } for (ui32 nIter = 0; nIter < 10; ++nIter) { - Cerr << "nIter# " << nIter << Endl; + Ctest << "nIter# " << nIter << Endl; size_t base = 0; // number of dropped bytes size_t sendOffset = 0; // offset to base size_t pending = 0; // number of bytes in queue - NInterconnect::TOutgoingStreamT<4096> stream; + using TOutStream = NInterconnect::TOutgoingStreamT<4096>; + TOutStream stream; + bool zcSync = false; size_t numRewindsRemain = 10; + + ui32 zcTransferId = 0; // Emulate zc copy counter - while (base != buffer.size()) { - const size_t bytesToEnd = buffer.size() - (base + sendOffset); + while (base != ev.Buffer.size()) { + const size_t bytesToEnd = ev.Buffer.size() - (base + sendOffset); Ctest << "base# " << base << " sendOffset# " << sendOffset << " pending# " << pending << " bytesToEnd# " << bytesToEnd; @@ -37,15 +44,27 @@ Y_UNIT_TEST_SUITE(OutgoingStream) { const size_t maxBuffers = 128; std::vector iov; - stream.ProduceIoVec(iov, maxBuffers, Max()); + std::vector ctrl; + stream.ProduceIoVec(iov, maxBuffers, Max(), withExternal ? &ctrl : nullptr); + + if (withExternal) { + Y_ABORT_UNLESS(iov.size() == ctrl.size()); + if (zcSync == false) { + for (auto& x : ctrl) { + if (x.ZcReady()) { + x.Update(++zcTransferId); + } + } + } + } size_t offset = base + sendOffset; for (const auto& [ptr, len] : iov) { - UNIT_ASSERT(memcmp(buffer.data() + offset, ptr, len) == 0); + UNIT_ASSERT(memcmp(ev.Buffer.data() + offset, ptr, len) == 0); offset += len; } UNIT_ASSERT(iov.size() == maxBuffers || offset == base + sendOffset + pending); - const char *nextData = buffer.data() + base + sendOffset + pending; + const char *nextData = ev.Buffer.data() + base + sendOffset + pending; const size_t nextDataMaxLen = bytesToEnd - pending; const size_t nextDataLen = nextDataMaxLen ? rng() % Min(16384, nextDataMaxLen) + 1 : 0; @@ -54,7 +73,7 @@ Y_UNIT_TEST_SUITE(OutgoingStream) { size_t offset = base + sendOffset + pending - bytesToScan; stream.ScanLastBytes(bytesToScan, [&](TContiguousSpan span) { UNIT_ASSERT(offset + span.size() <= base + sendOffset + pending); - UNIT_ASSERT(memcmp(buffer.data() + offset, span.data(), span.size()) == 0); + UNIT_ASSERT(memcmp(ev.Buffer.data() + offset, span.data(), span.size()) == 0); offset += span.size(); }); UNIT_ASSERT_VALUES_EQUAL(offset, base + sendOffset + pending); @@ -67,7 +86,8 @@ Y_UNIT_TEST_SUITE(OutgoingStream) { ADVANCE, REWIND, DROP, - BOOKMARK + BOOKMARK, + EMULATE_ZC_USAGE, }; std::vector actions; @@ -83,13 +103,17 @@ Y_UNIT_TEST_SUITE(OutgoingStream) { actions.push_back(EAction::ADVANCE); actions.push_back(EAction::DROP); + if (withExternal) { + actions.push_back(EAction::EMULATE_ZC_USAGE); + } + switch (actions[rng() % actions.size()]) { case EAction::COPY_APPEND: { Ctest << " COPY_APPEND nextDataLen# " << nextDataLen; auto span = stream.AcquireSpanForWriting(nextDataLen); UNIT_ASSERT(span.size() != 0); memcpy(span.data(), nextData, span.size()); - stream.Append(span); + stream.Append(span, nullptr); pending += span.size(); break; } @@ -102,7 +126,7 @@ Y_UNIT_TEST_SUITE(OutgoingStream) { case EAction::REF_APPEND: Ctest << " REF_APPEND nextDataLen# " << nextDataLen; - stream.Append({nextData, nextDataLen}); + stream.Append({nextData, nextDataLen}, &ev.ZcCounter); pending += nextDataLen; break; @@ -132,16 +156,33 @@ Y_UNIT_TEST_SUITE(OutgoingStream) { break; } - case EAction::BOOKMARK: + case EAction::BOOKMARK: { Ctest << " BOOKMARK nextDataLen# " << nextDataLen; auto bookmark = stream.Bookmark(nextDataLen); stream.WriteBookmark(std::move(bookmark), {nextData, nextDataLen}); pending += nextDataLen; break; + } + + case EAction::EMULATE_ZC_USAGE: + if (zcSync == false) { + UNIT_ASSERT_VALUES_EQUAL(ev.ZcCounter, zcTransferId); + zcSync = true; + } + break; } Ctest << Endl; } + ev.ZcCounter = 0; } } + + Y_UNIT_TEST(Basic) { + OutgoingTest(false); + } + + Y_UNIT_TEST(WithExternalLife) { + OutgoingTest(true); + } } diff --git a/ydb/library/actors/interconnect/ut/protos/interconnect_test.proto b/ydb/library/actors/interconnect/ut/protos/interconnect_test.proto index b74d068a8b8b..c4218d67650c 100644 --- a/ydb/library/actors/interconnect/ut/protos/interconnect_test.proto +++ b/ydb/library/actors/interconnect/ut/protos/interconnect_test.proto @@ -2,7 +2,11 @@ package NInterconnectTest; message TEvTest { optional uint64 SequenceNumber = 1; - optional bytes Payload = 2; + optional uint64 DataCrc = 2; + oneof Data { + bytes Payload = 3; + uint32 PayloadId = 4; + } } message TEvTestChan { diff --git a/ydb/library/actors/interconnect/ut_fat/main.cpp b/ydb/library/actors/interconnect/ut_fat/main.cpp index 251ce1b75b60..504a8aee6f83 100644 --- a/ydb/library/actors/interconnect/ut_fat/main.cpp +++ b/ydb/library/actors/interconnect/ut_fat/main.cpp @@ -1,4 +1,3 @@ - #include #include #include @@ -9,23 +8,26 @@ #include #include +#include #include #include #include #include -Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) { - using namespace NActors; +using namespace NActors; +namespace { class TSenderActor: public TSenderBaseActor { TDeque InFly; ui16 SendFlags; + bool UseRope; public: - TSenderActor(const TActorId& recipientActorId, ui16 sendFlags) + TSenderActor(const TActorId& recipientActorId, ui16 sendFlags, bool useRope) : TSenderBaseActor(recipientActorId, 32) , SendFlags(sendFlags) + , UseRope(useRope) { } @@ -36,8 +38,19 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) { void SendMessage(const TActorContext& ctx) override { const ui32 flags = IEventHandle::MakeFlags(0, SendFlags); const ui64 cookie = SequenceNumber; - const TString payload('@', RandomNumber(65536) + 4096); - ctx.Send(RecipientActorId, new TEvTest(SequenceNumber, payload), flags, cookie); + + const TString payload(RandomNumber(65536) + 4096, '@'); + + auto ev = new TEvTest(SequenceNumber); + ev->Record.SetDataCrc(Crc32c(payload.data(), payload.size())); + + if (UseRope) { + ev->Record.SetPayloadId(ev->AddPayload(TRope(payload))); + } else { + ev->Record.SetPayload(payload); + } + + ctx.Send(RecipientActorId, ev, flags, cookie); InFly.push_back(SequenceNumber); ++InFlySize; ++SequenceNumber; @@ -75,7 +88,7 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) { }; class TReceiverActor: public TReceiverBaseActor { - ui64 ReceivedCount = 0; + std::atomic ReceivedCount = 0; TNode* SenderNode = nullptr; public: @@ -85,20 +98,47 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) { { } + ui64 GetReceivedCount() const { + return ReceivedCount.load(std::memory_order_relaxed); + } + void Handle(TEvTest::TPtr& ev, const TActorContext& /*ctx*/) override { const NInterconnectTest::TEvTest& m = ev->Get()->Record; Y_ABORT_UNLESS(m.HasSequenceNumber()); - Y_ABORT_UNLESS(m.GetSequenceNumber() >= ReceivedCount, "got #%" PRIu64 " expected at least #%" PRIu64, - m.GetSequenceNumber(), ReceivedCount); - ++ReceivedCount; + ui64 cur = GetReceivedCount(); + Y_ABORT_UNLESS(m.GetSequenceNumber() >= cur, "got #%" PRIu64 " expected at least #%" PRIu64, + m.GetSequenceNumber(), cur); + if (m.HasPayloadId()) { + auto rope = ev->Get()->GetPayload(m.GetPayloadId()); + auto data = rope.GetContiguousSpan(); + auto crc = Crc32c(data.data(), data.size()); + Y_ABORT_UNLESS(m.GetDataCrc() == crc); + } else { + Y_ABORT_UNLESS(m.HasPayload()); + } + ReceivedCount.fetch_add(1); SenderNode->Send(ev->Sender, new TEvTestResponse(m.GetSequenceNumber())); } ~TReceiverActor() override { - Cerr << "Received " << ReceivedCount << " messages\n"; + Cerr << "Received " << GetReceivedCount() << " messages\n"; } }; + TString GetZcState(TTestICCluster& testCluster, ui32 me, ui32 peer) { + auto httpResp = testCluster.GetSessionDbg(me, peer); + const TString& resp = httpResp.GetValueSync(); + const TString pattern = "ZeroCopy state"; + auto pos = resp.find(pattern); + UNIT_ASSERT_C(pos != std::string::npos, "zero copy field was not found in http info"); + pos += pattern.size(); + size_t end = resp.find('<', pos); + UNIT_ASSERT(end != std::string::npos); + return resp.substr(pos, end - pos); + } +} + +Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) { Y_UNIT_TEST(InterconnectTestWithProxyUnsureUndelivered) { ui32 numNodes = 2; double bandWidth = 1000000; @@ -109,12 +149,46 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) { TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1)); const TActorId recipient = testCluster.RegisterActor(receiverActor, 2); - TSenderActor* senderActor = new TSenderActor(recipient, flags); + TSenderActor* senderActor = new TSenderActor(recipient, flags, false); testCluster.RegisterActor(senderActor, 1); NanoSleep(30ULL * 1000 * 1000 * 1000); } + Y_UNIT_TEST(InterconnectTestWithProxyUnsureUndeliveredWithRopeXdc) { + ui32 numNodes = 2; + double bandWidth = 1000000; + ui16 flags = IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered; + TTestICCluster::TTrafficInterrupterSettings interrupterSettings{TDuration::Seconds(2), bandWidth, true}; + + TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings); + + TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1)); + const TActorId recipient = testCluster.RegisterActor(receiverActor, 2); + TSenderActor* senderActor = new TSenderActor(recipient, flags, true); + testCluster.RegisterActor(senderActor, 1); + + NanoSleep(30ULL * 1000 * 1000 * 1000); + } + + Y_UNIT_TEST(InterconnectTestWithProxyTlsReestablishWithXdc) { + ui32 numNodes = 2; + double bandWidth = 1000000; + ui16 flags = IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered; + TTestICCluster::TTrafficInterrupterSettings interrupterSettings{TDuration::Seconds(2), bandWidth, true}; + + TTestICCluster testCluster(numNodes, TChannelsConfig(), &interrupterSettings, nullptr, TTestICCluster::USE_TLS); + + TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1)); + const TActorId recipient = testCluster.RegisterActor(receiverActor, 2); + TSenderActor* senderActor = new TSenderActor(recipient, flags, true); + testCluster.RegisterActor(senderActor, 1); + + NanoSleep(30ULL * 1000 * 1000 * 1000); + + UNIT_ASSERT_C(receiverActor->GetReceivedCount() > 0, "no traffic detected!"); + } + Y_UNIT_TEST(InterconnectTestWithProxy) { ui32 numNodes = 2; double bandWidth = 1000000; @@ -125,9 +199,46 @@ Y_UNIT_TEST_SUITE(InterconnectUnstableConnection) { TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1)); const TActorId recipient = testCluster.RegisterActor(receiverActor, 2); - TSenderActor* senderActor = new TSenderActor(recipient, flags); + TSenderActor* senderActor = new TSenderActor(recipient, flags, false); testCluster.RegisterActor(senderActor, 1); NanoSleep(30ULL * 1000 * 1000 * 1000); } } + +Y_UNIT_TEST_SUITE(InterconnectZcLocalOp) { + + Y_UNIT_TEST(ZcIsDisabledByDefault) { + ui32 numNodes = 2; + TTestICCluster testCluster(numNodes, TChannelsConfig()); + ui16 flags = IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered; + + TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1)); + const TActorId recipient = testCluster.RegisterActor(receiverActor, 2); + TSenderActor* senderActor = new TSenderActor(recipient, flags, false); + testCluster.RegisterActor(senderActor, 1); + + NanoSleep(5ULL * 1000 * 1000 * 1000); + UNIT_ASSERT_VALUES_EQUAL("Disabled", GetZcState(testCluster, 1, 2)); + } + + Y_UNIT_TEST(ZcDisabledAfterHiddenCopy) { + ui32 numNodes = 2; + ui16 flags = IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered; + + TTestICCluster testCluster(numNodes, TChannelsConfig(), nullptr, nullptr, TTestICCluster::USE_ZC); + + TReceiverActor* receiverActor = new TReceiverActor(testCluster.GetNode(1)); + const TActorId recipient = testCluster.RegisterActor(receiverActor, 2); + TSenderActor* senderActor = new TSenderActor(recipient, flags, true); + testCluster.RegisterActor(senderActor, 1); + + NanoSleep(5ULL * 1000 * 1000 * 1000); + // Zero copy send via loopback causes hidden copy inside linux kernel +#if defined (__linux__) + UNIT_ASSERT_VALUES_EQUAL("DisabledHiddenCopy", GetZcState(testCluster, 1, 2)); +#else + UNIT_ASSERT_VALUES_EQUAL("Disabled", GetZcState(testCluster, 1, 2)); +#endif + } +} diff --git a/ydb/library/actors/interconnect/ya.make b/ydb/library/actors/interconnect/ya.make index aec6812e403b..82989795b99d 100644 --- a/ydb/library/actors/interconnect/ya.make +++ b/ydb/library/actors/interconnect/ya.make @@ -37,6 +37,8 @@ SRCS( interconnect_tcp_server.h interconnect_tcp_session.cpp interconnect_tcp_session.h + interconnect_zc_processor.cpp + interconnect_zc_processor.h load.cpp load.h logging.h