系统相关
首页 > 系统相关> > 利用multiprocessing.managers开发跨进程生产者消费者模型

利用multiprocessing.managers开发跨进程生产者消费者模型

作者:互联网

研究了下multiprocessing.managers,略有收获,随笔一篇;

核心思路是构造一个manager进程,这个进程可以通过unix socket或tcp socket与其它进程通信;因为利用了socket,所以通信的进程间不要求具备父子关系,甚至可以跨主机(通过tcp socket);

通过manager进行数据结构共享,可以应用于很多的IPC场景;这里只做一个示例,在manager内维护一个队列,做为生产者消费者模型中的消息队列;

 

# coding:utf-8

import os
import time
import subprocess

from multiprocessing import Process
#from multiprocessing.managers import SyncManager
from multiprocessing.managers import BaseManager
from multiprocessing import JoinableQueue

#父进程
pid = os.getpid()
print("parent pid %d" % pid)

class TestManager(BaseManager):
  pass

#用于共享的队列
jqueue = JoinableQueue()

#用于获取队列的方法
def _get_queue():
    return jqueue

#启动server(manager)
def make_server(port, authkey):
    # 注册rpc,获取共享的队列(的代理,这段代码的实现很有意思,建议看看源码)
    TestManager.register("get_queue",callable=lambda : _get_queue())
    # 如果要走tcp socket,就用这一行
    #manager = TestManager(address=('', port), authkey=authkey)
    # 如果要走unix socket,就用这一行
    manager = TestManager(authkey=authkey)
    # 启动server(manager)进程
    manager.start()
    return manager

# consumer进程的入口
def do_consume(manager):
    print ("consumer pid %d" % os.getpid())
    queue=manager.get_queue()
    count = 0
    while True:
        item = queue.get(block=True)
        #print(item)
        #time.sleep(1)
        queue.task_done()
        if item is None:
            conn = queue._tls.connection
            break
        count += 1
    print ("done consuming %d" % count)

#构造新的manager实例做为client连接manager server
def make_client(address, authkey):
    # 注册rpc,用于非父子进程环境时,新构造的manager识别rpc方法
    TestManager.register("get_queue")
    manager = TestManager(address=address, authkey=authkey)
    manager.connect()
    return manager

# producer进程的入口
def do_produce(address, authkey):
    print ("producer pid %d" % os.getpid())
    client=make_client(address, authkey)
    queue=client.get_queue()
    for i in range(10000):
        queue.put(i, block=True)
    print ("done producing")

# terminator进程的入口
def do_terminate(address, authkey):
    client=make_client(address, authkey)
    queue=client.get_queue()
    queue.put(None, block=True)

authkey = b'foo'
manager=make_server(6666, authkey)
address = manager._address
# 查看manager的进程号
print ("manager pid %d" % manager._process.ident)

# 通过父子进程变量传递的方式,向consumer进程传递manager
consumer = Process(target=do_consume, args=(manager, ))
consumer.start()

# 伪造非父子进程传递address和authkey的方式,向producer进程传递连接manager需要的信息
producer = Process(target=do_produce, args=(address, authkey))
producer.start()

# 查看当前的进程树
status, output = subprocess.getstatusoutput('pstree -p %d' % pid)
print (output) 

producer.join()

# 伪造非父子进程传递address和authkey的方式,再启动一个terminator进程结束通信
terminator = Process(target=do_terminate, args=(address, authkey))
terminator.start()
terminator.join()

consumer.join()

  

 

以上示例代码的过程如下:

  1. 构造manager server;
  2. 构造一个consumer进程,直接从父进程获取到manager对象;
  3. 再构造一个producer进程,通过传递address和authkey,新构造manager client,并连接manager server;
  4. producer获取到共享队列,生产消息;
  5. consumer获取到共享队列,消费消息;
  6. terminator(producer)生产一个空消息;
  7. consumer获取到空消息,消费结束;

 

结果:

parent pid 30460
manager pid 30461
consumer pid 30463
producer pid 30464
python(30460)-+-pstree(30465)
              |-python(30461)-+-{python}(30462)
              |               |-{python}(30472)
              |               |-{python}(30474)
              |               `-{python}(30475)
              |-python(30463)
              `-python(30464)
done producing
done consuming 10000

  

可以看到共生成4个子进程,一个manager server、一个consumer、一个producer、还有一个pstree查看进程树;

标签:queue,managers,producer,生产者,authkey,manager,address,进程,multiprocessing
来源: https://www.cnblogs.com/ZisZ/p/10770436.html