系统相关
首页 > 系统相关> > Python 初学笔记 - 第五章-多进程

Python 初学笔记 - 第五章-多进程

作者:互联网

目录

概念

Python 提供了 multiprocessing,multiprocessing 模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块
threading 的编程接口类似,multiprocessing 模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了 Process 、 Queue 、Pipe 、 Lock 等组件。

注意:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

开启多进程

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,可用来开启一个子进程。

强调:

参数介绍:

使用方式一

# 直接调用Process
import time
from multiprocessing import Process


def foo(t):
    time.sleep(t)
    print(f'subprocess{t} running')
    

if __name__ == '__main__':
    for i in range(3):
        p = Process(target=foo, args=(i,))
        p.start()
    print('main')
main
subprocess0 running
subprocess1 running
subprocess2 running

使用方式二

# 自定义一个Process
import time
from multiprocessing import Process


class MyProcess(Process):

    def __init__(self, name, t):
        super().__init__()
        self.name = name
        self.t = t

    # 一定要把运行的子进程写为run,因为p.start 会调用note方法
    def note(self):
        print(f'SubProcess {self.name} note ...')
        time.sleep(self.t)
        print(f'{self.name} done')


if __name__ == '__main__':
    p1 = MyProcess('info1', 4)
    p2 = MyProcess('info2', 3)
    p3 = MyProcess('info3', 2)
    p4 = MyProcess('info4', 1)
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    print('test')
test
SubProcess info1 note ...
SubProcess info3 note ...
SubProcess info4 note ...
SubProcess info2 note ...
info4 done
info3 done
info2 done
info1 done

函数属性

import time
from multiprocessing import Process


def work(name, t):
    print(f'{name} Subprocess is run ...')
    time.sleep(t)
    print(f'{name} Subprocess done ...')


if __name__ == '__main__':
    p1 = Process(target=work, args=('p1', 1))
    p2 = Process(target=work, args=('p2', 1))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print('Process done')
p1 Subprocess is run ...
p2 Subprocess is run ...
p1 Subprocess done ...
p2 Subprocess done ...
Process done
import time
from multiprocessing import Process


def work(name, t):
    print(f'{name} Subprocess is run ...')
    time.sleep(t)
    print(f'{name} Subprocess done ...')


if __name__ == '__main__':
    p1 = Process(target=work, args=('p1', 1))
    p2 = Process(target=work, args=('p2', 1))
    p1.start()
    p2.start()
    # time.sleep(2)  # 这句执行p1就有时间打印语句
    p1.terminate()
    p1.join()
    p2.join()
    print('Process done')
# p1 刚请求开启就马上就被强制终止,所以没有任何输出
p2 Subprocess is run ...
p2 Subprocess done ...
Process done
import time
from multiprocessing import Process


def work(name, t):
    print(f'{name} Subprocess is note ...')
    time.sleep(t)
    print(f'{name} Subprocess done ...')


if __name__ == '__main__':
    p1 = Process(target=work, args=('p1', 1))
    p1.start()
    p1.terminate()
    time.sleep(2)
    print(p1.is_alive())
    print('Process done')
False
Process done

数据属性

import time
from multiprocessing import Process


def work(name, t):
    print(f'{name} Subprocess is note ...')
    time.sleep(t)
    print(f'{name} Subprocess done ...')


if __name__ == '__main__':
    p1 = Process(target=work, args=('p1', 1), name='test')
    p1.start()
    p1.terminate()
    time.sleep(2)
    print(p1.is_alive())

    print(p1.name, p1.pid, p1.daemon)
    print('Process done')
False
test 13536 False
Process done

守护进程

from multiprocessing import Process
import time


def subprocess(name):
    print(f'{name} start ...')
    time.sleep(2)
    print(f'{name} done ...')


if __name__ == '__main__':
    p1 = Process(target=subprocess, args=('subprocess 1',), name='process', daemon=True)
    # p.daemon = True  # 与上面开启的方式效果相同
    p1.start()
    print('main process note ...')

#  会发现因为主进程运行得很快就结束了,作为守护进程的 p 都来不及执行
main process note ...

