线程
线程的创建方式
方法创建
Python 的标准库提供了两个模块:_thread
和threading
,_thread
是低级模块,threading
是高级模块,对_thread
进行了封装。绝大多数情况下,我们只需要使用threading
这个高级模块。
from threading import Thread
from time import sleep
def func1(name):
print(f"线程{name}开始")
for i in range(3):
print(f"线程:{name}打印的 :{i}")
sleep(1)
print(f"线程{name}结束")
if __name__ == "__main__":
print("主线程,start")
# 创建线程
t1 = Thread(target=func1, args=("t1",))
t2 = Thread(target=func1, args=("t2",))
# 启动线程
t1.start()
t2.start()
print("主线程,end")
###
主线程,start
线程:t1打印的 :0
线程t2开始
主线程,end
线程:t2打印的 :0
线程:t1打印的 :1
线程:t2打印的 :1
线程:t1打印的 :2
线程:t2打印的 :2
线程t1结束
线程t2结束
###
提示
运行结果可能会出现换行问题,是因为多个线程抢夺控制台输出的 IO 流。
类创建
from threading import Thread
from time import sleep
class MyThread(Thread):
def __init__(self, name):
Thread.__init__(self)
self.name = name
def run(self):
for i in range(3):
print(f"线程:{self.name}打印的 :{i}")
sleep(1)
if __name__ == "__main__":
print("主线程,start")
# 创建线程
t1 = MyThread('t1')
t2 = MyThread('t2')
# 启动线程
t1.start()
t2.start()
print("主线程,end")
###
主线程,start
线程:t1打印的 :0
线程:t2打印的 :0
主线程,end
线程:t2打印的 :1
线程:t1打印的 :1
线程:t1打印的 :2
线程:t2打印的 :2
###
线程_join()和守护线程
之前的代码,主线程不会等待子线程结束。
如果需要等待子线程结束后,再结束主线程,可使用 join()方法。
from threading import Thread
from time import sleep
def func1(name):
print(f"线程{name}开始")
for i in range(3):
print(f"线程:{name}打印的 :{i}")
sleep(1)
print(f"线程{name}结束")
if __name__ == "__main__":
print("主线程,start")
# 创建线程
t1 = Thread(target=func1, args=("t1",))
t2 = Thread(target=func1, args=("t2",))
# 启动线程
t1.start()
t2.start()
# 主线程会等待t1,t2结束后,再往下执行
t1.join()
t2.join()
print("主线程,end")
###
主线程,start
线程t1开始
线程:t1打印的 :0
线程t2开始
线程:t2打印的 :0
线程:t2打印的 :1
线程:t1打印的 :1
线程:t1打印的 :2
线程:t2打印的 :2
线程t1结束
线程t2结束
主线程,end
###
在行为上还有一种叫守护线程,主要的特征是它的生命周期。主线程死亡,它也就随之死亡。在 python 中,线程通过 setDaemon(True|False)来设置是否为守护线程。3.10 后被废弃
# encoding=utf-8
from threading import Thread
from time import sleep
class MyThread(Thread):
def __init__(self, name):
Thread.__init__(self)
self.name = name
def run(self):
for i in range(3):
print(f"thread:{self.name} :{i}")
sleep(1)
if __name__ == "__main__":
print("主线程,start")
# 创建线程(类的方式)
t1 = MyThread("t1")
# t1设置为守护线程
t1.daemon=True # 3.10后被废弃,可以直接:t1.daemon=True
# t1.setDaemon(True) # 3.10后被废弃,可以直接:t1.daemon=True
# 启动线程
t1.start()
print("主线程,end")
###
主线程,start
thread:t1 :0
主线程,end
###
线程同步和互斥锁
现实生活中,我们会遇到“同一个资源,多个人都想使用”的问题。 比如:教室里,只有一台电脑,多个人都想使用。天然的解决办法就是,在电脑旁边,大家排队。前一人使用完后,后一人再使用。再比如,上厕所排队。
没有线程同步机制,两个线程同时操作同一个账户对象,竟然从只有 100 元的账户,轻松取出 80*2=160 元,账户余额竟然成为了-60。这么大的问题,显然银行不会答应的。
多线程操作同一个对象(未使用线程同步)
from threading import Thread
from time import sleep
class Account:
def __init__(self, money, name):
self.money = money
self.name = name
# 模拟提款操作
class Drawing(Thread):
def __init__(self, drawingNum, account):
Thread.__init__(self)
self.drawingNum = drawingNum
self.account = account
self.expenseTotal = 0
def run(self):
if self.account.money-self.drawingNum < 0:
return
sleep(1) # 判断完后阻塞。其他线程开始运行。
self.account.money -= self.drawingNum
self.expenseTotal += self.drawingNum
print(f"账户:{self.account.name},总共取了:{self.expenseTotal}")
print(f"账户:{self.account.name},余额是:{self.account.money}")
if __name__ == '__main__':
a1 = Account(100, "gaoqi")
draw1 = Drawing(80, a1) # 定义取钱线程对象;
draw2 = Drawing(80, a1) # 定义取钱线程对象;
draw1.start() # 你取钱
draw2.start() # 你老婆取钱
###
账户:gaoqi,总共取了:80
账户:gaoqi,余额是:20
账户:gaoqi,总共取了:80
账户:gaoqi,余额是:-60
###
多线程操作同一个对象(增加互斥锁,使用线程同步)
from threading import Thread, Lock
from time import sleep
class Account:
def __init__(self, money, name):
self.money = money
self.name = name
# 模拟提款操作
class Drawing(Thread):
def __init__(self, drawingNum, account):
Thread.__init__(self)
self.drawingNum = drawingNum
self.account = account
self.expenseTotal = 0
def run(self):
lock1.acquire()
if self.account.money-self.drawingNum < 0:
return
sleep(1) # 判断完后阻塞。其他线程开始运行。
self.account.money -= self.drawingNum
self.expenseTotal += self.drawingNum
lock1.release()
print(f"账户:{self.account.name},总共取了:{self.expenseTotal}")
print(f"账户:{self.account.name},余额是:{self.account.money}")
if __name__ == '__main__':
a1 = Account(100, "gaoqi")
lock1 = Lock()
draw1 = Drawing(80, a1) # 定义取钱线程对象;
draw2 = Drawing(80, a1) # 定义取钱线程对象;
draw1.start() # 你取钱
draw2.start() # 你老婆取钱
###
账户:gaoqi,总共取了:80
账户:gaoqi,余额是:20
###
互斥锁是什么?
互斥锁: 对共享数据进行锁定,保证同一时刻只能有一个线程去操作。
注意: 互斥锁是多个线程一起去抢,抢到锁的线程先执行,没有抢到锁的线程需要等待,等互斥锁使用完释放后,其它等待的线程再去抢这个锁。
threading
模块中定义了 Lock 变量,这个变量本质上是一个函数,通过调用这个函数可以获取一把互斥锁。
死锁
在多线程程序中,死锁问题很大一部分是由于一个线程同时获取多个锁造成的。举例:
有两个人都要做饭,都需要“锅”和“菜刀”才能炒菜。
from threading import Thread, Lock
from time import sleep
def fun1():
lock1.acquire()
print("fun1拿到菜刀")
sleep(2)
lock2.acquire()
print("fun1拿到锅")
lock2.release()
print("fun1释放锅")
lock1.release()
print("fun1释放菜刀")
def fun2():
lock2.acquire()
print("fun2拿到锅")
lock1.acquire()
print("fun2拿到菜刀")
lock1.release()
print("fun2释放菜刀")
lock2.release()
print("fun2释放锅")
if __name__ == "__main__":
lock1 = Lock()
lock2 = Lock()
t1 = Thread(target=fun1)
t2 = Thread(target=fun2)
t1.start()
t2.start()
死锁是由于“同步块需要同时持有多个锁造成”的,要解决这个问题,思路很简单,就是:同一个代码块,不要同时持有两个对象锁。
信号量(Semaphore)
互斥锁使用后,一个资源同时只有一个线程访问。如果某个资源,我们同时想让 N 个(指定数值)线程访问?这时候,可以使用信号量。
信号量控制同时访问资源的数量。信号量和锁相似,锁同一时间只允许一个对象(进程)通过,信号量同一时间允许多个对象(进程)通过。
应用场景
- 在读写文件的时候,一般只能只有一个线程在写,而读可以有多个线程同时进行,如果需要限制同时读文件的线程个数,这时候就可以用到信号量了(如果用互斥锁,就是限制同一时刻只能有一个线程读取文件)。
- 在做爬虫抓取数据时。
信号量底层就是一个内置的计数器。每当资源获取时(调用 acquire)计数器-1,资源释放时(调用 release)计数器+1
from threading import Thread
from time import sleep
from multiprocessing import Semaphore
"""
一个房间一次只允许两个人通过
若不使用信号量,会造成所有人都进入这个房子
若只允许一人通过可以用锁-Lock()
"""
def home(name, se):
se.acquire() # 拿到一把钥匙
print(f"{name}进入了房间")
sleep(3)
print(f"******************{name}走出来房间")
se.release() # 还回一把钥匙
if __name__ == "__main__":
se = Semaphore(2) # 创建信号量的对象,有两把钥匙
for i in range(7):
p = Thread(target=home, args=(f"tom{i}", se))
p.start()
###
tom0进入了房间
tom1进入了房间
******************tom0走出来房间
tom2进入了房间
******************tom1走出来房间
tom3进入了房间
******************tom2走出来房间
******************tom3走出来房间
tom4进入了房间
tom5进入了房间
******************tom5走出来房间
******************tom4走出来房间
tom6进入了房间
******************tom6走出来房间
###
事件(Event)
事件 Event 主要用于唤醒正在阻塞等待状态的线程;
Event()
可以创建一个事件管理标志,该标志(event)默认为 False,event 对象主要有四种方法可以调用:
方法名 | 说明 |
---|---|
event.wait(timeout=None) | 调用该方法的线程会被阻塞,如果设置了 timeout 参数,超时后,线程会停止阻塞继续执行; |
event.set() | 将 event 的标志设置为 True,调用 wait 方法的所有线程将被唤醒 |
event.clear() | 将 event 的标志设置为 False,调用 wait 方法的所有线程将被阻塞 |
event.is_set() | 判断 event 的标志是否为 True |
小伙伴们,围着吃火锅,当菜上齐了,请客的主人说:开吃!于是小伙伴一起动筷子,这种场景如何实现
#coding:utf-8
import threading
import time
def chihuoguo(name):
# 等待事件,进入等待阻塞状态
print(f'{name}已经启动')
print(f'小伙伴{name}已经进入就餐状态!')
time.sleep(1)
event.wait()
# 收到事件后进入运行状态
print(f'{name}收到通知了.')
print(f'小伙伴{name}开始吃咯!')
if __name__ == '__main__':
event = threading.Event()
# 创建新线程
thread1 = threading.Thread(target=chihuoguo, args=("tom", ))
thread2 = threading.Thread(target=chihuoguo, args=("cherry", ))
# 开启线程
thread1.start()
thread2.start()
time.sleep(10)
# 发送事件通知
print('---->>>主线程通知小伙伴开吃咯!')
event.set()
###
tom已经启动
小伙伴tom已经进入就餐状态!
cherry已经启动
小伙伴cherry已经进入就餐状态!
---->>>主线程通知小伙伴开吃咯!
tom收到通知了.
cherry收到通知了.
小伙伴cherry开始吃咯!
小伙伴tom开始吃咯!
###
生产者和消费者模式
多线程环境下,我们经常需要多个线程的并发和协作。这个时候,就需要了解一个重要的多线程并发协作模型“生产者/消费者模式”。
相关信息
什么是生产者?
生产者指的是负责生产数据的模块(这里模块可能是:方法、对象、线程、进程)。
什么是消费者?
消费者指的是负责处理数据的模块(这里模块可能是:方法、对象、线程、进程)
什么是缓冲区?
消费者不能直接使用生产者的数据,它们之间有个“缓冲区”。生产者将生产好的数据放入“缓冲区”,消费者从“缓冲区”拿要处理的数据。
缓冲区是实现并发的核心,缓冲区的设置有 3 个好处
实现线程的并发协作
有了缓冲区以后,生产者线程只需要往缓冲区里面放置数据,而不需要管消费者消费的情况;同样,消费者只需要从缓冲区拿数据处理即可,也不需要管生产者生产的情况。 这样,就从逻辑上实现了“生产者线程”和“消费者线程”的分离。解耦了生产者和消费者
生产者不需要和消费者直接打交道解决忙闲不均,提高效率
生产者生产数据慢时,缓冲区仍有数据,不影响消费者消费;消费者处理数据慢时,生产者仍然可以继续往缓冲区里面放置数据
从一个线程向另一个线程发送数据最安全的方式可能就是使用 queue 库中的队列了。创建一个被多个线程共享的 Queue 对象,这些线程通过使用 put()
和 get()
操作来向队列中添加或者删除元素。Queue 对象已经包含了必要的锁,所以你可以通过它在多个线程间多安全地共享数据。
from queue import Queue
from threading import Thread
from time import sleep
def producer():
num = 1
while True:
if queue.qsize() < 5:
print(f"生产:{num}号,大馒头")
queue.put(f"大馒头:{num}号")
num += 1
else:
print("馒头框满了,等待来人消费啊!")
sleep(1)
def consumer():
while True:
print(f"获取馒头:{queue.get()}")
sleep(2)
if __name__ == "__main__":
queue = Queue()
t = Thread(target=producer)
c = Thread(target=consumer)
t.start()
# c.start()