JavaIO四大模型:NIO(IO多路复用)
作者:互联网
文章目录
概述
Java中的 Selector 和 Linux中的 epoll 都是基友IO多路复用的,有时也被称为异步阻塞IO。
我们之前介绍过,同步阻塞I/O和同步非阻塞IO。对于同步阻塞I/O来说,每次进行I/O时,我们的用户线程都会阻塞,显然这在高并发下效率很低。对于同步非阻塞I/O来说,每次进行I/O,虽然在内核缓冲区还没有数据的情况下,会给用户线程一个信息,此时用户线程是不阻塞的,但是用户线程会不停地发起IO系统调用,查看内核缓冲区是否已经有数据,这样虽然在内核缓冲区没有数据的情况下,用户线程不阻塞,但是不停地发起IO系统调用,会占用大量CPU空间,导致CPU使用率降低,如何避免用户线程不断地发起IO系统调用,这就是IO多路复用所解决的问题了。
IO多路复用模型,就是通过一种新的系统调用,一个进程可以监视多个文件描述符,一旦某个描述符就绪(一般是内核缓冲区可读/可写),内核能够通知程序进行相应的IO系统调用。
Java中实现IO多路复用的模型叫做NIO,与操作系统说的IO模型中的NIO是不同的。
Java实现IO多路复用的基本原理是:通过select/epoll系统调用,单个线程不断地轮询select/epoll系统调用所负责的成百上千的socket连接,当某个或某些socket网络有连接数据到达了,就返回这些可以读写的连接。好处就显而易见了,通过一个系统调用,就可以查询到可以读写的一个甚至多个网络连接。
NIO原理图:
在NIO这种IO模型下,首先不是进行read系统调用,而是进行select/epoll系统调用,这里有一个前提是,需要将目标连接提前注册到select/epoll的可查询的socket列表中,然后才可以开启整个IO的读流程。
进行select/epoll系统调用,查询可以读的连接,内核会查询所有select的可查询的socket列表,当任何一个socket的数据准备好了,select系统调用就会返回,
当用户进程调用了select,那么用户线程会被阻塞。也就是说,一个用户发起了select/epoll系统调用后,一直会被阻塞,直到有可读连接返回。
用户线程获得了目标连接后,发起read系统调用,用户线程阻塞,内核开始复制数据到用户进程缓冲区
当用户读取完数据后,用户线程返回。
和同步非阻塞IO相似,NIO也需要不停轮询。不过NIO轮询的select/epoll系统调用,且一个线程可以管理成百上千个连接。
NIO的优点在于:可以同时处理成千上万个连接。与一条线程维护一个连接相比,NIO最大优势在于系统不必创建线程,也不必维护这些线程,只需要把连接注册到select/epoll选择器上,然后让select选择器去监听是否有可读可写事件,如果有可读/可写事件,就返回当前可读/可写事件的sokcet连接,从而大大减少了系统的开销。
不过NIO也有缺点,它的缺点在于:select/epoll本质上是同步IO,也就是阻塞IO。都需要在读写就绪后,自己负责进行读写,也就是说,读写过程是阻塞的。
NIO涉及到的名词
Channel:通道类,类似于流。在NIO中,我们使用通道进行传输数据。我们既可以从通道中读取数据,又可以写数据到通道里。
Buffer:在Java NIO中的Buffer是用于和NIO通道进行交互。即数据是从通道中读取到buffer缓冲区中,从buffer写到通道中的。
缓冲区本质上是一块可以写入数据,然后从中读取数据的内存,这块内存被封装成NIO Buffer对象,并提供了一组方法,用来方便的访问该块内存。
使用Buffer读写数据一般遵循以下四步:
- 写入数据到Buffer
- 调用flip()
- 从Buffer中读取数据
- 调用clear()或compact()
当向Buffer中写入数据时,buffer会记录下写了多少数据。一旦要读取数据,需要通过flip(),来切换读写模式。
一旦读完了所有数据,就需要清空Buffer,让它可以再次被写入,有两种方法可以清空缓冲区:clear()和compact()。clear()清空整个缓冲区,而compact()清空已经读过的数据,还没有被读到的数据被移动到缓冲区的起始处,新写入的数据放到缓冲区中未读数据的后面。
Buffer的capacity、position、limit:
position和limit的含义,取决于Buffer处在读模式还是写模式
capacity
:表示缓冲区的容量
position
:当你写入数据到Buufer中时,position表示当前的位置。初始的position为0,当写入一个数据时,position会向前移动到下一个可写的位置。position最大为capacity-1。
当读数据时,从某个特定的位置开始读。将Buufer从写模式切换到读模式时,position会被重置为0,当从Buffer的position处读取数据时,position向前移动至下一个可读的位置。
limit
:在写模式下。limit表示你最多能往Buffer写多少数据,读模式下,limit等于capacity.
给Buffer指定大小的: 调用Buffer实现类的静态方法allocate()
Buffer中的put()
,表示向Buffer中写数据,get()
表示从Buffer中读数据。
Selector选择器:Selector选择器是Java NIO中能够检测一到多个NIO通道,并能够知道通道中是否有可读可写事件。这样一个单独的线程可以管理多个channel,从而管理多个网络连接。
Selector的创建:调用静态方法open()
为了实现一个Selector选择器关注多个通道上是否有可读、可写事件。所以我们需要将多个channel注册到selector选择器上:使用channel.register(Selector s, int a)。其中,第二个参数表示通过Selector选择监听时对哪些事件感兴趣。这些事件分别为:Connect:可连接
、Accept:可接收
、Read:可读
、Write:可写
。通道触发一个事件的意思是该事件已经就绪,这四种事件用SelectionKey
的四个常量来表示:SelectionKey.OP_CONNECT
、SelectionKey.OP_ACCEPT
、SelectionKey.OP_READ
、SelectionKey.OP_WRITE
。
如果对不止一个事件感兴趣,可以使用"位或"操作符将常量连接起来。即SelectionKey.OP_READ | SelectKey.OP_WRITE。 既关注读事件,又关注写事件。
某个channel成功连接到另一个服务器称为“连接就绪”,以个server socket channel 准备好接收 新进入的 连接称为“接收就绪”。一个有数据可读的通道可以说是“读就绪”。等待写数据的通道可以说是“写就绪”。
SelectionKey:当向Selector注册Channel时,register()会返回一个SelectionKey对象。这些对象包含了一些属性:interest集合
:感兴趣的时间的集合。 ready集合
:已经准备就绪的时间集合。
通过Selector中的select()方法来选择通道:一旦向Selector选择器上注册了一个或多个通道,就可以调用重载的select(),这些方法返回所感兴趣的时间中已经准备就绪的事件的通道。举个例子就是说,如果你对“读事件” 感兴趣,调用select(),会返回读事件已经就绪的那些通道。
不同select():
- int select():阻塞,直到至少有一个通道中有就绪事件
- int select(long timeout):阻塞timeout时间
- int selectNow():不会阻塞,不管什么事件通道,直接返回。
SelectedKeys:一旦调用了select()后,并且返回值表明有事件就绪,就可以调用selector的selectedKeys(),访问这些已就绪通道中的事件类型。
JavaNIO 服务端和客户端流程
服务端:
- 实例化一个SeverSocketChannel的实例
- 绑定端口
- 设置为非阻塞
- 创建Selector的实例
- 将ServerSocketChannel注册到Selector上,关注OP_ACCEPT事件
- 调用Selector的select()方法,若返回值大于0,则表示关注的事件中有部分就绪
- 遍历就绪事件,找到自己感兴趣的事件
- 如果是可接收事件,连接客户端
- 如果是读事件,进行读操作,读完后,将channel又注册到selector上,关注写事件。
10.如果是可写事件,进行可写操作,写完后,又将channel注册到selector上,关注读事件。
public class NIOServer {
public static void main(String[] args) {
ServerSocketChannel serverSocketChannel = null;
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
try{
// 创建ServerSocketChannel实例
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(9999));
System.out.println("服务器已启动...");
// 设置为非阻塞
serverSocketChannel.configureBlocking(false);
// 创建Selector实例
Selector selector = Selector.open();
// 将该通道注册到Selector选择器上,关注可接受事件。
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
// 通道上事件就绪了
while(selector.select() > 0){
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
// 如果事件是可接受的
if (key.isAcceptable()){
ServerSocketChannel channel = (ServerSocketChannel)key.channel();
SocketChannel socketChannel = channel.accept();
socketChannel.configureBlocking(false);
System.out.println("客户端:" + socketChannel.getRemoteAddress() + "与服务器连接成功");
// 将这个通道又注册到Selector上,关注读事件
socketChannel.register(selector,SelectionKey.OP_READ);
}
// 如果就绪事件是可读事件
if (key.isReadable()){
System.out.println("服务器可读...");
SocketChannel socketChannel = (SocketChannel)key.channel();
// 将数据读到buffer数组中
int read = socketChannel.read(readBuffer);
// 没有读到数据,说明没有数据可读,则关闭通道
if (read==-1){
socketChannel.close();
key.cancel();
continue;
}
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
// 将readBuffer数组中的数据存放到bytes数组中
readBuffer.get(bytes);
System.out.println("客户端:" + socketChannel.getRemoteAddress() + "发来的消息:" + new String(bytes,0,bytes.length));
readBuffer.clear();
// 接着关注写事件
socketChannel.register(selector,SelectionKey.OP_WRITE);
}
// 如果是可写事件
if (key.isWritable()){
System.out.println("服务器可写...");
SocketChannel channel = (SocketChannel)key.channel();
// 控制台写
System.out.print("给客户端发消息:");
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
String s = bufferedReader.readLine();
//向Buffer数组中写
writeBuffer.put((s+"\n").getBytes());
writeBuffer.flip();
channel.write(writeBuffer);
writeBuffer.clear();
channel.register(selector,SelectionKey.OP_READ);
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally{
if (serverSocketChannel!=null){
try {
serverSocketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
客户端流程:
- 实例化SocketChannel
- 调用connect绑定IP地址和端口号
- 设置为非阻塞
- 创建Selector实例
- connect()会返回一个boolean值,若已经连接则返回true。
- 若返回false,则将该SocketChannel实例注册到Selector上关注OP.CONNECT事件
- 遍历已经就绪的时间,若为可连接的事件,调用channel的finishConnect()完成连接
- 由于客户端的写操作是主动发起的,所以不用注册到Selector上。
public class NIOClient {
public static void main(String[] args) {
SocketChannel socketChannel = null;
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
socketChannel = SocketChannel.open();
boolean connect = socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
System.out.println("客户端已启动...");
socketChannel.configureBlocking(false);
Selector selector = Selector.open();
// 没有连接成功
if (!connect){
// 关注可连接事件
socketChannel.register(selector,SelectionKey.OP_CONNECT);
while(selector.select()>0){
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isConnectable()){
SocketChannel channel = (SocketChannel)key.channel();
channel.finishConnect();
}
}
}
}
System.out.println("客户端: " + socketChannel.getRemoteAddress() + "与服务端连接成功...");
while(true){
// 向服务端发消息
System.out.print("给服务器发消息:");
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
String s = bufferedReader.readLine();
if ("exit".equals(s) || "".equals(s)){
break;
}
buffer.put((s+"\n").getBytes());
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
//读操作
int read = socketChannel.read(buffer);
if (read == -1){
break;
}
buffer.flip();
byte [] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
System.out.println("服务器响应的信息:" + new String(bytes,0,bytes.length));
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
}finally {
if (socketChannel!=null){
try {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
标签:调用,NIO,channel,Selector,JavaIO,事件,IO,socketChannel,select 来源: https://blog.csdn.net/Colorful_X/article/details/117670880