Skip to main content

多线程详解

03a268d2d8cd40de8ba169f5a1daa214_tplv-k3u1fbpfcp-zoom-in-crop-mark_1304_0_0_0

一、什么是线程?

我们知道工人都是同时在工厂工作,复制各自的工作的。他们就是一个一个独立运行的单位!

线程也是类似这样的一个独立的运行单位,多线程,就是多个独立的运行单位,同时执行同样的事情。

简单这样理解,后面会进行对比。

threading.Thread 类是Python中的线程类,它封装了线程的信息和一些常用的方法,比如启动线程/查看线程状态。

线程有状态,拿工人一天的状态来比喻很合适,早上上班,然后工作,有时候需要停下来休息,最后下班。

复制运行下面的代码看看:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/21 12:02 上午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : __init__.py.py
# @Project : hello

import threading

mythread = threading.Thread()
print("mythread:", mythread)
print("is_alive:", mythread.is_alive())
mythread.start()
print("mythread:", mythread)
print("is_alive:", mythread.is_alive())

下面是运行结果:

屏幕快照 2021-11-21 上午12.09.32.png

建议读者先运行一下,再来解释线程的代码.

代码解释

上面我们使用了threading这个库,然后创建Thread类的对象实例,赋值给mythread变量。

接着打印了对象和线程对象的一个函数is_alive()是否活跃状态。

两次都是False(这个后面再说)

但是第二次我们看到线程对象打印出来变成‘stopped'.

也就是说我们跑完了start函数(该函数为线程启动函数)之后,线程就进入stopped状态了。

上面那个就是线程,可是貌似啥也没做,我们下面让它做点事情呗。

线程触发业务函数,线程调用业务函数

比如这次的业务是:关注和点赞。

def dianzan_guanzhu():
now = datetime.datetime.now() #初始化时间变量
name = "python萌新"
print("%s name:%s" % (now, name)) #第一次打印时间和粉丝名字
time.sleep(1)
result = "好棒!" + name + " 关注雷学委,学到了好多知识和开发经验!"
print("%s result:%s" % (now, result)) #第二次打印时间和粉丝活动
return result

我们可以使用线程来调用。下面学委写了一个带参数的函数。 通过线程调用业务函数的时候指定:

  • target:设置为即将被调用的函数
  • kwargs: 如果有参数,直接通过传递一个k-v dict即可。
def dianzan_guanzhu(name):
#省略一些代码

mythread = threading.Thread(target=dianzan_guanzhu, kwargs={"name": "python萌新"})

好下面,编写全部代码,使用线程来点赞,和直接调用点赞。

我们看看下面的代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/21 12:02 上午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : __init__.py.py
# @Project : hello

import threading
import datetime
import time

"""学委定义了一个关注函数"""
def dianzan_guanzhu():
now = datetime.datetime.now()
name = "python萌新"
print("%s name:%s" % (now, name))
time.sleep(1)
result = "好棒!" + name + " 关注雷学委,白嫖了好多知识和开发经验!"
print("%s result:%s" % (now, result))
return result


mythread = threading.Thread(target=dianzan_guanzhu)
print("mythread:", mythread)
print("is_alive:", mythread.is_alive())
mythread.start()
print("mythread:", mythread)
print("is_alive:", mythread.is_alive())
dianzan_guanzhu()
print("is_alive:", mythread.is_alive())

直接复制运行,这里我们这个dianzan_guanzhu函数被调用了两次

第一次是mythread.start函数。

第二次是我们直接脱离线程调用dianzan_guanzhu函数。

下面是运行结果:

屏幕快照 2021-11-21 上午11.32.00.png

好像没啥的样子。

再看一次,注意关注每次打印的时间,输入的时间好像错乱了?没错,不是眼花,是正确运行结果。

因为进入dianzan_guanzhu函数之后,初始化了now变量,这个时间固定了。

但是在线程外面也调用dianzan_guanzhu函数,所以这里是:两个线程在同时做同样的事情。

多了一个线程是哪个?

这里补充一下,我们写python脚本,运行代码的时候,本身是在一个主线程中的。

只是之前一直没解除线程概念,没写多线程程序,没有感知到这事情。

从现在开始,你要清楚知道:每个程序运行都有一个主线程

回到结果,两个线程先后依次调用通过函数:

首先,先后依次打印第一行输出。 分开休眠了一秒(sleep(1))。 最后,先后依次打印第二行输出。

小结

我们先把线程的基础知识搞懂。

  • 每个程序启动至少是有一个主线程
  • 需要启动更多线程使用Thread类来做

下一节再分享线程的更多知识。

二、什么多线程?

多线程,就是多个独立的运行单位,同时执行同样的事情。

想想一下,文章发布后同时被很多读者阅读,这些读者在做的事情‘阅读’就是一个一个的线程。 多线程就是多个读者同时阅读这篇文章。重点是:同时多个读者在做阅读这件事情。

如果是多个读者,分时间阅读,最后任意时刻只有一个读者在阅读,虽然是多个读者,但还是单线程。

我们再拿前面分享的代码:关注和点赞。

def dianzan_guanzhu():
now = datetime.datetime.now()
name = "python萌新"
print("%s name:%s" % (now, name))
time.sleep(1)
result = "好棒!" + name + " 关注雷学委,学会了开发知识!"
print("%s result:%s" % (now, result))
return result

我们看看下面的代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/21 12:02 上午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : __init__.py.py
# @Project : hello

import threading
import datetime
import time


def dianzan_guanzhu():
now = datetime.datetime.now()
name = "python萌新"
print("%s name:%s" % (now, name))
time.sleep(1)
result = "好棒!" + name + " 关注雷学委,学会了开发知识!"
print("%s result:%s" % (now, result))
return result


for i in range(3):
mythread = threading.Thread(name="t-" + str(i), target=dianzan_guanzhu)
print("mythread:", mythread)
print("is_alive:", mythread.is_alive())
mythread.start()
print("is_alive:", mythread.is_alive())

Thread类可以传入name指定线程名字。

直接复制运行,这里我们创建了3个线程。

它们依次调用了dianzan_guanzhu函数

下面是运行结果:

屏幕快照 2021-11-23 上午12.24.41.png

这3个线程不同时间打印完成了,但是内容打印乱序了,甚至还串行了。

读者同学可以多运行几次。

获取活跃线程相关数据

threading.active_count函数: 可以获取活跃线程数。

threading.current_thread函数:可以获取活跃线程对象,这样我们可以获取这样获取线程名称:threading.current_thread().getName()。

前文说过了,加上主线程,一共是4个线程。

运行下面代码看看:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/21 12:02 上午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : __init__.py.py
# @Project : hello
import random
import threading
import datetime
import time


def dianzan_guanzhu():
thread_name = threading.current_thread().getName()
now = datetime.datetime.now()
print("线程启动了:", thread_name)
name = "python萌新"+thread_name
print("%s - %s name:%s" % (thread_name, now, name))
time.sleep(1)
result = "好棒!" + name + " 关注雷学委,学会了开发知识!"
print("%s - %s result:%s" % (thread_name, now, result))
return result


for i in range(3):
mythread = threading.Thread(name="t-" + str(i), target=dianzan_guanzhu)
print("mythread:", mythread)
print("is_alive:", mythread.is_alive())
mythread.start()
ac = threading.active_count()
print("active_count:", ac)

如果我们把活跃线程数打印,那么等3个线程都start调用了。

加上主线程,最多是4个活跃线程。

屏幕快照 2021-11-23 上午12.26.44.png

今天先展示一下多个线程执行同个任务的代码实现。

明天再细致分享多线程的协调,这个才是噩梦的开始,算是劝退系列了。

三、线程同步

如下文,明明t-0 > t-1 > t-2 (按照线程创建时间早晚排列)。最后输出居然是t-1最落后。

屏幕快照 2021-11-23 上午12.26.44.png

我们怎么样做避免错乱呢, 下面看看。

多线程,就是多个独立的运行单位,同时执行同样的事情。

多线程不是已经做到同时执行了吗?还需要同步干嘛?

是的,线程是同时被调用执行了,但是每个线程之间互相独立,也互相竞争了。

这就跟跑道上有3个运动员,枪响之后同时开跑,但是他们通常却不是同时到达终点。

同步是什么意思?

同步就是原本这条跑道跑三个人的加上同步之后,在任意时间上,只有一个人在跑道。

听起来是不是匪夷所思,怎么多线程不是为多个任务提高效率吗?加个同步不就一个时间只有一个任务执行了,这还扯啥多线程。

很遗憾,同步就是这个意思,我们有时会说完整一点,同步互斥!总结来说就是:同步是一种机制,它保证跑道上面任何时候只有一个运动员。技术上来说就是,同步保证 程序数据 任何时候只被一个线程操作。

我们使用同步机制的时候,也是在找那些应该被限制的’跑道‘,利用同步机制保证在那个跑道上任意时刻只有一个‘运动员’在上面跑步。

(解释的很清楚了,看不懂的可以找同学讨论上面的这几句)

