From 3c04f51b9641d60e7b0fcce603afd75bb7a2c168 Mon Sep 17 00:00:00 2001 From: alecmocatta Date: Mon, 27 Jul 2020 17:39:01 +0100 Subject: [PATCH 1/5] switch to indexmap --- amadeus-core/Cargo.toml | 1 + amadeus-core/src/par_sink/group_by.rs | 76 +++++++++------------------ amadeus-core/src/par_stream.rs | 7 +-- 3 files changed, 31 insertions(+), 53 deletions(-) diff --git a/amadeus-core/Cargo.toml b/amadeus-core/Cargo.toml index b1f824a8..cc952ce4 100644 --- a/amadeus-core/Cargo.toml +++ b/amadeus-core/Cargo.toml @@ -24,6 +24,7 @@ derive-new = "0.5" educe = "0.4" either = { version = "1.5", features = ["serde"] } futures = "0.3" +indexmap = { version = "1.5", features = ["serde-1"] } itertools = "0.9" owned_chars = "0.3" pin-project = "0.4" diff --git a/amadeus-core/src/par_sink/group_by.rs b/amadeus-core/src/par_sink/group_by.rs index b19e484a..0be2391b 100644 --- a/amadeus-core/src/par_sink/group_by.rs +++ b/amadeus-core/src/par_sink/group_by.rs @@ -3,10 +3,11 @@ use derive_new::new; use educe::Educe; use futures::{pin_mut, ready, stream, Stream, StreamExt}; +use indexmap::IndexMap; use pin_project::pin_project; use serde::{Deserialize, Serialize}; use std::{ - collections::HashMap, hash::Hash, marker::PhantomData, mem, pin::Pin, task::{Context, Poll} + hash::Hash, marker::PhantomData, mem, pin::Pin, task::{Context, Poll} }; use sum::Sum2; @@ -33,7 +34,7 @@ where B::ReduceC: Clone, B::Done: Send + 'static, { - type Done = HashMap; + type Done = IndexMap; type Pipe = A; type ReduceA = GroupByReducerA<>::Task, B::ReduceA, T, U>; type ReduceC = GroupByReducerB< @@ -62,7 +63,7 @@ where B::ReduceC: Clone, B::Done: ProcessSend + 'static, { - type Done = HashMap; + type Done = IndexMap; type Pipe = A; type ReduceA = GroupByReducerA<>::Task, B::ReduceA, T, U>; type ReduceB = GroupByReducerB< @@ -103,7 +104,7 @@ where R: Reducer + Clone, T: Eq + Hash, { - type Done = HashMap; + type Done = IndexMap; type Async = GroupByReducerAAsync; fn into_async(self) -> Self::Async { @@ 
-117,7 +118,7 @@ where T: Eq + Hash + ProcessSend + 'static, R::Done: ProcessSend + 'static, { - type Done = HashMap; + type Done = IndexMap; } impl ReducerSend<(T, U)> for GroupByReducerA where @@ -126,7 +127,7 @@ where T: Eq + Hash + Send + 'static, R::Done: Send + 'static, { - type Done = HashMap; + type Done = IndexMap; } #[pin_project] @@ -142,8 +143,7 @@ where #[new(default)] pending: Option, Option>>), Vec>>>, #[new(default)] - map: HashMap>>, - marker: PhantomData (R, U)>, + map: IndexMap>>, } impl Sink<(T, U)> for GroupByReducerAAsync @@ -152,7 +152,7 @@ where R: Reducer + Clone, T: Eq + Hash, { - type Done = HashMap; + type Done = IndexMap; #[inline(always)] fn poll_forward( @@ -196,7 +196,7 @@ where .pipe(self_.pipe.as_mut()); pin_mut!(stream); let map = &mut *self_.map; - let r_ = r.as_mut().unwrap_or_else(|| map.get_mut(&k).unwrap()); + let r_ = r.as_mut().unwrap_or_else(|| map.get_mut(k).unwrap()); if r_.as_mut().poll_forward(cx, stream).is_ready() { let _ = u.take(); } @@ -228,9 +228,8 @@ where if !done_ { return Poll::Pending; } - let ret = self_ - .map - .drain() + let ret = mem::take(self_.map) + .into_iter() .zip(done.iter_mut()) .map(|((k, _), v)| (k, v.take().unwrap())) .collect(); @@ -249,33 +248,33 @@ where )] pub struct GroupByReducerB(R, PhantomData (T, U)>); -impl Reducer> for GroupByReducerB +impl Reducer> for GroupByReducerB where R: Reducer + Clone, T: Eq + Hash, { - type Done = HashMap; + type Done = IndexMap; type Async = GroupByReducerBAsync; fn into_async(self) -> Self::Async { GroupByReducerBAsync::new(self.0) } } -impl ReducerProcessSend> for GroupByReducerB +impl ReducerProcessSend> for GroupByReducerB where R: Reducer + Clone, T: Eq + Hash + ProcessSend + 'static, R::Done: ProcessSend + 'static, { - type Done = HashMap; + type Done = IndexMap; } -impl ReducerSend> for GroupByReducerB +impl ReducerSend> for GroupByReducerB where R: Reducer + Clone, T: Eq + Hash + Send + 'static, R::Done: Send + 'static, { - type Done = HashMap; + 
type Done = IndexMap; } #[pin_project] @@ -286,23 +285,22 @@ where { f: R, #[new(default)] - pending: Option>>)>, Vec>>>, + pending: Option>>)>, Vec>>>, #[new(default)] - map: HashMap>>, - marker: PhantomData (T, U)>, + map: IndexMap>>, } -impl Sink> for GroupByReducerBAsync +impl Sink> for GroupByReducerBAsync where R: Reducer + Clone, T: Eq + Hash, { - type Done = HashMap; + type Done = IndexMap; #[inline(always)] fn poll_forward( self: Pin<&mut Self>, cx: &mut Context, - mut stream: Pin<&mut impl Stream>>, + mut stream: Pin<&mut impl Stream>>, ) -> Poll { let self_ = self.project(); loop { @@ -329,7 +327,7 @@ where } match self_.pending.as_mut().unwrap() { Sum2::A(pending) => { - while let Some((k, (v, mut r))) = pop(pending) { + while let Some((k, (v, mut r))) = pending.pop() { let mut v = Some(v); let waker = cx.waker(); let stream = stream::poll_fn(|cx| { @@ -381,9 +379,8 @@ where if !done_ { return Poll::Pending; } - let ret = self_ - .map - .drain() + let ret = mem::take(self_.map) + .into_iter() .zip(done.iter_mut()) .map(|((k, _), v)| (k, v.take().unwrap())) .collect(); @@ -393,24 +390,3 @@ where } } } - -// https://github.com/rust-lang/rfcs/issues/1800#issuecomment-653757340 -#[allow(clippy::unnecessary_filter_map)] -fn pop(map: &mut HashMap) -> Option<(K, V)> -where - K: Eq + Hash, -{ - let mut first = None; - *map = mem::take(map) - .into_iter() - .filter_map(|el| { - if first.is_none() { - first = Some(el); - None - } else { - Some(el) - } - }) - .collect(); - first -} diff --git a/amadeus-core/src/par_stream.rs b/amadeus-core/src/par_stream.rs index acdc0b71..85b6d167 100644 --- a/amadeus-core/src/par_stream.rs +++ b/amadeus-core/src/par_stream.rs @@ -16,9 +16,10 @@ mod update; use async_trait::async_trait; use either::Either; use futures::{future, pin_mut, stream, stream::StreamExt as _, Stream}; +use indexmap::IndexMap; use serde_closure::{traits, FnOnce}; use std::{ - cmp::Ordering, collections::HashMap, hash::Hash, iter, ops, pin::Pin, 
task::{Context, Poll}, vec + cmp::Ordering, hash::Hash, iter, ops, pin::Pin, task::{Context, Poll}, vec }; use super::{par_pipe::*, par_sink::*}; @@ -454,7 +455,7 @@ stream!(ParallelStream ParallelPipe ParallelSink FromParallelStream IntoParallel .await } - async fn group_by(self, pool: &P, sink: S) -> HashMap + async fn group_by(self, pool: &P, sink: S) -> IndexMap where P: ThreadPool, A: Eq + Hash + Send + 'static, @@ -680,7 +681,7 @@ stream!(DistributedStream DistributedPipe DistributedSink FromDistributedStream .await } - async fn group_by(self, pool: &P, sink: S) -> HashMap + async fn group_by(self, pool: &P, sink: S) -> IndexMap where P: ProcessPool, A: Eq + Hash + ProcessSend + 'static, From a585d8a1704ca14c03d67fc2dfb4657f1e56574d Mon Sep 17 00:00:00 2001 From: alecmocatta Date: Wed, 29 Jul 2020 19:25:27 +0100 Subject: [PATCH 2/5] perf improvements --- Cargo.toml | 5 +- amadeus-core/src/into_par_stream.rs | 5 +- .../src/into_par_stream/collections.rs | 23 ++++ amadeus-core/src/into_par_stream/iterator.rs | 10 ++ amadeus-core/src/into_par_stream/slice.rs | 1 + amadeus-core/src/par_pipe.rs | 27 ++++ amadeus-core/src/par_sink/count.rs | 2 + amadeus-core/src/par_sink/folder.rs | 3 +- amadeus-core/src/par_sink/fork.rs | 5 + amadeus-core/src/par_sink/group_by.rs | 2 +- amadeus-core/src/par_sink/pipe.rs | 7 + amadeus-core/src/par_sink/sum.rs | 19 ++- amadeus-core/src/par_stream.rs | 50 +++++-- amadeus-core/src/par_stream/cloned.rs | 2 +- amadeus-core/src/par_stream/identity.rs | 30 +++++ amadeus-core/src/pipe.rs | 124 +++++++++++++++++- amadeus-core/src/pipe/filter.rs | 2 + amadeus-core/src/pipe/flat_map.rs | 2 + amadeus-core/src/pipe/flatten.rs | 70 ++++++++++ amadeus-core/src/pipe/map.rs | 12 +- amadeus-core/src/pool.rs | 40 ++++++ amadeus-core/src/util.rs | 1 + src/lib.rs | 3 +- src/pool.rs | 35 ++++- src/pool/process.rs | 87 ++++++++++++ src/pool/thread.rs | 117 ++++++++++++++++- tests/single_threaded.rs | 10 ++ 27 files changed, 657 insertions(+), 37 
deletions(-) create mode 100644 amadeus-core/src/pipe/flatten.rs diff --git a/Cargo.toml b/Cargo.toml index a22cdd45..22cc07db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "2 maintenance = { status = "actively-developed" } [features] -constellation = ["constellation-rs", "serde_traitobject"] +constellation = ["bincode", "constellation-rs", "serde_traitobject"] aws = ["amadeus-aws"] commoncrawl = ["amadeus-commoncrawl"] parquet = ["amadeus-parquet", "amadeus-derive/parquet"] @@ -44,6 +44,7 @@ amadeus-parquet = { version = "=0.3.6", path = "amadeus-parquet", optional = tru amadeus-postgres = { version = "=0.3.6", path = "amadeus-postgres", optional = true } amadeus-serde = { version = "=0.3.6", path = "amadeus-serde", optional = true } async-channel = "1.1" +bincode = { version = "1.3", optional = true } constellation-rs = { version = "0.2.0-alpha.2", default-features = false, optional = true } derive-new = "0.5" doc-comment = "0.3" @@ -62,7 +63,9 @@ web-sys = { version = "0.3", features = ["Blob", "Performance", "Response", "Win [dev-dependencies] either = { version = "1.5", features = ["serde"] } +once_cell = "1.4" rand = "0.7" +rayon = "1.3" serde_json = "1.0" streaming_algorithms = "0.3" tokio = { version = "0.2", features = ["macros", "time"] } diff --git a/amadeus-core/src/into_par_stream.rs b/amadeus-core/src/into_par_stream.rs index 61d6269c..e7d5cd52 100644 --- a/amadeus-core/src/into_par_stream.rs +++ b/amadeus-core/src/into_par_stream.rs @@ -13,12 +13,14 @@ impl_par_dist_rename! 
{ fn into_par_stream(self) -> Self::ParStream where Self: Sized; - fn dist_stream_mut(&mut self) -> <&mut Self as IntoParallelStream>::ParStream + #[inline(always)] + fn par_stream_mut(&mut self) -> <&mut Self as IntoParallelStream>::ParStream where for<'a> &'a mut Self: IntoParallelStream, { <&mut Self as IntoParallelStream>::into_par_stream(self) } + #[inline(always)] fn par_stream(&self) -> <&Self as IntoParallelStream>::ParStream where for<'a> &'a Self: IntoParallelStream, @@ -31,6 +33,7 @@ impl_par_dist_rename! { type ParStream = Self; type Item = ::Item; + #[inline(always)] fn into_par_stream(self) -> Self::ParStream where Self: Sized, diff --git a/amadeus-core/src/into_par_stream/collections.rs b/amadeus-core/src/into_par_stream/collections.rs index 2c8c62bf..82bcef6d 100644 --- a/amadeus-core/src/into_par_stream/collections.rs +++ b/amadeus-core/src/into_par_stream/collections.rs @@ -14,6 +14,7 @@ impl<'a, 'b, I: Iterator, A: Clone + 'a, B: Clone + 'b> I { type Item = (A, B); + #[inline(always)] fn next(&mut self) -> Option { self.0.next().map(|(a, b)| (a.clone(), b.clone())) } @@ -27,6 +28,7 @@ impl_par_dist_rename! { type ParStream = IterParStream>; type Item = T; + #[inline(always)] fn into_par_stream(self) -> Self::ParStream where Self: Sized, @@ -41,6 +43,7 @@ impl_par_dist_rename! { type ParStream = IterParStream>>; type Item = T; + #[inline(always)] fn into_par_stream(self) -> Self::ParStream where Self: Sized, @@ -56,6 +59,7 @@ impl_par_dist_rename! { type ParStream = IterParStream>; type Item = T; + #[inline(always)] fn into_par_stream(self) -> Self::ParStream where Self: Sized, @@ -70,6 +74,7 @@ impl_par_dist_rename! { type ParStream = IterParStream>>; type Item = T; + #[inline(always)] fn into_par_stream(self) -> Self::ParStream where Self: Sized, @@ -85,6 +90,7 @@ impl_par_dist_rename! 
{ type ParStream = IterParStream>; type Item = T; + #[inline(always)] fn into_par_stream(self) -> Self::ParStream where Self: Sized, @@ -99,6 +105,7 @@ impl_par_dist_rename! { type ParStream = IterParStream>>; type Item = T; + #[inline(always)] fn into_par_stream(self) -> Self::ParStream where Self: Sized, @@ -114,6 +121,7 @@ impl_par_dist_rename! { type ParStream = IterParStream>; type Item = T; + #[inline(always)] fn into_par_stream(self) -> Self::ParStream where Self: Sized, @@ -128,6 +136,7 @@ impl_par_dist_rename! { type ParStream = IterParStream>>; type Item = T; + #[inline(always)] fn into_par_stream(self) -> Self::ParStream where Self: Sized, @@ -144,6 +153,7 @@ impl_par_dist_rename! { type ParStream = IterParStream>; type Item = T; + #[inline(always)] fn into_par_stream(self) -> Self::ParStream where Self: Sized, @@ -159,6 +169,7 @@ impl_par_dist_rename! { type ParStream = IterParStream>>; type Item = T; + #[inline(always)] fn into_par_stream(self) -> Self::ParStream where Self: Sized, @@ -176,6 +187,7 @@ impl_par_dist_rename! { type ParStream = IterParStream>; type Item = (K, V); + #[inline(always)] fn into_par_stream(self) -> Self::ParStream where Self: Sized, @@ -192,6 +204,7 @@ impl_par_dist_rename! { type ParStream = IterParStream>>; type Item = (K, V); + #[inline(always)] fn into_par_stream(self) -> Self::ParStream where Self: Sized, @@ -207,6 +220,7 @@ impl_par_dist_rename! { type ParStream = IterParStream>; type Item = T; + #[inline(always)] fn into_par_stream(self) -> Self::ParStream where Self: Sized, @@ -221,6 +235,7 @@ impl_par_dist_rename! { type ParStream = IterParStream>>; type Item = T; + #[inline(always)] fn into_par_stream(self) -> Self::ParStream where Self: Sized, @@ -237,6 +252,7 @@ impl_par_dist_rename! { type ParStream = IterParStream>; type Item = (K, V); + #[inline(always)] fn into_par_stream(self) -> Self::ParStream where Self: Sized, @@ -252,6 +268,7 @@ impl_par_dist_rename! 
{ type ParStream = IterParStream>>; type Item = (K, V); + #[inline(always)] fn into_par_stream(self) -> Self::ParStream where Self: Sized, @@ -264,6 +281,7 @@ impl_par_dist_rename! { type ParStream = IterParStream; type Item = char; + #[inline(always)] fn into_par_stream(self) -> Self::ParStream where Self: Sized, @@ -275,6 +293,7 @@ impl_par_dist_rename! { type ParStream = IterParStream>; type Item = char; + #[inline(always)] fn into_par_stream(self) -> Self::ParStream where Self: Sized, @@ -290,6 +309,7 @@ impl_par_dist_rename! { type ParStream = IterParStream>; type Item = T; + #[inline(always)] fn into_par_stream(self) -> Self::ParStream where Self: Sized, @@ -304,6 +324,7 @@ impl_par_dist_rename! { type ParStream = IterParStream>>; type Item = T; + #[inline(always)] fn into_par_stream(self) -> Self::ParStream where Self: Sized, @@ -319,6 +340,7 @@ impl_par_dist_rename! { type ParStream = IterParStream>; type Item = T; + #[inline(always)] fn into_par_stream(self) -> Self::ParStream where Self: Sized, @@ -333,6 +355,7 @@ impl_par_dist_rename! { type ParStream = IterParStream>>; type Item = T; + #[inline(always)] fn into_par_stream(self) -> Self::ParStream where Self: Sized, diff --git a/amadeus-core/src/into_par_stream/iterator.rs b/amadeus-core/src/into_par_stream/iterator.rs index 1945aa31..35e73984 100644 --- a/amadeus-core/src/into_par_stream/iterator.rs +++ b/amadeus-core/src/into_par_stream/iterator.rs @@ -11,9 +11,11 @@ use super::{ use crate::pool::ProcessSend; pub trait IteratorExt: Iterator + Sized { + #[inline] fn par(self) -> IterParStream { IterParStream(self) } + #[inline] fn dist(self) -> IterDistStream { IterDistStream(self) } @@ -31,9 +33,11 @@ impl_par_dist_rename! 
{ type Item = I::Item; type Task = IterStreamTask; + #[inline] fn size_hint(&self) -> (usize, Option) { self.0.size_hint() } + #[inline] fn next_task(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { Poll::Ready(self.0.next().map(IterStreamTask::new)) } @@ -44,6 +48,7 @@ impl_par_dist_rename! { #[derive(Serialize, Deserialize)] pub struct IterStreamTask(Option); impl IterStreamTask { + #[inline] fn new(t: T) -> Self { Self(Some(t)) } @@ -53,6 +58,7 @@ impl StreamTask for IterStreamTask { type Item = T; type Async = IterStreamTask; + #[inline] fn into_async(self) -> Self::Async { self } @@ -60,6 +66,7 @@ impl StreamTask for IterStreamTask { impl Stream for IterStreamTask { type Item = T; + #[inline] fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { Poll::Ready(self.project().0.take()) } @@ -74,6 +81,7 @@ impl_par_dist_rename! { type ParStream = IterParStream; type Item = ::Item; + #[inline] fn into_par_stream(self) -> Self::ParStream where Self: Sized, @@ -90,6 +98,7 @@ impl_par_dist_rename! { type ParStream = IterParStream; type Item = ::Item; + #[inline] fn into_par_stream(self) -> Self::ParStream where Self: Sized, @@ -106,6 +115,7 @@ impl_par_dist_rename! { type ParStream = IterParStream; type Item = ::Item; + #[inline] fn into_par_stream(self) -> Self::ParStream where Self: Sized, diff --git a/amadeus-core/src/into_par_stream/slice.rs b/amadeus-core/src/into_par_stream/slice.rs index fe70ed1b..b3ec5ddc 100644 --- a/amadeus-core/src/into_par_stream/slice.rs +++ b/amadeus-core/src/into_par_stream/slice.rs @@ -32,6 +32,7 @@ impl_par_dist_rename! { type ParStream = IterParStream>>; type Item = T; + #[inline] fn into_par_stream(self) -> Self::ParStream where Self: Sized, diff --git a/amadeus-core/src/par_pipe.rs b/amadeus-core/src/par_pipe.rs index 949f10ec..8a57bca7 100644 --- a/amadeus-core/src/par_pipe.rs +++ b/amadeus-core/src/par_pipe.rs @@ -26,6 +26,7 @@ macro_rules! 
pipe { fn task(&self) -> Self::Task; + #[inline] fn inspect(self, f: F) -> Inspect where F: $fns::FnMut(&Self::Output) + Clone + $send + 'static, @@ -34,6 +35,7 @@ macro_rules! pipe { $assert_pipe(Inspect::new(self, f)) } + #[inline] fn update(self, f: F) -> Update where F: $fns::FnMut(&mut Self::Output) + Clone + $send + 'static, @@ -42,6 +44,7 @@ macro_rules! pipe { $assert_pipe(Update::new(self, f)) } + #[inline] fn map(self, f: F) -> Map where F: $fns::FnMut(Self::Output) -> B + Clone + $send + 'static, @@ -50,6 +53,7 @@ macro_rules! pipe { $assert_pipe(Map::new(self, f)) } + #[inline] fn flat_map(self, f: F) -> FlatMap where F: $fns::FnMut(Self::Output) -> B + Clone + $send + 'static, @@ -59,6 +63,7 @@ macro_rules! pipe { $assert_pipe(FlatMap::new(self, f)) } + #[inline] fn filter(self, f: F) -> Filter where F: $fns::FnMut(&Self::Output) -> bool + Clone + $send + 'static, @@ -67,6 +72,7 @@ macro_rules! pipe { $assert_pipe(Filter::new(self, f)) } + #[inline] fn cloned<'a, T>(self) -> Cloned where T: Clone + 'a, @@ -85,6 +91,7 @@ macro_rules! pipe { // $assert_pipe(Chain::new(self, chain.into_par_stream())) // } + #[inline] fn pipe(self, sink: S) -> super::par_sink::Pipe where S: $sink, @@ -93,6 +100,7 @@ macro_rules! pipe { $assert_sink(super::par_sink::Pipe::new(self, sink)) } + #[inline] fn fork( self, sink: A, sink_ref: B, ) -> Fork @@ -104,6 +112,7 @@ macro_rules! pipe { $assert_sink(Fork::new(self, sink, sink_ref)) } + #[inline] fn for_each(self, f: F) -> ForEach where F: $fns::FnMut(Self::Output) + Clone + $send + 'static, @@ -112,6 +121,7 @@ macro_rules! pipe { $assert_sink(ForEach::new(self, f)) } + #[inline] fn fold(self, identity: ID, op: F) -> Fold where ID: $fns::FnMut() -> B + Clone + $send + 'static, @@ -122,6 +132,7 @@ macro_rules! pipe { $assert_sink(Fold::new(self, identity, op)) } + #[inline] fn group_by(self, sink: S) -> GroupBy where A: Eq + Hash + $send + 'static, @@ -135,6 +146,7 @@ macro_rules! 
pipe { $assert_sink(GroupBy::new(self, sink)) } + #[inline] fn histogram(self) -> Histogram where Self::Output: Hash + Ord + $send + 'static, @@ -143,6 +155,7 @@ macro_rules! pipe { $assert_sink(Histogram::new(self)) } + #[inline] fn count(self) -> Count where Self: Sized, @@ -150,6 +163,7 @@ macro_rules! pipe { $assert_sink(Count::new(self)) } + #[inline] fn sum(self) -> Sum where B: iter::Sum + iter::Sum + $send + 'static, @@ -158,6 +172,7 @@ macro_rules! pipe { $assert_sink(Sum::new(self)) } + #[inline] fn combine(self, f: F) -> Combine where F: $fns::FnMut(Self::Output, Self::Output) -> Self::Output + Clone + $send + 'static, @@ -167,6 +182,7 @@ macro_rules! pipe { $assert_sink(Combine::new(self, f)) } + #[inline] fn max(self) -> Max where Self::Output: Ord + $send + 'static, @@ -175,6 +191,7 @@ macro_rules! pipe { $assert_sink(Max::new(self)) } + #[inline] fn max_by(self, f: F) -> MaxBy where F: $fns::FnMut(&Self::Output, &Self::Output) -> Ordering + Clone + $send + 'static, @@ -184,6 +201,7 @@ macro_rules! pipe { $assert_sink(MaxBy::new(self, f)) } + #[inline] fn max_by_key(self, f: F) -> MaxByKey where F: $fns::FnMut(&Self::Output) -> B + Clone + $send + 'static, @@ -194,6 +212,7 @@ macro_rules! pipe { $assert_sink(MaxByKey::new(self, f)) } + #[inline] fn min(self) -> Min where Self::Output: Ord + $send + 'static, @@ -202,6 +221,7 @@ macro_rules! pipe { $assert_sink(Min::new(self)) } + #[inline] fn min_by(self, f: F) -> MinBy where F: $fns::FnMut(&Self::Output, &Self::Output) -> Ordering + Clone + $send + 'static, @@ -211,6 +231,7 @@ macro_rules! pipe { $assert_sink(MinBy::new(self, f)) } + #[inline] fn min_by_key(self, f: F) -> MinByKey where F: $fns::FnMut(&Self::Output) -> B + Clone + $send + 'static, @@ -221,6 +242,7 @@ macro_rules! 
pipe { $assert_sink(MinByKey::new(self, f)) } + #[inline] fn most_frequent(self, n: usize, probability: f64, tolerance: f64) -> MostFrequent where Self::Output: Hash + Eq + Clone + $send + 'static, @@ -229,6 +251,7 @@ macro_rules! pipe { $assert_sink(MostFrequent::new(self, n, probability, tolerance)) } + #[inline] fn most_distinct( self, n: usize, probability: f64, tolerance: f64, error_rate: f64, ) -> MostDistinct @@ -246,6 +269,7 @@ macro_rules! pipe { )) } + #[inline] fn sample_unstable(self, samples: usize) -> SampleUnstable where Self::Output: $send + 'static, @@ -254,6 +278,7 @@ macro_rules! pipe { $assert_sink(SampleUnstable::new(self, samples)) } + #[inline] fn all(self, f: F) -> All where F: $fns::FnMut(Self::Output) -> bool + Clone + $send + 'static, @@ -262,6 +287,7 @@ macro_rules! pipe { $assert_sink(All::new(self, f)) } + #[inline] fn any(self, f: F) -> Any where F: $fns::FnMut(Self::Output) -> bool + Clone + $send + 'static, @@ -270,6 +296,7 @@ macro_rules! pipe { $assert_sink(Any::new(self, f)) } + #[inline] fn collect(self) -> Collect where B: $from_sink, diff --git a/amadeus-core/src/par_sink/count.rs b/amadeus-core/src/par_sink/count.rs index 8da8e73d..8046ae1b 100644 --- a/amadeus-core/src/par_sink/count.rs +++ b/amadeus-core/src/par_sink/count.rs @@ -29,9 +29,11 @@ pub struct CountFolder; impl FolderSync for CountFolder { type Done = usize; + #[inline(always)] fn zero(&mut self) -> Self::Done { 0 } + #[inline(always)] fn push(&mut self, state: &mut Self::Done, _item: Item) { *state += 1; } diff --git a/amadeus-core/src/par_sink/folder.rs b/amadeus-core/src/par_sink/folder.rs index d0c2b2fa..589464a1 100644 --- a/amadeus-core/src/par_sink/folder.rs +++ b/amadeus-core/src/par_sink/folder.rs @@ -125,8 +125,9 @@ where ) -> Poll { let self_ = self.project(); let folder = self_.folder; + let state = self_.state.as_mut().unwrap(); while let Some(item) = ready!(stream.as_mut().poll_next(cx)) { - folder.push(self_.state.as_mut().unwrap(), item); + 
folder.push(state, item); } Poll::Ready(self_.state.take().unwrap()) } diff --git a/amadeus-core/src/par_sink/fork.rs b/amadeus-core/src/par_sink/fork.rs index e4bc1b13..fc52efc8 100644 --- a/amadeus-core/src/par_sink/fork.rs +++ b/amadeus-core/src/par_sink/fork.rs @@ -40,9 +40,11 @@ impl_par_dist! { type Item = Sum2; type Task = JoinTask; + #[inline(always)] fn size_hint(&self) -> (usize, Option) { self.a.size_hint() } + #[inline(always)] fn next_task(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let self_ = self.project(); let b = self_.b; @@ -67,6 +69,7 @@ impl_par_dist! { type Output = Sum2; type Task = JoinTask; + #[inline(always)] fn task(&self) -> Self::Task { let stream = self.a.task(); let pipe = self.b.task(); @@ -93,6 +96,7 @@ where type ReduceA = ReduceA2; type ReduceC = ReduceC2; + #[inline(always)] fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceC) { let (iterator_a, reducer_a_a, reducer_a_c) = self.b.reducers(); let (iterator_b, reducer_b_a, reducer_b_c) = self.c.reducers(); @@ -116,6 +120,7 @@ where type ReduceB = ReduceC2; type ReduceC = ReduceC2; + #[inline(always)] fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceB, Self::ReduceC) { let (iterator_a, reducer_a_a, reducer_a_b, reducer_a_c) = self.b.reducers(); let (iterator_b, reducer_b_a, reducer_b_b, reducer_b_c) = self.c.reducers(); diff --git a/amadeus-core/src/par_sink/group_by.rs b/amadeus-core/src/par_sink/group_by.rs index 0be2391b..fa625bdc 100644 --- a/amadeus-core/src/par_sink/group_by.rs +++ b/amadeus-core/src/par_sink/group_by.rs @@ -96,7 +96,7 @@ where bound(serialize = "P: Serialize, R: Serialize"), bound(deserialize = "P: Deserialize<'de>, R: Deserialize<'de>") )] -pub struct GroupByReducerA(P, R, PhantomData (R, T, U)>); +pub struct GroupByReducerA(P, R, PhantomData (T, U)>); impl Reducer<(T, U)> for GroupByReducerA where diff --git a/amadeus-core/src/par_sink/pipe.rs b/amadeus-core/src/par_sink/pipe.rs index d5381e15..88300a50 100644 --- 
a/amadeus-core/src/par_sink/pipe.rs +++ b/amadeus-core/src/par_sink/pipe.rs @@ -24,6 +24,7 @@ impl_par_dist! { type Item = B::Output; type Task = JoinTask; + #[inline(always)] fn next_task(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let self_ = self.project(); let b = self_.b; @@ -34,6 +35,7 @@ impl_par_dist! { }) }) } + #[inline(always)] fn size_hint(&self) -> (usize, Option) { self.a.size_hint() } @@ -44,6 +46,7 @@ impl_par_dist! { type Output = B::Output; type Task = JoinTask; + #[inline(always)] fn task(&self) -> Self::Task { let a = self.a.task(); let b = self.b.task(); @@ -58,6 +61,7 @@ impl, B: ParallelSink, Input> ParallelSink (Self::Pipe, Self::ReduceA, Self::ReduceC) { let (a, b, c) = self.b.reducers(); (Pipe::new(self.a, a), b, c) @@ -72,6 +76,7 @@ impl, B: DistributedSink, Input> Distribute type ReduceB = B::ReduceB; type ReduceC = B::ReduceC; + #[inline(always)] fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceB, Self::ReduceC) { let (a, b, c, d) = self.b.reducers(); (Pipe::new(self.a, a), b, c, d) @@ -91,6 +96,7 @@ impl> StreamTask for JoinTask { type Item = B::Output; type Async = StreamPipe; + #[inline(always)] fn into_async(self) -> Self::Async { self.a.into_async().pipe(self.b.into_async()) } @@ -100,6 +106,7 @@ impl, B: PipeTask, Input> PipeTask for Join type Output = B::Output; type Async = PipePipe; + #[inline(always)] fn into_async(self) -> Self::Async { self.a.into_async().pipe(self.b.into_async()) } diff --git a/amadeus-core/src/par_sink/sum.rs b/amadeus-core/src/par_sink/sum.rs index b68eb5d3..8ea191e1 100644 --- a/amadeus-core/src/par_sink/sum.rs +++ b/amadeus-core/src/par_sink/sum.rs @@ -41,13 +41,16 @@ where { type Done = B; + #[inline(always)] fn zero(&mut self) -> Self::Done { iter::empty::().sum() } + #[inline(always)] fn push(&mut self, state: &mut Self::Done, item: Item) { - *state = iter::once(mem::replace(state, iter::empty::().sum())) - .chain(iter::once(iter::once(item).sum::())) - .sum(); + let zero = 
iter::empty::().sum(); + let left = mem::replace(state, zero); + let right = iter::once(item).sum::(); + *state = B::sum(iter::once(left).chain(iter::once(right))); } } @@ -56,6 +59,7 @@ pub struct SumZeroFolder { zero: Option, } impl SumZeroFolder { + #[inline(always)] pub(crate) fn new(zero: B) -> Self { Self { zero: Some(zero) } } @@ -67,14 +71,15 @@ where { type Done = Item; + #[inline(always)] fn zero(&mut self) -> Self::Done { self.zero.take().unwrap() } + #[inline(always)] fn push(&mut self, state: &mut Self::Done, item: Item) { - replace_with_or_abort(state, |state| { - iter::once(state) - .chain(iter::once(iter::once(item).sum::>().unwrap())) - .sum::>() + replace_with_or_abort(state, |left| { + let right = iter::once(item).sum::>().unwrap(); + as iter::Sum>::sum(iter::once(left).chain(iter::once(right))) .unwrap() }) } diff --git a/amadeus-core/src/par_stream.rs b/amadeus-core/src/par_stream.rs index 85b6d167..19ddda50 100644 --- a/amadeus-core/src/par_stream.rs +++ b/amadeus-core/src/par_stream.rs @@ -15,7 +15,7 @@ mod update; use async_trait::async_trait; use either::Either; -use futures::{future, pin_mut, stream, stream::StreamExt as _, Stream}; +use futures::{future, pin_mut, stream::StreamExt as _, Stream}; use indexmap::IndexMap; use serde_closure::{traits, FnOnce}; use std::{ @@ -24,7 +24,7 @@ use std::{ use super::{par_pipe::*, par_sink::*}; use crate::{ - into_par_stream::{IntoDistributedStream, IntoParallelStream}, pipe::StreamExt, pool::{ProcessPool, ProcessSend, ThreadPool} + into_par_stream::{IntoDistributedStream, IntoParallelStream}, pipe::{Sink, StreamExt}, pool::{ProcessPool, ProcessSend, ThreadPool} }; pub use self::{ @@ -53,6 +53,7 @@ macro_rules! stream { $($items)* + #[inline] fn inspect(self, f: F) -> Inspect where F: $fns::FnMut(&Self::Item) + Clone + $send + 'static, @@ -61,6 +62,7 @@ macro_rules! 
stream { $assert_stream(Inspect::new(self, f)) } + #[inline] fn update(self, f: F) -> Update where F: $fns::FnMut(&mut Self::Item) + Clone + $send + 'static, @@ -69,6 +71,7 @@ macro_rules! stream { $assert_stream(Update::new(self, f)) } + #[inline] fn map(self, f: F) -> Map where F: $fns::FnMut(Self::Item) -> B + Clone + $send + 'static, @@ -77,6 +80,7 @@ macro_rules! stream { $assert_stream(Map::new(self, f)) } + #[inline] fn flat_map(self, f: F) -> FlatMap where F: $fns::FnMut(Self::Item) -> B + Clone + $send + 'static, @@ -86,6 +90,7 @@ macro_rules! stream { $assert_stream(FlatMap::new(self, f)) } + #[inline] fn filter(self, f: F) -> Filter where F: $fns::FnMut(&Self::Item) -> bool + Clone + $send + 'static, @@ -94,6 +99,7 @@ macro_rules! stream { $assert_stream(Filter::new(self, f)) } + #[inline] fn chain(self, chain: C) -> Chain where C: $into_stream, @@ -102,6 +108,7 @@ macro_rules! stream { $assert_stream(Chain::new(self, chain.$into_stream_fn())) } + #[inline] async fn for_each(self, pool: &P, f: F) where P: $pool, @@ -114,6 +121,7 @@ macro_rules! stream { .await } + #[inline] async fn fold(self, pool: &P, identity: ID, op: F) -> B where P: $pool, @@ -131,6 +139,7 @@ macro_rules! stream { .await } + #[inline] async fn histogram

(self, pool: &P) -> Vec<(Self::Item, usize)> where P: $pool, @@ -142,6 +151,7 @@ macro_rules! stream { .await } + #[inline] async fn count

(self, pool: &P) -> usize where P: $pool, @@ -153,6 +163,7 @@ macro_rules! stream { .await } + #[inline] async fn sum(self, pool: &P) -> S where P: $pool, @@ -165,6 +176,7 @@ macro_rules! stream { .await } + #[inline] async fn combine(self, pool: &P, f: F) -> Option where P: $pool, @@ -177,6 +189,7 @@ macro_rules! stream { .await } + #[inline] async fn max

(self, pool: &P) -> Option where P: $pool, @@ -188,6 +201,7 @@ macro_rules! stream { .await } + #[inline] async fn max_by(self, pool: &P, f: F) -> Option where P: $pool, @@ -200,6 +214,7 @@ macro_rules! stream { .await } + #[inline] async fn max_by_key(self, pool: &P, f: F) -> Option where P: $pool, @@ -213,6 +228,7 @@ macro_rules! stream { .await } + #[inline] async fn min

(self, pool: &P) -> Option where P: $pool, @@ -224,6 +240,7 @@ macro_rules! stream { .await } + #[inline] async fn min_by(self, pool: &P, f: F) -> Option where P: $pool, @@ -236,6 +253,7 @@ macro_rules! stream { .await } + #[inline] async fn min_by_key(self, pool: &P, f: F) -> Option where P: $pool, @@ -249,6 +267,7 @@ macro_rules! stream { .await } + #[inline] async fn most_frequent

( self, pool: &P, n: usize, probability: f64, tolerance: f64, ) -> ::streaming_algorithms::Top @@ -265,6 +284,7 @@ macro_rules! stream { .await } + #[inline] async fn most_distinct( self, pool: &P, n: usize, probability: f64, tolerance: f64, error_rate: f64, ) -> ::streaming_algorithms::Top> @@ -288,6 +308,7 @@ macro_rules! stream { .await } + #[inline] async fn sample_unstable

( self, pool: &P, samples: usize, ) -> ::streaming_algorithms::SampleUnstable @@ -304,6 +325,7 @@ macro_rules! stream { .await } + #[inline] async fn all(self, pool: &P, f: F) -> bool where P: $pool, @@ -316,6 +338,7 @@ macro_rules! stream { .await } + #[inline] async fn any(self, pool: &P, f: F) -> bool where P: $pool, @@ -397,9 +420,14 @@ stream!(ParallelStream ParallelPipe ParallelSink FromParallelStream IntoParallel pool.spawn(move || async move { let sink = reduce_a.into_async(); pin_mut!(sink); - let tasks = - stream::iter(tasks.into_iter().map(StreamTask::into_async)).flatten(); - tasks.sink(sink.as_mut()).await + // this is faster than stream::iter(tasks.into_iter().map(StreamTask::into_async)).flatten().sink(sink).await + for task in tasks.into_iter().map(StreamTask::into_async) { + pin_mut!(task); + if let Some(ret) = sink.send_all(&mut task).await { + return ret; + } + } + sink.done().await }) }) .collect::>(); @@ -604,10 +632,14 @@ stream!(DistributedStream DistributedPipe DistributedSink FromDistributedStream pool.spawn(move || async move { let sink = reduce_a.into_async(); pin_mut!(sink); - let tasks = - stream::iter(tasks.into_iter().map(StreamTask::into_async)) - .flatten(); - tasks.sink(sink.as_mut()).await + // this is faster than stream::iter(tasks.into_iter().map(StreamTask::into_async)).flatten().sink(sink).await + for task in tasks.into_iter().map(StreamTask::into_async) { + pin_mut!(task); + if let Some(ret) = sink.send_all(&mut task).await { + return ret; + } + } + sink.done().await }) }) .collect::>(); diff --git a/amadeus-core/src/par_stream/cloned.rs b/amadeus-core/src/par_stream/cloned.rs index 7985e7f5..a6e384a2 100644 --- a/amadeus-core/src/par_stream/cloned.rs +++ b/amadeus-core/src/par_stream/cloned.rs @@ -13,7 +13,7 @@ use crate::pipe::Pipe; #[must_use] pub struct Cloned { pipe: P, - marker: PhantomData, + marker: PhantomData (Input, T)>, } impl_par_dist! 
{ diff --git a/amadeus-core/src/par_stream/identity.rs b/amadeus-core/src/par_stream/identity.rs index be1bf48d..b6e5bcc2 100644 --- a/amadeus-core/src/par_stream/identity.rs +++ b/amadeus-core/src/par_stream/identity.rs @@ -19,6 +19,7 @@ impl_par_dist! { type Output = Item; type Task = IdentityTask; + #[inline] fn task(&self) -> Self::Task { IdentityTask } @@ -32,14 +33,17 @@ mod workaround { #[cfg_attr(not(nightly), serde_closure::desugar)] #[doc(hidden)] impl Identity { + #[inline] pub fn pipe(self, sink: S) -> Pipe { Pipe::new(self, sink) } + #[inline] pub fn fork(self, sink: A, sink_ref: B) -> Fork { Fork::new(self, sink, sink_ref) } + #[inline] pub fn inspect(self, f: F) -> Inspect where F: Clone + Send + 'static, @@ -47,6 +51,7 @@ mod workaround { Inspect::new(self, f) } + #[inline] pub fn update(self, f: F) -> Update where F: Clone + Send + 'static, @@ -54,6 +59,7 @@ mod workaround { Update::new(self, f) } + #[inline] pub fn map(self, f: F) -> Map where F: Clone + Send + 'static, @@ -61,6 +67,7 @@ mod workaround { Map::new(self, f) } + #[inline] pub fn flat_map(self, f: F) -> FlatMap where F: Clone + Send + 'static, @@ -68,6 +75,7 @@ mod workaround { FlatMap::new(self, f) } + #[inline] pub fn filter(self, f: F) -> Filter where F: Clone + Send + 'static, @@ -76,6 +84,7 @@ mod workaround { } // #[must_use] + // #[inline] // pub fn chain(self, chain: C) -> Chain // where // C: IntoParallelStream, @@ -83,6 +92,7 @@ mod workaround { // Chain::new(self, chain.into_par_stream()) // } + #[inline] pub fn for_each(self, f: F) -> ForEach where F: Clone + Send + 'static, @@ -90,6 +100,7 @@ mod workaround { ForEach::new(self, f) } + #[inline] pub fn fold(self, identity: ID, op: F) -> Fold where ID: traits::FnMut() -> B + Clone + Send + 'static, @@ -99,18 +110,22 @@ mod workaround { Fold::new(self, identity, op) } + #[inline] pub fn group_by(self, sink: S) -> GroupBy { GroupBy::new(self, sink) } + #[inline] pub fn histogram(self) -> Histogram { Histogram::new(self) } + 
#[inline] pub fn count(self) -> Count { Count::new(self) } + #[inline] pub fn sum(self) -> Sum where B: iter::Sum + Send + 'static, @@ -118,6 +133,7 @@ mod workaround { Sum::new(self) } + #[inline] pub fn combine(self, f: F) -> Combine where F: Clone + Send + 'static, @@ -125,10 +141,12 @@ mod workaround { Combine::new(self, f) } + #[inline] pub fn max(self) -> Max { Max::new(self) } + #[inline] pub fn max_by(self, f: F) -> MaxBy where F: Clone + Send + 'static, @@ -136,6 +154,7 @@ mod workaround { MaxBy::new(self, f) } + #[inline] pub fn max_by_key(self, f: F) -> MaxByKey where F: Clone + Send + 'static, @@ -143,10 +162,12 @@ mod workaround { MaxByKey::new(self, f) } + #[inline] pub fn min(self) -> Min { Min::new(self) } + #[inline] pub fn min_by(self, f: F) -> MinBy where F: Clone + Send + 'static, @@ -154,6 +175,7 @@ mod workaround { MinBy::new(self, f) } + #[inline] pub fn min_by_key(self, f: F) -> MinByKey where F: Clone + Send + 'static, @@ -161,22 +183,26 @@ mod workaround { MinByKey::new(self, f) } + #[inline] pub fn most_frequent( self, n: usize, probability: f64, tolerance: f64, ) -> MostFrequent { MostFrequent::new(self, n, probability, tolerance) } + #[inline] pub fn most_distinct( self, n: usize, probability: f64, tolerance: f64, error_rate: f64, ) -> MostDistinct { MostDistinct::new(self, n, probability, tolerance, error_rate) } + #[inline] pub fn sample_unstable(self, samples: usize) -> SampleUnstable { SampleUnstable::new(self, samples) } + #[inline] pub fn all(self, f: F) -> All where F: Clone + Send + 'static, @@ -184,6 +210,7 @@ mod workaround { All::new(self, f) } + #[inline] pub fn any(self, f: F) -> Any where F: Clone + Send + 'static, @@ -191,6 +218,7 @@ mod workaround { Any::new(self, f) } + #[inline] pub fn collect(self) -> Collect { Collect::new(self) } @@ -203,6 +231,7 @@ impl PipeTask for IdentityTask { type Output = Item; type Async = IdentityTask; + #[inline] fn into_async(self) -> Self::Async { IdentityTask } @@ -210,6 +239,7 @@ impl 
PipeTask for IdentityTask { impl crate::pipe::Pipe for IdentityTask { type Output = Item; + #[inline(always)] fn poll_next( self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream>, ) -> Poll> { diff --git a/amadeus-core/src/pipe.rs b/amadeus-core/src/pipe.rs index 7e1ac22e..664eef85 100644 --- a/amadeus-core/src/pipe.rs +++ b/amadeus-core/src/pipe.rs @@ -1,14 +1,16 @@ mod filter; mod flat_map; +mod flatten; mod map; -use futures::{pin_mut, Future, Stream}; +use derive_new::new; +use futures::{pin_mut, stream, Future, Stream}; use pin_project::pin_project; use std::{ - ops::DerefMut, pin::Pin, task::{Context, Poll} + marker::PhantomData, mem, ops::DerefMut, pin::Pin, task::{Context, Poll} }; -pub use self::{filter::*, flat_map::*, map::*}; +pub use self::{filter::*, flat_map::*, flatten::*, map::*}; // Sink takes Input as an input parameter rather than associated type to accept // for<'a> &'a T, but this might not be necessary in future? @@ -79,6 +81,15 @@ pub trait Pipe { assert_pipe(FlatMap::new(self, f)) } + #[inline(always)] + fn flatten(self) -> Flatten + where + Self::Output: Stream, + Self: Sized, + { + assert_pipe(Flatten::new(self)) + } + #[inline(always)] fn map(self, f: F) -> Map where @@ -89,15 +100,40 @@ pub trait Pipe { } } -pub trait Sink { +pub trait Sink { type Done; /// Returns `Poll::Ready` when a) it can't accept any more elements from `stream` and b) all /// accepted elements have been fully processed. By convention, `stream` yielding `None` /// typically triggers (a). 
fn poll_forward( - self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream>, + self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream>, ) -> Poll; + + #[inline(always)] + fn send(&mut self, item: Item) -> Send<'_, Self, Item> + where + Self: Unpin, + { + assert_future(Send::new(self, Poll::Ready(item))) + } + + #[inline(always)] + fn send_all<'a, S: ?Sized>(&'a mut self, items: &'a mut S) -> SendAll<'a, Self, S> + where + S: Stream + Unpin, + Self: Unpin, + { + assert_future(SendAll::new(self, items)) + } + + #[inline(always)] + fn done(&mut self) -> Done<'_, Self, Item> + where + Self: Unpin, + { + assert_future(Done::new(self)) + } } #[inline(always)] @@ -225,6 +261,84 @@ where } } +#[pin_project] +#[derive(new)] +pub struct Send<'a, S: ?Sized, Item> { + sink: &'a mut S, + item: Poll, +} +impl + Unpin, Item> Future for Send<'_, S, Item> { + type Output = Option; + + #[inline(always)] + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let self_ = self.project(); + let item = self_.item; + let stream = stream::poll_fn(|_| mem::replace(item, Poll::Pending).map(Some)); + pin_mut!(stream); + if let Poll::Ready(done) = Pin::new(self_.sink).poll_forward(cx, stream) { + return Poll::Ready(Some(done)); + } + if item.is_pending() { + Poll::Ready(None) + } else { + Poll::Pending + } + } +} + +#[pin_project] +#[derive(new)] +pub struct SendAll<'a, S: ?Sized, St: ?Sized> { + sink: &'a mut S, + items: &'a mut St, +} +impl + Unpin, St: ?Sized + Stream + Unpin, Item> Future + for SendAll<'_, S, St> +{ + type Output = Option; + + #[inline(always)] + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let self_ = self.project(); + let items = &mut **self_.items; + let mut given_all = false; + let stream = stream::poll_fn(|cx| match Pin::new(&mut *items).poll_next(cx) { + x @ Poll::Ready(Some(_)) | x @ Poll::Pending => x, + Poll::Ready(None) => { + given_all = true; + Poll::Pending + } + }); + pin_mut!(stream); + if let 
Poll::Ready(done) = Pin::new(self_.sink).poll_forward(cx, stream) { + return Poll::Ready(Some(done)); + } + if given_all { + Poll::Ready(None) + } else { + Poll::Pending + } + } +} + +#[pin_project] +#[derive(new)] +pub struct Done<'a, S: ?Sized, Item: ?Sized> { + sink: &'a mut S, + marker: PhantomData Item>, +} +impl + Unpin, Item> Future for Done<'_, S, Item> { + type Output = S::Done; + + #[inline(always)] + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let stream = stream::empty(); + pin_mut!(stream); + Pin::new(&mut self.sink).poll_forward(cx, stream) + } +} + impl Pipe for Pin

where P: DerefMut + Unpin, diff --git a/amadeus-core/src/pipe/filter.rs b/amadeus-core/src/pipe/filter.rs index d456450b..140ebb23 100644 --- a/amadeus-core/src/pipe/filter.rs +++ b/amadeus-core/src/pipe/filter.rs @@ -22,6 +22,7 @@ where { type Item = P::Item; + #[inline] fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let mut self_ = self.project(); let (mut pipe, f) = (self_.pipe, &mut self_.f); @@ -41,6 +42,7 @@ where { type Output = P::Output; + #[inline] fn poll_next( self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream>, ) -> Poll> { diff --git a/amadeus-core/src/pipe/flat_map.rs b/amadeus-core/src/pipe/flat_map.rs index ede0d720..917789b6 100644 --- a/amadeus-core/src/pipe/flat_map.rs +++ b/amadeus-core/src/pipe/flat_map.rs @@ -26,6 +26,7 @@ where { type Item = R::Item; + #[inline] fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let mut self_ = self.project(); Poll::Ready(loop { @@ -51,6 +52,7 @@ where { type Output = R::Item; + #[inline] fn poll_next( self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream>, ) -> Poll> { diff --git a/amadeus-core/src/pipe/flatten.rs b/amadeus-core/src/pipe/flatten.rs new file mode 100644 index 00000000..f8f9e325 --- /dev/null +++ b/amadeus-core/src/pipe/flatten.rs @@ -0,0 +1,70 @@ +use derive_new::new; +use futures::{ready, Stream}; +use pin_project::pin_project; +use std::{ + pin::Pin, task::{Context, Poll} +}; + +use super::Pipe; + +#[pin_project] +#[derive(new)] +pub struct Flatten { + #[pin] + pipe: P, + #[pin] + #[new(default)] + next: Option, +} + +impl Stream for Flatten +where + P::Item: Stream, +{ + type Item = ::Item; + + #[inline] + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let mut self_ = self.project(); + Poll::Ready(loop { + if let Some(s) = self_.next.as_mut().as_pin_mut() { + if let Some(item) = ready!(s.poll_next(cx)) { + break Some(item); + } else { + self_.next.set(None); + } + } else if let Some(s) = 
ready!(self_.pipe.as_mut().poll_next(cx)) { + self_.next.set(Some(s)); + } else { + break None; + } + }) + } +} + +impl, Input> Pipe for Flatten +where + P::Output: Stream, +{ + type Output = ::Item; + + #[inline] + fn poll_next( + self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream>, + ) -> Poll> { + let mut self_ = self.project(); + Poll::Ready(loop { + if let Some(s) = self_.next.as_mut().as_pin_mut() { + if let Some(item) = ready!(s.poll_next(cx)) { + break Some(item); + } else { + self_.next.set(None); + } + } else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx, stream.as_mut())) { + self_.next.set(Some(s)); + } else { + break None; + } + }) + } +} diff --git a/amadeus-core/src/pipe/map.rs b/amadeus-core/src/pipe/map.rs index 827afb1f..85f87e7a 100644 --- a/amadeus-core/src/pipe/map.rs +++ b/amadeus-core/src/pipe/map.rs @@ -1,5 +1,5 @@ use derive_new::new; -use futures::{ready, Stream}; +use futures::Stream; use pin_project::pin_project; use serde_closure::traits::FnMut; use std::{ @@ -22,10 +22,13 @@ where { type Item = F::Output; + #[inline] fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let mut self_ = self.project(); let (mut pipe, f) = (self_.pipe, &mut self_.f); - Poll::Ready(ready!(pipe.as_mut().poll_next(cx)).map(|t| f.call_mut((t,)))) + pipe.as_mut() + .poll_next(cx) + .map(|t| t.map(|t| f.call_mut((t,)))) } } @@ -35,11 +38,14 @@ where { type Output = F::Output; + #[inline] fn poll_next( self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream>, ) -> Poll> { let mut self_ = self.project(); let (mut pipe, f) = (self_.pipe, &mut self_.f); - Poll::Ready(ready!(pipe.as_mut().poll_next(cx, stream.as_mut())).map(|t| f.call_mut((t,)))) + pipe.as_mut() + .poll_next(cx, stream.as_mut()) + .map(|t| t.map(|t| f.call_mut((t,)))) } } diff --git a/amadeus-core/src/pool.rs b/amadeus-core/src/pool.rs index 23f1b5d0..a7a60616 100644 --- a/amadeus-core/src/pool.rs +++ b/amadeus-core/src/pool.rs @@ -15,20 
+15,42 @@ pub trait ProcessPool: Clone + Send + Sync + RefUnwindSafe + UnwindSafe + Unpin type ThreadPool: ThreadPool + 'static; fn processes(&self) -> usize; + fn spawn(&self, work: F) -> BoxFuture<'static, Result> where F: traits::FnOnce(&Self::ThreadPool) -> Fut + ProcessSend + 'static, Fut: Future + 'static, T: ProcessSend + 'static; + + /// # Safety + /// + /// Must be polled to completion before dropping. Unsound to forget it without having polled to completion. + #[allow(unsafe_code)] + unsafe fn spawn_unchecked<'a, F, Fut, T>(&self, work: F) -> BoxFuture<'a, Result> + where + F: traits::FnOnce(&Self::ThreadPool) -> Fut + ProcessSend + 'a, + Fut: Future + 'a, + T: ProcessSend + 'a; } pub trait ThreadPool: Clone + Send + Sync + RefUnwindSafe + UnwindSafe + Unpin { fn threads(&self) -> usize; + fn spawn(&self, work: F) -> BoxFuture<'static, Result> where F: FnOnce() -> Fut + Send + 'static, Fut: Future + 'static, T: Send + 'static; + + /// # Safety + /// + /// Must be polled to completion before dropping. Unsound to forget it without having polled to completion. 
+ #[allow(unsafe_code)] + unsafe fn spawn_unchecked<'a, F, Fut, T>(&self, work: F) -> BoxFuture<'a, Result> + where + F: FnOnce() -> Fut + Send + 'a, + Fut: Future + 'a, + T: Send + 'a; } #[cfg_attr(not(nightly), serde_closure::desugar)] @@ -49,6 +71,15 @@ where { (*self).spawn(work) } + #[allow(unsafe_code)] + unsafe fn spawn_unchecked<'a, F, Fut, T>(&self, work: F) -> BoxFuture<'a, Result> + where + F: traits::FnOnce(&Self::ThreadPool) -> Fut + ProcessSend + 'a, + Fut: Future + 'a, + T: ProcessSend + 'a, + { + (*self).spawn_unchecked(work) + } } impl ThreadPool for &P @@ -66,4 +97,13 @@ where { (*self).spawn(work) } + #[allow(unsafe_code)] + unsafe fn spawn_unchecked<'a, F, Fut, T>(&self, work: F) -> BoxFuture<'a, Result> + where + F: FnOnce() -> Fut + Send + 'a, + Fut: Future + 'a, + T: Send + 'a, + { + (*self).spawn_unchecked(work) + } } diff --git a/amadeus-core/src/util.rs b/amadeus-core/src/util.rs index 36792962..39bb75d4 100644 --- a/amadeus-core/src/util.rs +++ b/amadeus-core/src/util.rs @@ -173,6 +173,7 @@ where /// /// Not. 
#[allow(unsafe_code)] +#[inline(always)] pub unsafe fn transmute(a: A) -> B { use std::mem; assert_eq!( diff --git a/src/lib.rs b/src/lib.rs index 6e66b5c9..1a08e0c6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,7 +25,8 @@ clippy::similar_names, clippy::if_not_else, clippy::must_use_candidate, - clippy::missing_errors_doc + clippy::missing_errors_doc, + clippy::missing_safety_doc )] #![deny(unsafe_code)] diff --git a/src/pool.rs b/src/pool.rs index e50a3eeb..a4b234af 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -33,6 +33,15 @@ impl ProcessPoolTrait for ProcessPool { { Box::pin(ProcessPool::spawn(self, work).map_err(|e| Box::new(e) as _)) } + #[allow(unsafe_code)] + unsafe fn spawn_unchecked<'a, F, Fut, T>(&self, work: F) -> BoxFuture<'a, Result> + where + F: traits::FnOnce(&Self::ThreadPool) -> Fut + ProcessSend + 'a, + Fut: Future + 'a, + T: ProcessSend + 'a, + { + Box::pin(ProcessPool::spawn_unchecked(self, work).map_err(|e| Box::new(e) as _)) + } } #[cfg_attr(not(nightly), serde_closure::desugar)] @@ -49,10 +58,19 @@ impl ProcessPoolTrait for ThreadPool { T: ProcessSend + 'static, { let self_ = self.clone(); - Box::pin( - ThreadPool::spawn(self, move || work.call_once((&self_,))) - .map_err(|e| Box::new(e) as _), - ) + let spawn = move || work.call_once((&self_,)); + Box::pin(ThreadPool::spawn(self, spawn).map_err(|e| Box::new(e) as _)) + } + #[allow(unsafe_code)] + unsafe fn spawn_unchecked<'a, F, Fut, T>(&self, work: F) -> BoxFuture<'a, Result> + where + F: traits::FnOnce(&Self::ThreadPool) -> Fut + ProcessSend + 'a, + Fut: Future + 'a, + T: ProcessSend + 'a, + { + let self_ = self.clone(); + let spawn = move || work.call_once((&self_,)); + Box::pin(ThreadPool::spawn_unchecked(self, spawn).map_err(|e| Box::new(e) as _)) } } @@ -68,4 +86,13 @@ impl ThreadPoolTrait for ThreadPool { { Box::pin(ThreadPool::spawn(self, work).map_err(|e| Box::new(e) as _)) } + #[allow(unsafe_code)] + unsafe fn spawn_unchecked<'a, F, Fut, T>(&self, work: F) -> BoxFuture<'a, 
Result> + where + F: FnOnce() -> Fut + Send + 'a, + Fut: Future + 'a, + T: Send + 'a, + { + Box::pin(ThreadPool::spawn_unchecked(self, work).map_err(|e| Box::new(e) as _)) + } } diff --git a/src/pool/process.rs b/src/pool/process.rs index 64a52d27..c7377d32 100644 --- a/src/pool/process.rs +++ b/src/pool/process.rs @@ -244,6 +244,81 @@ impl ProcessPoolInner { drop(process_inner_lock); boxed.map(|boxed| *Box::::downcast::(boxed.into_any_send()).unwrap()) } + #[allow(unsafe_code)] + async unsafe fn spawn_unchecked<'a, F, Fut, T>(&self, work: F) -> Result + where + F: for<'b> traits::FnOnce<(&'b ThreadPool,), Output = Fut> + ProcessSend + 'a, + Fut: Future + 'a, + T: ProcessSend + 'a, + { + let process_index = self.i.get(); + let process = &self.processes[process_index]; + let request = st::Box::new(FnOnce!(move |thread_pool: &_| { + let work: F = work; + work.call_once((thread_pool,)) + .map(|response| { + let response = bincode::serialize(&response).unwrap(); + Box::new(response) as Response + }) + .boxed_local() + })); + let request = mem::transmute::< + st::Box LocalBoxFuture<'a, Response> + Send>, + st::Box LocalBoxFuture<'static, Response> + Send>, + >(request); + let x = process.sender.send(Some(request)); + x.await; + let index; + { + // https://github.com/rust-lang/rust/issues/57478 + let mut process_inner_lock = process.inner.lock().unwrap(); + process_inner_lock.queue.push_back(Queued::Awaiting); + index = process_inner_lock.tail + process_inner_lock.queue.len() - 1; + drop(process_inner_lock); + } + let on_drop = OnDrop::new(|| { + let mut process_inner_lock = process.inner.lock().unwrap(); + let offset = index - process_inner_lock.tail; + process_inner_lock.queue[offset].drop_(); + while let Some(Queued::Taken) = process_inner_lock.queue.front() { + let _ = process_inner_lock.queue.pop_front().unwrap(); + process_inner_lock.tail += 1; + } + drop(process_inner_lock); + }); + while process.inner.lock().unwrap().received <= index { + process + .synchronize + 
.synchronize(async { + if process.inner.lock().unwrap().received > index { + return; + } + let z = process.receiver.recv().await; + let t = z.unwrap(); + let mut process_inner_lock = process.inner.lock().unwrap(); + let offset = process_inner_lock.received - process_inner_lock.tail; + process_inner_lock.queue[offset].received(t); + process_inner_lock.received += 1; + drop(process_inner_lock); + }) + .await; + } + on_drop.cancel(); + let mut process_inner_lock = process.inner.lock().unwrap(); + let offset = index - process_inner_lock.tail; + let boxed = process_inner_lock.queue[offset].take(); + while let Some(Queued::Taken) = process_inner_lock.queue.front() { + let _ = process_inner_lock.queue.pop_front().unwrap(); + process_inner_lock.tail += 1; + } + drop(process_inner_lock); + boxed.map(|boxed| { + bincode::deserialize( + &Box::::downcast::>(boxed.into_any_send()).unwrap(), + ) + .unwrap() + }) + } } impl Drop for ProcessPoolInner { fn drop(&mut self) { @@ -279,6 +354,18 @@ impl ProcessPool { let inner = self.0.clone(); async move { inner.spawn(work).await } } + #[allow(unsafe_code)] + pub unsafe fn spawn_unchecked<'a, F, Fut, T>( + &self, work: F, + ) -> impl Future> + Send + 'a + where + F: traits::FnOnce(&ThreadPool) -> Fut + ProcessSend + 'a, + Fut: Future + 'a, + T: ProcessSend + 'a, + { + let inner = self.0.clone(); + async move { inner.spawn_unchecked(work).await } + } } impl Clone for ProcessPool { diff --git a/src/pool/thread.rs b/src/pool/thread.rs index ce451bec..fc945491 100644 --- a/src/pool/thread.rs +++ b/src/pool/thread.rs @@ -1,6 +1,7 @@ -use futures::TryFutureExt; +use futures::{ready, TryFutureExt}; +use pin_project::{pin_project, pinned_drop}; use std::{ - future::Future, io, panic::{RefUnwindSafe, UnwindSafe}, sync::Arc + future::Future, io, panic::{RefUnwindSafe, UnwindSafe}, pin::Pin, sync::Arc, task::{Context, Poll} }; #[cfg(not(target_arch = "wasm32"))] @@ -68,6 +69,47 @@ impl ThreadPool { remote_handle } } + #[allow(unsafe_code)] + pub 
unsafe fn spawn_unchecked<'a, F, Fut, T>( + &self, task: F, + ) -> impl Future> + Send + 'a + where + F: FnOnce() -> Fut + Send + 'a, + Fut: Future + 'a, + T: Send + 'a, + { + #[cfg(not(target_arch = "wasm32"))] + return Guard::new( + self.0 + .pool + .spawn_pinned_unchecked(task) + .map_err(JoinError::into_panic) + .map_err(Panicked::from), + ); + #[cfg(target_arch = "wasm32")] + { + let _self = self; + let task = std::mem::transmute::< + Box Pin>>> + Send>, + Box Pin>>> + Send>, + >(Box::new(|| { + let fut = Box::pin(task().map(|t| Box::new(t) as Box)); + std::mem::transmute::< + Pin>>>, + Pin>>>, + >(fut) + })); + let (remote, remote_handle) = AssertUnwindSafe(future::lazy(|_| task()).flatten()) + .catch_unwind() + .map_err(Into::into) + .remote_handle(); + wasm_bindgen_futures::spawn_local(remote); + Guard::new(remote_handle.map_ok(|t| { + let t: *mut dyn Send = Box::into_raw(t); + *Box::from_raw(t as *mut T) + })) + } + } } impl Clone for ThreadPool { @@ -83,6 +125,39 @@ impl Clone for ThreadPool { impl UnwindSafe for ThreadPool {} impl RefUnwindSafe for ThreadPool {} +#[pin_project(PinnedDrop)] +struct Guard(#[pin] Option); +impl Guard { + fn new(f: F) -> Self { + Self(Some(f)) + } +} +impl Future for Guard +where + F: Future, +{ + type Output = F::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + match self.as_mut().project().0.as_pin_mut() { + Some(fut) => { + let output = ready!(fut.poll(cx)); + self.project().0.set(None); + Poll::Ready(output) + } + None => Poll::Pending, + } + } +} +#[pinned_drop] +impl PinnedDrop for Guard { + fn drop(self: Pin<&mut Self>) { + if self.project().0.is_some() { + panic!("dropped before finished polling!"); + } + } +} + fn _assert() { let _ = assert_sync_and_send::; } @@ -92,7 +167,7 @@ fn _assert() { mod pool { use async_channel::{bounded, Sender}; use futures::{future::RemoteHandle, FutureExt}; - use std::{any::Any, future::Future, panic::AssertUnwindSafe, pin::Pin}; + use std::{any::Any, 
future::Future, mem, panic::AssertUnwindSafe, pin::Pin}; use tokio::{ runtime::Handle, task::{JoinError, LocalSet} }; @@ -152,6 +227,42 @@ mod pool { .map_err(JoinError::panic) } } + #[allow(unsafe_code)] + pub(super) unsafe fn spawn_pinned_unchecked<'a, F, Fut, T>( + &self, task: F, + ) -> impl Future> + Send + 'a + where + F: FnOnce() -> Fut + Send + 'a, + Fut: Future + 'a, + T: Send + 'a, + { + let sender = self.sender.clone(); + async move { + let task: Box Box> + Send> = + Box::new(|| { + Box::new( + AssertUnwindSafe(task().map(|t| { + let t: Box = Box::new(t); + let t: Box = mem::transmute(t); + t + })) + .catch_unwind(), + ) + }); + let task: Box Box> + Send> = + mem::transmute(task); + let (sender_, receiver) = bounded::>(1); + sender.send((task, sender_)).await.unwrap(); + let res = receiver.recv().await; + let res = res.unwrap().await; + #[allow(deprecated)] + res.map(|t| { + let t: *mut dyn Any = Box::into_raw(t); + *Box::from_raw(t as *mut T) + }) + .map_err(JoinError::panic) + } + } } #[cfg(test)] diff --git a/tests/single_threaded.rs b/tests/single_threaded.rs index c3a46109..1de9dab5 100644 --- a/tests/single_threaded.rs +++ b/tests/single_threaded.rs @@ -48,4 +48,14 @@ impl ThreadPool for LocalPool { { Box::pin(future::lazy(|_| work().now_or_never().unwrap()).map(Ok)) } + unsafe fn spawn_unchecked<'a, F, Fut, T>( + &self, work: F, + ) -> BoxFuture<'a, Result>> + where + F: FnOnce() -> Fut + Send + 'a, + Fut: Future + 'a, + T: Send + 'a, + { + Box::pin(future::lazy(|_| work().now_or_never().unwrap()).map(Ok)) + } } From 7dd4127103112f0eec6f6b1c4ce3b18ac677f8f8 Mon Sep 17 00:00:00 2001 From: alecmocatta Date: Thu, 30 Jul 2020 10:20:21 +0100 Subject: [PATCH 3/5] some renames that were missed in #91 --- amadeus-core/src/par_sink.rs | 32 +++++++++++++------------- amadeus-core/src/par_sink/all.rs | 4 ++-- amadeus-core/src/par_sink/any.rs | 4 ++-- amadeus-core/src/par_sink/collect.rs | 4 ++-- amadeus-core/src/par_sink/combine.rs | 2 +- 
amadeus-core/src/par_sink/count.rs | 2 +- amadeus-core/src/par_sink/fold.rs | 2 +- amadeus-core/src/par_sink/for_each.rs | 4 ++-- amadeus-core/src/par_sink/fork.rs | 8 +++---- amadeus-core/src/par_sink/group_by.rs | 6 ++--- amadeus-core/src/par_sink/histogram.rs | 2 +- amadeus-core/src/par_sink/max.rs | 12 +++++----- amadeus-core/src/par_sink/pipe.rs | 4 ++-- amadeus-core/src/par_sink/sample.rs | 6 ++--- amadeus-core/src/par_sink/sum.rs | 2 +- amadeus-core/src/par_sink/tuple.rs | 16 ++++++------- amadeus-core/src/pipe.rs | 24 +++++++++---------- 17 files changed, 67 insertions(+), 67 deletions(-) diff --git a/amadeus-core/src/par_sink.rs b/amadeus-core/src/par_sink.rs index 96eaf1f8..780373d8 100644 --- a/amadeus-core/src/par_sink.rs +++ b/amadeus-core/src/par_sink.rs @@ -24,28 +24,28 @@ pub use self::{ }; #[must_use] -pub trait Reducer { +pub trait Reducer { type Done; - type Async: Sink; + type Async: Sink; fn into_async(self) -> Self::Async; } -pub trait ReducerSend: Reducer>::Done> { +pub trait ReducerSend: Reducer>::Done> { type Done: Send + 'static; } -pub trait ReducerProcessSend: - ReducerSend>::Done> +pub trait ReducerProcessSend: + ReducerSend>::Done> { type Done: ProcessSend + 'static; } #[must_use] -pub trait ParallelSink { +pub trait ParallelSink { type Done; - type Pipe: ParallelPipe; - type ReduceA: ReducerSend<>::Output> + Clone + Send; + type Pipe: ParallelPipe; + type ReduceA: ReducerSend<>::Output> + Clone + Send; type ReduceC: Reducer< - >::Output>>::Done, + >::Output>>::Done, Done = Self::Done, >; @@ -53,25 +53,25 @@ pub trait ParallelSink { } #[inline(always)] -pub(crate) fn assert_parallel_sink, Input>(r: R) -> R { +pub(crate) fn assert_parallel_sink, Item>(r: R) -> R { r } #[must_use] -pub trait DistributedSink { +pub trait DistributedSink { type Done; - type Pipe: DistributedPipe; - type ReduceA: ReducerSend<>::Output> + type Pipe: DistributedPipe; + type ReduceA: ReducerSend<>::Output> + Clone + ProcessSend + Send; type ReduceB: 
ReducerProcessSend< - >::Output>>::Done, + >::Output>>::Done, > + Clone + ProcessSend; type ReduceC: Reducer< >::Output>>::Done, + >::Output>>::Done, >>::Done, Done = Self::Done, >; @@ -80,6 +80,6 @@ pub trait DistributedSink { } #[inline(always)] -pub(crate) fn assert_distributed_sink, Input>(r: R) -> R { +pub(crate) fn assert_distributed_sink, Item>(r: R) -> R { r } diff --git a/amadeus-core/src/par_sink/all.rs b/amadeus-core/src/par_sink/all.rs index 10c3a9a6..c40d2dda 100644 --- a/amadeus-core/src/par_sink/all.rs +++ b/amadeus-core/src/par_sink/all.rs @@ -20,7 +20,7 @@ pub struct All { f: F, } -impl, Input, F> ParallelSink for All +impl, Item, F> ParallelSink for All where F: FnMut<(P::Output,), Output = bool> + Clone + Send + 'static, { @@ -33,7 +33,7 @@ where (self.pipe, AllReducer(self.f, PhantomData), BoolAndReducer) } } -impl, Input, F> DistributedSink for All +impl, Item, F> DistributedSink for All where F: FnMut<(P::Output,), Output = bool> + Clone + ProcessSend + 'static, { diff --git a/amadeus-core/src/par_sink/any.rs b/amadeus-core/src/par_sink/any.rs index ccdfdb66..91c51264 100644 --- a/amadeus-core/src/par_sink/any.rs +++ b/amadeus-core/src/par_sink/any.rs @@ -20,7 +20,7 @@ pub struct Any { f: F, } -impl, Input, F> ParallelSink for Any +impl, Item, F> ParallelSink for Any where F: FnMut<(P::Output,), Output = bool> + Clone + Send + 'static, { @@ -33,7 +33,7 @@ where (self.pipe, AnyReducer(self.f, PhantomData), BoolOrReducer) } } -impl, Input, F> DistributedSink for Any +impl, Item, F> DistributedSink for Any where F: FnMut<(P::Output,), Output = bool> + Clone + ProcessSend + 'static, { diff --git a/amadeus-core/src/par_sink/collect.rs b/amadeus-core/src/par_sink/collect.rs index 704cec65..d01b2f2a 100644 --- a/amadeus-core/src/par_sink/collect.rs +++ b/amadeus-core/src/par_sink/collect.rs @@ -19,7 +19,7 @@ pub struct Collect { marker: PhantomData A>, } -impl, Input, T: FromParallelStream> ParallelSink +impl, Item, T: FromParallelStream> 
ParallelSink for Collect { type Done = T; @@ -32,7 +32,7 @@ impl, Input, T: FromParallelStream> ParallelSi (self.pipe, a, b) } } -impl, Input, T: FromDistributedStream> DistributedSink +impl, Item, T: FromDistributedStream> DistributedSink for Collect { type Done = T; diff --git a/amadeus-core/src/par_sink/combine.rs b/amadeus-core/src/par_sink/combine.rs index 1baf18da..7ba5b7ce 100644 --- a/amadeus-core/src/par_sink/combine.rs +++ b/amadeus-core/src/par_sink/combine.rs @@ -44,7 +44,7 @@ pub struct Combine { } impl_par_dist! { - impl, Input, F> ParallelSink for Combine + impl, Item, F> ParallelSink for Combine where F: FnMut<(P::Output, P::Output), Output = P::Output> + Clone + Send + 'static, P::Output: Send + 'static, diff --git a/amadeus-core/src/par_sink/count.rs b/amadeus-core/src/par_sink/count.rs index 8046ae1b..6d7caee6 100644 --- a/amadeus-core/src/par_sink/count.rs +++ b/amadeus-core/src/par_sink/count.rs @@ -12,7 +12,7 @@ pub struct Count

{ } impl_par_dist! { - impl, Input> ParallelSink for Count

{ + impl, Item> ParallelSink for Count

{ folder_par_sink!( CountFolder, SumFolder, diff --git a/amadeus-core/src/par_sink/fold.rs b/amadeus-core/src/par_sink/fold.rs index 8a04f718..b588780f 100644 --- a/amadeus-core/src/par_sink/fold.rs +++ b/amadeus-core/src/par_sink/fold.rs @@ -20,7 +20,7 @@ pub struct Fold { } impl_par_dist! { - impl, Input, ID, F, B> ParallelSink for Fold + impl, Item, ID, F, B> ParallelSink for Fold where ID: FnMut<(), Output = B> + Clone + Send + 'static, F: FnMut<(B, Either), Output = B> + Clone + Send + 'static, diff --git a/amadeus-core/src/par_sink/for_each.rs b/amadeus-core/src/par_sink/for_each.rs index 24603d45..6b9e3366 100644 --- a/amadeus-core/src/par_sink/for_each.rs +++ b/amadeus-core/src/par_sink/for_each.rs @@ -20,7 +20,7 @@ pub struct ForEach { f: F, } -impl, Input, F> ParallelSink for ForEach +impl, Item, F> ParallelSink for ForEach where F: FnMut<(P::Output,), Output = ()> + Clone + Send + 'static, { @@ -37,7 +37,7 @@ where ) } } -impl, Input, F> DistributedSink for ForEach +impl, Item, F> DistributedSink for ForEach where F: FnMut<(P::Output,), Output = ()> + Clone + ProcessSend + 'static, { diff --git a/amadeus-core/src/par_sink/fork.rs b/amadeus-core/src/par_sink/fork.rs index fc52efc8..939d46ae 100644 --- a/amadeus-core/src/par_sink/fork.rs +++ b/amadeus-core/src/par_sink/fork.rs @@ -84,9 +84,9 @@ impl_par_dist! 
{ } } -impl ParallelSink for Fork +impl ParallelSink for Fork where - A: ParallelPipe, + A: ParallelPipe, B: ParallelSink, C: ParallelSink, RefAItem: 'static, @@ -107,9 +107,9 @@ where ) } } -impl DistributedSink for Fork +impl DistributedSink for Fork where - A: DistributedPipe, + A: DistributedPipe, B: DistributedSink, C: DistributedSink, RefAItem: 'static, diff --git a/amadeus-core/src/par_sink/group_by.rs b/amadeus-core/src/par_sink/group_by.rs index fa625bdc..7a558d77 100644 --- a/amadeus-core/src/par_sink/group_by.rs +++ b/amadeus-core/src/par_sink/group_by.rs @@ -25,7 +25,7 @@ pub struct GroupBy { b: B, } -impl, B: ParallelSink, Input, T, U> ParallelSink +impl, B: ParallelSink, Item, T, U> ParallelSink for GroupBy where T: Eq + Hash + Send + 'static, @@ -53,8 +53,8 @@ where } } -impl, B: DistributedSink, Input, T, U> - DistributedSink for GroupBy +impl, B: DistributedSink, Item, T, U> + DistributedSink for GroupBy where T: Eq + Hash + ProcessSend + 'static, >::Task: Clone + ProcessSend + 'static, diff --git a/amadeus-core/src/par_sink/histogram.rs b/amadeus-core/src/par_sink/histogram.rs index 459aff52..1380bff0 100644 --- a/amadeus-core/src/par_sink/histogram.rs +++ b/amadeus-core/src/par_sink/histogram.rs @@ -16,7 +16,7 @@ pub struct Histogram

{ } impl_par_dist! { - impl, Input> ParallelSink for Histogram

+ impl, Item> ParallelSink for Histogram

where P::Output: Hash + Ord + Send + 'static, { diff --git a/amadeus-core/src/par_sink/max.rs b/amadeus-core/src/par_sink/max.rs index cc40088e..210eeed2 100644 --- a/amadeus-core/src/par_sink/max.rs +++ b/amadeus-core/src/par_sink/max.rs @@ -12,7 +12,7 @@ pub struct Max

{ pipe: P, } impl_par_dist! { - impl, Input> ParallelSink for Max

+ impl, Item> ParallelSink for Max

where P::Output: Ord + Send + 'static, { @@ -27,7 +27,7 @@ pub struct MaxBy { f: F, } impl_par_dist! { - impl, Input, F> ParallelSink for MaxBy + impl, Item, F> ParallelSink for MaxBy where F: for<'a, 'b> FnMut<(&'a P::Output, &'b P::Output), Output = Ordering> + Clone @@ -46,7 +46,7 @@ pub struct MaxByKey { f: F, } impl_par_dist! { - impl, Input, F, B> ParallelSink for MaxByKey + impl, Item, F, B> ParallelSink for MaxByKey where F: for<'a> FnMut<(&'a P::Output,), Output = B> + Clone + Send + 'static, B: Ord + 'static, @@ -62,7 +62,7 @@ pub struct Min

{ pipe: P, } impl_par_dist! { - impl, Input> ParallelSink for Min

+ impl, Item> ParallelSink for Min

where P::Output: Ord + Send + 'static, { @@ -77,7 +77,7 @@ pub struct MinBy { f: F, } impl_par_dist! { - impl, Input, F> ParallelSink for MinBy + impl, Item, F> ParallelSink for MinBy where F: for<'a, 'b> FnMut<(&'a P::Output, &'b P::Output), Output = Ordering> + Clone @@ -96,7 +96,7 @@ pub struct MinByKey { f: F, } impl_par_dist! { - impl, Input, F, B> ParallelSink for MinByKey + impl, Item, F, B> ParallelSink for MinByKey where F: for<'a> FnMut<(&'a P::Output,), Output = B> + Clone + Send + 'static, B: Ord + 'static, diff --git a/amadeus-core/src/par_sink/pipe.rs b/amadeus-core/src/par_sink/pipe.rs index 88300a50..c7a28cec 100644 --- a/amadeus-core/src/par_sink/pipe.rs +++ b/amadeus-core/src/par_sink/pipe.rs @@ -55,7 +55,7 @@ impl_par_dist! { } } -impl, B: ParallelSink, Input> ParallelSink for Pipe { +impl, B: ParallelSink, Item> ParallelSink for Pipe { type Done = B::Done; type Pipe = Pipe; type ReduceA = B::ReduceA; @@ -67,7 +67,7 @@ impl, B: ParallelSink, Input> ParallelSink, B: DistributedSink, Input> DistributedSink +impl, B: DistributedSink, Item> DistributedSink for Pipe { type Done = B::Done; diff --git a/amadeus-core/src/par_sink/sample.rs b/amadeus-core/src/par_sink/sample.rs index 0eec0dff..55c5ab02 100644 --- a/amadeus-core/src/par_sink/sample.rs +++ b/amadeus-core/src/par_sink/sample.rs @@ -18,7 +18,7 @@ pub struct SampleUnstable

{ } impl_par_dist! { - impl, Input> ParallelSink for SampleUnstable

+ impl, Item> ParallelSink for SampleUnstable

where P::Output: Send + 'static, { @@ -58,7 +58,7 @@ pub struct MostFrequent

{ } impl_par_dist! { - impl, Input> ParallelSink for MostFrequent

+ impl, Item> ParallelSink for MostFrequent

where P::Output: Clone + Hash + Eq + Send + 'static, { @@ -104,7 +104,7 @@ pub struct MostDistinct

{ } impl_par_dist! { - impl, Input, A, B> ParallelSink for MostDistinct

+ impl, Item, A, B> ParallelSink for MostDistinct

where A: Clone + Hash + Eq + Send + 'static, B: Hash + 'static, diff --git a/amadeus-core/src/par_sink/sum.rs b/amadeus-core/src/par_sink/sum.rs index 8ea191e1..46f5328b 100644 --- a/amadeus-core/src/par_sink/sum.rs +++ b/amadeus-core/src/par_sink/sum.rs @@ -14,7 +14,7 @@ pub struct Sum { } impl_par_dist! { - impl, Input, B> ParallelSink for Sum + impl, Item, B> ParallelSink for Sum where B: iter::Sum + iter::Sum + Send + 'static, { diff --git a/amadeus-core/src/par_sink/tuple.rs b/amadeus-core/src/par_sink/tuple.rs index 837d8102..320c6c73 100644 --- a/amadeus-core/src/par_sink/tuple.rs +++ b/amadeus-core/src/par_sink/tuple.rs @@ -49,11 +49,11 @@ where macro_rules! impl_tuple { ($reducea:ident $reduceaasync:ident $reduceb:ident $reducebasync:ident $async:ident $enum:ident $join:ident $($copy:ident)? : $($num:tt $t:ident $s:ident $i:ident $r:ident $o:ident $c:ident $iterator:ident $reducera:ident $reducerb:ident $($copyb:ident)? , $comma:tt)*) => ( impl< - Input, - $($r: ParallelSink,)* + Item, + $($r: ParallelSink,)* $($o,)* - > ParallelSink for ($($r,)*) - where Input: $($copy)*, + > ParallelSink for ($($r,)*) + where Item: $($copy)*, { type Done = ($($o,)*); type Pipe = ($($r::Pipe,)*); @@ -70,11 +70,11 @@ macro_rules! 
impl_tuple { } } impl< - Input, - $($r: DistributedSink,)* + Item, + $($r: DistributedSink,)* $($o,)* - > DistributedSink for ($($r,)*) - where Input: $($copy)*, + > DistributedSink for ($($r,)*) + where Item: $($copy)*, { type Done = ($($o,)*); type Pipe = ($($r::Pipe,)*); diff --git a/amadeus-core/src/pipe.rs b/amadeus-core/src/pipe.rs index 664eef85..ea4fdf60 100644 --- a/amadeus-core/src/pipe.rs +++ b/amadeus-core/src/pipe.rs @@ -151,9 +151,9 @@ where p } #[inline(always)] -fn assert_sink(s: S) -> S +fn assert_sink(s: S) -> S where - S: Sink, + S: Sink, { s } @@ -221,16 +221,16 @@ pub struct PipeSink { sink: S, } -impl Sink for PipeSink +impl Sink for PipeSink where - P: Pipe, + P: Pipe, S: Sink, { type Done = S::Done; #[inline(always)] fn poll_forward( - self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream>, + self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream>, ) -> Poll { let self_ = self.project(); let stream = stream.pipe(self_.pipe); @@ -368,30 +368,30 @@ where } } -impl Sink for Pin

+impl Sink for Pin

where P: DerefMut + Unpin, - P::Target: Sink, + P::Target: Sink, { - type Done = >::Done; + type Done = >::Done; #[inline(always)] fn poll_forward( - self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream>, + self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream>, ) -> Poll { self.get_mut().as_mut().poll_forward(cx, stream) } } -impl Sink for &mut T +impl Sink for &mut T where - T: Sink + Unpin, + T: Sink + Unpin, { type Done = T::Done; #[inline(always)] fn poll_forward( - mut self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream>, + mut self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream>, ) -> Poll { Pin::new(&mut **self).poll_forward(cx, stream) } From 4d6515b4c1aabe3ad1b7317c11fd2370caba143d Mon Sep 17 00:00:00 2001 From: alecmocatta Date: Thu, 30 Jul 2020 14:17:54 +0100 Subject: [PATCH 4/5] add benchmarks --- Cargo.toml | 23 ++++- amadeus-core/src/pool.rs | 16 +++- azure-pipelines.yml | 8 +- benches/csv.rs | 130 ++++++++++++++++++++++++++++ benches/in_memory.rs | 72 ++++++++++++++++ benches/parquet.rs | 123 ++++++++++++++++++++++++++ tests/csv.rs | 4 +- tests/csv_dist.rs | 4 +- tests/csv_wasm.rs | 4 +- tests/parquet.rs | 182 +++++++++++++++++++-------------------- tests/parquet_dist.rs | 182 +++++++++++++++++++-------------------- 11 files changed, 545 insertions(+), 203 deletions(-) create mode 100644 benches/csv.rs create mode 100644 benches/in_memory.rs create mode 100644 benches/parquet.rs diff --git a/Cargo.toml b/Cargo.toml index 22cc07db..d0cd9b54 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ parquet = ["amadeus-parquet", "amadeus-derive/parquet"] postgres = ["amadeus-postgres", "amadeus-derive/postgres"] csv = ["amadeus-serde", "amadeus-derive/serde"] json = ["amadeus-serde", "amadeus-derive/serde"] +bench = ["serde-csv", "once_cell", "arrow-parquet", "rayon"] [package.metadata.docs.rs] features = ["constellation", "aws", "commoncrawl", "parquet", "postgres", "csv", "json"] @@ 
-47,7 +48,6 @@ async-channel = "1.1" bincode = { version = "1.3", optional = true } constellation-rs = { version = "0.2.0-alpha.2", default-features = false, optional = true } derive-new = "0.5" -doc-comment = "0.3" futures = "0.3" num_cpus = "1.13" pin-project = "0.4" @@ -56,16 +56,21 @@ serde_closure = "0.3" serde_traitobject = { version = "0.2", optional = true } tokio = { version = "0.2", features = ["rt-threaded", "rt-util", "blocking"] } +# Move to dev-dependencies once fixed: https://github.com/rust-lang/cargo/issues/1596 +arrow-parquet = { package = "parquet", version = "1.0", default-features = false, features = ["brotli", "flate2", "lz4", "snap"], optional = true } +once_cell = { version = "1.4", optional = true } +rayon = { version = "1.3", optional = true } +serde-csv = { package = "csv", version = "1.0", optional = true } + [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen = "0.2" wasm-bindgen-futures = "0.4" web-sys = { version = "0.3", features = ["Blob", "Performance", "Response", "Window"] } [dev-dependencies] +doc-comment = "0.3" either = { version = "1.5", features = ["serde"] } -once_cell = "1.4" rand = "0.7" -rayon = "1.3" serde_json = "1.0" streaming_algorithms = "0.3" tokio = { version = "0.2", features = ["macros", "time"] } @@ -167,3 +172,15 @@ name = "postgres_dist" harness = false required-features = ["postgres"] test = false # TODO set up postgres on CI + +[[bench]] +name = "csv" +required-features = ["bench", "csv"] + +[[bench]] +name = "in_memory" +required-features = ["bench"] + +[[bench]] +name = "parquet" +required-features = ["bench", "parquet"] diff --git a/amadeus-core/src/pool.rs b/amadeus-core/src/pool.rs index a7a60616..6e6c3a63 100644 --- a/amadeus-core/src/pool.rs +++ b/amadeus-core/src/pool.rs @@ -20,7 +20,13 @@ pub trait ProcessPool: Clone + Send + Sync + RefUnwindSafe + UnwindSafe + Unpin where F: traits::FnOnce(&Self::ThreadPool) -> Fut + ProcessSend + 'static, Fut: Future + 'static, - T: ProcessSend + 
'static; + T: ProcessSend + 'static, + { + #[allow(unsafe_code)] + unsafe { + self.spawn_unchecked(work) + } + } /// # Safety /// @@ -40,7 +46,13 @@ pub trait ThreadPool: Clone + Send + Sync + RefUnwindSafe + UnwindSafe + Unpin { where F: FnOnce() -> Fut + Send + 'static, Fut: Future + 'static, - T: Send + 'static; + T: Send + 'static, + { + #[allow(unsafe_code)] + unsafe { + self.spawn_unchecked(work) + } + } /// # Safety /// diff --git a/azure-pipelines.yml b/azure-pipelines.yml index d265da45..8fcf189a 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -22,8 +22,8 @@ jobs: rust_toolchain: nightly rust_lint_toolchain: nightly-2020-07-26 rust_flags: '' - rust_features_clippy: ';aws;commoncrawl;parquet;postgres;csv;json;constellation aws commoncrawl parquet postgres csv json' - rust_features: 'constellation aws commoncrawl parquet postgres csv json' + rust_features_clippy: ';aws;commoncrawl;parquet;postgres;csv;json;constellation aws commoncrawl parquet postgres csv json bench' + rust_features: 'constellation aws commoncrawl parquet postgres csv json bench' rust_doc_features: 'constellation aws commoncrawl parquet postgres csv json' rust_target_check: '' rust_target_build: '' @@ -31,8 +31,8 @@ jobs: matrix: windows: imageName: 'windows-latest' - rust_features_clippy: ';aws;commoncrawl;parquet;postgres;csv;json;aws commoncrawl parquet postgres csv json' - rust_features: 'aws commoncrawl parquet postgres csv json' + rust_features_clippy: ';aws;commoncrawl;parquet;postgres;csv;json;aws commoncrawl parquet postgres csv json bench' + rust_features: 'aws commoncrawl parquet postgres csv json bench' rust_doc_features: 'aws commoncrawl parquet postgres csv json' rust_target_run: 'x86_64-pc-windows-msvc' mac: diff --git a/benches/csv.rs b/benches/csv.rs new file mode 100644 index 00000000..f9188fef --- /dev/null +++ b/benches/csv.rs @@ -0,0 +1,130 @@ +#![cfg(nightly)] +#![feature(test)] +#![allow(clippy::suspicious_map)] + +extern crate test; + +use 
once_cell::sync::Lazy; +use serde::Deserialize; +use std::{fs, future::Future, path::PathBuf}; +use test::Bencher; +use tokio::runtime::Runtime; + +use amadeus::prelude::*; + +static RT: Lazy = Lazy::new(|| { + tokio::runtime::Builder::new() + .threaded_scheduler() + .enable_all() + .build() + .unwrap() +}); +static POOL: Lazy = Lazy::new(|| ThreadPool::new(None).unwrap()); + +#[derive(Data, Clone, Deserialize, PartialEq, PartialOrd, Debug)] +struct GameDerived { + a: String, + b: String, + c: String, + d: String, + e: u32, + f: String, +} + +#[derive(Data, Clone, PartialEq, PartialOrd, Debug)] +struct GameDerived2 { + a: String, + b: String, + c: String, + d: String, + e: u64, + f: String, +} + +#[bench] +fn csv_typed(b: &mut Bencher) { + let file = "amadeus-testing/csv/game.csv"; // 2,600,000 bytes + run(b, file, || async { + let rows = Csv::<_, GameDerived>::new(vec![PathBuf::from(file)]) + .await + .unwrap(); + assert_eq!( + rows.par_stream() + .map(|row: Result<_, _>| row.unwrap()) + .count(&*POOL) + .await, + 100_000 + ); + }) +} + +#[bench] +fn csv_typed_serde(b: &mut Bencher) { + let file = "amadeus-testing/csv/game.csv"; // 2,600,000 bytes + run(b, file, || async { + let mut rows = serde_csv::ReaderBuilder::new() + .has_headers(false) + .from_path(file) + .unwrap(); + assert_eq!(rows.deserialize::().count(), 100_000); + }); +} + +#[bench] +fn csv_untyped(b: &mut Bencher) { + let file = "amadeus-testing/csv/game.csv"; // 2,600,000 bytes + run(b, file, || async { + let rows = Csv::<_, Value>::new(vec![PathBuf::from(file)]) + .await + .unwrap(); + assert_eq!( + rows.par_stream() + .map(|row: Result<_, _>| row.unwrap()) + .count(&*POOL) + .await, + 100_000 + ); + }) +} + +#[bench] +fn csv_untyped_serde(b: &mut Bencher) { + let file = "amadeus-testing/csv/game.csv"; // 2,600,000 bytes + run(b, file, || async { + let mut rows = serde_csv::ReaderBuilder::new() + .has_headers(false) + .from_path(file) + .unwrap(); + assert_eq!(rows.records().count(), 100_000); + 
}); +} + +#[bench] +fn csv_untyped_downcase(b: &mut Bencher) { + let file = "amadeus-testing/csv/game.csv"; // 2,600,000 bytes + run(b, file, || async { + let rows = Csv::<_, Value>::new(vec![PathBuf::from(file)]) + .await + .unwrap(); + assert_eq!( + rows.par_stream() + .map(|row: Result<_, _>| { + let _: GameDerived2 = row.unwrap().downcast().unwrap(); + }) + .count(&*POOL) + .await, + 100_000 + ); + }) +} + +fn run(b: &mut Bencher, file: &str, mut task: impl FnMut() -> F) +where + F: Future, +{ + RT.enter(|| { + let _ = Lazy::force(&POOL); + b.bytes = fs::metadata(file).unwrap().len(); + b.iter(|| RT.handle().block_on(task())) + }) +} diff --git a/benches/in_memory.rs b/benches/in_memory.rs new file mode 100644 index 00000000..ddea7636 --- /dev/null +++ b/benches/in_memory.rs @@ -0,0 +1,72 @@ +#![cfg(nightly)] +#![feature(test)] +#![allow(clippy::suspicious_map)] + +extern crate test; + +use once_cell::sync::Lazy; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; +use std::{future::Future, mem}; +use test::Bencher; +use tokio::runtime::Runtime; + +use amadeus::prelude::*; + +static RT: Lazy = Lazy::new(|| { + tokio::runtime::Builder::new() + .threaded_scheduler() + .enable_all() + .build() + .unwrap() +}); +static POOL: Lazy = Lazy::new(|| ThreadPool::new(None).unwrap()); + +#[bench] +fn vec(b: &mut Bencher) { + let rows: Vec = (0..1u32 << 28).collect(); + let len = rows.len() as u64; + let sum = len * (len - 1) / 2; + let bytes = len * mem::size_of::() as u64; + run(b, bytes, || async { + assert_eq!( + rows.par_stream() + .map(|x| x as u64) + .sum::<_, u64>(&*POOL) + .await, + sum + ); + }) +} + +#[bench] +fn iter(b: &mut Bencher) { + let rows: Vec = (0..1u32 << 28).collect(); + let len = rows.len() as u64; + let sum = len * (len - 1) / 2; + let bytes = len * mem::size_of::() as u64; + run(b, bytes, || async { + assert_eq!(rows.iter().map(|&x| x as u64).sum::(), sum); + }); +} + +#[bench] +fn rayon(b: &mut Bencher) { + let rows: Vec = (0..1u32 << 
28).collect(); + let len = rows.len() as u64; + let sum = len * (len - 1) / 2; + let bytes = len * mem::size_of::() as u64; + run(b, bytes, || async { + assert_eq!(rows.par_iter().map(|&x| x as u64).sum::(), sum); + }); +} + +fn run(b: &mut Bencher, bytes: u64, mut task: impl FnMut() -> F) +where + F: Future, +{ + RT.enter(|| { + let _ = Lazy::force(&POOL); + b.bytes = bytes; + b.iter(|| RT.handle().block_on(task())) + }) +} diff --git a/benches/parquet.rs b/benches/parquet.rs new file mode 100644 index 00000000..57d325d9 --- /dev/null +++ b/benches/parquet.rs @@ -0,0 +1,123 @@ +#![cfg(nightly)] +#![feature(test)] +#![allow(clippy::suspicious_map)] + +extern crate test; + +use arrow_parquet::file::reader::{FileReader, SerializedFileReader}; +use once_cell::sync::Lazy; +use std::{fs, fs::File, future::Future, path::PathBuf}; +use test::Bencher; +use tokio::runtime::Runtime; + +use amadeus::prelude::*; + +static RT: Lazy = Lazy::new(|| { + tokio::runtime::Builder::new() + .threaded_scheduler() + .enable_all() + .build() + .unwrap() +}); +static POOL: Lazy = Lazy::new(|| ThreadPool::new(None).unwrap()); + +#[derive(Data, Clone, PartialEq, Debug)] +struct TenKayVeeTwo { + binary_field: List, + int32_field: i32, + int64_field: i64, + boolean_field: bool, + float_field: f32, + double_field: f64, + flba_field: List, // [u8;1024], + int96_field: DateTime, +} + +#[derive(Data, Clone, PartialEq, Debug)] +struct StockSimulated { + bp1: Option, + bp2: Option, + bp3: Option, + bp4: Option, + bp5: Option, + bs1: Option, + bs2: Option, + bs3: Option, + bs4: Option, + bs5: Option, + ap1: Option, + ap2: Option, + ap3: Option, + ap4: Option, + ap5: Option, + as1: Option, + as2: Option, + as3: Option, + as4: Option, + as5: Option, + valid: Option, + __index_level_0__: Option, +} + +#[bench] +fn parquet_10k(b: &mut Bencher) { + let file = "amadeus-testing/parquet/10k-v2.parquet"; // 669,034 bytes + run(b, file, || async { + let rows = Parquet::<_, 
TenKayVeeTwo>::new(PathBuf::from(file)) + .await + .unwrap(); + assert_eq!( + rows.par_stream() + .map(|row: Result<_, _>| row.unwrap()) + .count(&*POOL) + .await, + 10_000 + ); + }) +} + +#[bench] +fn parquet_stock(b: &mut Bencher) { + let file = "amadeus-testing/parquet/stock_simulated.parquet"; // 1,289,419 bytes + run(b, file, || async { + let rows = Parquet::<_, StockSimulated>::new(PathBuf::from(file)) + .await + .unwrap(); + assert_eq!( + rows.par_stream() + .map(|row: Result<_, _>| row.unwrap()) + .count(&*POOL) + .await, + 42_000 + ); + }) +} + +#[bench] +fn parquet_10k_arrow(b: &mut Bencher) { + let file = "amadeus-testing/parquet/10k-v2.parquet"; // 669,034 bytes + run(b, file, || async { + let parquet_reader = SerializedFileReader::new(File::open(file).unwrap()).unwrap(); + assert_eq!(parquet_reader.get_row_iter(None).unwrap().count(), 10_000); + }) +} + +#[bench] +fn parquet_stock_arrow(b: &mut Bencher) { + let file = "amadeus-testing/parquet/stock_simulated.parquet"; // 1,289,419 bytes + run(b, file, || async { + let parquet_reader = SerializedFileReader::new(File::open(file).unwrap()).unwrap(); + assert_eq!(parquet_reader.get_row_iter(None).unwrap().count(), 42_000); + }) +} + +fn run(b: &mut Bencher, file: &str, mut task: impl FnMut() -> F) +where + F: Future, +{ + RT.enter(|| { + let _ = Lazy::force(&POOL); + b.bytes = fs::metadata(file).unwrap().len(); + b.iter(|| RT.handle().block_on(task())) + }) +} diff --git a/tests/csv.rs b/tests/csv.rs index 987d98dd..b1b56a40 100644 --- a/tests/csv.rs +++ b/tests/csv.rs @@ -20,7 +20,7 @@ async fn csv() { f: String, } - let rows = Csv::<_, GameDerived>::new(vec![PathBuf::from("amadeus-testing/csv/game.csv")]) + let rows = Csv::<_, GameDerived>::new(PathBuf::from("amadeus-testing/csv/game.csv")) .await .unwrap(); assert_eq!( @@ -41,7 +41,7 @@ async fn csv() { f: String, } - let rows = Csv::<_, Value>::new(vec![PathBuf::from("amadeus-testing/csv/game.csv")]) + let rows = Csv::<_, 
Value>::new(PathBuf::from("amadeus-testing/csv/game.csv")) .await .unwrap(); assert_eq!( diff --git a/tests/csv_dist.rs b/tests/csv_dist.rs index 73a08769..3d214555 100644 --- a/tests/csv_dist.rs +++ b/tests/csv_dist.rs @@ -45,7 +45,7 @@ async fn run(pool: &P) -> Duration { f: String, } - let rows = Csv::<_, GameDerived>::new(vec![PathBuf::from("amadeus-testing/csv/game.csv")]) + let rows = Csv::<_, GameDerived>::new(PathBuf::from("amadeus-testing/csv/game.csv")) .await .unwrap(); assert_eq!( @@ -66,7 +66,7 @@ async fn run(pool: &P) -> Duration { f: String, } - let rows = Csv::<_, Value>::new(vec![PathBuf::from("amadeus-testing/csv/game.csv")]) + let rows = Csv::<_, Value>::new(PathBuf::from("amadeus-testing/csv/game.csv")) .await .unwrap(); assert_eq!( diff --git a/tests/csv_wasm.rs b/tests/csv_wasm.rs index f2bcf27c..4c51d47b 100644 --- a/tests/csv_wasm.rs +++ b/tests/csv_wasm.rs @@ -56,7 +56,7 @@ async fn csv() { f: String, } - let rows = Csv::<_, GameDerived>::new(vec![PathBuf::from("amadeus-testing/csv/game.csv")]) + let rows = Csv::<_, GameDerived>::new(PathBuf::from("amadeus-testing/csv/game.csv")) .await .unwrap(); assert_eq!( @@ -77,7 +77,7 @@ async fn csv() { f: String, } - let rows = Csv::<_, Value>::new(vec![PathBuf::from("amadeus-testing/csv/game.csv")]) + let rows = Csv::<_, Value>::new(PathBuf::from("amadeus-testing/csv/game.csv")) .await .unwrap(); assert_eq!( diff --git a/tests/parquet.rs b/tests/parquet.rs index 758eb082..c48da012 100644 --- a/tests/parquet.rs +++ b/tests/parquet.rs @@ -133,9 +133,9 @@ async fn parquet() { valid: Option, __index_level_0__: Option, } - let rows = Parquet::<_, StockSimulatedDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, StockSimulatedDerived>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -146,9 +146,9 @@ async fn parquet() { 42_000 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, 
Value>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -169,9 +169,9 @@ async fn parquet() { __index_level_0__: Option, } - let rows = Parquet::<_, StockSimulatedDerivedProjection1>::new(vec![PathBuf::from( + let rows = Parquet::<_, StockSimulatedDerivedProjection1>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -182,9 +182,9 @@ async fn parquet() { 42_000 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -202,9 +202,9 @@ async fn parquet() { #[derive(Data, Clone, PartialEq, Debug)] struct StockSimulatedDerivedProjection2 {} - let rows = Parquet::<_, StockSimulatedDerivedProjection2>::new(vec![PathBuf::from( + let rows = Parquet::<_, StockSimulatedDerivedProjection2>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -215,9 +215,9 @@ async fn parquet() { 42_000 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -255,11 +255,10 @@ async fn parquet() { int96_field: DateTime, } - let rows = Parquet::<_, TenKayVeeTwo>::new(vec![PathBuf::from( - "amadeus-testing/parquet/10k-v2.parquet", - )]) - .await - .unwrap(); + let rows = + Parquet::<_, TenKayVeeTwo>::new(PathBuf::from("amadeus-testing/parquet/10k-v2.parquet")) + .await + .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result<_, _>| row.unwrap()) @@ -268,9 +267,9 @@ async fn parquet() { 10_000 ); - let rows = Parquet::<_, TenKayVeeTwoDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, TenKayVeeTwoDerived>::new(PathBuf::from( "amadeus-testing/parquet/10k-v2.parquet", - )]) + )) .await .unwrap(); 
assert_eq!( @@ -281,11 +280,9 @@ async fn parquet() { 10_000 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( - "amadeus-testing/parquet/10k-v2.parquet", - )]) - .await - .unwrap(); + let rows = Parquet::<_, Value>::new(PathBuf::from("amadeus-testing/parquet/10k-v2.parquet")) + .await + .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result| -> Value { @@ -328,9 +325,9 @@ async fn parquet() { timestamp_col: Option, } - let rows = Parquet::<_, AlltypesDictionary>::new(vec![PathBuf::from( + let rows = Parquet::<_, AlltypesDictionary>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_dictionary.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -341,9 +338,9 @@ async fn parquet() { 2 ); - let rows = Parquet::<_, AlltypesDictionaryDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, AlltypesDictionaryDerived>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_dictionary.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -354,9 +351,9 @@ async fn parquet() { 2 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_dictionary.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -401,9 +398,9 @@ async fn parquet() { timestamp_col: Option, } - let rows = Parquet::<_, AlltypesPlain>::new(vec![PathBuf::from( + let rows = Parquet::<_, AlltypesPlain>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -414,9 +411,9 @@ async fn parquet() { 8 ); - let rows = Parquet::<_, AlltypesPlainDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, AlltypesPlainDerived>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -427,9 +424,9 @@ async fn parquet() { 8 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.parquet", - )]) + )) 
.await .unwrap(); assert_eq!( @@ -474,9 +471,9 @@ async fn parquet() { timestamp_col: Option, } - let rows = Parquet::<_, AlltypesPlainSnappy>::new(vec![PathBuf::from( + let rows = Parquet::<_, AlltypesPlainSnappy>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -487,9 +484,9 @@ async fn parquet() { 2 ); - let rows = Parquet::<_, AlltypesPlainSnappyDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, AlltypesPlainSnappyDerived>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -500,9 +497,9 @@ async fn parquet() { 2 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -555,9 +552,9 @@ async fn parquet() { a: Option>>>>>>, b: i32, } - let rows = Parquet::<_, NestedLists>::new(vec![PathBuf::from( + let rows = Parquet::<_, NestedLists>::new(PathBuf::from( "amadeus-testing/parquet/nested_lists.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -568,9 +565,9 @@ async fn parquet() { 3 ); - let rows = Parquet::<_, NestedListsDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, NestedListsDerived>::new(PathBuf::from( "amadeus-testing/parquet/nested_lists.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -581,9 +578,9 @@ async fn parquet() { 3 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nested_lists.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -610,9 +607,9 @@ async fn parquet() { b: i32, c: f64, } - let rows = Parquet::<_, NestedMaps>::new(vec![PathBuf::from( + let rows = Parquet::<_, NestedMaps>::new(PathBuf::from( "amadeus-testing/parquet/nested_maps.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -623,9 +620,9 @@ 
async fn parquet() { 6 ); - let rows = Parquet::<_, NestedMapsDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, NestedMapsDerived>::new(PathBuf::from( "amadeus-testing/parquet/nested_maps.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -636,9 +633,9 @@ async fn parquet() { 6 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nested_maps.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -704,9 +701,9 @@ async fn parquet() { f: String, } - let rows = Parquet::<_, Nonnullable>::new(vec![PathBuf::from( + let rows = Parquet::<_, Nonnullable>::new(PathBuf::from( "amadeus-testing/parquet/nonnullable.impala.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -717,9 +714,9 @@ async fn parquet() { 1 ); - let rows = Parquet::<_, NonnullableDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, NonnullableDerived>::new(PathBuf::from( "amadeus-testing/parquet/nonnullable.impala.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -730,9 +727,9 @@ async fn parquet() { 1 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nonnullable.impala.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -777,9 +774,9 @@ async fn parquet() { Option>>,)>,)>>>, )>, } - let rows = Parquet::<_, Nullable>::new(vec![PathBuf::from( + let rows = Parquet::<_, Nullable>::new(PathBuf::from( "amadeus-testing/parquet/nullable.impala.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -790,9 +787,9 @@ async fn parquet() { 7 ); - let rows = Parquet::<_, NullableDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, NullableDerived>::new(PathBuf::from( "amadeus-testing/parquet/nullable.impala.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -803,9 +800,9 @@ async fn parquet() { 7 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, 
Value>::new(PathBuf::from( "amadeus-testing/parquet/nullable.impala.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -826,9 +823,9 @@ async fn parquet() { struct NullsDerived { b_struct: Option<(Option,)>, } - let rows = Parquet::<_, Nulls>::new(vec![PathBuf::from( + let rows = Parquet::<_, Nulls>::new(PathBuf::from( "amadeus-testing/parquet/nulls.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -839,9 +836,9 @@ async fn parquet() { 8 ); - let rows = Parquet::<_, NullsDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, NullsDerived>::new(PathBuf::from( "amadeus-testing/parquet/nulls.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -852,9 +849,9 @@ async fn parquet() { 8 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nulls.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -877,9 +874,9 @@ async fn parquet() { #[amadeus(name = "phoneNumbers")] phone_numbers: Option<(List<(i64, Option)>,)>, } - let rows = Parquet::<_, Repeated>::new(vec![PathBuf::from( + let rows = Parquet::<_, Repeated>::new(PathBuf::from( "amadeus-testing/parquet/repeated_no_annotation.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -890,9 +887,9 @@ async fn parquet() { 6 ); - let rows = Parquet::<_, RepeatedDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, RepeatedDerived>::new(PathBuf::from( "amadeus-testing/parquet/repeated_no_annotation.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -903,9 +900,9 @@ async fn parquet() { 6 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/repeated_no_annotation.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -930,9 +927,9 @@ async fn parquet() { d: bool, e: Option>, } - let rows = Parquet::<_, TestDatapage>::new(vec![PathBuf::from( + let rows = Parquet::<_, TestDatapage>::new(PathBuf::from( 
"amadeus-testing/parquet/datapage_v2.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -943,9 +940,9 @@ async fn parquet() { 5 ); - let rows = Parquet::<_, TestDatapageDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, TestDatapageDerived>::new(PathBuf::from( "amadeus-testing/parquet/datapage_v2.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -956,9 +953,9 @@ async fn parquet() { 5 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/datapage_v2.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -1001,11 +998,10 @@ async fn parquet() { __index_level_0__: Option, } - let rows = Parquet::<_, CommitsDerived>::new(vec![PathBuf::from( - "amadeus-testing/parquet/commits.parquet", - )]) - .await - .unwrap(); + let rows = + Parquet::<_, CommitsDerived>::new(PathBuf::from("amadeus-testing/parquet/commits.parquet")) + .await + .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result<_, _>| row.unwrap()) @@ -1014,11 +1010,9 @@ async fn parquet() { 14_444 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( - "amadeus-testing/parquet/commits.parquet", - )]) - .await - .unwrap(); + let rows = Parquet::<_, Value>::new(PathBuf::from("amadeus-testing/parquet/commits.parquet")) + .await + .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result| -> Value { diff --git a/tests/parquet_dist.rs b/tests/parquet_dist.rs index 9a851255..5ef4a762 100644 --- a/tests/parquet_dist.rs +++ b/tests/parquet_dist.rs @@ -138,9 +138,9 @@ async fn run(pool: &P) -> Duration { valid: Option, __index_level_0__: Option, } - let rows = Parquet::<_, StockSimulatedDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, StockSimulatedDerived>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -151,9 +151,9 @@ async fn run(pool: &P) -> Duration { 42_000 ); - let rows = Parquet::<_, 
Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -174,9 +174,9 @@ async fn run(pool: &P) -> Duration { __index_level_0__: Option, } - let rows = Parquet::<_, StockSimulatedDerivedProjection1>::new(vec![PathBuf::from( + let rows = Parquet::<_, StockSimulatedDerivedProjection1>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -187,9 +187,9 @@ async fn run(pool: &P) -> Duration { 42_000 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -207,9 +207,9 @@ async fn run(pool: &P) -> Duration { #[derive(Data, Clone, PartialEq, Debug)] struct StockSimulatedDerivedProjection2 {} - let rows = Parquet::<_, StockSimulatedDerivedProjection2>::new(vec![PathBuf::from( + let rows = Parquet::<_, StockSimulatedDerivedProjection2>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -220,9 +220,9 @@ async fn run(pool: &P) -> Duration { 42_000 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -260,11 +260,10 @@ async fn run(pool: &P) -> Duration { int96_field: DateTime, } - let rows = Parquet::<_, TenKayVeeTwo>::new(vec![PathBuf::from( - "amadeus-testing/parquet/10k-v2.parquet", - )]) - .await - .unwrap(); + let rows = + Parquet::<_, TenKayVeeTwo>::new(PathBuf::from("amadeus-testing/parquet/10k-v2.parquet")) + .await + .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) @@ -273,9 +272,9 @@ async fn run(pool: &P) -> Duration { 10_000 ); - let rows = Parquet::<_, 
TenKayVeeTwoDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, TenKayVeeTwoDerived>::new(PathBuf::from( "amadeus-testing/parquet/10k-v2.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -286,11 +285,9 @@ async fn run(pool: &P) -> Duration { 10_000 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( - "amadeus-testing/parquet/10k-v2.parquet", - )]) - .await - .unwrap(); + let rows = Parquet::<_, Value>::new(PathBuf::from("amadeus-testing/parquet/10k-v2.parquet")) + .await + .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result| -> Value { @@ -333,9 +330,9 @@ async fn run(pool: &P) -> Duration { timestamp_col: Option, } - let rows = Parquet::<_, AlltypesDictionary>::new(vec![PathBuf::from( + let rows = Parquet::<_, AlltypesDictionary>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_dictionary.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -346,9 +343,9 @@ async fn run(pool: &P) -> Duration { 2 ); - let rows = Parquet::<_, AlltypesDictionaryDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, AlltypesDictionaryDerived>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_dictionary.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -359,9 +356,9 @@ async fn run(pool: &P) -> Duration { 2 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_dictionary.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -406,9 +403,9 @@ async fn run(pool: &P) -> Duration { timestamp_col: Option, } - let rows = Parquet::<_, AlltypesPlain>::new(vec![PathBuf::from( + let rows = Parquet::<_, AlltypesPlain>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -419,9 +416,9 @@ async fn run(pool: &P) -> Duration { 8 ); - let rows = Parquet::<_, AlltypesPlainDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, AlltypesPlainDerived>::new(PathBuf::from( 
"amadeus-testing/parquet/alltypes_plain.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -432,9 +429,9 @@ async fn run(pool: &P) -> Duration { 8 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -479,9 +476,9 @@ async fn run(pool: &P) -> Duration { timestamp_col: Option, } - let rows = Parquet::<_, AlltypesPlainSnappy>::new(vec![PathBuf::from( + let rows = Parquet::<_, AlltypesPlainSnappy>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -492,9 +489,9 @@ async fn run(pool: &P) -> Duration { 2 ); - let rows = Parquet::<_, AlltypesPlainSnappyDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, AlltypesPlainSnappyDerived>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -505,9 +502,9 @@ async fn run(pool: &P) -> Duration { 2 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -560,9 +557,9 @@ async fn run(pool: &P) -> Duration { a: Option>>>>>>, b: i32, } - let rows = Parquet::<_, NestedLists>::new(vec![PathBuf::from( + let rows = Parquet::<_, NestedLists>::new(PathBuf::from( "amadeus-testing/parquet/nested_lists.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -573,9 +570,9 @@ async fn run(pool: &P) -> Duration { 3 ); - let rows = Parquet::<_, NestedListsDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, NestedListsDerived>::new(PathBuf::from( "amadeus-testing/parquet/nested_lists.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -586,9 +583,9 @@ async fn run(pool: &P) -> Duration { 3 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = 
Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nested_lists.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -615,9 +612,9 @@ async fn run(pool: &P) -> Duration { b: i32, c: f64, } - let rows = Parquet::<_, NestedMaps>::new(vec![PathBuf::from( + let rows = Parquet::<_, NestedMaps>::new(PathBuf::from( "amadeus-testing/parquet/nested_maps.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -628,9 +625,9 @@ async fn run(pool: &P) -> Duration { 6 ); - let rows = Parquet::<_, NestedMapsDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, NestedMapsDerived>::new(PathBuf::from( "amadeus-testing/parquet/nested_maps.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -641,9 +638,9 @@ async fn run(pool: &P) -> Duration { 6 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nested_maps.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -709,9 +706,9 @@ async fn run(pool: &P) -> Duration { f: String, } - let rows = Parquet::<_, Nonnullable>::new(vec![PathBuf::from( + let rows = Parquet::<_, Nonnullable>::new(PathBuf::from( "amadeus-testing/parquet/nonnullable.impala.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -722,9 +719,9 @@ async fn run(pool: &P) -> Duration { 1 ); - let rows = Parquet::<_, NonnullableDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, NonnullableDerived>::new(PathBuf::from( "amadeus-testing/parquet/nonnullable.impala.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -735,9 +732,9 @@ async fn run(pool: &P) -> Duration { 1 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nonnullable.impala.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -782,9 +779,9 @@ async fn run(pool: &P) -> Duration { Option>>,)>,)>>>, )>, } - let rows = Parquet::<_, Nullable>::new(vec![PathBuf::from( + let rows = 
Parquet::<_, Nullable>::new(PathBuf::from( "amadeus-testing/parquet/nullable.impala.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -795,9 +792,9 @@ async fn run(pool: &P) -> Duration { 7 ); - let rows = Parquet::<_, NullableDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, NullableDerived>::new(PathBuf::from( "amadeus-testing/parquet/nullable.impala.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -808,9 +805,9 @@ async fn run(pool: &P) -> Duration { 7 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nullable.impala.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -831,9 +828,9 @@ async fn run(pool: &P) -> Duration { struct NullsDerived { b_struct: Option<(Option,)>, } - let rows = Parquet::<_, Nulls>::new(vec![PathBuf::from( + let rows = Parquet::<_, Nulls>::new(PathBuf::from( "amadeus-testing/parquet/nulls.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -844,9 +841,9 @@ async fn run(pool: &P) -> Duration { 8 ); - let rows = Parquet::<_, NullsDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, NullsDerived>::new(PathBuf::from( "amadeus-testing/parquet/nulls.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -857,9 +854,9 @@ async fn run(pool: &P) -> Duration { 8 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nulls.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -882,9 +879,9 @@ async fn run(pool: &P) -> Duration { #[amadeus(name = "phoneNumbers")] phone_numbers: Option<(List<(i64, Option)>,)>, } - let rows = Parquet::<_, Repeated>::new(vec![PathBuf::from( + let rows = Parquet::<_, Repeated>::new(PathBuf::from( "amadeus-testing/parquet/repeated_no_annotation.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -895,9 +892,9 @@ async fn run(pool: &P) -> Duration { 6 ); - let rows = Parquet::<_, 
RepeatedDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, RepeatedDerived>::new(PathBuf::from( "amadeus-testing/parquet/repeated_no_annotation.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -908,9 +905,9 @@ async fn run(pool: &P) -> Duration { 6 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/repeated_no_annotation.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -935,9 +932,9 @@ async fn run(pool: &P) -> Duration { d: bool, e: Option>, } - let rows = Parquet::<_, TestDatapage>::new(vec![PathBuf::from( + let rows = Parquet::<_, TestDatapage>::new(PathBuf::from( "amadeus-testing/parquet/datapage_v2.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -948,9 +945,9 @@ async fn run(pool: &P) -> Duration { 5 ); - let rows = Parquet::<_, TestDatapageDerived>::new(vec![PathBuf::from( + let rows = Parquet::<_, TestDatapageDerived>::new(PathBuf::from( "amadeus-testing/parquet/datapage_v2.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -961,9 +958,9 @@ async fn run(pool: &P) -> Duration { 5 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( + let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/datapage_v2.snappy.parquet", - )]) + )) .await .unwrap(); assert_eq!( @@ -1006,11 +1003,10 @@ async fn run(pool: &P) -> Duration { __index_level_0__: Option, } - let rows = Parquet::<_, CommitsDerived>::new(vec![PathBuf::from( - "amadeus-testing/parquet/commits.parquet", - )]) - .await - .unwrap(); + let rows = + Parquet::<_, CommitsDerived>::new(PathBuf::from("amadeus-testing/parquet/commits.parquet")) + .await + .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) @@ -1019,11 +1015,9 @@ async fn run(pool: &P) -> Duration { 14_444 ); - let rows = Parquet::<_, Value>::new(vec![PathBuf::from( - "amadeus-testing/parquet/commits.parquet", - )]) - .await - .unwrap(); + let rows = 
Parquet::<_, Value>::new(PathBuf::from("amadeus-testing/parquet/commits.parquet")) + .await + .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result| -> Value { From 6672dd76471e54093ae6d1aee431b391a2e31860 Mon Sep 17 00:00:00 2001 From: alecmocatta Date: Thu, 30 Jul 2020 14:42:35 +0100 Subject: [PATCH 5/5] v0.3.7 --- Cargo.toml | 18 +++++++++--------- amadeus-aws/Cargo.toml | 6 +++--- amadeus-aws/src/lib.rs | 2 +- amadeus-commoncrawl/Cargo.toml | 6 +++--- amadeus-commoncrawl/src/lib.rs | 2 +- amadeus-core/Cargo.toml | 2 +- amadeus-core/src/lib.rs | 2 +- amadeus-derive/Cargo.toml | 2 +- amadeus-derive/src/lib.rs | 2 +- amadeus-parquet/Cargo.toml | 6 +++--- amadeus-parquet/src/lib.rs | 2 +- amadeus-postgres/Cargo.toml | 6 +++--- amadeus-postgres/src/lib.rs | 2 +- amadeus-serde/Cargo.toml | 6 +++--- amadeus-serde/src/lib.rs | 2 +- amadeus-types/Cargo.toml | 4 ++-- amadeus-types/src/lib.rs | 2 +- src/lib.rs | 2 +- 18 files changed, 37 insertions(+), 37 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d0cd9b54..31107bb5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ [package] name = "amadeus" -version = "0.3.6" +version = "0.3.7" license = "Apache-2.0" authors = ["Alec Mocatta "] categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"] @@ -36,14 +36,14 @@ bench = ["serde-csv", "once_cell", "arrow-parquet", "rayon"] features = ["constellation", "aws", "commoncrawl", "parquet", "postgres", "csv", "json"] [dependencies] -amadeus-core = { version = "=0.3.6", path = "amadeus-core" } -amadeus-derive = { version = "=0.3.6", path = "amadeus-derive" } -amadeus-types = { version = "=0.3.6", path = "amadeus-types" } -amadeus-aws = { version = "=0.3.6", path = "amadeus-aws", optional = true } -amadeus-commoncrawl = { version = "=0.3.6", path = "amadeus-commoncrawl", optional = true } -amadeus-parquet = { version = "=0.3.6", path = "amadeus-parquet", optional = true } -amadeus-postgres = { version = "=0.3.6", path 
= "amadeus-postgres", optional = true } -amadeus-serde = { version = "=0.3.6", path = "amadeus-serde", optional = true } +amadeus-core = { version = "=0.3.7", path = "amadeus-core" } +amadeus-derive = { version = "=0.3.7", path = "amadeus-derive" } +amadeus-types = { version = "=0.3.7", path = "amadeus-types" } +amadeus-aws = { version = "=0.3.7", path = "amadeus-aws", optional = true } +amadeus-commoncrawl = { version = "=0.3.7", path = "amadeus-commoncrawl", optional = true } +amadeus-parquet = { version = "=0.3.7", path = "amadeus-parquet", optional = true } +amadeus-postgres = { version = "=0.3.7", path = "amadeus-postgres", optional = true } +amadeus-serde = { version = "=0.3.7", path = "amadeus-serde", optional = true } async-channel = "1.1" bincode = { version = "1.3", optional = true } constellation-rs = { version = "0.2.0-alpha.2", default-features = false, optional = true } diff --git a/amadeus-aws/Cargo.toml b/amadeus-aws/Cargo.toml index 10c76fba..04eac276 100644 --- a/amadeus-aws/Cargo.toml +++ b/amadeus-aws/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "amadeus-aws" -version = "0.3.6" +version = "0.3.7" license = "Apache-2.0" authors = ["Alec Mocatta "] categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"] @@ -19,8 +19,8 @@ azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "2 maintenance = { status = "actively-developed" } [dependencies] -amadeus-core = { version = "=0.3.6", path = "../amadeus-core" } -amadeus-types = { version = "=0.3.6", path = "../amadeus-types" } +amadeus-core = { version = "=0.3.7", path = "../amadeus-core" } +amadeus-types = { version = "=0.3.7", path = "../amadeus-types" } async-compression = { version = "0.3.3", features = ["gzip", "futures-bufread"] } async-trait = "0.1" chrono = { version = "0.4", default-features = false } diff --git a/amadeus-aws/src/lib.rs b/amadeus-aws/src/lib.rs index ff7b77d8..d3a022d2 100644 --- a/amadeus-aws/src/lib.rs +++ 
b/amadeus-aws/src/lib.rs @@ -6,7 +6,7 @@ //! //! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html). -#![doc(html_root_url = "https://docs.rs/amadeus-aws/0.3.6")] +#![doc(html_root_url = "https://docs.rs/amadeus-aws/0.3.7")] #![cfg_attr(nightly, feature(type_alias_impl_trait))] #![warn( // missing_copy_implementations, diff --git a/amadeus-commoncrawl/Cargo.toml b/amadeus-commoncrawl/Cargo.toml index 27003c30..c25e2e73 100644 --- a/amadeus-commoncrawl/Cargo.toml +++ b/amadeus-commoncrawl/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "amadeus-commoncrawl" -version = "0.3.6" +version = "0.3.7" license = "MIT OR Apache-2.0" authors = ["Stephen Becker IV ", "Alec Mocatta "] categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"] @@ -19,8 +19,8 @@ azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "2 maintenance = { status = "actively-developed" } [dependencies] -amadeus-core = { version = "=0.3.6", path = "../amadeus-core" } -amadeus-types = { version = "=0.3.6", path = "../amadeus-types" } +amadeus-core = { version = "=0.3.7", path = "../amadeus-core" } +amadeus-types = { version = "=0.3.7", path = "../amadeus-types" } async-compression = { version = "0.3.3", features = ["gzip", "futures-bufread"] } futures = "0.3" nom = "4.2.3" diff --git a/amadeus-commoncrawl/src/lib.rs b/amadeus-commoncrawl/src/lib.rs index 9a081314..43a91b2b 100644 --- a/amadeus-commoncrawl/src/lib.rs +++ b/amadeus-commoncrawl/src/lib.rs @@ -6,7 +6,7 @@ //! //! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html). 
-#![doc(html_root_url = "https://docs.rs/amadeus-commoncrawl/0.3.6")] +#![doc(html_root_url = "https://docs.rs/amadeus-commoncrawl/0.3.7")] #![cfg_attr(nightly, feature(type_alias_impl_trait))] #![warn( // missing_copy_implementations, diff --git a/amadeus-core/Cargo.toml b/amadeus-core/Cargo.toml index cc952ce4..ceeb87c4 100644 --- a/amadeus-core/Cargo.toml +++ b/amadeus-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "amadeus-core" -version = "0.3.6" +version = "0.3.7" license = "Apache-2.0" authors = ["Alec Mocatta "] categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"] diff --git a/amadeus-core/src/lib.rs b/amadeus-core/src/lib.rs index 23f47af9..b80ee168 100644 --- a/amadeus-core/src/lib.rs +++ b/amadeus-core/src/lib.rs @@ -6,7 +6,7 @@ //! //! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. All functionality is re-exposed in [`amadeus`](https://docs.rs/amadeus/0.3/amadeus/). -#![doc(html_root_url = "https://docs.rs/amadeus-core/0.3.6")] +#![doc(html_root_url = "https://docs.rs/amadeus-core/0.3.7")] #![cfg_attr(nightly, feature(unboxed_closures))] #![recursion_limit = "25600"] #![warn( diff --git a/amadeus-derive/Cargo.toml b/amadeus-derive/Cargo.toml index cd11d2dd..4ff7e69f 100644 --- a/amadeus-derive/Cargo.toml +++ b/amadeus-derive/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "amadeus-derive" -version = "0.3.6" +version = "0.3.7" license = "Apache-2.0" authors = ["Alec Mocatta "] categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"] diff --git a/amadeus-derive/src/lib.rs b/amadeus-derive/src/lib.rs index 03a73ab3..12a1c90c 100644 --- a/amadeus-derive/src/lib.rs +++ b/amadeus-derive/src/lib.rs @@ -6,7 +6,7 @@ //! //! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. 
This macro is re-exposed as [`amadeus::data::Data`](https://docs.rs/amadeus/0.3/amadeus/data/derive.Data.html). -#![doc(html_root_url = "https://docs.rs/amadeus-derive/0.3.6")] +#![doc(html_root_url = "https://docs.rs/amadeus-derive/0.3.7")] #![recursion_limit = "400"] #![warn( missing_copy_implementations, diff --git a/amadeus-parquet/Cargo.toml b/amadeus-parquet/Cargo.toml index 109b1c19..02d61dd7 100644 --- a/amadeus-parquet/Cargo.toml +++ b/amadeus-parquet/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "amadeus-parquet" -version = "0.3.6" +version = "0.3.7" license = "Apache-2.0" authors = ["Alec Mocatta ", "Apache Arrow "] categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"] @@ -19,8 +19,8 @@ azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "2 maintenance = { status = "actively-developed" } [dependencies] -amadeus-core = { version = "=0.3.6", path = "../amadeus-core" } -amadeus-types = { version = "=0.3.6", path = "../amadeus-types" } +amadeus-core = { version = "=0.3.7", path = "../amadeus-core" } +amadeus-types = { version = "=0.3.7", path = "../amadeus-types" } async-trait = "0.1" brotli = "3.3" byteorder = "1.2" diff --git a/amadeus-parquet/src/lib.rs b/amadeus-parquet/src/lib.rs index 87e23d4e..a18e7d6c 100644 --- a/amadeus-parquet/src/lib.rs +++ b/amadeus-parquet/src/lib.rs @@ -6,7 +6,7 @@ //! //! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html). 
-#![doc(html_root_url = "https://docs.rs/amadeus-parquet/0.3.6")] +#![doc(html_root_url = "https://docs.rs/amadeus-parquet/0.3.7")] #![cfg_attr(nightly, feature(bufreader_seek_relative))] #![cfg_attr(nightly, feature(read_initializer))] #![cfg_attr(nightly, feature(specialization))] diff --git a/amadeus-postgres/Cargo.toml b/amadeus-postgres/Cargo.toml index 91fc70fd..d9a864c9 100644 --- a/amadeus-postgres/Cargo.toml +++ b/amadeus-postgres/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "amadeus-postgres" -version = "0.3.6" +version = "0.3.7" license = "Apache-2.0" authors = ["Alec Mocatta "] categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"] @@ -19,8 +19,8 @@ azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "2 maintenance = { status = "actively-developed" } [dependencies] -amadeus-core = { version = "=0.3.6", path = "../amadeus-core" } -amadeus-types = { version = "=0.3.6", path = "../amadeus-types" } +amadeus-core = { version = "=0.3.7", path = "../amadeus-core" } +amadeus-types = { version = "=0.3.7", path = "../amadeus-types" } bytes = "0.5" chrono = { version = "0.4", default-features = false } educe = "0.4" diff --git a/amadeus-postgres/src/lib.rs b/amadeus-postgres/src/lib.rs index 04b49bf4..7fc72f6d 100644 --- a/amadeus-postgres/src/lib.rs +++ b/amadeus-postgres/src/lib.rs @@ -6,7 +6,7 @@ //! //! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html). 
-#![doc(html_root_url = "https://docs.rs/amadeus-postgres/0.3.6")] +#![doc(html_root_url = "https://docs.rs/amadeus-postgres/0.3.7")] #![cfg_attr(nightly, feature(type_alias_impl_trait))] #![warn( // missing_copy_implementations, diff --git a/amadeus-serde/Cargo.toml b/amadeus-serde/Cargo.toml index 2293c616..9df53106 100644 --- a/amadeus-serde/Cargo.toml +++ b/amadeus-serde/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "amadeus-serde" -version = "0.3.6" +version = "0.3.7" license = "Apache-2.0" authors = ["Alec Mocatta "] categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"] @@ -19,8 +19,8 @@ azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "2 maintenance = { status = "actively-developed" } [dependencies] -amadeus-core = { version = "=0.3.6", path = "../amadeus-core" } -amadeus-types = { version = "=0.3.6", path = "../amadeus-types" } +amadeus-core = { version = "=0.3.7", path = "../amadeus-core" } +amadeus-types = { version = "=0.3.7", path = "../amadeus-types" } chrono = { version = "0.4", default-features = false, features = ["serde"] } csv = "1.0" educe = "0.4" diff --git a/amadeus-serde/src/lib.rs b/amadeus-serde/src/lib.rs index ada46c01..aaa73bd6 100644 --- a/amadeus-serde/src/lib.rs +++ b/amadeus-serde/src/lib.rs @@ -6,7 +6,7 @@ //! //! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html). 
-#![doc(html_root_url = "https://docs.rs/amadeus-serde/0.3.6")] +#![doc(html_root_url = "https://docs.rs/amadeus-serde/0.3.7")] #![cfg_attr(nightly, feature(type_alias_impl_trait))] #![warn( // missing_copy_implementations, diff --git a/amadeus-types/Cargo.toml b/amadeus-types/Cargo.toml index 89ea1d75..5a384502 100644 --- a/amadeus-types/Cargo.toml +++ b/amadeus-types/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "amadeus-types" -version = "0.3.6" +version = "0.3.7" license = "Apache-2.0" authors = ["Alec Mocatta "] categories = ["concurrency", "science", "database", "date-and-time", "data-structures"] @@ -19,7 +19,7 @@ azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "2 maintenance = { status = "actively-developed" } [dependencies] -amadeus-core = { version = "=0.3.6", path = "../amadeus-core" } +amadeus-core = { version = "=0.3.7", path = "../amadeus-core" } chrono = { version = "0.4", default-features = false, features = ["std", "serde"] } chrono-tz = { version = "0.5", features = ["serde"] } fxhash = "0.2" diff --git a/amadeus-types/src/lib.rs b/amadeus-types/src/lib.rs index 94f6e688..d6c7b711 100644 --- a/amadeus-types/src/lib.rs +++ b/amadeus-types/src/lib.rs @@ -6,7 +6,7 @@ //! //! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::data`](https://docs.rs/amadeus/0.3/amadeus/data/index.html). -#![doc(html_root_url = "https://docs.rs/amadeus-types/0.3.6")] +#![doc(html_root_url = "https://docs.rs/amadeus-types/0.3.7")] #![warn( // missing_copy_implementations, // missing_debug_implementations, diff --git a/src/lib.rs b/src/lib.rs index 1a08e0c6..140628f4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,7 +4,7 @@ //! 📦  Crates.io  │  📑  GitHub  │  💬  Chat //!

-#![doc(html_root_url = "https://docs.rs/amadeus/0.3.6")] +#![doc(html_root_url = "https://docs.rs/amadeus/0.3.7")] #![doc( html_logo_url = "https://raw.githubusercontent.com/constellation-rs/amadeus/master/logo.svg?sanitize=true" )]