Skip to content

Commit 375f6c8

Browse files
authored
feat: add has_pending_data (#51)
1 parent 54c8d1c commit 375f6c8

File tree

3 files changed

+128
-2
lines changed

3 files changed

+128
-2
lines changed

lib/resty/apisix/stream/xrpc/socket.lua

+30
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,18 @@ ngx_stream_lua_ffi_socket_tcp_get_send_result(ngx_stream_lua_request_t *r,
4343
void
4444
ngx_stream_lua_ffi_socket_tcp_reset_read_buf(ngx_stream_lua_request_t *r,
4545
ngx_stream_lua_socket_tcp_upstream_t *u);
46+
47+
int
48+
ngx_stream_lua_ffi_socket_tcp_has_pending_data(ngx_stream_lua_request_t *r,
49+
ngx_stream_lua_socket_tcp_upstream_t *u,
50+
u_char *errbuf, size_t *errbuf_size);
4651
]]
4752
local socket_tcp_read = C.ngx_stream_lua_ffi_socket_tcp_read_buf
4853
local socket_tcp_get_read_result = C.ngx_stream_lua_ffi_socket_tcp_get_read_buf_result
4954
local socket_tcp_move = C.ngx_stream_lua_ffi_socket_tcp_send_from_socket
5055
local socket_tcp_get_move_result = C.ngx_stream_lua_ffi_socket_tcp_get_send_result
5156
local socket_tcp_reset_read_buf = C.ngx_stream_lua_ffi_socket_tcp_reset_read_buf
57+
local socket_tcp_has_pending_data = C.ngx_stream_lua_ffi_socket_tcp_has_pending_data
5258

5359

5460
local ERR_BUF_SIZE = 256
@@ -178,6 +184,29 @@ local function drain(cosocket, len)
178184
end
179185

180186

187+
-- has_pending_data check if there is unread data in the given socket.
188+
-- return false if there is no pending data, and return true if there may be any pending data.
189+
-- we require it to be called after any read methods called successfully.
190+
local function has_pending_data(cosocket)
191+
local r = get_request()
192+
if not r then
193+
error("no request found", 2)
194+
end
195+
196+
local u = get_tcp_socket(cosocket)
197+
198+
local errbuf = get_string_buf(ERR_BUF_SIZE)
199+
local errbuf_size = get_size_ptr()
200+
errbuf_size[0] = ERR_BUF_SIZE
201+
202+
local rc = socket_tcp_has_pending_data(r, u, errbuf, errbuf_size)
203+
if rc == FFI_ERROR then
204+
return nil, ffi_str(errbuf, errbuf_size[0])
205+
end
206+
return rc == FFI_AGAIN
207+
end
208+
209+
181210
-- move the buffers from src cosocket to dst cosocket. The buffers are from previous one or multiple
182211
-- read calls. It is equal to send multiple read buffer in the src cosocket to the dst cosocket.
183212
local function move(dst, src)
@@ -256,6 +285,7 @@ local function patch_methods(sk)
256285
copy.read_line = read_line
257286
copy.move = move
258287
copy.reset_read_buf = reset_read_buf
288+
copy.has_pending_data = has_pending_data
259289

260290
return {__index = copy}
261291
end

patch/1.19.9/ngx_stream_lua-xrpc.patch

+27-2
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ index e65d572..25175a5 100644
8181
#endif /* _NGX_STREAM_LUA_INPUT_FILTERS_H_INCLUDED_ */
8282

