文章目录

Python concurrent.futures.as_completed按完成顺序获取结果

发布于 2026-04-24 03:28:59 · 浏览 9 次 · 评论 0 条

Python concurrent.futures.as_completed按完成顺序获取结果

在并发编程中,处理批量任务(如网络请求、文件读写或复杂计算)时,通常会遇到两个核心需求:一是加速执行(利用多线程或多进程并行),二是实时处理结果(谁先跑完先处理谁)。concurrent.futures.as_completed 正是为此而生。它能让你在任务完成的第一时间获取结果,而不必像 map 方法那样必须等待所有任务按提交顺序依次返回。

下面通过具体步骤演示如何使用 as_completed 优化代码效率。


1. 理解执行顺序的差异

在编写代码前,必须理清 Executor.mapas_completed 的本质区别。

假设提交了 3 个任务,耗时分别为:任务 A(5秒)、任务 B(1秒)、任务 C(2秒)。

方法 结果返回顺序 特点
executor.map A -> B -> C 按提交顺序返回。必须等 A 结束后才能拿 B 的结果,总耗时受最慢任务限制。
as_completed B -> C -> A 按完成顺序返回。B 结束立即拿到结果,无需等待 A。

使用 as_completed 可以极大缩短获取第一批结果的等待时间。从数学角度看,假设各任务独立且并行执行,获取所有结果的总耗时 $T_{total}$ 近似于:

$$ T_{total} = \max(t_1, t_2, \dots, t_n) $$

其中 $t_i$ 为单个任务的耗时。这意味着程序的整体运行速度仅取决于最慢的那个任务。


2. 编写基础并发任务代码

首先构建一个模拟耗时的任务函数,并展示如何提交任务。

  1. 打开 Python 编辑器或 IDE。
  2. 导入 必要的 timeconcurrent.futures 模块。
  3. 定义 一个模拟任务函数 worker,该函数接收一个整数 seconds,休眠对应时间后返回结果。
import time
import concurrent.futures

def worker(seconds):
    print(f"开始任务:耗时 {seconds} 秒")
    time.sleep(seconds)
    return f"任务完成:耗时 {seconds} 秒"

3. 使用 as_completed 按完成顺序获取结果

本步骤将提交一组乱序耗时的任务,并观察 as_completed 的输出顺序。

  1. 创建 一个任务列表 tasks,设置时长为 [3, 1, 2, 4, 0] 秒。
  2. 实例化 ThreadPoolExecutor(CPU密集型任务请使用 ProcessPoolExecutor)。
  3. 提交 任务:使用列表推导式将所有任务提交给执行器,获得 future 对象列表。
  4. 遍历 concurrent.futures.as_completed(futures):循环中直接处理结果。
def run_as_completed_demo():
    # 定义不同耗时的任务
    task_durations = [3, 1, 2, 4, 0]
    futures = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # 1. 提交所有任务
        for duration in task_durations:
            future = executor.submit(worker, duration)
            futures.append(future)

        print("--- 开始按完成顺序获取结果 ---")

        # 2. 使用 as_completed 迭代
        for future in concurrent.futures.as_completed(futures):
            # 3. 获取结果
            result = future.result()
            print(result)

if __name__ == "__main__":
    run_as_completed_demo()

观察 输出顺序,你会发现结果是按 01234 的完成时间排序输出的,而不是提交时的 31240


4. 处理任务中的异常

在并发环境中,某个任务失败不应中断整个程序的运行。as_completed 允许你针对每个独立的 future 捕获异常。

  1. 修改 worker 函数,使其在特定条件下(如时长为 2 秒时)抛出异常。
  2. 使用 try...except 块包裹 future.result() 调用。
  3. 捕获 Exception(或具体的异常类型)并打印错误信息。
def worker_with_error(seconds):
    time.sleep(seconds)
    if seconds == 2:
        raise ValueError("故意模拟的错误:耗时不能为2秒")
    return f"任务成功:耗时 {seconds} 秒"

def run_with_error_handling():
    durations = [3, 2, 1]
    futures = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        for d in durations:
            futures.append(executor.submit(worker_with_error, d))

        for future in concurrent.futures.as_completed(futures):
            try:
                print(future.result())
            except Exception as e:
                print(f"捕获到异常: {e}")

这样,即使耗时 2 秒的任务报错,耗时 1 秒和 3 秒的任务依然能正常返回结果。


5. 建立任务参数与结果的映射

在实际业务中,提交任务时通常携带参数(如 URL、文件名)。当 as_completed 返回 future 对象时,你往往需要知道这个结果对应的是哪个参数。

虽然 future 对象本身不直接存储传入的参数,但可以通过字典映射来解决。

  1. 准备 一个包含业务数据的列表 data_list(例如 URL 列表)。
  2. 创建 一个字典 future_to_data
  3. 在提交任务的同时,将 future 对象作为键,对应的业务数据作为值存入字典。
  4. 在迭代时,利用 future 键从字典中取出原始数据。
import random

def fetch_data(url):
    time.sleep(random.randint(1, 3))
    if "bad" in url:
        raise ConnectionError("无法连接")
    return f"{url} 的数据内容"

def run_with_mapping():
    urls = [
        "http://api.example.com/users",
        "http://api.example.com/bad_url",  # 这个会失败
        "http://api.example.com/posts"
    ]

    future_to_url = {}

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # 提交任务并建立映射
        for url in urls:
            future = executor.submit(fetch_data, url)
            future_to_url[future] = url

        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]  # 取出原始 URL
            try:
                data = future.result()
                print(f"成功: {url} -> {data}")
            except Exception as exc:
                print(f"失败: {url} -> 原因: {exc}")

if __name__ == "__main__":
    run_with_mapping()

通过这种方式,无论任务以何种顺序完成,你都能精准地将结果或异常追溯到最原始的请求参数。

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文