@@ -4,7 +4,11 @@ use hyper::{
4
4
Request , Response , Uri ,
5
5
} ;
6
6
use std:: time:: Duration ;
7
- use tower:: { builder:: ServiceBuilder , reconnect:: Reconnect , Service , ServiceExt } ;
7
+ use tower:: { MakeService , Service } ;
8
+ use tower:: limit:: rate:: { Rate , RateLimit } ;
9
+ use tower:: limit:: concurrency:: ConcurrencyLimit ;
10
+ use tower:: buffer:: Buffer ;
11
+ use tower:: retry:: Retry ;
8
12
use tower_hyper:: {
9
13
client:: { Builder , Connect } ,
10
14
retry:: { Body , RetryPolicy } ,
@@ -22,38 +26,42 @@ fn main() {
22
26
23
27
fn request ( ) -> impl Future < Item = Response < hyper:: Body > , Error = ( ) > {
24
28
let connector = Connector :: new ( HttpConnector :: new ( 1 ) ) ;
25
- let hyper = Connect :: new ( connector, Builder :: new ( ) ) ;
29
+ let mut hyper = Connect :: new ( connector, Builder :: new ( ) ) ;
26
30
27
- // RetryPolicy is a very simple policy that retries `n` times
28
- // if the response has a 500 status code. Here, `n` is 5.
29
- let policy = RetryPolicy :: new ( 5 ) ;
30
31
// We're calling the tower/examples/server.rs.
31
32
let dst = Destination :: try_from_uri ( Uri :: from_static ( "http://127.0.0.1:3000" ) ) . unwrap ( ) ;
32
33
33
- // Now, to build the service! We use two BufferLayers in order to:
34
- // - provide backpressure for the RateLimitLayer, and ConcurrencyLimitLayer
35
- // - meet `RetryLayer`'s requirement that our service implement `Service + Clone`
36
- // - ..and to provide cheap clones on the service.
37
- let maker = ServiceBuilder :: new ( )
38
- . buffer ( 5 )
39
- . rate_limit ( 5 , Duration :: from_secs ( 1 ) )
40
- . concurrency_limit ( 5 )
41
- . retry ( policy)
42
- . buffer ( 5 )
43
- . make_service ( hyper) ;
44
-
45
- // `Reconnect` accepts a destination and a MakeService, creating a new service
46
- // any time the connection encounters an error.
47
- let client = Reconnect :: new ( maker, dst) ;
34
+ // Now, to build the service!
35
+ let client = hyper
36
+ . make_service ( dst)
37
+ . map_err ( |err| eprintln ! ( "Connect Error {:?}" , err) )
38
+ . map ( |conn| {
39
+ //We use two BufferLayers in order to:
40
+ // - provide backpressure for the RateLimitLayer, and ConcurrencyLimitLayer
41
+ // - meet `RetryLayer`'s requirement that our service implement `Service + Clone`
42
+ // - ..and to provide cheap clones on the service.
43
+ let buf = Buffer :: new ( conn, 1 ) ;
44
+
45
+ // // RetryPolicy is a very simple policy that retries `n` times
46
+ // // if the response has a 500 status code. Here, `n` is 5.
47
+ let policy = RetryPolicy :: new ( 5 ) ;
48
+ let retry = Retry :: new ( policy, buf) ;
49
+
50
+ // - provide backpressure for the RateLimitLayer, and ConcurrencyLimitLayer
51
+ let rate_limit = RateLimit :: new ( retry, Rate :: new ( 5 , Duration :: from_secs ( 1 ) ) ) ;
52
+ let concurency_limit = ConcurrencyLimit :: new ( rate_limit, 5 ) ;
53
+
54
+ Buffer :: new ( concurency_limit, 1 )
55
+ } ) ;
48
56
49
57
let request = Request :: builder ( )
50
58
. method ( "GET" )
51
59
. body ( Body :: from ( Vec :: new ( ) ) )
52
60
. unwrap ( ) ;
53
61
54
- // we check to see if the client is ready to accept requests.
62
+ // let client = Reconnect::new(client, dst);
63
+
55
64
client
56
- . ready ( )
57
65
. map_err ( |e| panic ! ( "Service is not ready: {:?}" , e) )
58
66
. and_then ( |mut c| {
59
67
c. call ( request)
0 commit comments