竞争问题

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端 是有问题的,而共享带来的是竞争,竞争带来的结果就是错乱。

from multiprocessing import Process, Lock
import time


def subprocess(name):
    print(f'{name} start ...')
    time.sleep(1)
    print(f'{name} done ...')


if __name__ == '__main__':
    p1 = Process(target=subprocess, args=('subprocess 1',), name='process 1')
    p2 = Process(target=subprocess, args=('subprocess 2',), name='process 2')
    p3 = Process(target=subprocess, args=('subprocess 3',), name='process 3')
    p1.start()
    p2.start()
    p3.start()
    print('main process note ...')

# 会发现并发出现了竞争的问题,我们想要的是一个子进程竞争成功输出 start 后其它进程就不要输出了,等这个进程
# done 后,剩余的进程再继续竞争,但是结果显示三个进程都输出了 start
main process note ...
subprocess 3 start ...
subprocess 2 start ...
subprocess 1 start ...
subprocess 3 done ...
subprocess 2 done ...
subprocess 1 done ...

互斥锁

互斥锁的作用也就是一个子程序竞争成功后,其它子进程等待下一次的竞争。

from multiprocessing import Process, Lock


def subprocess(name, lock):
    lock.acquire()
    print(f'{name} start ...')
    print(f'{name} done ...')
    lock.release()


if __name__ == '__main__':
    lock = Lock()
    p1 = Process(target=subprocess, args=('subprocess 1', lock), name='process 1')
    p2 = Process(target=subprocess, args=('subprocess 2', lock), name='process 2')
    p3 = Process(target=subprocess, args=('subprocess 3', lock), name='process 3')
    p1.start()
    p2.start()
    p3.start()
    print('main process note ...')
# 下面可以看到竞争成功的其它子进程将等待
main process note ...
subprocess 1 start ...
subprocess 1 done ...
subprocess 3 start ...
subprocess 3 done ...
subprocess 2 start ...
subprocess 2 done ...

模拟抢票

在实际中,抢票也是一个竞争问题,当我们查询剩余的票的时候每个人看到剩余的票数是一样的没有竞争问题,但是在抢票的时候,大家同时抢,一张票肯定只能给一个人,这个时候需要考虑竞争。

下面是模拟 5 个人抢 2 张票的情况。

#  db模拟数据库剩余票数, db文件内容如下(json格式):
# {"ticket": 2}

from multiprocessing import Process, Lock
import time
import json


# 模拟抢票之前先查询剩余票
def search(name):
    db = json.load(open('db'))
    ticket = db['ticket']
    print(f'{name} 查询剩余车票, 车票剩余 : {ticket}')


# 抢票
def get_ticket(name):
    db = json.load(open('db'))
    if db['ticket'] > 0:  # 如果数据库中票数大于0,则抢票成功
        db['ticket'] -= 1
        with open('db', 'w') as f:
            json.dump(db, f)
        print(f'{name} 抢票成功')
    else:
        print(f'{name} 抢票失败, 票已抢完')


# 真正执行的抢票流程
def task(name, lock):
    search(name)  # 先查询票,看到票还剩多少张
    time.sleep(1) # 模拟查询用的时间

    lock.acquire()  # 加锁
    get_ticket(name)  # 抢票
    time.sleep(1)  # 模拟抢票用的时间
    lock.release()  # 释放锁


if __name__ == '__main__':
    lock = Lock()
    # 5 个用户
    p1 = Process(target=task, args=('user1', lock), name='process 1')
    p2 = Process(target=task, args=('user2', lock), name='process 2')
    p3 = Process(target=task, args=('user3', lock), name='process 3')
    p4 = Process(target=task, args=('user4', lock), name='process 4')
    p5 = Process(target=task, args=('user5', lock), name='process 5')
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p5.start()
user3 查询剩余车票, 车票剩余 : 2
user1 查询剩余车票, 车票剩余 : 2
user5 查询剩余车票, 车票剩余 : 2
user2 查询剩余车票, 车票剩余 : 2
user4 查询剩余车票, 车票剩余 : 2
user3 抢票成功
user1 抢票成功
user5 抢票失败, 票已抢完
user2 抢票失败, 票已抢完
user4 抢票失败, 票已抢完

