Skip to content

Commit 1457d5b

Browse files
committed
subcribe modify delete script
1 parent 17631f6 commit 1457d5b

File tree

3 files changed

+167
-0
lines changed

3 files changed

+167
-0
lines changed

common_lua.cc

+59
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ extern "C" {
1818
#include "lauxlib.h"
1919
}
2020
#include "common_lua.h"
21+
#include "url-parser/url_parser.h"
22+
#include "h2load_utils.h"
2123

2224

2325

@@ -207,13 +209,70 @@ void register_3rd_party_lib_func_to_lua(lua_State* L)
207209
lua_settop(L, 0);
208210
}
209211

212+
int parse_uri(lua_State* L)
213+
{
214+
const std::string scheme_header = ":scheme";
215+
const std::string path_header = ":path";
216+
const std::string authority_header = ":authority";
217+
218+
std::string uri;
219+
if ((lua_gettop(L) == 1))
220+
{
221+
size_t len;
222+
const char* str = lua_tolstring(L, -1, &len);
223+
uri.assign(str, len);
224+
lua_pop(L, 1);
225+
}
226+
else
227+
{
228+
std::cerr << __FUNCTION__ << " invalid arguments" << std::endl;
229+
lua_settop(L, 0);
230+
}
231+
if (uri.size())
232+
{
233+
http_parser_url u {};
234+
if (http_parser_parse_url(uri.c_str(), uri.size(), 0, &u) == 0)
235+
{
236+
std::string path = get_reqline(uri.c_str(), u);
237+
std::string schema;
238+
std::string host;
239+
if (util::has_uri_field(u, UF_SCHEMA) && util::has_uri_field(u, UF_HOST))
240+
{
241+
schema = util::get_uri_field(uri.c_str(), u, UF_SCHEMA).str();
242+
host = util::get_uri_field(uri.c_str(), u, UF_HOST).str();
243+
if (util::has_uri_field(u, UF_PORT))
244+
{
245+
host.append(":").append(util::utos(u.port));
246+
}
247+
}
248+
lua_createtable(L, 0, schema.empty() ? 1 : 3);
249+
lua_pushlstring(L, path_header.c_str(), path_header.size());
250+
lua_pushlstring(L, path.c_str(), path.size());
251+
lua_rawset(L, -3);
252+
if (schema.size())
253+
{
254+
lua_pushlstring(L, scheme_header.c_str(), scheme_header.size());
255+
lua_pushlstring(L, schema.c_str(), schema.size());
256+
lua_rawset(L, -3);
257+
lua_pushlstring(L, authority_header.c_str(), authority_header.size());
258+
lua_pushlstring(L, host.c_str(), host.size());
259+
lua_rawset(L, -3);
260+
}
261+
return 1;
262+
}
263+
}
264+
return 0;
265+
}
266+
267+
210268
void init_new_lua_state_with_common_apis(lua_State* L)
211269
{
212270
lua_register(L, "time_since_epoch", time_since_epoch);
213271
lua_register(L, "store_value", store_value);
214272
lua_register(L, "get_value", get_value);
215273
lua_register(L, "delete_value", delete_value);
216274
lua_register(L, "generate_uuid_v4", generate_uuid);
275+
lua_register(L, "parse_uri", parse_uri);
217276
register_3rd_party_lib_func_to_lua(L);
218277
}
219278

common_lua.h

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ extern "C"
2020

2121
int generate_uuid(lua_State* L);
2222

23+
int parse_uri(lua_State* L);
24+
2325

