Skip to content

Commit

Permalink
fix test, optimize code
Browse files Browse the repository at this point in the history
  • Loading branch information
matyhtf committed Aug 15, 2024
1 parent 8abdfc3 commit f83381d
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 33 deletions.
5 changes: 4 additions & 1 deletion include/swoole_message_bus.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ class MessageBus {
* It is possible to operate the same pipe in multiple threads.
* Each thread must have a unique buffer and the socket memory must be separated.
*/
network::Socket *get_pipe_socket(int pipe_fd);
network::Socket *get_pipe_socket(network::Socket *sock) {
return pipe_sockets_[sock->get_fd()];
}
void init_pipe_socket(network::Socket *sock);
};
} // namespace swoole
16 changes: 5 additions & 11 deletions include/swoole_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -803,23 +803,16 @@ class Server {
return connection_list[fd].socket;
}

/**
* [ReactorThread]
*/
network::Socket *get_worker_pipe_socket(Worker *worker) {
return get_thread(SwooleTG.id)->message_bus.get_pipe_socket(worker->pipe_master->get_fd());
}

network::Socket *get_command_reply_socket() {
return is_base_mode() ? get_worker(0)->pipe_master : pipe_command->get_socket(false);
}

int get_worker_pipe_master_fd(WorkerId id) {
return get_worker(id)->pipe_master->get_fd();
network::Socket *get_worker_pipe_master(WorkerId id) {
return get_worker(id)->pipe_master;
}

int get_worker_pipe_worker_fd(WorkerId id) {
return get_worker(id)->pipe_worker->get_fd();
network::Socket *get_worker_pipe_worker(WorkerId id) {
return get_worker(id)->pipe_worker;
}

/**
Expand Down Expand Up @@ -1382,6 +1375,7 @@ class Server {
void init_port_protocol(ListenPort *port);
void init_signal_handler();
void init_ipc_max_size();
void init_pipe_sockets(MessageBus *mb);

void set_max_connection(uint32_t _max_connection);

Expand Down
18 changes: 9 additions & 9 deletions src/protocol/message_bus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -287,17 +287,17 @@ size_t MessageBus::get_memory_size() {
return size;
}

network::Socket *MessageBus::get_pipe_socket(int pipe_fd) {
if ((size_t) pipe_fd >= pipe_sockets_.size()) {
auto _socket = make_socket(pipe_fd, SW_FD_PIPE);
_socket->buffer_size = UINT_MAX;
_socket->set_nonblock();
void MessageBus::init_pipe_socket(network::Socket *sock) {
int pipe_fd = sock->get_fd();
if (pipe_fd >= pipe_sockets_.size()) {
pipe_sockets_.resize(pipe_fd + 1);
pipe_sockets_[pipe_fd] = _socket;
return _socket;
} else {
return pipe_sockets_[pipe_fd];
}
auto _socket = make_socket(pipe_fd, SW_FD_PIPE);
_socket->buffer_size = UINT_MAX;
if (!_socket->nonblock) {
_socket->set_nonblock();
}
pipe_sockets_[pipe_fd] = _socket;
}

MessageBus::~MessageBus() {
Expand Down
14 changes: 10 additions & 4 deletions src/server/base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,16 @@ bool BaseFactory::finish(SendData *data) {

bool BaseFactory::forward_message(Session *session, SendData *data) {
Worker *worker = server_->gs->event_workers.get_worker(session->reactor_id);
int pipe_fd = worker->pipe_master->get_fd();
swoole_trace_log(SW_TRACE_SERVER, "forward message, fd=%d, len=%ld", pipe_fd, data->info.len);
auto message_bus = server_->get_worker_message_bus();
if (!message_bus->write(message_bus->get_pipe_socket(pipe_fd), data)) {
swoole_trace_log(SW_TRACE_SERVER,
"fd=%d, worker_id=%d, type=%d, len=%ld",
worker->pipe_master->get_fd(),
session->reactor_id,
data->info.type,
data->info.len);

auto mb = server_->get_worker_message_bus();
auto sock = server_->is_thread_mode() ? mb->get_pipe_socket(worker->pipe_master) : worker->pipe_master;
if (!mb->write(sock, data)) {
swoole_sys_warning("failed to send %u bytes to pipe_master", data->info.len);
return false;
}
Expand Down
11 changes: 11 additions & 0 deletions src/server/master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2027,6 +2027,17 @@ void Server::init_ipc_max_size() {
#endif
}

void Server::init_pipe_sockets(MessageBus *mb) {
assert(is_started());
size_t n = get_core_worker_num();

SW_LOOP_N(n) {
Worker *worker = get_worker(i);
mb->init_pipe_socket(worker->pipe_master);
mb->init_pipe_socket(worker->pipe_worker);
}
}

/**
* allocate memory for Server::pipe_buffers
*/
Expand Down
14 changes: 11 additions & 3 deletions src/server/process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,18 @@ bool ProcessFactory::dispatch(SendData *task) {

SendData _task;
memcpy(&_task, task, sizeof(SendData));
network::Socket *sock;
MessageBus *mb;

network::Socket *pipe_socket =
server_->is_reactor_thread() ? server_->get_worker_pipe_socket(worker) : worker->pipe_master;
return server_->message_bus.write(pipe_socket, &_task);
if (server_->is_reactor_thread()) {
mb = &server_->get_thread(swoole_get_thread_id())->message_bus;
sock = mb->get_pipe_socket(worker->pipe_master);
} else {
mb = &server_->message_bus;
sock = worker->pipe_master;
}

return mb->write(sock, &_task);
}

static bool inline process_is_supported_send_yield(Server *serv, Connection *conn) {
Expand Down
12 changes: 7 additions & 5 deletions src/server/reactor_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -316,15 +316,15 @@ void ReactorThread::shutdown(Reactor *reactor) {
}

if (serv->is_thread_mode()) {
Socket *socket = message_bus.get_pipe_socket(serv->get_worker_pipe_worker_fd(reactor->id));
Socket *socket = message_bus.get_pipe_socket(serv->get_worker_pipe_worker(reactor->id));
reactor->del(socket);
}

SW_LOOP_N(serv->worker_num) {
if (i % serv->reactor_num != reactor->id) {
continue;
}
Socket *socket = message_bus.get_pipe_socket(serv->get_worker_pipe_master_fd(i));
Socket *socket = message_bus.get_pipe_socket(serv->get_worker_pipe_master(i));
reactor->del(socket);
}

Expand Down Expand Up @@ -751,15 +751,17 @@ int ReactorThread::init(Server *serv, Reactor *reactor, uint16_t reactor_id) {
}

serv->init_reactor(reactor);
serv->init_pipe_sockets(&message_bus);

if (serv->is_thread_mode()) {
Worker *worker = serv->get_worker(reactor_id);
serv->init_worker(worker);
auto pipe_worker = message_bus.get_pipe_socket(worker->pipe_worker->get_fd());
auto pipe_worker = message_bus.get_pipe_socket(worker->pipe_worker);
reactor->add(pipe_worker, SW_EVENT_READ);
}

if (serv->pipe_command) {
pipe_command = message_bus.get_pipe_socket(serv->pipe_command->get_socket(false)->get_fd());
pipe_command = message_bus.get_pipe_socket(serv->pipe_command->get_socket(false));
pipe_command->buffer_size = UINT_MAX;
}

Expand All @@ -774,7 +776,7 @@ int ReactorThread::init(Server *serv, Reactor *reactor, uint16_t reactor_id) {
if (i % serv->reactor_num != reactor_id) {
continue;
}
Socket *socket = message_bus.get_pipe_socket(serv->get_worker_pipe_master_fd(i));
Socket *socket = message_bus.get_pipe_socket(serv->get_worker_pipe_master(i));
if (reactor->add(socket, SW_EVENT_READ) < 0) {
return SW_ERR;
}
Expand Down

0 comments on commit f83381d

Please sign in to comment.