互斥锁与 join 的区别

互斥锁与 join 都可以用于抢票问题的解决,但是使用 join 的话,在一个用户使用的时候可以查票和购票而其它用户什么都做不了,使用互斥锁我们可以针对的使用限制。

总结:加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行地修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。

虽然可以用文件共享数据实现进程间通信,但问题是:

因此我们最好找寻一种解决方案能够兼顾:

这就是 multiprocessing 模块为我们提供的基于消息的 IPC 通信机制:队列和管道。

队列和管道都是将数据存放于内存中,而队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来, 因而队列才是进程间通信的最佳选择。

队列(Queue)

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing 模块支持两种形式,队列和管道,这两种方式都是使用消息传递的。

创建队列的类(底层就是以管道和锁定的方式实现):
Queue([maxsize]):创建共享的进程队列,Queue 是多进程安全的队列,可以使用 Queue 实现多进程之间的数据传递。maxsize 是队列中允许最大项数,省略则无大小限制。

注意:

主要方法介绍:

使用

from multiprocessing import Queue

# 生成队列,最大为3个
q = Queue(3)

# 向队列添加数据
q.put('str')
q.put([1, 2, 3])
q.put({'dict': 1})
# q.put((1,2))  # 再添加将会阻塞,因为超出最大个数

# 查看队列是否满了
print(q.full())

# 取出队列消息
res1 = q.get()
res2 = q.get()
print('res1:', res1)
print('res2:', res2)

# 判断队列是否是空的
print(q.empty())

# 判断队列是否是满的
print(q.full())

# 关闭队列
q.close()
True
res1: str
res2: [1, 2, 3]
False
False

生产者消费者模型

当有多个生产者或者多个消费者时,消费者想要获取数据就需要去问每个生产者是否有产生数据,当生产者数量很多时效率就会低,而这个时候我们就
可建立一个临时的仓库,只要生产者产生一个数据就放入仓库中,而消费者就不需要去问没有生产者获取数据,只要不停的查看仓库就可以了。

实现

下面是模拟生产者与消费者通过队列进行信息通信的过程,需要注意的是当生产者生产完数据后没有告诉消费者生产数据已经结束了,消费者就会卡住,
所以下面的处理办法是等待生产者执行完毕后传递一个 None 信息给消费者告诉执行完毕,不用等待了。

from multiprocessing import Process, Queue
import time


def producer(name, n, q, t):
    """
    生产者
    :param name: 生产者名字
    :param n: 生产者生产个数
    :param q: 队列
    :param t: 生产时间
    :return: None
    """
    for i in range(n):
        time.sleep(t)
        res = f'{name}的第{i+1}个包子'
        print(f'{name} 生产了 {res}...')
        q.put(res)


def consumer(name, q, t):
    """
    消费者
    :param name: 消费者名字
    :param q: 队列
    :param t: 消费时间
    :return: None
    """
    while True:
        time.sleep(t)
        res = q.get()
        if res is None:
            break
        print(f'{name} 吃掉了 {res}')


if __name__ == '__main__':
    q = Queue()
    # 生成3个生产者
    p1 = Process(target=producer, args=('厨师1', 3, q, 1))
    p2 = Process(target=producer, args=('厨师2', 4, q, 2))
    p3 = Process(target=producer, args=('厨师3', 2, q, 3))
    # 生成2个消费者
    c1 = Process(target=consumer, args=('食客1', q, 2))
    c2 = Process(target=consumer, args=('食客2', q, 2))

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    
    # 等待生产者生产完毕发送结束消息给消费者,不加下面的代码,程序会在consumer这里卡住
    p1.join()
    p2.join()
    p3.join()
    q.put(None)
    q.put(None)
    q.put(None)
