Python 线程池:concurrent.futures.ThreadPoolExecutor
在处理 I/O 密集型任务(如网络请求、文件读写)时,为了提高程序运行效率,我们需要使用多线程。Python 标准库中的 concurrent.futures 模块提供了一个高级接口 ThreadPoolExecutor(线程池执行器),它比底层的 threading 模块更易用,能自动管理线程的生命周期和任务分配。
1. 准备工作与基础概念
ThreadPoolExecutor 的核心思想是:提交任务到线程池,线程池自动分配空闲线程去执行任务,执行完毕后返回结果。开发者无需手动创建和销毁线程,避免了资源耗尽的风险。
在使用之前,确保你的 Python 环境已安装(Python 3.2+ 版本已内置该模块)。打开你的代码编辑器或 IDE,新建一个 Python 文件,例如 thread_pool_demo.py。
2. 基础用法:submit 与 result
这是最常用的一种方式,适用于需要异步执行单个或多个任务,并在后续获取结果的情况。
2.1 定义任务函数
首先,我们需要定义一个耗时的任务函数,这里模拟一个网络请求或文件读取操作。
输入以下代码:
import time
from concurrent.futures import ThreadPoolExecutor
def worker(task_id, delay):
"""模拟一个耗时任务"""
print(f"任务 {task_id} 开始执行,需要耗时 {delay} 秒...")
time.sleep(delay)
result = f"任务 {task_id} 的结果"
print(f"任务 {task_id} 执行完毕")
return result
2.2 创建线程池并提交任务
使用 with 语句可以确保线程池在任务完成后自动关闭,释放资源。
输入以下代码:
def main():
# 1. 创建线程池,最大工作线程数为 3
with ThreadPoolExecutor(max_workers=3) as executor:
# 2. 提交任务
# submit 方法会立即返回一个 Future 对象
future1 = executor.submit(worker, "A", 2)
future2 = executor.submit(worker, "B", 1)
future3 = executor.submit(worker, "C", 3)
# 3. 获取结果
# result() 方法会阻塞,直到任务完成并返回结果
print(f"获取到结果: {future1.result()}")
print(f"获取到结果: {future2.result()}")
print(f"获取到结果: {future3.result()}")
if __name__ == "__main__":
main()
运行脚本。你会发现即使任务 B 只耗时 1 秒,由于我们调用 result() 的顺序是 1、2、3,主线程会等待任务 A 完成后才会去处理任务 B 的结果。若想根据任务完成顺序获取结果,请看第 4 节。
3. 批量用法:map 方法
如果你有一批数据需要用同一个函数处理,map 方法是最佳选择,它的用法类似于 Python 内置的 map() 函数。
修改 main 函数如下:
def main():
data = [
("任务1", 1),
("任务2", 2),
("任务3", 1),
("任务4", 3)
]
with ThreadPoolExecutor(max_workers=3) as executor:
# 1. 使用 map 方法分发任务
# executor.map 会自动解包 data 中的元组作为参数传给 worker
results = executor.map(worker, *zip(*data))
# 2. 迭代获取结果
# 注意:map 返回的结果顺序与输入数据的顺序一致
for res in results:
print(f"最终输出: {res}")
运行代码。注意,尽管任务可能先后完成,但 map 方法保证输出的顺序与输入列表 data 的顺序严格一致。这是 map 与 submit 的一大区别。
4. 进阶技巧:as_completed 与异常处理
在实际场景中,我们往往希望哪个任务先完成就先处理哪个,而不是按顺序傻等。此外,任务可能会抛出异常,需要妥善处理。
4.1 按完成顺序获取结果 (as_completed)
导入 as_completed:
from concurrent.futures import ThreadPoolExecutor, as_completed
编写如下逻辑:
def main_as_completed():
tasks = [1, 2, 3, 4]
with ThreadPoolExecutor(max_workers=3) as executor:
# 创建一个字典来保存 future 和任务ID的映射,方便追踪
future_to_task = {executor.submit(worker, task_id, task_id): task_id for task_id in tasks}
# as_completed 返回一个迭代器,当 Future 完成时 yield 该 Future
for future in as_completed(future_to_task):
task_id = future_to_task[future]
try:
# 获取结果
data = future.result()
print(f"{task_id} -> 结果: {data}")
except Exception as exc:
print(f"{task_id} -> 抛出异常: {exc}")
执行上述代码。你会看到输出顺序取决于任务执行速度(即 delay 的大小),而非提交顺序。
4.2 异常处理演示
修改 worker 函数,让它随机抛出异常:
import random
def worker_with_error(task_id, delay):
print(f"任务 {task_id} 开始")
time.sleep(delay)
if random.random() < 0.5: # 50% 概率出错
raise ValueError(f"任务 {task_id} 模拟出错")
return f"任务 {task_id} 成功"
结合 future.result() 使用。当任务内部抛出异常,future.result() 会重新抛出该异常。因此,在代码中使用 try...except 块包裹 future.result() 是捕获并处理线程内异常的标准方式。
5. 核心参数对比与选择
为了方便在实际开发中选择合适的方法,以下是 submit、map 和 as_completed 的对比。
| 方法 | 适用场景 | 顺序性 | 返回值处理 |
|---|---|---|---|
submit |
单个任务提交,或需要精细控制每个任务 | 无序(取决于执行速度) | 返回 Future 对象,需手动调用 .result() |
map |
批量处理相同逻辑的任务,且需要保持输入输出顺序 | 严格有序(按输入顺序输出) | 返回结果的迭代器,直接遍历即可 |
as_completed |
批量处理,且需要“谁跑得快先处理谁” | 按完成顺序 | 返回完成任务的 Future 迭代器,需调用 .result() |
6. 实战示例:模拟网页爬虫
假设我们需要抓取多个网页的标题,每个请求耗时不同。
编写完整代码如下:
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
def mock_crawl(url):
"""模拟爬取网页"""
delay = random.uniform(0.5, 3.0) # 随机延时 0.5 到 3 秒
time.sleep(delay)
# 模拟偶尔的请求失败
if "error" in url:
raise ConnectionError(f"无法连接到 {url}")
return f"Title of {url}"
def main_spider():
urls = [
"http://example.com/page1",
"http://example.com/page2",
"http://error.com/page3", # 这个会报错
"http://example.com/page4",
]
print("开始爬取...")
# 设置最大并发数为 2,防止给服务器过大压力
with ThreadPoolExecutor(max_workers=2) as executor:
future_to_url = {executor.submit(mock_crawl, url): url for url in urls}
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
title = future.result()
print(f"成功抓取 [{url}]: {title}")
except ConnectionError as e:
print(f"抓取失败 [{url}]: {e}")
except Exception as e:
print(f"未知错误 [{url}]: {e}")
print("所有任务结束")
if __name__ == "__main__":
main_spider()
执行该脚本。通过控制台输出,你可以清晰地看到任务是如何并发执行的,以及 max_workers=2 是如何限制同时活跃的线程数量的。即使 page3 失败了,程序也不会崩溃,而是打印出错误信息并继续执行其他任务。

暂无评论,快来抢沙发吧!