Skip to content

Commit 1e242f6

Browse files
Introduce traits for collection containers (#650)
* Introduce traits for collection containers * Iterative example Signed-off-by: Moritz Hoffmann <[email protected]> * Fix copilot hallucination Signed-off-by: Moritz Hoffmann <[email protected]> * Tidy names, correct doctest, update CI --------- Signed-off-by: Moritz Hoffmann <[email protected]> Co-authored-by: Moritz Hoffmann <[email protected]>
1 parent a27bc5a commit 1e242f6

File tree

6 files changed

+233
-146
lines changed

6 files changed

+233
-146
lines changed

.github/workflows/test.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ jobs:
2525
toolchain: ${{ matrix.toolchain }}
2626
- name: Cargo test
2727
run: cargo test --workspace --all-targets
28+
- name: Cargo doc test
29+
run: cargo test --doc
2830

2931
# Check formatting with rustfmt
3032
mdbook:
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
//! Show an iterative scope example that uses a wrapper type around a container
2+
3+
use timely::dataflow::channels::pact::Pipeline;
4+
use timely::dataflow::operators::Operator;
5+
use timely::order::Product;
6+
use timely::dataflow::{Scope, StreamCore};
7+
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
8+
use differential_dataflow::{AsCollection, Collection};
9+
use differential_dataflow::input::Input;
10+
use differential_dataflow::operators::iterate::Variable;
11+
use differential_dataflow::collection::containers::{Enter, Leave, Negate, ResultsIn};
12+
13+
/// A wrapper around a container that implements the necessary traits to be used in iterative scopes.
14+
#[derive(Clone, Default)]
15+
struct ContainerWrapper<C>(C);
16+
17+
impl<C: timely::container::Accountable> timely::container::Accountable for ContainerWrapper<C> {
18+
#[inline(always)] fn record_count(&self) -> i64 { self.0.record_count() }
19+
#[inline(always)] fn is_empty(&self) -> bool { self.0.is_empty() }
20+
}
21+
impl<C: Enter<T1, T2>, T1, T2> Enter<T1, T2> for ContainerWrapper<C> {
22+
type InnerContainer = ContainerWrapper<C::InnerContainer>;
23+
#[inline(always)] fn enter(self) -> Self::InnerContainer { ContainerWrapper(self.0.enter()) }
24+
}
25+
impl<C: Leave<T1, T2>, T1, T2> Leave<T1, T2> for ContainerWrapper<C> {
26+
type OuterContainer = ContainerWrapper<C::OuterContainer>;
27+
#[inline(always)] fn leave(self) -> Self::OuterContainer { ContainerWrapper(self.0.leave()) }
28+
}
29+
impl<C: Negate> Negate for ContainerWrapper<C> {
30+
#[inline(always)] fn negate(self) -> Self { ContainerWrapper(self.0.negate()) }
31+
}
32+
impl<C: ResultsIn<TS>, TS> ResultsIn<TS> for ContainerWrapper<C> {
33+
#[inline(always)] fn results_in(self, step: &TS) -> Self { ContainerWrapper(self.0.results_in(step)) }
34+
}
35+
36+
fn wrap<G: Scope, C: timely::Container>(stream: &StreamCore<G, C>) -> StreamCore<G, ContainerWrapper<C>> {
37+
let mut builder = OperatorBuilder::new("Wrap".to_string(), stream.scope());
38+
let (mut output, stream_out) = builder.new_output();
39+
let mut input = builder.new_input(stream, Pipeline);
40+
builder.build(move |_capability| move |_frontier| {
41+
let mut output = output.activate();
42+
input.for_each(|time, data| {
43+
let mut session = output.session(&time);
44+
session.give_container(&mut ContainerWrapper(std::mem::take(data)));
45+
});
46+
});
47+
stream_out
48+
}
49+
50+
51+
fn main() {
52+
timely::example(|scope| {
53+
54+
let numbers = scope.new_collection_from(1 .. 10u32).1;
55+
let numbers: Collection<_, u32, isize, _> = wrap(&numbers.inner).as_collection();
56+
57+
scope.iterative::<u64,_,_>(|nested| {
58+
let summary = Product::new(Default::default(), 1);
59+
let variable = Variable::new_from(numbers.enter(nested), summary);
60+
let mapped: Collection<_, u32, isize, _> = variable.inner.unary(Pipeline, "Map", |_,_| {
61+
|input, output| {
62+
input.for_each(|time, data| {
63+
let mut session = output.session(&time);
64+
for (x, _t, _d) in data.0.iter_mut() {
65+
*x = if *x % 2 == 0 { *x/2 } else { *x };
66+
}
67+
session.give_container(data);
68+
});
69+
}
70+
}).as_collection();
71+
let result = mapped.inner.unary(Pipeline, "Unwrap", |_,_| {
72+
|input, output| {
73+
input.for_each(|time, data| {
74+
let mut session = output.session(&time);
75+
session.give_container(&mut data.0);
76+
});
77+
}
78+
}).as_collection().consolidate();
79+
let result = wrap(&result.inner).as_collection();
80+
variable.set(&result)
81+
.leave()
82+
});
83+
})
84+
}

differential-dataflow/src/collection.rs

Lines changed: 140 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -204,13 +204,79 @@ impl<G: Scope, D, R, C: Container> Collection<G, D, R, C> {
204204
/// .assert_eq(&evens);
205205
/// });
206206
/// ```
207-
// TODO: Removing this function is possible, but breaks existing callers of `negate` who expect
208-
// an inherent method on `Collection`.
209-
pub fn negate(&self) -> Collection<G, D, R, C>
210-
where
211-
StreamCore<G, C>: crate::operators::Negate<G, C>
207+
pub fn negate(&self) -> Collection<G, D, R, C> where C: containers::Negate {
208+
use timely::dataflow::channels::pact::Pipeline;
209+
self.inner
210+
.unary(Pipeline, "Negate", move |_,_| move |input, output| {
211+
input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).negate()));
212+
})
213+
.as_collection()
214+
}
215+
216+
/// Brings a Collection into a nested scope.
217+
///
218+
/// # Examples
219+
///
220+
/// ```
221+
/// use timely::dataflow::Scope;
222+
/// use differential_dataflow::input::Input;
223+
///
224+
/// ::timely::example(|scope| {
225+
///
226+
/// let data = scope.new_collection_from(1 .. 10).1;
227+
///
228+
/// let result = scope.region(|child| {
229+
/// data.enter(child)
230+
/// .leave()
231+
/// });
232+
///
233+
/// data.assert_eq(&result);
234+
/// });
235+
/// ```
236+
pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection<Child<'a, G, T>, D, R, <C as containers::Enter<<G as ScopeParent>::Timestamp, T>>::InnerContainer>
237+
where
238+
C: containers::Enter<<G as ScopeParent>::Timestamp, T, InnerContainer: Container>,
239+
T: Refines<<G as ScopeParent>::Timestamp>,
240+
{
241+
use timely::dataflow::channels::pact::Pipeline;
242+
self.inner
243+
.enter(child)
244+
.unary(Pipeline, "Enter", move |_,_| move |input, output| {
245+
input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).enter()));
246+
})
247+
.as_collection()
248+
}
249+
250+
/// Advances a timestamp in the stream according to the timestamp actions on the path.
251+
///
252+
/// The path may advance the timestamp sufficiently that it is no longer valid, for example if
253+
/// incrementing fields would result in integer overflow. In this case, the record is dropped.
254+
///
255+
/// # Examples
256+
/// ```
257+
/// use timely::dataflow::Scope;
258+
/// use timely::dataflow::operators::{ToStream, Concat, Inspect, BranchWhen};
259+
///
260+
/// use differential_dataflow::input::Input;
261+
///
262+
/// timely::example(|scope| {
263+
/// let summary1 = 5;
264+
///
265+
/// let data = scope.new_collection_from(1 .. 10).1;
266+
/// /// Applies `results_in` on every timestamp in the collection.
267+
/// data.results_in(summary1);
268+
/// });
269+
/// ```
270+
pub fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self
271+
where
272+
C: containers::ResultsIn<<G::Timestamp as Timestamp>::Summary>,
212273
{
213-
crate::operators::Negate::negate(&self.inner).as_collection()
274+
use timely::dataflow::channels::pact::Pipeline;
275+
self.inner
276+
.unary(Pipeline, "ResultsIn", move |_,_| move |input, output| {
277+
input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).results_in(&step)));
278+
})
279+
.as_collection()
214280
}
215281
}
216282