厨师1 生产了 厨师1的第1个包子...
厨师2 生产了 厨师2的第1个包子...
厨师1 生产了 厨师1的第2个包子...
食客2 吃掉了 厨师1的第1个包子
食客1 吃掉了 厨师2的第1个包子
厨师1 生产了 厨师1的第3个包子...
厨师3 生产了 厨师3的第1个包子...
厨师2 生产了 厨师2的第2个包子...
食客2 吃掉了 厨师1的第2个包子
食客1 吃掉了 厨师1的第3个包子
厨师2 生产了 厨师2的第3个包子...
食客2 吃掉了 厨师3的第1个包子
食客1 吃掉了 厨师2的第2个包子
厨师3 生产了 厨师3的第2个包子...
厨师2 生产了 厨师2的第4个包子...
食客2 吃掉了 厨师2的第3个包子
食客1 吃掉了 厨师3的第2个包子
食客2 吃掉了 厨师2的第4个包子

JoinableQueue 实现

上面的实现无非是发送结束信号而已,有另外一种队列提供了这种机制。

JoinableQueue([maxsize])
这就像是一个 Queue 对象,但队列允许项目的使用者通知生成者项目已经被成功处理,通知进程是使用共享的信号和条件变量来实现的。

JoinableQueue 的实例 p 除了与 Queue 对象相同的方法之外还具有:

from multiprocessing import Process, JoinableQueue
import time


def producer(name, n, q, t):
    """
    生产者
    :param name: 生产者名字
    :param n: 生产者生产个数
    :param q: 队列
    :param t: 生产时间
    :return: None
    """
    for i in range(n):
        time.sleep(t)
        res = f'{name}的第{i+1}个包子'
        print(f'{name} 生产了 {res}...')
        q.put(res)

    # 等待消费者处理数据发送q.task_done()信号,确保每个数据都被处理
    q.join()


def consumer(name, q, t):
    """
    消费者
    :param name: 消费者名字
    :param q: 队列
    :param t: 消费时间
    :return: None
    """
    while True:
        time.sleep(t)
        res = q.get()
        print(f'{name} 吃掉了 {res}')

        # 每处理一个数据发送一次已处理信号给q.join()
        q.task_done()


if __name__ == '__main__':
    q = JoinableQueue()  # 使用JoinableQueue

    p1 = Process(target=producer, args=('厨师1', 3, q, 1))
    p2 = Process(target=producer, args=('厨师2', 4, q, 2))
    p3 = Process(target=producer, args=('厨师3', 2, q, 3))

    # 生成2个消费者, 设为守护进程, 当消费者处理完数据就和主程序一起结束
    c1 = Process(target=consumer, args=('食客1', q, 2), daemon=True)
    c2 = Process(target=consumer, args=('食客2', q, 2), daemon=True)

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    # 等待生产者生产完毕后结束主程序, 而生产者是等待消费者把数据处理完才会结束.
    # 也就是说生产者等待消费者处理完数据就结束, 主程序等待生产者结束才结束, 主程序结束带动消费者结束
    p1.join()
    p2.join()
    p3.join()
厨师1 生产了 厨师1的第1个包子...
食客1 吃掉了 厨师1的第1个包子
厨师1 生产了 厨师1的第2个包子...
食客2 吃掉了 厨师1的第2个包子
厨师2 生产了 厨师2的第1个包子...
厨师1 生产了 厨师1的第3个包子...
厨师3 生产了 厨师3的第1个包子...
食客1 吃掉了 厨师2的第1个包子
食客2 吃掉了 厨师1的第3个包子
厨师2 生产了 厨师2的第2个包子...
食客1 吃掉了 厨师3的第1个包子
食客2 吃掉了 厨师2的第2个包子
厨师3 生产了 厨师3的第2个包子...
厨师2 生产了 厨师2的第3个包子...
食客1 吃掉了 厨师3的第2个包子
食客2 吃掉了 厨师2的第3个包子
厨师2 生产了 厨师2的第4个包子...
食客1 吃掉了 厨师2的第4个包子

标签:__,...,name,start,Python,Process,厨师,初学,第五章
来源: https://www.cnblogs.com/sugarq/p/14022557.html