其他分享
首页 > 其他分享> > IO多路复用

IO多路复用

作者:互联网

IO多路复用

同步IO、异步IO、IO多路复用

IO两个阶段

IO模型

同步IO模型包括阻塞IO非阻塞IOIO多路复用

  1. 阻塞IO:进程等待(阻塞),直到读写完成。(全程等待)
    system_002
    中文版
    system_003

  2. 非阻塞IO

    • 进程调用recvfrom操作,如果IO设备没有准备好,立即返回ERROR,进程不阻塞。用户可以再次发起系统调用(可以轮询),如果内核已经准备好,就阻塞,然后复制数据到用户空间。
    1. 第一阶段数据没有准备好,可以先忙别的,等会再来看看。检查数据是否准备好了的过程是非阻塞的。
    2. 第二阶段是阻塞的,即内核空间和用户空间之间复制数据时阻塞的。
    • 生活中的例子:淘米、蒸饭我不等,我去玩会,盛饭过程我等着你装好饭,但是要等到盛好饭才算完事,这是同步的,结果就是盛好饭。
      system_004
      中文版
      system_005
  3. IO多路复用

    • IO多路复用,也称为:Event-driven IO
    • 所谓IO多路复用,就是同时监控多个IO,有一个准备好了,就不需要继续等待其他IO,可以开始处理,提交了同时处理IO的能力。
    • select几乎所有操作系统平台都支持,poll是对select的升级。
    • epoll,Linux系统内核2.5+开始支持,对select和poll的增强,在监视的基础上,增加回调机制。BSD、Mac平台有kqueue,windows server有iocp。
      system_006
    • 以select为例,将关注的IO操作告诉select函数并调用,进程阻塞,内核“监视”select关注的文件描述符fd,被关注的任何一个fd对应的IO准备好了数据,select返回。再使用read将数据复制到用户进程。
    • select举例:食堂供应很多菜(众多的IO),你需要吃某三菜一汤,大师傅(操作系统)所要现做,需要等待,你只好等待大师傅叫。其中一样菜好了,大师傅叫你,说你点的菜有好的了,你得自己遍历找找看哪一样才好了,请服务员把做好的菜打给你。
    • epoll是有菜准备好了,大师傅叫你去几号窗口直接打菜,不用自己找菜了。
    • 一般情况下,select最多能监听1024个fd(文件描述符,监听数量可以修改,但不建议修改),但是由于select采用轮询的方式,当管理的IO多了,每次都要遍历全部fd,效率低下。
    • epoll没有管理的fd的上限,且是回调机制,不需遍历,效率很高。
      system_007
  4. 信号驱动IO

    • 进程在IO访问时,先通过sigaction系统调用,提交一个信号处理函数,立即返回。
    • 进程不阻塞。当内核准备好数据后,产生一个SIGIO信号(电平触发)并投递给信号处理函数。可以在此函数中调用recvfrom函数操作数据从内核控件复制到用户控件,这段过程进程阻塞。
      system_008
  5. 异步IO

    • 在进程发起异步IO请求,立即返回。内核完成IO的两个阶段,内核给进程发一个信号。
    • 举例:来打饭,跟大师傅说饭好了叫你,饭菜准备好了,窗口服务员把饭盛好了打电话叫你。两个阶段都是异步的。整个过程中,进程都可以忙别的,等好了才过来。
    • 举例:今天不想出去到饭店吃饭了,点外卖,饭菜在饭店做好了(第一阶段),快递员从饭店送到你家门口(第二阶段)。
    • Linux的aio的系统调用,内核从版本2.6开始支持
      system_009
      对应的中文版
      system_010

Python中IO多路复用

  1. IO多路复用
    • 大多数操作系统都支持select和poll
    • Linux2.5+支持epoll
    • BSD、Mac支持kqueue
    • Solaris实现了/dev/poll
    • Windows server的IOCP

selectors库

3.4版本提供selectors库,高级IO复用库。

  1. 层次结构:
    • BaseSelector
      • SelectSelector #实现select
      • PollSelector #实现poll
      • EpollSelector #实现epoll
      • DevpollSelector #实现devpoll
      • KqueueSelector #实现kqueue
# 在selects模块源码最下面有如下代码
# Choose the best implementation, roughly:
#    epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
    DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
    DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
    DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
    DefaultSelector = PollSelector
else:
    DefaultSelector = SelectSelector
