diff --git a/Cargo.lock b/Cargo.lock index 7a14da0..ae57f51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -111,7 +111,7 @@ dependencies = [ "log", "num-bigint", "quad-rand", - "rand", + "rand 0.8.5", "regex-lite", "serde", "serde_bytes", @@ -599,6 +599,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "chrono" version = "0.4.40" @@ -608,6 +614,7 @@ dependencies = [ "android-tzdata", "iana-time-zone", "num-traits", + "serde", "windows-link", ] @@ -705,6 +712,16 @@ dependencies = [ "libc", ] +[[package]] +name = "core-foundation" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b55271e5c8c478ad3f38ad24ef34923091e0548492a266d19b3c0b4d82574c63" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -843,7 +860,7 @@ dependencies = [ "object_store", "parking_lot", "parquet", - "rand", + "rand 0.8.5", "regex", "sqlparser", "tempfile", @@ -957,7 +974,7 @@ dependencies = [ "itertools 0.14.0", "log", "object_store", - "rand", + "rand 0.8.5", "tokio", "tokio-util", "url", @@ -985,7 +1002,7 @@ dependencies = [ "log", "object_store", "parking_lot", - "rand", + "rand 0.8.5", "tempfile", "url", ] @@ -1046,7 +1063,7 @@ dependencies = [ "itertools 0.14.0", "log", "md-5", - "rand", + "rand 0.8.5", "regex", "sha2", "unicode-segmentation", @@ -1290,12 +1307,14 @@ dependencies = [ "assert_cmd", "datafusion", "insta", + "object_store", "predicates", "reqwest", "structopt", "tempfile", "thiserror 2.0.12", "tokio", + "url", ] [[package]] @@ -1546,8 +1565,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi 0.11.0+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -1557,9 +1578,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73fea8450eea4bac3940448fb7ae50d91f034f941199fcd9d909a5a07aa455f0" dependencies = [ "cfg-if", + "js-sys", "libc", "r-efi", "wasi 0.14.2+wasi-0.2.4", + "wasm-bindgen", ] [[package]] @@ -1727,6 +1750,7 @@ dependencies = [ "hyper", "hyper-util", "rustls", + "rustls-native-certs", "rustls-pki-types", "tokio", "tokio-rustls", @@ -2226,7 +2250,7 @@ dependencies = [ "openssl-probe", "openssl-sys", "schannel", - "security-framework", + "security-framework 2.11.1", "security-framework-sys", "tempfile", ] @@ -2328,13 +2352,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3cfccb68961a56facde1163f9319e0d15743352344e7808a11795fb99698dcaf" dependencies = [ "async-trait", + "base64", "bytes", "chrono", "futures", "humantime", + "hyper", "itertools 0.13.0", + "md-5", "parking_lot", "percent-encoding", + "quick-xml", + "rand 0.8.5", + "reqwest", + "ring", + "serde", + "serde_json", "snafu", "tokio", "tracing", @@ -2517,7 +2550,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d" dependencies = [ "phf_shared", - "rand", + "rand 0.8.5", ] [[package]] @@ -2654,6 +2687,70 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a651516ddc9168ebd67b24afd085a718be02f8858fe406591b013d101ce2f40" +[[package]] +name = "quick-xml" +version = "0.37.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4ce8c88de324ff838700f36fb6ab86c96df0e3c4ab6ef3a9b2044465cce1369" +dependencies = [ + "memchr", + "serde", +] + +[[package]] +name = "quinn" +version = "0.11.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3bd15a6f2967aef83887dcb9fec0014580467e33720d073560cf015a5683012" +dependencies = [ + "bytes", + "cfg_aliases", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash", + "rustls", + "socket2", + "thiserror 2.0.12", + "tokio", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-proto" +version = "0.11.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b820744eb4dc9b57a3398183639c511b5a26d2ed702cedd3febaa1393caa22cc" +dependencies = [ + "bytes", + "getrandom 0.3.2", + "rand 0.9.1", + "ring", + "rustc-hash", + "rustls", + "rustls-pki-types", + "slab", + "thiserror 2.0.12", + "tinyvec", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-udp" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "541d0f57c6ec747a90738a52741d3221f7960e8ac2f0ff4b1a63680e033b4ab5" +dependencies = [ + "cfg_aliases", + "libc", + "once_cell", + "socket2", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "quote" version = "1.0.40" @@ -2676,8 +2773,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.3", ] [[package]] @@ -2687,7 +2794,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.3", ] [[package]] @@ -2699,6 +2816,15 @@ dependencies = [ "getrandom 0.2.15", ] +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.2", +] + [[package]] name = "recursive" version = "0.1.1" @@ -2790,7 +2916,11 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", + "quinn", + "rustls", + "rustls-native-certs", "rustls-pemfile", + "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", @@ -2798,11 +2928,14 @@ dependencies = [ "system-configuration", "tokio", "tokio-native-tls", + "tokio-rustls", + "tokio-util", "tower", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "windows-registry", ] @@ -2833,6 +2966,12 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustc-hash" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" + [[package]] name = "rustc_version" version = "0.4.1" @@ -2862,12 +3001,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df51b5869f3a441595eac5e8ff14d486ff285f7b8c0df8770e49c3b56351f0f0" dependencies = [ "once_cell", + "ring", "rustls-pki-types", "rustls-webpki", "subtle", "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3" +dependencies = [ + "openssl-probe", + "rustls-pki-types", + "schannel", + "security-framework 3.2.0", +] + [[package]] name = "rustls-pemfile" version = "2.2.0" @@ -2882,6 +3034,9 @@ name = "rustls-pki-types" version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c" +dependencies = [ + "web-time", +] [[package]] name = "rustls-webpki" @@ -2937,7 +3092,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ "bitflags 2.9.0", - "core-foundation", + "core-foundation 0.9.4", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316" +dependencies = [ + "bitflags 2.9.0", + "core-foundation 0.10.0", "core-foundation-sys", "libc", "security-framework-sys", @@ -3256,7 +3424,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" dependencies = [ "bitflags 2.9.0", - "core-foundation", + "core-foundation 0.9.4", "system-configuration-sys", ] @@ -3368,6 +3536,21 @@ dependencies = [ "zerovec", ] +[[package]] +name = "tinyvec" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09b3661f17e86524eccd4371ab0429194e0d7c008abb45f7a7495b1719463c71" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.44.2" @@ -3725,6 +3908,19 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.77" diff --git a/Cargo.toml b/Cargo.toml index 65a7687..e60eb94 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,8 @@ tokio = { version="1.44.2", features = ["rt-multi-thread"]} predicates = "3.1.3" reqwest = "0.12.15" tempfile = "3" +object_store = { version="0.11.2", features = ["aws"] } +url = "2.5.4" [dev-dependencies] assert_cmd = "2" diff --git a/src/utils.rs b/src/utils.rs index 970aff9..cfca775 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -4,8 +4,11 @@ use datafusion::error::DataFusionError; use datafusion::prelude::*; use reqwest::Client; use std::path::{Path, PathBuf}; +use std::sync::Arc; use tempfile::NamedTempFile; use thiserror::Error; +use object_store::aws::AmazonS3Builder; +use url::Url; #[derive(Debug, PartialEq, Eq)] pub enum FileFormat { @@ -15,6 +18,13 @@ pub enum FileFormat { Avro, } +#[derive(Debug, PartialEq, Eq)] +pub enum StorageType { + Local, + Url, + S3, +} + #[derive(Error, Debug)] pub enum FileParseError { #[error("unsupported file format")] @@ -23,6 +33,12 @@ pub enum FileParseError { InvalidExtension, } +#[derive(Error, Debug)] +pub enum StorageTypeError { + #[error("unsupported storage type")] + UnsupportedStorageType, +} + #[derive(Error, Debug)] pub enum DfKitError { #[error("File parsing error: {0}")] @@ -45,6 +61,15 @@ pub enum DfKitError { #[error("Reqwest error: {0}")] Reqwest(#[from] reqwest::Error), + + #[error("Storage Type error: {0}")] + Storage(#[from] StorageTypeError), + + #[error("Parse error during URL parsing: {0}")] + UrlParse(#[from] url::ParseError), + + #[error("ObjectStore error: {0}")] + ObjectStore(#[from] object_store::Error), } pub fn file_type(file_path: &Path) -> Result { @@ -61,45 +86,86 @@ pub fn file_type(file_path: &Path) -> Result { } } +pub fn storage_type(file_path: &Path) -> Result { + let path_str = file_path + .to_str() + .ok_or(DfKitError::FileParse(FileParseError::InvalidExtension))?; + + if path_str.starts_with("http://") || path_str.starts_with("https://") { + Ok(StorageType::Url) + } else if path_str.starts_with("s3://") { + Ok(StorageType::S3) + } else if file_path.is_absolute() { + Ok(StorageType::Local) + } else { + Err(DfKitError::Storage(StorageTypeError::UnsupportedStorageType)) + } + +} + pub async fn register_table( ctx: &SessionContext, table_name: &str, file_path: &Path, ) -> Result { - let path_str = file_path - .to_str() - .ok_or(DfKitError::FileParse(FileParseError::InvalidExtension))?; - let is_url = path_str.starts_with("http://") || path_str.starts_with("https://"); + let storage_type = storage_type(file_path)?; + let (file_format, file_name): (FileFormat, String) = match storage_type { + StorageType::Local => { + let path = file_path.to_path_buf(); + let file_format = file_type(&path)?; + let file_name = path.to_str() + .ok_or(DfKitError::FileParse(FileParseError::InvalidExtension))? + .to_string(); + (file_format, file_name) + } + StorageType::Url => { + let path_str = file_path + .to_str() + .ok_or(DfKitError::FileParse(FileParseError::InvalidExtension))?; + let (_tmpfile, local_path) = download_to_tempfile(path_str).await?; + let file_format = file_type(&local_path)?; + let file_name = local_path + .to_str() + .ok_or(DfKitError::FileParse(FileParseError::InvalidExtension))? + .to_string(); + (file_format, file_name) + } + StorageType::S3 => { + let path_str = file_path + .to_str() + .ok_or(DfKitError::FileParse(FileParseError::InvalidExtension))?; + let url = Url::parse(path_str)?; + let bucket = url.host_str() + .ok_or_else(|| DfKitError::CustomError("Missing bucket in S3 URL".into()))?; + let store= Arc::from(AmazonS3Builder::from_env() + .with_bucket_name(bucket).build()?); - let actual_path = if is_url { - let (_tmpfile, local_path) = download_to_tempfile(path_str).await?; - local_path - } else { - file_path.to_path_buf() + ctx.runtime_env() + .register_object_store(&url, store); + + let file_format = file_type(&file_path.to_path_buf())?; + (file_format, path_str.to_string()) + } }; - let file_format = file_type(&actual_path)?; - let file_name = actual_path - .to_str() - .ok_or(DfKitError::FileParse(FileParseError::InvalidExtension))?; match file_format { FileFormat::Csv => { - ctx.register_csv(table_name, file_name, CsvReadOptions::default()) - .await? + ctx.register_csv(table_name, &file_name, CsvReadOptions::default()) + .await?; } FileFormat::Parquet => { - ctx.register_parquet(table_name, file_name, ParquetReadOptions::default()) - .await? + ctx.register_parquet(table_name, &file_name, ParquetReadOptions::default()) + .await?; } FileFormat::Json => { - ctx.register_json(table_name, file_name, NdJsonReadOptions::default()) - .await? + ctx.register_json(table_name, &file_name, NdJsonReadOptions::default()) + .await?; } FileFormat::Avro => { - ctx.register_avro(table_name, file_name, AvroReadOptions::default()) - .await? + ctx.register_avro(table_name, &file_name, AvroReadOptions::default()) + .await?; } - }; + } Ok(ctx.table(table_name).await?) }