1
- #include " RpcProvider .h"
1
+ #include " RpcServer .h"
2
2
#include " rpcHeader.pb.h"
3
3
#include < google/protobuf/descriptor.h>
4
4
#include " burger/net/Buffer.h"
@@ -8,10 +8,12 @@ using namespace burger::net;
8
8
using namespace burger ::rpc;
9
9
using namespace std ::placeholders;
10
10
11
- // 这是框架提供给外部使用,可以发布rpc方法的函数接口
12
- // service_name --》 service描述 --》 service* 记录的服务对象
13
- // --》 method_name --> method 方法对象
14
- void RpcProvider::NotifyService (google::protobuf::Service *service) {
11
+ namespace {
12
+ const size_t kHeaderPrefixNum = 4 ;
13
+
14
+ } // namespace
15
+
16
+ void RpcServer::NotifyService (google::protobuf::Service *service) {
15
17
ServiceInfo serviceInfo;
16
18
// 获取了服务对象的描述信息
17
19
const google::protobuf::ServiceDescriptor *serviceDescPtr = service->GetDescriptor ();
@@ -31,65 +33,40 @@ void RpcProvider::NotifyService(google::protobuf::Service *service) {
31
33
serviceInfoMap_.insert ({serviceName, serviceInfo});
32
34
}
33
35
34
- // 启动rpc服务结点,开始提供rpc远程网络调用服务
35
- void RpcProvider::Run () {
36
+ void RpcServer::Run () {
36
37
std::string ip = Config::Instance ().getString (" rpc" , " rpcServerIp" , " 127.0.0.1" );
37
38
uint16_t port = Config::Instance ().getUInt16 (" rpc" , " rpcServerPort" , 8000 );
38
39
InetAddress addr (ip, port);
39
- CoTcpServer server (&sched_, addr, " RpcProvider " );
40
+ CoTcpServer server (&sched_, addr, " RpcServer " );
40
41
41
- server.setConnectionHandler (std::bind (&RpcProvider ::connHandler, this , _1));
42
+ server.setConnectionHandler (std::bind (&RpcServer ::connHandler, this , _1));
42
43
server.setThreadNum (4 );
43
- INFO (" RpcProvider start service at {} : {}" , ip, port);
44
+ INFO (" RpcServer start service at {} : {}" , ip, port);
44
45
server.start ();
45
46
sched_.wait ();
46
47
}
47
48
48
- // 框架内部,RpcProvider 和 RpcConsumer 协商好之间通信的protobuf数据类型
49
- // service_name method_name
50
- // 我们需要定义proto的message类型,进行数据头的序列化和反序列化
51
- // 发来字节流 : 16UserserviceMITSK123
52
- // header_size(4个字节) + header_str + args_str
53
- // 考虑粘包问题,所以我们不仅要记录service_name, method_name 还要记录 args_size
54
- // header_size 要存成二进制, 利用std::string 的copy
55
49
// todo : 此处好好反复考虑粘包情况
56
- void RpcProvider ::connHandler (const CoTcpConnection::ptr& conn) {
50
+ void RpcServer ::connHandler (const CoTcpConnection::ptr& conn) {
57
51
Buffer::ptr buf = std::make_shared<Buffer>();
58
52
while (conn->recv (buf) > 0 ) {
59
53
// 网络上接受的远程rpc调用请求的字符流, Login args
60
54
std::string recvStr = buf->retrieveAllAsString ();
61
- // 从字节流中读取4个字节的内容
62
55
uint32_t headerSize = 0 ;
63
- size_t headerPrefixNum = 4 ;
64
- recvStr.copy (reinterpret_cast <char *>(&headerSize), headerPrefixNum, 0 );
65
-
66
- // 根据headerSize读取数据头的原始字符流
67
- std::string rpcHeaderStr = recvStr.substr (headerPrefixNum, headerSize);
68
- // 反序列化数据,得到rpc请求的详细信息
69
- RpcHeader rpcHeader;
56
+ std::string rpcHeaderStr = readHeader (recvStr, headerSize);
57
+
70
58
std::string serviceName;
71
59
std::string methodName;
72
60
uint32_t argsSize = 0 ;
73
- if (rpcHeader.ParseFromString (rpcHeaderStr)) {
74
- // 数据头反序列化成功
75
- serviceName = rpcHeader.servicename ();
76
- methodName = rpcHeader.methodname ();
77
- argsSize = rpcHeader.argssize ();
78
- } else {
79
- // 数据头反序列化失败
80
- ERROR (" rpcHeaderStr {} parse error" , rpcHeaderStr);
81
- return ;
82
- }
83
- // 获取rpc方法参数的字符流数据
84
- std::string argsStr = recvStr.substr (headerPrefixNum + headerSize, argsSize);
61
+ if (!deserializeHeader (rpcHeaderStr, serviceName, methodName, argsSize)) return ;
62
+
63
+ std::string argsStr = readArgs (recvStr, headerSize, argsSize);
85
64
#ifdef DEBUG
86
- INFO (" ====================================" );
87
- INFO (" headerSize : {}" , headerSize);
88
- INFO (" rpcHeaderStr : {}" , rpcHeaderStr);
89
- INFO (" serviceName : {}" , serviceName);
90
- INFO (" methodName : {}" , methodName);
91
- INFO (" argsSize : {}" , argsSize);
92
- INFO (" ====================================" );
65
+ DEBUG (" headerSize : {}" , headerSize);
66
+ DEBUG (" rpcHeaderStr : {}" , rpcHeaderStr);
67
+ DEBUG (" serviceName : {}" , serviceName);
68
+ DEBUG (" methodName : {}" , methodName);
69
+ DEBUG (" argsSize : {}" , argsSize);
93
70
#endif
94
71
// 获取service对象和method对象
95
72
auto it = serviceInfoMap_.find (serviceName);
@@ -102,6 +79,7 @@ void RpcProvider::connHandler(const CoTcpConnection::ptr& conn) {
102
79
if (mit == it->second .methodMap_ .end ()) {
103
80
ERROR (" method Name : {} dose not exist" , methodName);
104
81
}
82
+
105
83
google::protobuf::Service *service = it->second .service_ ; // 获取service对象 new UserService
106
84
const google::protobuf::MethodDescriptor *method = mit->second ; // 获取method对象 Login
107
85
@@ -115,11 +93,11 @@ void RpcProvider::connHandler(const CoTcpConnection::ptr& conn) {
115
93
116
94
// 下面的method方法的调用,绑定一个Closure的回调函数
117
95
// todo: 为何我们此处要手动去设置类型
118
- google::protobuf::Closure *done = google::protobuf::NewCallback<RpcProvider ,
96
+ google::protobuf::Closure *done = google::protobuf::NewCallback<RpcServer ,
119
97
const CoTcpConnection::ptr&,
120
98
google::protobuf::Message *>
121
99
(this ,
122
- &RpcProvider ::sendRpcResonse,
100
+ &RpcServer ::sendRpcResonse,
123
101
conn, response);
124
102
// 在框架上根据远端rpc请求,调用当前rpc节点上发布的方法
125
103
// new UserService().Login(controller, request, response, done)
@@ -128,7 +106,7 @@ void RpcProvider::connHandler(const CoTcpConnection::ptr& conn) {
128
106
}
129
107
130
108
// Closure 的回调操作,用于序列化rpc的响应和网络发送
131
- void RpcProvider ::sendRpcResonse (const CoTcpConnection::ptr& conn, google::protobuf::Message* response) {
109
+ void RpcServer ::sendRpcResonse (const CoTcpConnection::ptr& conn, google::protobuf::Message* response) {
132
110
std::string responseStr;
133
111
if (response->SerializeToString (&responseStr)) { // response进行序列化
134
112
// 序列化成功后,通过网络把rpc方法执行的结果发送回rpc的调用方
@@ -138,3 +116,32 @@ void RpcProvider::sendRpcResonse(const CoTcpConnection::ptr& conn, google::proto
138
116
}
139
117
conn->shutdown (); // 短连接,主动关闭
140
118
}
119
+
120
+ std::string RpcServer::readHeader (const std::string& recvStr, uint32_t & headerSize) {
121
+ // 从字节流中读取4个字节的内容
122
+ // header_size 要存成二进制, 利用std::string 的copy
123
+ recvStr.copy (reinterpret_cast <char *>(&headerSize), kHeaderPrefixNum , 0 );
124
+ // 根据headerSize读取数据头的原始字符流
125
+ std::string rpcHeaderStr = recvStr.substr (kHeaderPrefixNum , headerSize);
126
+ return rpcHeaderStr;
127
+ }
128
+
129
+ bool RpcServer::deserializeHeader (const std::string& rpcHeaderStr, std::string& serviceName,
130
+ std::string& methodName, uint32_t & argsSize) {
131
+ RpcHeader rpcHeader;
132
+ if (rpcHeader.ParseFromString (rpcHeaderStr)) {
133
+ // 数据头反序列化成功
134
+ serviceName = rpcHeader.servicename ();
135
+ methodName = rpcHeader.methodname ();
136
+ argsSize = rpcHeader.argssize ();
137
+ return true ;
138
+ } else {
139
+ // 数据头反序列化失败
140
+ ERROR (" rpcHeaderStr {} parse error" , rpcHeaderStr);
141
+ return false ;
142
+ }
143
+ }
144
+
145
+ std::string RpcServer::readArgs (const std::string& recvStr, const uint32_t headerSize, const uint32_t argsSize) {
146
+ return recvStr.substr (kHeaderPrefixNum + headerSize, argsSize);
147
+ }
0 commit comments