Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ members = [
"examples",
"codegen",
"interop", # Tests
"tests/current_thread",
"tests/disable_comments",
"tests/included_service",
"tests/same_name",
Expand Down
21 changes: 9 additions & 12 deletions examples/src/dynamic/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::env;
use tonic::{transport::server::RoutesBuilder, transport::Server, Request, Response, Status};
use tonic::{transport::server::Routes, transport::Server, Request, Response, Status};

use hello_world::greeter_server::{Greeter, GreeterServer};
use hello_world::{HelloReply, HelloRequest};
Expand Down Expand Up @@ -31,15 +31,15 @@ impl Echo for MyEcho {
}
}

fn init_echo(args: &[String], builder: &mut RoutesBuilder) {
fn init_echo(args: &[String], routes: &mut Routes) {
let enabled = args
.into_iter()
.find(|arg| arg.as_str() == "echo")
.is_some();
if enabled {
println!("Adding Echo service...");
let svc = EchoServer::new(MyEcho::default());
builder.add_service(svc);
routes.add_service(svc);
}
}

Expand All @@ -61,7 +61,7 @@ impl Greeter for MyGreeter {
}
}

fn init_greeter(args: &[String], builder: &mut RoutesBuilder) {
fn init_greeter(args: &[String], routes: &mut Routes) {
let enabled = args
.into_iter()
.find(|arg| arg.as_str() == "greeter")
Expand All @@ -70,25 +70,22 @@ fn init_greeter(args: &[String], builder: &mut RoutesBuilder) {
if enabled {
println!("Adding Greeter service...");
let svc = GreeterServer::new(MyGreeter::default());
builder.add_service(svc);
routes.add_service(svc);
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Vec<String> = env::args().collect();
let mut routes_builder = RoutesBuilder::default();
init_greeter(&args, &mut routes_builder);
init_echo(&args, &mut routes_builder);
let mut routes = Routes::default();
init_greeter(&args, &mut routes);
init_echo(&args, &mut routes);

let addr = "[::1]:50051".parse().unwrap();

println!("Grpc server listening on {}", addr);

Server::builder()
.add_routes(routes_builder.routes())
.serve(addr)
.await?;
Server::builder().add_routes(routes).serve(addr).await?;

Ok(())
}
2 changes: 1 addition & 1 deletion examples/src/h2c/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ mod h2c {

impl<S> Service<Request<Body>> for H2c<S>
where
S: Service<Request<Body>, Response = Response<tonic::transport::AxumBoxBody>>
S: Service<Request<Body>, Response = Response<tonic::body::BoxBody>>
+ Clone
+ Send
+ 'static,
Expand Down
22 changes: 22 additions & 0 deletions tests/current_thread/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
authors = ["Inkyu Lee <[email protected]>"]
edition = "2021"
license = "MIT"
name = "current_thread"
publish = false
version = "0.1.0"

[dependencies]
tokio-stream = "0.1"
prost = "0.12"
tonic = {path = "../../tonic", features = ["gzip"]}
tokio = {version = "1.0", features = ["macros", "rt-multi-thread", "net"]}

[build-dependencies]
tonic-build = {path = "../../tonic-build" }

[package.metadata.cargo-machete]
ignored = ["prost"]

[dev-dependencies]
tokio-stream = {version = "0.1.5", features = ["net"]}
7 changes: 7 additions & 0 deletions tests/current_thread/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
fn main() {
tonic_build::configure()
.local_executor(true)
.build_server(true)
.compile(&["proto/test.proto", "proto/stream.proto"], &["proto"])
.unwrap();
}
10 changes: 10 additions & 0 deletions tests/current_thread/proto/stream.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
syntax = "proto3";

package stream;

service TestStream {
rpc StreamCall(InputStream) returns (stream OutputStream);
}

message InputStream {}
message OutputStream {}
23 changes: 23 additions & 0 deletions tests/current_thread/proto/test.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
syntax = "proto3";

package test;

service Test {
rpc UnaryCall(Input) returns (Output);
}

message Input {}
message Output {}

service Test1 {
rpc UnaryCall(Input1) returns (Output1);

rpc StreamCall(Input1) returns (stream Output1);
}

message Input1 {
bytes buf = 1;
}
message Output1 {
bytes buf = 1;
}
4 changes: 4 additions & 0 deletions tests/current_thread/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub mod pb {
tonic::include_proto!("stream");
tonic::include_proto!("test");
}
61 changes: 61 additions & 0 deletions tests/current_thread/tests/interceptor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use current_thread::pb::{test_client::TestClient, test_server, Input, Output};
use std::time::Duration;
use tokio::sync::oneshot;
use tonic::{
transport::{Endpoint, Server},
GrpcMethod, Request, Response, Status,
};

#[tokio::test]
async fn interceptor_retrieves_grpc_method() {
use test_server::Test;

struct Svc;

#[tonic::async_trait(?Send)]
impl Test for Svc {
async fn unary_call(&self, _: Request<Input>) -> Result<Response<Output>, Status> {
Ok(Response::new(Output {}))
}
}

let svc = test_server::TestServer::new(Svc);

let (tx, rx) = oneshot::channel();

let local = tokio::task::LocalSet::new();
local
.run_until(async move {
// Start the server now, second call should succeed
let jh = tokio::task::spawn_local(async move {
Server::builder()
.local_executor()
.add_service(svc)
.serve_with_shutdown("127.0.0.1:1340".parse().unwrap(), async {
drop(rx.await)
})
.await
.unwrap();
});

let channel = Endpoint::from_static("http://127.0.0.1:1340").connect_lazy();

fn client_intercept(req: Request<()>) -> Result<Request<()>, Status> {
println!("Intercepting client request: {:?}", req);

let gm = req.extensions().get::<GrpcMethod>().unwrap();
assert_eq!(gm.service(), "test.Test");
assert_eq!(gm.method(), "UnaryCall");

Ok(req)
}
let mut client = TestClient::with_interceptor(channel, client_intercept);

tokio::time::sleep(Duration::from_millis(100)).await;
client.unary_call(Request::new(Input {})).await.unwrap();

tx.send(()).unwrap();
jh.await.unwrap();
})
.await
}
112 changes: 112 additions & 0 deletions tests/current_thread/tests/routes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use std::time::Duration;

use tokio::sync::oneshot;
use tokio_stream::StreamExt;

use current_thread::pb::{
test1_client, test1_server, test_client, test_server, Input, Input1, Output, Output1,
};
use tonic::codegen::BoxStream;
use tonic::transport::server::LocalRoutes;
use tonic::{
transport::{Endpoint, Server},
Request, Response, Status,
};

#[tokio::test]
async fn multiple_service_using_routes() {
struct Svc1;

#[tonic::async_trait(?Send)]
impl test_server::Test for Svc1 {
async fn unary_call(&self, _req: Request<Input>) -> Result<Response<Output>, Status> {
Ok(Response::new(Output {}))
}
}

struct Svc2;

#[tonic::async_trait(?Send)]
impl test1_server::Test1 for Svc2 {
async fn unary_call(&self, request: Request<Input1>) -> Result<Response<Output1>, Status> {
Ok(Response::new(Output1 {
buf: request.into_inner().buf,
}))
}

type StreamCallStream = BoxStream<Output1>;

async fn stream_call(
&self,
request: Request<Input1>,
) -> Result<Response<Self::StreamCallStream>, Status> {
let output = Output1 {
buf: request.into_inner().buf,
};
let stream = tokio_stream::iter(vec![Ok(output)]);

Ok(Response::new(Box::pin(stream)))
}
}

let svc1 = test_server::TestServer::new(Svc1);
let svc2 = test1_server::Test1Server::new(Svc2);

let (tx, rx) = oneshot::channel::<()>();
let mut routes = LocalRoutes::default();
routes.add_service(svc1).add_service(svc2);

tokio::task::LocalSet::new()
.run_until(async move {
let jh = tokio::task::spawn_local(async move {
Server::builder()
.local_executor()
.add_routes(routes)
.serve_with_shutdown("127.0.0.1:1400".parse().unwrap(), async {
drop(rx.await)
})
.await
.unwrap();
});

tokio::time::sleep(Duration::from_millis(100)).await;

let channel = Endpoint::from_static("http://127.0.0.1:1400")
.connect()
.await
.unwrap();

let mut client1 = test_client::TestClient::new(channel.clone());
let mut client2 = test1_client::Test1Client::new(channel);

client1.unary_call(Input {}).await.unwrap();

let resp2 = client2
.unary_call(Input1 {
buf: b"hello".to_vec(),
})
.await
.unwrap()
.into_inner();
assert_eq!(&resp2.buf, b"hello");
let mut stream_response = client2
.stream_call(Input1 {
buf: b"world".to_vec(),
})
.await
.unwrap()
.into_inner();
let first = match stream_response.next().await {
Some(Ok(first)) => first,
_ => panic!("expected one non-error item in the stream call response"),
};

assert_eq!(&first.buf, b"world");
assert!(stream_response.next().await.is_none());

tx.send(()).unwrap();

jh.await.unwrap();
})
.await
}
65 changes: 65 additions & 0 deletions tests/current_thread/tests/streams.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use current_thread::pb::{test_stream_server, InputStream, OutputStream};
use tokio::sync::oneshot;
use tonic::{transport::Server, Request, Response, Status};

type Stream<T> = std::pin::Pin<
Box<dyn tokio_stream::Stream<Item = std::result::Result<T, Status>> + Send + 'static>,
>;

#[tokio::test]
async fn status_from_server_stream_with_source() {
struct Svc;

#[tonic::async_trait(?Send)]
impl test_stream_server::TestStream for Svc {
type StreamCallStream = Stream<OutputStream>;

async fn stream_call(
&self,
_: Request<InputStream>,
) -> Result<Response<Self::StreamCallStream>, Status> {
let s = Unsync(std::ptr::null_mut::<()>());

Ok(Response::new(Box::pin(s) as Self::StreamCallStream))
}
}

let svc = test_stream_server::TestStreamServer::new(Svc);

let (tx, rx) = oneshot::channel::<()>();

let local = tokio::task::LocalSet::new();
local
.run_until(async move {
let jh = tokio::task::spawn_local(async move {
Server::builder()
.local_executor()
.add_service(svc)
.serve_with_shutdown("127.0.0.1:1339".parse().unwrap(), async {
drop(rx.await)
})
.await
.unwrap();
});

tx.send(()).unwrap();

jh.await.unwrap();
})
.await
}

struct Unsync(*mut ());

unsafe impl Send for Unsync {}

impl tokio_stream::Stream for Unsync {
type Item = Result<OutputStream, Status>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
unimplemented!()
}
}
Loading