IO多路复用
作者:互联网
IO多路复用
同步IO、异步IO、IO多路复用
IO两个阶段
- IO过程分两阶段:
- 数据准备阶段。从设备读取数据到内核空间的缓冲区
- 内核空间复制回用户空间进程缓冲区阶段
- 发生IO的时候:
- 内核从IO设备读数据(例如:淘米,把米放在饭锅里煮饭)
- 进程从内核复制数据(盛饭,从内核这个饭锅里把饭装到碗里来)
- 系统调用----read函数
IO模型
同步IO模型包括阻塞IO、非阻塞IO、IO多路复用
-
阻塞IO:进程等待(阻塞),直到读写完成。(全程等待)
中文版
-
非阻塞IO
- 进程调用recvfrom操作,如果IO设备没有准备好,立即返回ERROR,进程不阻塞。用户可以再次发起系统调用(可以轮询),如果内核已经准备好,就阻塞,然后复制数据到用户空间。
- 第一阶段数据没有准备好,可以先忙别的,等会再来看看。检查数据是否准备好了的过程是非阻塞的。
- 第二阶段是阻塞的,即内核空间和用户空间之间复制数据时阻塞的。
- 生活中的例子:淘米、蒸饭我不等,我去玩会,盛饭过程我等着你装好饭,但是要等到盛好饭才算完事,这是同步的,结果就是盛好饭。
中文版
-
IO多路复用
- IO多路复用,也称为:Event-driven IO
- 所谓IO多路复用,就是同时监控多个IO,有一个准备好了,就不需要继续等待其他IO,可以开始处理,提交了同时处理IO的能力。
- select几乎所有操作系统平台都支持,poll是对select的升级。
- epoll,Linux系统内核2.5+开始支持,对select和poll的增强,在监视的基础上,增加回调机制。BSD、Mac平台有kqueue,windows server有iocp。
- 以select为例,将关注的IO操作告诉select函数并调用,进程阻塞,内核“监视”select关注的文件描述符fd,被关注的任何一个fd对应的IO准备好了数据,select返回。再使用read将数据复制到用户进程。
- select举例:食堂供应很多菜(众多的IO),你需要吃某三菜一汤,大师傅(操作系统)所要现做,需要等待,你只好等待大师傅叫。其中一样菜好了,大师傅叫你,说你点的菜有好的了,你得自己遍历找找看哪一样才好了,请服务员把做好的菜打给你。
- epoll是有菜准备好了,大师傅叫你去几号窗口直接打菜,不用自己找菜了。
- 一般情况下,select最多能监听1024个fd(文件描述符,监听数量可以修改,但不建议修改),但是由于select采用轮询的方式,当管理的IO多了,每次都要遍历全部fd,效率低下。
- epoll没有管理的fd的上限,且是回调机制,不需遍历,效率很高。
-
信号驱动IO
- 进程在IO访问时,先通过sigaction系统调用,提交一个信号处理函数,立即返回。
- 进程不阻塞。当内核准备好数据后,产生一个SIGIO信号(电平触发)并投递给信号处理函数。可以在此函数中调用recvfrom函数操作数据从内核控件复制到用户控件,这段过程进程阻塞。
-
异步IO
- 在进程发起异步IO请求,立即返回。内核完成IO的两个阶段,内核给进程发一个信号。
- 举例:来打饭,跟大师傅说饭好了叫你,饭菜准备好了,窗口服务员把饭盛好了打电话叫你。两个阶段都是异步的。整个过程中,进程都可以忙别的,等好了才过来。
- 举例:今天不想出去到饭店吃饭了,点外卖,饭菜在饭店做好了(第一阶段),快递员从饭店送到你家门口(第二阶段)。
- Linux的aio的系统调用,内核从版本2.6开始支持
对应的中文版
- 总结
前面4个都是同步IO,因为核心操作recv函数调用时,进程阻塞直到拿到最终结果为止,而异步IO进程全程不阻塞。
Python中IO多路复用
- IO多路复用
- 大多数操作系统都支持select和poll
- Linux2.5+支持epoll
- BSD、Mac支持kqueue
- Solaris实现了/dev/poll
- Windows server的IOCP
- Python的select库实现了select、poll系统调用,这个基本上操作系统都支持。部分实现了epoll。它是底层的IO多路复用模块。
- 开发中的选择
- 完全跨平台,使用select、poll。但是性能较差
- 针对不同操作系统自行选择支持的技术,这样做会提高IO处理的性能
- select维护一个文件描述数据结构,单个进程使用有上限,通常是1024,线性扫描这个数据结构。效率低。
- pool和select的区别是内部数据结构使用链表,没有这个最大限制,但是依然是线性遍历才知道那个设备就绪了。
- epoll使用事件通知机制,使用回调机制提高效率
- select/poll还要从内核空间复制消息到用户空间,而epoll通过内核空间和用户空间共享一块内存来减少复制。
selectors库
3.4版本提供selectors库,高级IO复用库。
- 层次结构:
- BaseSelector
- SelectSelector #实现select
- PollSelector #实现poll
- EpollSelector #实现epoll
- DevpollSelector #实现devpoll
- KqueueSelector #实现kqueue
- BaseSelector
- selectors.DefaultSelector返回当前平台最有效、性能最高的实现。但是,由于没有实现Windows下的IOCP,所以,Windows下只能退化为select。
# 在selects模块源码最下面有如下代码
# Choose the best implementation, roughly:
# epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
DefaultSelector = PollSelector
else:
DefaultSelector = SelectSelector
- 事件注册代码
class SelectSelector(_BaseSelectorImpl):
"""Select-based selector."""
def __init__(self):
super().__init__()
self._readers = set()
self._writers = set()
def register(self, fileobj, events, data=None):
key = super().register(fileobj, events, data)
if events & EVENT_READ:
self._readers.add(key.fd)
if events & EVENT_WRITE:
self._writers.add(key.fd)
return key
- 为selector注册一个文件对象,监视它的IO事件。返回SelectKey对象
- fileobj被监视文件对象,例如:socket对象
- events事件,该文件对象必须等待事件
- data可选的与此文件对象相关联的不透明数据,例如,关联用来存储每个客户端的会话ID,关联方法。通过这个参数在关注事件产生后让selector干什么事。
Event常量 | 含义 |
---|---|
EVENT_READ | 可读0b01,内核已经准备好输入输出设备,可以开始读了 |
EVENT_WRITE | 可写0b10,内核准备好了,可以往里写了 |
-
selectors.SelectorKey有4个属性:
- fileobj注册的文件对象
- fd文件描述符
- events等待上面的文件描述符的文件对象的事件
- data注册时关联的数据
-
selectors的基本使用方法
- 获取操作系统最优select
- selectors.DefaultSelector()
- 注册需要监听的文件描述符对象
- selectors.register(fileobj, events, data=None)->Event对象
- fileobj #文件描述符对象,socket也是文件描述符对象
- events #事件,有selectors.EVENT_READ和selectors.EVENT_WRITE
- data #数据,默认为None
- selectors.register(fileobj, events, data=None)->Event对象
- 监听事件selectors.select() 会产生阻塞,如果监听到会返回一个Events列表,表示所有已经触发的事件。
- 处理事情
- 注销所有监听
- selectors.unregister(fileobj)
- fileobj 正在监听的文件描述符对象
- selectors.unregister(fileobj)
- 关闭selectors对象 selectors.close()
- 获取操作系统最优select
练习:IO多路复用TCP Server
- 完成一个TCP Server,能够接受客户端请求并回应客户客户端消息。
import logging
import sys
import selectors
import socket
logging.basicConfig(format="%(asctime)s %(threadName)s %(thread)d %(message)s",stream=sys.stdout,level=logging.INFO)
#构成Socket
sk = socket.socket()
sk.bind(("127.0.0.1",3999))
sk.setblocking(False) #注意:建议非阻塞
sk.listen()
#构建本系统最优的Selector
server = selectors.DefaultSelector()
#回调函数,sock的读事件
#形参自己定义
def accep(sk:socket.socket,mask:int):
conn,raddr = sk.accept()
server.register(conn,selectors.EVENT_READ,read)
def read(conn:socket.socket,mask):
data = conn.recv(1024)
msg = "{}-{}".format(conn.getpeername(), data.decode())
logging.info(msg)
conn.send(msg.encode())
#在内核中注册sk的读入事件,返回SelectorKey对象
#key记录了fileobje,fileobj的fd,events,data
server.register(sk,selectors.EVENT_READ,accep)
#开始循环
while True:
#监听注册的对象的事件,发生被关注事件则返回events
events = server.select()
logging.info(events) #[(key,mask)]
for key,mask in events:
key.data(key.fileobj,mask)
实战:IO多路复用群聊软件
不需要启动多线程来执行socket的accept,recv方法
import logging
import sys
import selectors
import socket
import threading
logging.basicConfig(format="%(asctime)s %(threadName)s %(thread)d %(message)s",stream=sys.stdout,level=logging.INFO)
class ChatServer:
def __init__(self,ip="127.0.0.1",port=3999):
self.sock = socket.socket()
self.laddr = ip,port
self.event = threading.Event()
#构建本系统最优Selector
self.select = selectors.DefaultSelector()
def start(self):
self.sock.bind(self.laddr)
self.sock.listen()
self.sock.setblocking(False) #设置为非阻塞
#注册sock的被关注数据,返回SelectorKey对象
#key记录了fileobj,fileobj的fd,events,data
self.select.register(self.sock,selectors.EVENT_READ,"accept")
#事件监听循环
threading.Thread(target=self.run,name="run",daemon=True).start()
def run(self):
try:
#开始循环
while not self.event.is_set():
for key, mask in self.select.select():
#根据不同的data做不通的处理
if key.data == "accept":
self.accept(mask)
else:
key.data(key.fileobj, mask)
finally:
#循环结束时反注册所有事件监听,并关闭conn
conns = [k.fileobj for k in self.select.get_map().values()]
for cn in conns:
self.select.unregister(cn)
cn.close()
#self.sock的事件监听
def accept(self,mk):
conn,raddr = self.sock.accept()
conn.setblocking(False) #设置为非阻塞
logging.info("一个新的链接建立:{}".format(raddr))
self.select.register(conn,selectors.EVENT_READ,self.recv)
#回调函数,mk--》mask:事件的掩码
def recv(self,conn:socket.socket,mk:int):
try:
data = conn.recv(1024)
except Exception as e:
logging.info(e)
self.select.unregister(conn)
conn.close()
return
if data == b"quit" or data == b"":
self.select.unregister(conn)
logging.info("退出了一个连接{}".format(conn.getpeername()))
conn.close()
return
msg = "[{}] {}".format(conn.getpeername(),data.decode()).encode()
logging.info(msg)
for cn in self.select.get_map().values():
if cn.data == "accept": continue
cn.fileobj.send(msg)
def stop(self):
self.event.set()
self.select.close()
@classmethod
def main(cls):
chatserver = cls()
chatserver.start()
while not chatserver.event.is_set():
cmd = input(">>>")
if cmd.strip() == "quit":
chatserver.stop()
else:
logging.info(threading.enumerate())
ChatServer.main()
总结
- 使用IO多路复用 + (select、epoll)并不一定比多线程 + 同步阻塞IO性能好,其最大优势可言处理更多的链接。
- 多线程 + 同步阻塞IO模型
- 开辟太多线程,线程开辟,销毁开销还是较大,倒是可以使用线程池;线程多,线程自己使用的内存也很可观;多线程切换时要保护现场和恢复现场,线程过多,切换会占用大量时间。
- 链接比较少,多线程+同步阻塞IO模型比较合适,效率也不低
- 如果链接非常多,对服务端程序来说,IO并发还是比较高的,这时候,开辟太多线程其实业不是很划算,这时候IO多路复用或许是更好的选择。
标签:多路复用,self,selectors,conn,IO,data,select 来源: https://blog.csdn.net/u013008795/article/details/93381151