@@ -379,36 +445,6 @@ impl<G: Scope, D: Clone+'static, R: Clone+'static> Collection<G, D, R> {
379445
.as_collection()
380446
}
381447

382-
/// Brings a Collection into a nested scope.
383-
///
384-
/// # Examples
385-
///
386-
/// ```
387-
/// use timely::dataflow::Scope;
388-
/// use differential_dataflow::input::Input;
389-
///
390-
/// ::timely::example(|scope| {
391-
///
392-
/// let data = scope.new_collection_from(1 .. 10).1;
393-
///
394-
/// let result = scope.region(|child| {
395-
/// data.enter(child)
396-
/// .leave()
397-
/// });
398-
///
399-
/// data.assert_eq(&result);
400-
/// });
401-
/// ```
402-
pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection<Child<'a, G, T>, D, R>
403-
where
404-
T: Refines<<G as ScopeParent>::Timestamp>,
405-
{
406-
self.inner
407-
.enter(child)
408-
.map(|(data, time, diff)| (data, T::to_inner(time), diff))
409-
.as_collection()
410-
}
411-
412448
/// Brings a Collection into a nested scope, at varying times.
413449
///
414450
/// The `initial` function indicates the time at which each element of the Collection should appear.
@@ -558,8 +594,9 @@ use timely::dataflow::scopes::ScopeParent;
558594
use timely::progress::timestamp::Refines;
559595

