diff --git a/Cargo.lock b/Cargo.lock index de976d0..e1a5186 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9,7 +9,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf6ccdb167abbf410dcb915cabd428929d7f6a04980b54a11f26a39f1c7f7107" dependencies = [ "cfg-if", - "const-random", "getrandom", "once_cell", "version_check", @@ -33,15 +32,16 @@ dependencies = [ [[package]] name = "arrow2" -version = "0.15.0" +version = "0.17.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b843531e0a9d8dac44b0aa6adc926b2d970e8a627fe2105cd0498d5f93a6e97f" +checksum = "15ae0428d69ab31d7b2adad22a752d6f11fef2e901d2262d0cad4f5cb08b7093" dependencies = [ "ahash", "arrow-format", "base64", "bytemuck", "chrono", + "comfy-table", "dyn-clone", "either", "ethnum", @@ -85,7 +85,7 @@ checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -96,7 +96,7 @@ checksum = "677d1d8ab452a3936018a687b20e6f7cf5363d713b732b8884001317b0e48aa3" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -107,9 +107,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "base64" -version = "0.13.1" +version = "0.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" +checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d" [[package]] name = "bitflags" @@ -140,7 +140,7 @@ checksum = "5fe233b960f12f8007e3db2d136e3cb1c291bfd7396e384ee76025fc1a3932b4" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -166,33 +166,16 @@ dependencies = [ ] [[package]] -name = "const-random" -version = "0.1.15" +name = "comfy-table" +version = "6.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "368a7a772ead6ce7e1de82bfb04c485f3db8ec744f72925af5735e29a22cc18e" +checksum = "7e959d788268e3bf9d35ace83e81b124190378e4c91c9067524675e33394b8ba" dependencies = [ - "const-random-macro", - "proc-macro-hack", + "strum", + "strum_macros", + "unicode-width", ] -[[package]] -name = "const-random-macro" -version = "0.1.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d7d6ab3c3a2282db210df5f02c4dab6e0a7057af0fb7ebd4070f30fe05c0ddb" -dependencies = [ - "getrandom", - "once_cell", - "proc-macro-hack", - "tiny-keccak", -] - -[[package]] -name = "crunchy" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" - [[package]] name = "dyn-clone" version = "1.0.10" @@ -279,7 +262,7 @@ checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -331,6 +314,12 @@ version = "2.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74721d007512d0cb3338cd20f0654ac913920061a4c4d0d8708edb3f2a698c0c" +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + [[package]] name = "hermit-abi" version = "0.2.6" @@ -400,7 +389,7 @@ dependencies = [ "libc", "log", "wasi", - "windows-sys", + "windows-sys 0.42.0", ] [[package]] @@ -458,7 +447,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-sys", + "windows-sys 0.42.0", ] [[package]] @@ -505,26 +494,20 @@ dependencies = [ "array-init-cursor", ] -[[package]] -name = "proc-macro-hack" -version = "0.5.20+deprecated" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" - [[package]] name = "proc-macro2" -version = "1.0.49" +version = "1.0.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57a8eca9f9c4ffde41714334dee777596264c7825420f521abc92b5b5deb63a5" +checksum = "dec2b086b7a862cf4de201096214fa870344cf922b2b30c167badb3af3195406" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.23" +version = "1.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b" +checksum = "1b9ab9c7eadfd8df19006f1cf1a4aed13540ed5cbc047010ece5826e10825488" dependencies = [ "proc-macro2", ] @@ -547,6 +530,12 @@ dependencies = [ "semver", ] +[[package]] +name = "rustversion" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f3208ce4d8448b3f3e7d168a73f5e0c43a61e32930de3bceeccedb388b6bf06" + [[package]] name = "scopeguard" version = "1.1.0" @@ -582,7 +571,7 @@ checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -617,9 +606,9 @@ checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" [[package]] name = "socket2" -version = "0.4.7" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02e2d2db9033d13a1567121ddd7a095ee144db4e1ca1b1bda3419bc0da294ebd" +checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" dependencies = [ "libc", "winapi", @@ -640,6 +629,25 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d55dd09aaa2f85ef8767cc9177294d63c30d62c8533329e75aa51d8b94976e22" +[[package]] +name = "strum" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f" + +[[package]] +name = "strum_macros" +version = "0.24.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn 1.0.107", +] + [[package]] name = "syn" version = "1.0.107" @@ -651,6 +659,17 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "syn" +version = "2.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32d41677bcbe24c20c52e7c70b0d8db04134c5d1066bf98662e2871ad200ea3e" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "thiserror" version = "1.0.38" @@ -668,28 +687,18 @@ checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f" dependencies = [ "proc-macro2", "quote", - "syn", -] - -[[package]] -name = "tiny-keccak" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" -dependencies = [ - "crunchy", + "syn 1.0.107", ] [[package]] name = "tokio" -version = "1.23.0" +version = "1.28.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eab6d665857cc6ca78d6e80303a02cea7a7851e85dfbd77cbdc09bd129f1ef46" +checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2" dependencies = [ "autocfg", "bytes", "libc", - "memchr", "mio", "num_cpus", "parking_lot", @@ -697,18 +706,18 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] name = "tokio-macros" -version = "1.8.2" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.18", ] [[package]] @@ -717,6 +726,12 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "84a22b9f218b40614adcb3f4ff08b703773ad44fa9423e4e0d346d5db86e4ebc" +[[package]] +name = "unicode-width" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b" + [[package]] name = "version_check" version = "0.9.4" @@ -750,7 +765,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn", + "syn 1.0.107", "wasm-bindgen-shared", ] @@ -772,7 +787,7 @@ checksum = "07bc0c051dc5f23e307b13285f9d75df86bfdf816c5721e573dec1f9b8aa193c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -811,13 +826,37 @@ version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.42.0", + "windows_aarch64_msvc 0.42.0", + "windows_i686_gnu 0.42.0", + "windows_i686_msvc 0.42.0", + "windows_x86_64_gnu 0.42.0", + "windows_x86_64_gnullvm 0.42.0", + "windows_x86_64_msvc 0.42.0", +] + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5" +dependencies = [ + "windows_aarch64_gnullvm 0.48.0", + "windows_aarch64_msvc 0.48.0", + "windows_i686_gnu 0.48.0", + "windows_i686_msvc 0.48.0", + "windows_x86_64_gnu 0.48.0", + "windows_x86_64_gnullvm 0.48.0", + "windows_x86_64_msvc 0.48.0", ] [[package]] @@ -826,38 +865,80 @@ version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" + [[package]] name = "windows_aarch64_msvc" version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4" +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" + [[package]] name = "windows_i686_gnu" version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7" +[[package]] +name = "windows_i686_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" + [[package]] name = "windows_i686_msvc" version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246" +[[package]] +name = "windows_i686_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" + [[package]] name = "windows_x86_64_gnu" version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed" +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" + [[package]] name = "windows_x86_64_gnullvm" version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" + [[package]] name = "windows_x86_64_msvc" version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" diff --git a/minibend/Cargo.toml b/minibend/Cargo.toml index fca3f94..5c21a85 100644 --- a/minibend/Cargo.toml +++ b/minibend/Cargo.toml @@ -5,15 +5,21 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[[bin]] +name = "minibend" +path = "src/bin/minibend.rs" +doctest = false +test = false + [features] default = [] simd = ["arrow2/simd"] [dependencies] -arrow2 = { version = "0.15.0", features = ["io_parquet"] } +arrow2 = { version = "0.17", features = ["io_parquet", "io_print"] } async-fn-stream = "0.2" futures = "0.3" thiserror = "1.0" +tokio = { version = "1.28", features = ["full"] } [dev-dependencies] -tokio = { version = "1.23", features = ["full"] } \ No newline at end of file diff --git a/minibend/src/bin/minibend.rs b/minibend/src/bin/minibend.rs new file mode 100644 index 0000000..2e84808 --- /dev/null +++ b/minibend/src/bin/minibend.rs @@ -0,0 +1,18 @@ +use minibend::catalog::Catalog; +use minibend::datablock::pretty_print; +use minibend::error::*; + +#[tokio::main] +async fn main() -> Result<()> { + let test_file = format!("tests/source/alltypes_plain.parquet"); + let mut catalog = Catalog::default(); + catalog.add_parquet_table("parquet", &test_file)?; + let table = catalog.get_table("parquet")?; + + let rbs = table.scan(None); + let schema = table.schema(); + + pretty_print(rbs, schema).await; + + Ok(()) +} diff --git a/minibend/src/catalog.rs b/minibend/src/catalog.rs index 3fbe3a1..85db421 100644 --- a/minibend/src/catalog.rs +++ b/minibend/src/catalog.rs @@ -1,8 +1,8 @@ use std::collections::HashMap; +use crate::datasource::parquet::ParquetTable; +use crate::datasource::TableRef; use crate::error::{Error, Result}; -use crate::source::parquet::ParquetTable; -use crate::source::TableRef; #[derive(Default)] pub struct Catalog { diff --git a/minibend/src/datablock.rs b/minibend/src/datablock.rs index dd6b839..4cd9924 100644 --- a/minibend/src/datablock.rs +++ b/minibend/src/datablock.rs @@ -2,9 +2,35 @@ use std::pin::Pin; use arrow2::array::Array; use arrow2::chunk::Chunk; -use futures::Stream; +use arrow2::datatypes::{Field, Schema}; +use futures::{Stream, StreamExt}; use crate::error::Result; pub type DataBlock = Chunk>; pub type DataBlockStream = Pin> + Send + Sync + 'static>>; + +pub fn schema_projected(schema: Schema, field_names: Vec) -> Schema { + // TODO: should validate that all columns are actually present... + let retained: Vec = schema + .fields + .into_iter() + .filter(|f| field_names.contains(&f.name)) + .collect(); + Schema::from(retained) +} + +pub async fn pretty_print(mut rbs: DataBlockStream, schema: Schema) { + let names = schema.fields.iter().map(|f| &f.name).collect::>(); + let mut all_record_batches = Vec::new(); + if let Some(rb) = rbs.next().await { + if rb.is_ok() { + all_record_batches.push(rb.unwrap()); + } + } + println!("results: "); + println!( + "{}", + arrow2::io::print::write(&all_record_batches[..], &names) + ); +} diff --git a/minibend/src/source.rs b/minibend/src/datasource.rs similarity index 100% rename from minibend/src/source.rs rename to minibend/src/datasource.rs diff --git a/minibend/src/source/parquet.rs b/minibend/src/datasource/parquet.rs similarity index 98% rename from minibend/src/source/parquet.rs rename to minibend/src/datasource/parquet.rs index 5bcff21..c7370f3 100644 --- a/minibend/src/source/parquet.rs +++ b/minibend/src/datasource/parquet.rs @@ -8,8 +8,8 @@ use arrow2::io::parquet::read::*; use async_fn_stream::fn_stream; use crate::datablock::DataBlockStream; +use crate::datasource::DataSource; use crate::error::Result; -use crate::source::DataSource; use super::TableRef; @@ -69,7 +69,7 @@ impl DataSource for ParquetTable { r.push(array.clone()); } Chunk::new(r) - }, + } None => chunk, }; // yield elements from stream via `emitter` diff --git a/minibend/src/lib.rs b/minibend/src/lib.rs index d58a700..98ec0a2 100644 --- a/minibend/src/lib.rs +++ b/minibend/src/lib.rs @@ -1,5 +1,6 @@ -#[allow(dead_code)] -mod catalog; -mod datablock; -mod error; -mod source; +#![allow(dead_code)] + +pub mod catalog; +pub mod datablock; +pub mod datasource; +pub mod error; diff --git a/minibend/src/logical_plan.rs b/minibend/src/logical_plan.rs new file mode 100644 index 0000000..75e7fc0 --- /dev/null +++ b/minibend/src/logical_plan.rs @@ -0,0 +1,60 @@ +use arrow2::datatypes::{DataType, Field, Schema}; + +use crate::datasource::DataSource; +use crate::datablock::schema_projected; + +pub enum LogicalPlan { + Scan(Scan), +} + +impl LogicalPlan { + pub fn schema(&self) -> Schema { + match self { + LogicalPlan::Scan(scan) => schema_projected(scan.datasource.schema(), scan.projection.clone()), + } + } + + pub fn children(&self) -> Vec> { + match self { + LogicalPlan::Scan(_scan) => vec![], + } + } +} + +pub struct Scan { + pub datasource: Box, + pub projection: Vec, +} + +impl Scan { + pub fn new(datasource: Box, projection: Vec) -> Scan { + Scan { + datasource, + projection, + } + } +} + +trait LogicalExpression { + fn to_field(&self, input: LogicalPlan) -> Field; +} + +struct Column { + name: String, +} + +impl LogicalExpression for Column { + fn to_field(&self, input: LogicalPlan) -> Field { + input.schema().fields.iter().find(|f| f.name == self.name).unwrap().clone() + } +} + +struct Literal { + value: String, +} + +impl LogicalExpression for Literal { + fn to_field(&self, _input: LogicalPlan) -> Field { + Field::new(&self.value, DataType::Utf8, false) + } +} \ No newline at end of file