diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 21e0eab..176be24 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -44,5 +44,6 @@ jobs: - name: Upload coverage to Codecov uses: codecov/codecov-action@v4 with: - files: tarpaulin-report.xml + token: ${{ secrets.CODECOV_TOKEN }} + files: cobertura.xml diff --git a/.gitignore b/.gitignore index 15a18b6..38cfb37 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ /target .DS_Store .idea/ +cobertura.xml diff --git a/Cargo.lock b/Cargo.lock index 5fa5b18..bf50f52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1274,6 +1274,7 @@ dependencies = [ "assert_cmd", "datafusion", "insta", + "predicates", "structopt", "tempfile", "thiserror 2.0.12", @@ -1374,6 +1375,15 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "float-cmp" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b09cf3155332e944990140d967ff5eceb70df778b34f77d8075db46e4704e6d8" +dependencies = [ + "num-traits", +] + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -2002,6 +2012,12 @@ dependencies = [ "adler2", ] +[[package]] +name = "normalize-line-endings" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" + [[package]] name = "num" version = "0.4.3" @@ -2305,7 +2321,10 @@ checksum = "a5d19ee57562043d37e82899fade9a22ebab7be9cef5026b07fda9cdd4293573" dependencies = [ "anstyle", "difflib", + "float-cmp", + "normalize-line-endings", "predicates-core", + "regex", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 076744c..30361a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ datafusion = {version = "46.0.1", features = ["avro"]} structopt = "0.3.26" thiserror = "2.0.12" tokio = { version="1.44.2", features = ["rt-multi-thread"]} +predicates = "3.1.3" [dev-dependencies] assert_cmd = "2" diff --git a/src/bin/main.rs b/src/bin/main.rs index 710c45f..f69ec09 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -86,7 +86,7 @@ pub enum Commands { #[structopt(short,long)] chunks: usize, #[structopt(parse(from_os_str))] - output_dir: Option, + output: Option, }, #[structopt(about = "Concatenate multiple files or all files in a directory")] @@ -131,8 +131,8 @@ async fn main() -> Result<(), DfKitError> { Commands::Reverse { filename, output } => { reverse(&ctx, &filename, output).await?; } - Commands::Split { filename, chunks, output_dir} => { - let out_dir = output_dir.unwrap_or_else(|| env::current_dir().unwrap()); + Commands::Split { filename, chunks, output} => { + let out_dir = output.unwrap_or_else(|| env::current_dir().unwrap()); dfsplit(&ctx, &filename, chunks, &out_dir).await?; } Commands::Cat { files, dir, output } => { diff --git a/src/commands.rs b/src/commands.rs index 35cbbe2..74b3c4f 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -2,12 +2,10 @@ use std::fs; use std::path::{Path, PathBuf}; use std::sync::Arc; use datafusion::arrow::compute::concat_batches; -use datafusion::common::DataFusionError; -use datafusion::dataframe::DataFrameWriteOptions; use datafusion::datasource::MemTable; use datafusion::logical_expr::col; use datafusion::prelude::SessionContext; -use crate::utils::{file_type, register_table, DfKitError, FileFormat}; +use crate::utils::{file_type, register_table, write_output, DfKitError}; pub async fn view(ctx: &SessionContext, filename: &Path, limit: Option) -> Result<(), DfKitError> { let df = register_table(&ctx, "t", &filename).await?; @@ -28,15 +26,7 @@ pub async fn query(ctx: &SessionContext, filename: &Path, sql: Option, o let df_sql = ctx.sql(&*sql.unwrap()).await?; if let Some(path) = output { - match file_type { - FileFormat::Csv => df_sql.write_csv(path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?, - FileFormat::Parquet => df_sql.write_parquet(path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?, - FileFormat::Json => df_sql.write_json(path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?, - FileFormat::Avro => { - return Err(DfKitError::DataFusion(DataFusionError::NotImplemented("Avro write support not implemented".to_string()))); - } - }; - + write_output(df_sql, &path, &file_type).await?; println!("File written to: {}, successfully.", path.display()); } else { df_sql.show().await?; @@ -49,14 +39,7 @@ pub async fn convert(ctx: &SessionContext, filename: &Path, output_filename: &Pa let df = register_table(ctx, "t", &filename).await?; let output_file_type = file_type(&output_filename)?; - match output_file_type { - FileFormat::Csv => df.write_csv(output_filename.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?, - FileFormat::Parquet => df.write_parquet(output_filename.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?, - FileFormat::Json => df.write_json(output_filename.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?, - FileFormat::Avro => { - return Err(DfKitError::DataFusion(DataFusionError::NotImplemented("Avro write support not implemented".to_string()))); - } - }; + write_output(df, &output_filename, &output_file_type).await?; Ok(()) } @@ -103,20 +86,7 @@ pub async fn sort( if let Some(out_path) = output { let format = file_type(&out_path)?; - match format { - FileFormat::Csv => { - sorted_df.write_csv(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await? - } - FileFormat::Parquet => { - sorted_df.write_parquet(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await? - } - FileFormat::Json => { - sorted_df.write_json(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await? - } - FileFormat::Avro => { - return Err(DfKitError::DataFusion(DataFusionError::NotImplemented("Avro write not supported".into()))); - } - }; + write_output(sorted_df, &out_path, &format).await?; println!("Sorted file written to: {}", out_path.display()); } else { sorted_df.show().await?; @@ -151,28 +121,7 @@ pub async fn reverse( if let Some(out_path) = output { let format = file_type(&out_path)?; - match format { - FileFormat::Csv => { - reversed_df - .write_csv(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None) - .await? - } - FileFormat::Parquet => { - reversed_df - .write_parquet(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None) - .await? - } - FileFormat::Json => { - reversed_df - .write_json(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None) - .await? - } - FileFormat::Avro => { - return Err(DfKitError::DataFusion(DataFusionError::NotImplemented( - "Avro write not supported".into(), - ))); - } - }; + write_output(reversed_df, &out_path, &format).await?; println!("Reversed file written to: {}", out_path.display()); } else { reversed_df.show().await?; @@ -202,28 +151,7 @@ pub async fn dfsplit(ctx: &SessionContext, filename: &Path, chunks: usize, outpu let chunk_filename = format!("{}_{}.{}", stem, i + 1, extension); let chunk_path = output_dir.join(chunk_filename); - match format { - FileFormat::Csv => { - chunk_df - .write_csv(chunk_path.to_str().unwrap(), DataFrameWriteOptions::default(), None) - .await? - } - FileFormat::Parquet => { - chunk_df - .write_parquet(chunk_path.to_str().unwrap(), DataFrameWriteOptions::default(), None) - .await? - } - FileFormat::Json => { - chunk_df - .write_json(chunk_path.to_str().unwrap(), DataFrameWriteOptions::default(), None) - .await? - } - FileFormat::Avro => { - return Err(DfKitError::DataFusion(DataFusionError::NotImplemented( - "Avro split write not supported".into(), - ))) - } - }; + write_output(chunk_df, &chunk_path, &format).await?; println!("Written chunk {} to {}", i + 1, chunk_path.display()); } @@ -246,14 +174,7 @@ pub async fn cat(ctx: &SessionContext, files: Vec, out_path: &Path) -> } let format = file_type(&out_path)?; - match format { - FileFormat::Csv => final_df.write_csv(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?, - FileFormat::Parquet => final_df.write_parquet(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?, - FileFormat::Json => final_df.write_json(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?, - FileFormat::Avro => { - return Err(DfKitError::DataFusion(DataFusionError::NotImplemented("Avro write not supported".into()))); - } - }; + write_output(final_df, out_path, &format).await?; println!("Concatenated file written to: {}", out_path.display()); Ok(()) diff --git a/src/utils.rs b/src/utils.rs index 70e613e..bbf1fd0 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,5 +1,6 @@ use std::path::{Path, PathBuf}; use datafusion::arrow::error::ArrowError; +use datafusion::dataframe::DataFrameWriteOptions; use datafusion::prelude::*; use datafusion::error::DataFusionError; use thiserror::Error; @@ -86,3 +87,15 @@ pub fn parse_file_list(files: Option, dir: Option) -> Result Result<(), DfKitError> { + match format { + FileFormat::Csv => df.write_csv(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?, + FileFormat::Parquet => df.write_parquet(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?, + FileFormat::Json => df.write_json(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?, + FileFormat::Avro => { + return Err(DfKitError::DataFusion(DataFusionError::NotImplemented("Avro write not supported".into()))); + } + }; + Ok(()) +} + diff --git a/tests/test.rs b/tests/test.rs index 33fe3ed..3643c71 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -1,8 +1,9 @@ use assert_cmd::Command; use tempfile::tempdir; use std::fs; -use std::path::Path; +use std::path::{Path, PathBuf}; use insta::assert_snapshot; +use dfkit::utils::parse_file_list; fn write_temp_file(dir: &Path, name: &str, contents: &str) -> std::path::PathBuf { let file_path = dir.join(name); @@ -10,10 +11,18 @@ fn write_temp_file(dir: &Path, name: &str, contents: &str) -> std::path::PathBuf file_path } +fn create_basic_csv(dir: &Path) -> PathBuf { + write_temp_file(dir, "input.csv", "name,age\nalice,30\nbob,40\n") +} + +fn create_extended_csv(dir: &Path) -> PathBuf { + write_temp_file(dir, "input.csv", "name,age\nalice,30\nbob,40\ncharlie,50\n") +} + #[test] fn test_view() { let temp = tempdir().unwrap(); - let input = write_temp_file(temp.path(), "input.csv", "name,age\nalice,30\nbob,40\n"); + let input = create_basic_csv(temp.path()); let mut cmd = Command::cargo_bin("dfkit").unwrap(); cmd.args(["view", input.to_str().unwrap()]); let output = cmd.assert().success().get_output().stdout.clone(); @@ -29,7 +38,7 @@ fn test_view() { #[test] fn test_view_with_limit() { let temp = tempdir().unwrap(); - let input = write_temp_file(temp.path(), "input.csv", "name,age\nalice,30\nbob,40\n"); + let input = create_basic_csv(temp.path()); let mut cmd = Command::cargo_bin("dfkit").unwrap(); cmd.args(["view", input.to_str().unwrap(), "-l", "1"]); let output = cmd.assert().success().get_output().stdout.clone(); @@ -41,3 +50,327 @@ fn test_view_with_limit() { +-------+-----+ "); } + +#[test] +fn test_query() { + let temp = tempdir().unwrap(); + let input = create_basic_csv(temp.path()); + let mut cmd = Command::cargo_bin("dfkit").unwrap(); + cmd.args(["query", input.to_str().unwrap(), "--sql", "SELECT * FROM t WHERE age > 35"]); + let output = cmd.assert().success().get_output().stdout.clone(); + assert_snapshot!(String::from_utf8(output).unwrap(), @r" + +------+-----+ + | name | age | + +------+-----+ + | bob | 40 | + +------+-----+ + "); +} + +#[test] +fn test_query_with_output() { + let temp = tempdir().unwrap(); + let input = create_basic_csv(temp.path()); + let output_path = temp.path().join("out.csv"); + + let mut cmd = Command::cargo_bin("dfkit").unwrap(); + cmd.args([ + "query", + input.to_str().unwrap(), + "--sql", + "SELECT * FROM t WHERE age > 35", + "--output", + output_path.to_str().unwrap(), + ]); + + cmd.assert().success(); + + assert!(output_path.exists(), "Output file was not created"); +} + +#[test] +fn test_convert_csv_to_json() { + let temp = tempdir().unwrap(); + let input = create_basic_csv(temp.path()); + let output = temp.path().join("output.json"); + + let mut cmd = Command::cargo_bin("dfkit").unwrap(); + cmd.args([ + "convert", + input.to_str().unwrap(), + output.to_str().unwrap(), + ]); + + cmd.assert().success(); + + let output_contents = fs::read_to_string(&output).unwrap(); + assert!(output_contents.contains("\"name\":\"alice\"") || output_contents.contains("alice")); // Loose check +} + +#[test] +fn test_convert_csv_to_parquet() { + let temp = tempdir().unwrap(); + let input = create_basic_csv(temp.path()); + let output = temp.path().join("output.parquet"); + + let mut cmd = Command::cargo_bin("dfkit").unwrap(); + cmd.args([ + "convert", + input.to_str().unwrap(), + output.to_str().unwrap(), + ]); + + cmd.assert().success(); + assert!(output.exists(), "Parquet file not created"); +} + +#[test] +fn test_convert_to_avro_should_fail() { + let temp = tempdir().unwrap(); + let input = create_basic_csv(temp.path()); + let output = temp.path().join("output.avro"); + + let mut cmd = Command::cargo_bin("dfkit").unwrap(); + cmd.args([ + "convert", + input.to_str().unwrap(), + output.to_str().unwrap(), + ]); + + cmd.assert() + .failure() + .stderr(predicates::str::contains("Avro write not supported")); +} + +#[test] +fn test_describe_command() { + let temp = tempdir().unwrap(); + let input = create_basic_csv(temp.path()); + + let mut cmd = Command::cargo_bin("dfkit").unwrap(); + cmd.args([ + "describe", + input.to_str().unwrap(), + ]); + + let output = cmd.assert().success().get_output().stdout.clone(); + assert_snapshot!(String::from_utf8(output).unwrap(), @r" + +------------+-------+--------------------+ + | describe | name | age | + +------------+-------+--------------------+ + | count | 2 | 2.0 | + | null_count | 0 | 0.0 | + | mean | null | 35.0 | + | std | null | 7.0710678118654755 | + | min | alice | 30.0 | + | max | bob | 40.0 | + | median | null | 35.0 | + +------------+-------+--------------------+ + "); +} + +#[test] +fn test_schema_command() { + let temp = tempdir().unwrap(); + let input = create_basic_csv(temp.path()); + + let mut cmd = Command::cargo_bin("dfkit").unwrap(); + cmd.args([ + "schema", + input.to_str().unwrap(), + ]); + + let output = cmd.assert().success().get_output().stdout.clone(); + assert_snapshot!(String::from_utf8(output).unwrap(), @r" + +-------------+-----------+-------------+ + | column_name | data_type | is_nullable | + +-------------+-----------+-------------+ + | name | Utf8 | YES | + | age | Int64 | YES | + +-------------+-----------+-------------+ + "); +} + +#[test] +fn test_count_command() { + let temp = tempdir().unwrap(); + let input = create_extended_csv(temp.path()); + + let mut cmd = Command::cargo_bin("dfkit").unwrap(); + cmd.args([ + "count", + input.to_str().unwrap(), + ]); + + let output = cmd.assert().success().get_output().stdout.clone(); + assert_snapshot!(String::from_utf8(output).unwrap(), @r" + +----------+ + | count(*) | + +----------+ + | 3 | + +----------+ + "); +} + +#[test] +fn test_sort_command_ascending() { + let temp = tempdir().unwrap(); + let input = create_extended_csv(temp.path()); + + let mut cmd = Command::cargo_bin("dfkit").unwrap(); + cmd.args([ + "sort", + input.to_str().unwrap(), + "--columns", "age", + ]); + + let output = cmd.assert().success().get_output().stdout.clone(); + assert_snapshot!(String::from_utf8(output).unwrap(), @r" + +---------+-----+ + | name | age | + +---------+-----+ + | alice | 30 | + | bob | 40 | + | charlie | 50 | + +---------+-----+ + "); +} + +#[test] +fn test_sort_command_descending() { + let temp = tempdir().unwrap(); + let input = create_extended_csv(temp.path()); + + let mut cmd = Command::cargo_bin("dfkit").unwrap(); + cmd.args([ + "sort", + input.to_str().unwrap(), + "--columns", "age", + "--descending", + ]); + + let output = cmd.assert().success().get_output().stdout.clone(); + assert_snapshot!(String::from_utf8(output).unwrap(), @r" + +---------+-----+ + | name | age | + +---------+-----+ + | charlie | 50 | + | bob | 40 | + | alice | 30 | + +---------+-----+ + "); +} + +#[test] +fn test_reverse_stdout() { + let temp = tempdir().unwrap(); + let input_path = create_extended_csv(temp.path()); + fs::write(&input_path, "name,age\nalice,30\nbob,40\ncharlie,25").unwrap(); + + let mut cmd = Command::cargo_bin("dfkit").unwrap(); + let output = cmd + .args([ + "reverse", + input_path.to_str().unwrap(), + ]) + .assert() + .success() + .get_output() + .stdout + .clone(); + + assert_snapshot!(String::from_utf8(output).unwrap(), @r###" + +---------+-----+ + | name | age | + +---------+-----+ + | charlie | 25 | + | bob | 40 | + | alice | 30 | + +---------+-----+ + "###); +} + +#[test] +fn test_split_creates_chunks() { + let temp = tempdir().unwrap(); + let input_path = temp.path().join("data.csv"); + let output_dir = temp.path().join("out"); + + // Write input file + fs::write( + &input_path, + "name,age\nalice,30\nbob,40\ncharlie,25\ndave,20\nellen,45\n", + ) + .unwrap(); + + // Run the CLI command + let _ = Command::cargo_bin("dfkit") + .unwrap() + .args(&[ + "split", + input_path.to_str().unwrap(), + "--chunks", + "2", + output_dir.to_str().unwrap(), + ]) + .assert() + .success() + .get_output() + .stdout + .clone(); + + // Assert output files exist using parse_file_list + let mut files= parse_file_list(None, Some(output_dir.clone())).unwrap(); + + files.sort(); + + assert_eq!(files.len(), 2); +} + +#[test] +fn test_cat_concatenates_csv_files() { + let temp = tempdir().unwrap(); + let file1 = temp.path().join("part1.csv"); + let file2 = temp.path().join("part2.csv"); + let out_file = temp.path().join("combined.csv"); + + // Create sample CSV files + fs::write(&file1, "name,age\nalice,30\nbob,40\n").unwrap(); + fs::write(&file2, "name,age\ncharlie,25\ndave,20\n").unwrap(); + + let input_files = format!("{},{}", file1.display(), file2.display()); + + // Run the CLI command + let _ = Command::cargo_bin("dfkit") + .unwrap() + .args(&[ + "cat", + "--files", + &input_files, + "--output", + out_file.to_str().unwrap(), + ]) + .assert() + .success(); + + // Read and sort the output CSV + let result_csv = fs::read_to_string(&out_file).unwrap(); + let lines: Vec<&str> = result_csv.lines().collect(); + + let header = lines[0]; + let mut records = lines[1..].to_vec(); + records.sort(); // Sort records alphabetically + + let sorted_result = std::iter::once(header) + .chain(records.into_iter()) + .collect::>() + .join("\n"); + + assert_snapshot!(sorted_result, @r" + name,age + alice,30 + bob,40 + charlie,25 + dave,20 + "); +}