我们了解了同步机制,下面看看锁。

threading.Lock获取同步锁

threading.Lock是一个类,我们能用它创建一个对象。

什么是锁? 维持同步互斥机制的媒介 相当于跑道有个大门,每次只开门让一个程序员进去跑 说错了,运动员(程序员还是需要多锻炼啊)。

锁要是坏了,后果可以自己想象(后面文章会说)。

我们下面代码会用到Lock的两个函数:

acquire函数:获取锁 release函数:释放锁

前文说过了,加上主线程,一共是4个线程。

运行下面代码看看:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/21 12:02 上午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : __init__.py.py
# @Project : hello

import threading
import datetime
import time


def dianzan_guanzhu(lock: threading.Lock):
thread_name = threading.current_thread().getName()
print("线程启动了:", thread_name)
now = datetime.datetime.now()
name = "python萌新" + thread_name
lock.acquire()
print("%s - %s name:%s" % (thread_name, now, name))
time.sleep(1)
result = "好棒!" + name + " 关注雷学委,学会了开发知识!"
print("%s - %s result:%s" % (thread_name, now, result))
lock.release()
return result


my_lock = threading.Lock()
for i in range(3):
mythread = threading.Thread(name="t-" + str(i), target=lambda: dianzan_guanzhu(my_lock))
print("mythread:", mythread)
print("is_alive:", mythread.is_alive())
mythread.start()
ac = threading.active_count()
print("active_count:", ac)

下面是运行结果:

屏幕快照 2021-11-23 上午12.46.35.png

我们看到每个线程都完整完成了任务,不会出现三个线程互相穿插错乱的输出。

这里初学者可以感受一下同步的作用,效果。

四、同步机制

假设现在有一个xuewei的账号里面有 100W。

然后有多个任务在转账,转入转出都是跟这个xuewei账号相关的。

而且这些任务发生是随机的。

我们先把上面的场景写成代码:

xuewei_account = 100


# amount为负数即是转出金额
def transfer(money):
global xuewei_account
xuewei_account += money

下面是多个线程,多线程模拟转账事件,我们假设有4个事件在同时发生。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/24 12:02 上午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : __init__.py.py
# @Project : hello
import random
import threading
import datetime
import time

xuewei_account = 100


# amount为负数即是转出金额
def transfer(money):
global xuewei_account
xuewei_account += money


# 创建4个任务给学委账户转账
for i in range(10000):
threading.Thread(target=lambda: transfer(-1)).start()
threading.Thread(target=lambda: transfer(1)).start()
threading.Thread(target=lambda: transfer(-1)).start()
threading.Thread(target=lambda: transfer(1)).start()

# 等待活跃线程只剩下主线程MainThread
time.sleep(10)
print("-" * 16)
print("活跃线程数:", threading.active_count())
print("活跃线程:", threading.current_thread().name)
print("学委账户余额:", xuewei_account)

这里启动了4个线程循环了10000次,也就是4万个线程,分别于学委的账户进行转账。

下面是运行结果:

屏幕快照 2021-11-25 上午1.06.18.png

运行几次学委的账户还是正确的,余额还是100W。

上面的代码线程几万个,但每次运行的操作都很简单,完成一次加法。

线程一个接一个start,非常快速就切换下一个线程, 我们看到程序没有出现问题。

下面进行改造,这次不要就4万线程了,我们让转账这个任务耗时更多,每启动一个线程进行模拟10万次转账

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/24 12:02 上午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : __init__.py.py
# @Project : hello
import random
import threading
import datetime
import time

xuewei_account = 100


# amount为负数即是转出金额
def transfer(money):
global xuewei_account
for x in range(100000):
xuewei_account += money


# 创建4个任务给重复学委账户转账
for i in range(10):
threading.Thread(target=lambda: transfer(-1)).start()
threading.Thread(target=lambda: transfer(1)).start()
threading.Thread(target=lambda: transfer(-1)).start()
threading.Thread(target=lambda: transfer(1)).start()

time.sleep(10)
print("-" * 16)
print("活跃线程数:", threading.active_count())
print("活跃线程:", threading.current_thread().name)
print("学委账户余额:", xuewei_account)

这里运行的结果就比较出乎意料了:

屏幕快照 2021-11-25 上午1.26.14.png

多线程编程复杂的地方就在这里了, 有时候明明平平无奇的代码,改造成多线程,就很容易出bug!

当然上面的代码并不是平平无奇,相比第一段代码,上面的转账函数做的事件更多,更耗时。

如何解决这个问题?非常简单,我们加上锁。

代码如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/24 12:02 上午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : __init__.py.py
# @Project : hello
import random
import threading
import datetime
import time

xuewei_account = 100

lock = threading.Lock()
# amount为负数即是转出金额
def transfer(money):
lock.acquire()
global xuewei_account
for x in range(100000):
xuewei_account += money
lock.release()


# 创建4个任务给重复学委账户转账
for i in range(10):
threading.Thread(target=lambda: transfer(-1)).start()
threading.Thread(target=lambda: transfer(1)).start()
threading.Thread(target=lambda: transfer(-1)).start()
threading.Thread(target=lambda: transfer(1)).start()

time.sleep(10)
print("-" * 16)
print("活跃线程数:", threading.active_count())
print("活跃线程:", threading.current_thread().name)
print("学委账户余额:", xuewei_account)

运行结果如下:

屏幕快照 2021-11-25 上午1.30.11.png

上面的代码不管怎么运行,运行多少次最后学委的账户都是100.(PS:学委不会联系读者转账的,这个特别注意)。

不管多少个线程,每次转账函数内部转账的代码(从global到 += money这一段代码)只会被一个线程调用

小结

基于此文和前文,都展示了同步机制解决一些编程问题的思路。读者可以多多借鉴,思考锁的应用。

为什么在对amount重度操作(本文第二段代码)的时候,计算就出错了!

这里amount相当于多线程都在操作的变量,也就是共享变量,多线程编程要特别注意这类变量,避免出现对共享变量的操作,有些程序在并发规模很小的时候一点问题也没有。

并发编程是高度利用CPU计算能力的编程方式,并发程序也就是在并行执行同类任务的程序。这个可以跟单线程应用比较。

五、共享变量

前文说了转账问题

下面展示另一种转账的方式:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/24 12:02 上午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : __init__.py.py
# @Project : hello
import random
import threading
import datetime
import time

xuewei = {'balance': 157}


# amount为负数即是转出金额
def transfer(money):
name = threading.current_thread().getName()
print("%s 给xuewei转账 %s " % (name, money))
xuewei['balance'] += money
print("xuewei账户余额:", xuewei['balance'])


lists = [-7, 20, -20, 7] # 4次转账的数额,负数为学委的账户转出,正数为他人转入。
# 创建4个任务给学委转账上面lists的金额
threads = []
for i in range(4):
amount = lists[i]
name = "t-" + str(i)
print("%s 计划转账 %s" % (name, amount))
mythread = threading.Thread(name=name, target=lambda: transfer(amount))
threads.append(mythread)

# 开始转账
for t in threads:
t.start()

# 等待3秒让上面的转账任务都完成,我们在看看账户余额
time.sleep(3)
print("-" * 16)
print("学委账户余额:", xuewei['balance'])

这里启动了4个线程,每个线程内有个lambda表达式,分别于学委的账户进行转账,但是最后结果是185. 而不是157.

下面是运行结果:

屏幕快照 2021-11-24 上午12.48.35.png

PS: 这只是一种运行结果。多线程的运行结果不是永远一样的。

如何解决这个问题?

根据观测结果我们发先amount只保留了最后一个值。

好,下面改造一下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/24 12:02 上午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : __init__.py.py
# @Project : hello
import random
import threading
import datetime
import time

xuewei = {'balance': 157}

lists = [-7, 20, -20, 7] # 4次转账的数额,负数为学委的账户转出,正数为他人转入。


def transfer(amount):
name = threading.current_thread().getName()
print("%s 给xuewei转账 %s " % (name,amount))
xuewei['balance'] += amount
print("xuewei账户余额:", xuewei['balance'])


# 创建4个任务给学委转账上面lists的金额
for i in range(4):
amount = lists[i]
name = str(i)
# mythread = threading.Thread(name=name, target=lambda: transfer(amount))
def event():
print("%s 计划转账 %s" % (name, amount))
transfer(amount)
mythread = threading.Thread(name=name, target=event)
mythread.start()


# 等待3秒让上面的转账任务都完成,我们在看看账户余额
time.sleep(3)
print("-" * 16)
print("学委账户余额:", xuewei['balance'])

学委这里加了一个event函数,把转账计划打印出来。

从下面的一次运行结果看,event函数的输出结果没错,所有”计划转账“金额都如预期[-7, 20, -20 7]。 问题是transfer函数再多线程执行的时候,我们发现amount被多线程竞争修改了:

用户0转账金额变成20 用户1转账金额变成-20 用户2转账金额变成7 用户3转账金额变成7

屏幕快照 2021-11-25 上午9.07.11.png

