文章目录

Python 协程Send与Throw方法的高级用法

发布于 2026-04-13 16:27:09 · 浏览 19 次 · 评论 0 条

Python 协程 Send 与 Throw 方法的高级用法

1. 理解协程基础

创建协程使用 async def 语法,调用协程会返回一个协程对象,需要通过 await 或其他方式来执行。

import asyncio

async def simple_coroutine():
    print("协程开始")
    await asyncio.sleep(1)
    print("协程结束")

# 创建协程对象
coro = simple_coroutine()
# 执行协程
asyncio.run(coro)

2. Send方法基础

调用 send() 方法可以向协程发送数据,使其从 yield 表达式中恢复执行。

async def data_consumer():
    print("准备接收数据")
    while True:
        data = yield
        print(f"接收到数据: {data}")

# 创建协程
consumer = data_consumer()
# 启动协程
next(consumer)  # 首次执行到第一个yield
# 发送数据
consumer.send("hello")
consumer.send("world")

3. Send方法高级用法

3.1 数据双向传递

实现协程间的数据双向传递,让协程不仅接收数据,还能返回处理结果。

async def processor():
    while True:
        # 接收数据
        data = yield
        # 处理数据
        processed = data.upper()
        # 返回处理结果
        yield processed

def run_processor():
    proc = processor()
    next(proc)  # 启动协程

    # 发送数据并获取结果
    result = proc.send("hello")
    print(result)  # 输出: HELLO

    result = proc.send("python")
    print(result)  # 输出: PYTHON

3.2 协程管道

构建协程管道,让多个协程串联处理数据。

async def upper_case():
    while True:
        data = yield
        yield data.upper()

async def add_exclamation():
    while True:
        data = yield
        yield f"{data}!"

async def printer():
    while True:
        data = yield
        print(f"最终结果: {data}")

def setup_pipeline():
    # 创建协程
    upper = upper_case()
    excl = add_exclamation()
    prt = printer()

    # 启动所有协程
    next(upper)
    next(excl)
    next(prt)

    # 连接协程
    while True:
        # 发送数据到第一个协程
        input_data = yield
        if input_data is None:
            break

        # 通过管道传递数据
        upper_to_excl = upper.send(input_data)
        excl_to_prt = excl.send(upper_to_excl)
        prt.send(excl_to_prt)

# 使用管道
pipeline = setup_pipeline()
next(pipeline)  # 启动管道

# 发送数据
pipeline.send("hello")
pipeline.send("python")

# 关闭管道
pipeline.send(None)

4. Throw方法基础

使用 throw() 方法可以在协程中引发异常,使协程处理异常后继续执行或结束。

async def error_handler():
    try:
        while True:
            data = yield
            print(f"处理数据: {data}")
    except ValueError as e:
        print(f"捕获到ValueError: {e}")

handler = error_handler()
next(handler)  # 启动协程

# 正常发送数据
handler.send("正常数据")

# 发送数据并引发异常
handler.throw(ValueError, "这是一个错误")

5. Throw方法高级用法

5.1 自定义异常处理

设计复杂的异常处理逻辑,让协程能够区分不同类型的异常并作出相应反应。

async def advanced_error_handler():
    try:
        while True:
            data = yield
            try:
                # 模拟可能出错的操作
                if isinstance(data, str):
                    raise ValueError("字符串数据无效")
                if isinstance(data, int) and data < 0:
                    raise TypeError("负数不被支持")
                print(f"成功处理: {data}")
            except ValueError as ve:
                yield f"值错误处理: {ve}"
            except TypeError as te:
                yield f"类型错误处理: {te}"
    except Exception as e:
        yield f"未捕获的异常: {e}"

handler = advanced_error_handler()
next(handler)  # 启动协程

# 发送不同类型数据测试
print(handler.send("test"))  # 会引发ValueError
print(handler.send(-1))     # 会引发TypeError
print(handler.send([1,2,3])) # 正常处理

5.2 协程状态转换

利用 throw() 实现协程状态转换,例如从正常处理模式切换到错误处理模式。

async def stateful_coroutine():
    state = "normal"
    try:
        while True:
            data = yield state
            if data == "error":
                state = "error"
                raise Exception("进入错误状态")
            elif data == "normal":
                state = "normal"
            else:
                print(f"处理数据: {data}")
    except:
        state = "error"
        while True:
            data = yield state
            if data == "revert":
                state = "normal"
                continue
            print(f"错误处理: {data}")

coro = stateful_coroutine()
next(coro)  # 启动协程

# 正常模式
print(coro.send("正常数据1"))  # 输出: normal
print(coro.send("正常数据2"))  # 输出: normal

