@@ -9,9 +9,10 @@ use bytes::{BufMut, Bytes, BytesMut};
9
9
use hyper:: body:: to_bytes;
10
10
use hyper:: server:: conn:: AddrStream ;
11
11
use hyper:: service:: make_service_fn;
12
- use hyper:: { Body , Method , Request , Response } ;
12
+ use hyper:: { Body , Method , Request , Response , StatusCode } ;
13
13
use serde_json:: { json, Number } ;
14
14
use tokio:: sync:: { mpsc, oneshot} ;
15
+ use tonic:: codegen:: http;
15
16
use tower:: balance:: pool;
16
17
use tower:: load:: Load ;
17
18
use tower:: { service_fn, BoxError , MakeService , Service } ;
@@ -77,7 +78,7 @@ fn query_response_to_json(results: Vec<QueryResult>) -> anyhow::Result<Bytes> {
77
78
Ok ( buffer. into_inner ( ) . freeze ( ) )
78
79
}
79
80
80
- fn error ( msg : & str , code : u16 ) -> Response < Body > {
81
+ fn error ( msg : & str , code : StatusCode ) -> Response < Body > {
81
82
let err = json ! ( { "error" : msg } ) ;
82
83
Response :: builder ( )
83
84
. status ( code)
@@ -116,17 +117,28 @@ struct Message {
116
117
resp : oneshot:: Sender < Result < Vec < QueryResult > , BoxError > > ,
117
118
}
118
119
120
+ fn parse_payload ( data : & [ u8 ] ) -> Result < HttpQuery , Response < Body > > {
121
+ match serde_json:: from_slice ( data) {
122
+ Ok ( data) => Ok ( data) ,
123
+ Err ( e) => Err ( error ( & e. to_string ( ) , http:: status:: StatusCode :: BAD_REQUEST ) ) ,
124
+ }
125
+ }
126
+
119
127
async fn handle_query (
120
128
mut req : Request < Body > ,
121
129
sender : mpsc:: Sender < Message > ,
122
130
) -> anyhow:: Result < Response < Body > > {
123
131
let bytes = to_bytes ( req. body_mut ( ) ) . await ?;
124
- let req: HttpQuery = serde_json:: from_slice ( & bytes) ?;
132
+ let req = match parse_payload ( & bytes) {
133
+ Ok ( req) => req,
134
+ Err ( resp) => return Ok ( resp) ,
135
+ } ;
136
+
125
137
let ( s, resp) = oneshot:: channel ( ) ;
126
138
127
139
let queries = match parse_queries ( req. statements ) {
128
140
Ok ( queries) => queries,
129
- Err ( e) => return Ok ( error ( & e. to_string ( ) , 400 ) ) ,
141
+ Err ( e) => return Ok ( error ( & e. to_string ( ) , StatusCode :: BAD_REQUEST ) ) ,
130
142
} ;
131
143
132
144
let msg = Message { queries, resp : s } ;
@@ -138,7 +150,7 @@ async fn handle_query(
138
150
let json = query_response_to_json ( rows) ?;
139
151
Ok ( Response :: new ( Body :: from ( json) ) )
140
152
}
141
- Err ( _) | Ok ( Err ( _) ) => Ok ( error ( "internal error" , 500 ) ) ,
153
+ Err ( _) | Ok ( Err ( _) ) => Ok ( error ( "internal error" , StatusCode :: INTERNAL_SERVER_ERROR ) ) ,
142
154
}
143
155
}
144
156
0 commit comments