其他分享
首页 > 其他分享> > 第五十五篇 死锁、GIL锁以及Pool

第五十五篇 死锁、GIL锁以及Pool

作者:互联网

目录

一、死锁

1.死锁现象

1.定义:死锁指的是某个资源被占用后一直得不到释放,导致其他需要这个资源的线程进入阻塞状态

2.产生死锁的原因:

from threading import Lock, Thread
import time
mutex = Lock()
# 加锁两次,没有释放,程序无法向下执行
mutex.acquire()
mutex.acquire()
w = Lock()
k = Lock()

def eat1():
    # 抢到了其中一把锁
    w.acquire()
    time.sleep(0.2)   # 睡眠一下
    # 由于时间太慢无法抢到第二把锁,而第一把锁也没有释放,所以两个线程都会等待
    k.acquire()
    print('我  开吃了')
    k.release()
    w.release()
    
def eat2():
    # 抢到了另一把锁
    k.acquire()
    time.sleep(0.2)
    # 只有同时拥有两把锁才能向下执行,等待对方释放
    w.acquire()
    print('你  开吃了')
    k.release()
    w.release()
    
t1 = Thread(target=eat1)
t2 = Thread(target=eat2)

t1.start()
t2.start()

# 最终都无法执行

3.解决方法:

l = Lock()
l.acquire()
l.acquire(timeout=3) 

2.递归锁(可重入锁)

import time
from threading import Thread, RLock, currentThread
# currentThread = current_thread
r = RLock()

1.特点:同一个线程可以对这个锁执行多次acquire,而不会造成卡死

# 对同一把锁加锁两次
r.acquire()
r.acquire()
print('递归锁,可以锁多次')  # 任然可以执行全部代码

2.注意:同一个线程必须保证,加锁的次数和解锁的次数相同,其他线程才能抢到这把锁

def task1():
    # 加锁几次就解锁几次,以便后面的线程使用控制台资源
    time.sleep(2)
    r.acquire()
    r.acquire()
    print(currentThread().name)
    r.release()
    r.release()
    
def task2():
    time.sleep(2)
    r.acquire()
    r.acquire()
    print(currentThread().name)
    r.release()
    r.release()

# 多线程并发,谁先抢到锁,谁就先执行
Thread(target=task1).start()
Thread(target=task2).start()

3.信号量

1.信号量可以限制同时并发执行公共代码的线程数量
2.如果限制数量为1,则与普通互斥锁没有区别
3.注意:信号量不是用来解决安全问题的,而是用于限制最大的并发量

from threading import Semaphore, currentThread, Thread
import time

# 限制同时访问公共代码的线程数量为5
sp = Semaphore(5)

def task():
    sp.acquire()
    time.sleep(1)
    print(currentThread().name)
    s.release()
    
for i in range(10)
    # 开启十个线程,但是同一时间只有5个线程可以共同使用公共代码
    Thread(target=task).start()

二、GIL(全局解释器锁)

1.什么是GIL

1.GIL(全局解释器锁):==在CPython中,防止多个线程在同一时间执行python字节码的一个互斥锁==

2.特点:GIL是非常重要的,因为CPython的内存管理是非线程安全的,很多其他特性都依赖于GIL锁,所以即使它影响了程序效率,也无法将其去除

3.总结:在CPython中,GIL会把线程的并行变成并发,导致效率降低

4.延申:需要知道的是,解释器并不只有CPython,还有PyPy 、JPython等等。GIL也仅存在于CPython中,这并不是python语言的问题,而是CPython解释器的问题

2.GIL带来的问题

1.单个线程开启流程

1.执行python文件的三个步骤:

2.注意:每当执行一个py文件,就会立即启动一个python解释器

3.执行python文件时的内存结构图,如下

2.多个线程开启流程

1.GIL叫做全局解释器锁,是用于加到解释器上的一把互斥锁,那么这把锁就会对应用程序有所影响

2.py文件中的内容本质都是字符串,只有在被解释器解释时,才具备语法意义,解释器会将py代码翻译为当前系统支持的指令交给系统去执行

3.当进程中仅存在一条线程时,GIL锁的存在不会影响效率,但是如果进程中有多个线程时,GIL锁就会发挥它的作用了

4.解释:

3.为什么需要GIL

1.GC线程

1.python程序(进程:python.exe)本质上就是一堆字符串,所以运行一个python程序时,必须开启一个解释器,但是在一个python程序中只有一个解释器,当有多个线程要执行时,就会产生线程安全问题

