Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,6 @@ jobs:
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
with:
files: tarpaulin-report.xml
token: ${{ secrets.CODECOV_TOKEN }}
files: cobertura.xml

1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/target
.DS_Store
.idea/
cobertura.xml

19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ datafusion = {version = "46.0.1", features = ["avro"]}
structopt = "0.3.26"
thiserror = "2.0.12"
tokio = { version="1.44.2", features = ["rt-multi-thread"]}
predicates = "3.1.3"

[dev-dependencies]
assert_cmd = "2"
Expand Down
6 changes: 3 additions & 3 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub enum Commands {
#[structopt(short,long)]
chunks: usize,
#[structopt(parse(from_os_str))]
output_dir: Option<PathBuf>,
output: Option<PathBuf>,
},

#[structopt(about = "Concatenate multiple files or all files in a directory")]
Expand Down Expand Up @@ -131,8 +131,8 @@ async fn main() -> Result<(), DfKitError> {
Commands::Reverse { filename, output } => {
reverse(&ctx, &filename, output).await?;
}
Commands::Split { filename, chunks, output_dir} => {
let out_dir = output_dir.unwrap_or_else(|| env::current_dir().unwrap());
Commands::Split { filename, chunks, output} => {
let out_dir = output.unwrap_or_else(|| env::current_dir().unwrap());
dfsplit(&ctx, &filename, chunks, &out_dir).await?;
}
Commands::Cat { files, dir, output } => {
Expand Down
93 changes: 7 additions & 86 deletions src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use datafusion::arrow::compute::concat_batches;
use datafusion::common::DataFusionError;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::MemTable;
use datafusion::logical_expr::col;
use datafusion::prelude::SessionContext;
use crate::utils::{file_type, register_table, DfKitError, FileFormat};
use crate::utils::{file_type, register_table, write_output, DfKitError};

pub async fn view(ctx: &SessionContext, filename: &Path, limit: Option<usize>) -> Result<(), DfKitError> {
let df = register_table(&ctx, "t", &filename).await?;
Expand All @@ -28,15 +26,7 @@ pub async fn query(ctx: &SessionContext, filename: &Path, sql: Option<String>, o
let df_sql = ctx.sql(&*sql.unwrap()).await?;

if let Some(path) = output {
match file_type {
FileFormat::Csv => df_sql.write_csv(path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?,
FileFormat::Parquet => df_sql.write_parquet(path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?,
FileFormat::Json => df_sql.write_json(path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?,
FileFormat::Avro => {
return Err(DfKitError::DataFusion(DataFusionError::NotImplemented("Avro write support not implemented".to_string())));
}
};

write_output(df_sql, &path, &file_type).await?;
println!("File written to: {}, successfully.", path.display());
} else {
df_sql.show().await?;
Expand All @@ -49,14 +39,7 @@ pub async fn convert(ctx: &SessionContext, filename: &Path, output_filename: &Pa
let df = register_table(ctx, "t", &filename).await?;
let output_file_type = file_type(&output_filename)?;

match output_file_type {
FileFormat::Csv => df.write_csv(output_filename.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?,
FileFormat::Parquet => df.write_parquet(output_filename.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?,
FileFormat::Json => df.write_json(output_filename.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?,
FileFormat::Avro => {
return Err(DfKitError::DataFusion(DataFusionError::NotImplemented("Avro write support not implemented".to_string())));
}
};
write_output(df, &output_filename, &output_file_type).await?;
Ok(())
}

Expand Down Expand Up @@ -103,20 +86,7 @@ pub async fn sort(

if let Some(out_path) = output {
let format = file_type(&out_path)?;
match format {
FileFormat::Csv => {
sorted_df.write_csv(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?
}
FileFormat::Parquet => {
sorted_df.write_parquet(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?
}
FileFormat::Json => {
sorted_df.write_json(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?
}
FileFormat::Avro => {
return Err(DfKitError::DataFusion(DataFusionError::NotImplemented("Avro write not supported".into())));
}
};
write_output(sorted_df, &out_path, &format).await?;
println!("Sorted file written to: {}", out_path.display());
} else {
sorted_df.show().await?;
Expand Down Expand Up @@ -151,28 +121,7 @@ pub async fn reverse(

if let Some(out_path) = output {
let format = file_type(&out_path)?;
match format {
FileFormat::Csv => {
reversed_df
.write_csv(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None)
.await?
}
FileFormat::Parquet => {
reversed_df
.write_parquet(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None)
.await?
}
FileFormat::Json => {
reversed_df
.write_json(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None)
.await?
}
FileFormat::Avro => {
return Err(DfKitError::DataFusion(DataFusionError::NotImplemented(
"Avro write not supported".into(),
)));
}
};
write_output(reversed_df, &out_path, &format).await?;
println!("Reversed file written to: {}", out_path.display());
} else {
reversed_df.show().await?;
Expand Down Expand Up @@ -202,28 +151,7 @@ pub async fn dfsplit(ctx: &SessionContext, filename: &Path, chunks: usize, outpu
let chunk_filename = format!("{}_{}.{}", stem, i + 1, extension);
let chunk_path = output_dir.join(chunk_filename);

match format {
FileFormat::Csv => {
chunk_df
.write_csv(chunk_path.to_str().unwrap(), DataFrameWriteOptions::default(), None)
.await?
}
FileFormat::Parquet => {
chunk_df
.write_parquet(chunk_path.to_str().unwrap(), DataFrameWriteOptions::default(), None)
.await?
}
FileFormat::Json => {
chunk_df
.write_json(chunk_path.to_str().unwrap(), DataFrameWriteOptions::default(), None)
.await?
}
FileFormat::Avro => {
return Err(DfKitError::DataFusion(DataFusionError::NotImplemented(
"Avro split write not supported".into(),
)))
}
};
write_output(chunk_df, &chunk_path, &format).await?;

println!("Written chunk {} to {}", i + 1, chunk_path.display());
}
Expand All @@ -246,14 +174,7 @@ pub async fn cat(ctx: &SessionContext, files: Vec<PathBuf>, out_path: &Path) ->
}

let format = file_type(&out_path)?;
match format {
FileFormat::Csv => final_df.write_csv(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?,
FileFormat::Parquet => final_df.write_parquet(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?,
FileFormat::Json => final_df.write_json(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?,
FileFormat::Avro => {
return Err(DfKitError::DataFusion(DataFusionError::NotImplemented("Avro write not supported".into())));
}
};
write_output(final_df, out_path, &format).await?;
println!("Concatenated file written to: {}", out_path.display());

Ok(())
Expand Down
13 changes: 13 additions & 0 deletions src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::path::{Path, PathBuf};
use datafusion::arrow::error::ArrowError;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::prelude::*;
use datafusion::error::DataFusionError;
use thiserror::Error;
Expand Down Expand Up @@ -86,3 +87,15 @@ pub fn parse_file_list(files: Option<String>, dir: Option<PathBuf>) -> Result<Ve
}
}

/// Write `df` to `out_path` in the given `format`.
///
/// Supports CSV, Parquet, and JSON output. Avro writing is not implemented
/// by DataFusion, so `FileFormat::Avro` yields a `NotImplemented` error.
///
/// # Errors
/// Returns `DfKitError` if `out_path` is not valid UTF-8, if the format is
/// Avro, or if the underlying DataFusion writer fails.
pub async fn write_output(df: DataFrame, out_path: &Path, format: &FileFormat) -> Result<(), DfKitError> {
    // `Path::to_str` returns None for non-UTF-8 paths; surface that as an
    // error instead of panicking via `unwrap()`.
    let path_str = out_path.to_str().ok_or_else(|| {
        DfKitError::DataFusion(DataFusionError::Execution(format!(
            "output path is not valid UTF-8: {}",
            out_path.display()
        )))
    })?;

    match format {
        FileFormat::Csv => df.write_csv(path_str, DataFrameWriteOptions::default(), None).await?,
        FileFormat::Parquet => df.write_parquet(path_str, DataFrameWriteOptions::default(), None).await?,
        FileFormat::Json => df.write_json(path_str, DataFrameWriteOptions::default(), None).await?,
        FileFormat::Avro => {
            return Err(DfKitError::DataFusion(DataFusionError::NotImplemented("Avro write not supported".into())));
        }
    };
    Ok(())
}

Loading