Skip to content

Commit bd9f1cb

Browse files
committed
[ampc] ability to send requests without timeouts (should be used sparringly)
1 parent 852a10c commit bd9f1cb

File tree

3 files changed

+21
-2
lines changed

3 files changed

+21
-2
lines changed

crates/core/src/ampc/worker.rs

+19
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ where
9898
Ok(res)
9999
}
100100

101+
fn send_raw_without_timeout(&self, req: &JobReq<Self::Job>) -> Result<JobResp<Self::Job>> {
102+
let mut conn = self.conn()?;
103+
let res = block_on(conn.send_without_timeout(req))?;
104+
Ok(res)
105+
}
106+
101107
fn send<R>(&self, req: R) -> R::Response
102108
where
103109
R: RequestWrapper<<Self::Job as Job>::Worker>,
@@ -107,6 +113,19 @@ where
107113
Resp::User(res) => R::unwrap_response(res).unwrap(),
108114
}
109115
}
116+
117+
fn send_without_timeout<R>(&self, req: R) -> R::Response
118+
where
119+
R: RequestWrapper<<Self::Job as Job>::Worker>,
120+
{
121+
match self
122+
.send_raw_without_timeout(&Req::User(R::wrap(req)))
123+
.unwrap()
124+
{
125+
Resp::Coordinator(_) => panic!("unexpected coordinator response"),
126+
Resp::User(res) => R::unwrap_response(res).unwrap(),
127+
}
128+
}
110129
}
111130

112131
pub trait RequestWrapper<W: Worker>: Message<W> {

crates/core/src/distributed/sonic/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ where
119119
}
120120
}
121121

122-
async fn send_without_timeout(&mut self, request: &Req) -> Result<Res> {
122+
pub async fn send_without_timeout(&mut self, request: &Req) -> Result<Res> {
123123
self.awaiting_res = true;
124124
let bytes = bincode::encode_to_vec(request, common::bincode_config()).unwrap();
125125

crates/core/src/entrypoint/ampc/shortest_path/worker.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ impl RemoteShortestPathWorker {
175175
}
176176

177177
pub fn sample_nodes(&self, num_nodes: u64) -> Vec<webgraph::NodeID> {
178-
self.send(SampleNodes(num_nodes))
178+
self.send_without_timeout(SampleNodes(num_nodes))
179179
}
180180
}
181181

0 commit comments

Comments
 (0)