教程雨

OKX新手入门教程导航,收录OKX注册、充值、买币、提现等基础操作教程

Python asyncio异步编程教程封面,科技风格展示协程与并发主题

Python asyncio异步编程实战:从协程原理到高并发应用开发

前言:为什么你的Python程序跑得这么慢?

写爬虫的时候,100个请求要等100秒;做数据采集,批量处理几百张图片却卡得一动不动;调用API接口,一个接一个地等待响应。这些场景你是不是很熟悉?

问题不在你的代码逻辑,而在于同步编程带来的IO等待浪费。当你的程序在等待网络响应、数据库查询、文件读写的时候,CPU其实什么都没干,就这么干等着。

我之前负责一个数据采集项目,用同步方式处理1000条数据,每条耗时1秒,跑了将近17分钟。后来改成异步写法,同样的任务2分钟搞定——这就是异步编程的魅力。

今天就和大家聊聊Python异步编程的核心——asyncio,以及怎么用它来解决实际工作中的性能问题。

Python asyncio异步编程学习路径图,从协程基础到高并发应用的完整规划

一、异步编程解决的是什么问题?

1.1 同步编程的痛点

先看一个很常见的场景:用Python请求10个URL,每个响应需要1秒。

python

import requests
import time

def fetch_all(urls):
    """同步方式请求所有URL"""
    start = time.time()
    for url in urls:
        response = requests.get(url)
        print(f"完成: {url}, 状态码: {response.status_code}")
    print(f"总耗时: {time.time() - start:.2f}秒")

urls = [f"https://httpbin.org/delay/1" for _ in range(10)]
fetch_all(urls)

运行结果:总耗时约10秒

串行执行的逻辑很简单,但效率太低——前9个请求完成的时候,第10个请求根本还没开始。这10秒钟的等待时间,完全可以并发处理多个请求。

1.2 异步编程的思路

异步编程的核心是协作式多任务。当一个任务遇到IO等待时,它主动让出CPU控制权,去执行其他任务。等IO操作完成,再回来继续执行。

类比一下:

  • 同步模式:你去咖啡馆点了一杯拿铁,然后站在柜台前一动不动地等5分钟,直到咖啡做好才去找座位。
  • 异步模式:你点好单拿到号码,然后先去找座位刷手机,服务员做好后叫你去取。

1.3 性能对比

场景同步方式asyncio方式提升倍数
并发请求100个URL~50秒~2秒约25倍
批量数据库查询1000次~30秒~3秒约10倍
读取500个小文件~8秒~1秒约8倍

重要提示:asyncio适合IO密集型任务(网络请求、数据库、文件操作)。对于CPU密集型任务(大规模计算、图像处理),建议使用multiprocessing多进程方案。

二、协程基础:async与await关键字

2.1 什么是协程?

协程(Coroutine)是一种可以在执行过程中暂停并让出控制权的函数。与普通函数不同,调用协程函数不会立即执行,而是返回一个”协程对象”,需要由事件循环来驱动执行。

定义一个协程函数只需在def前加async

python

import asyncio

async def say_hello(name: str) -> None:
    """一个简单的协程函数"""
    # await关键字:暂停当前协程,让出事件循环控制权
    await asyncio.sleep(1)  # 模拟1秒的IO等待
    print(f"Hello, {name}!")

# 运行协程
asyncio.run(say_hello("World"))

预期输出:

plaintext

Hello, World!

常见误区:直接调用say_hello("World")不会执行任何代码,只会返回一个协程对象<coroutine object say_hello>。必须通过asyncio.run()await来驱动执行。

2.2 await关键字的作用

await只能用在async函数内部,它的作用是:暂停当前协程的执行,等待右侧的可等待对象完成,期间将控制权交还给事件循环。

可以被await的对象有三类:

  • 协程对象(coroutine)
  • Task对象(asyncio.Task)
  • Future对象(asyncio.Future)

python

async def fetch_data(url: str) -> str:
    """模拟一次网络请求"""
    print(f"开始请求: {url}")
    await asyncio.sleep(2)  # ← 在这里暂停,让出控制权
    print(f"请求完成: {url}")
    return f"来自 {url} 的数据"

async def main() -> None:
    result = await fetch_data("https://api.example.com/data")
    print(result)

asyncio.run(main())

2.3 事件循环:asyncio的心脏