2.是不是我们不开启子线程就没有问题呢,答案是否定的。在使用python解释器编程时,程序员无需参与内存的管理工作,这是因为python有自带的内存管理机制,简称GC

3.python中的垃圾回收就是GC参与完成的,当内存占用达到某个阈值时,GC会将其他线程挂起,然后执行垃圾清理操作,执行这个操作的GC本身也是一串代码,也即需要开启一个线程来执行

4.也就是说,就算程序没有自己开启线程,内部也会有多个线程,GC线程与我们程序中的线程竞争解释器资源,就会产生安全问题

示例:
   1.假设线程A要定义一个变量 a = 100,那么它的步骤是:1.先申请一块内存空间,并把数据100放进去;2.将100的内存地址与变量名a进行绑定,引用计数加一
   2.如果线程A进行到第一步完成时,CPU切换到了GC线程,GC发现100的地址的引用计数为0,就会将它当成垃圾清理掉,等CPU再次切换到线程A时,刚刚保存的数据100就没有了,导致定义变量失败
   3.当然其他一些涉及到内存的操作同样可能产生问题,为了避免GC与其他线程竞争解释器带来的安全问题,CPython简单粗暴的给解释器加了互斥锁

2.GIL带来的问题

1.==互斥锁的特性使得多线程无法并行,只能并发==

2.详细解释:GIL是以把互斥锁,互斥锁只能让线程来回切换,导致效率降低,因此,在CPython中即使开启了多线程,而且是多核CPU,也是无法执行多线程并行的,因为在一个进程中只有一个解释器,而且同一时间只能有一个任务在执行(由于GIL锁的缘故)

3.如何解决GIL锁导致的效率问题

4.我们不能因为CPython对于多线程无法实现并行,就否定python这门语言,因为:

5.对于GIL锁产生的问题的总结:

3.GIL锁的作用

==有了GIL之后,多个线程将不可能在同一时间使用解释器,从而保证了解释器的数据安全,因此CPython中的内存管理就是线程安全的了==

4.关于GIL的性能

1.GIL的加锁与解锁时机

1.加锁时机:在调用解释器时立即加锁

2.解锁时机:

2.性能测试

from multiprocessing import Process
from threading import Thread
import time

1.IO密集型(如浏览网页)

# 读写文件的操作也是IO操作,和输入输出类似
def task():
    # 利用多线程/进程循环打开文件
    for i in range(150):
        with open(r'test.py', 'r', encoding='utf-8') as f:
            f.read()


# 记录进程开始的时间
start_time = time.time()
    
tl = list()   # 用于将实例化的进程对象/线程对象放入容器
for i in range(10):
    # p = Process(target=task)  # 使用多进程时,必须在main判断下进行
    t = Thread(target=task)
    t.start()
    tl.append(t)
        
    # 遍历列表
    for j in tl:
        j.join()  # 无论是哪个进程对象/线程对象,主进程/主线程都会等待它们执行完,再运行自身
        
    # 计算从进程开始到进程结束所花费的时间
    print(time.time() - start_time)

2.计算密集型(比如人脸识别、图像处理)

def task():
    for i in range(1000000):
        a = 6 + 6
        
if __name__ == '__main__':
    start_time = time.time()
    
    pl = list()
    for i in range(6):
        p = Process(target=task)
        # t = Thread(target=task)   # 多线程不需要在main判断下进行
        p.start()
        pl.append(p)
        
    for j in pl:
        j.join()
        
    print(time.time() - start_time)

3.GIL与自定义锁的区别

1.GIL保护的是解释器级别的数据安全,比如对象的引用计数、垃圾分代数据

2.自定义锁保护的是解释器之外的共享资源的安全,比如硬盘上的文件、控制台,所以当程序中出现了共享自定义的数据时,就需要自己加锁

from threading import Thread, Lock
import time   # 为了模拟多线程竞争共享资源而导致数据错乱,需要导入时间模块来控制两个线程的速度

a = 0
mutex = Lock

def task():
    # mutex.acquire()
    global a 
    temp = a 
    # 如果不加锁,则两个线程都会运行到这里等待,它们获取的temp都是0,因为第一个线程还没有改变a的值
    time.sleep(0.1)  
    a = temp + 1
    # mutex.release()
    
t1 = Thread(target=task)
t2 = Thread(target=task)