也就是说,amount被后面的线程修改了,但是前面线程还没有执行完。 用户0应该转账-7的,中间还没有执行完毕,结果被线程1修改了amount为20,用户0继续执行转账,余额变成177. 其他依次推理。

amount这个变量被多个线程竞争修改了,这个就是程序的共享变量

到底如何解决?

方法非常简单:直接干掉共享变量。

下面就是消除共享变量的方法: 让共享变成每个线程访问独立运行空间

所以代码改动如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/24 12:02 上午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : __init__.py.py
# @Project : hello
import random
import threading
import datetime
import time

xuewei = {'balance': 157}



lists = [-7, 20, -20, 7] # 4次转账的数额,负数为学委的账户转出,正数为他人转入。
# 我们不要依赖amount变量了
def transfer():
name = threading.current_thread().getName()
xuewei['balance'] += lists[int(name)] #通过线程名字来获取对应金额
print("xuewei账户余额:", xuewei['balance'])

# 创建4个任务给学委转账上面lists的金额
threads = []
for i in range(4):
amount = lists[i]
name = str(i)
print("%s 计划转账 %s" % (name, amount))
# mythread = threading.Thread(name=name, target=lambda: transfer())
def event():
transfer()
mythread = threading.Thread(name=name, target=event)
threads.append(mythread)

# 开始转账
for t in threads:
t.start()

# 等待3秒让上面的转账任务都完成,我们在看看账户余额
time.sleep(3)
print("-" * 16)
print("学委账户余额:", xuewei['balance'])

运行结果如下:

屏幕快照 2021-11-25 上午12.04.44.png

上面的代码不管怎么运行,运行多少次最后学委的账户都是157.(PS:学委不会联系读者转账的,这个特别注意)。

这次展示的另一种方式来避开多线程出现bug的方法,使用一个list下标跟线程名字一一对应,这样只要是对应名字的线程拿到的数值不错错乱。

六、认识线程安全

什么是线程安全?

线程安全,名字就非常直接,在多线程情况下是安全的,多线程操作上的安全。

比如一个计算加法的函数,不管是一千个还是一万个线程,我们希望它执行的结果总是正确的,1+1 必须永远等于2, 而不是线程少的时候1+1 变成3或者4了。

通常我们都用线程安全来修饰一个类,修饰一个函数。

我们会说我设计的这个类是线程安全的 这意味着,在多线程环境下,同时调用这个类的函数不会出现函数设置预期之外的异常(上述的1+1=3的情况)

在Python中有哪些类是线程安全的?

dict 和 list,tuple这些都是线程安全。

它们是被全局解释器保障了,这个锁:GIL(全局解释器锁)确保了任何时候只能有一个线程执行相应操作的字节码

docs.python.org/3/glossary.…

屏幕快照 2021-11-26 上午12.23.06.png

但是这番话也是说的不清不楚的。

现在我们拿转账来解析吧:

xuewei_account = dict()
xuewei_account['amount'] = 100

# amount为负数即是转出金额
def transfer(money):
xuewei_account['amount'] += money

如上,代码为一个函数对xuewei_account(账户)进行转入金额操作。

这里用了dict类型,GIL会保证只有一个线程操作账户。

下面是多个线程进行操作的代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/24 12:02 上午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : testthread_safe.py
# @Project : hello
import random
import threading
import datetime
import time

xuewei_account = dict()
xuewei_account['amount'] = 100


# amount为负数即是转出金额
def transfer(money):
xuewei_account['amount'] += money


# 创建4个任务给重复学委账户转账
threads = []
for i in range(200):
t1 = threading.Thread(target=lambda: transfer(-1))
threads.append(t1)
t2 = threading.Thread(target=lambda: transfer(1))
threads.append(t2)

for t in threads:
t.start()

# 这次不用sleep了,用join来等待所有线程执行完毕
# join函数必须线程start后才能调用,否则出错。
for t in threads:
t.join()

print("-" * 16)
print("活跃线程数:", threading.active_count())
print("活跃线程:", threading.current_thread().name)
print("学委账户余额:", xuewei_account)

这段代码运行结果正常。

屏幕快照 2021-11-26 上午12.27.39.png

但是我们把赋值修改dict的操作变多之后(特别是一个线程内反复多次获取值然后修改),像下面的代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/24 12:02 上午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : testthread_safe.py
# @Project : hello
import random
import threading
import datetime
import time

xuewei_account = dict()
xuewei_account['amount'] = 100


# amount为负数即是转出金额
def transfer(money):
for i in range(100000):
xuewei_account['amount'] = xuewei_account['amount'] + money


# 创建400个任务重复给学委账户转账
threads = []
for i in range(200):
t1 = threading.Thread(target=lambda: transfer(-1))
threads.append(t1)
t2 = threading.Thread(target=lambda: transfer(1))
threads.append(t2)

for t in threads:
t.start()
for t in threads:
t.join()

print("-" * 16)
print("活跃线程数:", threading.active_count())
print("活跃线程:", threading.current_thread().name)
print("学委账户余额:", xuewei_account)

这是某一次运行结果(不保证每次acount的数值一样):

屏幕快照 2021-11-25 下午11.57.09.png

我们看到dict还是扛不住多个线程反复的写操作。

这里区别是:每个线程只对xuewei_account进行大量读写,虽然dict是安全的,但是多个线程中间穿插修改了account,程序方法栈出现操作到旧值(看下面的图)。

主要是下面这段代码:

xuewei_account['amount'] += money # 即是 xuewei_account['amount'] = xuewei_account['amount']+ money

再一步抽象简化可以写成:

a = a + b

每个线程都执行 +b 操作,最后a的值应该是a+2b。

上面的操作意味这下面的情况发生了:

屏幕快照 2021-11-26 上午1.12.59.png

在某个线程中可能出现某一个线程T1获取了a值 ,准备加上b。

另外一个线程T2已经完成了a+b操作,把a的值变成了a+b了。

但是接下来T1 拿了a的值再执行a+b操作,把a的值变成a+b。

这样就少加了一个b,本来最后结果是a+2b 的变成了 a+b(因为T1拿了a的旧值,中间T2执行完,T1才继续执行)

当然实际多线程之间交互比上图还要随机。

如何做到真正线程安全?

dict读取数据是线程安全,但是被反复读写就容易出现数据混乱。

如果我们要设计一个线程安全的函数,那么它必须不涉及任何共享变量或者是完全没有状态依赖的函数

def thread_safe_method():
pass

无状态函数

比如下面的加法函数,不管多少个线程调用,返回值永远是预期的a+b。

def add(a, b):
return a + b

另一种 化繁为简

或许我们可以把多线程转换为单线程,这个需要一个线程安全的媒介

七、安全的线程队列

Python的线程07 带你验收一款线程安全的队列

是否有一种神器,解决线程/并发的问题呢?

有,它就是队列(Queue)

什么是队列?

像排队一样,从头到尾排成一排,还可以有人继续往后排队,这就是队列。

这里学委想说的是Queue这个类, 它是queue这个内置模块内的一个类。

import queue
q = queue.Queue(5) #可以传入参数指定队列大小
queue.Queue()# 不传或者给0或者<0的数字则创建一个无限长度的队列

它提供了很多函数,下面几个函数,我们使用的比较多:

  • get: 获取并移除队头元素,就是出队
  • put: 往队列末尾加入元素,也就是后来者排队
  • qsize: 获取队列的长度
  • empty: 队列空了,没有人在排了
  • full: 队列满了。

看着比较枯燥,学委画了下图展示:

屏幕快照 2021-11-28 下午10.31.03.png

这个队列put了3次,依次放入:持续学习,持续开发,我雷学委。队列长度为3

队列基操 入队/出队/查队列状态

学委准备了下面的代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/24 12:02 上午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : threadsafe_queue0.py
# @Project : hello


import queue

q = queue.Queue(5)

print("学委粉丝队列:", q)
print("空队,学委粉丝队列大小:", q.qsize())
print("空队列?", q.empty())
for i in range(5):
q.put(i)

print("队列满了?", q.full())
print("排满了,学委粉丝队列大小:", q.qsize())

while not q.empty():
print("粉丝 %s 出队点赞!" % q.get())
print("最后,学委粉丝队列大小:", q.qsize())

这段代码创建了一个长的为5的队列。

然后一个循环写满队列,接着再依此出队,粉丝出队点赞。

下面是运行效果:

屏幕快照 2021-11-26 下午11.05.06.png

是不是很简单。

好消息,Queue这个是一个线程安全的类

前面学委展示了几篇文章,碰到下面的代码(反复读写共享变量)结果总是出乎依赖!

amount = 100
def transfer(money):
global amount
for i in range(100000):
amount += money

如果我们对队列进行反复读写,会不会出现问题呢?

不妨,写个代码验收一下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/24 12:02 上午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : threadsafe_queue1.py
# @Project : hello


import queue
import threading

xuewei_fans_q = queue.Queue()


def transfer(money):
for i in range(100000):
xuewei_fans_q.put(money)
xuewei_fans_q.get()


