Skip to content

feat(minibend): add minibend 03 #46

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 154 additions & 73 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions minibend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,21 @@ edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[[bin]]
name = "minibend"
path = "src/bin/minibend.rs"
doctest = false
test = false

[features]
default = []
simd = ["arrow2/simd"]

[dependencies]
arrow2 = { version = "0.15.0", features = ["io_parquet"] }
arrow2 = { version = "0.17", features = ["io_parquet", "io_print"] }
async-fn-stream = "0.2"
futures = "0.3"
thiserror = "1.0"
tokio = { version = "1.28", features = ["full"] }

[dev-dependencies]
tokio = { version = "1.23", features = ["full"] }
18 changes: 18 additions & 0 deletions minibend/src/bin/minibend.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use minibend::catalog::Catalog;
use minibend::datablock::pretty_print;
use minibend::error::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Demo entry point: register one parquet-backed table, scan every
    // column of it, and pretty-print the resulting record batches.
    //
    // `String::from` instead of `format!` — there is nothing to format
    // (clippy: useless_format).
    let test_file = String::from("tests/source/alltypes_plain.parquet");
    let mut catalog = Catalog::default();
    catalog.add_parquet_table("parquet", &test_file)?;
    let table = catalog.get_table("parquet")?;

    // `None` projection = scan all columns.
    let rbs = table.scan(None);
    let schema = table.schema();

    pretty_print(rbs, schema).await;

    Ok(())
}
4 changes: 2 additions & 2 deletions minibend/src/catalog.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::collections::HashMap;

use crate::datasource::parquet::ParquetTable;
use crate::datasource::TableRef;
use crate::error::{Error, Result};
use crate::source::parquet::ParquetTable;
use crate::source::TableRef;

#[derive(Default)]
pub struct Catalog {
Expand Down
28 changes: 27 additions & 1 deletion minibend/src/datablock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,35 @@ use std::pin::Pin;

use arrow2::array::Array;
use arrow2::chunk::Chunk;
use futures::Stream;
use arrow2::datatypes::{Field, Schema};
use futures::{Stream, StreamExt};

use crate::error::Result;

pub type DataBlock = Chunk<Box<dyn Array>>;
pub type DataBlockStream = Pin<Box<dyn Stream<Item = Result<DataBlock>> + Send + Sync + 'static>>;

/// Narrow `schema` to the fields whose names appear in `field_names`,
/// preserving the schema's original field order.
// TODO: should validate that all columns are actually present...
pub fn schema_projected(schema: Schema, field_names: Vec<String>) -> Schema {
    let mut kept = Vec::with_capacity(field_names.len());
    for field in schema.fields {
        if field_names.iter().any(|wanted| wanted == &field.name) {
            kept.push(field);
        }
    }
    Schema::from(kept)
}

/// Drain `rbs` and print every successfully-read record batch as an
/// ASCII table, using `schema`'s field names as column headers.
///
/// Best-effort: a failed batch is reported on stderr and skipped rather
/// than aborting the whole print.
pub async fn pretty_print(mut rbs: DataBlockStream, schema: Schema) {
    let names = schema.fields.iter().map(|f| &f.name).collect::<Vec<_>>();
    let mut all_record_batches = Vec::new();
    // BUG FIX: the original used `if let`, which pulled only the FIRST
    // batch off the stream; `while let` drains it completely.
    while let Some(rb) = rbs.next().await {
        match rb {
            Ok(batch) => all_record_batches.push(batch),
            // Previously errors were silently dropped; surface them.
            Err(e) => eprintln!("pretty_print: skipping failed batch: {}", e),
        }
    }
    println!("results: ");
    println!(
        "{}",
        arrow2::io::print::write(&all_record_batches[..], &names)
    );
}
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use arrow2::io::parquet::read::*;
use async_fn_stream::fn_stream;

use crate::datablock::DataBlockStream;
use crate::datasource::DataSource;
use crate::error::Result;
use crate::source::DataSource;

use super::TableRef;

Expand Down Expand Up @@ -69,7 +69,7 @@ impl DataSource for ParquetTable {
r.push(array.clone());
}
Chunk::new(r)
},
}
None => chunk,
};
// yield elements from stream via `emitter`
Expand Down
11 changes: 6 additions & 5 deletions minibend/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#[allow(dead_code)]
mod catalog;
mod datablock;
mod error;
mod source;
#![allow(dead_code)]

pub mod catalog;
pub mod datablock;
pub mod datasource;
pub mod error;
60 changes: 60 additions & 0 deletions minibend/src/logical_plan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use arrow2::datatypes::{DataType, Field, Schema};

use crate::datasource::DataSource;
use crate::datablock::schema_projected;

/// A node in the logical query plan. Only table scans are modeled so far.
pub enum LogicalPlan {
    Scan(Scan),
}

impl LogicalPlan {
    /// Schema of the rows this node produces: the datasource's schema
    /// narrowed to the scan's projected columns.
    pub fn schema(&self) -> Schema {
        match self {
            LogicalPlan::Scan(scan) => {
                let full_schema = scan.datasource.schema();
                schema_projected(full_schema, scan.projection.clone())
            }
        }
    }

    /// Child plans feeding this node. A scan is a leaf, so it has none.
    pub fn children(&self) -> Vec<Box<LogicalPlan>> {
        match self {
            LogicalPlan::Scan(_) => Vec::new(),
        }
    }
}

/// Leaf plan node: read rows from `datasource`, keeping only the
/// columns listed in `projection`.
pub struct Scan {
    pub datasource: Box<dyn DataSource>,
    pub projection: Vec<String>,
}

impl Scan {
    /// Build a scan of `datasource` projected down to `projection`.
    pub fn new(datasource: Box<dyn DataSource>, projection: Vec<String>) -> Scan {
        Self { datasource, projection }
    }
}

/// An expression evaluated against the rows produced by a logical plan;
/// `to_field` yields the schema field the expression evaluates to.
///
/// NOTE(review): `to_field` consumes `input` by value, so a plan can be
/// used to resolve only one expression — consider `&LogicalPlan`
/// (requires changing the trait and both impls together).
trait LogicalExpression {
    fn to_field(&self, input: LogicalPlan) -> Field;
}

/// Expression referencing a named column of the input plan.
struct Column {
    name: String,
}

impl LogicalExpression for Column {
    /// Resolve this column reference against the input plan's schema.
    ///
    /// Panics (naming the missing column) when the schema has no such
    /// field — the trait's signature leaves no way to return an error.
    fn to_field(&self, input: LogicalPlan) -> Field {
        let schema = input.schema();
        schema
            .fields
            .iter()
            .find(|f| f.name == self.name)
            // Previously a bare `unwrap()`: the panic carried no hint of
            // which column failed to resolve.
            .unwrap_or_else(|| panic!("column {:?} not found in input schema", self.name))
            .clone()
    }
}

/// Literal expression; the value is held as its string representation.
struct Literal {
    value: String,
}

impl LogicalExpression for Literal {
    /// The literal becomes a non-nullable Utf8 field named after its value;
    /// the input plan is irrelevant.
    ///
    /// NOTE(review): always Utf8 — presumably only string literals are
    /// modeled yet; revisit before adding numeric/bool literals.
    fn to_field(&self, _input: LogicalPlan) -> Field {
        Field::new(&self.value, DataType::Utf8, false)
    }
}