Skip to content

Commit

Permalink
Merge pull request #7 from alecmocatta/compile-time
Browse files (browse the repository at this point in the history)
Improve compile times; rename Timestamp -> DateTime
  • Loading branch information
mergify[bot] authored Oct 17, 2019
2 parents 6910ff5 + 4167503 commit 26ef26b
Show file tree
Hide file tree
Showing 95 changed files with 5,414 additions and 8,496 deletions.
28 changes: 15 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[package]
name = "amadeus"
version = "0.1.2"
version = "0.1.3"
license = "Apache-2.0"
authors = ["Alec Mocatta <[email protected]>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand All @@ -14,7 +14,7 @@ parquet postgres aws s3 cloudfront elb json csv logs hadoop hdfs arrow common cr
"""
repository = "https://github.com/alecmocatta/amadeus"
homepage = "https://github.com/alecmocatta/amadeus"
documentation = "https://docs.rs/amadeus/0.1.2"
documentation = "https://docs.rs/amadeus/0.1.3"
readme = "README.md"
edition = "2018"

Expand All @@ -30,23 +30,25 @@ parquet = ["amadeus-parquet", "amadeus-derive/parquet"]
postgres = ["amadeus-postgres", "amadeus-derive/postgres"]
csv = ["amadeus-serde", "amadeus-derive/serde"]
json = ["amadeus-serde", "amadeus-derive/serde"]
doc = ["amadeus-aws/doc", "amadeus-commoncrawl/doc", "amadeus-parquet/doc", "amadeus-postgres/doc", "amadeus-serde/doc"]

[package.metadata.docs.rs]
features = ["constellation", "aws", "commoncrawl", "postgres", "csv", "json"]
features = ["doc", "constellation", "aws", "commoncrawl", "postgres", "csv", "json"]

[dependencies]
amadeus-core = { version = "=0.1.2", path = "amadeus-core" }
amadeus-derive = { version = "=0.1.2", path = "amadeus-derive" }
amadeus-types = { version = "=0.1.2", path = "amadeus-types" }
amadeus-aws = { version = "=0.1.2", path = "amadeus-aws", optional = true }
amadeus-commoncrawl = { version = "=0.1.2", path = "amadeus-commoncrawl", optional = true }
amadeus-parquet = { version = "=0.1.2", path = "amadeus-parquet", optional = true }
amadeus-postgres = { version = "=0.1.2", path = "amadeus-postgres", optional = true }
amadeus-serde = { version = "=0.1.2", path = "amadeus-serde", optional = true }
amadeus-core = { version = "=0.1.3", path = "amadeus-core" }
amadeus-derive = { version = "=0.1.3", path = "amadeus-derive" }
amadeus-types = { version = "=0.1.3", path = "amadeus-types" }
amadeus-aws = { version = "=0.1.3", path = "amadeus-aws", optional = true }
amadeus-commoncrawl = { version = "=0.1.3", path = "amadeus-commoncrawl", optional = true }
amadeus-parquet = { version = "=0.1.3", path = "amadeus-parquet", optional = true }
amadeus-postgres = { version = "=0.1.3", path = "amadeus-postgres", optional = true }
amadeus-serde = { version = "=0.1.3", path = "amadeus-serde", optional = true }
constellation-rs = { version = "0.1", default-features = false, optional = true }
futures-preview = "=0.3.0-alpha.18"
futures-preview = "=0.3.0-alpha.19"
pin-utils = "0.1.0-alpha.4"
serde = { version = "1.0", features = ["derive"] }
serde_closure = "0.1"
serde_closure = "0.2"
serde_traitobject = "0.1.6"

# pin; broken for some reason
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
</p>

<p align="center">
Harmonious distributed data analysis in Rust
Harmonious distributed data processing & analysis in Rust
</p>

<p align="center">
Expand All @@ -13,7 +13,7 @@
</p>

<p align="center">
<a href="https://docs.rs/amadeus/0.1.2">Docs</a>
<a href="https://docs.rs/amadeus/0.1.3">Docs</a>
</p>


Expand Down
19 changes: 13 additions & 6 deletions amadeus-aws/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "amadeus-aws"
version = "0.1.2"
version = "0.1.3"
license = "Apache-2.0"
authors = ["Alec Mocatta <[email protected]>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand All @@ -10,26 +10,33 @@ Harmonious distributed data analysis in Rust.
"""
repository = "https://github.com/alecmocatta/amadeus"
homepage = "https://github.com/alecmocatta/amadeus"
documentation = "https://docs.rs/amadeus/0.1.2"
documentation = "https://docs.rs/amadeus/0.1.3"
readme = "README.md"
edition = "2018"

[badges]
azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests" }
maintenance = { status = "actively-developed" }

[features]
doc = []

[dependencies]
amadeus-core = { version = "=0.1.2", path = "../amadeus-core" }
chrono = "0.4"
amadeus-core = { version = "=0.1.3", path = "../amadeus-core" }
amadeus-types = { version = "=0.1.3", path = "../amadeus-types" }
chrono = { version = "0.4", default-features = false }
flate2 = "1.0"
futures-01 = { package = "futures", version = "0.1" }
futures-preview = { version = "=0.3.0-alpha.18", features = ["compat"] }
futures-preview = { version = "=0.3.0-alpha.19", features = ["compat"] }
http = "0.1"
once_cell = "1.0"
rusoto_core = "0.40"
rusoto_s3 = "0.40"
serde_closure = "0.1"
serde_closure = "0.2"
serde = { version = "1.0", features = ["derive"] }
tokio = "0.1.7"
tokio-retry = "0.2"
url = { version = "2.1", features = ["serde"] }

# dependency of rusoto_core/hyper-tls/native-tls; ensure it's vendored to simplify cross-compilation
openssl = { version = "0.10", features = ["vendored"] }
132 changes: 60 additions & 72 deletions amadeus-aws/src/cloudfront.rs
Original file line number Diff line number Diff line change
@@ -1,52 +1,21 @@
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
use chrono::{NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
use flate2::read::MultiGzDecoder;
use http::{Method, StatusCode};

use rusoto_core::RusotoError;
use rusoto_s3::{GetObjectRequest, Object, S3Client, S3};
use serde::{Deserialize, Serialize};
use serde_closure::*;
use std::{
convert::identity, io::{self, BufRead, BufReader}, iter, net, time::Duration, vec
convert::identity, io::{self, BufRead, BufReader}, iter, time::Duration
};
use url::Url;

use amadeus_core::{
dist_iter::DistributedIterator, into_dist_iter::IntoDistributedIterator, util::ResultExpand, Source
};
use amadeus_types::{DateTime, IpAddr, Url};

use super::{block_on_01, list, retry, AwsError, AwsRegion};

type Closure<Env, Args, Output> =
serde_closure::FnMut<Env, for<'r> fn(&'r mut Env, Args) -> Output>;

type CloudfrontInner = amadeus_core::dist_iter::Map<
amadeus_core::dist_iter::FlatMap<
amadeus_core::into_dist_iter::IterIter<vec::IntoIter<String>>,
Closure<
(String, AwsRegion),
(String,),
ResultExpand<
iter::Map<
iter::Filter<
io::Lines<BufReader<MultiGzDecoder<Box<dyn io::Read + Send>>>>,
serde_closure::FnMut<
(),
for<'r, 'a> fn(&'r mut (), (&'a Result<String, io::Error>,)) -> bool,
>,
>,
Closure<(), (Result<String, io::Error>,), Result<CloudfrontRow, AwsError>>,
>,
AwsError,
>,
>,
>,
Closure<
(),
(Result<Result<CloudfrontRow, AwsError>, AwsError>,),
Result<CloudfrontRow, AwsError>,
>,
>;

