前言:为什么你的Python程序跑得这么慢?
写爬虫的时候,100个请求要等100秒;做数据采集,批量处理几百张图片却卡得一动不动;调用API接口,一个接一个地等待响应。这些场景你是不是很熟悉?
问题不在你的代码逻辑,而在于同步编程带来的IO等待浪费。当你的程序在等待网络响应、数据库查询、文件读写的时候,CPU其实什么都没干,就这么干等着。
我之前负责一个数据采集项目,用同步方式处理1000条数据,每条耗时1秒,跑了将近17分钟。后来改成异步写法,同样的任务2分钟搞定——这就是异步编程的魅力。
今天就和大家聊聊Python异步编程的核心——asyncio,以及怎么用它来解决实际工作中的性能问题。

一、异步编程解决的是什么问题?
1.1 同步编程的痛点
先看一个很常见的场景:用Python请求10个URL,每个响应需要1秒。
python
import requests
import time
def fetch_all(urls):
"""同步方式请求所有URL"""
start = time.time()
for url in urls:
response = requests.get(url)
print(f"完成: {url}, 状态码: {response.status_code}")
print(f"总耗时: {time.time() - start:.2f}秒")
urls = [f"https://httpbin.org/delay/1" for _ in range(10)]
fetch_all(urls)
运行结果:总耗时约10秒。
串行执行的逻辑很简单,但效率太低——前9个请求完成的时候,第10个请求根本还没开始。这10秒钟的等待时间,完全可以并发处理多个请求。
1.2 异步编程的思路
异步编程的核心是协作式多任务。当一个任务遇到IO等待时,它主动让出CPU控制权,去执行其他任务。等IO操作完成,再回来继续执行。
类比一下:
- 同步模式:你去咖啡馆点了一杯拿铁,然后站在柜台前一动不动地等5分钟,直到咖啡做好才去找座位。
- 异步模式:你点好单拿到号码,然后先去找座位刷手机,服务员做好后叫你去取。
1.3 性能对比
| 场景 | 同步方式 | asyncio方式 | 提升倍数 |
|---|---|---|---|
| 并发请求100个URL | ~50秒 | ~2秒 | 约25倍 |
| 批量数据库查询1000次 | ~30秒 | ~3秒 | 约10倍 |
| 读取500个小文件 | ~8秒 | ~1秒 | 约8倍 |
重要提示:asyncio适合IO密集型任务(网络请求、数据库、文件操作)。对于CPU密集型任务(大规模计算、图像处理),建议使用multiprocessing多进程方案。
二、协程基础:async与await关键字
2.1 什么是协程?
协程(Coroutine)是一种可以在执行过程中暂停并让出控制权的函数。与普通函数不同,调用协程函数不会立即执行,而是返回一个”协程对象”,需要由事件循环来驱动执行。
定义一个协程函数只需在def前加async:
python
import asyncio
async def say_hello(name: str) -> None:
"""一个简单的协程函数"""
# await关键字:暂停当前协程,让出事件循环控制权
await asyncio.sleep(1) # 模拟1秒的IO等待
print(f"Hello, {name}!")
# 运行协程
asyncio.run(say_hello("World"))
预期输出:
plaintext
Hello, World!
常见误区:直接调用say_hello("World")不会执行任何代码,只会返回一个协程对象<coroutine object say_hello>。必须通过asyncio.run()或await来驱动执行。
2.2 await关键字的作用
await只能用在async函数内部,它的作用是:暂停当前协程的执行,等待右侧的可等待对象完成,期间将控制权交还给事件循环。
可以被await的对象有三类:
- 协程对象(coroutine)
- Task对象(asyncio.Task)
- Future对象(asyncio.Future)
python
async def fetch_data(url: str) -> str:
"""模拟一次网络请求"""
print(f"开始请求: {url}")
await asyncio.sleep(2) # ← 在这里暂停,让出控制权
print(f"请求完成: {url}")
return f"来自 {url} 的数据"
async def main() -> None:
result = await fetch_data("https://api.example.com/data")
print(result)
asyncio.run(main())
2.3 事件循环:asyncio的心脏
事件循环(Event Loop)是asyncio的核心调度器。它持续监听所有注册的协程和IO事件,当某个协程在await处暂停时,立即切换去执行另一个就绪的协程——这就是”并发”的来源。
Python 3.7+推荐使用asyncio.run(),它会自动创建、运行并关闭事件循环:
python
import asyncio
async def main() -> None:
"""所有异步代码的入口点"""
await asyncio.sleep(1)
print("事件循环运行完毕")
# ✅ 推荐写法(Python 3.7+)
asyncio.run(main())
# ❌ 旧写法(不推荐,Python 3.10已废弃)
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())
三、并发执行:让多个任务同时运行
3.1 asyncio.create_task()与asyncio.gather()
要实现真正的并发,需要把协程封装成Task,然后同时调度多个任务:
python
import asyncio
import time
async def fetch_url(index: int) -> str:
"""模拟请求一个URL"""
print(f"开始请求 {index}")
await asyncio.sleep(1) # 模拟1秒的网络延迟
print(f"完成请求 {index}")
return f"数据_{index}"
async def main():
start = time.time()
# 创建任务列表
tasks = [
asyncio.create_task(fetch_url(i))
for i in range(10)
]
# 等待所有任务完成
results = await asyncio.gather(*tasks)
print(f"\n总耗时: {time.time() - start:.2f}秒")
print(f"收到 {len(results)} 个结果")
asyncio.run(main())
运行结果:
plaintext
开始请求 0
开始请求 1
...
开始请求 9
完成请求 0
完成请求 1
...
完成请求 9
总耗时: 1.02秒
收到 10 个结果
10个1秒的任务只用了1秒完成,这就是并发执行的威力。
3.2 asyncio.Task的灵活控制
获取任务返回结果
python
async def main():
async with aiohttp.ClientSession() as session:
tasks = [asyncio.create_task(async_request(session, i)) for i in range(3)]
# gather返回所有任务的结果列表
results = await asyncio.gather(*tasks)
print(f"所有请求结果:{results}") # 输出:[200, 200, 200]
超时控制
避免任务无限等待:
python
async def main():
async with aiohttp.ClientSession() as session:
task = asyncio.create_task(async_request(session, 0))
try:
# 等待任务,超时时间0.5秒
result = await asyncio.wait_for(task, timeout=0.5)
except asyncio.TimeoutError:
print("请求超时!")
取消任务
python
async def long_task():
try:
print("任务开始执行...")
await asyncio.sleep(5) # 模拟长时间任务
print("任务执行完成")
except asyncio.CancelledError:
print("任务被取消")
async def main():
task = asyncio.create_task(long_task())
# 1秒后取消任务
await asyncio.sleep(1)
task.cancel()
# 等待任务取消完成
await task
四、实战:异步HTTP客户端
4.1 为什么不能用requests库?
asyncio的事件循环是单线程的,所有任务共享一个线程。如果在异步代码里使用requests库(同步阻塞),会卡住整个事件循环,其他任务都无法执行。
正确的做法是使用异步HTTP库,推荐:
- aiohttp:最流行的异步HTTP客户端
- httpx:同时支持同步和异步
- curl_cffi:高性能HTTP客户端
4.2 aiohttp实战:并发请求URL
python
import asyncio
import aiohttp
import time
async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
"""异步请求单个URL"""
async with session.get(url) as response:
return {
"url": url,
"status": response.status,
"data": await response.text()
}
async def fetch_all(urls: list[str]) -> list[dict]:
"""并发请求所有URL"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
return await asyncio.gather(*tasks)
async def main():
urls = [f"https://httpbin.org/delay/1" for _ in range(10)]
start = time.time()
results = await fetch_all(urls)
print(f"总耗时: {time.time() - start:.2f}秒")
print(f"成功获取 {len(results)} 个响应")
asyncio.run(main())
4.3 带错误处理的健壮实现
实际项目中,网络请求经常会出现超时、连接错误等情况,需要做好错误处理:
python
import asyncio
import aiohttp
from aiohttp import ClientError
async def fetch_with_retry(
session: aiohttp.ClientSession,
url: str,
max_retries: int = 3
) -> dict:
"""带重试机制的异步请求"""
for attempt in range(max_retries):
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
return {
"url": url,
"status": response.status,
"data": await response.json()
}
except (ClientError, asyncio.TimeoutError) as e:
if attempt == max_retries - 1:
return {"url": url, "error": str(e)}
await asyncio.sleep(2 ** attempt) # 指数退避
async def main():
urls = ["https://api.example.com/data1", "https://invalid-url-that-will-fail.com"]
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_retry(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for result in results:
if "error" in result:
print(f"请求失败: {result['url']} - {result['error']}")
else:
print(f"请求成功: {result['url']} - 状态码: {result['status']}")
五、避坑指南:新手最容易踩的5个坑
5.1 坑1:在异步函数里用了同步阻塞操作
python
# ❌ 错误:用了 time.sleep 而不是 asyncio.sleep
async def bad_example():
import time
time.sleep(5) # 这会阻塞整个事件循环!
return "done"
# ✅ 正确:使用 await asyncio.sleep()
async def good_example():
await asyncio.sleep(5)
return "done"
time.sleep会卡住当前线程,事件循环在这5秒内完全无法工作。所有同步阻塞操作都不能直接用在异步函数里,包括:
requests.get()→ 用aiohttptime.sleep()→ 用await asyncio.sleep()open().read()→ 用aiofiles
5.2 坑2:忘记加await
python
# ❌ 错误:协程对象没有被await,它永远不会执行
async def wrong():
coro = some_async_function()
print(coro) # 打印 <coroutine object>
# 协程对象没有被await,它永远不会执行
# ✅ 正确:必须await或调度协程
async def correct():
result = await some_async_function()
print(result)
协程对象被创建后,必须被await或者被asyncio.create_task()调度,否则它不会执行。
5.3 坑3:在同步代码里调用异步函数
python
# ❌ 错误:不能在同步函数里直接await
def sync_function():
result = await async_function() # SyntaxError
# ✅ 正确:用 asyncio.run()
def sync_function():
result = asyncio.run(async_function())
await只能在async def定义的函数内部使用。
5.4 坑4:创建了太多任务导致资源耗尽
python
# ❌ 错误:一下子创建几万个任务
for i in range(100000):
asyncio.create_task(heavy_task(i))
await asyncio.gather(*tasks) # 可能内存爆炸
# ✅ 正确:使用信号量限制并发数
async def main():
semaphore = asyncio.Semaphore(50) # 最多同时50个任务
async def bounded_task(i):
async with semaphore:
await heavy_task(i)
tasks = [asyncio.create_task(bounded_task(i)) for i in range(100000)]
await asyncio.gather(*tasks)
5.5 坑5:混用同步和异步数据库操作
python
# ❌ 错误:同步数据库驱动会阻塞事件循环
import pymysql # 同步驱动
async def bad_db_query():
conn = pymysql.connect(host='localhost') # 阻塞!
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
conn.close()
# ✅ 正确:使用异步数据库驱动
import aiomysql # 异步驱动
async def good_db_query():
pool = await aiomysql.create_pool(host='localhost', port=3306, user='root')
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute("SELECT * FROM users")
result = await cursor.fetchall()
pool.close()
六、进阶应用:异步编程的最佳实践
6.1 异步上下文管理器
如果自定义的类需要在异步环境中进行初始化和清理,可以使用异步上下文管理器:
python
class AsyncDatabasePool:
"""异步数据库连接池"""
def __init__(self, dsn: str):
self.dsn = dsn
self.pool = None
async def __aenter__(self):
"""进入异步上下文时创建连接池"""
self.pool = await create_async_pool(self.dsn)
return self.pool
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""退出异步上下文时关闭连接池"""
if self.pool:
self.pool.close()
await self.pool.wait_closed()
# 使用方式
async def main():
async with AsyncDatabasePool(dsn="mysql://user:pass@localhost/db") as pool:
# pool可以直接使用
result = await pool.query("SELECT * FROM users")
return result
6.2 asyncio.Queue实现生产者-消费者模式
python
import asyncio
async def producer(queue: asyncio.Queue):
"""生产者:生成任务"""
for i in range(20):
await queue.put(i)
print(f"生产: {i}")
await queue.put(None) # 发送结束信号
async def consumer(queue: asyncio.Queue, name: str):
"""消费者:处理任务"""
while True:
item = await queue.get()
if item is None: # 收到结束信号
queue.task_done()
break
print(f"[{name}] 处理: {item}")
await asyncio.sleep(0.5)
queue.task_done()
async def main():
queue = asyncio.Queue()
# 启动1个生产者和3个消费者
await asyncio.gather(
producer(queue),
consumer(queue, "消费者A"),
consumer(queue, "消费者B"),
consumer(queue, "消费者C"),
)
asyncio.run(main())
6.3 异步生成器
python
async def async_data_generator(batch_size: int = 100):
"""异步生成器:流式处理大量数据"""
offset = 0
while True:
# 模拟从数据库或API分批获取数据
batch = await fetch_data_batch(offset=offset, limit=batch_size)
if not batch:
break
for item in batch:
yield item
offset += batch_size
async def main():
async for item in async_data_generator():
await process_item(item)
七、实战项目:构建高并发爬虫
7.1 项目需求
抓取1000个商品页面的信息,每个请求约0.5秒,使用异步并发提升效率。
7.2 完整实现
python
import asyncio
import aiohttp
import json
import time
from dataclasses import dataclass
from typing import Optional
@dataclass
class Product:
id: str
name: str
price: float
rating: float
class ProductCrawler:
"""高并发商品爬虫"""
def __init__(self, concurrency: int = 50):
self.concurrency = concurrency
self.semaphore = asyncio.Semaphore(concurrency)
self.results: list[Product] = []
async def fetch_product(self, session: aiohttp.ClientSession, product_id: str) -> Optional[Product]:
"""抓取单个商品信息"""
url = f"https://api.example.com/products/{product_id}"
async with self.semaphore: # 限制并发数
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
if response.status == 200:
data = await response.json()
return Product(
id=product_id,
name=data.get("name", ""),
price=float(data.get("price", 0)),
rating=float(data.get("rating", 0))
)
except Exception as e:
print(f"抓取失败 {product_id}: {e}")
return None
async def crawl_all(self, product_ids: list[str]) -> list[Product]:
"""并发抓取所有商品"""
async with aiohttp.ClientSession() as session:
tasks = [self.fetch_product(session, pid) for pid in product_ids]
results = await asyncio.gather(*tasks)
return [r for r in results if r is not None]
async def run(self, product_ids: list[str]):
"""运行爬虫"""
print(f"开始抓取 {len(product_ids)} 个商品,最大并发: {self.concurrency}")
start = time.time()
self.results = await self.crawl_all(product_ids)
elapsed = time.time() - start
print(f"\n抓取完成!")
print(f"成功: {len(self.results)} 个")
print(f"失败: {len(product_ids) - len(self.results)} 个")
print(f"总耗时: {elapsed:.2f}秒")
print(f"平均耗时: {elapsed/len(product_ids):.3f}秒/个")
print(f"预估同步耗时: {len(product_ids) * 0.5:.1f}秒")
print(f"性能提升: {(len(product_ids) * 0.5) / elapsed:.1f}x")
async def main():
# 模拟1000个商品ID
product_ids = [f"PROD_{i:05d}" for i in range(1000)]
crawler = ProductCrawler(concurrency=100)
await crawler.run(product_ids)
# 保存结果
with open("products.json", "w") as f:
json.dump([
{"id": p.id, "name": p.name, "price": p.price, "rating": p.rating}
for p in crawler.results
], f, indent=2)
asyncio.run(main())
7.3 性能分析
假设每个请求耗时0.5秒:
| 方式 | 1000个请求耗时 |
|---|---|
| 同步串行 | 500秒 |
| 10并发 | 50秒 |
| 50并发(推荐) | 10秒 |
| 100并发 | 5秒 |
注意:并发数不是越高越好,过高的并发可能导致:
- 目标服务器IP被封
- 本地内存压力增大
- 触发反爬机制
建议并发数控制在50-200之间,并根据实际情况调整。
总结
Python asyncio异步编程是处理IO密集型任务的神器,通过本文你学会了:
- 协程基础:async/await关键字的使用
- 事件循环:asyncio的核心调度机制
- 并发执行:asyncio.create_task和asyncio.gather
- 异步HTTP:aiohttp的使用和最佳实践
- 避坑指南:5个新手最容易犯的错误
- 实战应用:高并发爬虫的完整实现
异步编程的关键是理解事件循环的工作原理和避免同步阻塞操作。一旦掌握这个思维模式,你会发现很多性能问题都能迎刃而解。
建议从简单的HTTP请求开始练手,逐步尝试数据库操作、文件IO等场景。等你熟练之后,可以探索更高级的异步框架如FastAPI,它就是基于asyncio构建的异步Web框架。
相关资源推荐
- Python官方asyncio文档
- aiohttp官方文档
- Awesome Asyncio – asyncio生态资源汇总

发表回复