Today I unexpectedly picked up a gig: build a crawler in Python 3, with distributed storage in MongoDB.
Starting this post to record the technical details.
First, the requirements:
- Only crawl each site's homepage, i.e. crawl per domain, and only follow off-site (outbound) links;
- Seed nodes are fed into the database; for every site crawled, record:
  - the site's HTTP headers
  - the site's HTML content
  - the site's IP address, resolved to location info
  - a fingerprint of the HTML, along the lines of whatweb
Performance / behaviour requirements:
- Follow 302 redirects when they come up;
- Seed (source) sites can be added independently;
- When a site fails to crawl, record the status and retry a week later;
- The crawl has to run distributed across multiple machines;
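Put together, each worker ends up running roughly this loop (just a sketch of the plan, with placeholder names; the real code comes further down):

# rough outline of one worker; all names here are placeholders
while True:
    host = pop_next_host()               # oldest pending domain from the shared MongoDB
    info = fetch_homepage(host)          # headers + HTML, redirects followed
    save_node(host, info)                # headers, content, IP location, fingerprint
    enqueue(outbound_domains(info))      # only off-site domains go back into the queue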
Here are the concrete implementation steps:
1. Create a Git repository
I created a repo on Bitbucket (public for now).
https://bitbucket.org/fish_ball/ecrawler
2. Download and install MongoDB, deploy an instance plus an admin tool
A quick look around suggests the nicer admin tools are RockMongo and PhpMoAdmin.
phpMoAdmin looks like it's just a single file, nice and simple.
I downloaded it, but it needs the PHP Mongo driver configured:
http://php.net/manual/en/mongo.installation.php#mongo.installation.windows
Leaving the link here; the driver installs per those instructions.
It turned out to be a hassle, so for now I'll just work from the console directly.
My MongoDB is unpacked under d:/mongo/:
- cd d:/mongo/bin/
- md d:\data\db
- mongod --dbpath d:\data\db
- (open another console window)
- d:\mongo\bin\mongo
- print('Hello world!');
That will do for now; I'll come back and sort it out properly later.
3. Write the fetching code in Python
Just create a main.py.
It's rough, but at least it can fetch content. Note that the code below needs the chardet module for encoding detection:
pip3 install chardet
import re
import json
# import http.client
import urllib.parse
import chardet
from http.client import HTTPConnection
def fetch_url(host, path=''):
    conn = HTTPConnection(host, 80)
    url = 'http://%s%s' % (host, path)
    conn.request('GET', url)
    response = conn.getresponse()
    # header
    headers = response.getheaders()
    print(headers)
    # Server - Apache
    # Last-Modified - Tue, 31 Dec 2013 11:13:44 GMT
    # Content-Length -
    # Content-Type - text/html
    for k, v in headers:
        print(k + ' - ' + v)
    raw_content = response.read()
    charset = chardet.detect(raw_content)
    # print(charset)
    try:
        data = raw_content.decode(charset['encoding'])
    except UnicodeDecodeError:
        print('%s: encoding error' % host)
    # print(data)
    conn.close()
    return


fetch_url('www.baidu.com')
Next up is extracting the outbound links from the content, which is fairly simple. Since we only need the domain part, I mashed together a regex to get the job done:
def get_out_links(content):
    match = re.findall('<a[^>]* href=[\'\"]https?:\/\/([^\'\"\/:]+)[^>]*>', content)
    return set(match)
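A quick sanity check on that regex (toy snippet; the result is a set, so the order may differ):

html = ('<p><a href="http://www.example.com/about">about</a> '
        '<a class="ext" href="https://blog.example.org">blog</a></p>')
print(get_out_links(html))
# -> {'www.example.com', 'blog.example.org'}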
4. Design the MongoDB data structures
OK, the key parts of the code are written; the remaining problem is planning the crawl tasks.
First off, the fetched header data fits neatly into a single Mongo document, and the page content can simply go into one field of it.
Let's pip-install the pymongo module first (a lot easier to install than that damn PHP driver!).
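Just to sketch the idea (a minimal example, not the final schema -- the real field list is in utils.py further down, and the sample values here are made up):

import pymongo
from time import time

# a local instance for now; the production box goes into a config file later
mongo = pymongo.Connection('localhost', 27017)
db = mongo.ecrawler

# one crawled site == one document: headers as a sub-document,
# the raw HTML simply dropped into a 'content' field
db.nodes.update(
    {'host': 'www.example.com'},
    {'$set': {
        'headers': {'Server': 'Apache', 'Content-Type': 'text/html'},
        'content': '<html>...</html>',
        'timestamp': time(),
    }},
    True,  # upsert
)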
5. Integration testing
… …
Truth be told, this project was finished quite a while ago, but the post got abandoned halfway, so let me come back and wrap it up:
Project source: Bitbucket fish_ball/ecrawler
I'll do a proper breakdown when I get time; for now, here are the final files.
# config.py
import os
# configuration
config = {
    'db_host': 'localhost',
    'db_port': 27017,
    'buf_size': 10,           # how many hosts to fetch each time the local queue is empty
    'delay': 600,             # how long before an enqueued host's response is considered lost
    'retry': 7 * 24 * 3600,   # retry a failed / visited host after this long (one week)
    'sleep': 1,               # how long to sleep between queue polls
    'timeout': 10,            # http request timeout
    'max_threads': 20,        # max threads count
    'feature_path': '/var/ecrawler/cms.txt',
    'username': 'ecrawler',
    'password': 'abcd1234',
}
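# Note: for the multi-machine setup each worker just carries a copy of this file;
# only 'db_host' (and the credentials) need to change so every worker talks to the
# same shared MongoDB instance -- the values above are development defaults.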
# utils.py
import re
import json
import chardet
import pymongo
import socket
from time import time, sleep
# import http.client
from http.client import HTTPConnection
import urllib.request
from urllib.parse import urlparse
from urllib.request import urlopen
from urllib.error import URLError
from config import config
# Connection, set to the specific instance in production.
mongo = pymongo.Connection(config['db_host'], config['db_port'])
# Database
db = mongo.ecrawler
db.authenticate(config['username'], config['password'])
# Table [nodes]
# - host | string | Domain name
# - ip | string | Ip address
# - timestamp | float | Last visit time, UNIX timestamp
# - headers | dict | { Http response header dictionary }
# - status_code | integer | http request statusCode
# - title | string | page title
# - links | list | [ domains linked out ]
# - fingerprint | list | [ js/css library uses ]
# - cms | string | match the cms the site uses
# - ip_location | string | ip location
# - content | string | html content
# Table [nodes_pending]
# - host | string | Domain name
# - timestamp | float | Timestamp used to prioritise the queue
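# For example, a crawled [nodes] entry ends up shaped roughly like this
# (illustrative values only):
#   { 'host': 'www.example.com', 'ip': '93.184.216.34', 'status_code': 200,
#     'title': 'Example Domain', 'headers': {...}, 'links': ['www.iana.org'],
#     'fingerprint': ['jquery.min.js'], 'cms': None,
#     'content': '<html>...</html>', 'timestamp': 1407654321.0 }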
def pop_pending():
    """
    Pops a pending host from mongodb.
    :return: host name
    """
    host = db.nodes_pending.find_one(sort=(('timestamp', pymongo.ASCENDING),))
    if host is not None:
        host = host.get('host')
        # Push the timestamp forward so other workers skip this host
        # until `retry` has elapsed.
        db.nodes_pending.update(
            {'host': host},
            {'host': host, 'timestamp': time() + config['retry']},
            True,  # Upsert
        )
    return host
def pop_no_cms():
    """
    Pops a host which doesn't have a cms field yet from mongodb.
    :return: host name, or None if nothing is pending
    """
    node = db.nodes.find_one({'cms': None}, sort=(('timestamp', pymongo.ASCENDING),))
    if node is None:
        return None
    host = node.get('host')
    # Only bump the timestamp -- a full replace here would wipe the crawled data.
    db.nodes.update(
        {'host': host},
        {'$set': {'timestamp': time() + config['retry']}},
    )
    return host
def host_exists(host):
    """
    Whether the host already exists in nodes (2) or nodes_pending (1).
    :return: 0, 1 or 2 (truthy when the host is already known)
    """
    if db.nodes.find({'host': host}).count() > 0:
        return 2
    elif db.nodes_pending.find({'host': host}).count() > 0:
        return 1
    return 0
def fetch_header(host):
    """
    Fetch the headers only, using a HEAD request.
    :param host: host name
    :return: response object
    """
    return urlopen(urllib.request.Request(
        url='http://' + host,
        method='HEAD',
    ))
def get_html_title(content):
    result = re.findall(r'<title[^>]*>([^<]*)', content)
    return result and result[0] or ''


def get_out_links(content):
    match = re.findall(r'<a[^>]* href=[\'\"]https?://([^\'\"/:\?#]+)[^>]*>', content)
    return list(set(s.strip() for s in match if '.' in s))


def get_fingerprint(content):
    match = re.findall(r'[\"\'/]([^\"\'/]+\.(js|css))[\"\?#]', content)
    return list(set(name.strip() for name, ext in match))


def get_ip_info(host):
    ip = socket.gethostbyname(host)
    url = 'http://ip.taobao.com/service/getIpInfo.php?ip=' + ip
    obj = json.loads(urlopen(url).read().decode())
    return ip, obj
def replace_domain(host, force=False):
    """
    Add a seed host name to the crawler queue.
    If that host already exists: replace it when force is True, skip it when force is False.
    Otherwise insert the host as pending (unvisited).
    :param host: host name to enqueue
    :param force: force-delete the original records when the host collides
    :return: True if enqueued, False otherwise
    """
    if force:
        db.nodes.remove({'host': host})
        db.nodes_pending.remove({'host': host})
    if db.nodes.find({'host': host}).count() or db.nodes_pending.find({'host': host}).count():
        return False
    else:
        print('enqueue: ' + host)
        db.nodes_pending.insert({
            'host': host,
            'timestamp': time(),
        })
        return True
def do_crawl(host):
    """
    Fetch the target host info and store the result in mongodb.
    :param host: Host name
    """
    # print('start: ' + host)
    if not host:
        return
    try:
        if db.nodes.find({'host': host}).count() > 0 or len(host) > 200:
            db.nodes_pending.remove({'host': host})
            print('remove: ' + host)
            return
        url = 'http://' + host
        response = urlopen(url, timeout=config['timeout'])
        # header
        headers = dict(response.headers)
        # body
        content = response.read()
        if not content:
            raise Exception('no content!')
        charset = chardet.detect(content)
        # decode body
        encoding = charset and charset.get('encoding') or 'UTF8'
        encoding = 'GBK' if encoding == 'GB2312' else encoding
        body = content.decode(encoding, 'ignore')
        # Success result
        ip, location = get_ip_info(host)
        links = get_out_links(body)
        title = get_html_title(body)
        db.nodes_pending.remove({'host': host})
        db.nodes.update(
            {'host': host},
            {'$set': {
                'ip': ip,
                'timestamp': time(),
                'headers': headers,
                'status_code': response.code,
                'title': title,
                'links': links,
                'fingerprint': get_fingerprint(body),
                'cms': None,
                'ip_location': location,
                'content': body,
            }},
            True,  # Upsert
        )
        # Enqueue every outbound domain we haven't seen yet.
        for host_out in links:
            if not host_exists(host_out):
                db.nodes_pending.update(
                    {'host': host_out},
                    {'host': host_out, 'timestamp': time()},
                    True,  # Upsert
                )
        print('success: %s - %s - %s' % (host, response.code, title))
    except URLError as ex:
        print('fail: %s - %s' % (host, ex))
    except socket.timeout as ex:
        print('timeout: %s - %s' % (host, ex))
    except Exception as ex:
        print('fail: %s - %s' % (host, ex))
# crawl.py
import pymongo
import threading
from config import config
from time import sleep
from utils import replace_domain, pop_pending, do_crawl
if __name__ == '__main__':
    # Connection, set to the specific instance in production.
    mongo = pymongo.Connection(config['db_host'], config['db_port'])
    # Database
    db = mongo.ecrawler
    db.authenticate(config['username'], config['password'])

    nodes = db.nodes
    # nodes.ensure_index('host', unique=True)
    # nodes.ensure_index('cms')
    nodes_pending = db.nodes_pending
    # nodes_pending.ensure_index('host', unique=True)
    # nodes_pending.ensure_index('timestamp')

    # SEED
    replace_domain('www.baidu.com')
    replace_domain('www.qq.com')
    replace_domain('360.cn')
    replace_domain('www.huangwenchao.com.cn')

    # reset in debug
    # nodes.remove({})
    # exit()
    # nodes.update({}, {'status': 0})

    # while not queue, fetch urls from db
    while True:
        while threading.active_count() >= config.get('max_threads', 10):
            sleep(config.get('sleep', 1))
        threading.Thread(target=do_crawl, args=(pop_pending(),)).start()
# spy.py
import re
import json
import hashlib
import chardet
import pymongo
import threading
from time import time, sleep
from config import config
from urllib.request import urlopen
from utils import pop_no_cms
import urllib.error
# Connection, set to the specific instance in production.
mongo = pymongo.Connection(config['db_host'], config['db_port'])
# Database
db = mongo.ecrawler
db.authenticate(config['username'], config['password'])
def init_cms():
    """
    Init the cms feature file into the db, from the config file
    :return:
    """
    # Update the config feature file into table `cms`
    for line in open(config['feature_path'], 'r', encoding='utf8'):
        item = dict(zip(
            ['path', 'cms', 'cms2', 'md5'],
            line.strip().split('|'),
        ))
        db.cms.update(item, item, True)
if __name__ == '__main__':
    # # init cms
    # init_cms()
    rules = list(db.cms.find())
def do_cms(host):
    """
    Try each cms fingerprint rule against the host and record the match result.
    """
    if not host:
        return
    print('cms-start: ' + host)
    # Push the node's timestamp forward so it isn't picked again right away.
    db.nodes.update(
        {'host': host},
        {'$set': {'timestamp': time() + config.get('retry', 3600)}},
    )
    matched = None
    for rule in rules:
        try:
            # response = fetch_header(host)
            url = 'http://' + host + rule.get('path', '')
            # print('try: %s' % url)
            response = urlopen(url)
            # print(response.code)
            hash_obj = hashlib.md5()
            hash_obj.update(response.read())
            md5 = hash_obj.hexdigest()
            # print(md5)
            # print(rule['md5'])
            if md5 == rule.get('md5'):
                matched = rule
                break
        except urllib.error.HTTPError as ex:
            # print(ex)
            pass
        except Exception as ex:
            pass
    if matched:
        db.nodes.update(
            {'host': host},
            {'$set': {'cms': matched.get('cms', '')}},
        )
        print('matched: %s - %s' % (host, matched.get('cms', '')))
    else:
        db.nodes.update(
            {'host': host},
            {'$set': {'cms': ''}},
        )
        print('no-matched: %s' % host)
if __name__ == '__main__':
    while True:
        while threading.active_count() >= config.get('max_threads', 10):
            sleep(config.get('sleep', 1))
        threading.Thread(target=do_cms, args=(pop_no_cms(),)).start()
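One more note on the distributed part: every worker machine simply points config['db_host'] at the same Mongo instance and runs crawl.py (plus spy.py wherever convenient). Because pop_pending() pushes a host's timestamp forward the moment it hands it out, workers mostly stay off each other's domains. Strictly speaking, though, the find_one-then-update pair isn't atomic, so two workers can occasionally grab the same host; if that ever matters, something along these lines (a sketch against the same pymongo 2.x API, not what's in the repo) would close the gap:

def pop_pending_atomic():
    # atomically claim the oldest pending host and push its timestamp forward
    doc = db.nodes_pending.find_and_modify(
        sort=[('timestamp', pymongo.ASCENDING)],
        update={'$set': {'timestamp': time() + config['retry']}},
    )
    return doc and doc.get('host')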
Building things on your own is still pretty stressful. Remember to take breaks.