# 创建4个任务重复给学委加关注/脱粉(还是希望各位编程的明日之星跟着学习,共同进步!)
t_group = []
for i in range(10):
t = threading.Thread(target=lambda: transfer(-1))
t_group.append(t)
t.start()
t = threading.Thread(target=lambda: transfer(1))
t_group.append(t)
t.start()
t = threading.Thread(target=lambda: transfer(-1))
t_group.append(t)
t.start()
t = threading.Thread(target=lambda: transfer(1))
t_group.append(t)
t.start()

for t in t_group:
t.join()
print("-" * 16)
print("活跃线程数:", threading.active_count())
print("活跃线程:", threading.current_thread().name)
#反复对队列进行添加数据,移除数据,队列最后清零了
print("学委粉丝队列:", xuewei_fans_q.qsize())

不管运行多少次,队列(希望是黑粉队列)都为0元素。(这背后其实是锁保证了操作的原子性,以后的文章会再谈到)

屏幕快照 2021-11-26 下午11.05.06.png

小结

本篇学委分享了一个线程安全的队列Queue,这个非常重要!

前面花了很多篇幅讲解多线程出现的一些问题和解决,很麻烦。

但是队列Queue这个类是线程安全的,这个是经过验证的,读者朋友务必掌握牢固。

虽然展示的队列是粉丝队列,学委还是希望各位编程的明日之星跟着学习,一起共同进步!

下一篇学委将分享用队列改造转账程序,优雅的解决转账金额错误的问题。

八、使用队列来改造转账场景

再次看看转账场景的问题

前面有两篇文章展示了转账反复读写amount,导致结果出错。

xuewei_account = dict()
xuewei_account['amount'] = 100

# amount为负数即是转出金额
def transfer(money):
for i in range(100000):
xuewei_account['amount'] = xuewei_account['amount'] + money



我们前几篇使用多个线程反复转长:+1和-1。

按常理,结果应该仍旧是100.

这个是全部代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/26 12:02 上午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : threadsafe_queue1.py
# @Project : hello
import random
import threading
import datetime
import time

xuewei_account = dict()
xuewei_account['amount'] = 100


# amount为负数即是转出金额
def transfer(money):
for i in range(100000):
xuewei_account['amount'] = xuewei_account['amount'] + money


# 创建20个任务重复给学委账户转账
threads = []
for i in range(10):
t1 = threading.Thread(target=lambda: transfer(-1))
threads.append(t1)
t2 = threading.Thread(target=lambda: transfer(1))
threads.append(t2)

for t in threads:
t.start()
for t in threads:
t.join()

print("-" * 16)
print("活跃线程数:", threading.active_count())
print("活跃线程:", threading.current_thread().name)
print("学委账户余额:", xuewei_account)

等待所有转账线程运行结束,我们看到结果是错误的:

屏幕快照 2021-11-27 下午11.21.21.png

这种问题怎么使用队列来解决呢?

前面说了,多线程反复读写共享数据,是问题的根源。

改代码为同步互斥模式,保证任意一个时间一个线程更新共享数据,那么问题就解决了。(这前面也展示了,用的是Lock锁的方案)

这个能怎么用队列呢?

可以先思考10秒,根据学习到的加锁和队列的特性,想想这个怎么做。

好,答案现在揭晓:

Queue这个队列有多个函数,一个是put函数,一个是get函数。

一个负责放入数据到队尾,一个可以从对头取出元素。

刚好适合转账业务,我们是不是可以把每次转账操作变成一个一个指令/事件。 比如下面的:

event(amount=1,acount=xuewei_account)
....
event(amount=-1,acount=xuewei_account)
....
event(amount=1,acount=xuewei_account)
....
event(amount=-1,acount=xuewei_account)

20个线程,每个10万次数据读写,共200万个事件。

所以我们可以把这个事情转换为:200万个转账事件。

因为Queue是线程安全的,所以我们可以并发200万次转账,另外交给一线程进行转账处理。

这样就保证每次只有一个线程对xuewei_account学委账户进行读写。

改造,使用队列来解决问题

展示代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/26 12:02 上午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : threadsafe_queue2.py
# @Project : hello
import random
import threading
import datetime
import time
import queue

q = queue.Queue()

xuewei_account = dict()
xuewei_account['amount'] = 100


# amount为负数即是转出金额
def transfer(money):
for i in range(100000):
q.put(money)


def handle_amount():
while not q.empty():
amount = q.get()
xuewei_account['amount'] += amount


def monitor_q():
counter = 0
time.sleep(3)
while counter < 1000 and not q.empty():
print("q size:", q.qsize())
time.sleep(3)
counter+=1


q_thread = threading.Thread(name="Q监控", target=monitor_q)
q_thread.start()
# 创建20个任务重复给学委账户转账
threads = []
for i in range(10):
t1 = threading.Thread(target=lambda: transfer(-1))
threads.append(t1)
t2 = threading.Thread(target=lambda: transfer(1))
threads.append(t2)

for t in threads:
t.start()

vip_thread = threading.Thread(name="处理转账专线", target=handle_amount)
vip_thread.start()



for t in threads:
t.join()
vip_thread.join()

print("-" * 16)
print("活跃线程数:", threading.active_count())
print("活跃线程:", threading.current_thread().name)
print("学委账户余额:", xuewei_account)

这里运行了多个线程执行转账(发送转账金额进队列)。

然后运行一个vip通道(单独线程)处理学委账户的转账业务。

同时也运行了一个监控队列的线程,每隔一段时间打印队列的任务情况。

下面是运行结果,运行几次结果都是正确的。

屏幕快照 2021-11-27 下午11.36.54.png

运行几次最终账户余额都是100, 改造成功。

小结

本篇学委分享了线程安全的队列Queue解决并发转账问题。

其实代码还可以再度优化的,为了控制篇幅,代码也不少,希望读者朋友们能够先看熟学会,掌握队列的使用。

九、认识Event信号

前面分享了多篇文章的线程创建的代码,一个循环创建多个线程并启动,它们并不是保证它们同时开始做一个事情。

怎么样才能保证,多个线程公平公正的竞争呢?

就像田径跑道上蹲在起点的运动员,不分先后,同时听到枪响,就开跑呢?

本文,我们先学习一下threading.Event类。

Event 是什么

写过很多Java应用的同学,我们谈到的Event通常都是一些指令性的消息,比如,转账10元,某某已关注, 而且创建事件后通常不会再进行修改。

不过Python内置的threading.Event类,确实不太一样。

threading.Event类,维护了一个标志位flag变量。提供一系列的函数如把flag的值从False 转变为True。 另外,这个标志位flag对外不开放的,不可直接访问,仅通过is_set()函数查看其状态。

学委认为把Event名字后面加个Signal可能更加合适一些,EventSignal(事件信号)。叫做Event,实在是太过宽泛了。

我们可以对threading.Event持有的flag进行操作:

  • 通过set()函数把Event维护的flag值转变为True;
  • 通过clear()函数把Event维护的flag值变为None。

学委准备了下面的代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/27 10:43 下午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : thread_event0.py
# @Project : hello
import threading
import time

xuewei_event = threading.Event()

print("event:", xuewei_event)
print("event对应flag 的默认值:", xuewei_event.is_set())

xuewei_event.set()
print("after set, flag:", xuewei_event.is_set())
print("after clear, flag:", xuewei_event.clear())
print("wait 3 second, flag:", xuewei_event.wait(3))
xuewei_event.set()
print("after set, flag:", xuewei_event.is_set())

稍微说明一下,我们在代码创建了一个xuewei_event对象。

然后调用了set()函数,打印出来的xuewei_event的flag值是True的。

然后调用了clear()函数,把对象的flag状态转为了False, 所以接着在xuewei_event.wait(3)这里会等待3秒,然后返回False

我们看看完整运行结果:

屏幕快照 2021-11-28 下午11.15.22.png

读者可以把clear()函数调用注释掉查看运行结果。

Event是一个线程安全的类

再特点说一下wait函数。

上面的代码传入参数所以程序在3秒后继续切换回主线程了。

xuewei_event.wait(3)

如果是下面的函数调用(不带参数),则会让调用wait函数的线程一直等待。

直到xuewei_event 对象所持有的flag的值为True,线程继续运行wait函数后面的代码。

xuewei_event.wait()

小结

Event类的这种结构设计很简单,封装了一个对外不可直接操作的flag变量。

因为它线程安全,天然的适合做多线程应用开发。

十、使用Event保证多线程同时执行

前面分享了threading.Event类,它维持了一个信号(True/False)状态。

像田径跑道上蹲在起点的运动员,不分先后,同时听到枪响就开跑,用这个类来做很适合。

模拟:发出一声枪响

当然不是真的有枪响,而且代码调用Event类的对象实例的set函数。

因为Event类的函数是线程安全的,所以我们可以把运动员看成一个一个的线程,并排在跑道起点

所以这个思路代码就有了

for _ in range(n):
def run():
wait for event ready
run
threading.Thread(target=run).start()