# 切换到错误模式
print(coro.send("error"))      # 输出: error

# 错误模式
print(coro.send("错误数据1"))   # 输出: error
print(coro.send("错误数据2"))   # 输出: error

# 恢复正常模式
print(coro.send("revert"))     # 输出: normal
print(coro.send("恢复正常数据")) # 输出: normal

6. Send与Throw的组合使用

组合使用 send()throw() 方法实现复杂的数据流控制和错误处理。

async def data_processor():
    try:
        while True:
            # 接收数据
            command = yield

            # 根据命令执行不同操作
            if command == "process":
                data = yield
                try:
                    # 模拟数据处理
                    result = data * 2
                    yield {"status": "success", "result": result}
                except Exception as e:
                    yield {"status": "error", "message": str(e)}

            elif command == "error":
                raise ValueError("手动触发错误")

            elif command == "exit":
                return "协程已退出"

    except GeneratorExit:
        yield "协程被关闭"
    except Exception as e:
        yield f"处理异常: {e}"

processor = data_processor()
next(processor)  # 启动协程

# 发送处理命令
processor.send("process")
print(processor.send(5))  # 输出: {'status': 'success', 'result': 10}

# 发送错误命令
processor.send("error")
# 下次调用将捕获异常
print(next(processor))  # 输出: 处理异常: 手动触发错误

# 重新启动协程
processor = data_processor()
next(processor)

# 发送退出命令
processor.send("exit")
print(next(processor))  # 输出: 协程已退出

7. 实际应用场景

7.1 异步数据处理管道

实现一个异步数据处理管道,能够处理大量数据并优雅地处理错误。

import asyncio
import random

async def data_source():
    """模拟数据源,生成随机数据"""
    for _ in range(10):
        await asyncio.sleep(0.5)
        data = random.randint(1, 100)
        print(f"生成数据: {data}")
        yield data

async def validator():
    """数据验证器"""
    try:
        while True:
            data = yield
            if data < 0 or data > 100:
                raise ValueError(f"数据 {data} 超出范围")
            yield f"验证通过: {data}"
    except ValueError as e:
        yield f"验证失败: {e}"

async def transformer():
    """数据转换器"""
    while True:
        data = yield
        # 模拟偶尔失败的操作
        if random.random() < 0.2:  # 20%概率失败
            raise RuntimeError("转换操作失败")
        transformed = data * 2
        yield f"转换后: {transformed}"

async def error_handler():
    """错误处理器"""
    while True:
        try:
            data = yield
            print(f"处理成功: {data}")
        except Exception as e:
            print(f"处理失败: {e}")

async def setup_pipeline():
    """设置协程管道"""
    source = data_source()
    validator_coro = validator()
    transformer_coro = transformer()
    error_handler_coro = error_handler()

    # 启动所有协程
    next(source)
    next(validator_coro)
    next(transformer_coro)
    next(error_handler_coro)

    # 设置管道连接
    while True:
        try:
            # 从数据源获取数据
            data = next(source)

            # 通过验证器
            validator_result = validator_coro.send(data)

            # 如果验证通过,继续处理
            if "验证通过" in validator_result:
                # 尝试转换数据
                try:
                    transformer_result = transformer_coro.send(int(validator_result.split(": ")[1]))
                    error_handler_coro.send(transformer_result)
                except RuntimeError as e:
                    error_handler_coro.throw(e)
            else:
                # 验证失败,直接发送到错误处理器
                error_handler_coro.send(validator_result)

        except StopIteration:
            print("数据源已耗尽")
            break
        except Exception as e:
            error_handler_coro.throw(e)

# 运行管道
asyncio.run(setup_pipeline())

7.2 状态机实现

使用 send()throw() 实现一个复杂的状态机,能够响应外部事件和异常。

async def state_machine():
    """实现一个简单的状态机"""
    state = "idle"
    transition = None

    try:
        while True:
            # 接收状态转换命令
            command = yield state

            # 根据当前状态和命令处理转换
            if state == "idle" and command == "start":
                state = "running"
                transition = f"从idle转换到running"
            elif state == "running" and command == "pause":
                state = "paused"
                transition = f"从running转换到paused"
            elif state == "paused" and command == "resume":
                state = "running"
                transition = f"从paused转换到running"
            elif state == "running" and command == "stop":
                state = "idle"
                transition = f"从running转换到idle"
            elif command == "error":
                raise Exception("手动触发错误状态")
            elif command == "invalid":
                raise ValueError("无效的状态转换")
            else:
                transition = "未识别的命令"

            # 返回转换信息
            yield transition

    except ValueError as ve:
        state = "error"
        yield f"错误: {ve}"
    except Exception as e:
        state = "critical_error"
        yield f"严重错误: {e}"
    finally:
        yield f"状态机结束,最终状态: {state}"

