5959//! use differential_dataflow::operators::arrange::upsert;
6060//!
6161//! let stream = scope.input_from(&mut input);
62- //! let arranged = upsert::arrange_from_upsert::<_, _, _, ValBuilder<Key, Val, _, _>, ValSpine<Key, Val, _, _>>(&stream, &"test");
62+ //! let arranged = upsert::arrange_from_upsert::<_, ValBuilder<Key, Val, _, _>, ValSpine<Key, Val, _, _>>(&stream, &"test");
6363//!
6464//! arranged
6565//! .as_collection(|k,v| (k.clone(), v.clone()))
@@ -111,7 +111,9 @@ use timely::dataflow::operators::Capability;
111111use crate :: operators:: arrange:: arrangement:: Arranged ;
112112use crate :: trace:: { Builder , Description } ;
113113use crate :: trace:: { self , Trace , TraceReader , Cursor } ;
114- use crate :: { ExchangeData , Hashable , IntoOwned } ;
114+ use crate :: { ExchangeData , Hashable } ;
115+
116+ use crate :: trace:: implementations:: containers:: BatchContainer ;
115117
116118use super :: TraceAgent ;
117119
@@ -125,21 +127,19 @@ use super::TraceAgent;
125127/// This method is only implemented for totally ordered times, as we do not yet
126128/// understand what a "sequence" of upserts would mean for partially ordered
127129/// timestamps.
128- pub fn arrange_from_upsert < G , K , V , Bu , Tr > (
129- stream : & Stream < G , ( K , Option < V > , G :: Timestamp ) > ,
130+ pub fn arrange_from_upsert < G , Bu , Tr > (
131+ stream : & Stream < G , ( Tr :: KeyOwn , Option < Tr :: ValOwn > , G :: Timestamp ) > ,
130132 name : & str ,
131133) -> Arranged < G , TraceAgent < Tr > >
132134where
133135 G : Scope < Timestamp =Tr :: Time > ,
134- Tr : Trace + for < ' a > TraceReader <
135- Key < ' a > : IntoOwned < ' a , Owned = K > ,
136- Val < ' a > : IntoOwned < ' a , Owned = V > ,
136+ Tr : for < ' a > Trace <
137+ KeyOwn : ExchangeData + Hashable +std :: hash :: Hash ,
138+ ValOwn : ExchangeData ,
137139 Time : TotalOrder +ExchangeData ,
138140 Diff =isize ,
139141 > +' static ,
140- K : ExchangeData +Hashable +std:: hash:: Hash ,
141- V : ExchangeData ,
142- Bu : Builder < Time =G :: Timestamp , Input = Vec < ( ( K , V ) , Tr :: Time , Tr :: Diff ) > , Output = Tr :: Batch > ,
142+ Bu : Builder < Time =G :: Timestamp , Input = Vec < ( ( Tr :: KeyOwn , Tr :: ValOwn ) , Tr :: Time , Tr :: Diff ) > , Output = Tr :: Batch > ,
143143{
144144 let mut reader: Option < TraceAgent < Tr > > = None ;
145145
@@ -148,7 +148,7 @@ where
148148
149149 let reader = & mut reader;
150150
151- let exchange = Exchange :: new ( move |update : & ( K , Option < V > , G :: Timestamp ) | ( update. 0 ) . hashed ( ) . into ( ) ) ;
151+ let exchange = Exchange :: new ( move |update : & ( Tr :: KeyOwn , Option < Tr :: ValOwn > , G :: Timestamp ) | ( update. 0 ) . hashed ( ) . into ( ) ) ;
152152
153153 stream. unary_frontier ( exchange, name, move |_capability, info| {
154154
@@ -173,7 +173,7 @@ where
173173 let mut prev_frontier = Antichain :: from_elem ( <G :: Timestamp as Timestamp >:: minimum ( ) ) ;
174174
175175 // For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap).
176- let mut priority_queue = BinaryHeap :: < std:: cmp:: Reverse < ( G :: Timestamp , K , Option < V > ) > > :: new ( ) ;
176+ let mut priority_queue = BinaryHeap :: < std:: cmp:: Reverse < ( G :: Timestamp , Tr :: KeyOwn , Option < Tr :: ValOwn > ) > > :: new ( ) ;
177177 let mut updates = Vec :: new ( ) ;
178178
179179 move |input, output| {
@@ -236,19 +236,19 @@ where
236236 for ( key, mut list) in to_process {
237237
238238 // The prior value associated with the key.
239- let mut prev_value: Option < V > = None ;
239+ let mut prev_value: Option < Tr :: ValOwn > = None ;
240240
241241 // Attempt to find the key in the trace.
242- trace_cursor. seek_key ( & trace_storage, IntoOwned :: borrow_as ( & key) ) ;
243- if trace_cursor. get_key ( & trace_storage) . map ( |k| k. eq ( & IntoOwned :: borrow_as ( & key) ) ) . unwrap_or ( false ) {
242+ trace_cursor. seek_key ( & trace_storage, Tr :: KeyContainer :: borrow_as ( & key) ) ;
243+ if trace_cursor. get_key ( & trace_storage) . map ( |k| k. eq ( & Tr :: KeyContainer :: borrow_as ( & key) ) ) . unwrap_or ( false ) {
244244 // Determine the prior value associated with the key.
245245 while let Some ( val) = trace_cursor. get_val ( & trace_storage) {
246246 let mut count = 0 ;
247247 trace_cursor. map_times ( & trace_storage, |_time, diff| count += Tr :: owned_diff ( diff) ) ;
248248 assert ! ( count == 0 || count == 1 ) ;
249249 if count == 1 {
250250 assert ! ( prev_value. is_none( ) ) ;
251- prev_value = Some ( val . into_owned ( ) ) ;
251+ prev_value = Some ( Tr :: owned_val ( val ) ) ;
252252 }
253253 trace_cursor. step_val ( & trace_storage) ;
254254 }
0 commit comments