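Preface
Data is one of the most valuable assets of our time. Market research, competitor analysis, price monitoring, and AI model training all depend on large volumes of data, and web crawlers are one of the most common ways to collect it.
Many people assume crawling is hard and requires a long list of unrelated technologies. It does not. At its core, a crawler does four things: send a request → fetch the page → parse the data → save the results. Once you understand these four steps, you have the basics.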
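The four steps can be sketched in a few lines. This is only an illustration under stated assumptions: `fetch` is stubbed with canned HTML so the sketch runs offline, the regular expression stands in for a real HTML parser, and the function names (`fetch`, `parse`, `save`, `crawl`) are hypothetical rather than part of any library.

```python
import json
import re


def fetch(url: str) -> str:
    """Steps 1-2: send the request and get the page (stubbed with canned HTML)."""
    return (
        '<h3><a title="Book A">Book A</a></h3>'
        '<h3><a title="Book B">Book B</a></h3>'
    )


def parse(html: str) -> list:
    """Step 3: pull the fields we care about out of the HTML."""
    return [{"title": title} for title in re.findall(r'<a title="([^"]+)"', html)]


def save(rows: list, path: str) -> None:
    """Step 4: persist the parsed rows as JSON."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(rows, f, ensure_ascii=False, indent=2)


def crawl(url: str, path: str, fetcher=fetch) -> list:
    """Run the pipeline: request/fetch -> parse -> save."""
    rows = parse(fetcher(url))
    save(rows, path)
    return rows
```

Passing a different `fetcher` (for example one built on requests) swaps the stub for a real network call without touching the parsing or saving steps.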
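In this article I use a complete e-commerce data collection project to walk you through building an enterprise-grade collection system from scratch. It covers basic requests usage, parsing with BeautifulSoup, and hands-on Scrapy, plus more advanced topics such as asynchronous collection, proxy pools, and distributed architecture.
To avoid putting load on third-party websites, the demos target public test sites. The code logic is general, though, and can be applied to real scenarios with minor changes.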

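1. Environment Setup
1.1 Installing Dependencies
Install Python first, then the required packages:
bash
# A virtual environment is recommended
python -m venv crawler-env
source crawler-env/bin/activate  # Windows: crawler-env\Scripts\activate
# Install the core dependencies
pip install requests beautifulsoup4 lxml
pip install scrapy
pip install redis  # distributed queue
pip install pymongo  # MongoDB storage
pip install mysql-connector-python  # MySQL storage
pip install selenium  # JS-rendered pages
pip install playwright  # modern browser automation (then run: playwright install)
pip install aiohttp aiofiles  # async requests and file I/O (asyncio ships with Python, do not pip install it)
It is a good idea to keep a requirements.txt file so the dependencies are easy to manage: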
plaintext
requests==2.31.0
beautifulsoup4==4.12.2
lxml==5.1.0
scrapy==2.11.0
aiohttp==3.9.1
aiofiles==23.2.1
selenium==4.16.0
playwright==1.40.0
redis==5.0.1
pymongo==4.6.1
mysql-connector-python==8.2.0
fake-useragent==1.4.0
tenacity==8.2.3
1.2 Project Structure
plaintext
crawler_project/
├── spiders/                # spiders
│   ├── __init__.py
│   ├── base_spider.py      # base spider class
│   ├── ecommerce/          # e-commerce spiders (no hyphen, so the package stays importable)
│   └── news/               # news spiders
├── utils/                  # utility modules
│   ├── __init__.py
│   ├── http_client.py      # HTTP client wrapper
│   ├── proxy_pool.py       # proxy pool management
│   └── storage.py          # data storage
├── pipelines/              # data processing pipelines
├── middlewares/            # middlewares
├── config/                 # configuration files
├── scripts/                # helper scripts
├── tests/                  # tests
├── requirements.txt
└── README.md
2. Basic Crawler Development: requests + BeautifulSoup
2.1 A Simple Request
Start with the simplest possible crawler, fetching a public test site:
python
import requests


def basic_crawler():
    url = "https://books.toscrape.com/"
    # Set a request header that mimics a browser
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    }
    # Send the request
    response = requests.get(url, headers=headers, timeout=10)
    # Check the response status
    if response.status_code == 200:
        print("Request succeeded")
        # Print the first part of the body
        print(response.text[:500])
    else:
        print(f"Request failed: {response.status_code}")


if __name__ == "__main__":
    basic_crawler()
Run the code above. If you see the page's HTML printed, the request worked.
2.2 Parsing the Page
Once you have the HTML, use BeautifulSoup to extract the data you need:
python
import requests
from bs4 import BeautifulSoup


def parse_books():
    url = "https://books.toscrape.com/catalogue/page-1.html"
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }
    response = requests.get(url, headers=headers, timeout=10)
    response.raise_for_status()  # stop on 4xx/5xx instead of parsing an error page
    # Use the lxml parser for speed. Passing bytes lets BeautifulSoup detect the
    # page encoding itself, which avoids mojibake such as 'Â£' in the prices
    # when the server does not declare a charset.
    soup = BeautifulSoup(response.content, 'lxml')
    # Find the container of every book
    books = soup.select('article.product_pod')
    results = []
    for book in books:
        title = book.select_one('h3 a')['title']
        price = book.select_one('.price_color').text.strip()
        # The second class name is the rating word: One, Two, Three, Four, Five
        rating = book.select_one('.star-rating')['class'][1]
        results.append({
            'title': title,
            'price': price,
            'rating': rating
        })
    for book in results:
        print(f"Title: {book['title']}, price: {book['price']}, rating: {book['rating']} stars")
    return results


if __name__ == "__main__":
    parse_books()
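The parser returns the price as a string such as "£51.77" and the rating as a word such as "Three", which is awkward for sorting or filtering. A minimal sketch of normalizing both fields follows; the helper names (`parse_price`, `parse_rating`, `normalize`) are hypothetical, and the currency symbol is simply discarded, which assumes all prices share one currency.

```python
import re
from decimal import Decimal

# Rating words as they appear in the star-rating class on the listing page
RATING_WORDS = {"One": 1, "Two": 2, "Three": 3, "Four": 4, "Five": 5}


def parse_price(text: str) -> Decimal:
    """Extract the numeric part of a price string such as '£51.77'."""
    match = re.search(r"\d+(?:\.\d+)?", text)
    if match is None:
        raise ValueError(f"no number found in {text!r}")
    return Decimal(match.group())


def parse_rating(word: str) -> int:
    """Map a rating word to an integer from 1 to 5."""
    return RATING_WORDS[word]


def normalize(book: dict) -> dict:
    """Return a copy of a parsed book with typed price and rating fields."""
    return {
        "title": book["title"],
        "price": parse_price(book["price"]),
        "rating": parse_rating(book["rating"]),
    }
```

`Decimal` is used rather than `float` so that prices keep their exact decimal value when summed or compared.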
2.3 Crawling Multiple Pages
Real projects usually need data from many pages. Here is a crawler that follows the pagination:
python
import time

import requests
from bs4 import BeautifulSoup


def crawl_all_pages():
    base_url = "https://books.toscrape.com/catalogue/page-{}.html"
    # The headers do not change between pages, so build them once
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }
    all_books = []
    for page in range(1, 51):  # crawl the first 50 pages
        url = base_url.format(page)
        try:
            response = requests.get(url, headers=headers, timeout=10)
            if response.status_code == 200:
                # Pass bytes so BeautifulSoup detects the encoding itself
                soup = BeautifulSoup(response.content, 'lxml')
                books = soup.select('article.product_pod')
                for book in books:
                    title = book.select_one('h3 a')['title']
                    price = book.select_one('.price_color').text.strip()
                    all_books.append({
                        'title': title,
                        'price': price,
                        'page': page
                    })
                print(f"Page {page} done, {len(books)} books found")
            else:
                print(f"Page {page} request failed: {response.status_code}")
        except requests.RequestException as e:
            print(f"Page {page} failed: {e}")
        # Pause between requests so we do not hit the site too fast
        time.sleep(1)
    print(f"\nCrawled {len(all_books)} books in total")
    return all_books


if __name__ == "__main__":
    crawl_all_pages()
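The multi-page crawl returns a list of dicts but never writes it anywhere, so the "save" step is still missing. One simple option is a CSV file written with the standard library. The sketch below assumes each row has exactly the `title`, `price`, and `page` keys produced above; `save_books_csv` is a hypothetical helper name.

```python
import csv


def save_books_csv(books: list, path: str) -> int:
    """Write the crawled rows to a CSV file and return how many were written."""
    fieldnames = ["title", "price", "page"]
    # newline="" stops the csv module from emitting blank lines on Windows
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(books)
    return len(books)
```

A call such as `save_books_csv(crawl_all_pages(), "books.csv")` would tie the two together.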
3. A More Robust HTTP Client
3.1 Adding Proxies and Retries
Bare requests are easily blocked and fail on transient errors, so a production client needs proxy support and a retry mechanism:
python
import random

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


class HTTPClient:
    """Wraps HTTP requests with retries, proxies, and timeouts."""

    def __init__(self, proxies=None, timeout=10, max_retries=3):
        self.session = requests.Session()
        self.timeout = timeout
        self.proxies = proxies or []
        # Configure the retry policy
        retry_strategy = Retry(
            total=max_retries,
            # Exponential backoff between retries; the exact schedule
            # (including whether the first retry waits) depends on the urllib3 version
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

    def get_random_user_agent(self):
        """Return a random User-Agent."""
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
        ]
        return random.choice(user_agents)

    def get(self, url, **kwargs):
        """Send a GET request; return the response, or None on failure."""
        # Copy the headers so the caller's dict is not mutated
        headers = dict(kwargs.pop('headers', None) or {})
        headers['User-Agent'] = self.get_random_user_agent()
        # Let callers override the default timeout without a duplicate-argument error
        kwargs.setdefault('timeout', self.timeout)
        # Pick a random proxy, if any are configured
        proxy = random.choice(self.proxies) if self.proxies else None
        try:
            response = self.session.get(
                url,
                headers=headers,
                proxies={'http': proxy, 'https': proxy} if proxy else None,
                **kwargs
            )
            response.raise_for_status()
            return response
        except requests.RequestException as e:
            print(f"Request failed: {url}, error: {e}")
            return None

    def close(self):
        """Close the session."""
        self.session.close()


# Usage example
if __name__ == "__main__":
    client = HTTPClient(timeout=10)
    response = client.get("https://httpbin.org/ip")
    if response:
        print(response.json())
    client.close()
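To make the idea of backoff concrete, here is a generic, library-independent sketch of exponential backoff with random jitter. It is not how urllib3's `Retry` computes its delays; `backoff_delays` and `retry_call` are hypothetical helpers, and the `sleep` parameter exists only so the waiting can be replaced in tests.

```python
import random
import time


def backoff_delays(retries: int, base: float = 1.0, cap: float = 30.0) -> list:
    """Upper bound of the wait before each retry: base * 2**attempt, capped."""
    return [min(cap, base * (2 ** attempt)) for attempt in range(retries)]


def retry_call(func, retries: int = 3, base: float = 1.0, cap: float = 30.0,
               sleep=time.sleep):
    """Call func(); on an exception, wait and retry up to `retries` more times."""
    delays = backoff_delays(retries, base, cap)
    for attempt in range(retries + 1):
        try:
            return func()
        except Exception:
            if attempt == retries:
                raise  # out of retries: surface the last error
            # Full jitter: wait a random time up to the scheduled bound, so many
            # clients that failed together do not all retry at the same instant
            sleep(random.uniform(0, delays[attempt]))
```

The cap keeps a long run of failures from producing waits of several minutes, and the jitter spreads retries out when many workers fail at once.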
3.2 Managing a Proxy Pool
A simple but practical proxy pool:
python
import random

import requests


class ProxyPool:
    """A simple proxy pool."""

    def __init__(self):
        self.proxies = []
        self.failed_count = {}  # consecutive failures per proxy

    def add_proxy(self, proxy):
        """Add one proxy."""
        if proxy not in self.proxies:
            self.proxies.append(proxy)

    def add_proxies(self, proxies):
        """Add several proxies."""
        for proxy in proxies:
            self.add_proxy(proxy)

    def get_random_proxy(self):
        """Return a random proxy, or None if the pool is empty."""
        if not self.proxies:
            return None
        # Prefer proxies with fewer than 3 consecutive failures
        valid_proxies = [p for p in self.proxies if self.failed_count.get(p, 0) < 3]
        if valid_proxies:
            return random.choice(valid_proxies)
        # Every proxy has reached the failure threshold: fall back to a random one
        return random.choice(self.proxies)

    def mark_success(self, proxy):
        """Record a success, which resets the failure counter."""
        self.failed_count[proxy] = 0

    def mark_failed(self, proxy):
        """Record a failure."""
        self.failed_count[proxy] = self.failed_count.get(proxy, 0) + 1
        # Remove the proxy after 5 consecutive failures; the membership check
        # avoids a ValueError if it was already removed
        if self.failed_count[proxy] >= 5 and proxy in self.proxies:
            print(f"Removing dead proxy: {proxy}")
            self.proxies.remove(proxy)
            self.failed_count.pop(proxy, None)

    def validate_proxy(self, proxy, test_url="https://httpbin.org/ip"):
        """Check whether a proxy works."""
        try:
            response = requests.get(
                test_url,
                proxies={'http': proxy, 'https': proxy},
                timeout=5
            )
            return response.status_code == 200
        except requests.RequestException:
            return False

    def validate_all(self):
        """Validate every proxy and keep only the working ones."""
        valid_proxies = []
        for proxy in self.proxies:
            if self.validate_proxy(proxy):
                valid_proxies.append(proxy)
                print(f"✓ Proxy works: {proxy}")
            else:
                print(f"✗ Proxy is dead: {proxy}")
        self.proxies = valid_proxies
        return valid_proxies


# Free proxy sources (note: these proxies can stop working at any time)
FREE_PROXY_SOURCES = [
    'https://www.proxy-list.download/api/v1/get?type=http',
    'https://raw.githubusercontent.com/TheSpeedX/PROXY-List/master/http.txt',
]


def fetch_free_proxies():
    """Fetch free proxies from public sources."""
    proxies = []
    # This is only an example; a paid proxy service is recommended in practice.
    # Free proxies are unreliable, short-lived, and slow.
    for source in FREE_PROXY_SOURCES:
        try:
            response = requests.get(source, timeout=10)
            if response.status_code == 200:
                # splitlines() handles both \n and \r\n line endings
                proxies.extend(response.text.splitlines())
        except requests.RequestException as e:
            print(f"Failed to fetch proxies from {source}: {e}")
    # Entries are typically bare host:port strings; add a scheme such as
    # http:// before passing them to requests
    return [p.strip() for p in proxies if p.strip()]


if __name__ == "__main__":
    pool = ProxyPool()
    # Add a few test proxies
    pool.add_proxies([
        'http://username:password@proxy.example.com:8080',  # proxy with authentication
        'http://proxy.example.com:8080',  # plain proxy
    ])
    print(f"Current proxy pool: {pool.proxies}")
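Public proxy lists usually contain bare `host:port` lines, sometimes mixed with junk, while requests expects full proxy URLs. A minimal sketch of cleaning such lines before adding them to the pool is shown below. `normalize_proxy` and `normalize_proxies` are hypothetical helpers, and defaulting to the `http` scheme is an assumption about the list's contents.

```python
from urllib.parse import urlsplit


def normalize_proxy(line: str, default_scheme: str = "http"):
    """Turn 'host:port' or a full proxy URL into a proxy URL, or None if invalid."""
    candidate = line.strip()
    if not candidate:
        return None
    if "://" not in candidate:
        candidate = f"{default_scheme}://{candidate}"
    try:
        parts = urlsplit(candidate)
        port = parts.port  # raises ValueError for a non-numeric or out-of-range port
    except ValueError:
        return None
    if not parts.hostname or port is None:
        return None
    return candidate


def normalize_proxies(lines) -> list:
    """Normalize many lines, dropping invalid entries and duplicates (order kept)."""
    seen = []
    for line in lines:
        proxy = normalize_proxy(line)
        if proxy is not None and proxy not in seen:
            seen.append(proxy)
    return seen
```

This only checks the syntax of each entry. Whether a proxy actually works still has to be verified with a live request.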
4. Scrapy in Practice
4.1 Creating a Scrapy Project
Scrapy is one of the most mature crawling frameworks for Python and is well suited to building enterprise-grade crawlers:
bash
scrapy startproject ecommerce_crawler
cd ecommerce_crawler
scrapy genspider book books.toscrape.com
Project structure:
plaintext
ecommerce_crawler/
├── ecommerce_crawler/
│   ├── __init__.py
│   ├── items.py          # data structure definitions
│   ├── middlewares.py    # middlewares
│   ├── pipelines.py      # item pipelines
│   ├── settings.py       # configuration
│   └── spiders/
│       ├── __init__.py
│       └── book.py       # book spider
└── scrapy.cfg
4.2 Defining the Item
python
# items.py
import scrapy


class BookItem(scrapy.Item):
    title = scrapy.Field()         # book title
    price = scrapy.Field()         # price
    rating = scrapy.Field()        # rating
    availability = scrapy.Field()  # stock status
    description = scrapy.Field()   # description
    category = scrapy.Field()      # category
    url = scrapy.Field()           # source URL
    scraped_time = scrapy.Field()  # time of scraping
4.3 Writing the Spider
python
# spiders/book.py
from datetime import datetime

import scrapy

from ecommerce_crawler.items import BookItem


class BookSpider(scrapy.Spider):
    name = 'book'
    allowed_domains = ['books.toscrape.com']

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.max_pages = 50  # crawl at most 50 pages

    def start_requests(self):
        """Generate the start requests, one per listing page."""
        base_url = 'https://books.toscrape.com/catalogue/page-{}.html'
        for page in range(1, self.max_pages + 1):
            url = base_url.format(page)
            yield scrapy.Request(
                url,
                callback=self.parse_book_list,
                meta={'page': page}
            )

    def parse_book_list(self, response):
        """Parse a listing page and follow each book's detail link."""
        books = response.css('article.product_pod')
        for book in books:
            book_url = response.urljoin(book.css('h3 a::attr(href)').get())
            yield scrapy.Request(
                book_url,
                callback=self.parse_book_detail,
                meta={'page': response.meta['page']}
            )
        self.logger.info(f"Page {response.meta['page']} parsed, {len(books)} books found")

    def parse_book_detail(self, response):
        """Parse a book detail page."""
        item = BookItem()
        item['title'] = response.css('h1::text').get()
        item['price'] = response.css('.price_color::text').get()
        item['rating'] = response.css('.star-rating::attr(class)').re_first(r'star-rating (\w+)')
        item['availability'] = response.css('.availability::text').re_first(r'(\d+ available)')
        item['description'] = response.css('#product_description + p::text').get()
        # .breadcrumb is a class selector (the leading dot is required);
        # the last breadcrumb link is the category
        crumbs = response.css('.breadcrumb a::text').getall()
        item['category'] = crumbs[-1] if crumbs else None
        item['url'] = response.url
        item['scraped_time'] = datetime.now().isoformat()
        yield item
4.4 Configuring Pipelines
python
# pipelines.py
import json

import pymongo
from scrapy.exceptions import DropItem


class JsonWriterPipeline:
    """Writes items to a JSON file as a single array."""

    def open_spider(self, spider):
        self.file = open(f'{spider.name}_items.json', 'w', encoding='utf-8')
        self.file.write('[\n')
        self.first_item = True

    def close_spider(self, spider):
        self.file.write('\n]')
        self.file.close()

    def process_item(self, item, spider):
        # Write the separator before every item except the first, so the
        # array has no trailing comma and the file stays valid JSON
        if not self.first_item:
            self.file.write(',\n')
        self.first_item = False
        self.file.write(json.dumps(dict(item), ensure_ascii=False))
        return item


class MongoPipeline:
    """Stores items in MongoDB."""

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DB')
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        # Upsert keyed on the URL, so re-crawling updates instead of duplicating
        self.db[spider.name].update_one(
            {'url': item['url']},
            {'$set': dict(item)},
            upsert=True
        )
        return item


class DuplicatesPipeline:
    """Drops items whose URL has already been seen in this run.

    Give this pipeline the lowest order value in ITEM_PIPELINES so it
    runs before the storage pipelines.
    """

    def __init__(self):
        self.urls_seen = set()

    def process_item(self, item, spider):
        if item['url'] in self.urls_seen:
            # Raising DropItem stops later pipelines from processing the duplicate
            raise DropItem(f"Duplicate item: {item['url']}")
        self.urls_seen.add(item['url'])
        return item
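Writing one JSON array means the file is only valid once the closing bracket has been written, so a crash mid-crawl leaves it unparseable. A common alternative is the JSON Lines layout, one JSON object per line, where every completed line is readable on its own. The sketch below is a plain-Python illustration rather than a Scrapy pipeline, and `append_jsonl` and `read_jsonl` are hypothetical helper names.

```python
import json


def append_jsonl(path: str, item: dict) -> None:
    """Append one item as a single JSON line."""
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(item, ensure_ascii=False) + "\n")


def read_jsonl(path: str) -> list:
    """Read every item back, skipping blank lines."""
    with open(path, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]
```

Opening the file for every item keeps the sketch short. A long-running crawler would more likely keep the handle open, as the pipeline's `open_spider` and `close_spider` hooks do.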
4.5 Configuring Middlewares
python
# middlewares.py
import random


class RandomUserAgentMiddleware:
    """Sets a random User-Agent on each request."""

    def __init__(self):
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/120.0.0.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) Chrome/120.0.0.0',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Firefox/121.0',
        ]

    def process_request(self, request, spider):
        request.headers['User-Agent'] = random.choice(self.user_agents)


class ProxyMiddleware:
    """Routes requests through a random proxy."""

    MAX_PROXY_RETRIES = 3  # cap retries so a persistent 403 cannot loop forever

    def __init__(self, proxy_list):
        self.proxy_list = proxy_list

    @classmethod
    def from_crawler(cls, crawler):
        proxy_list = crawler.settings.getlist('PROXY_LIST', [])
        return cls(proxy_list)

    def process_request(self, request, spider):
        if self.proxy_list:
            proxy = random.choice(self.proxy_list)
            request.meta['proxy'] = proxy
            spider.logger.debug(f"Using proxy: {proxy}")

    def process_response(self, request, response, spider):
        # A 403 usually means the request was flagged as a crawler:
        # retry through a different proxy, up to MAX_PROXY_RETRIES times
        retries = request.meta.get('proxy_retries', 0)
        if response.status == 403 and self.proxy_list and retries < self.MAX_PROXY_RETRIES:
            spider.logger.warning("Got 403, retrying with a different proxy")
            # dont_filter=True keeps the duplicate filter from discarding the retry
            retry_request = request.replace(dont_filter=True)
            retry_request.meta['proxy'] = random.choice(self.proxy_list)
            retry_request.meta['proxy_retries'] = retries + 1
            return retry_request
        return response


# No custom delay middleware is needed: Scrapy applies DOWNLOAD_DELAY itself.
# Calling time.sleep() inside a middleware would block Scrapy's event loop
# and stall every in-flight request.
4.6 Configuring settings.py
python
# settings.py
BOT_NAME = 'ecommerce_crawler'
SPIDER_MODULES = ['ecommerce_crawler.spiders']
NEWSPIDER_MODULE = 'ecommerce_crawler.spiders'
# Obey robots.txt rules
ROBOTSTXT_OBEY = True
# Number of concurrent requests
CONCURRENT_REQUESTS = 4
# Download delay in seconds (enforced by Scrapy itself)
DOWNLOAD_DELAY = 1
# Request timeout in seconds
DOWNLOAD_TIMEOUT = 15
# Default User-Agent (RandomUserAgentMiddleware overrides it per request).
# Do not impersonate a search-engine bot such as Googlebot here.
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
# Enable the downloader middlewares
DOWNLOADER_MIDDLEWARES = {
    'ecommerce_crawler.middlewares.ProxyMiddleware': 350,
    'ecommerce_crawler.middlewares.RandomUserAgentMiddleware': 400,
}
# Enable the item pipelines (lower values run first, so duplicates are
# dropped before anything is written)
ITEM_PIPELINES = {
    'ecommerce_crawler.pipelines.DuplicatesPipeline': 100,
    'ecommerce_crawler.pipelines.JsonWriterPipeline': 300,
    'ecommerce_crawler.pipelines.MongoPipeline': 400,
}
# MongoDB configuration
MONGO_URI = 'mongodb://localhost:27017/'
MONGO_DB = 'crawler_db'
# Proxy list
PROXY_LIST = [
    'http://username:password@proxy.example.com:8080',
]
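4.7 Running the Spider
bash
# Run the spider
scrapy crawl book
# Save as JSON (-o appends to an existing file; use -O to overwrite it)
scrapy crawl book -o books.json
# Save as CSV
scrapy crawl book -o books.csv
# Run with a custom concurrency and delay
scrapy crawl book -s CONCURRENT_REQUESTS=8 -s DOWNLOAD_DELAY=0.5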
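5. Asynchronous Crawling with aiohttp
5.1 Why Go Async
Scrapy is powerful, but it brings a full framework's structure and overhead with it. For lightweight crawling tasks, asyncio + aiohttp is often the simpler choice: a single event loop overlaps many requests while they wait on the network, which keeps the crawler fast and light on resources.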
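The core idea, many requests in flight at once but never more than a fixed limit, can be shown without any network access. In the sketch below `fake_fetch` simulates a request with `asyncio.sleep`, and `crawl_bounded` is a hypothetical helper that uses a semaphore to bound concurrency and records the peak number of simultaneous "requests".

```python
import asyncio


async def fake_fetch(page: int, delay: float = 0.05) -> str:
    """Stand-in for a network request: wait a little, then return a label."""
    await asyncio.sleep(delay)
    return f"page-{page}"


async def crawl_bounded(pages, concurrency: int = 5):
    """Fetch all pages concurrently, with at most `concurrency` in flight."""
    semaphore = asyncio.Semaphore(concurrency)
    in_flight = 0
    peak = 0

    async def worker(page):
        nonlocal in_flight, peak
        async with semaphore:  # waits here while `concurrency` workers are busy
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await fake_fetch(page)
            finally:
                in_flight -= 1

    # gather() returns the results in the same order as the input pages
    results = await asyncio.gather(*(worker(p) for p in pages))
    return results, peak
```

Running `asyncio.run(crawl_bounded(range(1, 21), concurrency=5))` processes the twenty simulated pages in overlapping batches rather than one after another, which is where the speedup over a sequential loop comes from.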
5.2 Implementing an Async Crawler
python
import asyncio
import json
import random

import aiofiles
import aiohttp
from bs4 import BeautifulSoup


class AsyncCrawler:
    """Asynchronous crawler."""

    def __init__(self, concurrency=10):
        self.concurrency = concurrency
        self.session = None
        self.results = []
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/120.0.0.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) Chrome/120.0.0.0',
        ]

    async def fetch(self, session, url):
        """Fetch a page asynchronously; return its HTML, or None on failure."""
        headers = {'User-Agent': random.choice(self.user_agents)}
        try:
            # aiohttp expects a ClientTimeout object rather than a bare number
            timeout = aiohttp.ClientTimeout(total=10)
            async with session.get(url, headers=headers, timeout=timeout) as response:
                if response.status == 200:
                    return await response.text()
                print(f"Request failed {url}: {response.status}")
                return None
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            print(f"Request error {url}: {e}")
            return None

    def parse_book_list(self, html):
        """Parse the book list (CPU-bound, so a plain function is enough)."""
        if not html:
            return []
        soup = BeautifulSoup(html, 'lxml')
        books = soup.select('article.product_pod')
        results = []
        for book in books:
            title = book.select_one('h3 a')['title']
            price = book.select_one('.price_color').text.strip()
            results.append({'title': title, 'price': price})
        return results

    async def crawl_page(self, page):
        """Crawl a single page."""
        url = f"https://books.toscrape.com/catalogue/page-{page}.html"
        # Go through fetch() so every request gets headers and error handling;
        # a failed page then yields [] instead of aborting the whole gather()
        html = await self.fetch(self.session, url)
        books = self.parse_book_list(html)
        if html:
            print(f"Page {page} done, {len(books)} books found")
        return books

    async def run(self, pages=50):
        """Run the crawler."""
        connector = aiohttp.TCPConnector(limit=self.concurrency)
        timeout = aiohttp.ClientTimeout(total=30)
        async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
            self.session = session
            # Create a task for every page
            tasks = [self.crawl_page(page) for page in range(1, pages + 1)]
            # Run them concurrently
            results = await asyncio.gather(*tasks)
        # Merge the results
        all_books = []
        for page_books in results:
            all_books.extend(page_books)
        print(f"\nCrawled {len(all_books)} books in total")
        # Save the results
        async with aiofiles.open('async_books.json', 'w', encoding='utf-8') as f:
            await f.write(json.dumps(all_books, ensure_ascii=False, indent=2))
        return all_books


if __name__ == "__main__":
    crawler = AsyncCrawler(concurrency=5)  # 5 requests at a time
    asyncio.run(crawler.run(pages=10))
6. Data Storage
6.1 Storing Data in MySQL
python
import mysql.connector
from mysql.connector import Error


class MySQLStorage:
    """MySQL data storage."""

    def __init__(self, host, port, user, password, database):
        self.config = {
            'host': host,
            'port': port,
            'user': user,
            'password': password,
            'database': database,
        }
        self.connection = None

    def connect(self):
        """Open the connection."""
        try:
            self.connection = mysql.connector.connect(**self.config)
            print("MySQL connection established")
        except Error as e:
            print(f"MySQL connection failed: {e}")
            raise  # otherwise later calls would fail on a None connection

    def create_table(self):
        """Create the table."""
        # url is limited to 700 characters: a UNIQUE utf8mb4 column of 1000
        # characters would exceed InnoDB's 3072-byte index key limit. The UNIQUE
        # constraint already creates an index on url, so no separate one is needed.
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS books (
            id INT AUTO_INCREMENT PRIMARY KEY,
            title VARCHAR(500),
            price VARCHAR(50),
            rating VARCHAR(50),
            availability VARCHAR(100),
            description TEXT,
            category VARCHAR(200),
            url VARCHAR(700) UNIQUE,
            scraped_time DATETIME,
            INDEX idx_category (category),
            INDEX idx_scraped_time (scraped_time)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
        """
        cursor = self.connection.cursor()
        cursor.execute(create_table_sql)
        self.connection.commit()
        cursor.close()
        print("Table created")

    def insert(self, item):
        """Insert an item, updating the existing row if the URL is already stored."""
        insert_sql = """
        INSERT INTO books (title, price, rating, availability, description, category, url, scraped_time)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            title = VALUES(title),
            price = VALUES(price),
            scraped_time = VALUES(scraped_time);
        """
        cursor = self.connection.cursor()
        cursor.execute(insert_sql, (
            item.get('title'),
            item.get('price'),
            item.get('rating'),
            item.get('availability'),
            item.get('description'),
            item.get('category'),
            item.get('url'),
            item.get('scraped_time'),
        ))
        self.connection.commit()
        cursor.close()

    def close(self):
        """Close the connection."""
        if self.connection and self.connection.is_connected():
            self.connection.close()
            print("MySQL connection closed")
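The "insert, or update if the URL already exists" pattern can be tried without a MySQL server. The sketch below swaps in SQLite from the standard library, whose `ON CONFLICT ... DO UPDATE` clause plays the role of MySQL's `ON DUPLICATE KEY UPDATE`. It assumes the bundled SQLite is version 3.24 or newer, uses a reduced column set, and `open_store` and `upsert_book` are hypothetical helper names.

```python
import sqlite3


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) the database and make sure the table exists."""
    conn = sqlite3.connect(path)
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS books (
            url TEXT PRIMARY KEY,
            title TEXT,
            price TEXT,
            scraped_time TEXT
        )
        """
    )
    return conn


def upsert_book(conn: sqlite3.Connection, item: dict) -> None:
    """Insert a book, or refresh the stored row when the URL already exists."""
    conn.execute(
        """
        INSERT INTO books (url, title, price, scraped_time)
        VALUES (:url, :title, :price, :scraped_time)
        ON CONFLICT(url) DO UPDATE SET
            title = excluded.title,
            price = excluded.price,
            scraped_time = excluded.scraped_time
        """,
        item,
    )
    conn.commit()
```

Keying the upsert on the URL means a repeated crawl refreshes prices in place instead of piling up duplicate rows.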
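7. Anti-Crawling Measures and How to Handle Them
7.1 Common Anti-Crawling Mechanisms
- User-Agent checks: the server inspects the User-Agent request header
- IP rate limiting: too many requests from one IP in a short period
- Cookie/Session checks: whether the browsing trail looks like a real user
- CAPTCHAs: image CAPTCHAs, slider CAPTCHAs, and so on
- JavaScript rendering: content is loaded dynamically by JS
- Behavior analysis: mouse movement, click patterns, and so on
7.2 Countermeasures
Basic strategies:
- Rotate the User-Agent
- Space out requests (time.sleep or DOWNLOAD_DELAY)
- Use proxy IPs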
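A fixed `time.sleep(1)` produces a perfectly regular request rhythm. Adding a small random component makes the spacing less mechanical while still guaranteeing a minimum gap. The sketch below is one way to do that; the `Throttle` class is hypothetical, and its `clock` and `sleep` parameters exist only so the timing can be replaced in tests.

```python
import random
import time


class Throttle:
    """Enforces a minimum, slightly randomized gap between consecutive requests."""

    def __init__(self, min_delay=1.0, jitter=0.5,
                 clock=time.monotonic, sleep=time.sleep):
        self.min_delay = min_delay
        self.jitter = jitter
        self._clock = clock
        self._sleep = sleep
        self._last = None  # time of the previous request, None before the first

    def wait(self) -> float:
        """Sleep if the last request was too recent; return the seconds slept."""
        target_gap = self.min_delay + random.uniform(0, self.jitter)
        waited = 0.0
        if self._last is not None:
            remaining = target_gap - (self._clock() - self._last)
            if remaining > 0:
                self._sleep(remaining)
                waited = remaining
        self._last = self._clock()
        return waited
```

Calling `throttle.wait()` right before each request keeps the gap between requests at `min_delay` or more, and time already spent parsing the previous page counts toward that gap.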
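Advanced strategies:
- Drive a real browser with Selenium/Playwright
- Use a CAPTCHA-solving service
- Distribute the crawl across multiple IPs
- Manage a pool of cookies
Ethical guidelines:
- Obey the robots.txt rules
- Keep the request rate reasonable
- Do not collect sensitive or private data
- Respect the site's copyright notices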
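Outside Scrapy, where `ROBOTSTXT_OBEY` does the work, the robots.txt rules can be checked with the standard library's `urllib.robotparser`. The sketch below parses an in-memory robots.txt so it runs offline. The sample rules, the `my-crawler` agent name, and the `build_robot_rules` helper are all made up for illustration.

```python
from urllib.robotparser import RobotFileParser

# A made-up robots.txt used only for this example
SAMPLE_ROBOTS_TXT = """\
User-agent: *
Disallow: /private/
Crawl-delay: 2
"""


def build_robot_rules(robots_txt: str) -> RobotFileParser:
    """Parse robots.txt content that has already been downloaded."""
    parser = RobotFileParser()
    parser.parse(robots_txt.splitlines())
    return parser


rules = build_robot_rules(SAMPLE_ROBOTS_TXT)
allowed = rules.can_fetch("my-crawler", "https://example.com/catalogue/page-1.html")
blocked = rules.can_fetch("my-crawler", "https://example.com/private/data.html")
delay = rules.crawl_delay("my-crawler")  # seconds requested by the site, or None
```

For a live site, `RobotFileParser.set_url()` followed by `read()` downloads the file instead, and the value from `crawl_delay()` can feed directly into the pause between requests.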
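Summary
This was a long and dense article, so here is a quick recap of what it covered:
- requests + BeautifulSoup: a good fit for simple, small-scale crawlers
- Scrapy: the first choice for enterprise-grade crawler development
- aiohttp async crawlers: a lightweight, high-performance option
- Proxy pools and User-Agent rotation: make a crawler more stable
- Data storage: several options, including MySQL and MongoDB
One last reminder: a crawler is only a tool. How you use it, and where, is the question that really deserves thought. Putting data to work for you within legal and compliant bounds is the real goal of learning to crawl.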
