Skip to content

Commit ab7992c

Browse files
Fix UI
The recent upgrade to hyper 1.0 seems to have broken the UI. This commit fixes the issue with hyper_staticfile. It also changes the return types from a fixed Response<Full<Bytes>> to a custom JoshResponse type that allows us to deal with streaming responses instead of always buffering them. This type and some helper functions are defined in a new hyper_integration.rs file
1 parent bee495c commit ab7992c

File tree

3 files changed

+71
-50
lines changed

3 files changed

+71
-50
lines changed

josh-proxy/src/bin/josh-proxy.rs

Lines changed: 41 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,17 @@
22
extern crate lazy_static;
33
extern crate clap;
44

5-
use bytes::Bytes;
65
use clap::Parser;
7-
use http_body_util::Full;
86
use hyper::body::Incoming;
97
use hyper::server::conn::http1;
108
use hyper_util::rt::{tokio::TokioIo, tokio::TokioTimer};
119
use josh_proxy::cli;
1210
use josh_proxy::{FetchError, MetaConfig, RemoteAuth, RepoConfig, RepoUpdate, run_git_with_auth};
11+
use josh_proxy::hyper_integration::{JoshResponse, empty, full, erase};
1312
use tokio::pin;
1413
use tokio::sync::broadcast;
1514
use tracing_opentelemetry::OpenTelemetrySpanExt;
1615

17-
use futures::FutureExt;
1816
use hyper::service::service_fn;
1917
use hyper::{Request, Response, StatusCode};
2018

