Python Web Scraping and Data Collection in Practice: Building an Enterprise-Grade Data Collection System from Scratch

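Preface

Data is one of the most valuable assets of our time. Market research, competitor analysis, price monitoring, and AI model training all depend on large volumes of data, and web crawlers are one of the main ways to collect it.

Many people assume that writing a crawler is hard and means learning a grab bag of unrelated technologies. It is not. At its core a crawler does four things: send a request → receive the page → parse the data → save the result. Once those four steps are clear, you have the basics.

In this article I use a complete e-commerce data collection project to build an enterprise-grade data collection system from scratch. It covers basic requests usage, parsing with BeautifulSoup, and Scrapy in practice, followed by more advanced topics such as asynchronous collection, proxy pools, and distributed architecture.

To avoid putting load on third-party sites, the examples run against public test sites. The logic is general, though, and should carry over to real targets with minor changes.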
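
The four steps named above can be sketched end to end without touching the network. This is only an illustration that uses the standard library: `SAMPLE_HTML`, `ItemParser`, `parse`, and `save_csv` are made-up names, and the hard-coded page stands in for a real HTTP response.

```python
import csv
import io
from html.parser import HTMLParser

# Stand-in for steps 1 and 2 (send request, receive page): a hard-coded
# page instead of a real HTTP response, so the sketch runs offline.
SAMPLE_HTML = """
<ul>
  <li class="item" data-price="12.50">Book A</li>
  <li class="item" data-price="8.00">Book B</li>
</ul>
"""

class ItemParser(HTMLParser):
    """Step 3: collect name and price from <li class="item"> elements."""

    def __init__(self):
        super().__init__()
        self.items = []
        self._price = None  # set while inside a matching <li>

    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)
        if tag == "li" and attrs.get("class") == "item":
            self._price = float(attrs["data-price"])

    def handle_data(self, data):
        if self._price is not None and data.strip():
            self.items.append({"name": data.strip(), "price": self._price})
            self._price = None

def parse(html):
    parser = ItemParser()
    parser.feed(html)
    return parser.items

def save_csv(rows):
    """Step 4: serialize rows to CSV text (writing to a file works the same way)."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=["name", "price"])
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()

rows = parse(SAMPLE_HTML)
print(save_csv(rows))
```

The rest of the article swaps each stand-in for a real tool: requests or aiohttp for fetching, BeautifulSoup or Scrapy selectors for parsing, and a database for storage.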

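1. Setting Up the Environment

1.1 Installing Dependencies

Install Python first, then the packages the project depends on:

bash

# A virtual environment is recommended
python -m venv crawler-env
source crawler-env/bin/activate  # Windows: crawler-env\Scripts\activate

# Install the core dependencies
pip install requests beautifulsoup4 lxml
pip install scrapy
pip install redis  # distributed queue
pip install pymongo  # MongoDB storage
pip install mysql-connector-python  # MySQL storage
pip install selenium  # pages rendered with JavaScript
pip install playwright  # modern browser automation
pip install aiohttp aiofiles  # async requests and file I/O (asyncio ships with Python, no install needed)

A requirements.txt file makes the dependencies easier to manage: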

plaintext

requests==2.31.0
beautifulsoup4==4.12.2
lxml==5.1.0
scrapy==2.11.0
aiohttp==3.9.1
aiofiles==23.2.1
selenium==4.16.0
playwright==1.40.0
redis==5.0.1
pymongo==4.6.1
mysql-connector-python==8.2.0
fake-useragent==1.4.0
tenacity==8.2.3

1.2 Project Structure

plaintext

crawler_project/
├── spiders/              # spiders
│   ├── __init__.py
│   ├── base_spider.py    # base spider class
│   ├── ecommerce/        # e-commerce spiders (no hyphen, so the package stays importable)
│   └── news/             # news spiders
├── utils/                # utility modules
│   ├── __init__.py
│   ├── http_client.py    # HTTP client wrapper
│   ├── proxy_pool.py     # proxy pool management
│   └── storage.py        # data storage
├── pipelines/            # data processing pipelines
├── middlewares/          # middlewares
├── config/               # configuration files
├── scripts/              # helper scripts
├── tests/                # tests
├── requirements.txt
└── README.md

2. Basic Crawlers: requests + BeautifulSoup

2.1 A Simple Request

Start with the simplest possible crawler, fetching a public test site:

python

import requests

def basic_crawler():
    url = "https://books.toscrape.com/"

    # Set request headers to mimic a browser
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    }

    # Send the request
    response = requests.get(url, headers=headers, timeout=10)

    # Check the response status
    if response.status_code == 200:
        print("Request succeeded")
        # Print the first 500 characters of the page
        print(response.text[:500])
    else:
        print(f"Request failed: {response.status_code}")

if __name__ == "__main__":
    basic_crawler()

Run the code. If it prints "Request succeeded" followed by the start of the page's HTML, the request worked.

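2.2 Parsing the Page

With the HTML in hand, use BeautifulSoup to extract the data we need:

python

import requests
from bs4 import BeautifulSoup

def parse_books():
    url = "https://books.toscrape.com/catalogue/page-1.html"
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }

    response = requests.get(url, headers=headers, timeout=10)
    response.raise_for_status()  # stop early on 4xx/5xx instead of parsing an error page

    # Pass the raw bytes so BeautifulSoup detects the encoding from the page
    # itself; response.text can garble characters such as '£' when the
    # server does not declare a charset. lxml is the faster parser.
    soup = BeautifulSoup(response.content, 'lxml')

    # Select the container of each book
    books = soup.select('article.product_pod')

    results = []
    for book in books:
        title = book.select_one('h3 a')['title']
        price = book.select_one('.price_color').text.strip()
        # The class list looks like ['star-rating', 'Three']; the second entry
        # is One, Two, Three, Four, or Five
        rating = book.select_one('.star-rating')['class'][1]

        results.append({
            'title': title,
            'price': price,
            'rating': rating
        })

    for book in results:
        print(f"Title: {book['title']}, price: {book['price']}, rating: {book['rating']} stars")

    return results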

if __name__ == "__main__":
    parse_books()
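
The parser above stores the price and rating as raw strings such as '£51.77' and 'Three'. Before analysis or storage it usually helps to convert them to numbers. A minimal sketch follows, with `parse_price`, `normalize_book`, and `RATING_WORDS` as hypothetical helper names that are not part of the original project:

```python
import re

# Word-to-number mapping for the star-rating class names (One ... Five)
RATING_WORDS = {"One": 1, "Two": 2, "Three": 3, "Four": 4, "Five": 5}

def parse_price(text):
    """Extract the numeric part of a price string such as '£51.77'."""
    match = re.search(r"\d+(?:\.\d+)?", text)
    if match is None:
        raise ValueError(f"no number found in {text!r}")
    return float(match.group())

def normalize_book(raw):
    """Return a copy of a scraped record with numeric price and rating."""
    return {
        "title": raw["title"].strip(),
        "price": parse_price(raw["price"]),
        "rating": RATING_WORDS.get(raw["rating"]),  # None if the word is unrecognized
    }

print(normalize_book({"title": " Example Book ", "price": "£51.77", "rating": "Three"}))
```

Because the regular expression only looks for digits, it also tolerates stray characters in front of the amount. For real money calculations `decimal.Decimal` would be a safer choice than `float`.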

2.3 Crawling Multiple Pages

Real projects usually need data from many pages. Here is the same crawler with pagination:

python

import requests
from bs4 import BeautifulSoup
import time

def crawl_all_pages():
    base_url = "https://books.toscrape.com/catalogue/page-{}.html"
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }
    all_books = []

    for page in range(1, 51):  # crawl the first 50 pages
        url = base_url.format(page)

        try:
            response = requests.get(url, headers=headers, timeout=10)

            if response.status_code == 200:
                # Raw bytes let BeautifulSoup detect the page encoding itself
                soup = BeautifulSoup(response.content, 'lxml')
                books = soup.select('article.product_pod')

                for book in books:
                    title = book.select_one('h3 a')['title']
                    price = book.select_one('.price_color').text.strip()
                    all_books.append({
                        'title': title,
                        'price': price,
                        'page': page
                    })

                print(f"Page {page} done, {len(books)} books found")
            else:
                print(f"Page {page} request failed: {response.status_code}")

        except requests.RequestException as e:
            # Network-level failures: timeouts, connection errors, and so on
            print(f"Page {page} failed: {e}")

        # Pause between requests so the site is not hit too quickly
        time.sleep(1)

    print(f"\nCrawled {len(all_books)} books in total")
    return all_books

if __name__ == "__main__":
    crawl_all_pages()
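
The loop above hard-codes 50 pages. When the page count is not known in advance, one common alternative is to keep requesting pages until one comes back empty. The sketch below shows only that control flow: `fetch_page` is a hypothetical callable that takes a page number and returns a list of records, or an empty list or `None` when the page does not exist.

```python
def crawl_until_empty(fetch_page, max_pages=1000):
    """Call fetch_page(1), fetch_page(2), ... until one returns no items.

    max_pages is a safety cap so a misbehaving site cannot cause an
    endless loop.
    """
    collected = []
    for page in range(1, max_pages + 1):
        items = fetch_page(page)
        if not items:
            break  # ran past the last page
        collected.extend(items)
    return collected

# Offline demonstration with a fake three-page site
FAKE_SITE = {1: ["a", "b"], 2: ["c"], 3: ["d", "e"]}
print(crawl_until_empty(FAKE_SITE.get))  # ['a', 'b', 'c', 'd', 'e']
```

In a real crawler `fetch_page` would wrap the request and parsing code from this section and return an empty list on a 404 response.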

3. A More Robust HTTP Client

3.1 Adding Proxies and Retries

Bare requests are easily blocked and fail intermittently, so the client below adds proxy rotation and automatic retries:

python

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import random

class HTTPClient:
    """HTTP client wrapper that adds retries, proxies, and timeouts"""

    def __init__(self, proxies=None, timeout=10, max_retries=3):
        self.session = requests.Session()
        self.timeout = timeout
        self.proxies = proxies or []

        # Configure the retry policy
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=1,  # exponential backoff: the wait doubles with each retry
            status_forcelist=[429, 500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

    def get_random_user_agent(self):
        """Pick a random User-Agent"""
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
        ]
        return random.choice(user_agents)

    def get(self, url, **kwargs):
        """Send a GET request; return the response, or None on failure"""
        # Copy the headers so the caller's dict is not mutated
        headers = dict(kwargs.pop('headers', None) or {})
        headers['User-Agent'] = self.get_random_user_agent()
        # Use the default timeout unless the caller passes one explicitly
        kwargs.setdefault('timeout', self.timeout)

        # Pick a random proxy
        proxy = None
        if self.proxies:
            proxy = random.choice(self.proxies)

        try:
            response = self.session.get(
                url,
                headers=headers,
                proxies={'http': proxy, 'https': proxy} if proxy else None,
                **kwargs
            )
            response.raise_for_status()
            return response
        except requests.RequestException as e:
            print(f"Request failed: {url}, error: {e}")
            return None

    def close(self):
        """Close the session"""
        self.session.close()


# Usage example
if __name__ == "__main__":
    client = HTTPClient(timeout=10)
    response = client.get("https://httpbin.org/ip")
    if response:
        print(response.json())
    client.close()
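
The retry policy above waits a fixed, exponentially growing time between attempts. When many workers fail at the same moment they also retry at the same moment, so a random component ("jitter") is often added. A minimal sketch of that idea follows; `backoff_delays` and its parameters are hypothetical names, not part of requests or urllib3:

```python
import random

def backoff_delays(retries, base=1.0, cap=30.0, rng=random):
    """Return a list of sleep times using 'full jitter' exponential backoff.

    Attempt n (0-based) sleeps a random time between 0 and
    min(cap, base * 2**n) seconds.
    """
    delays = []
    for attempt in range(retries):
        upper = min(cap, base * (2 ** attempt))
        delays.append(rng.uniform(0, upper))
    return delays

# Example: the upper bounds here are 1s, 2s, 4s, 8s, 8s
for delay in backoff_delays(5, cap=8.0):
    print(f"would sleep {delay:.2f}s")
```

A hand-written retry loop could call `time.sleep()` with these values between attempts. The `rng` parameter exists so that a seeded `random.Random` can make the schedule reproducible in tests.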

3.2 Managing a Proxy Pool

A simple but practical proxy pool:

python

import requests
import random

class ProxyPool:
    """A simple proxy pool"""

    def __init__(self):
        self.proxies = []
        self.failed_count = {}  # consecutive failures per proxy

    def add_proxy(self, proxy):
        """Add one proxy"""
        if proxy not in self.proxies:
            self.proxies.append(proxy)

    def add_proxies(self, proxies):
        """Add several proxies"""
        for proxy in proxies:
            self.add_proxy(proxy)

    def get_random_proxy(self):
        """Return a random proxy, or None if the pool is empty"""
        if not self.proxies:
            return None

        # Prefer proxies with fewer than 3 consecutive failures
        valid_proxies = [p for p in self.proxies if self.failed_count.get(p, 0) < 3]
        if valid_proxies:
            return random.choice(valid_proxies)

        # Every proxy has reached the threshold: fall back to a random one
        return random.choice(self.proxies)

    def mark_success(self, proxy):
        """Record a success and reset the failure counter"""
        self.failed_count[proxy] = 0

    def mark_failed(self, proxy):
        """Record a failure"""
        self.failed_count[proxy] = self.failed_count.get(proxy, 0) + 1

        # Remove the proxy after 5 consecutive failures
        if self.failed_count[proxy] >= 5 and proxy in self.proxies:
            print(f"Removing dead proxy: {proxy}")
            self.proxies.remove(proxy)

    def validate_proxy(self, proxy, test_url="https://httpbin.org/ip"):
        """Check whether a proxy works"""
        try:
            response = requests.get(
                test_url,
                proxies={'http': proxy, 'https': proxy},
                timeout=5
            )
            return response.status_code == 200
        except requests.RequestException:
            # Catch only request errors, so Ctrl+C still interrupts the check
            return False

    def validate_all(self):
        """Validate every proxy and keep only the working ones"""
        valid_proxies = []
        for proxy in self.proxies:
            if self.validate_proxy(proxy):
                valid_proxies.append(proxy)
                print(f"✓ Proxy works: {proxy}")
            else:
                print(f"✗ Proxy failed: {proxy}")

        self.proxies = valid_proxies
        return valid_proxies


# Free proxy sources (note: these proxies can stop working at any time)
FREE_PROXY_SOURCES = [
    'https://www.proxy-list.download/api/v1/get?type=http',
    'https://raw.githubusercontent.com/TheSpeedX/PROXY-List/master/http.txt',
]


def fetch_free_proxies():
    """Fetch free proxies from public sources"""
    proxies = []

    # For illustration only. In production a paid proxy service is the better
    # choice: free proxies are unreliable, short-lived, and slow.
    for source in FREE_PROXY_SOURCES:
        try:
            response = requests.get(source, timeout=10)
            if response.status_code == 200:
                proxies.extend(response.text.splitlines())
        except requests.RequestException as e:
            print(f"Failed to fetch proxies from {source}: {e}")

    return [p.strip() for p in proxies if p.strip()]


if __name__ == "__main__":
    pool = ProxyPool()
    # Add a few test proxies
    pool.add_proxies([
        'http://username:password@proxy.example.com:8080',  # proxy with authentication
        'http://proxy.example.com:8080',  # plain proxy
    ])

    print(f"Current proxy pool: {pool.proxies}")
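
Public proxy lists typically contain bare `host:port` lines, sometimes mixed with blank or malformed entries. It can help to normalize each line into a full proxy URL before adding it to the pool. The sketch below assumes plain HTTP proxies; `normalize_proxy` is a hypothetical helper name:

```python
from urllib.parse import urlsplit

def normalize_proxy(line, default_scheme="http"):
    """Turn 'host:port' or a full proxy URL into 'scheme://host:port'.

    Returns None for lines that do not contain a usable host and port.
    """
    line = line.strip()
    if not line:
        return None
    if "://" not in line:
        line = f"{default_scheme}://{line}"
    try:
        parts = urlsplit(line)
        port = parts.port  # raises ValueError for a non-numeric or out-of-range port
    except ValueError:
        return None
    if not parts.hostname or port is None:
        return None
    return f"{parts.scheme}://{parts.netloc}"

raw_lines = ["1.2.3.4:8080", "  ", "garbage", "http://user:pw@proxy.example.com:3128\r"]
cleaned = [p for p in map(normalize_proxy, raw_lines) if p]
print(cleaned)
```

The cleaned list can then be passed to `pool.add_proxies(...)`. Entries that parse correctly may still be dead, so validating them afterwards is still worthwhile.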

4. Scrapy in Practice

4.1 Creating a Scrapy Project

Scrapy is one of the most mature crawling frameworks in the Python ecosystem and a good fit for enterprise-grade crawlers:

bash

scrapy startproject ecommerce_crawler
cd ecommerce_crawler
scrapy genspider book books.toscrape.com

Project structure:

plaintext

ecommerce_crawler/
├── ecommerce_crawler/
│   ├── __init__.py
│   ├── items.py          # item definitions
│   ├── middlewares.py    # middlewares
│   ├── pipelines.py      # item pipelines
│   ├── settings.py       # settings
│   └── spiders/
│       ├── __init__.py
│       └── book.py       # book spider
└── scrapy.cfg

4.2 Defining the Item

python

# items.py
import scrapy

class BookItem(scrapy.Item):
    title = scrapy.Field()           # book title
    price = scrapy.Field()           # price
    rating = scrapy.Field()          # star rating
    availability = scrapy.Field()    # stock status
    description = scrapy.Field()     # description
    category = scrapy.Field()        # category
    url = scrapy.Field()             # source URL
    scraped_time = scrapy.Field()    # time of scraping

4.3 Writing the Spider

python

# spiders/book.py
import scrapy
from ecommerce_crawler.items import BookItem
from datetime import datetime

class BookSpider(scrapy.Spider):
    name = 'book'
    allowed_domains = ['books.toscrape.com']

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.max_pages = 50  # crawl at most 50 list pages

    def start_requests(self):
        """Generate the initial requests, one per list page"""
        base_url = 'https://books.toscrape.com/catalogue/page-{}.html'
        for page in range(1, self.max_pages + 1):
            url = base_url.format(page)
            yield scrapy.Request(
                url,
                callback=self.parse_book_list,
                meta={'page': page}
            )

    def parse_book_list(self, response):
        """Parse a book list page and follow each detail link"""
        books = response.css('article.product_pod')

        for book in books:
            book_url = response.urljoin(book.css('h3 a::attr(href)').get())
            yield scrapy.Request(
                book_url,
                callback=self.parse_book_detail,
                meta={'page': response.meta['page']}
            )

        self.logger.info(f"Page {response.meta['page']} parsed, {len(books)} books found")

    def parse_book_detail(self, response):
        """Parse a book detail page"""
        item = BookItem()

        item['title'] = response.css('h1::text').get()
        item['price'] = response.css('.price_color::text').get()
        item['rating'] = response.css('.star-rating::attr(class)').re_first(r'star-rating (\w+)')
        item['availability'] = response.css('.availability::text').re_first(r'(\d+ available)')
        item['description'] = response.css('#product_description + p::text').get()
        # '.breadcrumb' is a class selector; without the dot it would match a
        # non-existent <breadcrumb> element and [-1] would raise IndexError
        breadcrumbs = response.css('.breadcrumb a::text').getall()
        item['category'] = breadcrumbs[-1] if breadcrumbs else None
        item['url'] = response.url
        item['scraped_time'] = datetime.now().isoformat()

        yield item

4.4 Configuring Pipelines

python

# pipelines.py
import json
import pymongo
from scrapy.exceptions import DropItem

class JsonWriterPipeline:
    """Write items to a file as a single JSON array"""

    def open_spider(self, spider):
        self.file = open(f'{spider.name}_items.json', 'w', encoding='utf-8')
        self.file.write('[\n')
        self.first_item = True

    def close_spider(self, spider):
        self.file.write('\n]')
        self.file.close()

    def process_item(self, item, spider):
        # Write the separator before every item except the first, so the
        # array has no trailing comma and remains valid JSON
        if not self.first_item:
            self.file.write(',\n')
        self.first_item = False
        self.file.write(json.dumps(dict(item), ensure_ascii=False))
        return item


class MongoPipeline:
    """Store items in MongoDB"""

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DB')
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        # Upsert keyed on the URL, so re-crawling updates instead of duplicating
        self.db[spider.name].update_one(
            {'url': item['url']},
            {'$set': dict(item)},
            upsert=True
        )
        return item


class DuplicatesPipeline:
    """Drop items whose URL has already been seen"""

    def __init__(self):
        self.urls_seen = set()

    def process_item(self, item, spider):
        if item['url'] in self.urls_seen:
            # Raising DropItem stops the item here; returning it would pass
            # the duplicate on to the remaining pipelines
            raise DropItem(f"Duplicate item: {item['url']}")

        self.urls_seen.add(item['url'])
        return item
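
The duplicate check above compares raw URL strings, so two URLs that differ only in letter case of the host or in a `#fragment` count as different. One possible refinement is to normalize the URL and store a fixed-size hash of it. The sketch below is a standalone illustration: `url_fingerprint` and `SeenUrls` are hypothetical names, and the normalization rules are an assumption chosen for the example, not Scrapy's own behavior.

```python
import hashlib
from urllib.parse import urlsplit, urlunsplit

def url_fingerprint(url):
    """Hash a lightly normalized URL so equivalent URLs share one key.

    Assumed normalization: lower-case the scheme and host, drop the
    fragment, and treat an empty path as '/'.
    """
    parts = urlsplit(url.strip())
    normalized = urlunsplit((
        parts.scheme.lower(),
        parts.netloc.lower(),
        parts.path or "/",
        parts.query,
        "",  # fragment removed
    ))
    return hashlib.sha1(normalized.encode("utf-8")).hexdigest()

class SeenUrls:
    """In-memory set of fingerprints; add() reports whether the URL is new."""

    def __init__(self):
        self._seen = set()

    def add(self, url):
        fingerprint = url_fingerprint(url)
        if fingerprint in self._seen:
            return False
        self._seen.add(fingerprint)
        return True

seen = SeenUrls()
print(seen.add("https://Books.toscrape.com/catalogue/page-1.html#top"))  # True
print(seen.add("https://books.toscrape.com/catalogue/page-1.html"))      # False
```

Inside `DuplicatesPipeline`, `seen.add(item['url'])` returning `False` would be the signal to raise `DropItem`. Because the fingerprints are plain strings, they could also be kept in an external store such as a Redis set to share state across processes.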

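4.5 Configuring Middlewares

python

# middlewares.py
import random
import time

class RandomUserAgentMiddleware:
    """Set a random User-Agent on each request"""

    def __init__(self):
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/120.0.0.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) Chrome/120.0.0.0',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Firefox/121.0',
        ]

    def process_request(self, request, spider):
        request.headers['User-Agent'] = random.choice(self.user_agents)


class ProxyMiddleware:
    """Route requests through a random proxy"""

    MAX_PROXY_RETRIES = 3  # cap on 403 retries, to avoid an endless loop

    def __init__(self, proxy_list):
        self.proxy_list = proxy_list

    @classmethod
    def from_crawler(cls, crawler):
        proxy_list = crawler.settings.getlist('PROXY_LIST', [])
        return cls(proxy_list)

    def process_request(self, request, spider):
        if self.proxy_list:
            proxy = random.choice(self.proxy_list)
            request.meta['proxy'] = proxy
            spider.logger.debug(f"Using proxy: {proxy}")

    def process_response(self, request, response, spider):
        # A 403 usually means the request was flagged as a bot: retry through
        # another proxy, but only a limited number of times
        retries = request.meta.get('proxy_retries', 0)
        if response.status == 403 and self.proxy_list and retries < self.MAX_PROXY_RETRIES:
            spider.logger.warning("Got 403, retrying with a different proxy")
            # dont_filter=True keeps the duplicate filter from discarding the retry
            retry_request = request.replace(dont_filter=True)
            retry_request.meta['proxy'] = random.choice(self.proxy_list)
            retry_request.meta['proxy_retries'] = retries + 1
            return retry_request

        return response


class DownloadDelayMiddleware:
    """Download delay middleware (for illustration only)

    NOTE: time.sleep() blocks Scrapy's event loop, which stalls every
    in-flight request. Scrapy already applies the DOWNLOAD_DELAY setting
    by itself, so this class is better left out of DOWNLOADER_MIDDLEWARES.
    """

    def __init__(self, delay):
        self.delay = delay

    @classmethod
    def from_crawler(cls, crawler):
        delay = crawler.settings.getfloat('DOWNLOAD_DELAY', 1)
        return cls(delay)

    def process_request(self, request, spider):
        time.sleep(self.delay)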

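4.6 Configuring settings.py

python

# settings.py
BOT_NAME = 'ecommerce_crawler'

SPIDER_MODULES = ['ecommerce_crawler.spiders']
NEWSPIDER_MODULE = 'ecommerce_crawler.spiders'

# Obey robots.txt rules
ROBOTSTXT_OBEY = True

# Number of concurrent requests
CONCURRENT_REQUESTS = 4

# Download delay in seconds (applied by Scrapy itself)
DOWNLOAD_DELAY = 1

# Request timeout in seconds
DOWNLOAD_TIMEOUT = 15

# Default User-Agent; RandomUserAgentMiddleware overrides it per request.
# Do not impersonate a search engine bot such as Googlebot here.
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'

# Enable downloader middlewares.
# A sleep-based delay middleware is intentionally not enabled: DOWNLOAD_DELAY
# above already throttles requests without blocking the event loop.
DOWNLOADER_MIDDLEWARES = {
    'ecommerce_crawler.middlewares.ProxyMiddleware': 350,
    'ecommerce_crawler.middlewares.RandomUserAgentMiddleware': 400,
}

# Enable item pipelines (lower numbers run first, so duplicates are
# dropped before anything is written)
ITEM_PIPELINES = {
    'ecommerce_crawler.pipelines.DuplicatesPipeline': 100,
    'ecommerce_crawler.pipelines.JsonWriterPipeline': 300,
    'ecommerce_crawler.pipelines.MongoPipeline': 400,
}

# MongoDB settings
MONGO_URI = 'mongodb://localhost:27017/'
MONGO_DB = 'crawler_db'

# Proxy list
PROXY_LIST = [
    'http://username:password@proxy.example.com:8080',
]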

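4.7 Running the Spider

bash

# Run the spider
scrapy crawl book

# Save as JSON (-o appends to an existing file; use -O to overwrite it)
scrapy crawl book -o books.json

# Save as CSV
scrapy crawl book -o books.csv

# Override concurrency and delay for this run
scrapy crawl book -s CONCURRENT_REQUESTS=8 -s DOWNLOAD_DELAY=0.5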

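5. Async Crawling with aiohttp

5.1 Why Async

Scrapy is powerful, but it is a full framework and comes with corresponding overhead. For lightweight jobs that spend most of their time waiting on the network, asyncio + aiohttp is often the better fit: a single thread can keep many requests in flight at once, which makes the crawler fast while using few resources.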
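
To make the benefit concrete, the sketch below simulates network waits with `asyncio.sleep` instead of real requests, so it runs offline. `fake_fetch` and `fetch_all` are made-up names for this illustration, and the 0.1 second delay is arbitrary.

```python
import asyncio
import time

async def fake_fetch(page, delay=0.1):
    """Stand-in for a network request: just waits, then returns a label."""
    await asyncio.sleep(delay)
    return f"page-{page}"

async def fetch_all(pages, concurrency=5):
    """Run fake_fetch for every page with at most `concurrency` in flight."""
    semaphore = asyncio.Semaphore(concurrency)

    async def bounded(page):
        async with semaphore:
            return await fake_fetch(page)

    # gather returns results in the same order as its arguments
    return await asyncio.gather(*(bounded(p) for p in pages))

def main():
    start = time.perf_counter()
    results = asyncio.run(fetch_all(range(1, 11)))
    elapsed = time.perf_counter() - start
    print(f"{len(results)} pages in {elapsed:.2f}s")
    return results

if __name__ == "__main__":
    main()
```

Ten simulated pages run as two batches of five, so the total wait is close to two delays rather than ten. The semaphore plays the same role as a connection limit: it bounds how hard the target site is hit.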

5.2 Implementing the Async Crawler

python

import asyncio
import json
import random

import aiofiles
import aiohttp
from bs4 import BeautifulSoup

class AsyncCrawler:
    """Asynchronous crawler"""

    def __init__(self, concurrency=10):
        self.concurrency = concurrency
        self.session = None
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/120.0.0.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) Chrome/120.0.0.0',
        ]

    async def fetch(self, session, url):
        """Fetch a page; return its HTML, or None on failure"""
        headers = {'User-Agent': random.choice(self.user_agents)}
        timeout = aiohttp.ClientTimeout(total=10)  # per-request timeout

        try:
            async with session.get(url, headers=headers, timeout=timeout) as response:
                if response.status == 200:
                    return await response.text()
                print(f"Request failed {url}: {response.status}")
                return None
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            print(f"Request error {url}: {e}")
            return None

    def parse_book_list(self, html):
        """Parse a book list page (pure CPU work, so a regular method)"""
        if not html:
            return []

        soup = BeautifulSoup(html, 'lxml')
        books = soup.select('article.product_pod')

        results = []
        for book in books:
            title = book.select_one('h3 a')['title']
            price = book.select_one('.price_color').text.strip()
            results.append({'title': title, 'price': price})

        return results

    async def crawl_page(self, page):
        """Crawl a single page"""
        url = f"https://books.toscrape.com/catalogue/page-{page}.html"

        # Go through fetch() so the random User-Agent and error handling
        # apply; a failed page yields [] instead of aborting the whole run
        html = await self.fetch(self.session, url)
        books = self.parse_book_list(html)
        if html:
            print(f"Page {page} done, {len(books)} books found")
        return books

    async def run(self, pages=50):
        """Run the crawler"""
        # The connector limit caps the number of simultaneous connections
        connector = aiohttp.TCPConnector(limit=self.concurrency)
        timeout = aiohttp.ClientTimeout(total=30)

        async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
            self.session = session

            # Create one task per page
            tasks = [self.crawl_page(page) for page in range(1, pages + 1)]

            # Run them concurrently
            results = await asyncio.gather(*tasks)

            # Merge the per-page results
            all_books = []
            for page_books in results:
                all_books.extend(page_books)

            print(f"\nCrawled {len(all_books)} books in total")

            # Save the results
            async with aiofiles.open('async_books.json', 'w', encoding='utf-8') as f:
                await f.write(json.dumps(all_books, ensure_ascii=False, indent=2))

            return all_books


if __name__ == "__main__":
    crawler = AsyncCrawler(concurrency=5)  # at most 5 requests at once
    asyncio.run(crawler.run(pages=10))

6. Data Storage

6.1 Storing Data in MySQL

python

import mysql.connector
from mysql.connector import Error

class MySQLStorage:
    """MySQL data storage"""

    def __init__(self, host, port, user, password, database):
        self.config = {
            'host': host,
            'port': port,
            'user': user,
            'password': password,
            'database': database,
        }
        self.connection = None

    def connect(self):
        """Open the connection"""
        try:
            self.connection = mysql.connector.connect(**self.config)
            print("Connected to MySQL")
        except Error as e:
            print(f"MySQL connection failed: {e}")

    def create_table(self):
        """Create the table"""
        # url is limited to 700 characters: with utf8mb4 (up to 4 bytes per
        # character) a UNIQUE index on VARCHAR(1000) would exceed InnoDB's
        # 3072-byte index key limit. The UNIQUE constraint already creates
        # an index on url, so no separate INDEX is needed for it.
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS books (
            id INT AUTO_INCREMENT PRIMARY KEY,
            title VARCHAR(500),
            price VARCHAR(50),
            rating VARCHAR(50),
            availability VARCHAR(100),
            description TEXT,
            category VARCHAR(200),
            url VARCHAR(700) UNIQUE,
            scraped_time DATETIME,
            INDEX idx_category (category),
            INDEX idx_scraped_time (scraped_time)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
        """

        cursor = self.connection.cursor()
        cursor.execute(create_table_sql)
        self.connection.commit()
        cursor.close()
        print("Table created")

    def insert(self, item):
        """Insert a record, or update it if the URL already exists"""
        insert_sql = """
        INSERT INTO books (title, price, rating, availability, description, category, url, scraped_time)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            title = VALUES(title),
            price = VALUES(price),
            scraped_time = VALUES(scraped_time);
        """

        cursor = self.connection.cursor()
        cursor.execute(insert_sql, (
            item.get('title'),
            item.get('price'),
            item.get('rating'),
            item.get('availability'),
            item.get('description'),
            item.get('category'),
            item.get('url'),
            item.get('scraped_time'),
        ))
        self.connection.commit()
        cursor.close()

    def close(self):
        """Close the connection"""
        if self.connection and self.connection.is_connected():
            self.connection.close()
            print("MySQL connection closed")
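
The `insert` method relies on an upsert: a new URL inserts a row, and a known URL updates the existing row. To try that behavior without a MySQL server, the sketch below uses Python's built-in `sqlite3` module as a stand-in. Note the swap: SQLite spells the clause `ON CONFLICT(url) DO UPDATE` (available in SQLite 3.24 and later) rather than MySQL's `ON DUPLICATE KEY UPDATE`, and the table here is a reduced version of the one above. `open_demo_db` and `upsert_book` are hypothetical names, and the URL is made up.

```python
import sqlite3

UPSERT_SQL = """
INSERT INTO books (title, price, url, scraped_time)
VALUES (:title, :price, :url, :scraped_time)
ON CONFLICT(url) DO UPDATE SET
    title = excluded.title,
    price = excluded.price,
    scraped_time = excluded.scraped_time
"""

def open_demo_db():
    """Create an in-memory database with a reduced books table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE books ("
        "id INTEGER PRIMARY KEY, title TEXT, price TEXT, "
        "url TEXT UNIQUE, scraped_time TEXT)"
    )
    return conn

def upsert_book(conn, item):
    """Insert a record, or refresh it if the URL is already stored."""
    conn.execute(UPSERT_SQL, item)
    conn.commit()

conn = open_demo_db()
url = "https://books.toscrape.com/catalogue/example_1/index.html"  # made-up URL
upsert_book(conn, {"title": "Example", "price": "£10.00", "url": url,
                   "scraped_time": "2024-01-01T00:00:00"})
upsert_book(conn, {"title": "Example", "price": "£12.00", "url": url,
                   "scraped_time": "2024-01-02T00:00:00"})

row_count = conn.execute("SELECT COUNT(*) FROM books").fetchone()[0]
latest_price = conn.execute("SELECT price FROM books WHERE url = ?", (url,)).fetchone()[0]
print(row_count, latest_price)
```

Crawling the same page twice leaves one row holding the most recent price, which is the property that makes repeated crawls safe.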

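7. Anti-Scraping Measures and How to Respond

7.1 Common Anti-Scraping Mechanisms

  1. User-Agent checks: the server inspects the User-Agent request header
  2. IP rate limiting: too many requests from one IP within a short time
  3. Cookie/session checks: whether the browsing pattern looks human
  4. CAPTCHAs: image CAPTCHAs, slider CAPTCHAs, and so on
  5. JavaScript rendering: content loaded dynamically by JS
  6. Behavior analysis: mouse movement, click patterns, and so on

7.2 Countermeasures

Basic strategies

  • Rotate the User-Agent
  • Space out requests (time.sleep or DOWNLOAD_DELAY)
  • Use proxy IPs

Advanced strategies

  • Drive a real browser with Selenium or Playwright
  • Use a CAPTCHA-solving service
  • Distribute the crawl across multiple IPs
  • Manage a pool of cookies

Ethical guidelines

  • Follow the rules in robots.txt
  • Keep the request rate reasonable
  • Do not collect sensitive or private data
  • Respect the site's copyright notices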
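
For crawlers written without Scrapy, the first two guidelines can be checked in code with the standard library's `urllib.robotparser`. The sketch below parses a made-up robots.txt from a string so that it runs offline; `ROBOTS_TXT`, `build_parser`, and the `my-crawler` agent name are all hypothetical.

```python
from urllib.robotparser import RobotFileParser

# Hypothetical robots.txt content, inlined so the example runs offline
ROBOTS_TXT = """\
User-agent: *
Disallow: /private/
Crawl-delay: 2
"""

def build_parser(robots_text):
    """Build a parser from robots.txt text instead of fetching it."""
    parser = RobotFileParser()
    parser.parse(robots_text.splitlines())
    return parser

parser = build_parser(ROBOTS_TXT)

allowed = parser.can_fetch("my-crawler", "https://example.com/catalogue/page-1.html")
blocked = parser.can_fetch("my-crawler", "https://example.com/private/report.html")
delay = parser.crawl_delay("my-crawler")

print(allowed, blocked, delay)  # True False 2
```

Against a live site, `parser.set_url(...)` followed by `parser.read()` downloads the real file. The crawler can then skip any URL where `can_fetch` returns `False` and sleep for the advertised crawl delay between requests.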

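Summary

This was a long article with a lot packed into it. To recap what we covered:

  1. requests + BeautifulSoup: a good fit for simple, small-scale crawlers
  2. Scrapy: the first choice for enterprise-grade crawler development
  3. Async crawling with aiohttp: a lightweight, high-performance option
  4. Proxy pools and User-Agent rotation: improve crawler stability
  5. Data storage: MySQL, MongoDB, and other options

One last reminder: a crawler is only a tool. How you use it, and on what, is the question that really deserves thought. The goal of learning to crawl is to let data create value for you, within the bounds of the law and of each site's rules.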