8383
diff --git src/ngx_stream_lua_socket_tcp.c src/ngx_stream_lua_socket_tcp.c
84-
index 7fcfb45..a21561b 100644
84+
index 7fcfb45..9981b4b 100644
8585
--- src/ngx_stream_lua_socket_tcp.c
8686
+++ src/ngx_stream_lua_socket_tcp.c
8787
@@ -234,6 +234,41 @@ enum {
@@ -158,7 +158,7 @@ index 7fcfb45..a21561b 100644
158158
static ngx_int_t
159159
ngx_stream_lua_socket_tcp_read(ngx_stream_lua_request_t *r,
160160
ngx_stream_lua_socket_tcp_upstream_t *u)
161-
@@ -6005,6 +6065,647 @@ static ngx_int_t ngx_stream_lua_socket_insert_buffer(
161+
@@ -6005,6 +6065,672 @@ static ngx_int_t ngx_stream_lua_socket_insert_buffer(
162162
}
163163

164164

@@ -278,6 +278,7 @@ index 7fcfb45..a21561b 100644
278278
+ p = ngx_strerror(u->socket_errno, errbuf, *errbuf_size);
279279
+ /* for compatibility with LuaSocket */
280280
+ ngx_strlow(errbuf, errbuf, p - errbuf);
281+
+ *errbuf_size = p - errbuf;
281282
+
282283
+ } else {
283284
+ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "error")
@@ -585,6 +586,30 @@ index 7fcfb45..a21561b 100644
585586
+}
586587
+
587588
+
589+
+int
590+
+ngx_stream_lua_ffi_socket_tcp_has_pending_data(ngx_stream_lua_request_t *r,
591+
+ ngx_stream_lua_socket_tcp_upstream_t *u,
592+
+ u_char *errbuf, size_t *errbuf_size)
593+
+{
594+
+ /* skip many input checks as we require glance to be called
595+
+ * after any read methods called successfully */
596+
+
597+
+ if (u == NULL || u->bufs_in == NULL) {
598+
+ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "closed") - errbuf;
599+
+ return NGX_ERROR;
600+
+ }
601+
+
602+
+ /* no remain data */
603+
+ if ((u->buffer.last == u->buffer.pos && u->buffer.last != u->buffer.end)
604+
+ || u->eof /* EOF reached */)
605+
+ {
606+
+ return NGX_OK;
607+
+ }
608+
+
609+
+ return NGX_AGAIN;
610+
+}
611+
+
612+
+
588613
+static void
589614
+ngx_stream_lua_ffi_socket_write_error_retval_handler(
590615
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u,

t/stream/xrpc/misc.t

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
use t::APISIX_NGINX 'no_plan';
2+
3+
run_tests();
4+
5+
__DATA__
6+
7+
=== TEST 1: has_pending_data
8+
--- stream_server_config
9+
content_by_lua_block {
10+
local sk = require("resty.apisix.stream.xrpc.socket").downstream.socket()
11+
local p, err, len = sk:read_line(128)
12+
if err then
13+
ngx.say(err)
14+
return
15+
end
16+
ngx.say(sk:has_pending_data())
17+
}
18+
--- stream_request eval
19+
"hello world\r\n" x 2
20+
--- stream_response
21+
true
22+
23+
24+
25+
=== TEST 2: has_pending_data, all are read
26+
--- stream_server_config
27+
content_by_lua_block {
28+
local sk = require("resty.apisix.stream.xrpc.socket").downstream.socket()
29+
local p, err, len = sk:read_line(128)
30+
if err then
31+
ngx.say(err)
32+
return
33+
end
34+
ngx.say(sk:has_pending_data())
35+
}
36+
--- stream_request eval
37+
"hello world\r\n"
38+
--- stream_response
39+
false
40+
41+
42+
43+
=== TEST 3: has_pending_data, multiple read
44+
--- stream_server_config
45+
content_by_lua_block {
46+
local sk = require("resty.apisix.stream.xrpc.socket").downstream.socket()
47+
assert(sk:read(4))
48+
assert(sk:drain(7))
49+
ngx.say(sk:has_pending_data())
50+
}
51+
--- stream_request eval
52+
"hello world"
53+
--- stream_response
54+
false
55+
56+
57+
58+
=== TEST 4: has_pending_data, buffer is greater than lua_socket_buffer_size
59+
--- stream_server_config
60+
lua_socket_buffer_size 128;
61+
content_by_lua_block {
62+
local sk = require("resty.apisix.stream.xrpc.socket").downstream.socket()
63+
assert(sk:read(132))
64+
-- In this case, the has_pending_data has to return true as
65+
-- there is no way to know if there is pending data without a read
66+
ngx.say(sk:has_pending_data())
67+
}
68+
--- stream_request eval
69+
"1234" x 33
70+
--- stream_response
71+
true

0 commit comments

Comments
 (0)