t1.start()
t2.start()

t1.join()
t2.join()

print(a)

# 如果不加锁,则a为1;加锁之后由于每次只有一个线程会执行共享代码(数据),所以a的值会被加两次,则a为2

三、线程池与进程池

import os
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from threading import activeCount, enumerate, currentThread

1.线程池

1.池表示一个容器,线程池本质上就是一个存储线程的列表

2.如果是IO密集型任务,则使用线程池

# 创建一个线程池,指定最多可以容纳的线程数量
pool = ThreadPoolExecutor(10)  # 自定义一个最多可以容纳10个线程的线程池(如果不指定,则默认为CPU的个数乘以5)

def task():
    time.sleep(1)   # 让多个线程在同一起跑线,并打印自己的名字
    print(currentThread().name)
    
# 提交任务到池子中
pool.submit(task)
pool.submit(task)
pool.submit(task)

print(active_count()) # 存活的线程数量(包括主线程)
print(enumerate())  # 当前存活线程信息(列表形式)

'''
4
[<_MainThread(MainThread, started 19612)>, <Thread(ThreadPoolExecutor-0_0, started daemon 42040)>, <Thread(ThreadPoolExecutor-0_1, started daemon 47792)>, <Thread(ThreadPoolExecutor-0_2, started daemon 47232)>]
ThreadPoolExecutor-0_1
ThreadPoolExecutor-0_0
ThreadPoolExecutor-0_2
'''

2.进程池

def task():
    time.sleep(1)
    print(os.getpid(), os.getppid())

if __name__ == '__main__':
    pool = ProcessPoolExecutor(5)  # 如果不指定个数,则默认为CPU的个数
    pool.submit(task)
    pool.submit(task)
    pool.submit(task)
    pool.submit(task)
    pool.submit(task)
    print(os.getpid())
    
'''
51012
50860 51012
50512 51012
51024 51012
50540 51012
51180 51012
'''

3.线程池与进程池

1.为什么要使用线程池/进程池

2.注意:如果进程不结束,池子里面的进程/线程也会一直存活

四、同步与异步

1.回顾

1.程序的运行状态:阻塞与非阻塞

2.处理任务的方式:并行、并发、串行

3.提交任务的方式:同步、异步

2.同步

1.同步(指的是调用):提交任务后必须在原地等待,直到任务结束才能执行下面的代码

2.同步会有等待的效果,但是和阻塞完全不同,阻塞时程序会被剥夺CPU执行权,而同步调用则不会

def task():
    for i in range(1000000):
        6 + 6

print('start...')
task()  # 不是阻塞,而是在进行大量的计算,称为同步执行
print('end')  # 要等到上一行代码执行完毕

3.异步

1.异步相关概念

1.异步(异步调用):发起任务后不用等待任务执行完毕,可以立即开启执行其他操作

2.异步效率高于同步,但是会出现另一个问题,就是任务发起方不知道任务合适处理完成

2.解决异步无法知晓任务状态的问题

1.轮询:每隔一段时间就询问一次(效率低、无法及时获取结果)

# 不推荐轮询的方法
from threading import Thread
import time

is_start = False

def server_task():
    global is_start
    print('服务器正在启动...')
    time.sleep(2)
    print('服务器启动成功')
    is_start = True
    
def client_task():
    while True:
        time.sleep(0.2)
        if is_start:
            print('连接成功')
            break
        else:
            print('请耐心等待...')
            
t1 = Thread(target=server_task)
t2 = Thread(target=client_task)

t1.start()
t2.start()

print('异步——轮询方法')

2.异步回调:让任务的执行方主动通知任务的执行状态(可以及时拿到任务的结果)

from threading import Thread

# 具体的任务
def task(callback):
    print('子线程start')
    for i in range(1000000):
        6 + 7
    
    callback('子线程end')
    
# 回调函数(参数是表示任务的结果)
def call_back(res):
    print(res)
    
print('主线程 start')
t = Thread(target=task, args=(call_back,))
t.start()
print('主线程 end')
from concurrent.futures import ThreadPoolExecutor
import time

# 具体的任务
def task():
    time.sleep(2)
    print('子线程end')
    return 'ok'
    
# 回调函数(参数是表示任务的结果)
def call_back(arg):
    print(arg)   # <Future at 0x1345dd8e9b0 state=finished returned str>
    print(arg.result())  # ok

