diff --git a/Cargo.toml b/Cargo.toml
index 9cbce02a..53050f7a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,7 +2,7 @@
 [package]
 name = "amadeus"
-version = "0.1.2"
+version = "0.1.3"
 license = "Apache-2.0"
 authors = ["Alec Mocatta "]
 categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
@@ -14,7 +14,7 @@ parquet postgres aws s3 cloudfront elb json csv logs hadoop hdfs arrow common cr
 """
 repository = "https://github.com/alecmocatta/amadeus"
 homepage = "https://github.com/alecmocatta/amadeus"
-documentation = "https://docs.rs/amadeus/0.1.2"
+documentation = "https://docs.rs/amadeus/0.1.3"
 readme = "README.md"
 edition = "2018"
@@ -30,23 +30,25 @@ parquet = ["amadeus-parquet", "amadeus-derive/parquet"]
 postgres = ["amadeus-postgres", "amadeus-derive/postgres"]
 csv = ["amadeus-serde", "amadeus-derive/serde"]
 json = ["amadeus-serde", "amadeus-derive/serde"]
+doc = ["amadeus-aws/doc", "amadeus-commoncrawl/doc", "amadeus-parquet/doc", "amadeus-postgres/doc", "amadeus-serde/doc"]
 
 [package.metadata.docs.rs]
-features = ["constellation", "aws", "commoncrawl", "postgres", "csv", "json"]
+features = ["doc", "constellation", "aws", "commoncrawl", "postgres", "csv", "json"]
 
 [dependencies]
-amadeus-core = { version = "=0.1.2", path = "amadeus-core" }
-amadeus-derive = { version = "=0.1.2", path = "amadeus-derive" }
-amadeus-types = { version = "=0.1.2", path = "amadeus-types" }
-amadeus-aws = { version = "=0.1.2", path = "amadeus-aws", optional = true }
-amadeus-commoncrawl = { version = "=0.1.2", path = "amadeus-commoncrawl", optional = true }
-amadeus-parquet = { version = "=0.1.2", path = "amadeus-parquet", optional = true }
-amadeus-postgres = { version = "=0.1.2", path = "amadeus-postgres", optional = true }
-amadeus-serde = { version = "=0.1.2", path = "amadeus-serde", optional = true }
+amadeus-core = { version = "=0.1.3", path = "amadeus-core" }
+amadeus-derive = { version = "=0.1.3", path = "amadeus-derive" }
+amadeus-types = { version = "=0.1.3", path = "amadeus-types" }
+amadeus-aws = { version = "=0.1.3", path = "amadeus-aws", optional = true }
+amadeus-commoncrawl = { version = "=0.1.3", path = "amadeus-commoncrawl", optional = true }
+amadeus-parquet = { version = "=0.1.3", path = "amadeus-parquet", optional = true }
+amadeus-postgres = { version = "=0.1.3", path = "amadeus-postgres", optional = true }
+amadeus-serde = { version = "=0.1.3", path = "amadeus-serde", optional = true }
 constellation-rs = { version = "0.1", default-features = false, optional = true }
-futures-preview = "=0.3.0-alpha.18"
+futures-preview = "=0.3.0-alpha.19"
+pin-utils = "0.1.0-alpha.4"
 serde = { version = "1.0", features = ["derive"] }
-serde_closure = "0.1"
+serde_closure = "0.2"
 serde_traitobject = "0.1.6" # pin; broken for some reason
 
diff --git a/README.md b/README.md
index 0ddf2546..a3cbc0eb 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,7 @@

-	Harmonious distributed data analysis in Rust
+	Harmonious distributed data processing & analysis in Rust

@@ -13,7 +13,7 @@

-	<a href="https://docs.rs/amadeus/0.1.2">Docs</a>
+	<a href="https://docs.rs/amadeus/0.1.3">Docs</a>

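The change that recurs across every source crate below: `Source` implementations previously spelled out their full adaptor-chain iterator types by hand (the deleted `CloudfrontInner`/`CommonCrawlInner` aliases), and this release replaces them with nightly `type_alias_impl_trait`. Evidently because rustdoc at the time could not render `impl Trait` in associated-type position, each crate also gains a `doc` cargo feature that swaps in the uninstantiable placeholder `amadeus_core::util::ImplDistributedIterator`, and docs.rs builds with `doc` enabled. A minimal self-contained sketch of the pattern, using a toy `Source`/`Iterator` pair rather than the crate's real traits — illustrative only, not copied from the crate:

// sketch.rs — assumes a cargo feature named `doc`, as added to each Cargo.toml in this diff
#![feature(type_alias_impl_trait)] // nightly, enabled crate-wide in this release

pub trait Source {
    type Iter: Iterator<Item = u32>;
    fn iter(self) -> Self::Iter;
}

// Stand-in mirroring `amadeus_core::util::ImplDistributedIterator`: it
// implements the trait purely so rustdoc has a concrete, nameable type to
// render; every method is unreachable because it is never run.
pub struct ImplIterator;
impl ImplIterator {
    // Consumes (and discards) the real iterator, like `ImplDistributedIterator::new(_drop: U)`.
    pub fn new<I: Iterator<Item = u32>>(_drop: I) -> Self {
        Self
    }
}
impl Iterator for ImplIterator {
    type Item = u32;
    fn next(&mut self) -> Option<u32> {
        unreachable!()
    }
}

pub struct Numbers;
impl Source for Numbers {
    // Real builds name the true adaptor chain via `impl Trait`…
    #[cfg(not(feature = "doc"))]
    type Iter = impl Iterator<Item = u32>;
    // …while docs builds substitute the placeholder.
    #[cfg(feature = "doc")]
    type Iter = ImplIterator;

    #[allow(clippy::let_and_return)]
    fn iter(self) -> Self::Iter {
        let ret = (0..10).map(|x| x * 2);
        #[cfg(feature = "doc")]
        let ret = ImplIterator::new(ret);
        ret
    }
}

This also explains the `#[allow(clippy::let_and_return)]` sprinkled through the diff: in non-`doc` builds the final rebind is compiled out, leaving a bare `let ret = …; ret` that the lint would otherwise flag.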
diff --git a/amadeus-aws/Cargo.toml b/amadeus-aws/Cargo.toml index ce20a03b..ae2f61f8 100644 --- a/amadeus-aws/Cargo.toml +++ b/amadeus-aws/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "amadeus-aws" -version = "0.1.2" +version = "0.1.3" license = "Apache-2.0" authors = ["Alec Mocatta "] categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"] @@ -10,7 +10,7 @@ Harmonious distributed data analysis in Rust. """ repository = "https://github.com/alecmocatta/amadeus" homepage = "https://github.com/alecmocatta/amadeus" -documentation = "https://docs.rs/amadeus/0.1.2" +documentation = "https://docs.rs/amadeus/0.1.3" readme = "README.md" edition = "2018" @@ -18,18 +18,25 @@ edition = "2018" azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests" } maintenance = { status = "actively-developed" } +[features] +doc = [] + [dependencies] -amadeus-core = { version = "=0.1.2", path = "../amadeus-core" } -chrono = "0.4" +amadeus-core = { version = "=0.1.3", path = "../amadeus-core" } +amadeus-types = { version = "=0.1.3", path = "../amadeus-types" } +chrono = { version = "0.4", default-features = false } flate2 = "1.0" futures-01 = { package = "futures", version = "0.1" } -futures-preview = { version = "=0.3.0-alpha.18", features = ["compat"] } +futures-preview = { version = "=0.3.0-alpha.19", features = ["compat"] } http = "0.1" once_cell = "1.0" rusoto_core = "0.40" rusoto_s3 = "0.40" -serde_closure = "0.1" +serde_closure = "0.2" serde = { version = "1.0", features = ["derive"] } tokio = "0.1.7" tokio-retry = "0.2" url = { version = "2.1", features = ["serde"] } + +# dependency of rusoto_core/hyper-tls/native-tls; ensure it's vendored to simplify cross-compilation +openssl = { version = "0.10", features = ["vendored"] } diff --git a/amadeus-aws/src/cloudfront.rs b/amadeus-aws/src/cloudfront.rs index 415c35be..05ab926c 100644 --- a/amadeus-aws/src/cloudfront.rs +++ b/amadeus-aws/src/cloudfront.rs @@ -1,52 +1,21 @@ -use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc}; +use chrono::{NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc}; use flate2::read::MultiGzDecoder; use http::{Method, StatusCode}; - +use rusoto_core::RusotoError; use rusoto_s3::{GetObjectRequest, Object, S3Client, S3}; use serde::{Deserialize, Serialize}; use serde_closure::*; use std::{ - convert::identity, io::{self, BufRead, BufReader}, iter, net, time::Duration, vec + convert::identity, io::{self, BufRead, BufReader}, iter, time::Duration }; -use url::Url; use amadeus_core::{ dist_iter::DistributedIterator, into_dist_iter::IntoDistributedIterator, util::ResultExpand, Source }; +use amadeus_types::{DateTime, IpAddr, Url}; use super::{block_on_01, list, retry, AwsError, AwsRegion}; -type Closure = - serde_closure::FnMut fn(&'r mut Env, Args) -> Output>; - -type CloudfrontInner = amadeus_core::dist_iter::Map< - amadeus_core::dist_iter::FlatMap< - amadeus_core::into_dist_iter::IterIter>, - Closure< - (String, AwsRegion), - (String,), - ResultExpand< - iter::Map< - iter::Filter< - io::Lines>>>, - serde_closure::FnMut< - (), - for<'r, 'a> fn(&'r mut (), (&'a Result,)) -> bool, - >, - >, - Closure<(), (Result,), Result>, - >, - AwsError, - >, - >, - >, - Closure< - (), - (Result, AwsError>,), - Result, - >, ->; - pub struct Cloudfront { region: AwsRegion, bucket: String, @@ -73,53 +42,71 @@ impl Source for Cloudfront { type Item = CloudfrontRow; type Error = AwsError; - // type DistIter = impl DistributedIterator>; //, ::Task: Serialize + for<'de> Deserialize<'de> - 
type DistIter = CloudfrontInner;
-	type Iter = iter::Empty<Result<CloudfrontRow, AwsError>>;
+	#[cfg(not(feature = "doc"))]
+	type DistIter = impl DistributedIterator<Item = Result<Self::Item, Self::Error>>;
+	#[cfg(feature = "doc")]
+	type DistIter = amadeus_core::util::ImplDistributedIterator<Result<Self::Item, Self::Error>>;
+	type Iter = iter::Empty<Result<Self::Item, Self::Error>>;
 
+	#[allow(clippy::let_and_return)]
 	fn dist_iter(self) -> Self::DistIter {
 		let Self {
 			bucket,
 			region,
 			objects,
 		} = self;
-		objects
+		let ret = objects
 			.into_dist_iter()
-			.flat_map(FnMut!([bucket, region] move |key:String| {
+			.flat_map(FnMut!(move |key: String| {
 				let client = S3Client::new(region.clone());
 				ResultExpand(
-					block_on_01(retry(||client
-						.get_object(GetObjectRequest {
-							bucket: bucket.clone(),
-							key: key.clone(),
-							..GetObjectRequest::default()
-						})
+					loop {
+						match self::block_on_01(self::retry(|| {
+							client.get_object(GetObjectRequest {
+								bucket: bucket.clone(),
+								key: key.clone(),
+								..GetObjectRequest::default()
+							})
+						})) {
+							Err(RusotoError::HttpDispatch(_)) => continue,
+							Err(RusotoError::Unknown(response))
+								if response.status.is_server_error() =>
+							{
+								continue
+							}
+							res => break res,
+						}
+					}
+					.map_err(AwsError::from)
+					.map(|res| {
+						let body = res.body.unwrap().into_blocking_read();
+						BufReader::new(MultiGzDecoder::new(
+							Box::new(body) as Box<dyn io::Read>
 						))
-					.map_err(AwsError::from)
-					.map(|res| {
-						let body = res.body.unwrap().into_blocking_read();
-						BufReader::new(MultiGzDecoder::new(Box::new(body) as Box<dyn io::Read>))
-							.lines()
-							.filter(FnMut!(|x:&Result<String, io::Error>| {
-								if let Ok(x) = x {
-									x.chars().filter(|x| !x.is_whitespace()).nth(0) != Some('#')
-								} else {
-									true
-								}
-							}))
-							.map(FnMut!(|x:Result<String, io::Error>| {
-								if let Ok(x) = x {
-									Ok(CloudfrontRow::from_line(&x))
-								} else {
-									Err(AwsError::from(x.err().unwrap()))
-								}
-							}))
-					}),
+						.lines()
+						.filter(|x: &Result<String, io::Error>| {
+							if let Ok(x) = x {
+								x.chars().filter(|x| !x.is_whitespace()).nth(0) != Some('#')
+							} else {
+								true
+							}
+						})
+						.map(|x: Result<String, io::Error>| {
+							if let Ok(x) = x {
+								Ok(CloudfrontRow::from_line(&x))
+							} else {
+								Err(AwsError::from(x.err().unwrap()))
+							}
+						})
+					}),
 				)
 			}))
 			.map(FnMut!(
-				|x: Result<Result<CloudfrontRow, AwsError>, _>| x.and_then(identity)
-			))
+				|x: Result<Result<CloudfrontRow, AwsError>, _>| x.and_then(self::identity)
+			));
+		#[cfg(feature = "doc")]
+		let ret = amadeus_core::util::ImplDistributedIterator::new(ret);
+		ret
 	}
 
 	fn iter(self) -> Self::Iter {
@@ -129,10 +116,10 @@
 #[derive(Clone, Eq, PartialEq, Serialize, Deserialize, Debug)]
 pub struct CloudfrontRow {
-	pub time: DateTime<Utc>,
+	pub time: DateTime,
 	pub edge_location: String,
 	pub response_bytes: u64,
-	pub remote_ip: net::IpAddr,
+	pub remote_ip: IpAddr,
 	#[serde(with = "http_serde")]
 	pub method: Method,
 	pub host: String,
@@ -154,6 +141,7 @@ pub fle_encrypted_fields: Option,
 }
 impl CloudfrontRow {
+	#[inline(always)]
 	fn from_line(line: &str) -> Self {
 		let mut values = line.split('\t');
 		let date = values.next().unwrap();
@@ -183,10 +171,10 @@
 		let fle_status = values.next().unwrap();
 		let fle_encrypted_fields = values.next().unwrap();
 		assert_eq!(values.next(), None);
-		let time = Utc.from_utc_datetime(&NaiveDateTime::new(
+		let time = DateTime::from_chrono(&Utc.from_utc_datetime(&NaiveDateTime::new(
 			NaiveDate::parse_from_str(&date, "%Y-%m-%d").unwrap(),
 			NaiveTime::parse_from_str(&time, "%H:%M:%S").unwrap(),
-		));
+		)));
 		let status = if sc_status != "000" {
 			Some(StatusCode::from_bytes(sc_status.as_bytes()).unwrap())
 		} else {
diff --git a/amadeus-aws/src/file.rs b/amadeus-aws/src/file.rs
index dfa9c14d..43984525 100644
--- a/amadeus-aws/src/file.rs
+++ b/amadeus-aws/src/file.rs
@@ -213,10 +213,14 @@ impl Page for S3Page {
 					key: self.key.clone(),
 					range: Some(format!("bytes={}-{}", start, end)),
 					..GetObjectRequest::default()
-				}))
-				.await;
-			if let Err(RusotoError::HttpDispatch(_)) = res {
-				continue;
+				}));
+			let res = res.await;
+			match res {
+				Err(RusotoError::HttpDispatch(_)) => continue,
+				Err(RusotoError::Unknown(response)) if response.status.is_server_error() => {
+					continue
+				}
+				_ => (),
 			}
 			let mut read = res.unwrap().body.unwrap().into_async_read();
 			while len - cursor.position() > 0 {
diff --git a/amadeus-aws/src/lib.rs b/amadeus-aws/src/lib.rs
index ee7c3ea7..4958c035 100644
--- a/amadeus-aws/src/lib.rs
+++ b/amadeus-aws/src/lib.rs
@@ -1,14 +1,13 @@
-#![doc(html_root_url = "https://docs.rs/amadeus-aws/0.1.2")]
+#![doc(html_root_url = "https://docs.rs/amadeus-aws/0.1.3")]
+#![feature(type_alias_impl_trait)]
 
 mod cloudfront;
 mod file;
 
 use futures::future::FutureExt;
-
 use once_cell::sync::Lazy;
 use rusoto_core::RusotoError;
 use rusoto_s3::{GetObjectError, ListObjectsV2Error, ListObjectsV2Request, Object, S3Client, S3};
-
 use std::{
 	cell::RefCell, error, fmt::{self, Display}, future::Future, io, iter, mem::transmute, ops::FnMut
 };
@@ -16,9 +15,10 @@ use tokio::runtime::Runtime;
 
 use amadeus_core::util::{IoError, ResultExpand};
 
+#[doc(inline)]
 pub use cloudfront::{Cloudfront, CloudfrontRow};
+#[doc(inline)]
 pub use file::{S3Directory, S3File};
-
 #[doc(inline)]
 pub use rusoto_core::Region as AwsRegion;
diff --git a/amadeus-commoncrawl/Cargo.toml b/amadeus-commoncrawl/Cargo.toml
index c2af8fcf..8d117a38 100644
--- a/amadeus-commoncrawl/Cargo.toml
+++ b/amadeus-commoncrawl/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "amadeus-commoncrawl"
-version = "0.1.2"
+version = "0.1.3"
 license = "MIT OR Apache-2.0"
 authors = ["Stephen Becker IV ", "Alec Mocatta "]
 categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
@@ -10,7 +10,7 @@ Harmonious distributed data analysis in Rust.
""" repository = "https://github.com/alecmocatta/amadeus" homepage = "https://github.com/alecmocatta/amadeus" -documentation = "https://docs.rs/amadeus/0.1.2" +documentation = "https://docs.rs/amadeus/0.1.3" readme = "README.md" edition = "2018" @@ -18,13 +18,19 @@ edition = "2018" azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests" } maintenance = { status = "actively-developed" } +[features] +doc = [] + [dependencies] -amadeus-core = { version = "=0.1.2", path = "../amadeus-core" } -amadeus-types = { version = "=0.1.2", path = "../amadeus-types" } +amadeus-core = { version = "=0.1.3", path = "../amadeus-core" } +amadeus-types = { version = "=0.1.3", path = "../amadeus-types" } flate2 = "1.0" reqwest = "0.9" reqwest_resume = "0.2" nom = "4.2.3" -serde_closure = "0.1" +serde_closure = "0.2" serde = { version = "1.0", features = ["derive"] } url = { version = "2.1", features = ["serde"] } + +# dependency of reqwest/native-tls; ensure it's vendored to simplify cross-compilation +openssl = { version = "0.10", features = ["vendored"] } diff --git a/amadeus-commoncrawl/src/lib.rs b/amadeus-commoncrawl/src/lib.rs index aa29596a..716514a1 100644 --- a/amadeus-commoncrawl/src/lib.rs +++ b/amadeus-commoncrawl/src/lib.rs @@ -1,42 +1,27 @@ -#![doc(html_root_url = "https://docs.rs/amadeus-commoncrawl/0.1.2")] +#![doc(html_root_url = "https://docs.rs/amadeus-commoncrawl/0.1.3")] +#![feature(type_alias_impl_trait)] mod commoncrawl; mod parser; use flate2::read::MultiGzDecoder; use reqwest_resume::ClientExt; -use serde::{Deserialize, Serialize}; use serde_closure::*; use std::{ - io::{self, BufRead, BufReader}, iter, ops::FnMut, time + io::{self, BufRead, BufReader}, iter, time }; -use amadeus_core::{ - dist_iter::{Consumer, DistributedIterator}, into_dist_iter::IteratorExt -}; +use amadeus_core::{dist_iter::DistributedIterator, into_dist_iter::IteratorExt, Source}; use amadeus_types::Webpage; use commoncrawl::WarcParser; -type Closure = - serde_closure::FnMut fn(&'r mut Env, Args) -> Output>; - -type CommonCrawlInner = amadeus_core::dist_iter::FlatMap< - amadeus_core::into_dist_iter::IterIter< - iter::Map< - io::Lines>>, - Closure<(), (Result,), String>, - >, - >, - Closure<(), (String,), WarcParser>>, ->; - +/// See https://commoncrawl.s3.amazonaws.com/crawl-data/index.html +/// CC-MAIN-2018-43 pub struct CommonCrawl { - i: CommonCrawlInner, + body: reqwest_resume::Response, } impl CommonCrawl { - /// See https://commoncrawl.s3.amazonaws.com/crawl-data/index.html - /// CC-MAIN-2018-43 pub fn new(id: &str) -> Result { let url = format!( "https://commoncrawl.s3.amazonaws.com/crawl-data/{}/warc.paths.gz", @@ -49,9 +34,25 @@ impl CommonCrawl { .resumable() .get(url.parse().unwrap()) .send()?; - let body = MultiGzDecoder::new(body); // Content-Encoding isn't set, so decode manually + Ok(Self { body }) + } +} - let i = BufReader::new(body) +impl Source for CommonCrawl { + type Item = Webpage<'static>; + type Error = io::Error; + + #[cfg(not(feature = "doc"))] + type DistIter = impl DistributedIterator>; + #[cfg(feature = "doc")] + type DistIter = amadeus_core::util::ImplDistributedIterator>; + type Iter = iter::Empty>; + + #[allow(clippy::let_and_return)] + fn dist_iter(self) -> Self::DistIter { + let body = MultiGzDecoder::new(self.body); // Content-Encoding isn't set, so decode manually + + let ret = BufReader::new(body) .lines() .map(FnMut!(|url: Result| -> String { format!("http://commoncrawl.s3.amazonaws.com/{}", url.unwrap()) @@ -69,31 +70,11 @@ impl CommonCrawl { let body = 
MultiGzDecoder::new(body); WarcParser::new(body) })); - Ok(Self { i }) - } -} - -impl DistributedIterator for CommonCrawl { - type Item = Result, io::Error>; - type Task = CommonCrawlConsumer; - - fn size_hint(&self) -> (usize, Option) { - self.i.size_hint() + #[cfg(feature = "doc")] + let ret = amadeus_core::util::ImplDistributedIterator::new(ret); + ret } - fn next_task(&mut self) -> Option { - self.i.next_task().map(|task| CommonCrawlConsumer { task }) - } -} - -#[derive(Serialize, Deserialize)] -pub struct CommonCrawlConsumer { - task: ::Task, -} - -impl Consumer for CommonCrawlConsumer { - type Item = Result, io::Error>; - - fn run(self, i: &mut impl FnMut(Self::Item) -> bool) -> bool { - self.task.run(i) + fn iter(self) -> Self::Iter { + iter::empty() } } diff --git a/amadeus-commoncrawl/src/parser.rs b/amadeus-commoncrawl/src/parser.rs index f8b8b865..676afe6d 100644 --- a/amadeus-commoncrawl/src/parser.rs +++ b/amadeus-commoncrawl/src/parser.rs @@ -93,42 +93,42 @@ fn token(input: &[u8]) -> IResult<&[u8], &[u8]> { } named!(init_line <&[u8], (&str, &str)>, - do_parse!( - opt!(tag!("\r")) >> - opt!(tag!("\n")) >> - tag!("WARC") >> - tag!("/") >> - opt!(space) >> - version: map_res!(version_number, str::from_utf8) >> - opt!(tag!("\r")) >> - tag!("\n") >> - (("WARCVERSION", version)) - ) + do_parse!( + opt!(tag!("\r")) >> + opt!(tag!("\n")) >> + tag!("WARC") >> + tag!("/") >> + opt!(space) >> + version: map_res!(version_number, str::from_utf8) >> + opt!(tag!("\r")) >> + tag!("\n") >> + (("WARCVERSION", version)) + ) ); named!(header_match <&[u8], (&str, &str)>, - do_parse!( - name: map_res!(token, str::from_utf8) >> - opt!(space) >> - tag!(":") >> - opt!(space) >> - value: map_res!(utf8_allowed, str::from_utf8) >> - opt!(tag!("\r")) >> - tag!("\n") >> - ((name, value)) - ) + do_parse!( + name: map_res!(token, str::from_utf8) >> + opt!(space) >> + tag!(":") >> + opt!(space) >> + value: map_res!(utf8_allowed, str::from_utf8) >> + opt!(tag!("\r")) >> + tag!("\n") >> + ((name, value)) + ) ); named!(header_aggregator<&[u8], Vec<(&str,&str)> >, many1!(header_match)); named!(warc_header<&[u8], ((&str, &str), Vec<(&str,&str)>) >, - do_parse!( - version: init_line >> - headers: header_aggregator >> - opt!(tag!("\r")) >> - tag!("\n") >> - ((version, headers)) - ) + do_parse!( + version: init_line >> + headers: header_aggregator >> + opt!(tag!("\r")) >> + tag!("\n") >> + ((version, headers)) + ) ); /// Parses one record and returns an IResult from nom @@ -202,16 +202,16 @@ pub fn record(input: &[u8]) -> IResult<&[u8], Record> { } named!(record_complete <&[u8], Record >, - complete!( - do_parse!( - entry: record >> - opt!(tag!("\r")) >> - tag!("\n") >> - opt!(tag!("\r")) >> - tag!("\n") >> - (entry) - ) - ) + complete!( + do_parse!( + entry: record >> + opt!(tag!("\r")) >> + tag!("\n") >> + opt!(tag!("\r")) >> + tag!("\n") >> + (entry) + ) + ) ); // Parses many record and returns an IResult with a Vec of Record diff --git a/amadeus-core/Cargo.toml b/amadeus-core/Cargo.toml index f6ac49f9..80816bd1 100644 --- a/amadeus-core/Cargo.toml +++ b/amadeus-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "amadeus-core" -version = "0.1.2" +version = "0.1.3" license = "Apache-2.0" authors = ["Alec Mocatta "] categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"] @@ -10,7 +10,7 @@ Harmonious distributed data analysis in Rust. 
""" repository = "https://github.com/alecmocatta/amadeus" homepage = "https://github.com/alecmocatta/amadeus" -documentation = "https://docs.rs/amadeus/0.1.2" +documentation = "https://docs.rs/amadeus/0.1.3" readme = "README.md" edition = "2018" @@ -20,12 +20,12 @@ maintenance = { status = "actively-developed" } [dependencies] either = { version = "1.5", features = ["serde"] } -futures-preview = "=0.3.0-alpha.18" +futures-preview = "=0.3.0-alpha.19" owned_chars = "0.3" rand = "0.7" replace_with = "0.1" serde = { version = "1.0", features = ["derive"] } -serde_closure = "0.1" +serde_closure = "0.2" streaming_algorithms = "0.1" sum = { version = "0.1", features = ["serde"] } walkdir = "2.2" diff --git a/amadeus-core/src/dist_iter.rs b/amadeus-core/src/dist_iter.rs index 72563d4b..9f384467 100644 --- a/amadeus-core/src/dist_iter.rs +++ b/amadeus-core/src/dist_iter.rs @@ -165,18 +165,14 @@ pub trait DistributedIterator { .into_iter() .map(|tasks| { let reduce1 = reduce1factory.make(); - pool.spawn( - FnOnce!([tasks,reduce1] move || -> ::Output { - let mut reduce1: R1 = reduce1; - let tasks: Vec = tasks; - for task in tasks { - if !task.run(&mut |item| reduce1.push(item)) { - break; - } - }; - reduce1.ret() - }), - ) + pool.spawn(FnOnce!(move || -> ::Output { + for task in tasks { + if !task.run(&mut |item| reduce1.push(item)) { + break; + } + } + reduce1.ret() + })) }) .collect::>(); let mut more = true; diff --git a/amadeus-core/src/lib.rs b/amadeus-core/src/lib.rs index bfd195b4..2ac8d7f6 100644 --- a/amadeus-core/src/lib.rs +++ b/amadeus-core/src/lib.rs @@ -1,4 +1,4 @@ -#![doc(html_root_url = "https://docs.rs/amadeus-core/0.1.2")] +#![doc(html_root_url = "https://docs.rs/amadeus-core/0.1.3")] #![feature(atomic_min_max)] #![feature(specialization)] #![feature(never_type)] diff --git a/amadeus-core/src/util.rs b/amadeus-core/src/util.rs index 6149af3c..0e37eca6 100644 --- a/amadeus-core/src/util.rs +++ b/amadeus-core/src/util.rs @@ -1,5 +1,7 @@ use serde::{Deserialize, Serialize}; -use std::{error, fmt, io, sync::Arc}; +use std::{error, fmt, io, marker::PhantomData, sync::Arc}; + +use crate::dist_iter::{Consumer, DistributedIterator}; pub struct ResultExpand(pub Result); impl IntoIterator for ResultExpand @@ -59,3 +61,34 @@ impl From for io::Error { Arc::try_unwrap(err.0).unwrap() } } + +pub struct ImplDistributedIterator(PhantomData); +impl ImplDistributedIterator { + pub fn new(_drop: U) -> Self + where + U: DistributedIterator, + { + Self(PhantomData) + } +} +impl DistributedIterator for ImplDistributedIterator { + type Item = T; + type Task = ImplConsumer; + + fn size_hint(&self) -> (usize, Option) { + unreachable!() + } + fn next_task(&mut self) -> Option { + unreachable!() + } +} + +#[derive(Serialize, Deserialize)] +pub struct ImplConsumer(PhantomData); +impl Consumer for ImplConsumer { + type Item = T; + + fn run(self, _i: &mut impl FnMut(Self::Item) -> bool) -> bool { + unreachable!() + } +} diff --git a/amadeus-derive/Cargo.toml b/amadeus-derive/Cargo.toml index 0ab2e690..58141147 100644 --- a/amadeus-derive/Cargo.toml +++ b/amadeus-derive/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "amadeus-derive" -version = "0.1.2" +version = "0.1.3" license = "Apache-2.0" authors = ["Alec Mocatta "] categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"] @@ -10,7 +10,7 @@ Harmonious distributed data analysis in Rust. 
""" repository = "https://github.com/alecmocatta/amadeus" homepage = "https://github.com/alecmocatta/amadeus" -documentation = "https://docs.rs/amadeus/0.1.2" +documentation = "https://docs.rs/amadeus/0.1.3" readme = "README.md" edition = "2018" diff --git a/amadeus-derive/src/lib.rs b/amadeus-derive/src/lib.rs index 54bb202d..983004c5 100644 --- a/amadeus-derive/src/lib.rs +++ b/amadeus-derive/src/lib.rs @@ -15,21 +15,16 @@ // specific language governing permissions and limitations // under the License. -#![doc(html_root_url = "https://docs.rs/amadeus-derive/0.1.2")] +#![doc(html_root_url = "https://docs.rs/amadeus-derive/0.1.3")] #![recursion_limit = "400"] #![allow(clippy::useless_let_if_seq)] extern crate proc_macro; -extern crate proc_macro2; -#[macro_use] -extern crate syn; -#[macro_use] -extern crate quote; use proc_macro2::{Span, TokenStream}; -use quote::ToTokens; +use quote::{quote, ToTokens}; use syn::{ - punctuated::Punctuated, spanned::Spanned, Attribute, Data, DataEnum, DeriveInput, Error, Field, Fields, Ident, Lit, LitStr, Meta, NestedMeta, Path, TypeParam, WhereClause + punctuated::Punctuated, spanned::Spanned, Attribute, Data, DataEnum, DeriveInput, Error, Field, Fields, Ident, Lit, LitStr, Meta, NestedMeta, Path, Token, TypeParam, WhereClause }; /// This is a procedural macro to derive the [`Data`](amadeus::record::Data) trait on @@ -164,7 +159,7 @@ fn impl_struct( .predicates .push(syn::parse2(quote! { #ident: __::PostgresData }).unwrap()); } - let mut where_clause_with_serde_data = where_clause.clone(); + let mut where_clause_with_serde_data = where_clause; for TypeParam { ident, .. } in ast.generics.type_params() { where_clause_with_serde_data .predicates @@ -252,13 +247,9 @@ fn impl_struct( let mut parquet_derives = None; if cfg!(feature = "parquet") { parquet_includes = Some(quote! { - pub use ::amadeus_parquet::{ParquetData,internal::{ - basic::Repetition, - column::reader::ColumnReader, - errors::{ParquetError, Result as ParquetResult}, - record::{Schema as ParquetSchema, Reader, _private::DisplaySchemaGroup}, - schema::types::{ColumnPath, Type}, - }}; + pub use ::amadeus_parquet::derive::{ + ParquetData, Repetition, ColumnReader, ParquetError, ParquetResult, ParquetSchema, Reader, DisplaySchemaGroup, ColumnPath, Type + }; }); parquet_derives = Some(quote! { diff --git a/amadeus-parquet/Cargo.toml b/amadeus-parquet/Cargo.toml index 57b319dc..430815ae 100644 --- a/amadeus-parquet/Cargo.toml +++ b/amadeus-parquet/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "amadeus-parquet" -version = "0.1.2" +version = "0.1.3" license = "Apache-2.0" authors = ["Alec Mocatta ", "Apache Arrow "] categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"] @@ -10,7 +10,7 @@ An Apache Parquet implementation in Rust. 
""" repository = "https://github.com/alecmocatta/amadeus" homepage = "https://github.com/alecmocatta/amadeus" -documentation = "https://docs.rs/amadeus/0.1.2" +documentation = "https://docs.rs/amadeus/0.1.3" readme = "README.md" edition = "2018" @@ -18,12 +18,15 @@ edition = "2018" azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests" } maintenance = { status = "actively-developed" } +[features] +doc = [] + [dependencies] -amadeus-core = { version = "=0.1.2", path = "../amadeus-core" } -amadeus-types = { version = "=0.1.2", path = "../amadeus-types" } +amadeus-core = { version = "=0.1.3", path = "../amadeus-core" } +amadeus-types = { version = "=0.1.3", path = "../amadeus-types" } brotli = "3.3" byteorder = "1.2" -chrono = "0.4" +chrono = { version = "0.4", default-features = false } flate2 = { version = "1.0.2", features = ["rust_backend"], default-features = false } fxhash = "0.2" linked-hash-map = "0.5" @@ -32,7 +35,7 @@ num-bigint = "0.2" parquet-format = "2.6.0" quick-error = "1.2.2" serde = { version = "1.0", features = ["derive"] } -serde_closure = "0.1" +serde_closure = "0.2" snap = "0.2" sum = "0.1" thrift = "0.12.0" diff --git a/amadeus-parquet/benches/codec.rs b/amadeus-parquet/benches/codec.rs deleted file mode 100644 index a4b8f226..00000000 --- a/amadeus-parquet/benches/codec.rs +++ /dev/null @@ -1,192 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#![feature(test)] - -extern crate test; - -#[allow(dead_code)] -#[path = "common.rs"] -mod common; -use crate::common::*; - -use std::fs::File; -use test::Bencher; - -use amadeus_parquet::internal; -use internal::{basic::Compression, compression::*, file::reader::*}; - -// 10k rows written in page v2 with type: -// -// message test { -// required binary binary_field, -// required int32 int32_field, -// required int64 int64_field, -// required boolean boolean_field, -// required float float_field, -// required double double_field, -// required fixed_len_byte_array(1024) flba_field, -// required int96 int96_field -// } -// -// filled with random values. - -fn get_rg_reader() -> internal::file::reader::SerializedRowGroupReader { - let file = get_test_file("10k-v2.parquet"); - let f_reader = SerializedFileReader::new(file).unwrap(); - f_reader.get_row_group(0).unwrap() -} - -fn get_pages_bytes(col_idx: usize) -> Vec { - let mut data: Vec = Vec::new(); - let rg_reader = get_rg_reader(); - let mut pg_reader = rg_reader.get_column_page_reader(col_idx).unwrap(); - while let Some(p) = pg_reader.get_next_page().unwrap() { - data.extend_from_slice(p.buffer().data()); - } - data -} - -macro_rules! 
compress { - ($fname:ident, $codec:expr, $col_idx:expr) => { - #[bench] - fn $fname(bench: &mut Bencher) { - let mut codec = create_codec($codec).unwrap().unwrap(); - let data = get_pages_bytes($col_idx); - bench.bytes = data.len() as u64; - let mut v = Vec::new(); - bench.iter(|| { - codec.compress(&data[..], &mut v).unwrap(); - v.clear(); - }) - } - }; -} - -macro_rules! decompress { - ($fname:ident, $codec:expr, $col_idx:expr) => { - #[bench] - fn $fname(bench: &mut Bencher) { - let compressed_pages = { - let mut codec = create_codec($codec).unwrap().unwrap(); - let raw_data = get_pages_bytes($col_idx); - let mut v = Vec::new(); - codec.compress(&raw_data[..], &mut v).unwrap(); - v - }; - - let mut codec = create_codec($codec).unwrap().unwrap(); - let rg_reader = get_rg_reader(); - bench.bytes = rg_reader.metadata().total_byte_size() as u64; - let mut v = Vec::new(); - bench.iter(|| { - let _ = codec.decompress(&compressed_pages[..], &mut v).unwrap(); - v.clear(); - }) - } - }; -} - -compress!(compress_brotli_binary, Compression::Brotli, 0); -compress!(compress_brotli_int32, Compression::Brotli, 1); -compress!(compress_brotli_int64, Compression::Brotli, 2); -compress!(compress_brotli_boolean, Compression::Brotli, 3); -compress!(compress_brotli_float, Compression::Brotli, 4); -compress!(compress_brotli_double, Compression::Brotli, 5); -compress!(compress_brotli_fixed, Compression::Brotli, 6); -compress!(compress_brotli_int96, Compression::Brotli, 7); - -compress!(compress_gzip_binary, Compression::Gzip, 0); -compress!(compress_gzip_int32, Compression::Gzip, 1); -compress!(compress_gzip_int64, Compression::Gzip, 2); -compress!(compress_gzip_boolean, Compression::Gzip, 3); -compress!(compress_gzip_float, Compression::Gzip, 4); -compress!(compress_gzip_double, Compression::Gzip, 5); -compress!(compress_gzip_fixed, Compression::Gzip, 6); -compress!(compress_gzip_int96, Compression::Gzip, 7); - -compress!(compress_snappy_binary, Compression::Snappy, 0); -compress!(compress_snappy_int32, Compression::Snappy, 1); -compress!(compress_snappy_int64, Compression::Snappy, 2); -compress!(compress_snappy_boolean, Compression::Snappy, 3); -compress!(compress_snappy_float, Compression::Snappy, 4); -compress!(compress_snappy_double, Compression::Snappy, 5); -compress!(compress_snappy_fixed, Compression::Snappy, 6); -compress!(compress_snappy_int96, Compression::Snappy, 7); - -compress!(compress_lz4_binary, Compression::Lz4, 0); -compress!(compress_lz4_int32, Compression::Lz4, 1); -compress!(compress_lz4_int64, Compression::Lz4, 2); -compress!(compress_lz4_boolean, Compression::Lz4, 3); -compress!(compress_lz4_float, Compression::Lz4, 4); -compress!(compress_lz4_double, Compression::Lz4, 5); -compress!(compress_lz4_fixed, Compression::Lz4, 6); -compress!(compress_lz4_int96, Compression::Lz4, 7); - -compress!(compress_zstd_binary, Compression::Zstd, 0); -compress!(compress_zstd_int32, Compression::Zstd, 1); -compress!(compress_zstd_int64, Compression::Zstd, 2); -compress!(compress_zstd_boolean, Compression::Zstd, 3); -compress!(compress_zstd_float, Compression::Zstd, 4); -compress!(compress_zstd_double, Compression::Zstd, 5); -compress!(compress_zstd_fixed, Compression::Zstd, 6); -compress!(compress_zstd_int96, Compression::Zstd, 7); - -decompress!(decompress_brotli_binary, Compression::Brotli, 0); -decompress!(decompress_brotli_int32, Compression::Brotli, 1); -decompress!(decompress_brotli_int64, Compression::Brotli, 2); -decompress!(decompress_brotli_boolean, Compression::Brotli, 3); 
-decompress!(decompress_brotli_float, Compression::Brotli, 4); -decompress!(decompress_brotli_double, Compression::Brotli, 5); -decompress!(decompress_brotli_fixed, Compression::Brotli, 6); -decompress!(decompress_brotli_int96, Compression::Brotli, 7); - -decompress!(decompress_gzip_binary, Compression::Gzip, 0); -decompress!(decompress_gzip_int32, Compression::Gzip, 1); -decompress!(decompress_gzip_int64, Compression::Gzip, 2); -decompress!(decompress_gzip_boolean, Compression::Gzip, 3); -decompress!(decompress_gzip_float, Compression::Gzip, 4); -decompress!(decompress_gzip_double, Compression::Gzip, 5); -decompress!(decompress_gzip_fixed, Compression::Gzip, 6); -decompress!(decompress_gzip_int96, Compression::Gzip, 7); - -decompress!(decompress_snappy_binary, Compression::Snappy, 0); -decompress!(decompress_snappy_int32, Compression::Snappy, 1); -decompress!(decompress_snappy_int64, Compression::Snappy, 2); -decompress!(decompress_snappy_boolean, Compression::Snappy, 3); -decompress!(decompress_snappy_float, Compression::Snappy, 4); -decompress!(decompress_snappy_double, Compression::Snappy, 5); -decompress!(decompress_snappy_fixed, Compression::Snappy, 6); -decompress!(decompress_snappy_int96, Compression::Snappy, 7); - -decompress!(decompress_lz4_binary, Compression::Lz4, 0); -decompress!(decompress_lz4_int32, Compression::Lz4, 1); -decompress!(decompress_lz4_int64, Compression::Lz4, 2); -decompress!(decompress_lz4_boolean, Compression::Lz4, 3); -decompress!(decompress_lz4_float, Compression::Lz4, 4); -decompress!(decompress_lz4_double, Compression::Lz4, 5); -decompress!(decompress_lz4_fixed, Compression::Lz4, 6); -decompress!(decompress_lz4_int96, Compression::Lz4, 7); - -decompress!(decompress_zstd_binary, Compression::Zstd, 0); -decompress!(decompress_zstd_int32, Compression::Zstd, 1); -decompress!(decompress_zstd_int64, Compression::Zstd, 2); -decompress!(decompress_zstd_boolean, Compression::Zstd, 3); -decompress!(decompress_zstd_float, Compression::Zstd, 4); -decompress!(decompress_zstd_double, Compression::Zstd, 5); -decompress!(decompress_zstd_fixed, Compression::Zstd, 6); -decompress!(decompress_zstd_int96, Compression::Zstd, 7); diff --git a/amadeus-parquet/benches/decoding.rs b/amadeus-parquet/benches/decoding.rs deleted file mode 100644 index a4405118..00000000 --- a/amadeus-parquet/benches/decoding.rs +++ /dev/null @@ -1,205 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#![feature(test)] - -extern crate test; - -#[allow(dead_code)] -#[path = "common.rs"] -mod common; -use crate::common::*; - -use std::rc::Rc; -use test::Bencher; - -use amadeus_parquet::internal; -use internal::{ - basic::*, data_type::*, decoding::*, encoding::*, memory::{ByteBufferPtr, MemTracker} -}; - -macro_rules! 
plain { - ($fname:ident, $num_values:expr, $batch_size:expr, $ty:ident, $pty:expr, - $gen_data_fn:expr) => { - #[bench] - fn $fname(bench: &mut Bencher) { - let mem_tracker = Rc::new(MemTracker::new()); - let mut encoder = - PlainEncoder::<$ty>::new(Rc::new(col_desc(0, $pty)), mem_tracker, vec![]); - - let (_, values) = $gen_data_fn($num_values); - encoder.put(&values[..]).expect("put() should be OK"); - let buffer = encoder.flush_buffer().expect("flush_buffer() should be OK"); - - let decoder = PlainDecoder::<$ty>::new(0); - bench_decoding(bench, $num_values, $batch_size, buffer, Box::new(decoder)); - } - }; -} - -macro_rules! dict { - ($fname:ident, $num_values:expr, $batch_size:expr, $ty:ident, $pty:expr, - $gen_data_fn:expr) => { - #[bench] - fn $fname(bench: &mut Bencher) { - let mem_tracker = Rc::new(MemTracker::new()); - let mut encoder = DictEncoder::<$ty>::new(Rc::new(col_desc(0, $pty)), mem_tracker); - - let (_, values) = $gen_data_fn($num_values); - encoder.put(&values[..]).expect("put() should be OK"); - let mut dict_decoder = PlainDecoder::<$ty>::new(0); - dict_decoder - .set_data( - encoder.write_dict().expect("write_dict() should be OK"), - encoder.num_entries(), - ) - .expect("set_data() should be OK"); - - let buffer = encoder.flush_buffer().expect("flush_buffer() should be OK"); - let mut decoder = DictDecoder::<$ty>::new(); - decoder - .set_dict(Box::new(dict_decoder)) - .expect("set_dict() should be OK"); - - bench_decoding(bench, $num_values, $batch_size, buffer, Box::new(decoder)); - } - }; -} - -macro_rules! delta_bit_pack { - ($fname:ident, $num_values:expr, $batch_size:expr, $ty:ident, $gen_data_fn:expr) => { - #[bench] - fn $fname(bench: &mut Bencher) { - let mut encoder = DeltaBitPackEncoder::<$ty>::new(); - - let (_, values) = $gen_data_fn($num_values); - encoder.put(&values[..]).expect("put() should be OK"); - let buffer = encoder.flush_buffer().expect("flush_buffer() should be OK"); - - let decoder = DeltaBitPackDecoder::<$ty>::new(); - bench_decoding(bench, $num_values, $batch_size, buffer, Box::new(decoder)); - } - }; -} - -fn bench_decoding( - bench: &mut Bencher, num_values: usize, batch_size: usize, buffer: ByteBufferPtr, - mut decoder: Box>, -) { - bench.bytes = buffer.len() as u64; - bench.iter(|| { - decoder - .set_data(buffer.clone(), num_values) - .expect("set_data() should be OK"); - let mut values = vec![T::Type::default(); batch_size]; - loop { - if decoder.get(&mut values[..]).expect("get() should be OK") < batch_size { - break; - } - } - }) -} - -plain!(plain_i32_1k_32, 1024, 32, Int32Type, Type::Int32, gen_1000); -plain!(plain_i32_1k_64, 1024, 64, Int32Type, Type::Int32, gen_1000); -plain!( - plain_i32_1k_128, - 1024, - 128, - Int32Type, - Type::Int32, - gen_1000 -); -plain!(plain_i32_1m_32, 1024, 32, Int32Type, Type::Int32, gen_1000); -plain!(plain_i32_1m_64, 1024, 64, Int32Type, Type::Int32, gen_1000); -plain!( - plain_i32_1m_128, - 1024, - 128, - Int32Type, - Type::Int32, - gen_1000 -); -plain!( - plain_str_1m_128, - 1024, - 128, - ByteArrayType, - Type::ByteArray, - gen_test_strs -); - -dict!(dict_i32_1k_32, 1024, 32, Int32Type, Type::Int32, gen_1000); -dict!(dict_i32_1k_64, 1024, 64, Int32Type, Type::Int32, gen_1000); -dict!(dict_i32_1k_128, 1024, 128, Int32Type, Type::Int32, gen_1000); -dict!( - dict_i32_1m_32, - 1024 * 1024, - 32, - Int32Type, - Type::Int32, - gen_1000 -); -dict!( - dict_i32_1m_64, - 1024 * 1024, - 64, - Int32Type, - Type::Int32, - gen_1000 -); -dict!( - dict_i32_1m_128, - 1024 * 1024, - 128, - Int32Type, - Type::Int32, 
- gen_1000 -); -dict!( - dict_str_1m_128, - 1024 * 1024, - 128, - ByteArrayType, - Type::ByteArray, - gen_test_strs -); - -delta_bit_pack!(delta_bit_pack_i32_1k_32, 1024, 32, Int32Type, gen_1000); -delta_bit_pack!(delta_bit_pack_i32_1k_64, 1024, 64, Int32Type, gen_1000); -delta_bit_pack!(delta_bit_pack_i32_1k_128, 1024, 128, Int32Type, gen_1000); -delta_bit_pack!( - delta_bit_pack_i32_1m_32, - 1024 * 1024, - 32, - Int32Type, - gen_1000 -); -delta_bit_pack!( - delta_bit_pack_i32_1m_64, - 1024 * 1024, - 64, - Int32Type, - gen_1000 -); -delta_bit_pack!( - delta_bit_pack_i32_1m_128, - 1024 * 1024, - 128, - Int32Type, - gen_1000 -); diff --git a/amadeus-parquet/benches/encoding.rs b/amadeus-parquet/benches/encoding.rs deleted file mode 100644 index 945e78ed..00000000 --- a/amadeus-parquet/benches/encoding.rs +++ /dev/null @@ -1,135 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#![feature(test)] - -extern crate test; - -#[allow(dead_code)] -#[path = "common.rs"] -mod common; -use crate::common::*; - -use std::rc::Rc; -use test::Bencher; - -use amadeus_parquet::internal; -use internal::{basic::*, data_type::*, encoding::*, memory::MemTracker}; - -macro_rules! plain { - ($fname:ident, $batch_size:expr, $ty:ident, $pty:expr, $gen_data_fn:expr) => { - #[bench] - fn $fname(bench: &mut Bencher) { - let mem_tracker = Rc::new(MemTracker::new()); - let encoder = PlainEncoder::<$ty>::new(Rc::new(col_desc(0, $pty)), mem_tracker, vec![]); - let (bytes, values) = $gen_data_fn($batch_size); - bench_encoding(bench, bytes, values, Box::new(encoder)); - } - }; -} - -macro_rules! dict { - ($fname:ident, $batch_size:expr, $ty:ident, $pty:expr, $gen_data_fn:expr) => { - #[bench] - fn $fname(bench: &mut Bencher) { - let mem_tracker = Rc::new(MemTracker::new()); - let encoder = DictEncoder::<$ty>::new(Rc::new(col_desc(0, $pty)), mem_tracker); - let (bytes, values) = $gen_data_fn($batch_size); - bench_encoding(bench, bytes, values, Box::new(encoder)); - } - }; -} - -macro_rules! 
delta_bit_pack { - ($fname:ident, $batch_size:expr, $ty:ident, $gen_data_fn:expr) => { - #[bench] - fn $fname(bench: &mut Bencher) { - let encoder = DeltaBitPackEncoder::<$ty>::new(); - let (bytes, values) = $gen_data_fn($batch_size); - bench_encoding(bench, bytes, values, Box::new(encoder)); - } - }; -} - -fn bench_encoding( - bench: &mut Bencher, bytes: usize, values: Vec, mut encoder: Box>, -) { - bench.bytes = bytes as u64; - bench.iter(|| { - encoder.put(&values[..]).expect("put() should be OK"); - encoder.flush_buffer().expect("flush_buffer() should be OK"); - }) -} - -plain!(plain_i32_1k_10, 1024, Int32Type, Type::Int32, gen_10); -plain!(plain_i32_1k_100, 1024, Int32Type, Type::Int32, gen_100); -plain!(plain_i32_1k_1000, 1024, Int32Type, Type::Int32, gen_1000); -plain!(plain_i32_1m_10, 1024 * 1024, Int32Type, Type::Int32, gen_10); -plain!( - plain_i32_1m_100, - 1024 * 1024, - Int32Type, - Type::Int32, - gen_100 -); -plain!( - plain_i32_1m_1000, - 1024 * 1024, - Int32Type, - Type::Int32, - gen_1000 -); -plain!( - plain_str_1m, - 1024 * 1024, - ByteArrayType, - Type::ByteArray, - gen_test_strs -); - -dict!(dict_i32_1k_10, 1024, Int32Type, Type::Int32, gen_10); -dict!(dict_i32_1k_100, 1024, Int32Type, Type::Int32, gen_100); -dict!(dict_i32_1k_1000, 1024, Int32Type, Type::Int32, gen_1000); -dict!(dict_i32_1m_10, 1024 * 1024, Int32Type, Type::Int32, gen_10); -dict!( - dict_i32_1m_100, - 1024 * 1024, - Int32Type, - Type::Int32, - gen_100 -); -dict!( - dict_i32_1m_1000, - 1024 * 1024, - Int32Type, - Type::Int32, - gen_1000 -); -plain!( - dict_str_1m, - 1024 * 1024, - ByteArrayType, - Type::ByteArray, - gen_test_strs -); - -delta_bit_pack!(delta_bit_pack_i32_1k_10, 1024, Int32Type, gen_10); -delta_bit_pack!(delta_bit_pack_i32_1k_100, 1024, Int32Type, gen_100); -delta_bit_pack!(delta_bit_pack_i32_1k_1000, 1024, Int32Type, gen_1000); -delta_bit_pack!(delta_bit_pack_i32_1m_10, 1024 * 1024, Int32Type, gen_10); -delta_bit_pack!(delta_bit_pack_i32_1m_100, 1024 * 1024, Int32Type, gen_100); -delta_bit_pack!(delta_bit_pack_i32_1m_1000, 1024 * 1024, Int32Type, gen_1000); diff --git a/amadeus-parquet/benches/reader.rs b/amadeus-parquet/benches/reader.rs deleted file mode 100644 index ddd3bb0a..00000000 --- a/amadeus-parquet/benches/reader.rs +++ /dev/null @@ -1,200 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#![feature(test)] - -extern crate test; - -#[allow(dead_code)] -#[path = "common.rs"] -mod common; -use crate::common::*; - -use std::collections::HashMap; -use test::Bencher; - -use amadeus_parquet::internal; -use internal::{ - column::reader::{get_typed_column_reader, ColumnReader}, data_type::*, file::reader::{FileReader, RowGroupReader, SerializedFileReader}, record::types::{Row, Timestamp}, schema::types::ColumnPath -}; - -#[bench] -fn record_reader_10k(bench: &mut Bencher) { - let file = get_test_file("10k-v2.parquet"); - let len = file.metadata().unwrap().len(); - let parquet_reader = SerializedFileReader::new(file).unwrap(); - - bench.bytes = len; - bench.iter(move || { - (&parquet_reader) - .get_row_iter::(None) - .unwrap() - .for_each(drop); - }) -} - -#[bench] -fn record_reader_10k_typed(bench: &mut Bencher) { - let file = get_test_file("10k-v2.parquet"); - let len = file.metadata().unwrap().len(); - let parquet_reader = SerializedFileReader::new(file).unwrap(); - - bench.bytes = len; - bench.iter(|| { - type RowTyped = (Vec, i32, i64, bool, f32, f64, [u8; 1024], Timestamp); - (&parquet_reader) - .get_row_iter::(None) - .unwrap() - .for_each(drop); - }) -} - -#[bench] -fn record_reader_stock_simulated(bench: &mut Bencher) { - let file = get_test_file("stock_simulated.parquet"); - let len = file.metadata().unwrap().len(); - let parquet_reader = SerializedFileReader::new(file).unwrap(); - - bench.bytes = len; - bench.iter(|| { - (&parquet_reader) - .get_row_iter::(None) - .unwrap() - .for_each(drop); - }) -} - -#[bench] -fn record_reader_stock_simulated_typed(bench: &mut Bencher) { - let file = get_test_file("stock_simulated.parquet"); - let len = file.metadata().unwrap().len(); - let parquet_reader = SerializedFileReader::new(file).unwrap(); - - bench.bytes = len; - bench.iter(|| { - type RowTyped = ( - Option, - Option, - Option, - Option, - Option, - Option, - Option, - Option, - Option, - Option, - Option, - Option, - Option, - Option, - Option, - Option, - Option, - Option, - Option, - Option, - Option, - Option, - ); - (&parquet_reader) - .get_row_iter::(None) - .unwrap() - .for_each(drop); - }) -} - -#[bench] -fn record_reader_stock_simulated_column(bench: &mut Bencher) { - // WARNING THIS BENCH IS INTENDED FOR THIS DATA FILE ONLY - // COPY OR CHANGE THE DATA FILE MAY NOT WORK AS YOU WISH - let file = get_test_file("stock_simulated.parquet"); - let len = file.metadata().unwrap().len(); - let parquet_reader = SerializedFileReader::new(file).unwrap(); - - let descr = parquet_reader.metadata().file_metadata().schema_descr_ptr(); - let num_row_groups = parquet_reader.num_row_groups(); - let batch_size = 256; - - bench.bytes = len; - bench.iter(|| { - let mut current_row_group = 0; - - while current_row_group < num_row_groups { - let row_group_reader = parquet_reader.get_row_group(current_row_group).unwrap(); - let num_rows = row_group_reader.metadata().num_rows() as usize; - - let mut paths = HashMap::new(); - let row_group_metadata = row_group_reader.metadata(); - - for col_index in 0..row_group_reader.num_columns() { - let col_meta = row_group_metadata.column(col_index); - let col_path = col_meta.column_path().clone(); - paths.insert(col_path, col_index); - } - - let mut readers = Vec::new(); - for field in descr.root_schema().get_fields() { - let col_path = ColumnPath::new(vec![field.name().to_owned()]); - let orig_index = *paths.get(&col_path).unwrap(); - let col_reader = row_group_reader.get_column_reader(orig_index).unwrap(); - readers.push(col_reader); - } - - let mut 
def_levels = Some(vec![0; batch_size]); - let mut rep_levels = None::>; - - for col_reader in readers.into_iter() { - match col_reader { - r @ ColumnReader::Int64ColumnReader(..) => { - let mut data_collected = Vec::with_capacity(num_rows); - let mut val = vec![0; batch_size]; - let mut typed_reader = get_typed_column_reader::(r); - while let Ok((values_read, _levels_read)) = typed_reader.read_batch( - batch_size, - def_levels.as_mut().map(|x| &mut x[..]), - rep_levels.as_mut().map(|x| &mut x[..]), - &mut val, - ) { - data_collected.extend_from_slice(&val); - if values_read < batch_size { - break; - } - } - } - r @ ColumnReader::DoubleColumnReader(..) => { - let mut data_collected = Vec::with_capacity(num_rows); - let mut val = vec![0.0; batch_size]; - let mut typed_reader = get_typed_column_reader::(r); - while let Ok((values_read, _levels_read)) = typed_reader.read_batch( - batch_size, - def_levels.as_mut().map(|x| &mut x[..]), - rep_levels.as_mut().map(|x| &mut x[..]), - &mut val, - ) { - data_collected.extend_from_slice(&val); - if values_read < batch_size { - break; - } - } - } - _ => unimplemented!(), - } - } - current_row_group += 1; - } - }) -} diff --git a/amadeus-parquet/src/bin/parquet-read.rs b/amadeus-parquet/src/bin/parquet-read.rs deleted file mode 100644 index 8e618a47..00000000 --- a/amadeus-parquet/src/bin/parquet-read.rs +++ /dev/null @@ -1,87 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Binary file to read data from a Parquet file. -//! -//! # Install -//! -//! `parquet-read` can be installed using `cargo`: -//! ``` -//! cargo install parquet -//! ``` -//! After this `parquet-read` should be globally available: -//! ``` -//! parquet-read XYZ.parquet -//! ``` -//! -//! The binary can also be built from the source code and run as follows: -//! ``` -//! cargo run --bin parquet-read XYZ.parquet -//! ``` -//! -//! # Usage -//! -//! ``` -//! parquet-read [num-records] -//! ``` -//! where `file-path` is the path to a Parquet file and `num-records` is the optional -//! numeric option that allows to specify number of records to read from a file. -//! When not provided, all records are read. -//! -//! Note that `parquet-read` reads full file schema, no projection or filtering is -//! applied. 
- -use std::{env, fs::File, path::Path, process}; - -use amadeus_parquet::internal::{ - file::reader::{FileReader, SerializedFileReader}, record::types::Row -}; - -fn main() { - let args: Vec = env::args().collect(); - if args.len() != 2 && args.len() != 3 { - println!("Usage: parquet-read [num-records]"); - process::exit(1); - } - - let mut num_records: Option = None; - if args.len() == 3 { - match args[2].parse() { - Ok(value) => num_records = Some(value), - Err(e) => panic!("Error when reading value for [num-records], {}", e), - } - } - - let path = Path::new(&args[1]); - let file = File::open(&path).unwrap(); - let parquet_reader = SerializedFileReader::new(file).unwrap(); - - // Use full schema as projected schema - let mut iter = parquet_reader.get_row_iter::(None).unwrap(); - - let mut start = 0; - let end = num_records.unwrap_or(0); - let all_records = num_records.is_none(); - - while all_records || start < end { - match iter.next() { - Some(row) => println!("{:?}", row.unwrap()), - None => break, - } - start += 1; - } -} diff --git a/amadeus-parquet/src/bin/parquet-schema.rs b/amadeus-parquet/src/bin/parquet-schema.rs deleted file mode 100644 index 0a9c2e82..00000000 --- a/amadeus-parquet/src/bin/parquet-schema.rs +++ /dev/null @@ -1,85 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Binary file to print the schema and metadata of a Parquet file. -//! -//! # Install -//! -//! `parquet-schema` can be installed using `cargo`: -//! ``` -//! cargo install parquet -//! ``` -//! After this `parquet-schema` should be globally available: -//! ``` -//! parquet-schema XYZ.parquet -//! ``` -//! -//! The binary can also be built from the source code and run as follows: -//! ``` -//! cargo run --bin parquet-schema XYZ.parquet -//! ``` -//! -//! # Usage -//! -//! ``` -//! parquet-schema [verbose] -//! ``` -//! where `file-path` is the path to a Parquet file and `verbose` is the optional boolean -//! flag that allows to print schema only, when set to `false` (default behaviour when -//! not provided), or print full file metadata, when set to `true`. 
- -use std::{env, fs::File, path::Path, process}; - -use amadeus_parquet::internal::{ - file::reader::{FileReader, SerializedFileReader}, schema::printer::{print_file_metadata, print_parquet_metadata} -}; - -fn main() { - let args: Vec = env::args().collect(); - if args.len() != 2 && args.len() != 3 { - println!("Usage: parquet-schema [verbose]"); - process::exit(1); - } - let path = Path::new(&args[1]); - let mut verbose = false; - if args.len() == 3 { - match args[2].parse() { - Ok(b) => verbose = b, - Err(e) => panic!( - "Error when reading value for [verbose] (expected either 'true' or 'false'): {}", - e - ), - } - } - let file = match File::open(&path) { - Err(e) => panic!("Error when opening file {}: {}", path.display(), e), - Ok(f) => f, - }; - match SerializedFileReader::new(file) { - Err(e) => panic!("Error when parsing Parquet file: {}", e), - Ok(parquet_reader) => { - let metadata = parquet_reader.metadata(); - println!("Metadata for file: {}", &args[1]); - println!(); - if verbose { - print_parquet_metadata(&mut std::io::stdout(), &metadata); - } else { - print_file_metadata(&mut std::io::stdout(), &metadata.file_metadata()); - } - } - } -} diff --git a/amadeus-parquet/src/impls.rs b/amadeus-parquet/src/impls.rs deleted file mode 100644 index 5761485e..00000000 --- a/amadeus-parquet/src/impls.rs +++ /dev/null @@ -1,902 +0,0 @@ -#![allow(clippy::type_complexity)] - -use super::{ - internal::{ - self, basic::Repetition, column::reader::ColumnReader, errors::ParquetError, record::Record as ParquetRecord, schema::types::{ColumnPath, Type} - }, ParquetData -}; -use amadeus_types::{ - Bson, Date, Decimal, Enum, Group, Json, List, Map, Time, Timestamp, Value, ValueRequired -}; -use std::{ - collections::HashMap, convert::{TryFrom, TryInto}, hash::Hash, marker::PhantomData, mem::transmute -}; - -pub trait InternalInto { - fn internal_into(self) -> T; -} -impl InternalInto for internal::record::types::Bson { - fn internal_into(self) -> Bson { - let vec: Vec = self.into(); - vec.into() - } -} -impl InternalInto for internal::record::types::Json { - fn internal_into(self) -> Json { - let vec: String = self.into(); - vec.into() - } -} -impl InternalInto for internal::record::types::Enum { - fn internal_into(self) -> Enum { - let vec: String = self.into(); - vec.into() - } -} -impl InternalInto for internal::data_type::Decimal { - fn internal_into(self) -> Decimal { - unimplemented!() - } -} -impl InternalInto for internal::record::types::Group { - fn internal_into(self) -> Group { - let field_names = self.field_names(); - Group::new( - self.into_fields() - .into_iter() - .map(InternalInto::internal_into) - .collect(), - Some(field_names), - ) - } -} -const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588; -const NANOS_PER_MICRO: i64 = 1_000; -impl InternalInto for internal::record::types::Date { - fn internal_into(self) -> Date { - Date::from_days(self.as_days().into()).unwrap() - } -} -impl InternalInto