事件循环(Event Loop)是asyncio的核心调度器。它持续监听所有注册的协程和IO事件,当某个协程在await处暂停时,立即切换去执行另一个就绪的协程——这就是”并发”的来源。

Python 3.7+推荐使用asyncio.run(),它会自动创建、运行并关闭事件循环:

python

import asyncio

async def main() -> None:
    """所有异步代码的入口点"""
    await asyncio.sleep(1)
    print("事件循环运行完毕")

# ✅ 推荐写法(Python 3.7+)
asyncio.run(main())

# ❌ 旧写法(不推荐,Python 3.10已废弃)
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())

三、并发执行:让多个任务同时运行

3.1 asyncio.create_task()与asyncio.gather()

要实现真正的并发,需要把协程封装成Task,然后同时调度多个任务:

python

import asyncio
import time

async def fetch_url(index: int) -> str:
    """模拟请求一个URL"""
    print(f"开始请求 {index}")
    await asyncio.sleep(1)  # 模拟1秒的网络延迟
    print(f"完成请求 {index}")
    return f"数据_{index}"

async def main():
    start = time.time()
    
    # 创建任务列表
    tasks = [
        asyncio.create_task(fetch_url(i))
        for i in range(10)
    ]
    
    # 等待所有任务完成
    results = await asyncio.gather(*tasks)
    
    print(f"\n总耗时: {time.time() - start:.2f}秒")
    print(f"收到 {len(results)} 个结果")

asyncio.run(main())

运行结果:

plaintext

开始请求 0
开始请求 1
...
开始请求 9
完成请求 0
完成请求 1
...
完成请求 9

总耗时: 1.02秒
收到 10 个结果

10个1秒的任务只用了1秒完成,这就是并发执行的威力。

3.2 asyncio.Task的灵活控制

获取任务返回结果

python

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.create_task(async_request(session, i)) for i in range(3)]
        # gather返回所有任务的结果列表
        results = await asyncio.gather(*tasks)
        print(f"所有请求结果:{results}")  # 输出:[200, 200, 200]

超时控制

避免任务无限等待:

python

async def main():
    async with aiohttp.ClientSession() as session:
        task = asyncio.create_task(async_request(session, 0))
        try:
            # 等待任务,超时时间0.5秒
            result = await asyncio.wait_for(task, timeout=0.5)
        except asyncio.TimeoutError:
            print("请求超时!")

取消任务

python

async def long_task():
    try:
        print("任务开始执行...")
        await asyncio.sleep(5)  # 模拟长时间任务
        print("任务执行完成")
    except asyncio.CancelledError:
        print("任务被取消")

async def main():
    task = asyncio.create_task(long_task())
    # 1秒后取消任务
    await asyncio.sleep(1)
    task.cancel()
    # 等待任务取消完成
    await task

四、实战:异步HTTP客户端

4.1 为什么不能用requests库?

asyncio的事件循环是单线程的,所有任务共享一个线程。如果在异步代码里使用requests库(同步阻塞),会卡住整个事件循环,其他任务都无法执行。

正确的做法是使用异步HTTP库,推荐:

  • aiohttp:最流行的异步HTTP客户端
  • httpx:同时支持同步和异步
  • curl_cffi:高性能HTTP客户端

4.2 aiohttp实战:并发请求URL

python

import asyncio
import aiohttp
import time

async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    """异步请求单个URL"""
    async with session.get(url) as response:
        return {
            "url": url,
            "status": response.status,
            "data": await response.text()
        }