560596
/// Methods requiring a nested scope.
561-
impl<'a, G: Scope, T: Timestamp, D: Clone+'static, R: Clone+'static> Collection<Child<'a, G, T>, D, R>
597+
impl<'a, G: Scope, T: Timestamp, D: Clone+'static, R: Clone+'static, C: Container> Collection<Child<'a, G, T>, D, R, C>
562598
where
599+
C: containers::Leave<T, G::Timestamp, OuterContainer: Container>,
563600
T: Refines<<G as ScopeParent>::Timestamp>,
564601
{
565602
/// Returns the final value of a Collection from a nested scope to its containing scope.
@@ -582,10 +619,13 @@ where
582619
/// data.assert_eq(&result);
583620
/// });
584621
/// ```
585-
pub fn leave(&self) -> Collection<G, D, R> {
622+
pub fn leave(&self) -> Collection<G, D, R, <C as containers::Leave<T, G::Timestamp>>::OuterContainer> {
623+
use timely::dataflow::channels::pact::Pipeline;
586624
self.inner
587625
.leave()
588-
.map(|(data, time, diff)| (data, time.to_outer(), diff))
626+
.unary(Pipeline, "Leave", move |_,_| move |input, output| {
627+
input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).leave()));
628+
})
589629
.as_collection()
590630
}
591631
}
@@ -690,3 +730,64 @@ where
690730
.concatenate(iterator.into_iter().map(|x| x.inner))
691731
.as_collection()
692732
}
733+
734+
/// Traits that can be implemented by containers to provide functionality to collections based on them.
735+
pub mod containers {
736+
737+
use timely::progress::{Timestamp, timestamp::Refines};
738+
use crate::collection::Abelian;
739+
740+
/// A container that can negate its updates.
741+
pub trait Negate {
742+
/// Negates Abelian differences of each update.
743+
fn negate(self) -> Self;
744+
}
745+
impl<D, T, R: Abelian> Negate for Vec<(D, T, R)> {
746+
fn negate(mut self) -> Self {
747+
for (_data, _time, diff) in self.iter_mut() {
748+
diff.negate();
749+
}
750+
self
751+
}
752+
}
753+
754+
/// A container that can enter from timestamp `T1` into timestamp `T2`.
755+
pub trait Enter<T1, T2> {
756+
/// The resulting container type.
757+
type InnerContainer;
758+
/// Update timestamps from `T1` to `T2`.
759+
fn enter(self) -> Self::InnerContainer;
760+
}
761+
impl<D, T1: Timestamp, T2: Refines<T1>, R> Enter<T1, T2> for Vec<(D, T1, R)> {
762+
type InnerContainer = Vec<(D, T2, R)>;
763+
fn enter(self) -> Self::InnerContainer {
764+
self.into_iter().map(|(d,t1,r)| (d,T2::to_inner(t1),r)).collect()
765+
}
766+
}
767+
768+
/// A container that can leave from timestamp `T1` into timestamp `T2`.
769+
pub trait Leave<T1, T2> {
770+
/// The resulting container type.
771+
type OuterContainer;
772+
/// Update timestamps from `T1` to `T2`.
773+
fn leave(self) -> Self::OuterContainer;
774+
}
775+
impl<D, T1: Refines<T2>, T2: Timestamp, R> Leave<T1, T2> for Vec<(D, T1, R)> {
776+
type OuterContainer = Vec<(D, T2, R)>;
777+
fn leave(self) -> Self::OuterContainer {
778+
self.into_iter().map(|(d,t1,r)| (d,t1.to_outer(),r)).collect()
779+
}
780+
}
781+
782+
/// A container that can advance timestamps by a summary `TS`.
783+
pub trait ResultsIn<TS> {
784+
/// Advance times in the container by `step`.
785+
fn results_in(self, step: &TS) -> Self;
786+
}
787+
impl<D, T: Timestamp, R> ResultsIn<T::Summary> for Vec<(D, T, R)> {
788+
fn results_in(self, step: &T::Summary) -> Self {
789+
use timely::progress::PathSummary;
790+
self.into_iter().filter_map(move |(d,t,r)| step.results_in(&t).map(|t| (d,t,r))).collect()
791+
}
792+
}
793+
}

0 commit comments

Comments
 (0)