Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Remote middleware #575

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

NeoLegends
Copy link

Hey! First of all, thank you for making tower! We've had great success using it.

In one of our applications we are running some parts of the code in single-threaded runtimes on dedicated threads. Those parts of the code periodically call a service that expects to be run on a multi-threaded runtime, however (e.g. by using task::block_in_place), and so we've been seeing panics.

This PR introduces a middleware that allows executing a service remotely on another runtime. It's a thin layer over Buffer that spawns the futures returned from the buffered service onto the service's executor, and then returns the JoinHandle (wrapped in a newtype) back to the user. The user then gets the result of running the service through the JoinHandle, potentially crossing runtime boundaries in the process.

TODO: Tests, more docs

@davidpdrsn
Copy link
Member

davidpdrsn commented Apr 3, 2021

I think this is kinda interesting. Its not a need I've had myself previously.

However, I don't believe its always valid to pass things around to different runtimes 🤔 I know that some tokio IO resources (such as TcpStream) have handles to the runtime (IO drivers) they were created on and can panic if used on another runtime, when their origin runtime has been shutdown.

For example consider this example:

use std::thread;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
use tokio::runtime::Builder;
use tokio::sync::mpsc;

fn main() {
    // channel used to send things between runtimes
    let (tx, mut rx) = mpsc::channel(10);

    // create a runtime in a thread
    let t1 = thread::spawn(move || {
        Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
            .block_on(async {
                // create some IO resource and send it to another runtime
                let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
                let (socket, _) = listener.accept().await.unwrap();
                tx.send(socket).await.unwrap();
            });
    });

    // create another runtime in another thread
    let t2 = thread::spawn(move || {
        Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
            .block_on(async move {
                // receive the socket created on the other runtime
                let mut socket = rx.recv().await.unwrap();

                // read the request
                let mut buf = [0; 1014];
                let n = socket.read(&mut buf).await.unwrap(); // <- this panics because the IO driver is gone
                let request = String::from_utf8_lossy(&buf[..n]);
                println!("{}", request);

                // write a response
                socket.write_all(b"HTTP/1.1 200 OK\r\n").await.unwrap();
                socket.write_all(b"content-length: 0\r\n").await.unwrap();
                socket.write_all(b"\r\n").await.unwrap();
            });
    });

    t1.join().unwrap();
    t2.join().unwrap();
}

If you cargo run this and then do curl localhost:3000 it panics with:

thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "IO driver has terminated" }', src/main.rs:37:53
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Any', src/main.rs:49:15

So one could imagine a Service that produces TcpStreams which when combined with Remote would panic in some circumstances due to the origin runtime getting shutdown.

Is that something you have considered or run into?

@NeoLegends
Copy link
Author

Thank you for the feedback @davidpdrsn!

However, I don't believe its always valid to pass things around to different runtimes 🤔 I know that some tokio IO resources (such as TcpStream) have handles to the runtime (IO drivers) they were created on and can panic if used on another runtime, when their origin runtime has been shutdown.

This is correct, yes. Using resources across runtimes requires special caution, but can work fine as long as the origin runtime is longer-lived than the "consuming" runtime.

Is that something you have considered or run into?

In our case the origin runtime (the one we want to use Remote to run work on) is the main runtime, which outlives the single threaded ones. Those are short-lived and created on-demand, running untrusted code. We don't want to run that code on the main runtime to avoid the risk of blocking it.

Generally the plan for the Remote middleware is to create it on (or with a handle to) the runtime you're executing the service on, and then you're free to move the handle to another one.

@davidpdrsn
Copy link
Member

I'm not convinced this needs to live in tower rather than a separate crate given the gotchas but I could probably be convinced otherwise 😊

Would like to hear what others think. cc @hawkw @LucioFranco @jonhoo

@jonhoo
Copy link
Collaborator

jonhoo commented Aug 15, 2021

I think a good guide here is to see whether others also come asking for this. If so, I think it would make a lot of sense. But I think it's too early to adopt it pro-actively. But, at least now we have a starting point for if that day comes!

@LucioFranco
Copy link
Member

Am I going crazy but can't you just do this with Buffer? Right now the tokio::spawn is hard coded https://docs.rs/tower/0.4.8/src/tower/buffer/service.rs.html#76 but in theory you could use https://docs.rs/tokio/1.10.1/tokio/runtime/struct.Handle.html#method.enter maybe or we can just expose a fn(Fut) that you can then call your own executor via it.

@hawkw
Copy link
Member

hawkw commented Aug 26, 2021

Am I going crazy but can't you just do this with Buffer? Right now the tokio::spawn is hard coded https://docs.rs/tower/0.4.8/src/tower/buffer/service.rs.html#76 but in theory you could use https://docs.rs/tokio/1.10.1/tokio/runtime/struct.Handle.html#method.enter maybe or we can just expose a fn(Fut) that you can then call your own executor via it.

That is essentially what this code does; it uses Buffer::pair and spawns the returned worker task with a Handle:

pub fn with_handle(service: T, bound: usize, handle: &Handle) -> Self {
let (inner, worker) = Buffer::pair(Spawn::new(service, handle.clone()), bound);
handle.spawn(worker);

However, the other thing that this code does is wrapping the Buffered service (inside the Buffer) with a service that spawns the inner service's futures on the current runtime, and returns their JoinHandles. This ensures that the service's response futures run on the same runtime as the Buffer worker itself.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants