跳至主要內容

线程

刘春龙原创...大约 10 分钟Python教程文档

线程的创建方式

方法创建

Python 的标准库提供了两个模块:_threadthreading_thread是低级模块,threading是高级模块,对_thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块。

from threading import Thread
from time import sleep
def func1(name):
    print(f"线程{name}开始")
    for i in range(3):
        print(f"线程:{name}打印的 :{i}")
        sleep(1)
    print(f"线程{name}结束")
if __name__ == "__main__":
    print("主线程,start")
    # 创建线程
    t1 = Thread(target=func1, args=("t1",))
    t2 = Thread(target=func1, args=("t2",))
    # 启动线程
    t1.start()
    t2.start()
    print("主线程,end")
###
主线程,start
线程:t1打印的 :0
线程t2开始
主线程,end
线程:t2打印的 :0
线程:t1打印的 :1
线程:t2打印的 :1
线程:t1打印的 :2
线程:t2打印的 :2
线程t1结束
线程t2结束
###

提示

运行结果可能会出现换行问题,是因为多个线程抢夺控制台输出的 IO 流。

类创建

from threading import Thread
from time import sleep
class MyThread(Thread):
    def __init__(self, name):
        Thread.__init__(self)
        self.name = name
    def run(self):
        for i in range(3):
            print(f"线程:{self.name}打印的 :{i}")
            sleep(1)
if __name__ == "__main__":
    print("主线程,start")
    # 创建线程
    t1 = MyThread('t1')
    t2 = MyThread('t2')
    # 启动线程
    t1.start()
    t2.start()
    print("主线程,end")
###
主线程,start
线程:t1打印的 :0
线程:t2打印的 :0
主线程,end
线程:t2打印的 :1
线程:t1打印的 :1
线程:t1打印的 :2
线程:t2打印的 :2
###

线程_join()和守护线程

之前的代码,主线程不会等待子线程结束。

如果需要等待子线程结束后,再结束主线程,可使用 join()方法。

from threading import Thread
from time import sleep
def func1(name):
    print(f"线程{name}开始")
    for i in range(3):
        print(f"线程:{name}打印的 :{i}")
        sleep(1)
    print(f"线程{name}结束")
if __name__ == "__main__":
    print("主线程,start")
    # 创建线程
    t1 = Thread(target=func1, args=("t1",))
    t2 = Thread(target=func1, args=("t2",))
    # 启动线程
    t1.start()
    t2.start()
    # 主线程会等待t1,t2结束后,再往下执行
    t1.join()
    t2.join()
    print("主线程,end")
###
主线程,start
线程t1开始
线程:t1打印的 :0
线程t2开始
线程:t2打印的 :0
线程:t2打印的 :1
线程:t1打印的 :1
线程:t1打印的 :2
线程:t2打印的 :2
线程t1结束
线程t2结束
主线程,end
###

在行为上还有一种叫守护线程,主要的特征是它的生命周期。主线程死亡,它也就随之死亡。在 python 中,线程通过 setDaemon(True|False)来设置是否为守护线程。3.10 后被废弃

# encoding=utf-8
from threading import Thread
from time import sleep


class MyThread(Thread):
    def __init__(self, name):
        Thread.__init__(self)
        self.name = name

    def run(self):
        for i in range(3):
            print(f"thread:{self.name} :{i}")
            sleep(1)


if __name__ == "__main__":
    print("主线程,start")
    # 创建线程(类的方式)
    t1 = MyThread("t1")
    # t1设置为守护线程
    t1.daemon=True  # 3.10后被废弃,可以直接:t1.daemon=True
    # t1.setDaemon(True)  # 3.10后被废弃,可以直接:t1.daemon=True
    # 启动线程
    t1.start()
    print("主线程,end")
###
主线程,start
thread:t1 :0
主线程,end
###

线程同步和互斥锁

现实生活中,我们会遇到“同一个资源,多个人都想使用”的问题。 比如:教室里,只有一台电脑,多个人都想使用。天然的解决办法就是,在电脑旁边,大家排队。前一人使用完后,后一人再使用。再比如,上厕所排队。

没有线程同步机制,两个线程同时操作同一个账户对象,竟然从只有 100 元的账户,轻松取出 80*2=160 元,账户余额竟然成为了-60。这么大的问题,显然银行不会答应的。

多线程操作同一个对象(未使用线程同步)

from threading import Thread
from time import sleep
class Account:
    def __init__(self, money, name):
        self.money = money
        self.name = name
# 模拟提款操作
class Drawing(Thread):
    def __init__(self, drawingNum, account):
        Thread.__init__(self)
        self.drawingNum = drawingNum
        self.account = account
        self.expenseTotal = 0
    def run(self):
        if self.account.money-self.drawingNum < 0:
            return
        sleep(1)  # 判断完后阻塞。其他线程开始运行。
        self.account.money -= self.drawingNum
        self.expenseTotal += self.drawingNum
        print(f"账户:{self.account.name},总共取了:{self.expenseTotal}")
        print(f"账户:{self.account.name},余额是:{self.account.money}")
if __name__ == '__main__':
    a1 = Account(100, "gaoqi")
    draw1 = Drawing(80, a1)  # 定义取钱线程对象;
    draw2 = Drawing(80, a1)  # 定义取钱线程对象;
    draw1.start()  # 你取钱
    draw2.start()  # 你老婆取钱
###
账户:gaoqi,总共取了:80
账户:gaoqi,余额是:20
账户:gaoqi,总共取了:80
账户:gaoqi,余额是:-60
###

多线程操作同一个对象(增加互斥锁,使用线程同步)

from threading import Thread, Lock
from time import sleep
class Account:
    def __init__(self, money, name):
        self.money = money
        self.name = name
# 模拟提款操作
class Drawing(Thread):
    def __init__(self, drawingNum, account):
        Thread.__init__(self)
        self.drawingNum = drawingNum
        self.account = account
        self.expenseTotal = 0
    def run(self):
        lock1.acquire()
        if self.account.money-self.drawingNum < 0:
            return
        sleep(1)  # 判断完后阻塞。其他线程开始运行。
        self.account.money -= self.drawingNum
        self.expenseTotal += self.drawingNum
        lock1.release()
        print(f"账户:{self.account.name},总共取了:{self.expenseTotal}")
        print(f"账户:{self.account.name},余额是:{self.account.money}")
if __name__ == '__main__':
    a1 = Account(100, "gaoqi")
    lock1 = Lock()
    draw1 = Drawing(80, a1)  # 定义取钱线程对象;
    draw2 = Drawing(80, a1)  # 定义取钱线程对象;
    draw1.start()  # 你取钱
    draw2.start()  # 你老婆取钱
###
账户:gaoqi,总共取了:80
账户:gaoqi,余额是:20
###

互斥锁是什么?

互斥锁: 对共享数据进行锁定,保证同一时刻只能有一个线程去操作。

注意: 互斥锁是多个线程一起去抢,抢到锁的线程先执行,没有抢到锁的线程需要等待,等互斥锁使用完释放后,其它等待的线程再去抢这个锁。

threading模块中定义了 Lock 变量,这个变量本质上是一个函数,通过调用这个函数可以获取一把互斥锁。

死锁

在多线程程序中,死锁问题很大一部分是由于一个线程同时获取多个锁造成的。举例:

有两个人都要做饭,都需要“锅”和“菜刀”才能炒菜。

from threading import Thread, Lock
from time import sleep


def fun1():
    lock1.acquire()
    print("fun1拿到菜刀")
    sleep(2)
    lock2.acquire()
    print("fun1拿到锅")

    lock2.release()
    print("fun1释放锅")
    lock1.release()
    print("fun1释放菜刀")


def fun2():
    lock2.acquire()
    print("fun2拿到锅")
    lock1.acquire()
    print("fun2拿到菜刀")
    lock1.release()
    print("fun2释放菜刀")
    lock2.release()
    print("fun2释放锅")


if __name__ == "__main__":
    lock1 = Lock()
    lock2 = Lock()

    t1 = Thread(target=fun1)
    t2 = Thread(target=fun2)
    t1.start()
    t2.start()

死锁是由于“同步块需要同时持有多个锁造成”的,要解决这个问题,思路很简单,就是:同一个代码块,不要同时持有两个对象锁。

信号量(Semaphore)

互斥锁使用后,一个资源同时只有一个线程访问。如果某个资源,我们同时想让 N 个(指定数值)线程访问?这时候,可以使用信号量。

信号量控制同时访问资源的数量。信号量和锁相似,锁同一时间只允许一个对象(进程)通过,信号量同一时间允许多个对象(进程)通过。

应用场景

  • 在读写文件的时候,一般只能只有一个线程在写,而读可以有多个线程同时进行,如果需要限制同时读文件的线程个数,这时候就可以用到信号量了(如果用互斥锁,就是限制同一时刻只能有一个线程读取文件)。
  • 在做爬虫抓取数据时。

信号量底层就是一个内置的计数器。每当资源获取时(调用 acquire)计数器-1,资源释放时(调用 release)计数器+1

from threading import Thread
from time import sleep
from multiprocessing import Semaphore

"""
一个房间一次只允许两个人通过
若不使用信号量,会造成所有人都进入这个房子
若只允许一人通过可以用锁-Lock()
"""

def home(name, se):
    se.acquire()  # 拿到一把钥匙
    print(f"{name}进入了房间")
    sleep(3)
    print(f"******************{name}走出来房间")
    se.release()  # 还回一把钥匙


