编程语言
首页 > 编程语言> > Python_并发编程总结

Python_并发编程总结

作者:互联网

Python 并发编程

文章目录

一、前言

1. 为什么要搞并发编程

在大学阶段,科班出身的应该都学过《操作系统》这门课程,里面花了大篇幅的时间 去介绍进程、线程、并发、并行等概念。那么,并发编程需要各位的操作系统基础,本文不会对操作系统的部分进行过多的解释。

在这里,我们只简单的告诉大家为什么要搞并发编程。举一个例子:有一大堆砖需要搬,一个人干肯定要花很多时间(因此,如果你只会 if……elif,那么很多时候,真的会拖慢运行速度)。多线程就好比是增加了人手,所以干的快。简而言之:就是为了增加运行速度

而随着编程的不断深入,为了使运行速度更快,并发编程几乎是任何程序员都绕不过去的东西。

2. 增加运行速度的方法

在现实当中,我们有很多办法来提升程序的运行速度,比如说:我可以对某个算法进行优化,但是,这种优化往往比较有限。因此,我们可以采用多线程,多 CPU,以及多机器并行等方式。

其中,多机器并行当中,就包括我们常常听说的大数据之类的东西,包括 Spark,Hadoop 等。而大数据是一个单独的学科,非常值得研究,因此在这里,我们只介绍一下跟并发编程相关的东西。

3. 实现 Python 并发的方式

主要有三个:多线程,多进程,多协程。学过《操作系统》的同学,应该对这三个概念都不陌生。如果要了解这三个概念之间到底是什么关系,我们首先得了解一下 CPU 密集型,以及 I/O 密集型

3.1 程序分类

3.1.1 CPU 密集型(CPU-bound)

其实通过英文翻译,不难发现,英文和中文翻译还是有点意思上的差异的。所谓的 CPU-bound,其实就是说:运行的速度最终会受到 CPU 计算的限制。因此 CPU-bound 有些时候也被翻译成计算密集型。具体就是指:I/O 相对很少,但是却需要 CPU 进行非常大的计算处理,因此,CPU 占用率非常的高。比如说:压缩、解压缩;加密 / 解密;正则表达式搜索等等。

3.1.2 I/O 密集型(I/O bound)

这个是与 CPU-bound 相对的概念。与 CPU-bound 相反,此类程序计算相对较少,但是要花费大量的时间在 I/O 上面,CPU 占用率低,但是运行速度却怎么也提不上去。这个时候就要考虑异步 I/O(异步 IO 其实属于多协程)。那么此类程序包括:文件处理程序、爬虫程序、数据库的读取程序等。

3.2 多线程,多进行,多协程的对比

在《操作系统》这门课程当中,科班的同学大体都比较熟悉这几段话:

如果用一幅图,来表示这三者的关系,那么就是:

但是,除了上面这些东西,还应当了解:

3.3 如何选择

那么针对以上这些情况,Python 都为我们提供了什么方法呢?

4.Python 的全局解释器锁

为什么要介绍这个呢?因为它是 Python 运行比较慢的重要原因,当然了,除了底层封装的原因,Python 本身边解释,边运行的特点,也注定 Python 确实比较慢。

全局解释器锁:(Global Interpreter Lock,简称:GIL)。由于并发,并行引发的进程或者线程的不同步,因此,需要一个机制让各个进程的运行保持同步。这就是 GIL 诞生的原因。但是,它比较简单粗暴,它使得任何时刻仅有一个线程在执行。即使是多核处理器,GIL 也是如此

即:在计算的时候,开启 GIL,I/O 的时候,关闭 GIL

Python 之所以引入这么一个 GIL,是历史遗留问题。由于 Python 最初就是用来做数据处理的,既然用来计算数据,我们并不希望由于进程或者线程不同步而引发数据计算出现结果不一致的问题,所以才引入了 GIL 机制,但是,随着 Python 的发展,Python 应用的领域越来越多,网页编程,游戏开发等也会用 Python,这个情况下,GIL 反而成了累赘。

