Skip to content

Commit

Permalink
Merge main into net
Browse files Browse the repository at this point in the history
  • Loading branch information
kokoISnoTarget committed Sep 13, 2024
2 parents 5f6bd08 + ac37659 commit f589134
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 0 deletions.
11 changes: 11 additions & 0 deletions packages/net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,14 @@ tokio = { workspace = true }
reqwest = { version = "0.12.7" }
data-url = "0.3.1"
thiserror = "1.0.63"
[package]
name = "blitz-net"
version = "0.1.0"
edition = "2021"

[dependencies]
blitz-traits = { path = "../traits" }
tokio = { workspace = true }
reqwest = { version = "0.12.7" }
data-url = "0.3.1"
thiserror = "1.0.63"
94 changes: 94 additions & 0 deletions packages/net/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,97 @@ impl<T: Send + Sync + 'static> Callback for MpscCallback<T> {
let _ = self.0.send(data);
}
}
use blitz_traits::net::{BoxedHandler, Callback, NetProvider, SharedCallback, Url};
use data_url::DataUrl;
use reqwest::Client;
use std::sync::Arc;
use thiserror::Error;
use tokio::runtime::Handle;

const USER_AGENT: &str = "Mozilla/5.0 (X11; Linux x86_64; rv:60.0) Gecko/20100101 Firefox/81.0";

pub struct Provider<D> {
rt: Handle,
client: Client,
callback: SharedCallback<D>,
}
impl<D> Provider<D> {
pub fn new(rt_handle: Handle, callback: SharedCallback<D>) -> Self {
Self {
rt: rt_handle,
client: Client::new(),
callback,
}
}
}
impl<D: 'static> Provider<D> {
pub fn is_empty(&self) -> bool {
Arc::strong_count(&self.callback) == 1
}
async fn fetch_inner(
client: Client,
url: Url,
callback: SharedCallback<D>,
handler: BoxedHandler<D>,
) -> Result<(), ProviderError> {
match url.scheme() {
"data" => {
let data_url = DataUrl::process(url.as_str())?;
let decoded = data_url.decode_to_vec()?;
handler.bytes(&decoded.0, callback);
}
"file" => {
let file_content = std::fs::read(url.path())?;
handler.bytes(&file_content, callback);
}
_ => {
let response = client
.request(handler.method(), url)
.header("User-Agent", USER_AGENT)
.send()
.await?;
handler.bytes(&response.bytes().await?, callback);
}
}
Ok(())
}
}

impl<D: 'static> NetProvider for Provider<D> {
type Data = D;
fn fetch(&self, url: Url, handler: BoxedHandler<Self::Data>) {
let client = self.client.clone();
let callback = Arc::clone(&self.callback);
drop(
self.rt
.spawn(Self::fetch_inner(client, url, callback, handler)),
);
}
}

#[derive(Error, Debug)]
enum ProviderError {
#[error("{0}")]
Io(#[from] std::io::Error),
#[error("{0}")]
DataUrl(#[from] data_url::DataUrlError),
#[error("{0}")]
DataUrlBas64(#[from] data_url::forgiving_base64::InvalidBase64),
#[error("{0}")]
ReqwestError(#[from] reqwest::Error),
}

use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
pub struct MpscCallback<T>(UnboundedSender<T>);
impl<T> MpscCallback<T> {
pub fn new() -> (UnboundedReceiver<T>, Self) {
let (send, recv) = unbounded_channel();
(recv, Self(send))
}
}
impl<T: Send + Sync + 'static> Callback for MpscCallback<T> {
type Data = T;
fn call(self: Arc<Self>, data: Self::Data) {
let _ = self.0.send(data);
}
}
7 changes: 7 additions & 0 deletions packages/traits/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@ name = "blitz-traits"
version = "0.1.0"
edition = "2021"

[dependencies]
http = "1.1.0"
url = "2.5.2"[package]
name = "blitz-traits"
version = "0.1.0"
edition = "2021"

[dependencies]
http = "1.1.0"
url = "2.5.2"
1 change: 1 addition & 0 deletions packages/traits/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod net;
pub mod net;
42 changes: 42 additions & 0 deletions packages/traits/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,45 @@ impl<T: Sync + Send + 'static> Callback for DummyCallback<T> {
type Data = T;
fn call(self: Arc<Self>, _data: Self::Data) {}
}
pub use http::Method;
use std::marker::PhantomData;
use std::sync::Arc;
pub use url::Url;

pub type BoxedHandler<D> = Box<dyn RequestHandler<Data = D>>;
pub type SharedCallback<D> = Arc<dyn Callback<Data = D>>;
pub type SharedProvider<D> = Arc<dyn NetProvider<Data = D>>;

pub trait NetProvider {
type Data;
fn fetch(&self, url: Url, handler: BoxedHandler<Self::Data>);
}

pub trait RequestHandler: Send + Sync + 'static {
type Data;
fn bytes(self: Box<Self>, bytes: &[u8], callback: SharedCallback<Self::Data>);
fn method(&self) -> Method {
Method::GET
}
}

pub trait Callback: Send + Sync + 'static {
type Data;
fn call(self: Arc<Self>, data: Self::Data);
}

pub struct DummyProvider<D>(PhantomData<D>);
impl<D> Default for DummyProvider<D> {
fn default() -> Self {
Self(PhantomData)
}
}
impl<D> NetProvider for DummyProvider<D> {
type Data = D;
fn fetch(&self, _url: Url, _handler: BoxedHandler<Self::Data>) {}
}
pub struct DummyCallback<T>(PhantomData<T>);
impl<T: Sync + Send + 'static> Callback for DummyCallback<T> {
type Data = T;
fn call(self: Arc<Self>, _data: Self::Data) {}
}

0 comments on commit f589134

Please sign in to comment.