if __name__ == "__main__":
    se = Semaphore(2)  # 创建信号量的对象,有两把钥匙
    for i in range(7):
        p = Thread(target=home, args=(f"tom{i}", se))
        p.start()
###
tom0进入了房间
tom1进入了房间
******************tom0走出来房间
tom2进入了房间
******************tom1走出来房间
tom3进入了房间
******************tom2走出来房间
******************tom3走出来房间
tom4进入了房间
tom5进入了房间
******************tom5走出来房间
******************tom4走出来房间
tom6进入了房间
******************tom6走出来房间
###

事件(Event)

事件 Event 主要用于唤醒正在阻塞等待状态的线程;

Event()可以创建一个事件管理标志,该标志(event)默认为 False,event 对象主要有四种方法可以调用:

方法名说明
event.wait(timeout=None)调用该方法的线程会被阻塞,如果设置了 timeout 参数,超时后,线程会停止阻塞继续执行;
event.set()将 event 的标志设置为 True,调用 wait 方法的所有线程将被唤醒
event.clear()将 event 的标志设置为 False,调用 wait 方法的所有线程将被阻塞
event.is_set()判断 event 的标志是否为 True

小伙伴们,围着吃火锅,当菜上齐了,请客的主人说:开吃!于是小伙伴一起动筷子,这种场景如何实现

#coding:utf-8
import threading
import time
def chihuoguo(name):
    # 等待事件,进入等待阻塞状态
    print(f'{name}已经启动')
    print(f'小伙伴{name}已经进入就餐状态!')
    time.sleep(1)
    event.wait()
    # 收到事件后进入运行状态
    print(f'{name}收到通知了.')
    print(f'小伙伴{name}开始吃咯!')
if __name__ == '__main__':
    event = threading.Event()
    # 创建新线程
    thread1 = threading.Thread(target=chihuoguo, args=("tom", ))
    thread2 = threading.Thread(target=chihuoguo, args=("cherry", ))
    # 开启线程
    thread1.start()
    thread2.start()
    time.sleep(10)
    # 发送事件通知
    print('---->>>主线程通知小伙伴开吃咯!')
    event.set()
###
tom已经启动
小伙伴tom已经进入就餐状态!
cherry已经启动
小伙伴cherry已经进入就餐状态!
---->>>主线程通知小伙伴开吃咯!
tom收到通知了.
cherry收到通知了.
小伙伴cherry开始吃咯!
小伙伴tom开始吃咯!
###

生产者和消费者模式

多线程环境下,我们经常需要多个线程的并发和协作。这个时候,就需要了解一个重要的多线程并发协作模型“生产者/消费者模式”。

相关信息

什么是生产者?

生产者指的是负责生产数据的模块(这里模块可能是:方法、对象、线程、进程)。


什么是消费者?

消费者指的是负责处理数据的模块(这里模块可能是:方法、对象、线程、进程)


什么是缓冲区?

消费者不能直接使用生产者的数据,它们之间有个“缓冲区”。生产者将生产好的数据放入“缓冲区”,消费者从“缓冲区”拿要处理的数据。


缓冲区是实现并发的核心,缓冲区的设置有 3 个好处

  • 实现线程的并发协作
    有了缓冲区以后,生产者线程只需要往缓冲区里面放置数据,而不需要管消费者消费的情况;同样,消费者只需要从缓冲区拿数据处理即可,也不需要管生产者生产的情况。 这样,就从逻辑上实现了“生产者线程”和“消费者线程”的分离。

  • 解耦了生产者和消费者
    生产者不需要和消费者直接打交道

  • 解决忙闲不均,提高效率
    生产者生产数据慢时,缓冲区仍有数据,不影响消费者消费;消费者处理数据慢时,生产者仍然可以继续往缓冲区里面放置数据

从一个线程向另一个线程发送数据最安全的方式可能就是使用 queue 库中的队列了。创建一个被多个线程共享的 Queue 对象,这些线程通过使用 put()get()操作来向队列中添加或者删除元素。Queue 对象已经包含了必要的锁,所以你可以通过它在多个线程间多安全地共享数据。

from queue import Queue
from threading import Thread
from time import sleep


def producer():
    num = 1
    while True:
        if queue.qsize() < 5:
            print(f"生产:{num}号,大馒头")
            queue.put(f"大馒头:{num}号")
            num += 1
        else:
            print("馒头框满了,等待来人消费啊!")
        sleep(1)


def consumer():
    while True:
        print(f"获取馒头:{queue.get()}")
        sleep(2)


if __name__ == "__main__":
    queue = Queue()
    t = Thread(target=producer)
    c = Thread(target=consumer)
    t.start()
    # c.start()
上次编辑于:
贡献者: 刘春龙
评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.15.7