class SelectSelector(_BaseSelectorImpl):
    """Select-based selector."""

    def __init__(self):
        super().__init__()
        self._readers = set()
        self._writers = set()

    def register(self, fileobj, events, data=None):
        key = super().register(fileobj, events, data)
        if events & EVENT_READ:
            self._readers.add(key.fd)
        if events & EVENT_WRITE:
            self._writers.add(key.fd)
        return key
Event常量 含义
EVENT_READ 可读0b01,内核已经准备好输入输出设备,可以开始读了
EVENT_WRITE 可写0b10,内核准备好了,可以往里写了

练习:IO多路复用TCP Server

import logging
import sys
import selectors
import socket

logging.basicConfig(format="%(asctime)s %(threadName)s %(thread)d %(message)s",stream=sys.stdout,level=logging.INFO)

#构成Socket
sk = socket.socket()
sk.bind(("127.0.0.1",3999))
sk.setblocking(False) #注意:建议非阻塞
sk.listen()
#构建本系统最优的Selector
server = selectors.DefaultSelector()

#回调函数,sock的读事件
#形参自己定义
def accep(sk:socket.socket,mask:int):
    conn,raddr = sk.accept()
    server.register(conn,selectors.EVENT_READ,read)

def read(conn:socket.socket,mask):
    data = conn.recv(1024)
    msg = "{}-{}".format(conn.getpeername(), data.decode())
    logging.info(msg)
    conn.send(msg.encode())

#在内核中注册sk的读入事件,返回SelectorKey对象
#key记录了fileobje,fileobj的fd,events,data
server.register(sk,selectors.EVENT_READ,accep)
#开始循环
while True:
    #监听注册的对象的事件,发生被关注事件则返回events
    events = server.select()
    logging.info(events) #[(key,mask)]
    for key,mask in events:
        key.data(key.fileobj,mask)

实战:IO多路复用群聊软件

不需要启动多线程来执行socket的accept,recv方法

import logging
import sys
import selectors
import socket
import threading

logging.basicConfig(format="%(asctime)s %(threadName)s %(thread)d %(message)s",stream=sys.stdout,level=logging.INFO)

class ChatServer:
    def __init__(self,ip="127.0.0.1",port=3999):
        self.sock = socket.socket()
        self.laddr = ip,port
        self.event = threading.Event()
        #构建本系统最优Selector
        self.select = selectors.DefaultSelector()

    def start(self):
        self.sock.bind(self.laddr)
        self.sock.listen()
        self.sock.setblocking(False) #设置为非阻塞
        #注册sock的被关注数据,返回SelectorKey对象
        #key记录了fileobj,fileobj的fd,events,data
        self.select.register(self.sock,selectors.EVENT_READ,"accept")

        #事件监听循环
        threading.Thread(target=self.run,name="run",daemon=True).start()

    def run(self):
        try:
            #开始循环
            while not self.event.is_set():
                for key, mask in self.select.select():
                    #根据不同的data做不通的处理
                    if key.data == "accept":
                        self.accept(mask)
                    else:
                        key.data(key.fileobj, mask)
        finally:
            #循环结束时反注册所有事件监听,并关闭conn
            conns = [k.fileobj for k in self.select.get_map().values()]
            for cn in conns:
                self.select.unregister(cn)
                cn.close()

    #self.sock的事件监听
    def accept(self,mk):
        conn,raddr = self.sock.accept()
        conn.setblocking(False) #设置为非阻塞
        logging.info("一个新的链接建立:{}".format(raddr))
        self.select.register(conn,selectors.EVENT_READ,self.recv)

    #回调函数,mk--》mask:事件的掩码
    def recv(self,conn:socket.socket,mk:int):
        try:
            data = conn.recv(1024)
        except Exception as e:
            logging.info(e)
            self.select.unregister(conn)
            conn.close()
            return
        if data == b"quit" or data == b"":
            self.select.unregister(conn)
            logging.info("退出了一个连接{}".format(conn.getpeername()))
            conn.close()
            return
        msg = "[{}] {}".format(conn.getpeername(),data.decode()).encode()
        logging.info(msg)
        for cn in self.select.get_map().values():
            if cn.data == "accept": continue
            cn.fileobj.send(msg)

    def stop(self):
        self.event.set()
        self.select.close()

    @classmethod
    def main(cls):
        chatserver = cls()
        chatserver.start()
        while not chatserver.event.is_set():
            cmd = input(">>>")
            if cmd.strip() == "quit":
                chatserver.stop()
            else:
                logging.info(threading.enumerate())

ChatServer.main()

总结

标签:多路复用,self,selectors,conn,IO,data,select
来源: https://blog.csdn.net/u013008795/article/details/93381151