@@ -17,26 +17,41 @@ pub fn read(
1717 compress_output : Option < Option < CompressionType > > ,
1818) -> Result < ( ) , std:: io:: Error > {
1919 let output_file = File :: create ( output_file_path) ?;
20- let mut file_writer: Box < dyn Write > = match compress_output {
21- Some ( Some ( CompressionType :: Zstd ) ) => {
22- Box :: new ( zstd:: Encoder :: new ( output_file, 1 ) ?. auto_finish ( ) )
20+
21+ match compress_output {
22+ Some ( Some ( CompressionType :: Zstd ) ) | Some ( None ) => {
23+ let mut encoder = zstd:: Encoder :: new ( output_file, 1 ) ?;
24+ write_data ( & input_file_path, & mut encoder, strategies) ?;
25+ let file = encoder. finish ( ) ?;
26+ file. sync_all ( ) ?;
2327 }
2428 Some ( Some ( CompressionType :: Gzip ) ) => {
25- Box :: new ( GzEncoder :: new ( output_file, Compression :: best ( ) ) )
29+ let mut encoder = GzEncoder :: new ( output_file, Compression :: best ( ) ) ;
30+ write_data ( & input_file_path, & mut encoder, strategies) ?;
31+ let file = encoder. finish ( ) ?;
32+ file. sync_all ( ) ?;
33+ }
34+ None => {
35+ let mut writer = BufWriter :: new ( output_file) ;
36+ write_data ( & input_file_path, & mut writer, strategies) ?;
37+ writer. flush ( ) ?;
2638 }
27- Some ( None ) => Box :: new ( zstd :: Encoder :: new ( output_file , 1 ) ? . auto_finish ( ) ) ,
39+ }
2840
29- None => Box :: new ( BufWriter :: new ( output_file ) ) ,
30- } ;
41+ Ok ( ( ) )
42+ }
3143
32- let file_reader = File :: open ( & input_file_path)
44+ fn write_data (
45+ input_file_path : & str ,
46+ writer : & mut dyn Write ,
47+ strategies : & Strategies ,
48+ ) -> Result < ( ) , std:: io:: Error > {
49+ let file_reader = File :: open ( input_file_path)
3350 . unwrap_or_else ( |_| panic ! ( "Input file '{}' does not exist" , input_file_path) ) ;
3451
3552 let mut reader = BufReader :: new ( file_reader) ;
3653 let mut line = String :: new ( ) ;
37-
3854 let mut row_parser_state = State :: new ( ) ;
39-
4055 let mut rng = rng:: get ( ) ;
4156
4257 loop {
@@ -46,13 +61,10 @@ pub fn read(
4661 }
4762
4863 let transformed_row = row_parser:: parse ( & mut rng, & line, & mut row_parser_state, strategies) ;
49- file_writer . write_all ( transformed_row. as_bytes ( ) ) ?;
64+ writer . write_all ( transformed_row. as_bytes ( ) ) ?;
5065 line. clear ( ) ;
5166 }
5267
53- // Flush to ensure all data is written before auto_finish() on drop
54- file_writer. flush ( ) ?;
55-
5668 Ok ( ( ) )
5769}
5870
0 commit comments