线程池是一种自动高效管理多个线程的机制,允许任务并发执行。Python 并没有直接通过 threading 模块提供线程池功能。
但是,Python 提供了基于线程的池化功能,通过 multiprocessing.dummy 模块和 concurrent.futures 模块。这些模块提供了创建和管理线程池的便捷接口,使得并发任务执行变得更加容易。
什么是线程池?
线程池是由池管理的一组线程集合。池中的每个线程被称为工作线程或工作者线程。这些线程可以被重复利用来执行多个任务,从而减少了频繁创建和销毁线程的开销。
线程池控制线程的创建及其生命周期,使其更有效地处理大量任务。
我们可以使用以下类在 Python 中实现线程池:
-
-
Python ThreadPoolExecutor 类
使用 Python ThreadPool 类
multiprocessing.pool.ThreadPool
类在 multiprocessing
模块内提供了一个线程池接口。它管理一个工作者线程池,向其中提交作业以便并发执行。
ThreadPool
对象简化了多线程管理,通过处理任务在工作者线程之间的创建和分发。它共享与最初设计用于进程的 Pool
类相同的接口,但已被调整为适用于线程。
ThreadPool
实例与 Pool
实例完全接口兼容,应该作为上下文管理器使用,或者手动调用 close()
和 terminate()
。
示例
下面的示例演示了如何使用 Python 线程池对一组数字并行执行平方和立方函数,其中每个函数使用最多三个线程并发应用到数字上,每次执行之间有 1 秒钟的延迟。
from multiprocessing.dummy import Pool as ThreadPool
import time
def square(number):
sqr = number * number
time.sleep(1)
print("Number:{} Square:{}".format(number, sqr))
def cube(number):
cub = number * number * number
time.sleep(1)
print("Number:{} Cube:{}".format(number, cub))
numbers = [1, 2, 3, 4, 5]
pool = ThreadPool(3)
pool.map(square, numbers)
pool.map(cube, numbers)
pool.close()
输出:
Number:2 Square:4
Number:1 Square:1
Number:3 Square:9
Number:4 Square:16
Number:5 Square:25
Number:1 Cube:1
Number:2 Cube:8
Number:3 Cube:27
Number:4 Cube:64
Number:5 Cube:125
使用 Python ThreadPoolExecutor 类
concurrent.futures
模块中的 ThreadPoolExecutor
类提供了使用线程异步执行函数的高级接口。concurrent.futures
模块包含 Future
类和两种 Executor
类——ThreadPoolExecutor
和 ProcessPoolExecutor
。
Future
类
concurrent.futures.Future
类负责处理任何可调用对象(如函数)的异步执行。为了获取 Future
对象,你应该在任何 Executor
对象上调用 submit()
方法。你不应该直接通过构造函数创建它。
Future
类中的重要方法有:
-
result(timeout=None)
:此方法返回调用返回的值。如果调用尚未完成,则此方法将等待最多 timeout
秒。如果调用在 timeout
秒内未完成,则会引发 TimeoutError
。如果不指定 timeout
,则等待时间没有限制。
-
cancel()
:此方法尝试取消调用。如果调用当前正在执行或已完成无法取消,则此方法将返回布尔值 False
。否则,调用将被取消,并且该方法返回 True
。
-
cancelled()
:如果调用成功取消则返回 True
。
-
running()
:如果调用当前正在执行并且无法取消,则返回 True
。
-
done()
:如果调用成功取消或完成执行,则返回 True
。
ThreadPoolExecutor
类
此类表示具有指定最大工作者线程数量的池,用于异步执行调用。
concurrent.futures.ThreadPoolExecutor(max_workers)
示例
下面是一个使用 concurrent.futures.ThreadPoolExecutor
类在 Python 中管理和异步执行任务的例子。具体来说,它展示了如何向线程池提交多个任务以及如何检查它们的执行状态。
from concurrent.futures import ThreadPoolExecutor
from time import sleep
def square(numbers):
for val in numbers:
ret = val * val
sleep(1)
print("Number:{} Square:{}".format(val, ret))
def cube(numbers):
for val in numbers:
ret = val * val * val
sleep(1)
print("Number:{} Cube:{}".format(val, ret))
if __name__ == '__main__':
numbers = [1, 2, 3, 4, 5]
executor = ThreadPoolExecutor(4)
thread1 = executor.submit(square, numbers)
thread2 = executor.submit(cube, numbers)
print("Thread 1 executed ? :", thread1.done())
print("Thread 2 executed ? :", thread2.done())
sleep(2)
print("Thread 1 executed ? :", thread1.done())
print("Thread 2 executed ? :", thread2.done())
它将产生如下输出:
Thread 1 executed ? : False
Thread 2 executed ? : False
Number:1 Square:1
Number:1 Cube:1
Thread 1 executed ? : False
Thread 2 executed ? : False
Number:2 Square:4
Number:2 Cube:8
Number:3 Square:9
Number:3 Cube:27
Number:4 Square:16
Number:4 Cube:64
Number:5 Square:25
Number:5 Cube:125