Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
matyhtf committed Sep 13, 2024
1 parent 39f1ee8 commit b9e574e
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 28 deletions.
2 changes: 1 addition & 1 deletion core-tests/src/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ TEST(server, task_worker2) {
memcpy(buf.data, packet, strlen(packet));
buf.info.reactor_id = worker->id;
buf.info.ext_flags |= (SW_TASK_NONBLOCK | SW_TASK_CALLBACK);
ASSERT_GE(serv->gs->task_workers.dispatch(&buf, &_dst_worker_id), 0);
ASSERT_EQ(serv->gs->task_workers.dispatch(&buf, &_dst_worker_id), SW_OK);
sleep(1);
kill(serv->gs->master_pid, SIGTERM);
}
Expand Down
4 changes: 4 additions & 0 deletions include/swoole_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,10 @@ class Server {
return gs->task_workers.get_task_id(task);
}

uint16_t get_command_id(EventData *cmd) {
return cmd->info.server_fd;
}

EventData *get_task_result() {
return &(task_results[swoole_get_process_id()]);
}
Expand Down
39 changes: 20 additions & 19 deletions src/server/master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ void Server::call_command_callback(int64_t request_id, const std::string &result
return;
}
iter->second(this, result);
command_callbacks.erase(request_id);
}

void Server::call_command_handler(MessageBus &mb, uint16_t worker_id, Socket *sock) {
Expand Down Expand Up @@ -1126,56 +1127,58 @@ bool Server::command(WorkerId process_id,
}

int command_id = iter->second.id;
int64_t requset_id = command_current_request_id++;
int64_t request_id = command_current_request_id++;
Socket *pipe_sock;

SendData task{};
task.info.fd = requset_id;
task.info.fd = request_id;
task.info.reactor_id = process_id;
task.info.server_fd = command_id;
task.info.type = SW_SERVER_EVENT_COMMAND_REQUEST;
task.info.len = msg.length();
task.data = msg.c_str();

command_callbacks[request_id] = fn;

if (!(process_type & iter->second.accepted_process_types)) {
swoole_error_log(SW_LOG_NOTICE, SW_ERROR_OPERATION_NOT_SUPPORT, "unsupported [process_type]");
_fail:
command_callbacks.erase(request_id);
return false;
}

if (process_type == Command::REACTOR_THREAD) {
if (!is_process_mode()) {
swoole_error_log(SW_LOG_NOTICE, SW_ERROR_OPERATION_NOT_SUPPORT, "unsupported [server_mode]");
return false;
goto _fail;
}
if (process_id >= reactor_num) {
swoole_error_log(SW_LOG_NOTICE, SW_ERROR_INVALID_PARAMS, "invalid thread_id[%d]", process_id);
return false;
goto _fail;
}
pipe_sock = get_worker(process_id)->pipe_worker;
} else if (process_type == Command::EVENT_WORKER) {
if (process_id >= worker_num) {
swoole_error_log(SW_LOG_NOTICE, SW_ERROR_INVALID_PARAMS, "invalid worker_id[%d]", process_id);
return false;
goto _fail;
}
pipe_sock = get_worker(process_id)->pipe_master;
} else if (process_type == Command::TASK_WORKER) {
if (process_id >= task_worker_num) {
swoole_error_log(SW_LOG_NOTICE, SW_ERROR_INVALID_PARAMS, "invalid task_worker_id[%d]", process_id);
return false;
goto _fail;
}
EventData buf;
memset(&buf.info, 0, sizeof(buf.info));
if (!task_pack(&buf, msg.c_str(), msg.length())) {
return false;
goto _fail;
}
buf.info.type = SW_SERVER_EVENT_COMMAND_REQUEST;
buf.info.fd = requset_id;
buf.info.fd = request_id;
buf.info.server_fd = command_id;
int _dst_worker_id = process_id;
if (gs->task_workers.dispatch(&buf, &_dst_worker_id) <= 0) {
return false;
if (gs->task_workers.dispatch(&buf, &_dst_worker_id) == SW_ERR) {
goto _fail;
}
command_callbacks[requset_id] = fn;
return true;
} else if (process_type == Command::MANAGER) {
EventData buf;
Expand All @@ -1185,31 +1188,29 @@ bool Server::command(WorkerId process_id,
"message is too large, maximum length is %lu, the given length is %lu",
sizeof(buf.data),
msg.length());
return false;
goto _fail;
}
memset(&buf.info, 0, sizeof(buf.info));
buf.info.type = SW_SERVER_EVENT_COMMAND_REQUEST;
buf.info.fd = requset_id;
buf.info.fd = request_id;
buf.info.server_fd = command_id;
buf.info.len = msg.length();
memcpy(buf.data, msg.c_str(), msg.length());
if (gs->event_workers.push_message(&buf) < 0) {
return false;
goto _fail;
}
command_callbacks[requset_id] = fn;
return true;
} else if (process_type == Command::MASTER) {
auto result = call_command_handler_in_master(command_id, msg);
fn(this, result);
return true;
} else {
swoole_error_log(SW_LOG_NOTICE, SW_ERROR_OPERATION_NOT_SUPPORT, "unsupported [process_type]");
return false;
goto _fail;
}
if (!message_bus.write(pipe_sock, &task)) {
return false;
goto _fail;
}
command_callbacks[requset_id] = fn;
return true;
}