因此,在 Python 的并发编程当中,我们更多的还是要针对 I/O,如果把多线程用于 CPU 密集型计算,由于 GIL 的存在,反而会拖慢速度。

与此同时,Python 的开发者们也意识到了这个问题,于是就想到了一个办法:既然 GIL 只是针对线程的。那么,我用多进程不就可以了。所以 Python 才会出现 multiprocessing

二、多进程编程

1. 进程创建步骤

进程的创建大致分为如下几个步骤:

2. 多进程演示

import time
import multiprocessing

# sing
def sing():
  for i in range(3):
      print("sing……")
      time.sleep(0.5)
       
# dance
def dance():
  for i in range(3):
      print("dance……")
      time.sleep(0.5)

if __name__ == "__main__":
  #创建进程
  sing_process = multiprocessing.Process(target = sing)
  dance_process = multiprocessing.Process(target = dance)
   
  #启动进程
  sing_process.start()
  dance_process.start()

那么,Process 当中的字典或者元组参数呢?

import time
import multiprocessing

# sing
def sing(num, name):
  for i in range(num):
      print("sing……")
      time.sleep(0.5)
       
# dance
def dance(num,name):
  for i in range(num):
      print("dance……")
      time.sleep(0.5)

if __name__ == "__main__":
  #创建进程
  sing_process = multiprocessing.Process(target = sing,args = (3,'xiaomi'))
  dance_process = multiprocessing.Process(target = dance,kwargs = {'name':'xiaohong','num':2})
   
  #启动进程
  sing_process.start()
  dance_process.start()

那么上面这个程序就相当于,给 sing,dance 加了主语,并且还限定的循环次数。那么上面这个程序的运行结果,就是 xiaomi sing…… 运行三次,xiaohong dance…… 运行两次。

注意:

3. 获取进程编号

在现实开发当中,往往可能并发程度很高。所以,进程数量就会很多。如果没有办法区分父进程,子进程,那么势必就会造成混乱。于是,进程当都要赋予他们编号(也就是《操作系统》当中经常提及的 pID),方便管理。

获取进程主要有两种方法:

import time
import multiprocessing
import os


# sing
def sing(num, name):
  print("sing进程编号:", os.getpid())
  print("sing父进程:", os.getppid())
  for i in range(num):
      print(name + " sing……")
      time.sleep(0.5)


# dance
def dance(num, name):
  print("dance进程编号:", os.getpid())
  print("dance父进程:", os.getppid())
  for i in range(num):
      print(name + " dance……")
      time.sleep(0.5)


if __name__ == "__main__":
  print("主进程id", os.getpid())
  # 创建进程
  sing_process = multiprocessing.Process(target=sing, args=(3, 'xiaomi'))
  dance_process = multiprocessing.Process(target=dance, kwargs={'name': 'xiaohong', 'num': 2})

  # 启动进程
  sing_process.start()
  dance_process.start()

输出结果:

主进程id 19144
dance进程编号: 17128
sing进程编号: 25512
sing父进程: 19144
xiaomi sing……
dance父进程: 19144
xiaohong dance……
xiaomi sing……
xiaohong dance……
xiaomi sing……

我们只看各个进程的 id,会发现 sing 和 dance 是两个不同的进程,拥有两个不同的进程编号。但是这两个都属于一个父进程:19144

4. 一些要点

首先,默认情况下,主进程会在所有子进程都执行完毕后,才会关闭,我们用以下代码验证一下:

import time
import multiprocessing
import os

def work():
for i in range(3):
print('working')
time.sleep(0.2)

if __name__ == '__main__':
work_process = multiprocessing.Process(target = work)
work_process.start()
# 主进程等待一秒钟
time.sleep(1)
print("主进程finish")

输出结果:

working
working
working
主进程finish

如果,我不想这样呢?我想主进程一结束,子进程也要跟着结束。这个时候就需要另外一个东西:主进程守护。要点只有一个,看代码:

