文章目录

Python 线程池:concurrent.futures.ThreadPoolExecutor

发布于 2026-04-17 04:27:54 · 浏览 23 次 · 评论 0 条

Python 线程池:concurrent.futures.ThreadPoolExecutor

在处理 I/O 密集型任务(如网络请求、文件读写)时,为了提高程序运行效率,我们需要使用多线程。Python 标准库中的 concurrent.futures 模块提供了一个高级接口 ThreadPoolExecutor(线程池执行器),它比底层的 threading 模块更易用,能自动管理线程的生命周期和任务分配。


1. 准备工作与基础概念

ThreadPoolExecutor 的核心思想是:提交任务到线程池,线程池自动分配空闲线程去执行任务,执行完毕后返回结果。开发者无需手动创建和销毁线程,避免了资源耗尽的风险。

在使用之前,确保你的 Python 环境已安装(Python 3.2+ 版本已内置该模块)。打开你的代码编辑器或 IDE,新建一个 Python 文件,例如 thread_pool_demo.py


2. 基础用法:submit 与 result

这是最常用的一种方式,适用于需要异步执行单个或多个任务,并在后续获取结果的情况。

2.1 定义任务函数

首先,我们需要定义一个耗时的任务函数,这里模拟一个网络请求或文件读取操作。

输入以下代码:

import time
from concurrent.futures import ThreadPoolExecutor

def worker(task_id, delay):
    """模拟一个耗时任务"""
    print(f"任务 {task_id} 开始执行,需要耗时 {delay} 秒...")
    time.sleep(delay)
    result = f"任务 {task_id} 的结果"
    print(f"任务 {task_id} 执行完毕")
    return result

2.2 创建线程池并提交任务

使用 with 语句可以确保线程池在任务完成后自动关闭,释放资源。

输入以下代码:

def main():
    # 1. 创建线程池,最大工作线程数为 3
    with ThreadPoolExecutor(max_workers=3) as executor:

        # 2. 提交任务
        # submit 方法会立即返回一个 Future 对象
        future1 = executor.submit(worker, "A", 2)
        future2 = executor.submit(worker, "B", 1)
        future3 = executor.submit(worker, "C", 3)

        # 3. 获取结果
        # result() 方法会阻塞,直到任务完成并返回结果
        print(f"获取到结果: {future1.result()}")
        print(f"获取到结果: {future2.result()}")
        print(f"获取到结果: {future3.result()}")

if __name__ == "__main__":
    main()

运行脚本。你会发现即使任务 B 只耗时 1 秒,由于我们调用 result() 的顺序是 1、2、3,主线程会等待任务 A 完成后才会去处理任务 B 的结果。若想根据任务完成顺序获取结果,请看第 4 节。


3. 批量用法:map 方法

如果你有一批数据需要用同一个函数处理,map 方法是最佳选择,它的用法类似于 Python 内置的 map() 函数。

修改 main 函数如下:

def main():
    data = [
        ("任务1", 1),
        ("任务2", 2),
        ("任务3", 1),
        ("任务4", 3)
    ]

    with ThreadPoolExecutor(max_workers=3) as executor:
        # 1. 使用 map 方法分发任务
        # executor.map 会自动解包 data 中的元组作为参数传给 worker
        results = executor.map(worker, *zip(*data))

        # 2. 迭代获取结果
        # 注意:map 返回的结果顺序与输入数据的顺序一致
        for res in results:
            print(f"最终输出: {res}")

运行代码。注意,尽管任务可能先后完成,但 map 方法保证输出的顺序与输入列表 data 的顺序严格一致。这是 mapsubmit 的一大区别。


4. 进阶技巧:as_completed 与异常处理

在实际场景中,我们往往希望哪个任务先完成就先处理哪个,而不是按顺序傻等。此外,任务可能会抛出异常,需要妥善处理。

4.1 按完成顺序获取结果 (as_completed)

导入 as_completed

from concurrent.futures import ThreadPoolExecutor, as_completed

编写如下逻辑:

def main_as_completed():
    tasks = [1, 2, 3, 4]
    with ThreadPoolExecutor(max_workers=3) as executor:
        # 创建一个字典来保存 future 和任务ID的映射,方便追踪
        future_to_task = {executor.submit(worker, task_id, task_id): task_id for task_id in tasks}

        # as_completed 返回一个迭代器,当 Future 完成时 yield 该 Future
        for future in as_completed(future_to_task):
            task_id = future_to_task[future]
            try:
                # 获取结果
                data = future.result()
                print(f"{task_id} -> 结果: {data}")
            except Exception as exc:
                print(f"{task_id} -> 抛出异常: {exc}")

执行上述代码。你会看到输出顺序取决于任务执行速度(即 delay 的大小),而非提交顺序。

4.2 异常处理演示

修改 worker 函数,让它随机抛出异常:

import random

def worker_with_error(task_id, delay):
    print(f"任务 {task_id} 开始")
    time.sleep(delay)
    if random.random() < 0.5:  # 50% 概率出错
        raise ValueError(f"任务 {task_id} 模拟出错")
    return f"任务 {task_id} 成功"

结合 future.result() 使用。当任务内部抛出异常,future.result() 会重新抛出该异常。因此,在代码中使用 try...except 块包裹 future.result() 是捕获并处理线程内异常的标准方式。


5. 核心参数对比与选择

为了方便在实际开发中选择合适的方法,以下是 submitmapas_completed 的对比。

方法 适用场景 顺序性 返回值处理
submit 单个任务提交,或需要精细控制每个任务 无序(取决于执行速度) 返回 Future 对象,需手动调用 .result()
map 批量处理相同逻辑的任务,且需要保持输入输出顺序 严格有序(按输入顺序输出) 返回结果的迭代器,直接遍历即可
as_completed 批量处理,且需要“谁跑得快先处理谁” 按完成顺序 返回完成任务的 Future 迭代器,需调用 .result()

6. 实战示例:模拟网页爬虫

假设我们需要抓取多个网页的标题,每个请求耗时不同。

编写完整代码如下:

import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed

def mock_crawl(url):
    """模拟爬取网页"""
    delay = random.uniform(0.5, 3.0)  # 随机延时 0.5 到 3 秒
    time.sleep(delay)
    # 模拟偶尔的请求失败
    if "error" in url:
        raise ConnectionError(f"无法连接到 {url}")
    return f"Title of {url}"

def main_spider():
    urls = [
        "http://example.com/page1",
        "http://example.com/page2",
        "http://error.com/page3",  # 这个会报错
        "http://example.com/page4",
    ]

    print("开始爬取...")

    # 设置最大并发数为 2,防止给服务器过大压力
    with ThreadPoolExecutor(max_workers=2) as executor:
        future_to_url = {executor.submit(mock_crawl, url): url for url in urls}

        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                title = future.result()
                print(f"成功抓取 [{url}]: {title}")
            except ConnectionError as e:
                print(f"抓取失败 [{url}]: {e}")
            except Exception as e:
                print(f"未知错误 [{url}]: {e}")

    print("所有任务结束")

if __name__ == "__main__":
    main_spider()

执行该脚本。通过控制台输出,你可以清晰地看到任务是如何并发执行的,以及 max_workers=2 是如何限制同时活跃的线程数量的。即使 page3 失败了,程序也不会崩溃,而是打印出错误信息并继续执行其他任务。

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文