1+ use timely:: dataflow:: operators:: probe:: Handle ;
2+
3+ use differential_dataflow:: {
4+ Collection ,
5+ input:: InputSession ,
6+ operators:: { Join , Iterate , Reduce } ,
7+ } ;
8+
9+ // We want to represent collections of ast nodes, perhaps as `(Id, (Op, Vec<Id>))`.
10+ // Here the first `Id` is a unique identifier for the node, and the `Vec<Id>` are
11+ // identifiers for the children. We'll want to be able to mutate the child identifiers.
12+
13+ // Perhaps we represent this as collections
14+ // 1. Ops: (Id, Op),
15+ // 2. Args: (Id, usize, Id),
16+ // Congruence amounts to
17+
18+ fn main ( ) {
19+ // Define a timely dataflow computation
20+ timely:: execute_from_args ( std:: env:: args ( ) , move |worker| {
21+
22+ // Create an input collection of data
23+ let mut input = InputSession :: new ( ) ;
24+ let mut equivs = InputSession :: new ( ) ;
25+
26+ // Probe to determine progress / completion.
27+ let mut probe = Handle :: new ( ) ;
28+
29+ // Define a new computation
30+ worker. dataflow ( |scope| {
31+
32+ // Create a new collection from our input
33+ // Our AST nodes will be modeled as a collection of `(Id, (Op, Vec<Id>))` records.
34+ // Each entry is an AST node, with an identifier, an operator, and a list of child identifiers.
35+ let ast_nodes: Collection < _ , ( usize , ( String , Vec < usize > ) ) > = input. to_collection ( scope) ;
36+
37+ // Exogenous equivalences, mapping identifiers to (lesser) canonical identifiers.
38+ // This map is not necessarily transitively closed, nor complete for all identifiers.
39+ let equivs = equivs. to_collection ( scope) ;
40+
41+ // Iteratively develop a map from `Id` to `Id` that canonicalizes identifiers.
42+ // This involves both exogenous equivalences, and those from equivalent AST nodes.
43+ ast_nodes
44+ . map ( |( id, _) | ( id, id) )
45+ . iterate ( |canonical| {
46+
47+ // Collection is loop invariant, but must be brought in scope.
48+ let ast_nodes = ast_nodes. enter ( & canonical. scope ( ) ) ;
49+ let equivs = equivs. enter ( & canonical. scope ( ) ) ;
50+
51+ // Separate AST node operators and their arguments.
52+ let ops = ast_nodes. map ( |( id, ( op, _) ) | ( id, op) ) ;
53+ let args = ast_nodes. flat_map ( |( id, ( _, args) ) | args. into_iter ( ) . enumerate ( ) . map ( move |( index, arg) | ( arg, ( id, index) ) ) ) ;
54+
55+ // Update argument identifiers, and then equate `(Ops, Args)` tuples to inform equivalences.
56+ let equivalent_asts =
57+ args. join_map ( canonical, |_child, & ( node, index) , & canonical| ( node, ( index, canonical) ) )
58+ . reduce ( |_node, input, output| {
59+ let mut args = Vec :: new ( ) ;
60+ for ( ( _index, canonical) , _) in input. iter ( ) {
61+ args. push ( * canonical) ;
62+ }
63+ output. push ( ( args, 1isize ) ) ;
64+ } )
65+ . join_map ( & ops, |node, children, op| ( ( children. clone ( ) , op. clone ( ) ) , * node) )
66+ . concat ( & ast_nodes. filter ( |( _, ( _, args) ) | args. is_empty ( ) ) . map ( |( node, ( op, _) ) | ( ( vec ! [ ] , op) , node) ) )
67+ . reduce ( |_key, input, output| {
68+ for node in input. iter ( ) {
69+ output. push ( ( ( * ( node. 0 ) , * input[ 0 ] . 0 ) , 1 ) ) ;
70+ }
71+ } )
72+ . map ( |( _key, ( node, canonical) ) | ( node, canonical) ) ;
73+
74+ // Blend together the two forms of equivalence, and compute the transitive closure.
75+ equivalent_asts
76+ . concat ( & equivs)
77+ . reduce ( |_node, input, output| { output. push ( ( * input[ 0 ] . 0 , 1 ) ) ; } )
78+ . iterate ( |inner| {
79+ inner. map ( |( node, canonical) | ( canonical, node) )
80+ . join_map ( & inner, |_canonical, & node, & canonical| ( node, canonical) )
81+ } )
82+ } )
83+ . consolidate ( )
84+ . inspect ( |x| println ! ( "{:?}" , x) )
85+ . probe_with ( & mut probe) ;
86+ } ) ;
87+
88+ input. advance_to ( 0 ) ;
89+ input. insert ( ( 0 , ( "a" . to_string ( ) , vec ! [ ] ) ) ) ;
90+ input. insert ( ( 1 , ( "b" . to_string ( ) , vec ! [ ] ) ) ) ;
91+ input. insert ( ( 2 , ( "c" . to_string ( ) , vec ! [ ] ) ) ) ;
92+ input. insert ( ( 3 , ( "add" . to_string ( ) , vec ! [ 0 , 2 ] ) ) ) ;
93+ input. insert ( ( 4 , ( "add" . to_string ( ) , vec ! [ 1 , 2 ] ) ) ) ;
94+
95+ equivs. advance_to ( 0 ) ;
96+
97+ input. advance_to ( 1 ) ;
98+ equivs. advance_to ( 1 ) ;
99+ input. flush ( ) ;
100+ equivs. flush ( ) ;
101+
102+ worker. step_while ( || probe. less_than ( & input. time ( ) ) ) ;
103+ println ! ( "" ) ;
104+ println ! ( "Marking 0 equivalent to 1" ) ;
105+
106+ equivs. insert ( ( 1 , 0 ) ) ;
107+ ]
108+ input. advance_to ( 2 ) ;
109+ equivs. advance_to ( 2 ) ;
110+ input. flush ( ) ;
111+ equivs. flush ( ) ;
112+
113+ worker. step_while ( || probe. less_than ( & input. time ( ) ) ) ;
114+ println ! ( "" ) ;
115+ println ! ( "Un-marking 0 equivalent to 1" ) ;
116+
117+ equivs. remove ( ( 1 , 0 ) ) ;
118+
119+ } ) . expect ( "Computation terminated abnormally" ) ;
120+ }
0 commit comments