稍微写好一点,代码最终如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/27 10:43 下午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : thread_event.py
# @Project : hello
import threading
import time

xuewei_event = threading.Event()

print("event:", xuewei_event)
print("is_set:", xuewei_event.is_set())


def run():
print(" %s ready" % threading.current_thread().name)
xuewei_event.wait()
print(" %s go" % threading.current_thread().name)
time.sleep(0.5)
print(" %s completed" % threading.current_thread().name)


threads = []
for i in range(4):
t_name = "t-" + str(i)
t = threading.Thread(name=t_name, target=run)
threads.append(t)
t.start()

# 学委提示:赛场鸣枪,运动员开跑
for i in [3, 2, 1]:
print("学委倒数 count %s" % i)
time.sleep(1)
xuewei_event.set()
print("is_set:", xuewei_event.is_set())

for t in threads:
t.join()

这是运行结果:

屏幕快照 2021-11-29 下午10.55.07.png

多线程还有点意思

看看上面的代码,学委还模拟了3/2/1倒数,再开枪。

如图所示,多个线程都听到这个枪响不约而同的喊出了‘go’,最后不同时间达到终点了。

十一、 Semaphore信号量

前面说到的Event,学委觉得它就像一个总开关一样,一开全开。

这篇我们讲讲thread.Semaphore。

什么是Semaphore

Semaphore 也就是信号量, 跟Event和Lock很不一样。

Semapore允许传入一个初始数值,这个数值限定了可以顺畅调用acquire的次数

threading.Semaphore(3) #运行3个线程同时acquire不阻塞,如果有4个线程acquire将有一个线程等待。

顺畅调用就是调用的时候不阻塞不需要等待。

超过限定次数,就无法顺畅调用了,那就堵在这了。

直到有人释放了锁,其他阻塞的线程再去抢锁继续运行。

下面还是前面Event的demo程序,学委把Event改成了Semaphore类,设置了初始值为1。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/27 10:43 下午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : thread_event.py
# @Project : hello
import threading
import time

xuewei_semaphore = threading.Semaphore(1)

print("xuewei_semaphore:", xuewei_semaphore)


def run():
print(" %s ready" % threading.current_thread().name)
xuewei_semaphore.acquire() #获取信号量
print(" %s go" % threading.current_thread().name)
time.sleep(0.5)
print(" %s completed" % threading.current_thread().name)
#这里没有释放,必须释放的,后面的文章会补充介绍


threads = []
for i in range(4):
t_name = "t-" + str(i)
t = threading.Thread(name=t_name, target=run)
threads.append(t)
t.start()

# 学委提示:赛场鸣枪,运动员开跑
for i in [3, 2, 1]:
print("学委倒数 count %s" % i)
time.sleep(1)

for t in threads:
t.join()

结果呢,学委还没有喊3/2/1,第一个运动员就先跑到,还没有喊完3/2/1,这个运动就已经跑到终点了(第一个线程先结束了)。

其他两个运动员就很乖,还在哪里等待。

当然以上结果都是如预期执行的,没有bug.

屏幕快照 2021-12-01 上午12.07.42.png

Semaphore信号量就是这样。

读者不妨把threading.Semaphore传入的初始值改为2/3/4,观察一下运行结果。

xuewei_semaphore = threading.Semaphore(2)
#或者改为4
xuewei_semaphore = threading.Semaphore(4)

小结

本篇先简单介绍信号量,下一篇学委会运用信号量做一个简易限流器。

十二、简易限流

前篇学委展示了Semaphore信号量,这个工具可以让开发者设置阀值,简单的控制并发的数量。

不知道读者还记得前篇设置信号量为1的时候,三个运动员只有一个先跑了。

因为一开始给了信号量为1,所以学委还没有喊完3/2/1,就有人抢跑了。

好,拉回主题 - 限流器

什么是限流器?

大家记不记得经常坐地铁(早晚)高峰期的时候,在入口的地方就有工作人员搬了一下遮挡栏,

每次就放而二三十人进入。一般4个地铁口,两个入。两个出,出口一般没有限流,除了那种对接客运站的出口,会有限流。

限流器,就是类似的概念,程序实现这样的一个遮挡栏,实现这样的定期的放入定额数量的人员执行任务。

翻译成技术话语,那就是,一个管理机制,保证了任意时间只有最大限额数量的线程,同时做事情。

这就是限流器。

另外,我们接触的软件,其实都是生活的映射。

怎么做限流器呢?

本篇说了信号量,那么我们就用它来做吧。

信号量保证了限定数额的线程,但是它是一次性的啊。

比如下面的缺陷:

threading.Semaphore(3) #运行3个线程同时acquire不阻塞,如果有4个线程acquire将有一个线程等待。

恰好,它还给我们提供了release函数。

所以我们只要知道限额生成信号量初始值,然后在程序中不断release,这样信号量就恢复。

等候的其他线程又能获得锁,执行自己的任务了。

问题不就迎刃而解了吗?

十三、简易限流器实现

前篇学委提出了Semaphore信号量来制作限流器的思路

简单总结就是:动态的release,保证任意时刻都有固定数量可用的信号量。

我们通常会这样使用信号量

xuewei_semaphore = threading.Semaphore(4) #申请信号量

#在某个地方使用信号量
xuewei_semaphore.acquire()

//do something here
....

xuewei_semaphore.release()

限流的过程其实就是不断的使用这个有限信号量的过程。

因为设置了4信号额度,最多允许4个线程同时运行。

任意时间只要获取超过4个后,其他线程只能等待,这就跟我们进站排队很像。安检人员看到进入排队的人太多的,把后面的拦住,知道等候的人数减少,再放行一些人员进入车站等候区。

直接上代码吧,后面再解释。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/27 10:43 下午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : threading_semephore.py
# @Project : hello
import threading
import time
import queue

xuewei_semaphore = threading.Semaphore(4)

print("xuewei_semaphore:", xuewei_semaphore)

waiting_for_train = {"value": 0}


def run():
new_joiner = threading.current_thread().name
# print(" %s ready" %new_joiner )
xuewei_semaphore.acquire()
print(" %s go" % new_joiner)
waiting_for_train['value'] += 1
time.sleep(1)
print(" %s completed" % threading.current_thread().name)
xuewei_semaphore.release()
waiting_for_train['value'] -= 1


def log_the_waiting_area_status():
while True:
time.sleep(0.5)
name = threading.current_thread().name
print("name %s - size %s " % (name, waiting_for_train['value']))


q_watcher = threading.Thread(name="waiting area", target=log_the_waiting_area_status)
q_watcher.start()
threads = []
for i in range(100):
t_name = "t-" + str(i)
t = threading.Thread(name=t_name, target=run)
threads.append(t)
t.start()

for t in threads:
t.join()


这里我们申请了信号量4个空槽。

然后启动100个线程,不停的去获取信号量,然后做完就释放。

同时我们有一个缓冲队列,只存放当前新进站的人数。

通过打印这个waiting_for_train的状态,我们可以看到任意时刻队列最多只有4人进入。

也不会超过4个。

运行效果

在运行过程,我们发现queue的大小一直为4.

屏幕快照 2022-01-21 上午12.50.51.png

最后所有进站人员都进站上车了,等候的人就清零了。

屏幕快照 2022-01-21 上午12.55.15.png

这里总共有102个线程,一个主线程,一个等候区状态展示线程,还有另外一个百个线程,代表了100个进站人员。

semaphore初始化了4个度量,所以每次最多可以进站等候的人数最多只有4个。

跟地铁拦截进站一样。

我们也可以尝试把进站处理的代码修改为下方代码,读者自行运行看一下效果。

xuewei_semaphore.acquire()
print(" %s go" % new_joiner)
waiting_for_train['value'] += 1
time.sleep(1)
waiting_for_train['value'] -= 1
print(" %s completed" % threading.current_thread().name)
xuewei_semaphore.release()

小结

好,这个限流器非常简单,配套在这个中级编程简单带过一下。

读者朋友们可以把代码拷贝,运行几次,思考一下。

后面更新到UI开发的教程,学委会再写个带界面的把这个直观展示出来。

十四、有界信号量,这是一个有底线的信号量

前篇学委提出了Semaphore信号量来制作限流器的思路和一个简单的限流器实现。

代码运行起来了,看起来没有错误。

不过,细心的读者会发现,如果我们出现像下面的信号量使用方式:

xuewei_semaphore.acquire() #获取信号量
//do something
....
xuewei_semaphore.release() #释放信号量
#>>> xuewei_semaphore.release() #如果这里再调用一次release呢?

也就是 Semaphore 进行的释放的操作,比获取的还多一点点,会不会出问题?

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/27 10:43 下午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : thread_semaphore_boundvs_unbound.py
# @Project : hello
import threading
import time

xuewei_semaphore = threading.Semaphore(1) #后面改成有界信号量

print("xuewei_semaphore:", xuewei_semaphore)


def run():
print(" %s ready" % threading.current_thread().name)
xuewei_semaphore.acquire()
print(" %s go" % threading.current_thread().name)
time.sleep(0.1)
print(" %s completed" % threading.current_thread().name)
xuewei_semaphore.release()


def abnormal_run():
run()
xuewei_semaphore.release() # 多release了一次


t = threading.Thread(name="正常使用信号量", target=run)
t.start()
time.sleep(1)
t = threading.Thread(name="非正常使用信号量", target=abnormal_run)
t.start()

上面学委结合了第一篇分享信号量的文章, 简化了一下,重点展示多释放信号量的效果。

我们发现程序没有任何报错:

屏幕快照 2022-01-22 下午11.11.12.png

如果我们把反常的使用增加调用呢,比如下面的代码:

#在上方代码后面加上下面这段代码
for i in range(10000):
t = threading.Thread(name="非正常使用信号量"+str(i), target=abnormal_run)
t.start()

全程没有报错!!!

屏幕快照 2022-01-22 下午11.19.34.png

我直接震惊了!Python容错率太高了(这是高情商的说法,😂)

有界信号量 走过来把这个坑填上了

threading.BoundedSemaphore 这个类就是有界信号量,英文名非常直观了。

什么是有界?

跟做人一样,守住底线。那么Semaphore的底线在哪里?我们可以点击这个类去看一部分代码。

Semaphore维护了一个_value, 有界的信号量其实就是加多了一个变量_initial_value,去记录了信号量初始值。

后续,我们调用release函数的时候,有界信号量,每次都会检查,_value是否越界了(就是一旦超过了_initial_value,就抛出一个ValueError,提示越界了!)

这个太友好了.

下面就使用有界信号量展示,对比第一段代码,仅仅把

xuewei_semaphore = threading.Semaphore(1)

改为了:

xuewei_semaphore = threading.BoundedSemaphore(1)

所以读者可以直接改,当然也可以选择直接复制下面的代码再存一个文件运行。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/27 10:43 下午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : thread_semaphore_boundvs_unbound3.py
# @Project : hello
import threading
import time

xuewei_semaphore = threading.BoundedSemaphore(1)

print("xuewei_semaphore:", xuewei_semaphore)


def run():
print(" %s ready" % threading.current_thread().name)
xuewei_semaphore.acquire()
print(" %s go" % threading.current_thread().name)
time.sleep(0.1)
print(" %s completed" % threading.current_thread().name)
xuewei_semaphore.release()


def abnormal_run():
run()
xuewei_semaphore.release() # 多release了一次


t = threading.Thread(name="正常使用信号量", target=run)
t.start()
time.sleep(1)
t = threading.Thread(name="非正常使用信号量", target=abnormal_run)
t.start()

使用有界之后,果然,过度释放,系统就报错了!终于放心了。

屏幕快照 2022-01-22 下午11.22.24.png

小结

大家应该尽量使用有界信号量,它能帮我们守住边界。它不会因为我们不小心写错程序,写多一次release而放任程序继续正常运行。(可能是熬夜写代码,建议不要!写代码需要一个舒适的状态,质量更高)

如果使用纯Semaphore,那请务必封装起来(成对调用封闭起来)不对外开放acquire和release的使用,这样才能安装。

十五、可重入锁RLock

解决了什么难题

前面学委介绍线程安全的时候,提到过threading.Lock这个类。

这个所以我们acquire之后,就不能继续acquire了,必须执行一次release,其他线程才可以继续acquire。

这样一个时间只有一个线程做事情,这次我们看看RLock(ReentrantLock,可重入锁)。

RLock 是什么?

简单理解,它跟Lock类似,都是用来协调对受限资源的访问,加上锁来保护受限资源的访问。

但是,它们还是有明显的区别的。 第一个是,RLock可以被acquire多次。

我们看看下面的代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/24 12:02 上午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : __init__.py.py
# @Project : hello
import random
import threading
import datetime
import time

xuewei_account = 100
lock = threading.RLock()


# amount为负数即是转出金额
def transfer(money):
transfer_only(money)
lock.release()
print("release")


# amount为负数即是转出金额
def transfer_only(money):
lock.acquire()
print("transfer now")
global xuewei_account
for x in range(100000):
xuewei_account += money
print("transfer done")


transfer_only(100)
transfer_only(-100)
print("-" * 16)
print("学委账户余额:", xuewei_account)

运行效果如下:

屏幕快照 2022-01-23 下午11.53.30.png

transfer_only 函数被调用了两次,但我们还没有release过。

整个过程没有阻塞。

我们把使用Lock的代码展示一下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/24 12:02 上午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : __init__.py.py
# @Project : hello
import random
import threading
import datetime
import time

xuewei_account = 100
lock = threading.Lock()


# amount为负数即是转出金额
def transfer(money):
transfer_only(money)
lock.release()
print("release")


# amount为负数即是转出金额
def transfer_only(money):
lock.acquire()
print("transfer now")
global xuewei_account
for x in range(100000):
xuewei_account += money
print("transfer done")


transfer_only(100)
transfer_only(-100)
print("-" * 16)
print("学委账户余额:", xuewei_account)

运行效果如下:

屏幕快照 2022-01-23 下午11.57.30.png

两断代码的区别,仅仅为创造lock对象的时候,前者是基于RLock类型,后者是基于Lock(普通的锁)。

但我们看到Lock,不允许持有锁的线程(同一个或者其他线程)再次acqure,transfer_only仅仅执行完一次。

多个线程能不能都来acquire RLock

试试多个线程acquire,代码如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/24 12:02 上午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : __init__.py.py
# @Project : hello
import random
import threading
import datetime
import time

xuewei_account = 100
lock = threading.RLock()


# amount为负数即是转出金额
def transfer(money):
transfer_only(money)
lock.release()
print("release")


# amount为负数即是转出金额
def transfer_only(money):
lock.acquire()
print("transfer now")
global xuewei_account
for x in range(100000):
xuewei_account += money
print("transfer done")


threading.Thread(name="thread小白", target=lambda: transfer_only(100)).start()
threading.Thread(name="thread小花", target=lambda: transfer_only(-100)).start()
print("-" * 16)
print("学委账户余额:", xuewei_account)

运行效果如下:

屏幕快照 2022-01-23 下午11.53.17.png

很明显,第二个线程被锁定了。

这个符合RLock的设定,运行获取锁的线程多次acqure,但是必须release之后才能重新分配给其他线程。

两断代码区别仅仅为:

第一段,我们是在主线程,也就是同一个线程多次acquire RLock对象。

transfer_only(100)
transfer_only(-100)

第二段,我们是在两个不同线程间多次acquire RLock对象。(虽然没有打印出来,但是这里线程名称也特意设置不一样了,读者可以修改打印查看核对)

threading.Thread(name="thread小白", target=lambda: transfer_only(100)).start()
threading.Thread(name="thread小花", target=lambda: transfer_only(-100)).start()

小结

今天这篇我们先简单了解了RLock,它的设计让我们在一个线程内可以多次使用锁

它优先分配锁给持有锁的线程,减少线程切换的消耗,更重要的是,可重入锁设计更利于防止死锁。(这里简单举例,想象一下,一个线程调用多个方法都在acquire同个锁,这时候使用普通Lock就会进入死锁状态,后面学委再写一篇吧)

但是release必须要获取锁的线程进行,而且acquire几次就必须release几次,这个下篇再展示。

十六、死锁,单身狗谈何爱情

死锁 是什么?

想象一下,身边有哪些无法解开的问题?

是不是有个问题想要跟对象讲道理,一开始好好聊,还能说明白。然后聊着聊着,对象反将一军:“你不够爱我了!(爱会消失。。。)”

搞的一下子懵了,本来是要讨论谁不该干嘛干嘛的事情,就陷入了僵局。

对,死锁就是这种感觉。

这里不一样的是,讨论问题从两个人,变成了两个线程/进程(或者更多,多个就类似你爸妈在讨论该不该生叉烧比较好还是你比较好,最后追溯到爷爷奶奶,祖祖辈辈牵扯整个家族的问题)

好,简单概括这种感觉就是:两个或者多个执行单元(线程/进程)请求受限资源碰到了僵局,争持不下。

特征上表现为:

一号线程持有锁A,继续执行的过程尝试获取锁B; 二号线程持有锁B,继续执行过程尝试获取锁A。

这样两个线程就是互相僵持,不让步,谁也没法让步。

就像下面的代码一样:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/24 12:02 上午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : testthread_lock_really_deadlock.py
# @Project : hello
import threading
import time

xuewei_account = 100
lock_01 = threading.Lock()
lock_02 = threading.Lock()


# amount为负数即是转出金额
def transfer1(money):
print("%s - try to acquire lock_01" % threading.current_thread().name)
lock_01.acquire()
print("%s - acquired lock_01 transfer now" % threading.current_thread().name)
time.sleep(3)
print("%s - try to acquire lock_02" % threading.current_thread().name)
lock_02.acquire()
print("%s - acquired lock_02 transfer now" % threading.current_thread().name)
print("transfer done")
lock_02.release()
lock_01.release()
print("release")


# amount为负数即是转出金额
def transfer2(money):
print("%s - try to acquire lock_02" % threading.current_thread().name)
lock_02.acquire()
print("%s - acquired lock_02 transfer now" % threading.current_thread().name)
time.sleep(3)
print("%s - try to acquire lock_01" % threading.current_thread().name)
lock_01.acquire()
print("%s - acquired lock_01 transfer now" % threading.current_thread().name)
print("transfer done")
lock_01.release()
lock_02.release()
print("release")


threading.Thread(name="这个线程%s要请求锁" % 1, target=lambda: transfer1(100)).start()
threading.Thread(name="这个线程%s要请求锁" % 2, target=lambda: transfer2(-100)).start()
print("-" * 16)
print("学委账户余额:", xuewei_account)

运行效果如下:

屏幕快照 2022-01-24 下午11.18.16.png

我们可以看到,两个线程一分别获取了lock_01和lock_02之后,

尝试获取lock_02 和 lock_01,然后就不继续运行了,没错,这就是死锁。

然而,主线程 则在一旁围观,他们锁了,没碍着主线程。主线程无欲无求,继续干着活,编写这代码,默默地输出xuewei_account余额。

小结

这篇小短文,简单介绍了一下死锁,展示了一下。

死锁是发生在两个线程之间的。

具体一点,这是死锁发生的4个必要条件:

  • 互斥条件:资源任意时刻只有一个持有者;仅当资源持有者释放,资源才可再次被申请
  • 不可剥夺条件:资源持有者保持锁的唯一话语权,持有者不释放资源,其他人只能干等
  • 请求与保持条件:资源申请者保持对已持有锁的锁定状态(吃碗看锅)
  • 循环等待条件:两个或者以上竞争者头尾相接等待资源,A->B->A如此循环等待

所以,一个线程如果发生了(代码不小心写了个死循环acquire同一个锁),那不算死锁(不满足学术定义)

简单理解可以是,”单身狗谈何爱情“,死锁是发生在两个或者更多资源竞争者之间的。

十七、Condition类,解决死锁

前面介绍了死锁,Lock,Rlock,这篇我们介绍一下Condition

Condition 是什么?

这个类跟Lock和Rlock类似,但是多了wait/notify/notify_all(Java里面对应的是notifyAll)等方法,这个搞过Java的同学就非常熟悉了。

如下图,它的acquire和release方法沿用了传入的lock对象(或者默认的Rlock对象)。

屏幕快照 2022-01-26 上午12.07.06.png

所以这两个熟悉的方法,可以参考前面分享的Lock和Rlock文章。

学委这里重点讲述wait和notify,notify_all

模拟10个运动员等候信号,准备开跑!

我们看看下面的代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/1/23 11:19 下午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : thread_condition.py
# @Project : hello
import threading
import time

condition = threading.Condition()


def wait_and_run():
condition.acquire()
print("%s wait for signal " % threading.current_thread().name)
condition.wait()
print("% run" % threading.current_thread().name)
time.sleep(1)
print("% completed" % threading.current_thread().name)
condition.release()


threads = []
for i in range(1, 11):
t = threading.Thread(name="运动员" + str(i), target=wait_and_run)
threads.append(t)


def fire():
condition.acquire()
print("预备")
condition.notify_all()
condition.wait()
print("3...")
time.sleep(1)
print("2...")
time.sleep(1)
print("1...")
time.sleep(1)
print("fire")
condition.notify_all()



threading.Thread(name="主裁判", target=fire).start()
for t in threads:
t.start()
for t in threads:
t.join()

print("看热闹")

这个代码有10个运动员线程,他们都在等待一个信号,然后开始竞赛,跑起来。

我们看到线程后面有一个主裁判线程,他负责处罚fire函数。

我们看到fire函数,会执行notify_all, 唤醒所有等待的运动员。

屏幕快照 2022-01-26 上午12.13.58.png

问题来了?等半天,裁判咋不喊3/2/1啊?

你知道为什么吗?

