Python concurrent.futures.as_completed按完成顺序获取结果
在并发编程中,处理批量任务(如网络请求、文件读写或复杂计算)时,通常会遇到两个核心需求:一是加速执行(利用多线程或多进程并行),二是实时处理结果(谁先跑完先处理谁)。concurrent.futures.as_completed 正是为此而生。它能让你在任务完成的第一时间获取结果,而不必像 map 方法那样必须等待所有任务按提交顺序依次返回。
下面通过具体步骤演示如何使用 as_completed 优化代码效率。
1. 理解执行顺序的差异
在编写代码前,必须理清 Executor.map 和 as_completed 的本质区别。
假设提交了 3 个任务,耗时分别为:任务 A(5秒)、任务 B(1秒)、任务 C(2秒)。
| 方法 | 结果返回顺序 | 特点 |
|---|---|---|
executor.map |
A -> B -> C | 按提交顺序返回。必须等 A 结束后才能拿 B 的结果,总耗时受最慢任务限制。 |
as_completed |
B -> C -> A | 按完成顺序返回。B 结束立即拿到结果,无需等待 A。 |
使用 as_completed 可以极大缩短获取第一批结果的等待时间。从数学角度看,假设各任务独立且并行执行,获取所有结果的总耗时 $T_{total}$ 近似于:
$$ T_{total} = \max(t_1, t_2, \dots, t_n) $$
其中 $t_i$ 为单个任务的耗时。这意味着程序的整体运行速度仅取决于最慢的那个任务。
2. 编写基础并发任务代码
首先构建一个模拟耗时的任务函数,并展示如何提交任务。
- 打开 Python 编辑器或 IDE。
- 导入 必要的
time和concurrent.futures模块。 - 定义 一个模拟任务函数
worker,该函数接收一个整数seconds,休眠对应时间后返回结果。
import time
import concurrent.futures
def worker(seconds):
print(f"开始任务:耗时 {seconds} 秒")
time.sleep(seconds)
return f"任务完成:耗时 {seconds} 秒"
3. 使用 as_completed 按完成顺序获取结果
本步骤将提交一组乱序耗时的任务,并观察 as_completed 的输出顺序。
- 创建 一个任务列表
tasks,设置时长为[3, 1, 2, 4, 0]秒。 - 实例化
ThreadPoolExecutor(CPU密集型任务请使用ProcessPoolExecutor)。 - 提交 任务:使用列表推导式将所有任务提交给执行器,获得
future对象列表。 - 遍历
concurrent.futures.as_completed(futures):循环中直接处理结果。
def run_as_completed_demo():
# 定义不同耗时的任务
task_durations = [3, 1, 2, 4, 0]
futures = []
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# 1. 提交所有任务
for duration in task_durations:
future = executor.submit(worker, duration)
futures.append(future)
print("--- 开始按完成顺序获取结果 ---")
# 2. 使用 as_completed 迭代
for future in concurrent.futures.as_completed(futures):
# 3. 获取结果
result = future.result()
print(result)
if __name__ == "__main__":
run_as_completed_demo()
观察 输出顺序,你会发现结果是按 0、1、2、3、4 的完成时间排序输出的,而不是提交时的 3、1、2、4、0。
4. 处理任务中的异常
在并发环境中,某个任务失败不应中断整个程序的运行。as_completed 允许你针对每个独立的 future 捕获异常。
- 修改
worker函数,使其在特定条件下(如时长为 2 秒时)抛出异常。 - 使用
try...except块包裹future.result()调用。 - 捕获
Exception(或具体的异常类型)并打印错误信息。
def worker_with_error(seconds):
time.sleep(seconds)
if seconds == 2:
raise ValueError("故意模拟的错误:耗时不能为2秒")
return f"任务成功:耗时 {seconds} 秒"
def run_with_error_handling():
durations = [3, 2, 1]
futures = []
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
for d in durations:
futures.append(executor.submit(worker_with_error, d))
for future in concurrent.futures.as_completed(futures):
try:
print(future.result())
except Exception as e:
print(f"捕获到异常: {e}")
这样,即使耗时 2 秒的任务报错,耗时 1 秒和 3 秒的任务依然能正常返回结果。
5. 建立任务参数与结果的映射
在实际业务中,提交任务时通常携带参数(如 URL、文件名)。当 as_completed 返回 future 对象时,你往往需要知道这个结果对应的是哪个参数。
虽然 future 对象本身不直接存储传入的参数,但可以通过字典映射来解决。
- 准备 一个包含业务数据的列表
data_list(例如 URL 列表)。 - 创建 一个字典
future_to_data。 - 在提交任务的同时,将
future对象作为键,对应的业务数据作为值存入字典。 - 在迭代时,利用
future键从字典中取出原始数据。
import random
def fetch_data(url):
time.sleep(random.randint(1, 3))
if "bad" in url:
raise ConnectionError("无法连接")
return f"{url} 的数据内容"
def run_with_mapping():
urls = [
"http://api.example.com/users",
"http://api.example.com/bad_url", # 这个会失败
"http://api.example.com/posts"
]
future_to_url = {}
with concurrent.futures.ThreadPoolExecutor() as executor:
# 提交任务并建立映射
for url in urls:
future = executor.submit(fetch_data, url)
future_to_url[future] = url
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future] # 取出原始 URL
try:
data = future.result()
print(f"成功: {url} -> {data}")
except Exception as exc:
print(f"失败: {url} -> 原因: {exc}")
if __name__ == "__main__":
run_with_mapping()
通过这种方式,无论任务以何种顺序完成,你都能精准地将结果或异常追溯到最原始的请求参数。

暂无评论,快来抢沙发吧!