文章目录

Python协程Asyncio中Task取消信号的传递与处理

发布于 2026-05-13 09:19:37 · 浏览 14 次 · 评论 0 条

Python协程Asyncio中Task取消信号的传递与处理

在Asyncio编程中,Task(任务)是协程的载体。当需要停止一个正在运行的协程时,就需要用到Task的取消机制。理解并正确处理Task的取消信号,是编写健壮异步程序的关键。


1. 理解Task取消

Task取消不是强制终止,而是向协程发送一个“请求取消”的信号。被取消的协程会接收到一个 asyncio.CancelledError 异常。协程可以选择捕获这个异常,进行清理工作,然后正常退出。

  • 取消信号来源: 主程序、其他Task、超时机制(如 asyncio.wait_for)等。
  • 取消后的状态: Task的状态会变为 CANCELLED。如果协程没有处理这个异常,它会向上传播,最终由事件循环处理。

2. 发送取消信号

使用 task.cancel() 方法可以向指定的Task发送取消信号。

import asyncio

async def my_coroutine():
    print("Coroutine started")
    await asyncio.sleep(5)
    print("Coroutine finished")

async def main():
    # 创建一个Task
    task = asyncio.create_task(my_coroutine())

    # 等待2秒
    await asyncio.sleep(2)

    # 发送取消信号
    task.cancel()

    # 等待Task完成,以观察其状态变化
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

    print(f"Task state: {task.done()}")

asyncio.run(main())

输出:

Coroutine started
Task was cancelled
Task state: True

在这个例子中,my_coroutine 运行了2秒后被取消,它没有处理 CancelledError,所以异常被 main 函数捕获。


3. 处理取消信号

为了优雅地处理取消,协程应该捕获 asyncio.CancelledError 异常。

import asyncio

async def my_coroutine():
    try:
        print("Coroutine started")
        await asyncio.sleep(5)
        print("Coroutine finished")
    except asyncio.CancelledError:
        print("Coroutine cancelled, cleaning up...")
        # 在这里进行资源清理
        raise  # 可选:重新抛出异常或直接结束

async def main():
    task = asyncio.create_task(my_coroutine())

    await asyncio.sleep(2)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Main caught the cancellation")

    print(f"Task state: {task.done()}")

asyncio.run(main())

输出:

Coroutine started
Coroutine cancelled, cleaning up...
Main caught the cancellation
Task state: True

my_coroutine 捕获了 CancelledError,打印了一条清理消息,然后重新抛出异常,最终被 main 函数捕获。


4. 实战案例:带取消的长时间运行任务

假设有一个下载文件的协程,我们需要在下载过程中能够取消它。

import asyncio

async def download_file(url):
    print(f"Starting download from {url}")
    for i in range(1, 6):
        if asyncio.current_task().cancelled():
            print("Download cancelled during progress check")
            return
        print(f"Downloading... {i * 20}%")
        await asyncio.sleep(1)
    print("Download completed")

async def main():
    download_task = asyncio.create_task(download_file("https://example.com/file.zip"))

    # 等待3秒后取消下载
    await asyncio.sleep(3)
    download_task.cancel()

    try:
        await download_task
    except asyncio.CancelledError:
        print("Download was cancelled by user")

    print("Main function finished")

asyncio.run(main())

输出:

Starting download from https://example.com/file.zip
Downloading... 20%
Downloading... 40%
Downloading... 60%
Download cancelled during progress check
Download was cancelled by user
Main function finished

在这个例子中,我们在下载过程中定期检查Task是否被取消,一旦发现被取消,就立即退出协程。


5. 高级技巧与注意事项

5.1 取消未启动的Task

如果Task还没有被调度到事件循环中(即未启动),调用 cancel() 方法是无效的。

import asyncio

async def my_coroutine():
    await asyncio.sleep(1)
    print("Coroutine finished")

async def main():
    # 创建但未启动Task
    task = asyncio.Task(my_coroutine())

    # 取消未启动的Task
    task.cancel()

    # 启动Task
    asyncio.create_task(task)

    await asyncio.sleep(2)
    print(f"Task state: {task.done()}")

asyncio.run(main())

输出:

Task state: True

5.2 取消正在等待的Task

如果一个Task正在 await 另一个Future或Task,取消它会导致它立即从 await 状态中恢复,并抛出 CancelledError

import asyncio

async def waiting_coroutine():
    print("Waiting for another task...")
    await asyncio.sleep(5)  # 模拟等待
    print("Wait finished")

async def main():
    task = asyncio.create_task(waiting_coroutine())

    await asyncio.sleep(2)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Waiting task was cancelled")

    print("Main finished")

asyncio.run(main())

输出:

Waiting for another task...
Waiting task was cancelled
Main finished

5.3 资源清理

始终在 finally 块中进行资源清理,确保无论是否发生取消,资源都能被正确释放。

import asyncio

async def resource_intensive_coroutine():
    resource = None
    try:
        print("Acquiring resource...")
        resource = "some_resource"  # 模拟获取资源
        await asyncio.sleep(5)
    finally:
        if resource:
            print("Releasing resource...")
            resource = None  # 模拟释放资源

async def main():
    task = asyncio.create_task(resource_intensive_coroutine())

    await asyncio.sleep(2)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Task cancelled")

    print("Main finished")

asyncio.run(main())

输出:

Acquiring resource...
Releasing resource...
Task cancelled
Main finished

5.4 避免取消风暴

如果多个Task同时取消,可能会导致连锁反应。应确保每个Task都能独立处理自己的取消逻辑,避免一个Task的取消导致其他Task异常。


通过以上步骤,你可以掌握在Python Asyncio中如何发送和处理Task的取消信号,编写出更加健壮和可维护的异步程序。

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文