-
Notifications
You must be signed in to change notification settings - Fork 0
/
lww_element_graph.py
303 lines (285 loc) · 11.9 KB
/
lww_element_graph.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
"""
This module contains the (Last-Write-Wins)LWW-element-graph class from (Conflict-free Replicated Data Types) CRDT
"""
import time
import logging
logging.basicConfig(format='%(filename)s - %(levelname)s - %(asctime)s %(message)s',
datefmt='%m/%d/%Y %I:%M:%S %p',
level=logging.DEBUG)
class LWW_Element_Graph:
"""
Private class for the LWW element Graph
"""
add_vertex_set = None
add_edge_set = None
remove_vertex_set = None
remove_edge_set = None
def __init__(self, adjacency_list):
self.add_vertex_set = {}
self.remove_vertex_set = {}
self.add_edge_set = {}
self.remove_edge_set = {}
self.adjacency_list = adjacency_list
def __str__(self):
"""
:return: string -- returns adjacency list as a string.
"""
string = ''
for j in self.adjacency_list:
string += str(j) + ':\t' + str(self.adjacency_list[j]) + '\n'
return string
def check_edge_exists(self, edge) -> bool:
"""
Checks if edge already exists in the graph.
:param edge: integer
:return: boolean
"""
if self.check_vertex_exists(edge[0]) and self.check_vertex_exists(edge[1]):
if edge not in self.add_edge_set:
# Element not in add_set
return False
elif edge not in self.remove_edge_set:
# Element in add_set and not in remove_set
return True
elif self.remove_edge_set[edge] <= self.add_edge_set[edge]:
# Element in both add_set and remove_set, but addition is after removal
return True
else:
return False
else:
# Element in both add_set and remove_set, but addition is before removal
return False
def check_vertex_exists(self, vertex) -> bool:
"""
Checks if vertex exists in the graph.
:param vertex: integer
:return: boolean
"""
if vertex not in self.add_vertex_set:
# Element not in add_set
return False
elif vertex not in self.remove_vertex_set:
# Element in add_set and not in remove_set
return True
elif self.remove_vertex_set[vertex] < self.add_vertex_set[vertex]:
# Element in both add_set and remove_set, but addition is after removal
return True
elif self.remove_vertex_set[vertex] == self.add_vertex_set[vertex]:
# Element in both add_set and remove_set, but addition is equal to removal biased towards add
return True
else:
# Element in both add_set and remove_set, but addition is before removal
return False
def add_vertex(self, vertex, timestamp):
"""
Add vertex in the LWW-graph with the given timestamp.
:param vertex: integer
:param timestamp: string
:return: True if success else False
"""
try:
current_timestamp = time.time()
if self.check_vertex_exists(vertex):
logging.info("vertex already exists")
return False
else:
if vertex in self.remove_vertex_set:
if self.remove_vertex_set[vertex] <= timestamp:
if timestamp < current_timestamp:
timestamp = current_timestamp
self.add_vertex_set[vertex] = timestamp
self.adjacency_list[vertex] = []
return True
else:
if timestamp < current_timestamp:
timestamp = current_timestamp
self.add_vertex_set[vertex] = timestamp
self.adjacency_list[vertex] = []
return True
except TypeError as error:
logging.error(str(error))
def add_edge(self, pair_tuple, timestamp):
"""
Add edge in the graph with the given timestamp.
:param pair_tuple: tuple of vertex in between new edge will be added
:param timestamp: timestamp of adding new edge
:return: True if success else False
"""
try:
if self.check_vertex_exists(pair_tuple[0]) and self.check_vertex_exists(pair_tuple[1]):
if not self.check_edge_exists(pair_tuple):
if pair_tuple in self.add_edge_set:
self.add_edge_set[pair_tuple] = timestamp
self.adjacency_list[pair_tuple[0]].append(pair_tuple[1])
self.adjacency_list[pair_tuple[1]].append(pair_tuple[0])
logging.info("edge added successfully.")
return True
else:
self.add_edge_set[pair_tuple] = timestamp
self.adjacency_list[pair_tuple[0]].append(pair_tuple[1])
self.adjacency_list[pair_tuple[1]].append(pair_tuple[0])
logging.info("edge added successfully.")
return True
else:
logging.info("edge already exists.")
return False
else:
logging.info("invalid vertices.")
return False
except TypeError as error:
logging.error(str(error))
def remove_vertex(self, vertex, timestamp):
"""
Removes vertex from the graph by following LWW methodology.
This function is biased towards add operation.
:param vertex: integer value of vertex.
:param timestamp: timestamp of vertex removal from the graph.
:return: True if success else False.
"""
try:
if self.check_vertex_exists(vertex):
if vertex in self.remove_vertex_set:
if self.remove_vertex_set[vertex] < timestamp:
self.remove_vertex_set[vertex] = timestamp
else:
self.remove_vertex_set[vertex] = timestamp
if self.remove_vertex_set[vertex] > self.add_vertex_set[vertex]:
for j in self.adjacency_list:
if vertex in self.adjacency_list[j]:
self.adjacency_list[j].remove(vertex)
self.adjacency_list.pop(vertex)
logging.info("vertex removed successfully.")
return True
else:
self.remove_vertex_set[vertex] = timestamp
logging.info("biased towards add.")
return False
else:
if vertex in self.remove_vertex_set:
if self.remove_vertex_set[vertex] < timestamp:
self.remove_vertex_set[vertex] = timestamp
else:
self.remove_vertex_set[vertex] = timestamp
logging.info("vertex does not exists.")
return False
except TypeError as error:
logging.error(str(error))
def remove_edge(self, edge, timestamp):
"""
Removes edge from the graph by following LWW methodology.
This function is biased towards add operation.
:param edge:
:param timestamp: timestamp of edge removal from the graph.
:return: True if success else False.
"""
try:
if self.check_edge_exists(edge):
if edge in self.remove_edge_set:
if self.remove_edge_set[edge] < timestamp:
self.remove_edge_set[edge] = timestamp
else:
self.remove_edge_set[edge] = timestamp
if self.remove_edge_set[edge] > self.add_edge_set[edge]:
self.adjacency_list[edge[0]].remove(edge[1])
self.adjacency_list[edge[1]].remove(edge[0])
logging.info("edge removed successfully.")
return True
else:
logging.info('biased towards add.')
return False
else:
if edge in self.remove_edge_set:
if self.remove_edge_set[edge] < timestamp:
self.remove_edge_set[edge] = timestamp
else:
self.remove_edge_set[edge] = timestamp
logging.info("edge doesnt exist")
return False
except TypeError as error:
logging.error(str(error))
def get_vertices(self) -> list:
"""
This method returns the list of vertices present in the graph.
Biased towards add operation
:return: list of vertices.
"""
try:
vertices = []
for vertex in self.add_vertex_set:
if (vertex not in self.remove_vertex_set) or \
(self.add_vertex_set[vertex] >= self.remove_vertex_set[vertex]):
vertices.append(vertex)
return vertices
except TypeError as error:
logging.error(str(error))
def query_vertices(self, vertex):
"""
This method queries the graph and return all vertices of the given vertex.
:param vertex: integer value of vertex.
:return: all the vertices of the vertex if valid vertex is passed else None.
"""
try:
if self.check_vertex_exists(vertex):
return self.adjacency_list[vertex]
return None
except TypeError as error:
logging.error(str(error))
def find_path(self, start, end, path=None):
"""
This method finds the path between two vertexes of the graph.
:param start: start vertex.
:param end: end vertex.
:param path: path list.
:return: path between two vertices if it is found else None.
"""
try:
if path is None:
path = []
if self.check_vertex_exists(start) and self.check_vertex_exists(end):
path = path + [start]
if start == end:
return path
if start not in self.adjacency_list:
return None
for node in self.adjacency_list[start]:
if node not in path:
new_path = self.find_path(node, end, path)
if new_path:
return new_path
return None
return None
except TypeError as error:
logging.error(str(error))
def merge(self, lww_element_graph):
"""
This method merges the given graph with current graph.
For merging within a (add/remove) set, preference is given to latest timestamp
:param lww_element_graph {LWWElementGraph} -- set to merge with current graph.
:return: merged [LWWElementGraph]
"""
try:
self.merge_sets(self.add_vertex_set, lww_element_graph.add_vertex_set)
self.merge_sets(self.remove_vertex_set, lww_element_graph.remove_vertex_set)
self.merge_sets(self.add_edge_set, lww_element_graph.add_edge_set)
self.merge_sets(self.remove_edge_set, lww_element_graph.remove_edge_set)
self.merge_sets(self.adjacency_list, lww_element_graph.adjacency_list)
return self
except TypeError as error:
logging.error(str(error))
@staticmethod
def merge_sets(first, second):
"""
This method merges two sets.
:param first:
:param second:
:return: merged set.
"""
try:
for key in second:
if key not in first:
first[key] = second[key]
else:
if second[key] > first[key]:
first[key] = second[key]
except TypeError as error:
logging.error(str(error))