Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Patched #151

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
/download
Cargo.lock
.idea/
tmp/
1 change: 1 addition & 0 deletions rust-toolchain
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1.60.0
4 changes: 4 additions & 0 deletions rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
tab_spaces = 2
max_width = 140
use_small_heuristics = "Off"
chain_width = 30
21 changes: 21 additions & 0 deletions shell.nix
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{ pkgs ? import (fetchTarball "https://github.com/NixOS/nixpkgs/archive/fd3e33d696b81e76b30160dfad2efb7ac1f19879.tar.gz") {}
}:
pkgs.mkShell {
buildInputs = [
pkgs.stdenv.cc.cc.lib
pkgs.which
pkgs.rustup
pkgs.libiconv
pkgs.git
pkgs.openssh
pkgs.openssl.dev
pkgs.pkg-config
pkgs.cacert
pkgs.zlib
] ++ pkgs.lib.optionals pkgs.stdenv.isDarwin [ pkgs.darwin.apple_sdk.frameworks.SystemConfiguration ] ;
LIBCLANG_PATH = "${pkgs.llvmPackages.libclang.lib}/lib";
LD_LIBRARY_PATH="${pkgs.stdenv.cc.cc.lib}/lib:${pkgs.zlib}/lib";
RUSTC_VERSION = pkgs.lib.readFile ./rust-toolchain;
RUST_BACKTRACE=1;
CARGO_HOME="";
}
30 changes: 24 additions & 6 deletions src/browser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ use std::{
path::{Path, PathBuf},
};

use async_tungstenite::tungstenite::protocol::WebSocketConfig;
use futures::channel::mpsc::{channel, unbounded, Sender};
use futures::channel::oneshot::channel as oneshot_channel;
use futures::select;
use futures::SinkExt;