# 使用状态机
def run_state_machine():
    sm = state_machine()
    next(sm)  # 启动状态机

    # 正常状态转换
    print(sm.send("start"))     # 输出: running
    print(sm.send("pause"))     # 输出: 从running转换到paused
    print(sm.send("resume"))    # 输出: 从paused转换到running
    print(sm.send("stop"))      # 输出: 从running转换到idle

    # 无效转换
    print(sm.send("invalid"))   # 输出: 错误: 无效的状态转换

    # 手动触发错误
    try:
        print(sm.send("error"))   # 输出: 严重错误: 手动触发错误状态
    except Exception as e:
        print(f"捕获到异常: {e}")

    # 最终状态
    try:
        print(next(sm))          # 输出: 状态机结束,最终状态: critical_error
    except StopIteration:
        print("状态机已结束")

run_state_machine()

8. 最佳实践与注意事项

8.1 性能优化

避免在协程中进行不必要的 send() 调用,特别是在高频率场景下。

# 不推荐的做法 - 过于频繁的send调用
async def inefficient_coroutine():
    while True:
        data = yield
        # 处理数据
        processed = data * 2
        yield processed

# 推荐的做法 - 批量处理数据
async def efficient_coroutine():
    batch = []
    while True:
        command = yield
        if command == "process":
            # 批量处理数据
            results = [item * 2 for item in batch]
            batch = []
            yield results
        else:
            # 收集数据
            batch.append(command)

8.2 错误处理策略

设计健壮的错误处理机制,确保协程在各种异常情况下都能正确恢复或优雅退出。

async def robust_coroutine():
    error_count = 0
    max_errors = 3

    try:
        while True:
            try:
                data = yield
                if data == "error":
                    error_count += 1
                    if error_count > max_errors:
                        raise Exception("错误次数超过限制")
                    raise ValueError("数据处理错误")

                # 正常处理数据
                result = data * 2
                yield f"处理结果: {result}"

            except ValueError as ve:
                yield f"处理错误: {ve}"
                # 可以添加恢复逻辑

    except Exception as e:
        yield f"严重错误: {e},协程将退出"
    finally:
        yield "协程清理完成"

def run_robust_coroutine():
    coro = robust_coroutine()
    next(coro)

    # 正常数据
    print(coro.send(5))       # 输出: 处理结果: 10

    # 错误数据(可恢复)
    print(coro.send("error")) # 输出: 处理错误: 数据处理错误
    print(coro.send("error")) # 输出: 处理错误: 数据处理错误
    print(coro.send("error")) # 输出: 处理错误: 数据处理错误

    # 严重错误(不可恢复)
    print(coro.send("error")) # 输出: 严重错误: 错误次数超过限制,协程将退出
    print(next(coro))         # 输出: 协程清理完成

run_robust_coroutine()

8.3 协程生命周期管理

实现完善的协程生命周期管理,包括启动、运行、暂停、恢复和终止。


class CoroutineManager:
    def __init__(self):
        self.coroutine = None
        self.status = "stopped"

    def start(self, coro_func, *args, **kwargs):
        """启动协程"""
        if self.status == "running":
            raise RuntimeError("协程已在运行")

        self.coroutine = coro_func(*args, **kwargs)
        next(self.coroutine)
        self.status = "running"
        return self

    def send(self, value):
        """发送数据到协程"""
        if self.status != "running":
            raise RuntimeError("协程未运行")

        try:
            result = self.coroutine.send(value)
            return result
        except StopIteration:
            self.status = "stopped"
            raise

    def throw(self, exc_type, exc_value=None, exc_traceback=None):
        """向协程抛出异常"""
        if self.status != "running":
            raise RuntimeError("协程未运行")

        try:
            result = self.coroutine.throw(exc_type, exc_value, exc_traceback)
            return result
        except StopIteration:
            self.status = "stopped"
            raise

    def pause(self):
        """暂停协程"""
        if self.status != "running":
            raise RuntimeError("协程未运行")

        self.status = "paused"

    def resume(self):
        """恢复协程"""
        if self.status != "paused":
            raise RuntimeError("协程未暂停")

        self.status = "running"

# 使用协程管理器
async def sample_coroutine():
    try:
        while True:
            data = yield
            if data == "

评论 (0)

暂无评论,快来抢沙发吧!

扫一扫,手机查看

扫描上方二维码,在手机上查看本文