Skip to content

Commit bd6a36f

Browse files
authored
add connect subcommand (#57)
* first commit * modify time depend version * separate connect handler from origin * read config from yaml
1 parent ff9a214 commit bd6a36f

File tree

7 files changed

+804
-12
lines changed

7 files changed

+804
-12
lines changed

bin/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,5 @@ webrtc = "0.9.0"
2424
serde_yaml = "0.9.30"
2525
notify = { version = "6.1.1", default-features = false, features = ["macos_kqueue"] }
2626
futures = "0.3.30"
27+
time = "0.3.35"
28+
reqwest = { version = "0.11", features = ["json"] }

bin/src/cs.rs

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,19 @@
2020
#![allow(unused)]
2121

2222
use clap::Args;
23+
use log::info;
2324
use serde::{Deserialize, Serialize};
24-
use std::ffi::{c_char, c_void, CString};
25+
use std::{ffi::{c_char, c_void, CString}, fmt::Debug, process::ExitCode};
2526
include!("cs_bindings.rs");
2627

28+
use std::fs;
29+
use std::path::Path;
30+
use std::process;
31+
use std::sync::Arc;
32+
use tokio::sync::Mutex;
33+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
34+
use crate::peer::*;
35+
2736
#[derive(Args, Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
2837
pub struct ServerArgs {
2938
/// Config file path
@@ -38,6 +47,13 @@ pub struct ClientArgs {
3847
pub config: Option<String>,
3948
}
4049

50+
#[derive(Args, Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
51+
pub struct ConnectArgs {
52+
/// Config file path
53+
#[arg(short, long)]
54+
pub config: Option<String>,
55+
}
56+
4157
fn convert_to_go_slices(vec: &Vec<String>) -> (GoSlice, Vec<GoString>) {
4258
let mut go_slices: Vec<GoString> = Vec::with_capacity(vec.len());
4359

@@ -57,6 +73,85 @@ fn convert_to_go_slices(vec: &Vec<String>) -> (GoSlice, Vec<GoString>) {
5773
go_slices,
5874
)
5975
}
76+
77+
fn load_config(config_path: &str) -> Result<ConnectConfig, Box<dyn std::error::Error>> {
78+
// 验证文件是否存在
79+
if !Path::new(config_path).exists() {
80+
return Err(format!("Config file '{}' does not exist", config_path).into());
81+
}
82+
83+
// 读取文件内容
84+
let config_content = fs::read_to_string(config_path)
85+
.map_err(|e| format!("Failed to read config file '{}': {}", config_path, e))?;
86+
87+
// 验证文件不为空
88+
if config_content.trim().is_empty() {
89+
return Err("Config file is empty".into());
90+
}
91+
92+
// 解析 YAML
93+
let config: ConnectConfig = serde_yaml::from_str(&config_content)
94+
.map_err(|e| format!("Failed to parse YAML config: {}", e))?;
95+
96+
match serde_yaml::from_str::<ConnectConfig>(&config_content) {
97+
Ok(config) => println!("解析成功: {:?}", config),
98+
Err(e) => println!("解析错误: {}", e),
99+
}
100+
101+
// 验证必要的字段
102+
validate_config(&config)?;
103+
104+
Ok(config)
105+
}
106+
107+
fn validate_config(config: &ConnectConfig) -> Result<(), Box<dyn std::error::Error>> {
108+
// 配置验证
109+
if config.options.tcp_forward_addr.trim().is_empty() {
110+
return Err("tcp_forward_addr cannot be empty".into());
111+
}
112+
if config.options.tcp_forward_host_prefix.trim().is_empty() {
113+
return Err("tcp_forward_host_prefix cannot be empty".into());
114+
}
115+
Ok(())
116+
}
117+
118+
119+
pub fn run_connect(connect_args: ConnectArgs) {
120+
let mut args = if let Some(config_path) = &connect_args.config {
121+
match load_config(config_path) {
122+
Ok(config) => {
123+
println!("Successfully loaded config from '{}'", config_path);
124+
println!("Config details:");
125+
println!(" TCP Forward Address: {}", config.options.tcp_forward_addr);
126+
println!(" TCP Forward Host Prefix: {}", config.options.tcp_forward_host_prefix);
127+
config
128+
},
129+
Err(e) => {
130+
eprintln!("Error loading config: {}", e);
131+
process::exit(1);
132+
}
133+
}
134+
} else {
135+
println!("No config file specified, using default configuration");
136+
ConnectConfig::default()
137+
};
138+
info!("Run connect cmd.");
139+
let rt = tokio::runtime::Runtime::new().unwrap();
140+
rt.block_on(async move {
141+
info!("Runtime started.");
142+
let connect_reader = tokio::io::stdin();
143+
let connect_writer = tokio::io::stdout();
144+
// let reader = Arc::new(Mutex::new(connect_reader));
145+
// let writer = Arc::new(Mutex::new(connect_writer));
146+
if let Err(e) = process_connect(connect_reader, connect_writer, args).await {
147+
eprintln!("process p2p connect: {}", e);
148+
process::exit(1);
149+
};
150+
});
151+
unsafe {}
152+
// TODO
153+
}
154+
60155
pub fn run_client(client_args: ClientArgs) {
61156
let mut args = if let Some(config) = client_args.config {
62157
vec!["client".to_owned(), "-config".to_owned(), config]

bin/src/main.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::path::PathBuf;
1818

1919
use clap::Parser;
2020
use clap::Subcommand;
21+
use cs::ConnectArgs;
2122
use env_logger::Env;
2223
use log::{error, info};
2324

@@ -50,13 +51,17 @@ enum Commands {
5051
Server(ServerArgs),
5152
/// Run GT Client
5253
Client(ClientArgs),
54+
/// Run GT Connect
55+
Connect(ConnectArgs),
5356

5457
#[command(hide = true)]
5558
SubP2P,
5659
#[command(hide = true)]
5760
SubServer(ServerArgs),
5861
#[command(hide = true)]
5962
SubClient(ClientArgs),
63+
#[command(hide = true)]
64+
SubConnect(ConnectArgs),
6065
}
6166

6267
fn main() {
@@ -75,6 +80,7 @@ fn main() {
7580
depth: cli.depth,
7681
server_args: None,
7782
client_args: None,
83+
connect_args: None,
7884
};
7985
if let Some(command) = cli.command {
8086
match command {
@@ -84,6 +90,9 @@ fn main() {
8490
Commands::Client(args) => {
8591
manager_args.client_args = Some(args);
8692
}
93+
Commands::Connect(args) => {
94+
manager_args.connect_args = Some(args);
95+
}
8796
Commands::SubP2P => {
8897
info!("GT SubP2P");
8998
peer::start_peer_connection();
@@ -102,6 +111,12 @@ fn main() {
102111
info!("GT SubClient done");
103112
return;
104113
}
114+
Commands::SubConnect(args) => {
115+
info!("GT SubConnect");
116+
cs::run_connect(args);
117+
info!("GT SubConnect done");
118+
return;
119+
}
105120
}
106121
}
107122

bin/src/manager.rs

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,15 @@ use tokio::sync::{mpsc, Mutex, oneshot};
4141
use tokio::sync::oneshot::{Receiver, Sender};
4242
use tokio::time::timeout;
4343

44-
use crate::cs::{ClientArgs, ServerArgs};
44+
use crate::cs::{ClientArgs, ConnectArgs, ServerArgs};
4545

4646
#[derive(Debug)]
4747
pub struct ManagerArgs {
4848
pub config: Option<PathBuf>,
4949
pub depth: Option<u8>,
5050
pub server_args: Option<ServerArgs>,
5151
pub client_args: Option<ClientArgs>,
52+
pub connect_args: Option<ConnectArgs>,
5253
}
5354

5455
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)]
@@ -71,6 +72,7 @@ enum ProcessConfigEnum {
7172
Config(PathBuf),
7273
Server(ServerArgs),
7374
Client(ClientArgs),
75+
Connect(ConnectArgs),
7476
}
7577

7678
pub struct Manager {
@@ -323,6 +325,11 @@ impl Manager {
323325
$cmd.arg("-c").arg(path.clone());
324326
}
325327
}
328+
ProcessConfigEnum::Connect(args) => {
329+
if let Some(path) = &args.config {
330+
$cmd.arg("-c").arg(path.clone());
331+
}
332+
}
326333
}
327334
$cmd.stdin(Stdio::piped());
328335
$cmd.stdout(Stdio::piped());
@@ -452,17 +459,21 @@ impl Manager {
452459
async fn run_configs(&self, configs: Vec<ProcessConfigEnum>) -> Result<()> {
453460
let mut server_config = vec![];
454461
let mut client_config = vec![];
462+
let mut connect_config = vec![];
455463
for config in configs {
456464
match &config {
457465
ProcessConfigEnum::Config(path) => {
458466
if is_client_config_path(path).context("is_client_config_path failed")? {
459467
client_config.push(config);
460-
} else {
468+
} else if is_server_config_path(path).context("is_server_config_path failed")? {
461469
server_config.push(config);
470+
} else {
471+
connect_config.push(config);
462472
}
463473
}
464474
ProcessConfigEnum::Server(_) => server_config.push(config),
465475
ProcessConfigEnum::Client(_) => client_config.push(config),
476+
ProcessConfigEnum::Connect(_) => connect_config.push(config),
466477
}
467478
}
468479
if !server_config.is_empty() {
@@ -476,6 +487,12 @@ impl Manager {
476487
.await
477488
.context("run_client failed")?;
478489
}
490+
491+
if !connect_config.is_empty() {
492+
Self::run(self.cmds.clone(), connect_config, "sub-connect")
493+
.await
494+
.context("run_connect failed")?;
495+
}
479496
Ok(())
480497
}
481498

@@ -802,6 +819,11 @@ fn is_client_config_path(path: &PathBuf) -> Result<bool> {
802819
is_client_config(&yaml)
803820
}
804821

822+
fn is_server_config_path(path: &PathBuf) -> Result<bool> {
823+
let yaml = fs::read_to_string(path)?;
824+
is_server_config(&yaml)
825+
}
826+
805827
fn is_client_config(yaml: &str) -> Result<bool> {
806828
let c = serde_yaml::from_str::<Config>(yaml)?;
807829
if c.services.is_some() {
@@ -811,6 +833,23 @@ fn is_client_config(yaml: &str) -> Result<bool> {
811833
return match typ.as_str() {
812834
"client" => Ok(true),
813835
"server" => Ok(false),
836+
"connect" => Ok(false),
837+
t => Err(anyhow!("invalid config type {}", t)),
838+
};
839+
}
840+
Ok(false)
841+
}
842+
843+
fn is_server_config(yaml: &str) -> Result<bool> {
844+
let s = serde_yaml::from_str::<Config>(yaml)?;
845+
if s.services.is_some() {
846+
return Ok(false);
847+
}
848+
if let Some(typ) = s.typ {
849+
return match typ.as_str() {
850+
"client" => Ok(false),
851+
"server" => Ok(true),
852+
"connect" => Ok(false),
814853
t => Err(anyhow!("invalid config type {}", t)),
815854
};
816855
}

0 commit comments

Comments
 (0)