Skip to content

Commit 7164f96

Browse files
hoxyqmeta-codesync[bot]
authored andcommitted
Limit WebSocket queue size for packager connection (facebook#54300)
Summary: Pull Request resolved: facebook#54300 # Changelog: [Internal] Establishes a queue mechanism on top of the OkHttp's WebSocket implementation. This mechanism will control the queue size and guarantee that we don't have more than 16MB scheduled. This prevents the scenario of when OkHttp forces WS disconnection because of this threshold. Reviewed By: motiz88, alanleedev Differential Revision: D85581509 fbshipit-source-id: ac3e830c935c1301b674739c96fcbe18446eaa71
1 parent f51973a commit 7164f96

File tree

3 files changed

+156
-11
lines changed

3 files changed

+156
-11
lines changed

packages/react-native/ReactAndroid/src/main/java/com/facebook/react/devsupport/CxxInspectorPackagerConnection.kt

Lines changed: 99 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,17 @@ package com.facebook.react.devsupport
99

1010
import android.os.Handler
1111
import android.os.Looper
12+
import com.facebook.common.logging.FLog
1213
import com.facebook.jni.HybridData
1314
import com.facebook.proguard.annotations.DoNotStrip
1415
import com.facebook.proguard.annotations.DoNotStripAny
16+
import com.facebook.react.common.annotations.VisibleForTesting
1517
import com.facebook.soloader.SoLoader
1618
import java.io.Closeable
19+
import java.nio.ByteBuffer
20+
import java.nio.charset.StandardCharsets
21+
import java.util.ArrayDeque
22+
import java.util.Queue
1723
import java.util.concurrent.TimeUnit
1824
import okhttp3.OkHttpClient
1925
import okhttp3.Request
@@ -67,7 +73,7 @@ internal class CxxInspectorPackagerConnection(
6773
*/
6874
@DoNotStripAny
6975
private interface IWebSocket : Closeable {
70-
fun send(message: String)
76+
fun send(chunk: ByteBuffer)
7177

7278
/**
7379
* Close the WebSocket connection. NOTE: There is no close() method in the C++ interface.
@@ -76,6 +82,95 @@ internal class CxxInspectorPackagerConnection(
7682
override fun close()
7783
}
7884

85+
/**
86+
* A simple WebSocket wrapper that prevents having more than 16MiB of messages queued
87+
* simultaneously. This is done to stop OkHttp from closing the WebSocket connection.
88+
*
89+
* https://github.com/facebook/react-native/issues/39651.
90+
* https://github.com/square/okhttp/blob/4e7dbec1ea6c9cf8d80422ac9d44b9b185c749a3/okhttp/src/commonJvmAndroid/kotlin/okhttp3/internal/ws/RealWebSocket.kt#L684.
91+
*/
92+
private class InspectorPackagerWebSocketImpl(
93+
private val nativeWebSocket: WebSocket,
94+
private val handler: Handler,
95+
) : IWebSocket {
96+
private val messageQueue: Queue<Pair<String, Int>> = ArrayDeque()
97+
private val queueLock = Any()
98+
private val drainRunnable =
99+
object : Runnable {
100+
override fun run() {
101+
FLog.d(TAG, "Attempting to drain the message queue after ${drainDelayMs}ms")
102+
tryDrainQueue()
103+
}
104+
}
105+
106+
/**
107+
* We are providing a String to OkHttp's WebSocket, because there is no guarantee that all CDP
108+
* clients will support binary data format.
109+
*/
110+
override fun send(chunk: ByteBuffer) {
111+
synchronized(queueLock) {
112+
val messageSize = chunk.capacity()
113+
val message = StandardCharsets.UTF_8.decode(chunk).toString()
114+
val currentQueueSize = nativeWebSocket.queueSize()
115+
116+
if (currentQueueSize + messageSize > MAX_QUEUE_SIZE) {
117+
FLog.d(TAG, "Reached queue size limit. Queueing the message.")
118+
messageQueue.offer(Pair(message, messageSize))
119+
scheduleDrain()
120+
} else {
121+
if (messageQueue.isEmpty()) {
122+
nativeWebSocket.send(message)
123+
} else {
124+
messageQueue.offer(Pair(message, messageSize))
125+
tryDrainQueue()
126+
}
127+
}
128+
}
129+
}
130+
131+
override fun close() {
132+
synchronized(queueLock) {
133+
handler.removeCallbacks(drainRunnable)
134+
messageQueue.clear()
135+
nativeWebSocket.close(1000, "End of session")
136+
}
137+
}
138+
139+
private fun tryDrainQueue() {
140+
synchronized(queueLock) {
141+
while (messageQueue.isNotEmpty()) {
142+
val (nextMessage, nextMessageSize) = messageQueue.peek() ?: break
143+
val currentQueueSize = nativeWebSocket.queueSize()
144+
145+
if (currentQueueSize + nextMessageSize <= MAX_QUEUE_SIZE) {
146+
messageQueue.poll()
147+
if (!nativeWebSocket.send(nextMessage)) {
148+
// The WebSocket is closing, closed, or cancelled.
149+
handler.removeCallbacks(drainRunnable)
150+
messageQueue.clear()
151+
152+
break
153+
}
154+
} else {
155+
scheduleDrain()
156+
break
157+
}
158+
}
159+
}
160+
}
161+
162+
private fun scheduleDrain() {
163+
FLog.d(TAG, "Scheduled a task to drain messages queue.")
164+
handler.removeCallbacks(drainRunnable)
165+
handler.postDelayed(drainRunnable, drainDelayMs)
166+
}
167+
168+
companion object {
169+
private val TAG: String = InspectorPackagerWebSocketImpl::class.java.simpleName
170+
private const val drainDelayMs: Long = 100
171+
}
172+
}
173+
79174
/** Java implementation of the C++ InspectorPackagerConnectionDelegate interface. */
80175
private class DelegateImpl {
81176
private val httpClient =
@@ -130,15 +225,8 @@ internal class CxxInspectorPackagerConnection(
130225
}
131226
},
132227
)
133-
return object : IWebSocket {
134-
override fun send(message: String) {
135-
webSocket.send(message)
136-
}
137228

138-
override fun close() {
139-
webSocket.close(1000, "End of session")
140-
}
141-
}
229+
return InspectorPackagerWebSocketImpl(webSocket, handler)
142230
}
143231

144232
@DoNotStrip
@@ -152,6 +240,8 @@ internal class CxxInspectorPackagerConnection(
152240
SoLoader.loadLibrary("react_devsupportjni")
153241
}
154242

243+
@VisibleForTesting internal const val MAX_QUEUE_SIZE = 16L * 1024 * 1024 // 16MiB
244+
155245
@JvmStatic
156246
private external fun initHybrid(
157247
url: String,

packages/react-native/ReactAndroid/src/main/jni/react/devsupport/JCxxInspectorPackagerConnectionWebSocket.cpp

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,44 @@
55
* LICENSE file in the root directory of this source tree.
66
*/
77

8+
#include <fbjni/ByteBuffer.h>
9+
810
#include "JCxxInspectorPackagerConnectionWebSocket.h"
911

1012
using namespace facebook::jni;
1113
using namespace facebook::react::jsinspector_modern;
1214

1315
namespace facebook::react::jsinspector_modern {
1416

17+
namespace {
18+
19+
local_ref<JByteBuffer::javaobject> getReadOnlyByteBufferFromStringView(
20+
std::string_view sv) {
21+
auto buffer = JByteBuffer::wrapBytes(
22+
const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(sv.data())),
23+
sv.size());
24+
25+
/**
26+
* Return a read-only buffer that shares the underlying contents.
27+
* This guards from accidential mutations on the Java side, since we did
28+
* casting above.
29+
*
30+
* https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html#asReadOnlyBuffer--
31+
*/
32+
static auto method =
33+
buffer->javaClassStatic()->getMethod<JByteBuffer::javaobject()>(
34+
"asReadOnlyBuffer");
35+
return method(buffer);
36+
}
37+
38+
} // namespace
39+
1540
void JCxxInspectorPackagerConnectionWebSocket::send(std::string_view message) {
1641
static auto method =
17-
javaClassStatic()->getMethod<void(const std::string&)>("send");
18-
method(self(), std::string(message));
42+
javaClassStatic()->getMethod<void(local_ref<JByteBuffer::javaobject>)>(
43+
"send");
44+
auto byteBuffer = getReadOnlyByteBufferFromStringView(message);
45+
method(self(), byteBuffer);
1946
}
2047

2148
void JCxxInspectorPackagerConnectionWebSocket::close() {
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
*
4+
* This source code is licensed under the MIT license found in the
5+
* LICENSE file in the root directory of this source tree.
6+
*/
7+
8+
package com.facebook.react.devsupport
9+
10+
import okhttp3.internal.ws.RealWebSocket
11+
import org.assertj.core.api.Assertions.assertThat
12+
import org.junit.Test
13+
14+
class CxxInspectorPackagerConnectionTest {
15+
16+
@Test
17+
fun testMaxQueueSizeEquality() {
18+
val okHttpRealWebSocketClass = RealWebSocket::class.java
19+
val okHttpMaxQueueSizeField = okHttpRealWebSocketClass.getDeclaredField("MAX_QUEUE_SIZE")
20+
okHttpMaxQueueSizeField.isAccessible = true
21+
22+
val okHttpMaxQueueSize = okHttpMaxQueueSizeField.getLong(null)
23+
assertThat(okHttpMaxQueueSize).isNotNull
24+
25+
assertThat(okHttpMaxQueueSize)
26+
.isEqualTo(CxxInspectorPackagerConnection.Companion.MAX_QUEUE_SIZE)
27+
}
28+
}

0 commit comments

Comments
 (0)