这里展示的notify_all(下面的第二行,已经通知了所有运动员开始准备。

接着裁判程序就运行到condition.wait(), 这时候裁判线程跟其他运动员一样,都进入等待状态了。

问题来了,谁来喊裁判和所有运动员(开始跑)。

def fire():
condition.acquire()
condition.notify_all()
print("预备")
condition.wait()
//3/2/1/ 然后再notify_all

Condition类,相比Lock/Rlock更复杂。

它可以实现多个线程等待一个它条件,就像田径赛场上的主裁判一样。

如果我们打开这个类的源代码(最前面的一张截图),我们会发现其内部维护了一个Rlock(默认情况下)。

可以在创建Condition对象的时候,传入一个lock对象(可以是Lock或者Rlock)来修改。

限时等待 : wait(一个时长)

我们知道Condition对象调用wait之后,线程会进入等待。

当运动员和主裁判都在等待,场外也没有线程告诉主裁判可以开始喊

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/1/23 11:19 下午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : thread_condition.py
# @Project : hello
import threading
import time

condition = threading.Condition()
read_condition = threading.Condition()
counter = {"value": 0}


def wait_and_run():
condition.acquire()
print("%s wait for signal " % threading.current_thread().name)
counter['value'] += 1
condition.wait()
print("%s run" % threading.current_thread().name)
time.sleep(0.5)
print("%s completed" % threading.current_thread().name)
condition.release()


threads = []
for i in range(1, 11):
t = threading.Thread(name="运动员" + str(i), target=wait_and_run)
threads.append(t)


def fire():
condition.acquire()
print("预备")
condition.notify_all()
condition.wait(1)
print("3...")
condition.wait(1)
print("2...")
condition.wait(1)
print("1...")
print("fire")
condition.notify_all()
condition.wait(5)



threading.Thread(name="主裁判", target=fire).start()
print("看热闹")
for t in threads:
t.start()
for t in threads:
t.join()
print("比赛结束")

我们先看一下运行效果:

第一阶段,运动员等待(前篇做的)完之后裁判开始喊3/2/1。 因为wait方法等待时间达到。 屏幕快照 2022-01-27 上午12.00.06.png

然后进入第二阶段,在主裁判线程调用了notify_all方法之后,运动员开始跑起来。

屏幕快照 2022-01-26 下午11.59.56.png

运动员一个一个跑完,比赛结束了。

代码解析

这个代码跟前篇区别除了一些打印(print)输出,重点的改变就是wait函数传入时间。

通过这样设计,我们不需要第三个线程去通知主裁判继续喊。

这跟现实场景一样:赛场上,主裁判,观察完全部运动员准备好,然后就鸣枪(类似的3/2/1)。

小结

不知道有没有在看的读者朋友想到了这个实现?

面对田径运动场的场景,还有很多工具可以实现同等的场景,本篇旨在展示一个案例,各位可以通过学习这个代码,了解Condition类的使用。

喜欢Python的朋友,请关注学委的 Python基础专栏 or Python入门到精通大专栏

十八、拖延症患者Timer类

什么是Timer

准确点说,它是一个延迟执行的事件的单元(threading模块下的一个类),用来在指定时间后执行该事件。

使用上非常简单:

def do_something_after_new_year():
#新年不🉐️事事顺利,万事如意
pass

#Timer类,传入一个时间(秒数),第二个参数为到点执行的函数。
threading.Timer(60,do_something_after_new_year).start()

这样就设置了一个60秒后执行的程序了。 类似时间管理器,设置多少时间后干嘛

下面就是一个完整可执行的代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/1/27 11:16 下午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : timer_demo.py
# @Project : hello
import datetime
import threading


def do_after_5_second():
print("%s - thread %s " % (datetime.datetime.now(), threading.current_thread().name))
print("do something now ")


print("%s - thread %s " % (datetime.datetime.now(), threading.current_thread().name))
threading.Timer(5, do_after_5_second).start()

运行效果如下:

屏幕快照 2022-01-27 下午11.22.32.png

为了演示方便,这里就设置了等待间隔为5秒,程序很快就结束输出了。

在timer执行的函数中,它是另起一个线程。

Timer 还可以中途取消

我们查看源码可以看到:

屏幕快照 2022-01-27 下午11.33.21.png

这个类也没别的,还有一个cancel的方法。

那么,学委也准备了代码,展示一下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/1/27 11:16 下午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : timer_demo2.py
# @Project : hello
import datetime
import threading


def do_after_5_second():
print("%s - thread %s " % (datetime.datetime.now(), threading.current_thread().name))
print("do something now ")


def do_after_1_min():
print("%s - thread %s " % (datetime.datetime.now(), threading.current_thread().name))
print("do_after_1_min ")


def stop_another_timer(timer):
print("%s - thread %s " % (datetime.datetime.now(), threading.current_thread().name))
print("I will cancel another time")
print("before cancel - timer :", timer.is_alive())
timer.cancel()
print("after cancel - timer :", timer.is_alive())


print("%s - thread %s order a timer" % (datetime.datetime.now(), threading.current_thread().name))
threading.Timer(5, do_after_5_second).start()
print("%s - thread %s order another timer " % (datetime.datetime.now(), threading.current_thread().name))
timer = threading.Timer(60, do_after_1_min)
timer.start()
print("%s - thread %s order another timer " % (datetime.datetime.now(), threading.current_thread().name))
#读者可以注释下方代码,查看前面timer没被中途取消的执行结果。
threading.Timer(10, lambda: stop_another_timer(timer)).start()

下面是运行效果:

屏幕快照 2022-01-27 下午11.36.08.png

我们看到,cancel方法不会把线程的状态(通过is_alive()查看)改变。因为cancel方法改变的是Timer类维护的finished(Event类型,这个学委的另一篇文章也分享过)

最后等候时长的timer被取消了,没有后续执行,程序也就结束了。

小结

很明显,Timer就是一个拖延症患者啊,干啥事都得拖一拖。

十九、强迫症患者之Barrier类

这篇继续介绍threading库里面的类,threading.Barrier类。

很多人说它是一个栅栏,或者屏障,我更愿意说它是一个强迫症患者。

怎么说呢?它跟参与的线程都杠上了,设置了几个,它就必然等待几个人同时准备好,才肯放行!

什么是Barrier类

Barrier类是一个多线程协调工具,它让多个持有Barrier的线程互相等待。 直到满足Barrier实例指定的n个线程(parties参数数值),也就是达到n个线程都调用了Barrier对象的wait方法。 然后这些线程才能同时进入下一个阶段。

阅读Threading库的源码,可以看到原来这玩意是受到Java的CyclicBarrier的启发(这个学委以前也经常用到)才引入到Python中来的。

所以说,很多语言其实是互相促成,互相成长的(互相吸收精华,消除自身劣势的)。

我们作为开发者,应该是看哪个语言在哪个场景下工作最适合来选择。

而非不假思索,就一口咬定,Python就是比啥啥啥好,Java就是比啥啥啥好。

从这些语言的发展,我们应该学习到的是:取长补短,见贤思齐!(这才是最有生命力,最有活力的)

屏幕快照 2022-01-28 下午11.33.23.png

不小心扯远了。

我们看看它的构造方法:

def __init__(self, parties, action=None, timeout=None):
"""Create a barrier, initialised to 'parties' threads."""
self._cond = Condition(Lock())
self._action = action
self._timeout = timeout
self._parties = parties
self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken
self._count = 0
复制代码

学委删除了一下参数的注释,意思是这样的thread.Barrier(parties=带屏蔽的线程数量)

其他参数后面的文章还会说。

下面准备了一个程序:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/1/28 11:44 下午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : testthreadingbarrier1.py
# @Project : hello
import threading

barrier = threading.Barrier(10)


def prepare_before_run(barrier):
print("运动员准备 - %s" % threading.current_thread().name)
barrier_counter = barrier.wait()
print("remaining: %s" % barrier_counter)


threads = []
for i in range(9):
t = threading.Thread(name="运动员" + str(i + 1), target=prepare_before_run, args=(barrier,))
threads.append(t)

for t in threads:
t.start()
for t in threads:
t.join()

print("学委的Demo程序结束")
复制代码

这个程序创建了参与线程数位10的一个Barrier对象。然后启动了9个线程获取Barrier对象,准备好后,执行wait方法。

运行效果如下:

屏幕快照 2022-01-28 下午11.54.52.png

如Barrier所设计的功能一样,所有运动员线程都进入等待。

他们在等待第十个运动员到场(并执行wait函数),才开始竞赛。

如果读者把代码中的线程数9改为10,那么程序不会一直等待。

Barrrier保障了全员到齐之后,才放行的效果。

屏幕快照 2022-01-29 上午12.00.51.png

感受到了Barrier的特殊用途了吧。

再次阅读一下Barrier类的构造方法(如下),我们看到,它内部持有了Condition对象和_count, _parties。

基本上Barrier利用了这三个实现了一个等待释放的机制,下面看看wait源码。

另外,还有其他参数像action,这个会再整个Barrier被release之前由某一个线程调用。

def __init__(self, parties, action=None, timeout=None):
self._cond = Condition(Lock())
self._action = action
self._timeout = timeout
self._parties = parties
self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken
self._count = 0
复制代码

学委准备了下面这个小demo方法,读者可以在前面分享的代码中贴上。

def post_action():
print("运动员调用了barrier释放后的方法- %s" % threading.current_thread().name)
barrier = threading.Barrier(parties=10,action=post_action)
复制代码

重新运行代码可以看到,只有某一个运动员线程(哪个运动员是随机的,看实际项目中是那个线程最后调用wait方法的)执行了post_aciton方法。

下面是某一次执行的效果展示:

屏幕快照 2022-01-29 下午11.58.37.png

重点是这个wait方法

它像Condition一样也支持timeout(也就是线程数量凑不够,在一个固定等待延迟后强制执行wait后面的代码)。

def wait(self, timeout=None):
if timeout is None:
timeout = self._timeout
with self._cond:
self._enter() # Block while the barrier drains.
index = self._count
self._count += 1
try:
if index + 1 == self._parties:
# We release the barrier
self._release()
else:
# We wait until someone releases us
self._wait(timeout)
return index
finally:
self._count -= 1
# Wake up any threads waiting for barrier to drain.
self._exit()
复制代码

这段代码也非常直观,每一个线程调用一次wait方法,Barrier对象持有的_count自增1(从0开始)。

知道_count 为_parties-1的时候,Barrier调用_release释放内部持有的锁。

好,继续看一下_release 函数,这个更加简单了,重点我们看到:self._cond.notify_all(),这句执行了。

它调用了notify_all函数,释放了其他调用的condition.wait方法。

这就是Barrier类的内部实现。

def _release(self):
try:
if self._action:
self._action()
# enter draining state
self._state = 1
self._cond.notify_all()
except:
#an exception during the _action handler. Break and reraise
self._break()
raise
复制代码

另外,这里我们也看到_action方法被调用了,这也验证了学委上面贴上的demo 运行图,10号运动员打印了post_action方法。

补充一下with方式的使用Condition,也可以应用在Lock/Rlock对象。

像下面的代码:

with xuewei_lock:
do_something()
复制代码

等效于:

xuewei_lock.acquire()
do_something()
xuewei_lock.release()
复制代码

这种风格让这个锁的使用看起来更加优雅,也不容易漏了。

毕竟有时候方法不小心写多了,后面容易就给忘记了,特别是多人协作的项目上。(后来者非常有可能在项目非常紧急的情况下,修改代码导致某些情况锁没有被释放,产生系统故障,这就非常可惜。)

总结

Barrier类内部使用了Condition这个资源控制单元来实现,加上一个阀值和计数器变量,实现多线程‘屏障’功能。

二十、Threading.local() 线程的本地数据

什么是线程的本地数据?

线程的本地数据,就是专属于特定一个线程的数据。

再看看怎么创建线程本地数据,创建方式为threading.local(),学委准备了如下代码:

tldata = threading.local()
tldata.slogan = "持续学习,持续开发,我是雷学委!"


# Call by below code will get no attribute exception
# AttributeError: '_MainThread' object has no attribute 'slogan'
print("current thread slogan:%s ",threading.current_thread().slogan)
复制代码

我们这里直接运行,获取当前线程,发现不能直接线程对象 + “.slogan" 的方式获取属性。

这里说明threading.local()的后续修改不会修改线程的属性。

好,下面代码,就展示到了两个线程获取各自的本地数据。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/1/30 11:36 下午
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: 雷学委
# @XueWeiTag: CodingDemo
# @File : threading_local.py
# @Project : hello
import threading

tldata = threading.local()
tldata.slogan = "持续学习"


# Call by below code will get no attribute exception
# AttributeError: '_MainThread' object has no attribute 'slogan'
# print("current thread slogan:%s ",threading.current_thread().slogan)


def print_thread_data():
print("学委demo-thread %s : tldata=%s " % (threading.current_thread().name, tldata))
current_tl_data = threading.local()
print("学委demo-thread %s : current_tl_data=%s " % (threading.current_thread().name, current_tl_data))



print("thread %s : tldata:%s " % (threading.current_thread().name, tldata))
t1 = threading.Thread(name="another thread", target=print_thread_data)
t1.start()

print("thread %s : tldata:%s " % (threading.current_thread().name, tldata))
复制代码

运行效果如下:

屏幕快照 2022-01-30 下午11.54.17.png

从运行结果,我们可以发现,tldata这个属于主线程的本地数据,是可以被传递到前提线程的target调用函数内的。

这没啥问题。

重点是,我们在t1线程内执行threading.local()获取到的current_tl_data,并不跟tldata相同(它们并不指向同个内存地址。

至此,已经展示了threading.local()创建本地数据的特点了。

每个线程内调用这个方法,获取的是本线程专属的本地数据。

当然,如果我们把主线程的本地数据的变量,传递到其他线程,被它们修改了,那么肯定是会被修改的。这个没毛病。

所以,使用本地数据,我们更多是希望这份专属的数据只被持有的线程操纵。