2426
// from pb.so
2527
LUALIB_API int luaopen_pb_io(lua_State* L);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
local server_id = start_server("maock_subscribe_update_delete.json")
2+
3+
register_service_handler(server_id, "subscribe", "handle_subscribe", 20)
4+
5+
register_service_handler(server_id, "subs-update", "handle_subscribe_update", 20)
6+
7+
register_service_handler(server_id, "subs-del", "handle_ubsubscribe", 20)
8+
9+
local location_header_with_subs_id = {"/some-hardcoded-api-root/some-service/v2/some-operation", "/", "subs_id_placeholder"}
10+
11+
local timestamp_reporting_stat = 0
12+
13+
local pseudo_uuid_as_worker_thread_id_for_stats_output = generate_uuid_v4()
14+
15+
local number_of_notify_request_sent = 0
16+
17+
local number_of_notify_request_sent_last_second = 0
18+
19+
math.randomseed(os.time())
20+
21+
-- utility functions begin
22+
local function tokenize_path_and_query(path)
23+
path = path .. "?"
24+
results = string.gmatch(path, '([^?]+)')
25+
tokens = {}
26+
for res in results do
27+
table.insert(tokens, res)
28+
end
29+
return tokens
30+
end
31+
32+
local function tokenize_path(path)
33+
path = path .. "/"
34+
results = string.gmatch(path, '([^/]+)')
35+
tokens = {}
36+
for res in results do
37+
table.insert(tokens, res)
38+
end
39+
return tokens
40+
end
41+
-- utility functions end
42+
43+
function handle_subscribe(response_addr, request_headers, request_payload)
44+
subscription_id = generate_uuid_v4()
45+
store_value(subscription_id, request_payload)
46+
location_header_with_subs_id[2] = subscription_id
47+
response_header = {[":status"] = "201", ["location"] = table.concat(location_header_with_subs_id)}
48+
send_response(response_addr, response_header, "")
49+
end
50+
51+
function handle_subscribe_update(response_addr, request_headers, request_payload)
52+
path = headers[":path"]
53+
tokens = tokenize_path_and_query(path)
54+
path_without_query = tokens[1]
55+
path_tokens = tokenize_path(path_without_query)
56+
subscription_id = path_tokens[table.getn(path_tokens)]
57+
subscribe_request_payload = get_value(subscription_id)
58+
if (subscribe_request_payload == nil)
59+
then
60+
response_header = {[":status"] = "404"}
61+
send_response(response_addr, response_header, "")
62+
return
63+
end
64+
response_header = {[":status"] = "200"}
65+
send_response(response_addr, response_header, '{"status": "success"}')
66+
67+
-- send notify
68+
doc = rapidjson.Document()
69+
ok, message = doc:parse(subscribe_request_payload)
70+
notify_uri = doc:get("/callback-uri")
71+
72+
notify_headers = parse_uri(notify_uri)
73+
74+
notify_response_headers, notify_response_payload = send_http_request_and_await_response(notify_headers, "")
75+
76+
if (notify_response_headers ~= nil)
77+
then
78+
number_of_notify_request_sent = number_of_notify_request_sent + 1
79+
end
80+
now = time_since_epoch()
81+
if (now - timestamp_reporting_stat > 1000)
82+
then
83+
output = string.format("thread: %s, number request sent: %d, tps = %d", pseudo_uuid_as_worker_thread_id_for_stats_output, number_of_notify_request_sent, ((number_of_request_sent - number_of_notify_request_sent_last_second)*1000)/(now - timestamp_reporting_stat))
84+
number_of_notify_request_sent_last_second = number_of_notify_request_sent;
85+
timestamp_reporting_stat = now
86+
print (output)
87+
end
88+
end
89+
90+
function handle_ubsubscribe(response_addr, request_headers, request_payload)
91+
path = headers[":path"]
92+
tokens = tokenize_path_and_query(path)
93+
path_without_query = tokens[1]
94+
path_tokens = tokenize_path(path_without_query)
95+
subscription_id = path_tokens[table.getn(path_tokens)]
96+
subscribe_request_payload = get_value(subscription_id)
97+
if (subscribe_request_payload == nil)
98+
then
99+
response_header = {[":status"] = "404"}
100+
send_response(response_addr, response_header, "")
101+
return
102+
end
103+
response_header = {[":status"] = "204"}
104+
delete_value(subscription_id)
105+
send_response(response_addr, response_header, "")
106+
end

0 commit comments

Comments
 (0)