
Commit e7980a1

A `CS`-architecture (client/server) distributed web spider example

1 parent 8ebb870 commit e7980a1

15 files changed: +481 -0 lines changed

README.md

+2
```diff
@@ -8,6 +8,8 @@
 
 * [Crawl WeChat official-account articles for a given keyword via Sogou WeChat search](./wechat_spider)
 
+* [A `CS`-architecture distributed web spider example](./distributed_web_spider_demo)
+
 ## `Web` development
 
 * [Implementing `api token` with `Flask`](./redis_token)
```

distributed_web_spider_demo/README.md

+21

# A `CS`-architecture (client/server) distributed web spider example

Uses a cache queue built on `Redis` to crawl job postings for a given keyword from Lagou.

#### Usage

* Client

Collects job-detail page links from the job listings and puts them into the cache queue.

```bash
python3 client.py
```

* Server

Takes job-detail page links out of the cache queue, parses the detail pages, and stores the structured data in the database.

```bash
python3 server.py
```
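
The two processes only share state through a Redis list. A minimal sketch of that hand-off, assuming a local, password-less Redis; the key name mirrors the `queue:pageurls` key that the `RedisQueue` wrapper later in this commit builds, and the job id is hypothetical:

```python
# Minimal sketch of the queue hand-off between the two processes.
import redis

r = redis.Redis(host='localhost', port=6379, db=0)
QUEUE_KEY = 'queue:pageurls'  # same key the RedisQueue wrapper constructs

def enqueue(urls):
    # producer side: push each detail-page URL onto the tail of the list
    for url in urls:
        r.rpush(QUEUE_KEY, url)

def drain():
    # consumer side: pop URLs until the list is empty
    while r.llen(QUEUE_KEY):
        url = r.lpop(QUEUE_KEY).decode('utf-8')
        print('would fetch and parse', url)

enqueue(['https://www.lagou.com/jobs/1234567.html'])  # hypothetical job id
drain()
```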

distributed_web_spider_demo/client.py

+38
```python
import asyncio
import traceback

from settings import *

from db.redis_session import *
from client.downloader import *
from client.proxies import *

proxies_set = RedisSet('proxies', host = REDIS_HOST, port = REDIS_PORT, db = REDIS_DB, password = REDIS_PWD)
pageurls_queue = RedisQueue('pageurls', host = REDIS_HOST, port = REDIS_PORT, db = REDIS_DB, password = REDIS_PWD)

if __name__ == '__main__':
    # GetIps(100).main()
    tasks = []
    while True:
        if pageurls_queue.qsize() == 0:
            break
        url = pageurls_queue.get_nowait()
        url = url.decode('utf-8')

        header = {
            'User-Agent': random.sample(user_agent_list, 1)[0]
        }
        proxies = {}
        if 'https' in proxies_set.get_rand():
            proxies.update({
                "https": proxies_set.get_rand() if proxies_set.size() != 0 else ''
            })
        else:
            proxies.update({
                "http": proxies_set.get_rand() if proxies_set.size() != 0 else ''
            })

        tasks.append(asyncio.ensure_future(get_content(url, header, proxies)))
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
```
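
One subtlety in the proxy selection above: `proxies_set.get_rand()` is called once to inspect the scheme and again to fill the dict, so the proxy that is checked and the proxy that is used can differ. A possible rework, offered only as a sketch rather than the author's code, draws the proxy once and then branches on its scheme:

```python
# Sketch (suggestion only): draw the proxy a single time, then branch on its scheme.
proxy = proxies_set.get_rand() if proxies_set.size() != 0 else ''
proxies = {'https': proxy} if proxy.startswith('https') else {'http': proxy}
```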

distributed_web_spider_demo/client/__init__.py

Whitespace-only changes.

distributed_web_spider_demo/client/downloader.py

+39

```python
import random
import requests
import asyncio
import traceback
import functools
from concurrent import futures

from server.proxies import *
from server.user_agent import *
from client.pipelines import *

proxies_set = RedisSet('proxies', host = REDIS_HOST, port = REDIS_PORT, db = REDIS_DB, password = REDIS_PWD)
pageurls_queue = RedisQueue('pageurls', host = REDIS_HOST, port = REDIS_PORT, db = REDIS_DB, password = REDIS_PWD)

executor = futures.ThreadPoolExecutor(max_workers = 10)

async def get_content(url, header, proxies):
    loop = asyncio.get_event_loop()
    try:
        response = await loop.run_in_executor(executor, functools.partial(requests.get, url = url, headers = header, proxies = proxies, timeout = 30))
        if response.status_code == 200:
            if analyze_content(response.text):
                print('Success crawl page: {}, {}, {}'.format(url, header, proxies))
            else:
                proxies_set.remove(list(proxies.values())[0])
                print('Analyze error. Removing proxy:', list(proxies.values())[0])
                pageurls_queue.put(url)
                print('Add page url to queue again.')
        else:
            proxies_set.remove(list(proxies.values())[0])
            print('Response error. Removing proxy:', list(proxies.values())[0])
            pageurls_queue.put(url)
            print('Add page url to queue again.')
    except Exception as e:
        proxies_set.remove(list(proxies.values())[0])
        traceback.print_exc()
        print('Removing proxy:', list(proxies.values())[0])
        pageurls_queue.put(url)
        print('Add page url to queue again.')
```
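
`get_content` wraps the blocking `requests.get` call in `loop.run_in_executor` so many downloads can run concurrently on a thread pool. A stripped-down, self-contained sketch of the same pattern, with a placeholder URL:

```python
# Self-contained sketch of the thread-pool + asyncio pattern used by get_content.
import asyncio
import functools
from concurrent import futures

import requests

executor = futures.ThreadPoolExecutor(max_workers=4)

async def fetch(url):
    loop = asyncio.get_event_loop()
    # run the blocking HTTP call on the pool and await its result
    response = await loop.run_in_executor(
        executor, functools.partial(requests.get, url, timeout=10))
    return response.status_code

async def main():
    codes = await asyncio.gather(*(fetch(u) for u in ['https://example.com']))
    print(codes)

asyncio.get_event_loop().run_until_complete(main())
```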

distributed_web_spider_demo/client/pipelines.py

+43

```python
import traceback
from bs4 import BeautifulSoup

from settings import *
from db.redis_session import *

jobdatas = RedisSet('jobdatas', host = REDIS_HOST, port = REDIS_PORT, db = REDIS_DB, password = REDIS_PWD)

def analyze_content(html_text):
    try:
        soup = BeautifulSoup(html_text, 'html5lib')

        job_names = soup.find_all(name = 'div', attrs = {"class": "job-name"})[0]
        name = job_names.attrs['title']
        company = job_names.find(name = 'div', attrs = {"class": "company"}).text

        job_requests = soup.find_all(name = 'dd', attrs = {"class": "job_request"})[0]
        request = "".join([i.text for i in job_requests.find_all(name = 'span')])

        job_detail = soup.select('#job_detail')[0]
        job_advantage = job_detail.select('.job-advantage p')[0].text

        job_description = [i.text for i in job_detail.select('.job_bt div p')]

        job_address = job_detail.select('.work_addr')[0].text.replace('\n', '').replace(' ', '').replace('查看地图', '')

        job_url = soup.find(name = 'link', attrs = {"rel": "canonical"}).attrs['href']

        job_data = {
            'name': name,
            'company': company,
            'request': request,
            'advantage': job_advantage,
            'description': job_description,
            'address': job_address,
            'url': job_url
        }

        jobdatas.add(job_data)
        return True
    except Exception as e:
        traceback.print_exc()
        return False
```

distributed_web_spider_demo/client/proxies.py

+55

```python
import requests
import asyncio
import functools
from bs4 import BeautifulSoup
from concurrent import futures

from settings import *
from db.redis_session import *

executor = futures.ThreadPoolExecutor(max_workers = 10)

proxies_set = RedisSet('proxies', host = REDIS_HOST, port = REDIS_PORT, db = REDIS_DB, password = REDIS_PWD)

class GetIps():
    def __init__(self, page):
        self.ips = []
        self.urls = []
        for i in range(page):
            self.urls.append(PROXY_CRAWL_URL + "%s"%i)
        self.header = {
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.75 Safari/537.36'
        }

    def get_ips(self):
        for url in self.urls:
            res = requests.get(url, headers = self.header)
            soup = BeautifulSoup(res.text, 'html5lib')
            ips = soup.find_all('tr')
            for ip in ips:
                tds = ip.find_all('td')
                if len(tds) < 2:
                    continue
                ip_temp = 'http://' + tds[0].contents[0] + ':' + tds[1].contents[0]
                self.ips.append(str(ip_temp))

    async def review_ips(self, url, ip):
        loop = asyncio.get_event_loop()
        try:
            proxy = {'http': ip}
            response = await loop.run_in_executor(executor, functools.partial(requests.get, url = url, proxies = proxy, timeout = 3))
            if response.status_code == 200:
                proxies_set.add(ip)
            else:
                print('Time Out!')
        except Exception as e:
            print(e)
            pass

    def main(self):
        self.get_ips()
        url = PROXY_TEST_URL
        tasks = [asyncio.ensure_future(self.review_ips(url, ip)) for ip in self.ips]
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.wait(tasks))
        loop.close()
```

distributed_web_spider_demo/db/__init__.py

Whitespace-only changes.

distributed_web_spider_demo/db/redis_session.py

+37

```python
import redis

class RedisQueue(object):
    def __init__(self, name, namespace='queue', **redis_kwargs):
        self.__db = redis.Redis(**redis_kwargs)
        self.key = '%s:%s' %(namespace, name)

    def qsize(self):
        return self.__db.llen(self.key)

    def put(self, item):
        self.__db.rpush(self.key, item)

    def get_wait(self, timeout=None):
        item = self.__db.blpop(self.key, timeout=timeout)
        return item

    def get_nowait(self):
        item = self.__db.lpop(self.key)
        return item

class RedisSet(object):
    def __init__(self, name, namespace='set', **redis_kwargs):
        self.__db = redis.Redis(**redis_kwargs)
        self.name = '%s:%s' %(namespace, name)

    def size(self):
        return self.__db.scard(self.name)

    def add(self, value):
        self.__db.sadd(self.name, value)

    def get_rand(self):
        return self.__db.srandmember(self.name, 1)[0].decode('utf-8')

    def remove(self, value):
        self.__db.srem(self.name, value)
```
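
A quick usage sketch for these wrappers, assuming a reachable local Redis; note that `get_nowait` returns raw bytes (callers decode them), while `get_rand` already decodes:

```python
# Usage sketch for RedisQueue / RedisSet, assuming a local Redis; values are illustrative.
from db.redis_session import RedisQueue, RedisSet

q = RedisQueue('pageurls', host='localhost', port=6379, db=0)
q.put('https://www.lagou.com/jobs/1234567.html')
print(q.qsize())       # 1
print(q.get_nowait())  # b'https://www.lagou.com/jobs/1234567.html'

s = RedisSet('proxies', host='localhost', port=6379, db=0)
s.add('http://1.2.3.4:8080')
print(s.get_rand())    # 'http://1.2.3.4:8080'
s.remove('http://1.2.3.4:8080')
```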

distributed_web_spider_demo/server.py

+82
```python
import random
import requests
import traceback
from bs4 import BeautifulSoup

from settings import *

from server.proxies import *
from server.user_agent import *
from db.redis_session import *
from server.pipelines import *

proxies_set = RedisSet('proxies', host = REDIS_HOST, port = REDIS_PORT, db = REDIS_DB, password = REDIS_PWD)
pageurls_queue = RedisQueue('pageurls', host = REDIS_HOST, port = REDIS_PORT, db = REDIS_DB, password = REDIS_PWD)

class Engine():
    def __init__(self, root_url, page_num):
        self.root_url = root_url
        self.page_num = page_num

    def get_urls(self, num):
        while True:
            try:
                headers = {
                    'User-Agent': random.sample(user_agent_list, 1)[0],
                    'Host': 'www.lagou.com',
                    'Connection': 'keep-alive',
                    'Content-Length': '26',
                    'Pragma': 'no-cache',
                    'Cache-Control': 'no-cache',
                    'Origin': 'https://www.lagou.com',
                    'X-Anit-Forge-Code': '0',
                    'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
                    'Accept': 'application/json, text/javascript, */*; q=0.01',
                    'X-Requested-With': 'XMLHttpRequest',
                    'X-Anit-Forge-Token': 'None',
                    'Referer': 'https://www.lagou.com/jobs/list_python?labelWords=&fromSearch=true&suginput=',
                    'Accept-Encoding': 'gzip, deflate, br',
                    'Accept-Language': 'zh-CN,zh;q=0.9',
                    'Cookie': '_ga=GA1.2.2102803584.1542767027; user_trace_token=20181121102346-794ca69c-ed34-11e8-af2a-525400f775ce; LGUID=20181121102346-794ca90c-ed34-11e8-af2a-525400f775ce; index_location_city=%E6%B7%B1%E5%9C%B3; _gid=GA1.2.417210577.1545100733; JSESSIONID=ABAAABAAAFCAAEGA2594CCC871A5C6033C439A7D767B848; _gat=1; LGSID=20181218212005-a2432f21-02c7-11e9-8f5b-5254005c3644; PRE_UTM=; PRE_HOST=; PRE_SITE=; PRE_LAND=https%3A%2F%2Fwww.lagou.com%2F; Hm_lvt_4233e74dff0ae5bd0a3d81c6ccf756e6=1542767027,1545100734,1545139206; TG-TRACK-CODE=index_search; SEARCH_ID=3062075691be4d739e276ef7ea9921c1; Hm_lpvt_4233e74dff0ae5bd0a3d81c6ccf756e6=1545139212; LGRID=20181218212011-a5f61409-02c7-11e9-8f5b-5254005c3644'
                }
                proxies = {
                    "http": proxies_set.get_rand()
                }
                data = {
                    'first': 'false',
                    'pn': str(num),
                    'kd': 'python'
                }
                params = {
                    'city': '深圳',
                    'needAddtionalResult': False
                }
                res = requests.post(self.root_url, params = params, data = data, headers = headers, proxies = proxies, timeout = 3)
                url_list = url_generator(res)
                [pageurls_queue.put(url) for url in url_list]
                if res.status_code == 200:
                    print('success', str(num), proxies['http'])
                    break
                else:
                    print('status failed')
                    continue
            except requests.exceptions.ProxyError:
                proxies_set.remove(proxies['http'])
                print('ProxyError. Removing proxy:', proxies['http'])
                continue
            except requests.exceptions.ConnectTimeout:
                proxies_set.remove(proxies['http'])
                print('ConnectTimeout. Removing proxy:', proxies['http'])
                continue
            except Exception as e:
                traceback.format_exc()
                continue

    def main(self):
        for num in range(self.page_num):
            self.get_urls(num + 1)

if __name__ == '__main__':
    GetIps(100).main()
    engine = Engine(SERVER_ROOT_URL, 30)
    engine.main()
```

distributed_web_spider_demo/server/__init__.py

Whitespace-only changes.

distributed_web_spider_demo/server/pipelines.py

+13

```python
import requests
from pprint import pprint

def url_generator(response):
    '''
    :param response: :class:`Response <Response>` object
    :return: list of detail-page URLs
    :rtype: list
    '''
    url_list = []
    for result in response.json()['content']['positionResult']['result']:
        url_list.append('https://www.lagou.com/jobs/{}.html'.format(result['positionId']))
    return url_list
```
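
To illustrate the contract the docstring describes, here is a stub run in the same module: the stub's JSON mocks only the keys `url_generator` actually reads (the real schema belongs to Lagou's listing API), and the position ids are hypothetical:

```python
# Illustration with a stubbed response object.
class FakeResponse:
    def json(self):
        return {'content': {'positionResult': {'result': [
            {'positionId': 1234567},
            {'positionId': 7654321},
        ]}}}

print(url_generator(FakeResponse()))
# ['https://www.lagou.com/jobs/1234567.html', 'https://www.lagou.com/jobs/7654321.html']
```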
