new_live_processing.py

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.mllib.classification import NaiveBayesModel
from pyspark.mllib.feature import HashingTF, IDF
import pickle
import json
import MySQLdb
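
# Pipeline: consume raw tweets from the Kafka topic `twitterstream`, convert
# each 10-second micro-batch to TF-IDF features, classify it with a pre-trained
# Naive Bayes model, and persist the labelled tweets to MySQL.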

def insert_tweet(tweet, username, pnr, prediction, tweet_id, latitude, longitude):
    """Persist one classified tweet into the `tweets` table."""
    HOST = ""
    PORT = 3306
    USER = ""
    PASSWORD = ""
    DB = ""
    # Parameterized query: the driver handles quoting, so tweets containing
    # quotes no longer break the INSERT the way the old interpolated string did.
    query = ("INSERT INTO tweets"
             "(tweet, username, pnr, prediction, tweet_id, latitude, longitude, time) "
             "VALUES (%s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)")
    conn = None
    try:
        conn = MySQLdb.connect(host=HOST, port=PORT, user=USER, passwd=PASSWORD, db=DB)
        cursor = conn.cursor()
        cursor.execute(query, (tweet, username, str(pnr), str(int(prediction)),
                               str(tweet_id), str(latitude), str(longitude)))
        conn.commit()
        print("Database insertion SUCCESSFUL!!")
    except MySQLdb.Error as e:
        print(e)
        print("Database insertion unsuccessful!!")
    finally:
        # Guard against the connect() call itself having failed.
        if conn is not None:
            conn.close()
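
# The script only references the `tweets` table through the query above; a
# matching table sketch (an assumption, the real schema is defined elsewhere):
#   CREATE TABLE tweets (
#       tweet VARCHAR(280), username VARCHAR(64), pnr VARCHAR(16),
#       prediction INT, tweet_id BIGINT,
#       latitude VARCHAR(32), longitude VARCHAR(32), time TIMESTAMP
#   );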

# Local two-worker-thread Spark context with a 10-second micro-batch interval.
conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 10)
ssc.checkpoint("checkpoint")
# Direct (receiver-less) Kafka stream; each record arrives as a (key, value)
# pair whose value is the raw tweet JSON.
kstream = KafkaUtils.createDirectStream(
    ssc, topics=['twitterstream'], kafkaParams={"metadata.broker.list": 'localhost:9092'})
tweets = kstream.map(lambda x: json.loads(x[1]))

# Load the trained Naive Bayes model once on the driver and broadcast it so
# every executor reuses the same copy instead of deserializing it per batch.
with open('IRModel1', 'rb') as f:
    loadedModel = pickle.load(f)
bc_model = sc.broadcast(loadedModel)
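
# IRModel1 is assumed to be a pickled pyspark.mllib NaiveBayesModel trained
# offline on the same feature space built below (HashingTF with 100000 buckets
# followed by IDF); predictions are only meaningful if the two pipelines match.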

def process_data(data):
    """Classify one micro-batch of (text, screen_name, id) tuples and store the results."""
    print("Processing data ...")
    if not data.isEmpty():
        nbModel = bc_model.value
        # Tokenize on whitespace before hashing; HashingTF hashes the elements
        # it is given, so passing a raw string would hash single characters.
        hashingTF = HashingTF(100000)
        tf = hashingTF.transform(
            data.map(lambda x: x[0].encode('utf-8', 'ignore').split()))
        tf.cache()
        # Note: the IDF weights are refit on every micro-batch rather than
        # reused from training, so small batches get noisy weights.
        idf = IDF(minDocFreq=2).fit(tf)
        tfidf = idf.transform(tf)
        tfidf.cache()
        prediction = nbModel.predict(tfidf)
        # collect() preserves RDD order and no shuffle occurs in between, so
        # the tweets and their predictions can be paired positionally.
        rows = []
        for (text, user, tweet_id), label in zip(data.collect(), prediction.collect()):
            rows.append((text.encode('utf-8', 'ignore'), user, tweet_id, label))
        print(rows)
        for text, user, tweet_id, label in rows:
            # pnr, latitude and longitude are not extracted yet, so store "0".
            insert_tweet(str(text), str(user), "0", int(label), int(tweet_id), "0", "0")
    else:
        print("Empty RDD !!!")

# Extract (text, screen_name, id) from each tweet and classify every micro-batch.
twitter = tweets.map(lambda tweet: tweet['user']['screen_name'])  # currently unused
tweet_text = tweets.map(lambda tweet: tweet['text'])              # currently unused
txt = tweets.map(lambda x: (x['text'], x['user']['screen_name'], x['id']))
txt.foreachRDD(process_data)
# text = tweet_text.map(lambda x: x.encode('utf-8', 'ignore'))
# text.foreachRDD(process_data)

ssc.start()
# Run for at most 1000 seconds, then stop gracefully so in-flight batches finish.
ssc.awaitTerminationOrTimeout(1000)
ssc.stop(stopGraceFully=True)
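
# Typical launch, assuming Spark 2.x with the Kafka 0.8 connector on the
# classpath (adjust the artifact version to the installed Spark):
#   spark-submit \
#       --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
#       new_live_processing.py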