Python Crawler

Today I happened to pick up a job: write a crawler in Python 3 that stores its results in MongoDB and runs distributed across several machines.

Starting this post to record the technical details.


First, the requirements:

  1. Only crawl each site's home page, i.e. crawl domain by domain, following only off-site links;
  2. Seed nodes are entered into the database; for every site crawled, record:
    1. The site's HTTP headers
    2. The site's HTML content
    3. The resolved IP address and its geolocation
    4. A fingerprint analysis of the HTML, along the lines of whatweb

Operational requirements:

  1. Follow 302 redirects when encountered;
  2. Seed sites can be added independently at any time;
  3. When a site fails to crawl, record its status and retry one week later;
  4. Tasks must run distributed across multiple machines.
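
Putting the two lists together, each worker will basically run a loop like the one below (just a sketch with hypothetical helper names; the real code lives in utils.py and crawl.py further down):

# sketch of one worker's loop (helper names here are hypothetical)
def worker():
    while True:
        host = pop_pending()          # oldest host from the shared MongoDB queue
        if not host:
            continue
        record = fetch(host)          # headers, HTML, status code; redirects followed
        record['ip'], record['ip_location'] = resolve_ip(host)
        record['fingerprint'] = fingerprint(record['content'])
        save(record)                  # one MongoDB document per host
        for domain in out_links(record['content']):
            enqueue(domain)           # off-site domains feed back into the shared queue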

Here are the concrete implementation steps:

1. Create a Git repository

I created a Git repository on Bitbucket (public for now):

https://bitbucket.org/fish_ball/ecrawler

2. Download and install MongoDB, deploy an instance and admin tools

A quick search suggests the nicer admin tools are RockMongo and phpMoAdmin.

phpMoAdmin appears to be just a single file, which looks simple enough.

I downloaded it, but it needs the MongoDB PHP driver configured:

http://php.net/manual/en/mongo.installation.php#mongo.installation.windows

I'm leaving the link here; install the driver by following it.

That turned out to be a hassle, so for now I'll just use the console directly.

My MongoDB is unpacked at d:/mongo/:

  1. cd d:/mongo/bin/
  2. md d:\data\db
  3. mongod --dbpath d:\data\db
  4. Open another window
  5. d:\mongo\bin\mongo
  6. print('Hello world!');

That's enough for now; I'll come back to it later.

3. Write the fetching code in Python

Create a main.py directly.

Rough as it is, it can at least fetch the content. Note that the charset detection in the code below needs the chardet module:

pip3 install chardet

import re
import json
# import http.client
import urllib.parse
import chardet
from http.client import HTTPConnection

def fetch_url(host, path='/'):

    conn = HTTPConnection(host, 80)
    conn.request('GET', path)
    response = conn.getresponse()

    # Response headers, e.g.:
    # Server - Apache
    # Last-Modified - Tue, 31 Dec 2013 11:13:44 GMT
    # Content-Length -
    # Content-Type - text/html
    headers = response.getheaders()
    for k, v in headers:
        print(k + ' - ' + v)

    # Detect the charset and decode the body
    raw_content = response.read()
    charset = chardet.detect(raw_content)
    data = None
    try:
        data = raw_content.decode(charset['encoding'])
    except (UnicodeDecodeError, TypeError):
        print('%s: decode error' % host)
    conn.close()
    return data

fetch_url('www.baidu.com')
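
One thing to note: http.client doesn't follow 302 redirects by itself, which the requirements above ask for. The final version of the code therefore switches to urllib.request.urlopen, which follows redirects automatically; a minimal sketch:

from urllib.request import urlopen

# urlopen follows 3xx redirects by default, so a single call covers the
# "follow 302" requirement
response = urlopen('http://www.baidu.com', timeout=10)
print(response.code, response.geturl())  # final status code and final URL after redirects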

Next, extracting the outbound links from the content, which is straightforward. Since only the domain part is needed, I threw together a regex to get the job done:

def get_out_links(content):
    match = re.findall(r'<a[^>]* href=[\'\"]https?://([^\'\"/:]+)[^>]*>', content)
    return set(match)
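
For example, given a made-up snippet, it pulls out just the external domains:

# quick check of get_out_links on a made-up snippet
html = '<a href="http://www.qq.com/news">news</a> <a href="/local">local</a>'
print(get_out_links(html))  # {'www.qq.com'}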

4. Design the MongoDB data structures

OK, the key pieces of code are written; the remaining problem is scheduling the crawl tasks.

First of all, the fetched header data fits neatly into a single MongoDB document, and the page content can simply go into another field of that document.

Let's pip install the pymongo module first (far easier to install than that damn PHP driver!).
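
To make the idea concrete, here is a minimal sketch of how one crawled site maps to one document, assuming a local mongod with no auth yet and the pymongo 2.x API used throughout this post (the full field list is documented in utils.py below):

import pymongo
from time import time

# pymongo 2.x style connection (assumption: local mongod, no auth configured yet)
mongo = pymongo.Connection('localhost', 27017)
db = mongo.ecrawler

# one crawled site == one document: the response headers are a nested dict,
# the raw HTML is just a string field
db.nodes.update(
    {'host': 'www.example.com'},
    {'$set': {
        'headers': {'Server': 'Apache', 'Content-Type': 'text/html'},
        'content': '<html>...</html>',
        'timestamp': time(),
    }},
    True,  # upsert: insert the document if the host isn't there yet
)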

5. Integration testing

… …

As it happens, the project has long since been finished, but this post was left hanging, so let me come back and wrap it up:

Project source: Bitbucket fish_ball/ecrawler

I'll do a proper walkthrough when I find the time.

# config.py
import os

# configuration
config = {
    'db_host': 'localhost',
    'db_port': 27017,
    'buf_size': 10,  # how many hosts to fetch each time the queue runs empty
    'delay': 600,  # seconds to wait before re-queuing a host whose response was lost
    'retry': 7 * 24 * 3600,  # seconds before a crawled / failed host may be visited again (one week)
    'sleep': 1,  # seconds to sleep between queue polls
    'timeout': 10,  # HTTP request timeout in seconds
    'max_threads': 20,  # maximum worker thread count
    'feature_path': '/var/ecrawler/cms.txt',
    'username': 'ecrawler',
    'password': 'abcd1234',
}

# utils.py
import re
import json
import chardet
import pymongo
import socket

from time import time, sleep

# import http.client
from http.client import HTTPConnection

import urllib.request
from urllib.parse import urlparse
from urllib.request import urlopen
from urllib.error import URLError

from config import config


# Connection, set to the specific instance in production.
mongo = pymongo.Connection(config['db_host'], config['db_port'])

# Database
db = mongo.ecrawler
db.authenticate(config['username'], config['password'])

# Table [nodes]
#   - host          | string    | Domain name
#   - ip            | string    | Ip address
#   - timestamp     | float     | Last visit time, UNIX timestamp
#   - headers       | dict      | { Http response header dictionary }
#   - status_code   | integer   | http request statusCode
#   - title         | string    | page title
#   - links         | list      | [ domains linked out ]
#   - fingerprint   | list      | [ js/css library uses ]
#   - cms           | string    | match the cms the site uses
#   - ip_location   | string    | ip location
#   - content       | string    | html content

# Table [nodes_pending]
#   - host          | string    | Domain name
#   - timestamp     | string    | Timestamp to get priority
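#
# Note: the pending table doubles as the distributed work queue. pop_pending()
# takes the host with the oldest timestamp and immediately pushes its timestamp
# config['retry'] seconds into the future, so other workers skip it for a while;
# a successful crawl removes the entry, a failed one is simply retried later.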


def pop_pending():
    """
    Pops a pending host from mongodb.
    :return: host name
    """
    host = db.nodes_pending.find_one(sort=(('timestamp', pymongo.ASCENDING),))
    if host is not None:
        host = host.get('host')
        db.nodes_pending.update(
            {'host': host},
            {'host': host, 'timestamp': time() + config['retry']},
            True,  # Upsert
        )
    return host


def pop_no_cms():
    """
    Pops a host which doesn't have a cms field yet from mongodb.
    :return: host name, or None when every node already has one
    """
    node = db.nodes.find_one({'cms': None}, sort=(('timestamp', pymongo.ASCENDING),))
    if node is None:
        return None
    host = node.get('host')
    # push the timestamp into the future so other workers skip this host for a while;
    # use $set so the already-crawled fields are kept intact
    db.nodes.update(
        {'host': host},
        {'$set': {'timestamp': time() + config['retry']}},
    )
    return host


def host_exists(host):
    """
    return if host_exists in nodes or nodes_pending
    :return: Boolean
    """
    if db.nodes.find({'host': host}).count() > 0:
        return 2
    elif db.nodes_pending.find({'host': host}).count() > 0:
        return 1
    return 0


def fetch_header(host):
    """
    Fetch only the response headers with a HEAD request.
    :param host: host name
    :return: response object
    """
    return urlopen(urllib.request.Request(
        url='http://' + host,
        method='HEAD',
    ))


def get_html_title(content):
    result = re.findall(r'<title[^>]*>([^<]*)', content)
    return result and result[0] or ''


def get_out_links(content):
    match = re.findall(r'<a[^>]* href=[\'\"]https?://([^\'\"/:\?#]+)[^>]*>', content)
    return list(set(s.strip() for s in match if '.' in s))


def get_fingerprint(content):
    match = re.findall(r'[\"\'/]([^\"\'/]+\.(js|css))[\"\?#]', content)
    return list(set(name.strip() for name, ext in match))


def get_ip_info(host):
    ip = socket.gethostbyname(host)
    url = 'http://ip.taobao.com/service/getIpInfo.php?ip=' + ip
    obj = json.loads(urlopen(url).read().decode())
    return ip, obj


def replace_domain(host, force=False):
    """
    Add a seed host name to the crawler's pending queue.
    If the host already exists: replace it when force is True, skip it when force is False.
    Otherwise insert the host as unvisited.

    :param host: host name to enqueue
    :param force: force-delete any existing record for this host first
    :return: True if the host was enqueued, False if it was skipped
    """
    if force:
        db.nodes.remove({'host': host})
        db.nodes_pending.remove({'host': host})
    if db.nodes.find({'host': host}).count() or db.nodes_pending.find({'host': host}).count():
        return False
    else:
        print('enqueue: '+host)
        db.nodes_pending.insert({
            'host': host,
            'timestamp': time(),
        })
        return True


def do_crawl(host):
    """
    Fetch the target host info and store the result in mongodb.
    :param host: Host name
    """
    # print('start: ' + host)
    if not host:
        return

    try:

        if db.nodes.find({'host': host}).count() > 0 or len(host) > 200:
            db.nodes_pending.remove({'host': host})
            print('remove: '+host)
            return

        url = 'http://' + host
        response = urlopen(url, timeout=config['timeout'])

        # header
        headers = dict(response.headers)

        # body
        content = response.read()
        if not content:
            raise Exception('no content!')
        charset = chardet.detect(content)

        # decode body
        encoding = charset and charset.get('encoding') or 'UTF8'
        encoding = 'GBK' if encoding == 'GB2312' else encoding
        body = content.decode(encoding, 'ignore')

        # Success result
        ip, location = get_ip_info(host)
        links = get_out_links(body)
        title = get_html_title(body)
        db.nodes_pending.remove({'host': host})
        db.nodes.update(
            {'host': host},
            {'$set': {
                'ip': ip,
                'timestamp': time(),
                'headers': headers,
                'status_code': response.code,
                'title': title,
                'links': links,
                'fingerprint': get_fingerprint(body),
                'cms': None,
                'ip_location': location,
                'content': body,
            }},
            True,  # Upsert
        )
        for host_out in links:
            if not host_exists(host_out):
                # enqueue the newly discovered domain (note: host_out, not the current host)
                db.nodes_pending.update(
                    {'host': host_out},
                    {'host': host_out, 'timestamp': time()},
                    True,  # Upsert
                )

        print('success: %s - %s - %s' % (host, response.code, title))

    except URLError as ex:

        print('fail: %s - %s' % (host, ex))

    except socket.timeout as ex:

        print('timeout: %s - %s' % (host, ex))

    except Exception as ex:

        print('fail: %s - %s' % (host, ex))

# crawl.py
import pymongo
import threading
from config import config
from time import sleep
from utils import replace_domain, pop_pending, do_crawl


if __name__ == '__main__':

    # Connection, set to the specific instance in production.
    mongo = pymongo.Connection(config['db_host'], config['db_port'])

    # Database
    db = mongo.ecrawler
    db.authenticate(config['username'], config['password'])

    nodes = db.nodes
    # nodes.ensure_index('host', unique=True)
    # nodes.ensure_index('cms')

    nodes_pending = db.nodes_pending
    # nodes_pending.ensure_index('host', unique=True)
    # nodes_pending.ensure_index('timestamp')

    # SEED
    replace_domain('www.baidu.com')
    replace_domain('www.qq.com')
    replace_domain('360.cn')
    replace_domain('www.huangwenchao.com.cn')

    # reset in debug
    # nodes.remove({})
    # exit()
    # nodes.update({}, {'status': 0})

    # Main loop: whenever a worker slot is free, pull the next pending host
    # from the db and crawl it in its own thread
    while True:
        while threading.active_count() >= config.get('max_threads', 10):
            sleep(config.get('sleep', 1))

        threading.Thread(target=do_crawl, args=(pop_pending(),)).start()

# spy.py
import re
import json
import hashlib
import chardet
import pymongo
import threading
from time import time, sleep
from config import config
from urllib.request import urlopen
from utils import pop_no_cms
import urllib.error


# Connection, set to the specific instance in production.
mongo = pymongo.Connection(config['db_host'], config['db_port'])

# Database
db = mongo.ecrawler
db.authenticate(config['username'], config['password'])


def init_cms():
    """
    Init the cms feature file to the db, from the config file
    :return:
    """

    # Update the config feature file in to table `cms`
    for line in open(config['feature_path'], 'r', encoding='utf8'):
        item = dict(zip(
            ['path', 'cms', 'cms2', 'md5'],
            line.strip().split('|'),
        ))
        db.cms.update(item, item, True)


if __name__ == '__main__':
    # init_cms()  # run once to load the cms feature file into the db
    rules = list(db.cms.find())


def do_cms(host):
    """
    Try every cms fingerprint rule against the host and record the matched cms.
    :param host: host name
    """
    print('cms-start: ' + host)

    db.nodes.update(
        {'host': host},
        {'$set': {'timestamp': time() + config.get('retry', 3600)}},
    )

    matched = None

    for rule in rules:

        try:
            # response = fetch_header(host)
            url = 'http://' + host + rule.get('path', '')
            # print('try: %s' % url)
            response = urlopen(url)
            # print(response.code)

            hash_obj = hashlib.md5()
            hash_obj.update(response.read())
            md5 = hash_obj.hexdigest()

            # print(md5)
            # print(rule['md5'])

            if md5 == rule.get('md5'):
                matched = rule
                break

        except urllib.error.HTTPError as ex:
            # print(ex)
            pass

        except Exception as ex:
            pass

    if matched:
        db.nodes.update(
            {'host': host},
            {'$set': {'cms': matched.get('cms', '')}},
        )
        print('matched: %s - %s' % (host, matched.get('cms', '')))

    else:
        db.nodes.update(
            {'host': host},
            {'$set': {'cms': ''}},
        )
        print('no-matched: %s' % host)


if __name__ == '__main__':

    while True:

        while threading.active_count() >= config.get('max_threads', 10):
            sleep(config.get('sleep', 1))

        threading.Thread(target=do_cms, args=(pop_no_cms(),)).start()
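
For reference, init_cms expects every line of the feature file (config['feature_path']) to be pipe-separated as path|cms|cms2|md5; a quick illustration with a made-up line (the path, cms name and hash are hypothetical):

# hypothetical feature line: path | cms | cms2 | md5 of the file at that path
line = '/templets/default/style/dedecms.css|DedeCMS|DedeCMS|d41d8cd98f00b204e9800998ecf8427e'
item = dict(zip(['path', 'cms', 'cms2', 'md5'], line.strip().split('|')))
print(item['cms'], item['md5'])  # DedeCMS d41d8cd98f00b204e9800998ecf8427e

do_cms then fetches http://&lt;host&gt;&lt;path&gt; for each rule and marks the site with that cms once the MD5 of the response body matches.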