Expand Down
16 changes: 8 additions & 8 deletions src/server/task_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ void Server::init_task_workers() {
}
}

static int TaskWorker_call_command_handler(ProcessPool *pool, EventData *req) {
static int TaskWorker_call_command_handler(ProcessPool *pool, Worker *worker, EventData *req) {
Server *serv = (Server *) pool->ptr;
int command_id = req->info.server_fd;
int command_id = serv->get_command_id(req);
auto iter = serv->command_handlers.find(command_id);
if (iter == serv->command_handlers.end()) {
swoole_error_log(SW_LOG_ERROR, SW_ERROR_SERVER_INVALID_COMMAND, "Unknown command[%d]", command_id);
Expand All @@ -71,8 +71,8 @@ static int TaskWorker_call_command_handler(ProcessPool *pool, EventData *req) {
auto result = handler(serv, std::string(packet.data, packet.length));

SendData task{};
task.info.fd = req->info.fd;
task.info.reactor_id = sw_worker()->id;
task.info.fd = serv->get_task_id(req);
task.info.reactor_id = worker->id;
task.info.server_fd = -1;
task.info.type = SW_SERVER_EVENT_COMMAND_RESPONSE;
task.info.len = result.length();
Expand All @@ -92,7 +92,7 @@ static int TaskWorker_onTask(ProcessPool *pool, Worker *worker, EventData *task)
} else if (task->info.type == SW_SERVER_EVENT_SHUTDOWN) {
SwooleWG.shutdown = true;
} else if (task->info.type == SW_SERVER_EVENT_COMMAND_REQUEST) {
ret = TaskWorker_call_command_handler(pool, task);
ret = TaskWorker_call_command_handler(pool, worker, task);
} else {
ret = serv->onTask(serv, task);
/**
Expand Down Expand Up @@ -367,7 +367,7 @@ bool Server::finish(const char *data, size_t data_len, int flags, EventData *cur
}
buf.info.ext_flags |= flags;
buf.info.type = SW_SERVER_EVENT_FINISH;
buf.info.fd = current_task->info.fd;
buf.info.fd = get_task_id(current_task);

if (worker->pool->use_socket && worker->pool->stream_info_->last_connection) {
uint32_t _len = htonl(data_len);
Expand All @@ -376,7 +376,7 @@ bool Server::finish(const char *data, size_t data_len, int flags, EventData *cur
retval = worker->pool->stream_info_->last_connection->send_blocking(data, data_len);
}
} else {
retval = send_to_worker_from_worker(worker, &buf, sizeof(buf.info) + buf.info.len, SW_PIPE_MASTER);
retval = send_to_worker_from_worker(worker, &buf, buf.size(), SW_PIPE_MASTER);
}
} else {
uint64_t flag = 1;
Expand Down Expand Up @@ -418,7 +418,7 @@ bool Server::finish(const char *data, size_t data_len, int flags, EventData *cur
}
result->info.ext_flags |= flags;
result->info.type = SW_SERVER_EVENT_FINISH;
result->info.fd = current_task->info.fd;
result->info.fd = get_task_id(current_task);
}

// unlock worker
Expand Down

0 comments on commit b9e574e

Please sign in to comment.