@@ -85,7 +85,7 @@ def sentiment_analysis(lines, model, hashingTF, iDF):
    analysis.foreachRDD(lambda x: pos_cnt_li.extend(x.collect()))


-def data_to_db(db, start_time, counts, keywords, hashtags, pos):
+def data_to_db(db, start_time, counts, keywords, hashtags, pos, tracking_word, related_keywords_tb):
    # Store counts
    counts_t = []
    for i in range(min(len(counts), len(start_time))):
@@ -100,7 +100,8 @@ def data_to_db(db, start_time, counts, keywords, hashtags, pos):
    db['counts'].insert(counts_js)
    # Store keywords
    collection = db['keywords']
-    db['keywords'].insert(keywords)
+    if related_keywords_tb:
+        db['keywords'].insert(keywords)
    # Store hashtags
    collection = db['hashtags']
    db['hashtags'].insert(hashtags)
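The guard above matters because main() now passes keywords as None whenever no related-keywords table was built, and the legacy pymongo insert() used throughout this file would reject None. A minimal standalone sketch of the same guard, assuming a local MongoDB and the pre-4.0 pymongo API (connection details and database name are hypothetical):

from pymongo import MongoClient

db = MongoClient('localhost', 27017)['twitter_analysis']  # hypothetical connection
keywords = None                          # what main() passes when nothing was found
related_keywords_tb = keywords is not None

if related_keywords_tb:
    db['keywords'].insert(keywords)      # legacy call; insert_many() in pymongo >= 3.0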
@@ -115,6 +116,11 @@ def data_to_db(db, start_time, counts, keywords, hashtags, pos):
    ratio_js = json.loads(ratio_df.reset_index().to_json(orient='records'))
    collection = db['ratio']
    db['ratio'].insert(ratio_js)
+    # Store tracking_word
+    tracking_word_df = pd.DataFrame([tracking_word], columns=['Tracking_word'])
+    tracking_word_js = json.loads(tracking_word_df.reset_index().to_json(orient='records'))
+    collection = db['tracking_word']
+    db['tracking_word'].insert(tracking_word_js)

def parseLine(line):
    parts = line.split('\t')
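The new block persists a single string by routing it through a one-row DataFrame, so the tracking word lands in MongoDB in the same records shape as every other collection. A quick standalone check of that round-trip ('spark' is a hypothetical example value):

import json
import pandas as pd

tracking_word = 'spark'  # hypothetical example value
tracking_word_df = pd.DataFrame([tracking_word], columns=['Tracking_word'])
tracking_word_js = json.loads(tracking_word_df.reset_index().to_json(orient='records'))
print(tracking_word_js)  # [{'index': 0, 'Tracking_word': 'spark'}]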
@@ -126,7 +132,7 @@ def parseLine(line):
    return (label, words)


-def main(sc, db):
+def main(sc, db, tracking_word):

    print('>' * 30 + 'SPARK START' + '>' * 30)

@@ -239,6 +245,9 @@ def main(sc, db):
        if len(sqlContext.tables().filter("tableName LIKE 'related_keywords_tmp'").collect()) == 1:
            top_words = sqlContext.sql('Select Keyword, Count from related_keywords_tmp')
            related_keywords_df = related_keywords_df.unionAll(top_words)
+            related_keywords_tb = True
+        else:
+            related_keywords_tb = False

        # Find the top related hashtags
        if len(sqlContext.tables().filter("tableName LIKE 'related_hashtags_tmp'").collect()) == 1:
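The new flag records whether the related_keywords_tmp temp table has appeared in the catalog, and it gates all of the keyword post-processing below. The existence test reads more clearly when extracted into a helper; a sketch, assuming the same sqlContext used elsewhere in this file (the helper name is hypothetical):

def temp_table_exists(sqlContext, name):
    # sqlContext.tables() returns a DataFrame with a tableName column;
    # exactly one row matches once the temp table has been registered.
    return len(sqlContext.tables().filter("tableName LIKE '%s'" % name).collect()) == 1

related_keywords_tb = temp_table_exists(sqlContext, 'related_keywords_tmp')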
@@ -249,34 +258,41 @@ def main(sc, db):
            process_cnt += 1

    # Final tables
-    related_keywords_df = related_keywords_df.filter(related_keywords_df['Keyword'] != 'none')
-    # Spark SQL to Pandas Dataframe
-    related_keywords_pd = related_keywords_df.toPandas()
-    related_keywords_pd = related_keywords_pd.groupby(related_keywords_pd['Keyword']).sum()
-    related_keywords_pd = pd.DataFrame(related_keywords_pd)
-    related_keywords_pd = related_keywords_pd.sort("Count", ascending=0).iloc[0:9]
+    if related_keywords_tb:
+        related_keywords_df = related_keywords_df.filter(related_keywords_df['Keyword'] != 'none')
+        # Spark SQL to Pandas Dataframe
+        related_keywords_pd = related_keywords_df.toPandas()
+        related_keywords_pd = related_keywords_pd[related_keywords_pd['Keyword'] != tracking_word]
+        related_keywords_pd = related_keywords_pd.groupby(related_keywords_pd['Keyword']).sum()
+        related_keywords_pd = pd.DataFrame(related_keywords_pd)
+        related_keywords_pd = related_keywords_pd.sort("Count", ascending=0).iloc[0:min(9, related_keywords_pd.shape[0])]

    # Spark SQL to Pandas Dataframe
-    related_hashtags_pd = related_hashtags_df.toPandas()
+    related_hashtags_pd = related_hashtags_df.toPandas()
+    related_hashtags_pd = related_hashtags_pd[related_hashtags_pd['Hashtag'] != '#' + tracking_word]
    related_hashtags_pd = related_hashtags_pd.groupby(related_hashtags_pd['Hashtag']).sum()
    related_hashtags_pd = pd.DataFrame(related_hashtags_pd)
-    related_hashtags_pd = related_hashtags_pd.sort("Count", ascending=0).iloc[0:9]
+    related_hashtags_pd = related_hashtags_pd.sort("Count", ascending=0).iloc[0:min(9, related_hashtags_pd.shape[0])]

    ssc.stop()
    ###########################################################################

    print(tweet_cnt_li)
    print(start_time)
    print(pos_cnt_li)
+    print(related_keywords_tb)
    #print(related_keywords_pd.head(10))
    #print(related_hashtags_pd.head(10))
-    related_keywords_js = json.loads(related_keywords_pd.reset_index().to_json(orient='records'))
+    if related_keywords_tb:
+        related_keywords_js = json.loads(related_keywords_pd.reset_index().to_json(orient='records'))
+    else:
+        related_keywords_js = None
    #print(related_keywords_js)
    related_hashtags_js = json.loads(related_hashtags_pd.reset_index().to_json(orient='records'))
    #print(related_hashtags_js)

    # Store the data to MongoDB
-    data_to_db(db, start_time, tweet_cnt_li, related_keywords_js, related_hashtags_js, pos_cnt_li)
+    data_to_db(db, start_time, tweet_cnt_li, related_keywords_js, related_hashtags_js, pos_cnt_li, tracking_word, related_keywords_tb)

    print('>' * 30 + 'SPARK STOP' + '>' * 30)

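Two notes on the pandas steps above: DataFrame.sort() is the old pre-0.17 API this file already targets, and iloc slicing is bounds-safe on its own, so min(9, shape[0]) is defensive rather than required. A standalone sketch of the exclude-then-top-N step on toy data, written with the modern sort_values() equivalent:

import pandas as pd

tracking_word = 'spark'  # hypothetical example value
df = pd.DataFrame({'Keyword': ['spark', 'flink', 'kafka', 'flink'],
                   'Count': [10, 7, 5, 2]})
df = df[df['Keyword'] != tracking_word]   # drop the word being tracked itself
top = (df.groupby('Keyword').sum()
         .sort_values('Count', ascending=False)
         .iloc[0:9])                      # safe even with fewer than 9 rows
print(top)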
@@ -297,6 +313,7 @@ def main(sc, db):
    # Load parameters
    with open('conf/parameters.json') as f:
        p = json.load(f)
+    tracking_word = p['hashtag'][1:]
    batch_interval = int(p['DStream']['batch_interval'])
    window_time = int(p['DStream']['window_time'])
    process_times = int(p['DStream']['process_times'])
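tracking_word is the configured hashtag with its leading '#' stripped, which is why the hashtag filter earlier has to prepend the '#' again before comparing. A tiny sketch of the derivation, with hypothetical contents standing in for conf/parameters.json:

import json

p = json.loads('{"hashtag": "#spark"}')  # stand-in for the real config file
tracking_word = p['hashtag'][1:]         # drop the leading '#'
print(tracking_word)                     # spark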
@@ -308,6 +325,7 @@ def main(sc, db):
    db['keywords'].drop()
    db['hashtags'].drop()
    db['ratio'].drop()
+    db['tracking_word'].drop()
    # Execute main function
-    main(sc, db)
+    main(sc, db, tracking_word)

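The new collection is dropped alongside the others before each run; in pymongo, drop() on a collection that does not exist is a no-op, so a first run is safe. A minimal sketch under the same hypothetical connection as before:

from pymongo import MongoClient

db = MongoClient('localhost', 27017)['twitter_analysis']  # hypothetical connection
db['tracking_word'].drop()  # no-op if the collection was never created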