day37
作者:互联网
一、进程间的数据是隔离的
from multiprocessing import Process def task(): global n n = 100 print("子进程中:", n) if __name__ == '__main__': p = Process(target=task, ) p.start() n = 10 # task() print("主进程中:", n) # 运行结果 # 主进程中: 10 # 子进程中: 100
二、Queue=》队列
1、进程间的通信
IPC(Inter-Process Communication)
2、队列
概念介绍
创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递
Queue([maxsize]) 创建共享的进程队列
参数:maxsize是队列中心中允许的最大项数。如果省略此处参数,则无大小限制。
底层队列使用管道和锁定实现
方法介绍
Queue([maxsize])
:创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。
Queue的实例q具有以下方法:
q.get( [ block [ ,timeout ] ] )
:返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
q.get_nowait()
:同q.get(False)
方法。
q.put(item [, block [,timeout ] ] )
:将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
q.qsize()
:返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
q.empty()
:判断q是否有位置,如果没有返回False,若还有位置返回True
q.full()
:如果q已满,返回True;如果q未满,返回False。
其他方法(了解)
q.close()
:关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()
操作上,关闭生产者中的队列不会导致get()
方法返回错误。
q.cancel_join_thread()
:不会再进程退出时自动连接后台线程。这可以防止join_thread()
方法阻塞。
q.join_thread()
:连接队列的后台线程。此方法用于在调用q.close()
方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()
方法可以禁止这种行为。
三、解决进程间的数据隔离问题
from multiprocessing import Queue, Process import os, time def task(queue): print("这个进程id:%s开始放数据了" % os.getpid()) time.sleep(2) queue.put('ly is dsb') print("这个进程id:%s数据放完了" % os.getpid()) if __name__ == '__main__': q = Queue(3) p = Process(target=task, args=(q,)) p.start() print("主进程") res = q.get() print("主进程中取值:", res)
四、多进程放入数据到Queue
from multiprocessing import Queue, Process import os, time def get_task(q): print("%s:%s" % (os.getpid(), q.get())) def put_task(q): q.put("%s开始写数据了" % os.getpid()) if __name__ == '__main__': q = Queue(3) p = Process(target=put_task, args=(q,)) p.start() p1 = Process(target=put_task, args=(q,)) p1.start() p2 = Process(target=get_task, args=(q,)) p2.start() p3 = Process(target=get_task, args=(q,)) p3.start()
五、生产者消费者模型
代码演示:
import os import time import random from multiprocessing import Process,Queue # 版本1: # 生产者: def producer(queue): # 把数据全部放在Queue for i in range(10): data = "这个进程id:%s, 蒸了%s个包子" % (os.getpid(), i) print(data) time.sleep(random.randint(1,3)) # 放入数据 queue.put("第%s个包子" % i) def consumer(queue): while True: res = queue.get() data = "这个进程id:%s, 吃了%s" % (os.getpid(), res) print(data) if __name__ == '__main__': q = Queue(3) p = Process(target=producer, args=(q, )) p.start() p1 = Process(target=consumer, args=(q,)) p1.start() # 版本2: # 生产者: def producer(queue): # 把数据全部放在Queue for i in range(10): data = "这个进程id:%s, 蒸了%s个包子" % (os.getpid(), i) print(data) time.sleep(random.randint(1, 3)) # 放入数据 queue.put("第%s个包子" % i) queue.put(None) def consumer(queue): while True: res = queue.get() if not res:break data = "这个进程id:%s, 吃了%s" % (os.getpid(), res) print(data) if __name__ == '__main__': q = Queue(3) p = Process(target=producer, args=(q,)) p.start() p1 = Process(target=consumer, args=(q,)) p1.start() # 版本3: # 生产者: def producer(queue): # 把数据全部放在Queue for i in range(10): data = "这个进程id:%s, 蒸了%s个包子" % (os.getpid(), i) print(data) time.sleep(random.randint(1, 3)) # 放入数据 queue.put("第%s个包子" % i) def consumer(queue): while True: res = queue.get() if not res:break data = "这个进程id:%s, 吃了%s" % (os.getpid(), res) print(data) if __name__ == '__main__': q = Queue(3) p = Process(target=producer, args=(q,)) p.start() p1 = Process(target=consumer, args=(q,)) p1.start() # time.sleep(1000) # none放在这里是不行的,原因是主进程直接执行了put none, 消费者直接获取到None, 程序直接结束了 p.join() q.put(None) # 版本4:多生产者 多消费者 # 生产者: def producer(queue, food): # 把数据全部放在Queue for i in range(10): data = "这个进程id:%s, 生产了%s个%s" % (os.getpid(), i, food) print(data) time.sleep(random.randint(1, 3)) # 放入数据 queue.put("第%s个%s" % (i, food)) def consumer(queue): while True: res = queue.get() if not res:break data = "这个进程id:%s, 吃了%s" % (os.getpid(), res) print(data) if __name__ == '__main__': q = Queue(3) p1 = Process(target=producer, args=(q, '面包')) p2 = Process(target=producer, args=(q, '奶粉')) p3 = Process(target=producer, args=(q, '冰淇淋')) p1.start() p2.start() p3.start() p4 = Process(target=consumer, args=(q,)) p5 = Process(target=consumer, args=(q,)) p4.start() p5.start() # time.sleep(1000) # none放在这里是不行的,原因是主进程直接执行了put none, 消费者直接获取到None, 程序直接结束了 # p.join() # q.put(None) p1.join() p2.join() p3.join() q.put(None) q.put(None) q.put(None) # 版本5:多生产者 多消费者 消费者大于生产者 # 生产者: def producer(queue, food): # 把数据全部放在Queue for i in range(10): data = "这个进程id:%s, 生产了%s个%s" % (os.getpid(), i, food) print(data) time.sleep(random.randint(1, 3)) # 放入数据 queue.put("第%s个%s" % (i, food)) def consumer(queue, name): while True: try: res = queue.get(timeout=5) if not res:break data = "这个消费者:%s, 吃了%s" % (name, res) print(data) except Exception as e: print(e) break if __name__ == '__main__': q = Queue(3) p1 = Process(target=producer, args=(q, '面包')) p2 = Process(target=producer, args=(q, '奶粉')) p3 = Process(target=producer, args=(q, '冰淇淋')) p1.start() p2.start() p3.start() p4 = Process(target=consumer, args=(q, '许鹏')) p5 = Process(target=consumer, args=(q, '勇哥')) p6 = Process(target=consumer, args=(q, '勇哥2')) p7 = Process(target=consumer, args=(q, '勇哥3')) p4.start() p5.start() p6.start() p7.start() # time.sleep(1000) # none放在这里是不行的,原因是主进程直接执行了put none, 消费者直接获取到None, 程序直接结束了 # p.join() # q.put(None) p1.join() p2.join() p3.join() q.put(None) q.put(None) q.put(None) """ Queue: httpsqs rabbiemq kafka """
标签:__,target,Process,day37,Queue,start,put 来源: https://www.cnblogs.com/Gnomeshghy/p/15045571.html