import time
import multiprocessing
import os

def work():
for i in range(20):
print('working')
time.sleep(0.2)

if __name__ == '__main__':
work_process = multiprocessing.Process(target = work)
work_process.daemon = True # 这句话一定要在进程启动之前设置
work_process.start()
# 主进程等待一秒钟
time.sleep(1)
print("主进程finish")
working
working
working
working
working
主进程finish

我们看输出结果,我们设置循环了 20 次,如果还是默认情况下,一定会执行 20 次,但是设置了守护主进程,主进程已结束,work 也就连带着不执行了。

5.Process() 的常用方法

我们假设,我们创建了一个进程 p,那么:

is_alive()如果 p 仍然运行,返回 True
join([timeout]) 等待进程 p 终止。Timeout 是可选的超时期限,进程可以被链接无数次,但如果连接自身则会出错
run() 进程启动时运行的方法。默认情况下,会调用传递给 Process 构造函数的 target。定义进程的另一种方法是继承 Process 类并重新实现 run() 函数
start() 启动进程,这将运行代表进程的子进程,并调用该子进程中的 run() 函数
terminate() 强制终止进程。如果调用此函数,进程 p 将被立即终止,同时不会进行任何清理动作。如果进程 p 创建了它自己的子进程,这些进程将变为僵尸进程。使用此方法时需要特别小心。如果 p 保存了一个锁或参与了进程间通信,那么终止它可能会导致死锁或 I/O 损坏

这当中,join 方法好像比较抽象,我们用代码演示一下:

import time
import multiprocessing

# sing
def sing(num, name):
for i in range(num):
print(name + " sing……")
time.sleep(0.5)


# dance
def dance(num, name):
for i in range(num):
print(name + " dance……")
time.sleep(0.5)


if __name__ == "__main__":

sing_process1 = multiprocessing.Process(target=sing, args=(2, 'xiaomi'))
sing_process2 = multiprocessing.Process(target = sing, args = (3,'xiaoGang'))
sing_process3 = multiprocessing.Process(target=sing, args = (4,'xiaoLi'))
dance_process = multiprocessing.Process(target=dance, kwargs={'name': 'xiaohong', 'num': 2})

# 启动进程
sing_process1.start()
sing_process2.start()

# 下面这两个运行之前,cpu里面已经有两个进程了
sing_process1.join() # sing_process1执行完了,再往下执行
sing_process2.join() # sing_process2执行完了,再往下执行

sing_process3.start()
dance_process.start()

输出结果:

xiaomi sing……
xiaoGang sing……
xiaomi sing……xiaoGang sing……

xiaoGang sing……
xiaoLi sing……
xiaohong dance……
xiaoLi sing……
xiaohong dance……
xiaoLi sing……
xiaoLi sing……

我们可以看到,在 xiaomi 执行了两次,xiaoGang 执行了三次之后,才执行的 dance 和 xiaoLi 的唱歌

6. 进程池

在现实当中,往往进程会很多,几百个的情况非常多见,这个时候,恐怕你再用以上的编程方式,就会累的要死,这个时候,我们就需要 Pool 来对进程进行相关的代码管理了。

6.1 进程池简介

Pool(
processes: Optional[int] = ...,
initializer: Optional[Callable[..., Any]] = ...,
initargs: Iterable[Any] = ...,
maxtasksperchild: Optional[int] = ...,
)

其中 processes 是要创建的进程数。如果省略此参数,将使用 cpu_count() 的值。Initializer 是每个工作进程启动时要执行的可调用对象。Initargs 是要传递给 initializer 的参数元祖。Initializer 默认为 None。

6.2 进程池的相关方法

