Skip to content

Commit b75031f

Browse files
Add WebSocketClient implementation for ReactCxxPlatform (#53233)
Summary: Pull Request resolved: #53233 Changelog: [Internal] This is needed for RN Fantom Reviewed By: rshest Differential Revision: D80115098 fbshipit-source-id: 03036dda311527ac27d12656d9b92c609e0b9ee2
1 parent d009a02 commit b75031f

2 files changed

Lines changed: 357 additions & 0 deletions

File tree

Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
*
4+
* This source code is licensed under the MIT license found in the
5+
* LICENSE file in the root directory of this source tree.
6+
*/
7+
8+
#include "WebSocketClient.h"
9+
10+
#include <boost/beast/core.hpp>
11+
#include <boost/beast/websocket.hpp>
12+
#include <folly/Synchronized.h>
13+
#include <folly/Uri.h>
14+
#include <folly/system/ThreadName.h>
15+
#include <react/debug/react_native_assert.h>
16+
#include <atomic>
17+
#include <mutex>
18+
#include <queue>
19+
20+
namespace facebook::react {
21+
22+
struct WebSocketClient::Impl final : public std::enable_shared_from_this<Impl> {
23+
void onResolveCompleted(
24+
boost::system::error_code ec,
25+
const boost::asio::ip::tcp::resolver::results_type& results);
26+
27+
void onConnectionCompleted(boost::system::error_code ec);
28+
29+
void onHandshakeCompleted(boost::system::error_code ec);
30+
31+
void listen();
32+
33+
void write();
34+
35+
void onConnectCallback(bool connected, const std::string& error);
36+
37+
// Callbacks and Uri
38+
OnConnectCallback onConnectCallback_;
39+
OnClosedCallback onClosedCallback_;
40+
OnMessageCallback onMessageCallback_;
41+
std::optional<folly::Uri> uri_;
42+
43+
// Boost Beast WebSocket Client
44+
boost::asio::io_context ioContext_;
45+
boost::asio::ip::tcp::resolver resolver_{ioContext_};
46+
boost::beast::multi_buffer buffer_;
47+
folly::Synchronized<
48+
boost::beast::websocket::stream<boost::asio::ip::tcp::socket>>
49+
ws_{boost::beast::websocket::stream<boost::asio::ip::tcp::socket>{
50+
ioContext_}};
51+
52+
// Input and Output handling
53+
std::mutex mutexOut_;
54+
std::queue<std::string> messagesOut_;
55+
std::atomic<bool> isWriting_{false};
56+
std::atomic<bool> isClosing_{false};
57+
};
58+
59+
WebSocketClient::WebSocketClient() noexcept
60+
: impl_(std::make_shared<WebSocketClient::Impl>()) {}
61+
62+
WebSocketClient::~WebSocketClient() {
63+
WebSocketClient::close("Force close as WebSocketClient being destroyed");
64+
}
65+
66+
void WebSocketClient::setOnClosedCallback(
67+
OnClosedCallback&& callback) noexcept {
68+
impl_->onClosedCallback_ = std::move(callback);
69+
}
70+
71+
void WebSocketClient::setOnMessageCallback(
72+
OnMessageCallback&& callback) noexcept {
73+
impl_->onMessageCallback_ = std::move(callback);
74+
}
75+
76+
void WebSocketClient::connect(
77+
const std::string& url,
78+
OnConnectCallback&& callback) {
79+
if (thread_) {
80+
react_native_assert(false && "WebSocketClient::connect() called twice");
81+
return;
82+
}
83+
84+
impl_->onConnectCallback_ = std::move(callback);
85+
impl_->uri_ = folly::Uri{url};
86+
87+
// Resolve the domain name
88+
impl_->resolver_.async_resolve(
89+
impl_->uri_->hostname(),
90+
std::to_string(impl_->uri_->port()),
91+
[weakImpl = std::weak_ptr(impl_)](
92+
boost::system::error_code ec,
93+
const boost::asio::ip::tcp::resolver::results_type& results) {
94+
if (auto impl = weakImpl.lock()) {
95+
impl->onResolveCompleted(ec, results);
96+
}
97+
});
98+
99+
// Start the I/O thread
100+
static int32_t s_threadId = 0;
101+
thread_ = std::make_unique<std::thread>(
102+
[weakImpl = std::weak_ptr(impl_), tid = s_threadId++]() {
103+
folly::setThreadName("WebSocket" + std::to_string(tid));
104+
if (auto impl = weakImpl.lock()) {
105+
impl->ioContext_.run();
106+
}
107+
});
108+
}
109+
110+
void WebSocketClient::close(const std::string& reason) {
111+
if (!impl_->isClosing_.exchange(true)) {
112+
if (impl_->onClosedCallback_) {
113+
impl_->onClosedCallback_(reason);
114+
}
115+
auto ws = impl_->ws_.wlock();
116+
if (ws->is_open()) {
117+
ws->async_close(
118+
boost::beast::websocket::close_reason(reason), [](auto&&) {});
119+
} else {
120+
ws->next_layer().close();
121+
}
122+
if (thread_) {
123+
thread_->join();
124+
}
125+
}
126+
thread_ = nullptr;
127+
}
128+
129+
void WebSocketClient::send(const std::string& message) {
130+
{
131+
std::lock_guard<std::mutex> lock(impl_->mutexOut_);
132+
impl_->messagesOut_.emplace(message);
133+
}
134+
impl_->write();
135+
}
136+
137+
void WebSocketClient::ping() {
138+
auto ws = impl_->ws_.wlock();
139+
// Send a ping message
140+
ws->async_ping(
141+
{}, [weakImpl = std::weak_ptr(impl_)](boost::beast::error_code ec) {
142+
auto impl = weakImpl.lock();
143+
if (!impl || impl->isClosing_) {
144+
return;
145+
}
146+
if (ec) {
147+
LOG(ERROR) << "Error pinging websocket: " << ec.message();
148+
return;
149+
}
150+
});
151+
}
152+
153+
void WebSocketClient::Impl::onResolveCompleted(
154+
boost::system::error_code ec,
155+
const boost::asio::ip::tcp::resolver::results_type& results) {
156+
if (ec) {
157+
// TODO: Handle retry logic here
158+
onConnectCallback(false, "Failed to resolve host");
159+
return;
160+
}
161+
162+
auto ws = ws_.wlock();
163+
// Make the connection on the IP address we get from a lookup
164+
boost::asio::async_connect(
165+
ws->next_layer(),
166+
results.begin(),
167+
results.end(),
168+
[weakImpl = weak_from_this()](
169+
boost::system::error_code ec, const auto& /*ep*/) {
170+
if (auto impl = weakImpl.lock()) {
171+
impl->onConnectionCompleted(ec);
172+
}
173+
});
174+
}
175+
176+
void WebSocketClient::Impl::onConnectionCompleted(
177+
boost::system::error_code ec) {
178+
if (ec) {
179+
// TODO: Handle retry logic here
180+
onConnectCallback(false, "Failed to connect");
181+
return;
182+
}
183+
184+
auto ws = ws_.wlock();
185+
// https://datatracker.ietf.org/doc/html/rfc6455#section-3:
186+
// > The "resource-name" (also known as /resource name/ in
187+
// > https://datatracker.ietf.org/doc/html/rfc6455#section-4.1)
188+
// > can be constructed by concatenating the following:
189+
// >
190+
// > o "/" if the path component is empty
191+
// >
192+
// > o the path component
193+
// >
194+
// > o "?" if the query component is non-empty
195+
// >
196+
// > o the query component
197+
auto resourceName = (uri_->path().empty() ? "/" : uri_->path()) +
198+
(uri_->query().empty() ? "" : "?" + uri_->query());
199+
// Perform the websocket handshake
200+
ws->async_handshake(
201+
uri_->host() +
202+
(uri_->port() == 0 ? "" : ":" + std::to_string(uri_->port())),
203+
resourceName,
204+
[weakImpl = weak_from_this()](boost::system::error_code ec) {
205+
if (auto impl = weakImpl.lock()) {
206+
impl->onHandshakeCompleted(ec);
207+
}
208+
});
209+
}
210+
211+
void WebSocketClient::Impl::onHandshakeCompleted(boost::system::error_code ec) {
212+
if (ec) {
213+
// TODO: Handle retry logic here
214+
onConnectCallback(false, "Failed to handshake");
215+
return;
216+
}
217+
218+
onConnectCallback(true, "Connected");
219+
220+
// Listen for any messages from the server
221+
listen();
222+
223+
// Start writing any buffered messages
224+
write();
225+
}
226+
227+
void WebSocketClient::Impl::listen() {
228+
if (isClosing_) {
229+
return;
230+
}
231+
auto ws = ws_.wlock();
232+
ws->async_read(
233+
buffer_,
234+
[weakImpl = weak_from_this()](
235+
boost::beast::error_code ec, std::size_t bytes_transferred) {
236+
auto impl = weakImpl.lock();
237+
if (!impl || impl->isClosing_) {
238+
return;
239+
}
240+
if (ec) {
241+
LOG(ERROR) << "Error reading from websocket: " << ec.message();
242+
return;
243+
}
244+
std::string message(
245+
boost::beast::buffers_to_string(impl->buffer_.data()));
246+
if (impl->onMessageCallback_) {
247+
impl->onMessageCallback_(message);
248+
}
249+
impl->buffer_.consume(bytes_transferred);
250+
impl->listen();
251+
});
252+
}
253+
254+
void WebSocketClient::Impl::write() {
255+
if (isClosing_) {
256+
return;
257+
}
258+
if (isWriting_) {
259+
return;
260+
}
261+
isWriting_ = true;
262+
263+
std::shared_ptr<std::string> message;
264+
{
265+
std::lock_guard<std::mutex> lock(mutexOut_);
266+
if (!messagesOut_.empty()) {
267+
message = std::make_shared<std::string>(messagesOut_.front());
268+
messagesOut_.pop();
269+
}
270+
}
271+
272+
if (!message || message->empty()) {
273+
isWriting_ = false;
274+
return;
275+
}
276+
277+
auto ws = ws_.wlock();
278+
ws->async_write(
279+
boost::beast::net::buffer(*message),
280+
[message, weakImpl = weak_from_this()](
281+
boost::beast::error_code ec,
282+
std::size_t /*bytes_transferred*/) mutable {
283+
auto impl = weakImpl.lock();
284+
if (!impl || impl->isClosing_) {
285+
return;
286+
}
287+
if (ec) {
288+
LOG(ERROR) << "Error writing to websocket: " << ec.message();
289+
return;
290+
}
291+
impl->isWriting_ = false;
292+
impl->write();
293+
message.reset(); // Release the message after it's been sent
294+
});
295+
}
296+
297+
void WebSocketClient::Impl::onConnectCallback(
298+
bool connected,
299+
const std::string& error) {
300+
if (onConnectCallback_) {
301+
onConnectCallback_(connected, error);
302+
}
303+
}
304+
305+
WebSocketClientFactory getWebSocketClientFactory() {
306+
return []() { return std::make_unique<WebSocketClient>(); };
307+
}
308+
309+
} // namespace facebook::react
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
*
4+
* This source code is licensed under the MIT license found in the
5+
* LICENSE file in the root directory of this source tree.
6+
*/
7+
8+
#pragma once
9+
10+
#include <react/http/IWebSocketClient.h>
11+
#include <memory>
12+
#include <string>
13+
#include <thread>
14+
15+
namespace facebook::react {
16+
17+
class WebSocketClient : public IWebSocketClient {
18+
public:
19+
WebSocketClient() noexcept;
20+
~WebSocketClient() override;
21+
WebSocketClient(WebSocketClient& other) = delete;
22+
WebSocketClient& operator=(WebSocketClient& other) = delete;
23+
WebSocketClient(WebSocketClient&& other) = delete;
24+
WebSocketClient& operator=(WebSocketClient&& other) = delete;
25+
26+
void setOnClosedCallback(OnClosedCallback&& callback) noexcept override;
27+
28+
void setOnMessageCallback(OnMessageCallback&& callback) noexcept override;
29+
30+
void connect(
31+
const std::string& url,
32+
OnConnectCallback&& onConnectCallback = nullptr) override;
33+
34+
void close(const std::string& reason) override;
35+
36+
void send(const std::string& message) override;
37+
38+
void ping() override;
39+
40+
private:
41+
struct Impl;
42+
43+
// Instance data and IO thread
44+
const std::shared_ptr<Impl> impl_;
45+
std::unique_ptr<std::thread> thread_;
46+
};
47+
48+
} // namespace facebook::react

0 commit comments

Comments
 (0)