pub struct Cloudfront {
region: AwsRegion,
bucket: String,
Expand All @@ -73,53 +42,71 @@ impl Source for Cloudfront {
type Item = CloudfrontRow;
type Error = AwsError;

// type DistIter = impl DistributedIterator<Item = Result<CloudfrontRow, AwsError>>; //, <Self as super::super::DistributedIterator>::Task: Serialize + for<'de> Deserialize<'de>
type DistIter = CloudfrontInner;
type Iter = iter::Empty<Result<CloudfrontRow, AwsError>>;
#[cfg(not(feature = "doc"))]
type DistIter = impl DistributedIterator<Item = Result<Self::Item, Self::Error>>;
#[cfg(feature = "doc")]
type DistIter = amadeus_core::util::ImplDistributedIterator<Result<Self::Item, Self::Error>>;
type Iter = iter::Empty<Result<Self::Item, Self::Error>>;

#[allow(clippy::let_and_return)]
fn dist_iter(self) -> Self::DistIter {
let Self {
bucket,
region,
objects,
} = self;
objects
let ret = objects
.into_dist_iter()
.flat_map(FnMut!([bucket, region] move |key:String| {
.flat_map(FnMut!(move |key: String| {
let client = S3Client::new(region.clone());
ResultExpand(
block_on_01(retry(||client
.get_object(GetObjectRequest {
bucket: bucket.clone(),
key: key.clone(),
..GetObjectRequest::default()
})
loop {
match self::block_on_01(self::retry(|| {
client.get_object(GetObjectRequest {
bucket: bucket.clone(),
key: key.clone(),
..GetObjectRequest::default()
})
})) {
Err(RusotoError::HttpDispatch(_)) => continue,
Err(RusotoError::Unknown(response))
if response.status.is_server_error() =>
{
continue
}
res => break res,
}
}
.map_err(AwsError::from)
.map(|res| {
let body = res.body.unwrap().into_blocking_read();
BufReader::new(MultiGzDecoder::new(
Box::new(body) as Box<dyn io::Read + Send>
))
.map_err(AwsError::from)
.map(|res| {
let body = res.body.unwrap().into_blocking_read();
BufReader::new(MultiGzDecoder::new(Box::new(body) as Box<dyn io::Read + Send>))
.lines()
.filter(FnMut!(|x:&Result<String,io::Error>| {
if let Ok(x) = x {
x.chars().filter(|x| !x.is_whitespace()).nth(0) != Some('#')
} else {
true
}
}))
.map(FnMut!(|x:Result<String,io::Error>| {
if let Ok(x) = x {
Ok(CloudfrontRow::from_line(&x))
} else {
Err(AwsError::from(x.err().unwrap()))
}
}))
}),
.lines()
.filter(|x: &Result<String, io::Error>| {
if let Ok(x) = x {
x.chars().filter(|x| !x.is_whitespace()).nth(0) != Some('#')
} else {
true
}
})
.map(|x: Result<String, io::Error>| {
if let Ok(x) = x {
Ok(CloudfrontRow::from_line(&x))
} else {
Err(AwsError::from(x.err().unwrap()))
}
})
}),
)
}))
.map(FnMut!(
|x: Result<Result<CloudfrontRow, _>, _>| x.and_then(identity)
))
|x: Result<Result<CloudfrontRow, _>, _>| x.and_then(self::identity)
));
#[cfg(feature = "doc")]
let ret = amadeus_core::util::ImplDistributedIterator::new(ret);
ret
}

fn iter(self) -> Self::Iter {
Expand All @@ -129,10 +116,10 @@ impl Source for Cloudfront {

#[derive(Clone, Eq, PartialEq, Serialize, Deserialize, Debug)]
pub struct CloudfrontRow {
pub time: DateTime<Utc>,
pub time: DateTime,
pub edge_location: String,
pub response_bytes: u64,
pub remote_ip: net::IpAddr,
pub remote_ip: IpAddr,
#[serde(with = "http_serde")]
pub method: Method,
pub host: String,
Expand All @@ -154,6 +141,7 @@ pub struct CloudfrontRow {
pub fle_encrypted_fields: Option<String>,
}
impl CloudfrontRow {
#[inline(always)]
fn from_line(line: &str) -> Self {
let mut values = line.split('\t');
let date = values.next().unwrap();
Expand Down Expand Up @@ -183,10 +171,10 @@ impl CloudfrontRow {
let fle_status = values.next().unwrap();
let fle_encrypted_fields = values.next().unwrap();
assert_eq!(values.next(), None);
let time = Utc.from_utc_datetime(&NaiveDateTime::new(
let time = DateTime::from_chrono(&Utc.from_utc_datetime(&NaiveDateTime::new(
NaiveDate::parse_from_str(&date, "%Y-%m-%d").unwrap(),
NaiveTime::parse_from_str(&time, "%H:%M:%S").unwrap(),
));
)));
let status = if sc_status != "000" {
Some(StatusCode::from_bytes(sc_status.as_bytes()).unwrap())
} else {
Expand Down
12 changes: 8 additions & 4 deletions amadeus-aws/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,14 @@ impl Page for S3Page {
key: self.key.clone(),
range: Some(format!("bytes={}-{}", start, end)),
..GetObjectRequest::default()
}))
.await;
if let Err(RusotoError::HttpDispatch(_)) = res {
continue;
}));
let res = res.await;
match res {
Err(RusotoError::HttpDispatch(_)) => continue,
Err(RusotoError::Unknown(response)) if response.status.is_server_error() => {
continue
}
_ => (),
}
let mut read = res.unwrap().body.unwrap().into_async_read();
while len - cursor.position() > 0 {
Expand Down
8 changes: 4 additions & 4 deletions amadeus-aws/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
#![doc(html_root_url = "https://docs.rs/amadeus-aws/0.1.2")]
#![doc(html_root_url = "https://docs.rs/amadeus-aws/0.1.3")]
#![feature(type_alias_impl_trait)]

mod cloudfront;
mod file;

use futures::future::FutureExt;

use once_cell::sync::Lazy;
use rusoto_core::RusotoError;
use rusoto_s3::{GetObjectError, ListObjectsV2Error, ListObjectsV2Request, Object, S3Client, S3};

use std::{
cell::RefCell, error, fmt::{self, Display}, future::Future, io, iter, mem::transmute, ops::FnMut
};
use tokio::runtime::Runtime;

use amadeus_core::util::{IoError, ResultExpand};

#[doc(inline)]
pub use cloudfront::{Cloudfront, CloudfrontRow};
#[doc(inline)]
pub use file::{S3Directory, S3File};

#[doc(inline)]
pub use rusoto_core::Region as AwsRegion;

Expand Down
Loading

0 comments on commit 26ef26b

Please sign in to comment.