Skip to content

Commit 3368797

Browse files
committed
stream discard
1 parent 74e4bff commit 3368797

File tree

5 files changed

+242
-0
lines changed

5 files changed

+242
-0
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
// Copyright (c) 2016-2017
2+
// Author: Chrono Law
3+
#include "NdgStreamDiscardInit.hpp"
4+
5+
auto ndg_stream_discard_module = NdgStreamDiscardInit::module();
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright (c) 2016-2017
2+
// Author: Chrono Law
3+
#ifndef _NDG_STREAM_DISCARD_CONF_HPP
4+
#define _NDG_STREAM_DISCARD_CONF_HPP
5+
6+
#include "NgxStreamAll.hpp"
7+
8+
class NdgStreamDiscardConf final
9+
{
10+
public:
11+
typedef NdgStreamDiscardConf this_type;
12+
public:
13+
NdgStreamDiscardConf() = default;
14+
~NdgStreamDiscardConf() = default;
15+
public:
16+
ngx_msec_t timeout = ngx_nil;
17+
public:
18+
static void* create(ngx_conf_t* cf)
19+
{
20+
return NgxPool(cf).alloc<this_type>();
21+
}
22+
23+
static char* merge(ngx_conf_t *cf, void *parent, void *child)
24+
{
25+
boost::ignore_unused(cf);
26+
27+
auto& prev = cast(parent);
28+
auto& conf = cast(child);
29+
30+
NgxValue::merge(conf.timeout, prev.timeout, 5000);
31+
32+
return NGX_CONF_OK;
33+
}
34+
public:
35+
static this_type& cast(void* p)
36+
{
37+
return *reinterpret_cast<this_type*>(p);
38+
}
39+
};
40+
41+
NGX_MOD_INSTANCE(NdgStreamDiscardModule,
42+
ndg_stream_discard_module, NdgStreamDiscardConf)
43+
44+
#endif //_NDG_STREAM_DISCARD_CONF_HPP
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright (c) 2016-2017
2+
// Author: Chrono Law
3+
#ifndef _NDG_STREAM_DISCARD_HANDLER_HPP
4+
#define _NDG_STREAM_DISCARD_HANDLER_HPP
5+
6+
#include <array>
7+
8+
#include "NdgStreamDiscardConf.hpp"
9+
10+
class NdgStreamDiscardHandler final
11+
{
12+
public:
13+
typedef NdgStreamDiscardHandler this_type;
14+
typedef NdgStreamDiscardModule this_module;
15+
public:
16+
static void handler(ngx_stream_session_t *s)
17+
try
18+
{
19+
NgxConnection conn(s);
20+
21+
NgxWriteEvent wev = conn.write_event();
22+
//wev.handler(&this_type::block_write_handler);
23+
wev.handler([](ngx_event_t *ev){});
24+
25+
NgxReadEvent rev = conn.read_event();
26+
27+
rev.handler(&this_type::discard_read_handler);
28+
29+
rev.process();
30+
}
31+
catch(const NgxException& e)
32+
{
33+
NgxLogError(s).print("error = %d", e.code());
34+
NgxStreamSession(s).close();
35+
}
36+
private:
37+
static void discard_read_handler(ngx_event_t *ev)
38+
try
39+
{
40+
NgxReadEvent rev(ev);
41+
NgxConnection conn(ev);
42+
43+
// check timedout and error
44+
NgxException::fail(rev.expired(), NGX_ETIMEDOUT);
45+
NgxException::fail(conn.closed());
46+
47+
if(rev.ready())
48+
{
49+
// read from client
50+
std::array<u_char, 64> buf;
51+
NgxLogError log(conn);
52+
53+
ssize_t n = 0;
54+
55+
// read until can not read
56+
for(;;)
57+
{
58+
n = conn.recv(buf.data(), buf.size());
59+
60+
if(n <= 0) // error accured
61+
{
62+
break;
63+
}
64+
65+
log.print("recv %d bytes", n);
66+
67+
conn.reusable(false);
68+
} // end for
69+
70+
//NgxException::fail(n == NGX_ERROR || n == 0);
71+
NgxException::fail(n != NGX_AGAIN);
72+
73+
//(n == NGX_AGAIN)
74+
}
75+
76+
NgxStreamSession s(ev);
77+
auto& cf = this_module::conf().srv(s);
78+
79+
//rev.timeout(cf.timeout, true);
80+
//rev.wait();
81+
rev.wait_for(cf.timeout, true);
82+
}
83+
catch(const NgxException& e)
84+
{
85+
NgxStreamSession s(ev);
86+
87+
NgxLogError(s).print("error = %d", e.code());
88+
s.close();
89+
}
90+
private:
91+
static void block_write_handler(ngx_event_t *ev)
92+
{
93+
// do nothing
94+
}
95+
96+
};
97+
98+
#endif //_NDG_STREAM_DISCARD_HANDLER_HPP
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Copyright (c) 2016-2017
2+
// Author: Chrono Law
3+
#ifndef _NDG_STREAM_DISCARD_INIT_HPP
4+
#define _NDG_STREAM_DISCARD_INIT_HPP
5+
6+
#include "NdgStreamDiscardConf.hpp"
7+
#include "NdgStreamDiscardHandler.hpp"
8+
9+
class NdgStreamDiscardInit final
10+
{
11+
public:
12+
typedef NdgStreamDiscardConf conf_type;
13+
typedef NdgStreamDiscardHandler handler_type;
14+
typedef NdgStreamDiscardInit this_type;
15+
public:
16+
static ngx_command_t* cmds()
17+
{
18+
static ngx_command_t n[] =
19+
{
20+
{
21+
ngx_string("ndg_discard_time_out"),
22+
NgxTake(NGX_STREAM_SRV_CONF, 1),
23+
ngx_conf_set_msec_slot,
24+
NGX_STREAM_SRV_CONF_OFFSET,
25+
offsetof(conf_type, timeout),
26+
nullptr
27+
},
28+
29+
{
30+
ngx_string("ndg_stream_discard"),
31+
NgxTake(NGX_STREAM_SRV_CONF, 0),
32+
&this_type::set_discard,
33+
NGX_STREAM_SRV_CONF_OFFSET,
34+
0, nullptr
35+
},
36+
37+
ngx_null_command
38+
};
39+
40+
return n;
41+
}
42+
public:
43+
static ngx_stream_module_t* ctx()
44+
{
45+
static ngx_stream_module_t c =
46+
{
47+
NGX_MODULE_NULL(4),
48+
49+
&conf_type::create,
50+
&conf_type::merge,
51+
};
52+
53+
return &c;
54+
}
55+
public:
56+
static const ngx_module_t& module()
57+
{
58+
static ngx_module_t m =
59+
{
60+
NGX_MODULE_V1,
61+
62+
ctx(),
63+
cmds(),
64+
65+
NGX_STREAM_MODULE,
66+
NGX_MODULE_NULL(7),
67+
NGX_MODULE_V1_PADDING
68+
};
69+
70+
return m;
71+
}
72+
private:
73+
static char* set_discard(ngx_conf_t* cf, ngx_command_t* cmd, void* conf)
74+
{
75+
boost::ignore_unused(cmd, conf);
76+
77+
NgxStreamCoreModule::handler(
78+
cf, &handler_type::handler);
79+
80+
return NGX_CONF_OK;
81+
}
82+
};
83+
84+
#endif //_NDG_STREAM_DISCARD_INIT_HPP
85+

stream/discard/config

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#./configure --add-module=$HOME/ngx_cpp_dev/modules/test
2+
3+
ngx_module_type=STREAM
4+
ngx_module_name=ndg_stream_discard_module
5+
ngx_module_srcs="$ngx_addon_dir/ModNdgStreamDiscard.cpp"
6+
7+
. auto/module
8+
9+
ngx_addon_name=ndg_stream_discard_module
10+

0 commit comments

Comments
 (0)