Python 协程 Send 与 Throw 方法的高级用法
1. 理解协程基础
创建协程使用 async def 语法,调用协程会返回一个协程对象,需要通过 await 或其他方式来执行。
import asyncio
async def simple_coroutine():
print("协程开始")
await asyncio.sleep(1)
print("协程结束")
# 创建协程对象
coro = simple_coroutine()
# 执行协程
asyncio.run(coro)
2. Send方法基础
调用 send() 方法可以向协程发送数据,使其从 yield 表达式中恢复执行。
async def data_consumer():
print("准备接收数据")
while True:
data = yield
print(f"接收到数据: {data}")
# 创建协程
consumer = data_consumer()
# 启动协程
next(consumer) # 首次执行到第一个yield
# 发送数据
consumer.send("hello")
consumer.send("world")
3. Send方法高级用法
3.1 数据双向传递
实现协程间的数据双向传递,让协程不仅接收数据,还能返回处理结果。
async def processor():
while True:
# 接收数据
data = yield
# 处理数据
processed = data.upper()
# 返回处理结果
yield processed
def run_processor():
proc = processor()
next(proc) # 启动协程
# 发送数据并获取结果
result = proc.send("hello")
print(result) # 输出: HELLO
result = proc.send("python")
print(result) # 输出: PYTHON
3.2 协程管道
构建协程管道,让多个协程串联处理数据。
async def upper_case():
while True:
data = yield
yield data.upper()
async def add_exclamation():
while True:
data = yield
yield f"{data}!"
async def printer():
while True:
data = yield
print(f"最终结果: {data}")
def setup_pipeline():
# 创建协程
upper = upper_case()
excl = add_exclamation()
prt = printer()
# 启动所有协程
next(upper)
next(excl)
next(prt)
# 连接协程
while True:
# 发送数据到第一个协程
input_data = yield
if input_data is None:
break
# 通过管道传递数据
upper_to_excl = upper.send(input_data)
excl_to_prt = excl.send(upper_to_excl)
prt.send(excl_to_prt)
# 使用管道
pipeline = setup_pipeline()
next(pipeline) # 启动管道
# 发送数据
pipeline.send("hello")
pipeline.send("python")
# 关闭管道
pipeline.send(None)
4. Throw方法基础
使用 throw() 方法可以在协程中引发异常,使协程处理异常后继续执行或结束。
async def error_handler():
try:
while True:
data = yield
print(f"处理数据: {data}")
except ValueError as e:
print(f"捕获到ValueError: {e}")
handler = error_handler()
next(handler) # 启动协程
# 正常发送数据
handler.send("正常数据")
# 发送数据并引发异常
handler.throw(ValueError, "这是一个错误")
5. Throw方法高级用法
5.1 自定义异常处理
设计复杂的异常处理逻辑,让协程能够区分不同类型的异常并作出相应反应。
async def advanced_error_handler():
try:
while True:
data = yield
try:
# 模拟可能出错的操作
if isinstance(data, str):
raise ValueError("字符串数据无效")
if isinstance(data, int) and data < 0:
raise TypeError("负数不被支持")
print(f"成功处理: {data}")
except ValueError as ve:
yield f"值错误处理: {ve}"
except TypeError as te:
yield f"类型错误处理: {te}"
except Exception as e:
yield f"未捕获的异常: {e}"
handler = advanced_error_handler()
next(handler) # 启动协程
# 发送不同类型数据测试
print(handler.send("test")) # 会引发ValueError
print(handler.send(-1)) # 会引发TypeError
print(handler.send([1,2,3])) # 正常处理
5.2 协程状态转换
利用 throw() 实现协程状态转换,例如从正常处理模式切换到错误处理模式。
async def stateful_coroutine():
state = "normal"
try:
while True:
data = yield state
if data == "error":
state = "error"
raise Exception("进入错误状态")
elif data == "normal":
state = "normal"
else:
print(f"处理数据: {data}")
except:
state = "error"
while True:
data = yield state
if data == "revert":
state = "normal"
continue
print(f"错误处理: {data}")
coro = stateful_coroutine()
next(coro) # 启动协程
# 正常模式
print(coro.send("正常数据1")) # 输出: normal
print(coro.send("正常数据2")) # 输出: normal
# 切换到错误模式
print(coro.send("error")) # 输出: error
# 错误模式
print(coro.send("错误数据1")) # 输出: error
print(coro.send("错误数据2")) # 输出: error
# 恢复正常模式
print(coro.send("revert")) # 输出: normal
print(coro.send("恢复正常数据")) # 输出: normal
6. Send与Throw的组合使用
组合使用 send() 和 throw() 方法实现复杂的数据流控制和错误处理。
async def data_processor():
try:
while True:
# 接收数据
command = yield
# 根据命令执行不同操作
if command == "process":
data = yield
try:
# 模拟数据处理
result = data * 2
yield {"status": "success", "result": result}
except Exception as e:
yield {"status": "error", "message": str(e)}
elif command == "error":
raise ValueError("手动触发错误")
elif command == "exit":
return "协程已退出"
except GeneratorExit:
yield "协程被关闭"
except Exception as e:
yield f"处理异常: {e}"
processor = data_processor()
next(processor) # 启动协程
# 发送处理命令
processor.send("process")
print(processor.send(5)) # 输出: {'status': 'success', 'result': 10}
# 发送错误命令
processor.send("error")
# 下次调用将捕获异常
print(next(processor)) # 输出: 处理异常: 手动触发错误
# 重新启动协程
processor = data_processor()
next(processor)
# 发送退出命令
processor.send("exit")
print(next(processor)) # 输出: 协程已退出
7. 实际应用场景
7.1 异步数据处理管道
实现一个异步数据处理管道,能够处理大量数据并优雅地处理错误。
import asyncio
import random
async def data_source():
"""模拟数据源,生成随机数据"""
for _ in range(10):
await asyncio.sleep(0.5)
data = random.randint(1, 100)
print(f"生成数据: {data}")
yield data
async def validator():
"""数据验证器"""
try:
while True:
data = yield
if data < 0 or data > 100:
raise ValueError(f"数据 {data} 超出范围")
yield f"验证通过: {data}"
except ValueError as e:
yield f"验证失败: {e}"
async def transformer():
"""数据转换器"""
while True:
data = yield
# 模拟偶尔失败的操作
if random.random() < 0.2: # 20%概率失败
raise RuntimeError("转换操作失败")
transformed = data * 2
yield f"转换后: {transformed}"
async def error_handler():
"""错误处理器"""
while True:
try:
data = yield
print(f"处理成功: {data}")
except Exception as e:
print(f"处理失败: {e}")
async def setup_pipeline():
"""设置协程管道"""
source = data_source()
validator_coro = validator()
transformer_coro = transformer()
error_handler_coro = error_handler()
# 启动所有协程
next(source)
next(validator_coro)
next(transformer_coro)
next(error_handler_coro)
# 设置管道连接
while True:
try:
# 从数据源获取数据
data = next(source)
# 通过验证器
validator_result = validator_coro.send(data)
# 如果验证通过,继续处理
if "验证通过" in validator_result:
# 尝试转换数据
try:
transformer_result = transformer_coro.send(int(validator_result.split(": ")[1]))
error_handler_coro.send(transformer_result)
except RuntimeError as e:
error_handler_coro.throw(e)
else:
# 验证失败,直接发送到错误处理器
error_handler_coro.send(validator_result)
except StopIteration:
print("数据源已耗尽")
break
except Exception as e:
error_handler_coro.throw(e)
# 运行管道
asyncio.run(setup_pipeline())
7.2 状态机实现
使用 send() 和 throw() 实现一个复杂的状态机,能够响应外部事件和异常。
async def state_machine():
"""实现一个简单的状态机"""
state = "idle"
transition = None
try:
while True:
# 接收状态转换命令
command = yield state
# 根据当前状态和命令处理转换
if state == "idle" and command == "start":
state = "running"
transition = f"从idle转换到running"
elif state == "running" and command == "pause":
state = "paused"
transition = f"从running转换到paused"
elif state == "paused" and command == "resume":
state = "running"
transition = f"从paused转换到running"
elif state == "running" and command == "stop":
state = "idle"
transition = f"从running转换到idle"
elif command == "error":
raise Exception("手动触发错误状态")
elif command == "invalid":
raise ValueError("无效的状态转换")
else:
transition = "未识别的命令"
# 返回转换信息
yield transition
except ValueError as ve:
state = "error"
yield f"错误: {ve}"
except Exception as e:
state = "critical_error"
yield f"严重错误: {e}"
finally:
yield f"状态机结束,最终状态: {state}"
# 使用状态机
def run_state_machine():
sm = state_machine()
next(sm) # 启动状态机
# 正常状态转换
print(sm.send("start")) # 输出: running
print(sm.send("pause")) # 输出: 从running转换到paused
print(sm.send("resume")) # 输出: 从paused转换到running
print(sm.send("stop")) # 输出: 从running转换到idle
# 无效转换
print(sm.send("invalid")) # 输出: 错误: 无效的状态转换
# 手动触发错误
try:
print(sm.send("error")) # 输出: 严重错误: 手动触发错误状态
except Exception as e:
print(f"捕获到异常: {e}")
# 最终状态
try:
print(next(sm)) # 输出: 状态机结束,最终状态: critical_error
except StopIteration:
print("状态机已结束")
run_state_machine()
8. 最佳实践与注意事项
8.1 性能优化
避免在协程中进行不必要的 send() 调用,特别是在高频率场景下。
# 不推荐的做法 - 过于频繁的send调用
async def inefficient_coroutine():
while True:
data = yield
# 处理数据
processed = data * 2
yield processed
# 推荐的做法 - 批量处理数据
async def efficient_coroutine():
batch = []
while True:
command = yield
if command == "process":
# 批量处理数据
results = [item * 2 for item in batch]
batch = []
yield results
else:
# 收集数据
batch.append(command)
8.2 错误处理策略
设计健壮的错误处理机制,确保协程在各种异常情况下都能正确恢复或优雅退出。
async def robust_coroutine():
error_count = 0
max_errors = 3
try:
while True:
try:
data = yield
if data == "error":
error_count += 1
if error_count > max_errors:
raise Exception("错误次数超过限制")
raise ValueError("数据处理错误")
# 正常处理数据
result = data * 2
yield f"处理结果: {result}"
except ValueError as ve:
yield f"处理错误: {ve}"
# 可以添加恢复逻辑
except Exception as e:
yield f"严重错误: {e},协程将退出"
finally:
yield "协程清理完成"
def run_robust_coroutine():
coro = robust_coroutine()
next(coro)
# 正常数据
print(coro.send(5)) # 输出: 处理结果: 10
# 错误数据(可恢复)
print(coro.send("error")) # 输出: 处理错误: 数据处理错误
print(coro.send("error")) # 输出: 处理错误: 数据处理错误
print(coro.send("error")) # 输出: 处理错误: 数据处理错误
# 严重错误(不可恢复)
print(coro.send("error")) # 输出: 严重错误: 错误次数超过限制,协程将退出
print(next(coro)) # 输出: 协程清理完成
run_robust_coroutine()
8.3 协程生命周期管理
实现完善的协程生命周期管理,包括启动、运行、暂停、恢复和终止。
class CoroutineManager:
def __init__(self):
self.coroutine = None
self.status = "stopped"
def start(self, coro_func, *args, **kwargs):
"""启动协程"""
if self.status == "running":
raise RuntimeError("协程已在运行")
self.coroutine = coro_func(*args, **kwargs)
next(self.coroutine)
self.status = "running"
return self
def send(self, value):
"""发送数据到协程"""
if self.status != "running":
raise RuntimeError("协程未运行")
try:
result = self.coroutine.send(value)
return result
except StopIteration:
self.status = "stopped"
raise
def throw(self, exc_type, exc_value=None, exc_traceback=None):
"""向协程抛出异常"""
if self.status != "running":
raise RuntimeError("协程未运行")
try:
result = self.coroutine.throw(exc_type, exc_value, exc_traceback)
return result
except StopIteration:
self.status = "stopped"
raise
def pause(self):
"""暂停协程"""
if self.status != "running":
raise RuntimeError("协程未运行")
self.status = "paused"
def resume(self):
"""恢复协程"""
if self.status != "paused":
raise RuntimeError("协程未暂停")
self.status = "running"
# 使用协程管理器
async def sample_coroutine():
try:
while True:
data = yield
if data == "
暂无评论,快来抢沙发吧!