pool = ThreadPoolExecutor(10)
res = pool.submit(task)  # 异步提交方式
print(res)
# print(res.result()) # result是阻塞函数,它会阻塞到任务执行完毕为止
res.add_done_callback(call_back)  # 为这个任务绑定回调函数

print('主线程 end')


# 使用案例
def task(num):
    time.sleep(1)
    print(num)
    return 'ok'  # 返回值就包含在res这个对象中
    
def callback(obj): 
    print(obj.result())   # 绑定的回调函数会接收返回值对象res,它是一个对象,只能通过打印 对象.result() 才能得到任务的返回值
    
pool = ThreadPoolExecutor()
res = pool.submit(task, 666)  # res接收的是一个返回值对象(区别于函数返回值)
res.add_done_callback(callback)

print('over')

3.异步回调详解

1.定义:在发起一个异步任务的同时指定一个函数,在异步任务完成时会自动的调用这个函数

2.为什么需要异步回调

3.总结

4.注意:

5.异步回调的应用

import requests,re,os,random,time
from concurrent.futures import ProcessPoolExecutor

def get_data(url):
    print("%s 正在请求%s" % (os.getpid(),url))
    time.sleep(random.randint(1,2))
    response = requests.get(url)
    print(os.getpid(),"请求成功 数据长度",len(response.content))
    #parser(response) # 3.直接调用解析方法  哪个进程请求完成就那个进程解析数据  强行使两个操作耦合到一起了
    return response

def parser(obj):
    data = obj.result()
    htm = data.content.decode("utf-8")
    ls = re.findall("href=.*?com",htm)
    print(os.getpid(),"解析成功",len(ls),"个链接")

if __name__ == '__main__':
    pool = ProcessPoolExecutor(3)
    urls = ["https://www.baidu.com",
            "https://www.sina.com",
            "https://www.python.org",
            "https://www.tmall.com",
            "https://www.mysql.com",
            "https://www.apple.com.cn"]
    # objs = []
    for url in urls:
        # res = pool.submit(get_data,url).result() # 1.同步的方式获取结果 将导致所有请求任务不能并发
        # parser(res)

        obj = pool.submit(get_data,url) # 
        obj.add_done_callback(parser) # 4.使用异步回调,保证了数据可以被及时处理,并且请求和解析解开了耦合
        # objs.append(obj)
        
    # pool.shutdown() # 2.等待所有任务执行结束在统一的解析
    # for obj in objs:
    #     res = obj.result()
    #     parser(res)
    # 1.请求任务可以并发 但是结果不能被及时解析 必须等所有请求完成才能解析
    # 2.解析任务变成了串行,

五、线程事件Event

1.什么是事件

1.事件表示在某个时间发生了某个事情的通知信号,用于线程间的协同工作

2.作用:因为不同线程之间是独立运行的状态,不可预测,所以一个线程与另一个线程间的数据是不同步的,当一个线程需要利用另一个线程的状态来确定自己的下一步操作时,就必须保持线程间数据的同步,Event就可以实现线程间同步

2.Event相关概念

1.Event对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生
2.在初始情况下,Event对象中的信号标志被设置为假,如果有线程等待一个Event对象,而这个Event对象的标志为假,那么这个线程将会被一直阻塞,直到该标志为真
3.一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程
4.如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件,继续执行

3.Event对象的方法

event.isSet():返回event的状态值;等价于event.is_set()
event.wait():将阻塞线程;直到event的状态为True
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
event.clear():恢复event的状态值为False。

4.如何使用Event

通过wait函数阻塞当前线程,直到Event对象的状态从False变为True


from threading import Thread, Event
import time

e = Event()  # 实例化一个事件对象,它的初始值时False

def start_server():
    print('server loading...')
    time.sleep(2)
    print('server start')
    e.set()  # 当服务器线程执行完时,事件对象通过set方法将其状态(bool值)标为True
    
def connect_task():
    e.wait()  # 在并发的多线程中,连接线程由于事件对象的wait方法,会一直处于阻塞状态,直到事件对象的bool值为True时,才会变为非阻塞
    if e.is_set():  # is_set方法可以获取事件对象的状态
        print('connect sucessful')
        
t1 = Thread(target=start_server)
t2 = Thread(target=connect_task)

t1.start()
t2.start()

标签:解释器,第五十五,死锁,task,线程,time,print,GIL,Pool
来源: https://www.cnblogs.com/itboy-newking/p/11185355.html