diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index a63d83374..de2c06c88 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -407,7 +407,7 @@ impl CapabilitySet { /// let mut cap = CapabilitySet::from_elem(default_cap); /// let mut vector = Vec::new(); /// move |input, output| { - /// cap.downgrade(&input.frontier().frontier()); + /// cap.downgrade(input.frontier().frontier()); /// while let Some((time, data)) = input.next() { /// data.swap(&mut vector); /// } diff --git a/timely/src/dataflow/operators/generic/notificator.rs b/timely/src/dataflow/operators/generic/notificator.rs index 6c8dd8ea9..d49d13373 100644 --- a/timely/src/dataflow/operators/generic/notificator.rs +++ b/timely/src/dataflow/operators/generic/notificator.rs @@ -1,4 +1,4 @@ -use crate::progress::frontier::{AntichainRef, MutableAntichain}; +use crate::progress::frontier::{Antichain, MutableAntichain}; use crate::progress::Timestamp; use crate::dataflow::operators::Capability; use crate::logging::TimelyLogger as Logger; @@ -40,7 +40,7 @@ impl<'a, T: Timestamp> Notificator<'a, T> { } /// Reveals the elements in the frontier of the indicated input. - pub fn frontier(&self, input: usize) -> AntichainRef { + pub fn frontier(&self, input: usize) -> &Antichain { self.frontiers[input].frontier() } diff --git a/timely/src/dataflow/operators/inspect.rs b/timely/src/dataflow/operators/inspect.rs index 6e26856d1..8942737c8 100644 --- a/timely/src/dataflow/operators/inspect.rs +++ b/timely/src/dataflow/operators/inspect.rs @@ -140,7 +140,7 @@ impl InspectCore for StreamCore { let mut frontier = crate::progress::Antichain::from_elem(G::Timestamp::minimum()); let mut vector = Default::default(); self.unary_frontier(Pipeline, "InspectBatch", move |_,_| move |input, output| { - if input.frontier.frontier() != frontier.borrow() { + if input.frontier.frontier() != &frontier { frontier.clear(); frontier.extend(input.frontier.frontier().iter().cloned()); func(Err(frontier.elements())); diff --git a/timely/src/dataflow/operators/probe.rs b/timely/src/dataflow/operators/probe.rs index 7c5a8567e..c64d5c1a3 100644 --- a/timely/src/dataflow/operators/probe.rs +++ b/timely/src/dataflow/operators/probe.rs @@ -167,7 +167,7 @@ impl Handle { /// ``` #[inline] pub fn with_frontier)->R>(&self, mut function: F) -> R { - function(self.frontier.borrow().frontier()) + function(self.frontier.borrow().frontier().borrow()) } } diff --git a/timely/src/progress/frontier.rs b/timely/src/progress/frontier.rs index ea52ba63e..a82113043 100644 --- a/timely/src/progress/frontier.rs +++ b/timely/src/progress/frontier.rs @@ -293,6 +293,14 @@ impl ::std::iter::IntoIterator for Antichain { } } +impl<'a, T> ::std::iter::IntoIterator for &'a Antichain { + type Item = &'a T; + type IntoIter = ::std::slice::Iter<'a, T>; + fn into_iter(self) -> Self::IntoIter { + self.elements.iter() + } +} + /// An antichain based on a multiset whose elements frequencies can be updated. /// /// The `MutableAntichain` maintains frequencies for many elements of type `T`, and exposes the set @@ -316,7 +324,7 @@ impl ::std::iter::IntoIterator for Antichain { pub struct MutableAntichain { dirty: usize, updates: Vec<(T, i64)>, - frontier: Vec, + frontier: Antichain, changes: ChangeBatch, } @@ -336,7 +344,7 @@ impl MutableAntichain { MutableAntichain { dirty: 0, updates: Vec::new(), - frontier: Vec::new(), + frontier: Antichain::new(), changes: ChangeBatch::new(), } } @@ -380,9 +388,9 @@ impl MutableAntichain { /// assert!(frontier.frontier().len() == 0); ///``` #[inline] - pub fn frontier(&self) -> AntichainRef<'_, T> { + pub fn frontier(&self) -> &Antichain { debug_assert_eq!(self.dirty, 0); - AntichainRef::new(&self.frontier) + &self.frontier } /// Creates a new singleton `MutableAntichain`. @@ -390,10 +398,10 @@ impl MutableAntichain { /// # Examples /// ///``` - /// use timely::progress::frontier::{AntichainRef, MutableAntichain}; + /// use timely::progress::frontier::{Antichain, MutableAntichain}; /// /// let mut frontier = MutableAntichain::new_bottom(0u64); - /// assert!(frontier.frontier() == AntichainRef::new(&[0u64])); + /// assert!(frontier.frontier() == &Antichain::from_elem(0u64)); ///``` #[inline] pub fn new_bottom(bottom: T) -> MutableAntichain @@ -403,7 +411,7 @@ impl MutableAntichain { MutableAntichain { dirty: 0, updates: vec![(bottom.clone(), 1)], - frontier: vec![bottom], + frontier: Antichain::from_elem(bottom), changes: ChangeBatch::new(), } } @@ -483,7 +491,7 @@ impl MutableAntichain { /// # Examples /// ///``` - /// use timely::progress::frontier::{AntichainRef, MutableAntichain}; + /// use timely::progress::frontier::{Antichain, MutableAntichain}; /// /// let mut frontier = MutableAntichain::new_bottom(1u64); /// let changes = @@ -491,7 +499,7 @@ impl MutableAntichain { /// .update_iter(vec![(1, -1), (2, 7)]) /// .collect::>(); /// - /// assert!(frontier.frontier() == AntichainRef::new(&[2])); + /// assert!(frontier.frontier() == &Antichain::from_elem(2)); /// assert!(changes == vec![(1, -1), (2, 1)]); ///``` #[inline] @@ -557,16 +565,14 @@ impl MutableAntichain { self.updates.retain(|x| x.1 != 0); } - for time in self.frontier.drain(..) { + for time in self.frontier.elements.drain(..) { self.changes.update(time, -1); } // build new frontier using strictly positive times. // as the times are sorted, we don't need to worry that we might displace frontier elements. for time in self.updates.iter().filter(|x| x.1 > 0) { - if !self.frontier.iter().any(|f| f.less_equal(&time.0)) { - self.frontier.push(time.0.clone()); - } + self.frontier.insert(time.0.clone()); } for time in self.frontier.iter() {