Skip to content

Commit a3d937d

Browse files
authored
feat(xRPC): implement read method (#42)
1 parent 0b2e768 commit a3d937d

File tree

5 files changed

+786
-20
lines changed

5 files changed

+786
-20
lines changed

lib/resty/apisix/stream/xrpc/socket.lua

Lines changed: 98 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,129 @@
1-
require("table.clone")
21
local base = require("resty.core.base")
2+
local ffi = require("ffi")
3+
local ffi_str = ffi.string
4+
local C = ffi.C
5+
local FFI_AGAIN = base.FFI_AGAIN
6+
local FFI_DONE = base.FFI_DONE
7+
local FFI_ERROR = base.FFI_ERROR
8+
local get_string_buf = base.get_string_buf
9+
local get_size_ptr = base.get_size_ptr
10+
local get_request = base.get_request
11+
local co_yield = coroutine._yield
12+
local tab_clone = require("table.clone")
313

414

515
base.allows_subsystem("stream")
616

717

18+
ffi.cdef[[
19+
typedef unsigned char u_char;
20+
typedef struct ngx_stream_lua_socket_tcp_upstream_s
21+
ngx_stream_lua_socket_tcp_upstream_t;
22+
23+
int
24+
ngx_stream_lua_ffi_socket_tcp_read_buf(ngx_stream_lua_request_t *r,
25+
ngx_stream_lua_socket_tcp_upstream_t *u, u_char **res, size_t len,
26+
u_char *errbuf, size_t *errbuf_size);
27+
28+
int
29+
ngx_stream_lua_ffi_socket_tcp_get_read_buf_result(ngx_stream_lua_request_t *r,
30+
ngx_stream_lua_socket_tcp_upstream_t *u, u_char **res, size_t len,
31+
u_char *errbuf, size_t *errbuf_size);
32+
]]
33+
local socket_tcp_read = C.ngx_stream_lua_ffi_socket_tcp_read_buf
34+
local socket_tcp_get_read_result = C.ngx_stream_lua_ffi_socket_tcp_get_read_buf_result
35+
36+
37+
local ERR_BUF_SIZE = 256
38+
local SOCKET_CTX_INDEX = 1
39+
local resbuf = ffi.new("u_char*[1]")
840
local Downstream = {}
941
local Upstream = {}
1042
local downstream_mt
1143
local upstream_mt
1244

13-
-- need to remove methods which will break the buffer management
14-
local function remove_unwanted_method(sk)
45+
46+
local function get_tcp_socket(cosocket)
47+
local tcp_socket = cosocket[SOCKET_CTX_INDEX]
48+
if not tcp_socket then
49+
return error("bad tcp socket", 3)
50+
end
51+
52+
return tcp_socket
53+
end
54+
55+
56+
-- read the given length of data to a buffer in C land and return the buffer address
57+
-- return error if the read data is less than given length
58+
local function read_buf(cosocket, len)
59+
if len <= 0 then
60+
error("bad length: length of data should be positive, got " .. len, 2)
61+
end
62+
63+
if len > 4 * 1024 * 1024 then
64+
error("bad length: length of data too big, got " .. len, 2)
65+
end
66+
67+
local r = get_request()
68+
if not r then
69+
error("no request found", 2)
70+
end
71+
72+
local u = get_tcp_socket(cosocket)
73+
local errbuf = get_string_buf(ERR_BUF_SIZE)
74+
local errbuf_size = get_size_ptr()
75+
errbuf_size[0] = ERR_BUF_SIZE
76+
77+
local rc = socket_tcp_read(r, u, resbuf, len, errbuf, errbuf_size)
78+
if rc == FFI_DONE then
79+
error(ffi_str(errbuf, errbuf_size[0]), 2)
80+
end
81+
82+
while true do
83+
if rc == FFI_ERROR then
84+
return nil, ffi_str(errbuf, errbuf_size[0])
85+
end
86+
87+
if rc >= 0 then
88+
return resbuf[0]
89+
end
90+
91+
assert(rc == FFI_AGAIN)
92+
93+
co_yield()
94+
95+
errbuf = get_string_buf(ERR_BUF_SIZE)
96+
errbuf_size = get_size_ptr()
97+
errbuf_size[0] = ERR_BUF_SIZE
98+
rc = socket_tcp_get_read_result(r, u, resbuf, len, errbuf, errbuf_size)
99+
end
100+
end
101+
102+
103+
local function patch_methods(sk)
15104
local methods = getmetatable(sk).__index
16-
local copy = table.clone(methods)
105+
local copy = tab_clone(methods)
106+
-- need to remove methods which will break the buffer management
17107
copy.receive = nil
18108
copy.receiveany = nil
19109
copy.receiveuntil = nil
20110

111+
copy.read = read_buf
112+
21113
return {__index = copy}
22114
end
23115

24116

25117
local function set_method_table(sk, is_downstream)
26118
if is_downstream then
27119
if not downstream_mt then
28-
downstream_mt = remove_unwanted_method(sk)
120+
downstream_mt = patch_methods(sk)
29121
end
30122
return setmetatable(sk, downstream_mt)
31123
end
32124

33125
if not upstream_mt then
34-
upstream_mt = remove_unwanted_method(sk)
126+
upstream_mt = patch_methods(sk)
35127
end
36128
return setmetatable(sk, upstream_mt)
37129
end

0 commit comments

Comments
 (0)