async def fetch_all(urls: list[str]) -> list[dict]:
    """并发请求所有URL"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)

async def main():
    urls = [f"https://httpbin.org/delay/1" for _ in range(10)]
    
    start = time.time()
    results = await fetch_all(urls)
    
    print(f"总耗时: {time.time() - start:.2f}秒")
    print(f"成功获取 {len(results)} 个响应")

asyncio.run(main())

4.3 带错误处理的健壮实现

实际项目中,网络请求经常会出现超时、连接错误等情况,需要做好错误处理:

python

import asyncio
import aiohttp
from aiohttp import ClientError

async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    max_retries: int = 3
) -> dict:
    """带重试机制的异步请求"""
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                return {
                    "url": url,
                    "status": response.status,
                    "data": await response.json()
                }
        except (ClientError, asyncio.TimeoutError) as e:
            if attempt == max_retries - 1:
                return {"url": url, "error": str(e)}
            await asyncio.sleep(2 ** attempt)  # 指数退避

async def main():
    urls = ["https://api.example.com/data1", "https://invalid-url-that-will-fail.com"]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_retry(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        
        for result in results:
            if "error" in result:
                print(f"请求失败: {result['url']} - {result['error']}")
            else:
                print(f"请求成功: {result['url']} - 状态码: {result['status']}")

五、避坑指南:新手最容易踩的5个坑

5.1 坑1:在异步函数里用了同步阻塞操作

python

# ❌ 错误:用了 time.sleep 而不是 asyncio.sleep
async def bad_example():
    import time
    time.sleep(5)  # 这会阻塞整个事件循环!
    return "done"

# ✅ 正确:使用 await asyncio.sleep()
async def good_example():
    await asyncio.sleep(5)
    return "done"

time.sleep会卡住当前线程,事件循环在这5秒内完全无法工作。所有同步阻塞操作都不能直接用在异步函数里,包括:

  • requests.get() → 用aiohttp
  • time.sleep() → 用await asyncio.sleep()
  • open().read() → 用aiofiles

5.2 坑2:忘记加await

python

# ❌ 错误:协程对象没有被await,它永远不会执行
async def wrong():
    coro = some_async_function()
    print(coro)  # 打印 <coroutine object>
    # 协程对象没有被await,它永远不会执行

# ✅ 正确:必须await或调度协程
async def correct():
    result = await some_async_function()
    print(result)

协程对象被创建后,必须被await或者被asyncio.create_task()调度,否则它不会执行。

5.3 坑3:在同步代码里调用异步函数

python

# ❌ 错误:不能在同步函数里直接await
def sync_function():
    result = await async_function()  # SyntaxError

# ✅ 正确:用 asyncio.run()
def sync_function():
    result = asyncio.run(async_function())

await只能在async def定义的函数内部使用。

5.4 坑4:创建了太多任务导致资源耗尽

python

# ❌ 错误:一下子创建几万个任务
for i in range(100000):
    asyncio.create_task(heavy_task(i))
await asyncio.gather(*tasks)  # 可能内存爆炸

# ✅ 正确:使用信号量限制并发数
async def main():
    semaphore = asyncio.Semaphore(50)  # 最多同时50个任务
    
    async def bounded_task(i):
        async with semaphore:
            await heavy_task(i)
    
    tasks = [asyncio.create_task(bounded_task(i)) for i in range(100000)]
    await asyncio.gather(*tasks)

5.5 坑5:混用同步和异步数据库操作

python

# ❌ 错误:同步数据库驱动会阻塞事件循环
import pymysql  # 同步驱动

async def bad_db_query():
    conn = pymysql.connect(host='localhost')  # 阻塞!
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users")
    conn.close()

# ✅ 正确:使用异步数据库驱动
import aiomysql  # 异步驱动

async def good_db_query():
    pool = await aiomysql.create_pool(host='localhost', port=3306, user='root')
    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("SELECT * FROM users")
            result = await cursor.fetchall()
    pool.close()

六、进阶应用:异步编程的最佳实践

6.1 异步上下文管理器

如果自定义的类需要在异步环境中进行初始化和清理,可以使用异步上下文管理器:

python

class AsyncDatabasePool:
    """异步数据库连接池"""
    
    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool = None
    
    async def __aenter__(self):
        """进入异步上下文时创建连接池"""
        self.pool = await create_async_pool(self.dsn)
        return self.pool
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """退出异步上下文时关闭连接池"""
        if self.pool:
            self.pool.close()
            await self.pool.wait_closed()

# 使用方式
async def main():
    async with AsyncDatabasePool(dsn="mysql://user:pass@localhost/db") as pool:
        # pool可以直接使用
        result = await pool.query("SELECT * FROM users")
        return result

6.2 asyncio.Queue实现生产者-消费者模式

python

import asyncio

async def producer(queue: asyncio.Queue):
    """生产者:生成任务"""
    for i in range(20):
        await queue.put(i)
        print(f"生产: {i}")
    await queue.put(None)  # 发送结束信号

async def consumer(queue: asyncio.Queue, name: str):
    """消费者:处理任务"""
    while True:
        item = await queue.get()
        if item is None:  # 收到结束信号
            queue.task_done()
            break
        print(f"[{name}] 处理: {item}")
        await asyncio.sleep(0.5)
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    
    # 启动1个生产者和3个消费者
    await asyncio.gather(
        producer(queue),
        consumer(queue, "消费者A"),
        consumer(queue, "消费者B"),
        consumer(queue, "消费者C"),
    )

asyncio.run(main())

6.3 异步生成器

python

async def async_data_generator(batch_size: int = 100):
    """异步生成器:流式处理大量数据"""
    offset = 0
    while True:
        # 模拟从数据库或API分批获取数据
        batch = await fetch_data_batch(offset=offset, limit=batch_size)
        if not batch:
            break
        for item in batch:
            yield item
        offset += batch_size

async def main():
    async for item in async_data_generator():
        await process_item(item)

七、实战项目:构建高并发爬虫

7.1 项目需求

抓取1000个商品页面的信息,每个请求约0.5秒,使用异步并发提升效率。

7.2 完整实现

python

import asyncio
import aiohttp
import json
import time
from dataclasses import dataclass
from typing import Optional

@dataclass
class Product:
    id: str
    name: str
    price: float
    rating: float

class ProductCrawler:
    """高并发商品爬虫"""
    
    def __init__(self, concurrency: int = 50):
        self.concurrency = concurrency
        self.semaphore = asyncio.Semaphore(concurrency)
        self.results: list[Product] = []
    
    async def fetch_product(self, session: aiohttp.ClientSession, product_id: str) -> Optional[Product]:
        """抓取单个商品信息"""
        url = f"https://api.example.com/products/{product_id}"
        
        async with self.semaphore:  # 限制并发数
            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                    if response.status == 200:
                        data = await response.json()
                        return Product(
                            id=product_id,
                            name=data.get("name", ""),
                            price=float(data.get("price", 0)),
                            rating=float(data.get("rating", 0))
                        )
            except Exception as e:
                print(f"抓取失败 {product_id}: {e}")
                return None
    
    async def crawl_all(self, product_ids: list[str]) -> list[Product]:
        """并发抓取所有商品"""
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_product(session, pid) for pid in product_ids]
            results = await asyncio.gather(*tasks)
            return [r for r in results if r is not None]
    
    async def run(self, product_ids: list[str]):
        """运行爬虫"""
        print(f"开始抓取 {len(product_ids)} 个商品,最大并发: {self.concurrency}")
        start = time.time()
        
        self.results = await self.crawl_all(product_ids)
        
        elapsed = time.time() - start
        print(f"\n抓取完成!")
        print(f"成功: {len(self.results)} 个")
        print(f"失败: {len(product_ids) - len(self.results)} 个")
        print(f"总耗时: {elapsed:.2f}秒")
        print(f"平均耗时: {elapsed/len(product_ids):.3f}秒/个")
        print(f"预估同步耗时: {len(product_ids) * 0.5:.1f}秒")
        print(f"性能提升: {(len(product_ids) * 0.5) / elapsed:.1f}x")

async def main():
    # 模拟1000个商品ID
    product_ids = [f"PROD_{i:05d}" for i in range(1000)]
    
    crawler = ProductCrawler(concurrency=100)
    await crawler.run(product_ids)
    
    # 保存结果
    with open("products.json", "w") as f:
        json.dump([
            {"id": p.id, "name": p.name, "price": p.price, "rating": p.rating}
            for p in crawler.results
        ], f, indent=2)

asyncio.run(main())

7.3 性能分析

假设每个请求耗时0.5秒:

方式1000个请求耗时
同步串行500秒
10并发50秒
50并发(推荐)10秒
100并发5秒

注意:并发数不是越高越好,过高的并发可能导致:

  • 目标服务器IP被封
  • 本地内存压力增大
  • 触发反爬机制

建议并发数控制在50-200之间,并根据实际情况调整。

总结

Python asyncio异步编程是处理IO密集型任务的神器,通过本文你学会了:

  1. 协程基础:async/await关键字的使用
  2. 事件循环:asyncio的核心调度机制
  3. 并发执行:asyncio.create_task和asyncio.gather
  4. 异步HTTP:aiohttp的使用和最佳实践
  5. 避坑指南:5个新手最容易犯的错误
  6. 实战应用:高并发爬虫的完整实现

异步编程的关键是理解事件循环的工作原理避免同步阻塞操作。一旦掌握这个思维模式,你会发现很多性能问题都能迎刃而解。

建议从简单的HTTP请求开始练手,逐步尝试数据库操作、文件IO等场景。等你熟练之后,可以探索更高级的异步框架如FastAPI,它就是基于asyncio构建的异步Web框架。

相关资源推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注