在 Python 的异步编程中,asyncio 提供了强大的高层抽象(如 async/await、asyncio.run)。然而,对于需要精细控制任务执行顺序、实现自定义调度逻辑或与特定底层 I/O 机制集成的场景,我们需要深入了解其低级 API,特别是事件循环(Event Loop)的直接方法。
本文将重点介绍如何使用事件循环实例上的 call_soon() 和 call_later() 等方法,实现对任务执行的精确控制,从而模拟自定义调度策略。
了解低级调度 API
asyncio 事件循环(由 AbstractEventLoop 定义)的核心职责之一是管理回调函数的执行队列。我们通过以下低级方法直接将任务(回调函数)放入队列:
- ****loop.call_soon(callback, *args): 安排一个回调函数在下一次迭代中尽快执行。这是实现任务立即排队的基本机制。
- ****loop.call_later(delay, callback, *args): 安排一个回调函数在指定的 delay 秒后执行。这依赖于循环内部的定时器。
- ****loop.call_at(when, callback, *args): 安排一个回调函数在指定的绝对时间(基于循环内部时间)执行。
通过手动组合这些 API,我们可以绕过标准的 asyncio.create_task,实现更为灵活的调度逻辑,例如基于优先级的调度或严格的顺序执行。
实践示例:自定义顺序调度器
下面的示例演示了如何获取事件循环实例,并手动安排三个任务的执行顺序,包括一个立即任务和一个延迟任务,从而控制它们在循环中的执行时刻。
import asyncio
import time
# 定义一些简单的回调函数
def heavy_lifting_task(name):
"""一个模拟长时间运行的任务"""
# 实际应用中,不应在同步回调中执行长时间阻塞操作,这里仅作演示
print(f"[{time.time():.4f}] --> 任务 {name} 开始执行。")
# time.sleep(0.1) # 实际运行中会导致阻塞
print(f"[{time.time():.4f}] <-- 任务 {name} 执行完毕。")
def stop_loop(loop):
"""停止事件循环的回调"""
print(f"[{time.time():.4f}] 停止循环。")
loop.stop()
def custom_scheduler(loop):
print(f"[{time.time():.4f}] 开始自定义调度流程...")
# 1. 立即调度高优先级任务 A
# 它会被放在 call_soon 队列的最前面
loop.call_soon(heavy_lifting_task, "A (高优先级)")
# 2. 调度一个延迟任务 B,将在 1.5 秒后执行
# 这展示了时间调度控制
loop.call_later(1.5, heavy_lifting_task, "B (延迟任务)")
# 3. 立即调度一个普通任务 C
# 它会在任务 A 之后执行
loop.call_soon(heavy_lifting_task, "C (普通任务)")
# 4. 确保循环在所有任务完成后停止 (设置一个最大运行时间)
loop.call_later(2.0, stop_loop, loop)
# --- 主执行逻辑 ---
# 获取或创建事件循环
loop = asyncio.get_event_loop()
print("初始化事件循环并设置调度。")
# 启动自定义调度
custom_scheduler(loop)
# 运行循环直到 stop() 被调用
loop.run_forever()
print("程序退出。")
运行结果分析
观察运行结果,你会发现 heavy_lifting_task(“A”) 和 heavy_lifting_task(“C”) 几乎同时(在循环的第一次迭代中)被执行,而 heavy_lifting_task(“B”) 则等待了约 1.5 秒才被执行。同时,我们通过 loop.call_later(2.0, loop.stop) 确保了循环的优雅退出。
这展示了我们如何通过低级 API 精确地控制回调函数何时进入事件循环的待处理队列,从而实现标准的异步/等待语法无法直接提供的自定义时间或顺序调度策略。这种技术是实现自定义 I/O 驱动程序或更复杂并发模型的基础。
汤不热吧