use chromiumoxide_cdp::cdp::browser_protocol::target::{
CreateBrowserContextParams, CreateTargetParams, DisposeBrowserContextParams, TargetId,
CreateBrowserContextParams, CreateTargetParams, DisposeBrowserContextParams, TargetId, SessionId,
};
use chromiumoxide_cdp::cdp::{CdpEventMessage, IntoEventKind};
use chromiumoxide_types::*;
Expand Down Expand Up @@ -53,9 +54,9 @@ pub struct Browser {

impl Browser {
/// Connect to an already running chromium instance via websocket
pub async fn connect(debug_ws_url: impl Into<String>) -> Result<(Self, Handler)> {
pub async fn connect(debug_ws_url: impl Into<String>, websocket_config: Option<WebSocketConfig>) -> Result<(Self, Handler)> {
let debug_ws_url = debug_ws_url.into();
let conn = Connection::<CdpEventMessage>::connect(&debug_ws_url).await?;
let conn = Connection::<CdpEventMessage>::connect(&debug_ws_url, websocket_config).await?;

let (tx, rx) = channel(1);

Expand All @@ -80,7 +81,11 @@ impl Browser {
/// This fails if no web socket url could be detected from the child
/// processes stderr for more than the configured `launch_timeout`
/// (20 seconds by default).
pub async fn launch(mut config: BrowserConfig) -> Result<(Self, Handler)> {
pub async fn launch(config: BrowserConfig) -> Result<(Self, Handler)> {
Self::launch_with_ws_config(config, None).await
}

pub async fn launch_with_ws_config(mut config: BrowserConfig, websocket_config: Option<WebSocketConfig>) -> Result<(Self, Handler)> {
// Canonalize paths to reduce issues with sandboxing
config.executable = utils::canonicalize(&config.executable).await?;

Expand All @@ -94,6 +99,7 @@ impl Browser {
async fn with_child(
config: &BrowserConfig,
child: &mut Child,
websocket_config: Option<WebSocketConfig>,
) -> Result<(String, Connection<CdpEventMessage>)> {
let dur = config.launch_timeout;
cfg_if::cfg_if! {
Expand All @@ -107,11 +113,11 @@ impl Browser {
};
// extract the ws:
let debug_ws_url = ws_url_from_output(child, timeout_fut).await?;
let conn = Connection::<CdpEventMessage>::connect(&debug_ws_url).await?;
let conn = Connection::<CdpEventMessage>::connect(&debug_ws_url, websocket_config).await?;
Ok((debug_ws_url, conn))
}

let (debug_ws_url, conn) = match with_child(&config, &mut child).await {
let (debug_ws_url, conn) = match with_child(&config, &mut child, websocket_config).await {
Ok(conn) => conn,
Err(e) => {
// An initialization error occurred, clean up the process
Expand Down Expand Up @@ -337,6 +343,18 @@ impl Browser {
to_command_response::<T>(resp, method)
}

pub async fn execute_with_session<T: Command>(&self, cmd: T, session_id: Option<SessionId>) -> Result<CommandResponse<T::Response>> {
let (tx, rx) = oneshot_channel();
let method = cmd.identifier();
let msg = CommandMessage::with_session(cmd, tx, session_id)?;
self.sender
.clone()
.send(HandlerMessage::Command(msg))
.await?;
let resp = rx.await??;
to_command_response::<T>(resp, method)
}

/// Return all of the pages of the browser
pub async fn pages(&self) -> Result<Vec<Page>> {
let (tx, rx) = oneshot_channel();
Expand Down
172 changes: 86 additions & 86 deletions src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,22 @@ use crate::error::{CdpError, DeadlineExceeded, Result};
use crate::handler::REQUEST_TIMEOUT;

/// Deserialize a response
pub(crate) fn to_command_response<T: Command>(
resp: Response,
method: MethodId,
) -> Result<CommandResponse<T::Response>> {
if let Some(res) = resp.result {
let result = serde_json::from_value(res)?;
Ok(CommandResponse {
id: resp.id,
result,
method,
})
} else if let Some(err) = resp.error {
Err(err.into())
} else {
Err(CdpError::NoResponse)
}
pub fn to_command_response<T: Command>(resp: Response, method: MethodId) -> Result<CommandResponse<T::Response>> {
if let Some(res) = resp.result {
let result = serde_json::from_value(res.clone()).map_err(|e| {
let msg = format!("error converting response [{}] [{}] [{}]", method, res, e.to_string());
CdpError::ChromeMessage(msg)
})?;
Ok(CommandResponse {
id: resp.id,
result,
method,
})
} else if let Some(err) = resp.error {
Err(err.into())
} else {
Err(CdpError::NoResponse)
}
}

/// Messages used internally to communicate with the connection, which is
Expand All @@ -45,82 +45,82 @@ pub struct CommandMessage<T = Result<Response>> {
}

impl<T> CommandMessage<T> {
pub fn new<C: Command>(cmd: C, sender: OneshotSender<T>) -> serde_json::Result<Self> {
Ok(Self {
method: cmd.identifier(),
session_id: None,
params: serde_json::to_value(cmd)?,
sender,
})
}

/// Whether this command is a navigation
pub fn is_navigation(&self) -> bool {
self.method.as_ref() == NavigateParams::IDENTIFIER
}
pub fn new<C: Command>(cmd: C, sender: OneshotSender<T>) -> serde_json::Result<Self> {
Ok(Self {
method: cmd.identifier(),
session_id: None,
params: serde_json::to_value(cmd)?,
sender,
})
}

/// Whether this command is a navigation
pub fn is_navigation(&self) -> bool {
self.method.as_ref() == NavigateParams::IDENTIFIER
}

pub fn with_session<C: Command>(
cmd: C,
sender: OneshotSender<T>,
session_id: Option<SessionId>,
) -> serde_json::Result<Self> {
Ok(Self {
method: cmd.identifier(),
session_id,
params: serde_json::to_value(cmd)?,
sender,
})
}

pub fn split(self) -> (Request, OneshotSender<T>) {
(
Request {
method: self.method,
Ok(Self {
method: cmd.identifier(),
session_id,
params: serde_json::to_value(cmd)?,
sender,
})
}

pub fn split(self) -> (Request, OneshotSender<T>) {
(
Request {
method: self.method,
session_id: self.session_id.map(Into::into),
params: self.params,
},
self.sender,
)
}
params: self.params,
},
self.sender,
)
}
}

impl Method for CommandMessage {
fn identifier(&self) -> MethodId {
self.method.clone()
}
fn identifier(&self) -> MethodId {
self.method.clone()
}
}

#[derive(Debug)]
pub struct CommandChain {
/// The commands to process: (method identifier, params)
cmds: VecDeque<(MethodId, serde_json::Value)>,
/// The last issued command we currently waiting for its completion
waiting: Option<(MethodId, Instant)>,
/// The window a response after issuing a request must arrive
timeout: Duration,
/// The commands to process: (method identifier, params)
cmds: VecDeque<(MethodId, serde_json::Value)>,
/// The last issued command we currently waiting for its completion
waiting: Option<(MethodId, Instant)>,
/// The window a response after issuing a request must arrive
timeout: Duration,
}

pub type NextCommand = Poll<Option<Result<(MethodId, serde_json::Value), DeadlineExceeded>>>;

impl CommandChain {
/// Creates a new `CommandChain` from an `Iterator`.
///
/// The order of the commands corresponds to the iterator's
pub fn new<I>(cmds: I, timeout: Duration) -> Self
where
I: IntoIterator<Item = (MethodId, serde_json::Value)>,
{
Self {
cmds: VecDeque::from_iter(cmds),
waiting: None,
timeout,
}
/// Creates a new `CommandChain` from an `Iterator`.
///
/// The order of the commands corresponds to the iterator's
pub fn new<I>(cmds: I, timeout: Duration) -> Self
where
I: IntoIterator<Item = (MethodId, serde_json::Value)>,
{
Self {
cmds: VecDeque::from_iter(cmds),
waiting: None,
timeout,
}
}

/// queue in another request
pub fn push_back(&mut self, method: MethodId, params: serde_json::Value) {
/// queue in another request
pub fn push_back(&mut self, method: MethodId, params: serde_json::Value) {
self.cmds.push_back((method, params))
}
}

/// Removes the waiting state if the identifier matches that of the last
/// issued command
Expand All @@ -143,25 +143,25 @@ impl CommandChain {
cmd,
now - *deadline
);
Poll::Ready(Some(Err(DeadlineExceeded::new(now, *deadline))))
} else {
Poll::Pending
}
} else if let Some((method, val)) = self.cmds.pop_front() {
self.waiting = Some((method.clone(), now + self.timeout));
Poll::Ready(Some(Ok((method, val))))
} else {
Poll::Ready(None)
}
Poll::Ready(Some(Err(DeadlineExceeded::new(now, *deadline))))
} else {
Poll::Pending
}
} else if let Some((method, val)) = self.cmds.pop_front() {
self.waiting = Some((method.clone(), now + self.timeout));
Poll::Ready(Some(Ok((method, val))))
} else {
Poll::Ready(None)
}
}
}

impl Default for CommandChain {
fn default() -> Self {
Self {
cmds: Default::default(),
waiting: None,
timeout: Duration::from_millis(REQUEST_TIMEOUT),
}
fn default() -> Self {
Self {
cmds: Default::default(),
waiting: None,
timeout: Duration::from_millis(REQUEST_TIMEOUT),
}
}
}
12 changes: 8 additions & 4 deletions src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use std::collections::VecDeque;
use std::marker::PhantomData;
use std::pin::Pin;

use async_tungstenite::{tungstenite::protocol::WebSocketConfig, WebSocketStream};
use async_tungstenite::WebSocketStream;
use async_tungstenite::tungstenite::protocol::WebSocketConfig;
use futures::stream::Stream;
use futures::task::{Context, Poll};
use futures::Sink;
Expand Down Expand Up @@ -37,13 +38,14 @@ pub struct Connection<T: EventMessage> {
}

impl<T: EventMessage + Unpin> Connection<T> {
pub async fn connect(debug_ws_url: impl AsRef<str>) -> Result<Self> {
pub async fn connect(debug_ws_url: impl AsRef<str>, maybe_config: Option<WebSocketConfig>) -> Result<Self> {
let config = WebSocketConfig {
max_message_size: None,
max_frame_size: None,
max_send_queue: None,
..Default::default()
};
let config = maybe_config.unwrap_or(config);

cfg_if::cfg_if! {
if #[cfg(feature = "async-std-runtime")] {
Expand Down Expand Up @@ -136,13 +138,15 @@ impl<T: EventMessage + Unpin> Stream for Connection<T> {
// read from the ws
match Stream::poll_next(Pin::new(&mut pin.ws), cx) {
Poll::Ready(Some(Ok(msg))) => {
return match serde_json::from_slice::<Message<T>>(&msg.into_data()) {
let data = &msg.into_data();
return match serde_json::from_slice::<Message<T>>(data) {
Ok(msg) => {
tracing::trace!("Received {:?}", msg);
Poll::Ready(Some(Ok(msg)))
}
Err(err) => {
tracing::error!("Failed to deserialize WS response {}", err);
let data = std::str::from_utf8(data).unwrap_or_default();
tracing::error!("Failed to deserialize WS response {} [{}]", err, data);
Poll::Ready(Some(Err(err.into())))
}
};
Expand Down
Loading