线程间通信指的是在 Python 多线程程序中启用线程之间的通信和同步的过程。
通常情况下,Python 中的线程在同一进程的内存空间内共享,这使得它们能够通过共享变量、对象以及 threading 模块提供的专门同步机制来交换数据和协调活动。
为了促进线程间的通信,threading 模块提供了各种同步原语,如 Locks、Events、Conditions 和 Semaphores 对象。在本教程中,您将学习如何使用 Event 和 Condition 对象来在多线程程序中提供线程间的通信。
Event 对象
Event 对象管理一个内部标志的状态,以便线程可以等待或设置。Event 对象提供了用于控制此标志状态的方法,允许线程基于共享条件来同步它们的活动。
该标志最初为假,在使用 set() 方法后变为真,并在使用 clear() 方法后重置为假。wait() 方法会在标志为真时阻塞。
以下为 Event 对象的关键方法:
-
is_set(): 如果内部标志为真则返回 True。
-
set(): 将内部标志设置为真。所有等待标志变为真的线程将被唤醒。在标志为真之后调用 wait() 的线程将不会被阻塞。
-
clear(): 将内部标志重置为假。随后调用 wait() 的线程将阻塞,直到 set() 被调用再次将内部标志设置为真。
-
wait(timeout=None): 阻塞直到内部标志为真。如果进入时内部标志为真,则立即返回。否则,阻塞直到另一线程调用 set() 将标志设置为真,或者直到可选的超时发生。当存在超时参数并且不为 None 时,它应为指定操作超时秒数的浮点数。
示例
以下代码试图模拟交通流量由红绿灯状态控制的情况。
程序中有两个线程,分别针对两个不同的函数。signal_state() 函数周期性地设置和重置事件,指示信号从绿变红。
traffic_flow() 函数等待事件被设置,并在事件保持设置的情况下运行一个循环。
from threading import Event, Thread
import time
terminate = False
def signal_state():
global terminate
while not terminate:
time.sleep(0.5)
print("交通警察给出绿灯信号")
event.set()
time.sleep(1)
print("交通警察给出红灯信号")
event.clear()
def traffic_flow():
global terminate
num = 0
while num < 10 and not terminate:
print("等待绿灯信号")
event.wait()
print("绿灯信号... 交通可以移动")
while event.is_set() and not terminate:
num += 1
print("车辆编号:", num," 通过信号灯")
time.sleep(1)
print("红灯信号... 交通需要等待")
event = Event()
t1 = Thread(target=signal_state)
t2 = Thread(target=traffic_flow)
t1.start()
t2.start()
time.sleep(5)
terminate = True
t1.join()
t2.join()
print("退出主线程")
输出:
等待绿灯信号
交通警察给出绿灯信号
绿灯信号... 交通可以移动
车辆编号: 1 通过信号灯
交通警察给出红灯信号
红灯信号... 交通需要等待
等待绿灯信号
交通警察给出绿灯信号
绿灯信号... 交通可以移动
车辆编号: 2 通过信号灯
车辆编号: 3 通过信号灯
交通警察给出红灯信号
交通警察给出绿灯信号
车辆编号: 4 通过信号灯
交通警察给出红灯信号
红灯信号... 交通需要等待
交通警察给出绿灯信号
交通警察给出红灯信号
退出主线程
Condition 对象
Python 的 threading 模块中的 Condition 对象提供了一个更高级的同步机制。它允许线程等待另一个线程的通知再继续。Condition 对象总是与一个锁关联,并提供线程间信号的机制。
以下是 threading.Condition() 类的语法:
threading.Condition(lock=None)
以下是 Condition 对象的关键方法:
-
acquire(*args): 获取底层锁。此方法调用底层锁上的相应方法;返回值是该方法返回的值。
-
release(): 释放底层锁。此方法调用底层锁上的相应方法;没有返回值。
-
wait(timeout=None): 此方法释放底层锁,然后阻塞直到它被同一条件变量在另一线程中的 notify() 或 notify_all() 调用唤醒,或者直到可选的超时发生。一旦被唤醒或超时,它重新获取锁并返回。
-
wait_for(predicate, timeout=None): 此实用方法可能会反复调用 wait() 直到满足谓词,或者直到发生超时。返回值是谓词的最后一个返回值,并将在方法超时时评估为假。
-
notify(n=1): 此方法最多唤醒 n 个等待条件变量的线程;如果没有线程在等待,则此方法是空操作。
-
notify_all(): 唤醒所有等待此条件的线程。此方法的行为像 notify(),但是唤醒所有等待的线程而不是一个。如果调用此方法时调用线程未获取锁,则抛出 RuntimeError。
示例
此示例演示了使用 Python 的 threading 模块中的 Condition 对象进行线程间通信的简单形式。在此,thread_a 和 thread_b 通过一个 Condition 对象进行通信,thread_a 在接收到 thread_b 的通知之前处于等待状态。thread_b 在通知 thread_a 之前休眠两秒,然后完成。
from threading import Condition, Thread
import time
c = Condition()
def thread_a():
print("线程 A 开始")
with c:
print("线程 A 等待许可...")
c.wait()
print("线程 A 获得许可!")
print("线程 A 完成")
def thread_b():
print("线程 B 开始")
with c:
time.sleep(2)
print("通知线程 A...")
c.notify()
print("线程 B 完成")
Thread(target=thread_a).start()
Thread(target=thread_b).start()
输出:
线程 A 开始
线程 A 等待许可...
线程 B 开始
通知线程 A...
线程 B 完成
线程 A 获得许可!
线程 A 完成
示例
以下是另一个代码示例,演示了 Condition 对象是如何用于提供线程间的通信的。在此,线程 t2 运行 taskB() 函数,而线程 t1 运行 taskA() 函数。线程 t1 获取条件并通知它。
此时,线程 t2 处于等待状态。在条件被释放后,等待的线程继续消费由通知函数生成的随机数。
from threading import Condition, Thread
import time
import random
numbers = []
def taskA(c):
for _ in range(5):
with c:
num = random.randint(1, 10)
print("生成随机数:", num)
numbers.append(num)
print("通知已发出")
c.notify()
time.sleep(0.3)
def taskB(c):
for i in range(5):
with c:
print("等待更新")
while not numbers:
c.wait()
print("获得随机数", numbers.pop())
time.sleep(0.3)
c = Condition()
t1 = Thread(target=taskB, args=(c,))
t2 = Thread(target=taskA, args=(c,))
t1.start()
t2.start()
t1.join()
t2.join()
print("完成")
当你执行这段代码时,它会产生如下输出:
等待更新
生成随机数: 2
通知已发出
获得随机数 2
生成随机数: 5
通知已发出
等待更新
获得随机数 5
生成随机数: 1
通知已发出
等待更新
获得随机数 1
生成随机数: 9
通知已发出
等待更新
获得随机数 9
生成随机数: 2
通知已发出
等待更新
获得随机数 2
完成