方法描述
apply(func [,args [,kwargs]]) 在一个池工作进程中执行函数(*args,**kwargs),然后返回结果。
apply_async(func [, args [,kwargs [,callback] ] ]) 在一个池工作进程中异步地执行函数(*args,**kwargs),然后返回结果。此方法的结果是 AsyncResult 类的实例,稍后可用于获得最终结果。Callback 是可调用对象,接受输入参数。当 func 的结果变为可用时,将立即传递给 callback。Callback 禁止执行任何阻塞操作,否则将阻塞接收其他异步操作中的结果
close() 关闭进程池,防止进行进一步操作。如果还有挂起的操作,它们将在工作进程终止之前完成
join() 等待所有工作进程退出。此方法只能在 close()或者 terminate()方法之后调用
imap(func,iterable [ ,chunksize] ) map()函数的版本之一,返回迭代器而非结果列表
imap_unordered(func,iterable [,chunksize] ) 同 imap()函数一样,只是结果的顺序根据从工作进程接收到的时间任意确定
map(func,iterable [,chunksize] ) 将可调用对象 func 应用给 iterable 中的所有项,然后以列表的形式返回结果。通过将 iterable 划分为多块并将工作分派给工作进程,可以并行地执行这项操作。chunksize 指定每块中的项数。如果数量较大,可以增大 chunksize 的值来提升性能
map_async(func,iterable [,chunksize [,callback]] ) 同 map()函数,但结果的返回是异步的。返回值是 AsyncResult 类的实例,稍后可用与获取结果。Callback 是指接受一个参数的可调对象。如果提供 callable,当结果变为可用时,将使用结果调用 callable
terminate() 立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果 p 被垃圾回收,将自动调用此函数
get([ timeout] ) 返回结果,如果有必要则等待结果到达。Timeout 是可选的超时。如果结果在指定时间内没有到达,将引发 multiprocessing.TimeoutError 异常。如果远程操作中引发了异常,它将在调用此方法时再次被引发
ready() 如果调用完成,则返回 True
sucessful() 如果调用完成且没有引发异常,返回 True。如果在结果就绪之前调用此方法,将引发 AssertionError 异常
wait([timeout] ) 等待结果变为可用。Timeout 是可选的超时

6.3 代码演示:

首先先看一个非阻塞的案例

import multiprocessing
import time

def func(msg):
print("start:", msg)
time.sleep(3)
print("end:",msg)

if __name__ == "__main__":
pool = multiprocessing.Pool(processes = 3)
for i in range(5):
msg = "hello %d" %(i)
#维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
pool.apply_async(func, (msg, ))

pool.close()#进程池关闭之后不再接收新的请求

#调用join之前,先调用close函数,否则会出错。
# 执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
pool.join()

输出结果:

start: hello 0
start: hello 1
start: hello 2
end: hello 0
end:hello 2start:hello 3
end:hello 1


start: hello 4
end: hello 3
end: hello 4

7. 进程通信

为什么会出现这个东西呢?因为,可能会有某一个全局的变量,给各个进程使用,如果不合理安排,那么就会造成运算结果的不唯一性。所以,务必要加入进程的通信机制,使得各个进程之间能够协调配合。

7.1 Queue

在进程通信当中,最为常用的东西就是:Queue。顾名思义,它是一个队列。具体来说,是多进程的安全队列。如果一个进程想入队,那么就调用当中的 put 方法,put 方法还有两个可选参数:blocked 和 timeout。如果 blocked 为 True(默认值),并且 timeout 为正值,该方法会阻塞 timeout 指定的时间,直到该队列有剩余的空间。如果超时,会抛出 Queue.full 异常。如果 blocked 为 False,但该 Queue 已满,会立即抛出 Queue.full 异常。

如果某个进程需要出队,那么就调用 get 方法,这个方法在出队的同时,还会返回队头进程的信息。和 put 方法一样,get 方法也有 blocked 和 timeout 参数。如果 blocked 为 True(默认值),并且 timeout 为正值,那么在等待时间内没有取到任何元素,会抛出 Queue.Empty 异常。如果 blocked 为 False,有两种情况存在,如果 Queue 有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出 Queue.Empty 异常。

7.2 Queue 的相关方法

