在高性能的并发编程中,控制任务的执行时间和优雅地管理资源至关重要。Python 的 concurrent.futures 模块提供了一个高级接口来异步执行可调用对象,它内置了对任务超时和资源释放的支持。
本文将详细介绍如何使用 ThreadPoolExecutor 实现任务超时控制,并讲解线程池的资源管理(即优雅的“缩容”或关闭)。
1. 任务超时控制(Enforcing Task Timeouts)
在线程池中提交任务后,我们通常使用 Future 对象的 result() 方法来获取结果。通过向 result() 方法传入 timeout 参数,我们可以强制设置一个等待时间限制。如果任务在指定时间内没有完成,result() 方法将抛出 TimeoutError 异常。
这是一个实现任务超时控制的示例:
import concurrent.futures
import time
def long_running_task(duration):
"""模拟耗时的任务"""
print(f"Task started, expected duration: {duration}s")
time.sleep(duration)
return f"Task finished after {duration}s"
# 创建线程池,最大容纳3个工作线程
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# 任务1:预期完成时间 2 秒 (会成功)
future1 = executor.submit(long_running_task, 2)
# 任务2:预期完成时间 5 秒 (会超时)
future2 = executor.submit(long_running_task, 5)
print("--- 开始检查任务结果 (Timeout = 3秒) ---")
# 检查任务1
try:
result1 = future1.result(timeout=3)
print(f"[Success] Task 1 Result: {result1}")
except concurrent.futures.TimeoutError:
print("[Timeout] Task 1 did not finish in 3 seconds.")
# 检查任务2
try:
result2 = future2.result(timeout=3)
print(f"[Success] Task 2 Result: {result2}")
except concurrent.futures.TimeoutError:
# 即使这里捕获了 TimeoutError,任务本身仍在后台运行直到完成
print("[Timeout] Task 2 did not finish in 3 seconds. Continuing execution...")
except Exception as e:
print(f"[Error] Task 2 encountered an error: {e}")
print("所有任务提交并等待完毕。")
关键点: TimeoutError 仅表示等待结果的操作超时,而不能取消或停止后台正在运行的任务(除非你额外使用了如信号或多进程终止机制)。如果任务超时后你不再关心它的结果,通常的做法是让它继续运行直到完成,然后依赖线程池的优雅关闭。
2. 线程池的资源优雅管理与“缩容”
concurrent.futures 中的 ThreadPoolExecutor 采用固定大小(通过 max_workers 设置)。它不会像某些语言(如Java的CachedThreadPool)那样自动根据负载动态创建和销毁线程(即动态缩容)。一旦线程被创建,它们通常会保持活动状态,等待新任务,直到执行器被明确关闭。
实现“动态缩容”或资源释放的最佳实践是确保线程池被正确关闭。
2.1 使用上下文管理器 (with 语句)
推荐使用 with 语句创建和管理线程池。当 with 块退出时,它会自动调用 executor.shutdown(wait=True),这会确保所有已提交的任务都完成后,才会关闭线程池并释放所有工作线程资源。
import concurrent.futures
import time
def worker(n):
print(f"Worker {n}: Starting...")
time.sleep(1)
print(f"Worker {n}: Finished.")
print("--- 线程池开始 (自动管理) ---")
# 当代码块结束时,线程池会自动调用 shutdown(),等待所有任务完成。
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
for i in range(1, 6):
executor.submit(worker, i)
print("--- 线程池已关闭,资源已释放 ---")
# 此时,所有工作线程已终止。
2.2 手动调用 shutdown()
如果你不使用 with 语句,你需要手动调用 shutdown() 方法。该方法有两个重要参数:
- wait=True (默认):等待所有已提交的任务完成,然后关闭执行器。这是实现优雅关闭(即等待已在执行中的任务完成)的关键。
- cancel_futures=False (默认):不取消待处理队列中的任务。如果你设置为 True,则会尝试取消尚未开始执行的任务。
import concurrent.futures
executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
future = executor.submit(lambda: time.sleep(2))
# 提交更多任务...
# ...
# 手动关闭:等待所有任务完成后再关闭
executor.shutdown(wait=True)
通过结合任务超时控制(确保单个任务不会无限期阻塞)和优雅的 shutdown 机制(确保执行器释放所有工作线程),可以有效地管理并发资源。
汤不热吧