from pyspark import SparkConf, SparkContext
import argparse
import time
from itertools import combinations
from collections import deque
import copy

# spark-submit community_detection.py 7 ~/Downloads/sample_data.csv ~/Downloads/output_betweenness.txt ~/Downloads/output_community.txt
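# Pipeline (as implemented below):
#   1. Read the CSV of (user_id, <item_id>) rows and connect two users with an edge
#      when they share at least `filter_threshold` values from the second column.
#   2. Compute edge betweenness with the Girvan-Newman BFS / credit-assignment steps.
#   3. Repeatedly remove the edge with the highest betweenness, recompute betweenness,
#      and keep the partition into connected components that maximizes modularity.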

def create_adjacency_graph(user_edges_rdd):
    """Builds an undirected adjacency map: node -> set of neighboring nodes."""
    rdd_1 = user_edges_rdd.groupByKey().mapValues(set)
    rdd_2 = user_edges_rdd.map(lambda x: (x[1], x[0])).groupByKey().mapValues(set)
    return rdd_1.union(rdd_2).reduceByKey(lambda x, y: x.union(y))


def breadth_first_search(start_node, user_adjacency_dict):
    """
    Step 1 of the Girvan-Newman algorithm: BFS from start_node.
    :return: a dictionary mapping BFS level -> list of nodes at that level,
             e.g. {0: ['B'], 1: ['A', 'K', 'L', 'M'], 2: ['C', 'Y'], 3: ['D', 'E', 'F'], 4: ['Z']}
    """
    visited_nodes = {start_node}
    nodes_current_level_list = [start_node]
    level_index = 0
    bfs_graph_dict = {}

    while len(nodes_current_level_list) != 0:
        bfs_graph_dict[level_index] = nodes_current_level_list
        level_index += 1
        nodes_next_level_list = []
        for current_node in nodes_current_level_list:
            current_neighbors = user_adjacency_dict[current_node]
            for neighbor in current_neighbors:
                if neighbor not in visited_nodes:
                    visited_nodes.add(neighbor)
                    nodes_next_level_list.append(neighbor)
        nodes_current_level_list = nodes_next_level_list
    return bfs_graph_dict


def generate_node_weights(bfs_graph_dict, user_adjacency_dict):
    """
    Step 2 of the Girvan-Newman algorithm.
    :return: a dictionary mapping each node to its number of shortest paths from the
             BFS root (the root gets 1; every other node gets the sum of its parents' counts)
    """
    node_weights = {}
    levels = len(bfs_graph_dict)
    for root_level_node in bfs_graph_dict[0]:
        node_weights[root_level_node] = 1.0

    for current_level in range(1, levels):
        previous_level_nodes = set(bfs_graph_dict[current_level - 1])
        current_level_nodes = set(bfs_graph_dict[current_level])
        for node in current_level_nodes:
            node_neighbors = set(user_adjacency_dict[node])
            parent_nodes = previous_level_nodes.intersection(node_neighbors)
            weight_sum = 0.0
            for parent in parent_nodes:
                weight_sum += node_weights[parent]
            node_weights[node] = weight_sum
    return node_weights


def generate_edge_weights(node_weights, bfs_graph_dict, user_adjacency_dict):
    """
    Step 3 of the Girvan-Newman algorithm: assign credits bottom-up.
    Every non-root node starts with a credit of 1; a node's credit (1 plus whatever it
    received from the level below) is split among its parents in proportion to the
    parents' shortest-path counts, and the amount sent along an edge is that edge's weight.
    :return: a dictionary mapping each (sorted) edge tuple to its credit for this BFS root
    """
    edge_weights = {}
    node_credits = {}
    levels = len(bfs_graph_dict)
    for last_level_node in bfs_graph_dict[levels - 1]:
        node_credits[last_level_node] = 1

    for current_level in range(levels - 2, -1, -1):
        current_level_nodes = set(bfs_graph_dict[current_level])
        next_level_nodes = set(bfs_graph_dict[current_level + 1])
        for node in current_level_nodes:
            node_neighbors = set(user_adjacency_dict[node])
            child_nodes = next_level_nodes.intersection(node_neighbors)
            credit_sum = 1.0 if current_level != 0 else 0.0
            for child in child_nodes:
                value = (node_credits[child] / node_weights[child]) * node_weights[node]
                edge_weights_sort_key = tuple(sorted([node, child]))
                edge_weights[edge_weights_sort_key] = value
                credit_sum += value
            node_credits[node] = credit_sum
    return edge_weights


def calculate_betweenness(start_node, user_adjacency_dict):
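    """
    Runs steps 1-3 of Girvan-Newman from a single root node and returns
    (edge, credit) pairs; the caller sums these over all roots and halves them
    to obtain edge betweenness.
    """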
    bfs_graph_dict = breadth_first_search(start_node, user_adjacency_dict)
    node_weights = generate_node_weights(bfs_graph_dict, user_adjacency_dict)
    edge_weights = generate_edge_weights(node_weights, bfs_graph_dict, user_adjacency_dict)
    return edge_weights.items()


def fetch_connected_communities():
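    """
    BFS over the module-level community_user_adjacency_dict (the graph with the
    edges removed so far) and return its connected components, each sorted.
    """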
    visited = set()
    connected_components = []
    for start_node in community_user_adjacency_dict.keys():
        detected_community = []
        queue = deque([start_node])
        while queue:
            node = queue.popleft()
            if node not in visited:
                visited.add(node)
                detected_community.append(node)
                node_neighbors = community_user_adjacency_dict[node]
                for neighbor in node_neighbors:
                    queue.append(neighbor)
        if len(detected_community) != 0:
            detected_community.sort()
            connected_components.append(detected_community)
    return connected_components


def calculate_modularity(community_list):
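    """
    Modularity of the given partition, computed against the ORIGINAL graph:
        Q = (1 / (2m)) * sum over communities, over node pairs (i, j) with i <= j, of
            A_ij - (k_i * k_j) / (2m)
    where m is the number of edges in the original graph, A_ij is 1 if (i, j) is an
    original edge, and k_i is node i's original degree. Singleton communities are skipped.
    """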
    modularity_sum = 0
    for community in community_list:
        if len(community) > 1:
            for node_i_index in range(0, len(community)):
                for node_j_index in range(node_i_index, len(community)):
                    node_i = community[node_i_index]
                    node_j = community[node_j_index]
                    modularity_sort_key = tuple(sorted([node_i, node_j]))
                    adjacent_matrix_value = 1.0 if modularity_sort_key in original_user_edges_list else 0.0
                    # neighbors_node_i = user_adjacency_dict[node_i]
                    # if node_j in neighbors_node_i:
                    #     adjacent_matrix_value = 1.0
                    # else:
                    #     adjacent_matrix_value = 0.0
                    value = adjacent_matrix_value - (node_degrees_dict[node_i] * node_degrees_dict[node_j] * formula_first_part)
                    modularity_sum += value
    modularity_sum = modularity_sum * formula_first_part
    return modularity_sum


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("filter_threshold", type=int, help="Enter the filter threshold to generate edges between "
                                                           "user nodes")
    parser.add_argument("input_file_path", type=str, help="Enter the input file path")
    parser.add_argument("betweenness_output", type=str, help="Enter the path of the betweenness output file")
    parser.add_argument("community_output", type=str, help="Enter the path of the community output file")
    args = parser.parse_args()

    start = time.time()
    conf = SparkConf().setAppName("h4_task1").setMaster("local[*]").set("spark.driver.memory", "4g")\
        .set("spark.executor.memory", "4g")
    sc = SparkContext(conf=conf)

    threshold = args.filter_threshold

    input_data_rdd = sc.textFile(args.input_file_path)
    header_line = input_data_rdd.first()
    input_data_rdd = input_data_rdd.filter(lambda x: x != header_line).map(lambda y: y.split(","))
    users_rdd = input_data_rdd.map(lambda x: (x[0], x[1])).groupByKey().mapValues(set)
    users_rdd.persist()
    users_dict = dict(users_rdd.collect())
    distinct_users = users_rdd.keys().collect()

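    # Build edges: connect two users when they have at least `threshold` values from the
    # second CSV column (e.g. business ids) in common; each edge is stored once as a
    # sorted (user_a, user_b) tuple.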
    original_user_edges_list = []
    for temp_user in combinations(distinct_users, 2):
        if len(users_dict[temp_user[0]].intersection(users_dict[temp_user[1]])) >= threshold:
            edge_sort_key = tuple(sorted([temp_user[0], temp_user[1]]))
            original_user_edges_list.append(edge_sort_key)

    num_user_edges_original_graph = len(original_user_edges_list)

    user_edges_rdd = sc.parallelize(original_user_edges_list).persist()
    user_adjacency_rdd = create_adjacency_graph(user_edges_rdd)
    user_adjacency_dict = user_adjacency_rdd.collectAsMap()
    node_degrees_dict = user_adjacency_rdd.map(lambda x: (x[0], len(x[1]))).collectAsMap()

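    # Edge betweenness: run the BFS/credit procedure from every node, sum the per-edge
    # contributions, and divide by 2 because each shortest path is counted from both of
    # its endpoints; sort by betweenness (descending), then lexicographically by edge.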
    edge_betweenness_rdd = user_adjacency_rdd.keys().flatMap(lambda x: calculate_betweenness(x, user_adjacency_dict))\
        .reduceByKey(lambda a, b: a+b).mapValues(lambda y: y/2.0).sortBy(lambda z: (-z[1], z[0]), ascending=True)

    edge_betweenness_values = edge_betweenness_rdd.collect()
    with open(args.betweenness_output, 'w') as file:
        for line in edge_betweenness_values:
            line_write = str(line[0]) + ", " + str(line[1]) + "\n"
            file.write(line_write)

    community_edges = deque(edge_betweenness_values)
    community_user_adjacency_dict = copy.deepcopy(user_adjacency_dict)

    formula_first_part = (1 / (2 * num_user_edges_original_graph))  # 1 / (2m), m = edges in the original graph
    global_maximum_modularity = -1.0
    final_communities = []

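    # Girvan-Newman loop: remove the current highest-betweenness edge, split the graph
    # into connected components, score the partition with modularity (using the original
    # graph's edges and degrees), keep the best partition seen so far, and recompute
    # betweenness on the reduced graph. The loop ends when no edges remain.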
    while len(community_edges) != 0:
        removed_edge = community_edges.popleft()[0]
        community_user_adjacency_dict[removed_edge[0]].remove(removed_edge[1])
        community_user_adjacency_dict[removed_edge[1]].remove(removed_edge[0])
        # node_degrees_dict[removed_edge[0]] -= 1
        # node_degrees_dict[removed_edge[1]] -= 1
        connected_communities = fetch_connected_communities()
        community_modularity = calculate_modularity(connected_communities)
        if community_modularity > global_maximum_modularity:
            global_maximum_modularity = community_modularity
            final_communities = connected_communities
        community_user_adjacency_rdd = sc.parallelize(community_user_adjacency_dict.items())
        community_edges_betweenness = community_user_adjacency_rdd.keys().\
            flatMap(lambda x: calculate_betweenness(x, community_user_adjacency_dict)).reduceByKey(lambda a, b: a + b).\
            mapValues(lambda y: y / 2.0).sortBy(lambda z: (-z[1], z[0]), ascending=True).collect()
        community_edges = deque(community_edges_betweenness)

    # print(global_maximum_modularity)
    # print(len(final_communities))

    final_communities = sorted(final_communities, key=lambda x: (len(x), x))
    with open(args.community_output, 'w') as file:
        for community in final_communities:
            value = str(community).replace('[', '').replace(']', '') + "\n"
            file.write(value)
    end = time.time()

    # print("Duration: ", end-start)