@@ -319,7 +317,7 @@ async fn fetch_upstream(
319317
async fn static_paths(
320318
service: &JoshProxyService,
321319
path: &str,
322-
) -> josh::JoshResult<Option<Response<Full<Bytes>>>> {
320+
) -> josh::JoshResult<Option<JoshResponse>> {
323321
tracing::debug!("static_path {:?}", path);
324322
if path == "/version" {
325323
return Ok(Some(make_response(
@@ -378,7 +376,7 @@ async fn static_paths(
378376
async fn repo_update_fn(
379377
_serv: Arc<JoshProxyService>,
380378
req: Request<Incoming>,
381-
) -> josh::JoshResult<Response<Full<Bytes>>> {
379+
) -> josh::JoshResult<JoshResponse> {
382380
let body = req.into_body().collect().await?.to_bytes();
383381

384382
let s = tracing::span!(tracing::Level::TRACE, "repo update worker");
@@ -402,10 +400,10 @@ async fn repo_update_fn(
402400
Ok(match result {
403401
Ok(stderr) => Response::builder()
404402
.status(hyper::StatusCode::OK)
405-
.body(Full::new(Bytes::from(stderr))),
403+
.body(full(stderr)),
406404
Err(josh::JoshError(stderr)) => Response::builder()
407405
.status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
408-
.body(Full::new(Bytes::from(stderr))),
406+
.body(full(stderr)),
409407
}?)
410408
}
411409

@@ -541,27 +539,27 @@ async fn do_filter(
541539
Ok(())
542540
}
543541

544-
fn make_response(body: &str, code: hyper::StatusCode) -> Response<Full<Bytes>> {
542+
fn make_response(body: &str, code: hyper::StatusCode) -> JoshResponse {
545543
let owned_body = body.to_owned();
546544
Response::builder()
547545
.status(code)
548546
.header(hyper::header::CONTENT_TYPE, "text/plain")
549-
.body(Full::new(Bytes::from(owned_body)))
547+
.body(full(owned_body))
550548
.expect("Can't build response")
551549
}
552550

553551
async fn handle_ui_request(
554552
req: Request<Incoming>,
555553
resource_path: &str,
556-
) -> josh::JoshResult<Response<Full<Bytes>>> {
554+
) -> josh::JoshResult<JoshResponse> {
557555
// Proxy: can be used for UI development or to serve a different UI
558556
if let Some(proxy) = &ARGS.static_resource_proxy_target {
559557
let client_ip = IpAddr::from_str("127.0.0.1").unwrap();
560558
return match hyper_reverse_proxy::call(client_ip, proxy, req).await {
561-
Ok(response) => Ok(response),
559+
Ok(response) => Ok(erase(response)),
562560
Err(error) => Ok(Response::builder()
563561
.status(StatusCode::INTERNAL_SERVER_ERROR)
564-
.body(Full::new(Bytes::from(format!("Proxy error: {:?}", error))))
562+
.body(full(format!("Proxy error: {:?}", error)))
565563
.unwrap()),
566564
};
567565
}
@@ -576,25 +574,21 @@ async fn handle_ui_request(
576574
|| resource_path == "/history";
577575

578576
let resolve_path = if is_app_route {
579-
"index.html"
577+
"/index.html"
580578
} else {
581579
resource_path
582580
};
583581

584-
let resolver = hyper_staticfile::Resolver::new("josh/static");
585582
let request = hyper::http::Request::get(resolve_path).body(()).unwrap();
586-
let result = resolver.resolve_request(&request).await?;
587-
let response = hyper::Response::new(Full::new(
588-
hyper_staticfile::ResponseBuilder::new()
589-
.request(&req)
590-
.build(result)?
591-
.into_body()
592-
.collect()
593-
.await?
594-
.to_bytes(),
595-
));
596583

597-
Ok(response)
584+
let resolver = hyper_staticfile::Static::new("/josh/static");
585+
586+
let res = resolver.serve(request).await.map_err(|e| match e {
587+
//TODO: handle errors
588+
_ => JoshError("Error serving static file".to_string())
589+
})?;
590+
591+
return Ok(erase(res));
598592
}
599593

600594
async fn query_meta_repo(
@@ -986,7 +980,7 @@ fn make_repo_update(
986980
async fn handle_serve_namespace_request(
987981
serv: Arc<JoshProxyService>,
988982
req: Request<Incoming>,
989-
) -> josh::JoshResult<Response<Full<Bytes>>> {
983+
) -> josh::JoshResult<JoshResponse> {
990984
let error_response = |status: StatusCode| Ok(make_response("", status));
991985

992986
if req.method() != hyper::Method::POST {
@@ -1201,7 +1195,7 @@ async fn handle_serve_namespace_request(
12011195
async fn call_service(
12021196
serv: Arc<JoshProxyService>,
12031197
req_auth: (josh_proxy::auth::Handle, Request<Incoming>),
1204-
) -> josh::JoshResult<Response<Full<Bytes>>> {
1198+
) -> josh::JoshResult<JoshResponse> {
12051199
let (auth, req) = req_auth;
12061200

12071201
let path = {
@@ -1271,7 +1265,7 @@ async fn call_service(
12711265
return Ok(Response::builder()
12721266
.status(hyper::StatusCode::FOUND)
12731267
.header("Location", redirect_path)
1274-
.body(Full::new(Bytes::new()))?);
1268+
.body(empty())?);
12751269
}
12761270
};
12771271

@@ -1320,7 +1314,7 @@ async fn call_service(
13201314
return Ok(Response::builder()
13211315
.status(hyper::StatusCode::TEMPORARY_REDIRECT)
13221316
.header("Location", format!("{}{}", remote_url, parsed_url.pathinfo))
1323-
.body(Full::new(Bytes::new()))?);
1317+
.body(empty())?);
13241318
}
13251319

13261320
let http_auth_required = ARGS.require_auth && parsed_url.pathinfo == "/git-receive-pack";
@@ -1347,7 +1341,7 @@ async fn call_service(
13471341
"Basic realm=User Visible Realm",
13481342
)
13491343
.status(hyper::StatusCode::UNAUTHORIZED);
1350-
return Ok(builder.body(Full::new(Bytes::new()))?);
1344+
return Ok(builder.body(empty())?);
13511345
}
13521346
}
13531347

@@ -1357,11 +1351,11 @@ async fn call_service(
13571351

13581352
if parsed_url.api == "/~/graphiql" {
13591353
let addr = format!("/~/graphql{}", meta.config.repo);
1360-
return Ok(tokio::task::spawn_blocking(move || {
1354+
return Ok(erase(tokio::task::spawn_blocking(move || {
13611355
josh_proxy::juniper_hyper::graphiql(&addr, None)
13621356
})
13631357
.in_current_span()
1364-
.await??);
1358+
.await??));
13651359
}
13661360

13671361
for fetch_repo in fetch_repos.iter() {
@@ -1385,11 +1379,11 @@ async fn call_service(
13851379
"Basic realm=User Visible Realm",
13861380
)
13871381
.status(hyper::StatusCode::UNAUTHORIZED);
1388-
return Ok(builder.body(Full::new(Bytes::new()))?);
1382+
return Ok(builder.body(empty())?);
13891383
}
13901384
Err(FetchError::Other(e)) => {
13911385
let builder = Response::builder().status(hyper::StatusCode::INTERNAL_SERVER_ERROR);
1392-
return Ok(builder.body(Full::new(Bytes::from(e.0)))?);
1386+
return Ok(builder.body(full(e.0))?);
13931387
}
13941388
}
13951389
}
@@ -1450,7 +1444,7 @@ async fn call_service(
14501444
// it is executed in all cases.
14511445
std::mem::drop(temp_ns);
14521446

1453-
Ok(cgi_response)
1447+
Ok(erase(cgi_response))
14541448
}
14551449

14561450
async fn serve_query(
@@ -1459,7 +1453,7 @@ async fn serve_query(
14591453
upstream_repo: String,
14601454
filter: josh::filter::Filter,
14611455
head_ref: &str,
1462-
) -> josh::JoshResult<Response<Full<Bytes>>> {
1456+
) -> josh::JoshResult<JoshResponse> {
14631457
let tracing_span = tracing::span!(tracing::Level::TRACE, "render worker");
14641458
let head_ref = head_ref.to_string();
14651459
let res = tokio::task::spawn_blocking(move || -> josh::JoshResult<_> {
@@ -1506,15 +1500,15 @@ async fn serve_query(
15061500
.get("content-type")
15071501
.unwrap_or(&"text/plain".to_string()),
15081502
)
1509-
.body(Full::new(Bytes::from(res)))?,
1503+
.body(full(res))?,
15101504

15111505
Ok(None) => Response::builder()
15121506
.status(hyper::StatusCode::NOT_FOUND)
1513-
.body(Full::new(Bytes::from("File not found".to_string())))?,
1507+
.body(full("File not found".to_string()))?,
15141508

15151509
Err(res) => Response::builder()
15161510
.status(hyper::StatusCode::UNPROCESSABLE_ENTITY)
1517-
.body(Full::new(Bytes::from(res.to_string())))?,
1511+
.body(full(res.to_string()))?,
15181512
})
15191513
}
15201514

@@ -1650,7 +1644,7 @@ async fn run_proxy() -> josh::JoshResult<i32> {
16501644
let _s = tracing::span!(
16511645
tracing::Level::TRACE,
16521646
"http_request",
1653-
path = _req.uri().path()
1647+
path = _req.uri().path().to_string()
16541648
);
16551649
let s = _s;
16561650

@@ -1677,9 +1671,8 @@ async fn run_proxy() -> josh::JoshResult<i32> {
16771671
};
16781672
let _e = s.enter();
16791673
trace_http_response_code(s.clone(), r.status());
1680-
r
1674+
Ok::<_, hyper::http::Error>(r)
16811675
}
1682-
.map(Ok::<_, hyper::http::Error>)
16831676
}),
16841677
);
16851678
pin!(conn);
@@ -1842,14 +1835,12 @@ async fn serve_graphql(
18421835
upstream_repo: String,
18431836
upstream: String,
18441837
auth: josh_proxy::auth::Handle,
1845-
) -> josh::JoshResult<Response<Full<Bytes>>> {
1838+
) -> josh::JoshResult<JoshResponse> {
18461839
let remote_url = upstream.clone() + upstream_repo.as_str();
18471840
let parsed = match josh_proxy::juniper_hyper::parse_req(req).await {
18481841
Ok(r) => r,
18491842
Err(resp) => {
1850-
return Ok(hyper::Response::new(Full::new(Bytes::from(
1851-
resp.collect().await?.to_bytes(),
1852-
))));
1843+
return Ok(erase(resp))
18531844
}
18541845
};
18551846

@@ -1909,12 +1900,12 @@ async fn serve_graphql(
19091900
"Basic realm=User Visible Realm",
19101901
)
19111902
.status(hyper::StatusCode::UNAUTHORIZED);
1912-
return Ok(builder.body(Full::new(Bytes::new()))?);
1903+
return Ok(builder.body(empty())?);
19131904
}
19141905
Err(FetchError::Other(e)) => {
19151906
let builder =
19161907
Response::builder().status(hyper::StatusCode::INTERNAL_SERVER_ERROR);
1917-
return Ok(builder.body(Full::new(Bytes::from(e.0)))?);
1908+
return Ok(builder.body(full(e.0))?);
19181909
}
19191910
};
19201911

@@ -1928,8 +1919,8 @@ async fn serve_graphql(
19281919
hyper::StatusCode::BAD_REQUEST
19291920
};
19301921

1931-
let body = Full::new(Bytes::from(serde_json::to_string_pretty(&res).unwrap()));
1932-
let mut resp = Response::new(Full::new(Bytes::new()));
1922+
let body = full(serde_json::to_string_pretty(&res).unwrap());
1923+
let mut resp = Response::new(empty());
19331924
*resp.status_mut() = code;
19341925
resp.headers_mut().insert(
19351926
hyper::header::CONTENT_TYPE,
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
use bytes::Bytes;
2+
use http_body_util::{combinators::BoxBody, BodyExt};
3+
use hyper::Response;
4+
5+
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
6+
pub type JoshBody = BoxBody<Bytes, BoxError>;
7+
pub type JoshResponse = Response<JoshBody>;
8+
9+
pub fn empty() -> JoshBody {
10+
use http_body_util::{Empty, BodyExt};
11+
return Empty::<Bytes>::new()
12+
.map_err(|never| match never {})
13+
.boxed();
14+
}
15+
16+
pub fn full(b: impl Into<Bytes>) -> JoshBody {
17+
use http_body_util::{Full, BodyExt};
18+
return Full::<Bytes>::new(b.into())
19+
.map_err(|never| match never {})
20+
.boxed();
21+
}
22+
23+
pub fn erase<B>(res: hyper::Response<B>) -> JoshResponse
24+
where
25+
B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
26+
B::Error: Into<BoxError>,
27+
{
28+
res.map(|b| b.map_err(Into::into).boxed())
29+
}

josh-proxy/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ pub mod cli;
33
pub mod housekeeping;
44
pub mod juniper_hyper;
55
pub mod trace;
6+
pub mod hyper_integration;
67

78
#[macro_use]
89
extern crate lazy_static;

0 commit comments

Comments
 (0)