方法描述
cancle_join_thread() 不会在进程退出时自动连接后台线程。这可以防止 join_thread() 方法阻塞
close() 关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列尚未写入数据,但将在此方法完成时马上关闭
empty() 如果调用此方法时 q 为空,返回 True
full() 如果 q 已满,返回 True
get([block [,timeout]) 返回 q 中的一个项。如果 q 为空,此方法将阻塞,直到队列中有项可用为止。Block 用于控制阻塞行为,默认为 True。如果设置为 False,将引发 Queue.Empty 异常 (定义在 Queue 模块中)。Timeout 是可选超时时间,用在阻塞模式中。如果在指定的时间间隔内没有项变为可用,将引发 Queue.Empty 异常
join_thread() 连接队列的后台线程。此方法用于在调用 q.close() 方法之后,等待所有队列项被消耗。默认情况下此方法由不是 q 的原始创建者的所有进程调用。调用 q.cancle_join_thread() 方法可以禁止这种行为
put(item [ , block [, timeout]]) 将 item 放入队列。如果队列已满,此方法将阻塞至有空间可用为止。Block 控制阻塞行为,默认为 True。如果设置为 False,将引发 Queue.Empty 异常 (定义在 Queue 模块中)。Timeout 指定在阻塞模式中等待可用时空间的时间长短。超时后将引发 Queue.Full 异常。
qsize() 返回目前队列中项的正确数量。
joinableQueue([maxsize]) 创建可连接的共享进程队列。这就像是一个 Queue 对象,但队列允许项的消费者通知生产者项已经被成功处理。通知进程是使用共享的信号和条件变量来实现的
task_done() 消费者使用此方法发出信号,表示 q.get() 返回的项已经被处理。如果调用此方法的次数大于从队列中删除的项的数量,将引发 ValueError 异常
join() 生产者使用此方法进行阻塞,知道队列中的所有项均被处理。阻塞将持续到位队列中的每个项均调用 q.task_done() 方法为止

案例:进程池创建进程完成进程之间的通信

from multiprocessing import Manager,Pool
import time
def write(q):
#将列表中的元素写入队列中
for i in ["a","b","c"]:
print('开始写入值%s' % i)
q.put(i)
time.sleep(1)

#读取
def read(q):
print('开始读取')
while True:
if not q.empty():
print('读取到:',q.get())
time.sleep(1)
else:
break
if __name__=='__main__':
#创建队列
q=Manager().Queue()
#创建进程池
p=Pool(3)
#使用阻塞模式创建进程
p.apply(write,(q,))
p.apply(read,(q,))
p.close()
p.join()

三. 多线程编程

1. 线程创建步骤

论步骤,其实跟多进程大同小异,只是用到的模块和方法不一样。与进程编程一样,大致也是分三步骤:

那么,关于使用,与进程也大同小异。

2. 获取线程信息

可以用 current_thread() 方法来获取线程相关信息

import threading
import time

def task():
time.sleep(1)
thread = threading.current_thread()
print(thread)

if __name__ == '__main__':
for i in range(5):
sub_thread = threading.Thread(target=task)
sub_thread.start()

我们如果多次运行上面的代码,会发现:每次的输出结果都不同。由此可以知道一个很重要的事情:线程的运行是无序的。

除了 current_thread(),Python 还为我们提供了:

3. Thread 类常用方法

方法名描述
run() 用以表示线程活动的方法
start() 启动线程活动
join([time]) 等待至线程中止。这阻塞调用线程直至线程的 join() 方法被调用中止 - 正常退出或者抛出未处理的异常 - 或者是可选的超时发生
isAlive() 返回线程是否活动的
getName() 返回线程名
setName() 设置线程名

那么对于线程,其实用法什么的,跟进程都大同小异,因此不再赘余。

四. 锁

学过《操作系统》的同学,对于进程的互斥与同步那一部分的内容应该印象深刻。那么,为了实现进程的互斥或者同步,最为常用的就是信号量机制。在这个方面的使用当中,锁是一个绕不开的话题。

锁有两种状态——锁定和未锁定。某个线程要更改共享数据时,先将其锁定,此时资源的状态为 “锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定” 状态,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。

使用 Thread 对象的 Lock 可以实现简单的进程同(如果是线程,则是 RLock),有上锁 acquire 方法和 释放 release 方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到 acquire 和 release 方法之间。

1. 互斥锁

比如说,我们写一个 * 互斥锁 * 机制:

from threading import Thread,Lock
#定义全局变量num
num=0
#创建一把互斥锁
mutex=Lock()
def test1():
global num
'''
在两个线程中都调用上锁的方法,则这两个线程就会抢着上锁,
如果有1方成功上锁,那么导致另外一方会堵塞(一直等待)直到这个锁被解开
'''
for i in range(100000):
mutex.acquire() # 上锁
num+=1
mutex.release()
print('test1输出num:',num)

def test2():
global num
for i in range(100000):
mutex.acquire() # 上锁
num+=1
mutex.release()
print('test2输出num:',num)

if __name__=='__main__':
t1=Thread(target=test1)
t2=Thread(target=test2)
t1.start()
t2.start()
t1.join()
t2.join()

输出结果:

test1输出num: 100000
test2输出num: 200000

2. 死锁

所谓死锁,其实就是各个线程或者进程由于资源分配不当,导致了互相争抢资源,但是由于资源都没够数,所以程序便推行不下去,唯有通过外力作用才能让程序继续推进。

比如如下代码:

import time
from threading import Thread,Lock
import threading
mutexA=threading.Lock()
mutexB=threading.Lock()
class MyThread1(Thread):
def run(self):
if mutexA.acquire():
print(self.name,'执行')
time.sleep(1)
if mutexB.acquire():
print(self.name,'执行')
mutexB.release()
mutexA.release()


class MyThread2(Thread):
def run(self):
if mutexB.acquire():
print(self.name,'执行')
time.sleep(1)
if mutexA.acquire():
print(self.name,'执行')
mutexA.release()
mutexB.release()

if __name__ == '__main__':
t1=MyThread1()
t2=MyThread2()
t1.start()
t2.start()

那么最后,执行的时候就会出现类似 “死循环” 的症状,程序进行不下去,永远也没有结束的迹象。那么上面这个程序为什么会死锁呢?

我们看 t1.start() 那里,由于在 Mythread1 当中,会率先获取 mutexA,而在 t2.start()后,Mythread2 中会率先获取 mutexB,此时 Mythread1 想要获取 mutexB,但是 mutexB 却已经让 t2 获得了,因此,t1 进程无法获得 muytexB。而 t2 想要获得 mutexA,但 mutexA 却已经让 t1 获得了,还是拿不着。所以二者就一直这样互相争抢资源,导致程序无法运行下去

3. 同步

这里就只以线程同步为例了,当然了,各个进程之间也是可以实现同步的。

import time
from threading import Thread,Lock
import threading
lock1=Lock()
lock2=Lock()
lock3=Lock()
lock2.acquire()
lock3.acquire()
class Task1(Thread):
def run(self):
while True:
if lock1.acquire():
print('...task1...')
time.sleep(1)
lock2.release()

class Task2(Thread):
def run(self):
while True:
if lock2.acquire():
print('...task2...')
time.sleep(1)
lock3.release()

class Task3(Thread):
def run(self):
while True:
if lock3.acquire():
print('...task3...')
time.sleep(1)
lock1.release()

if __name__ == '__main__':
t1=Task1()
t2=Task2()
t3=Task3()
t1.start()
t2.start()
t3.start()

运行结果:

...task1...
...task2...
...task3...
...task1...
...task2...
...task3...
...task1...
...task2...
...task3...

<不断的循执行下去>

五. 多协程

在 Python 当中,用的最多的是 yield

标签:__,name,Python,编程,print,并发,线程,进程,sing
来源: https://www.cnblogs.com/Gaimo/p/15435506.html