编程语言
首页 > 编程语言> > JavaIO四大模型:NIO(IO多路复用)

JavaIO四大模型:NIO(IO多路复用)

作者:互联网

文章目录

概述

        Java中的 Selector 和 Linux中的 epoll 都是基友IO多路复用的,有时也被称为异步阻塞IO。

        我们之前介绍过,同步阻塞I/O和同步非阻塞IO。对于同步阻塞I/O来说,每次进行I/O时,我们的用户线程都会阻塞,显然这在高并发下效率很低。对于同步非阻塞I/O来说,每次进行I/O,虽然在内核缓冲区还没有数据的情况下,会给用户线程一个信息,此时用户线程是不阻塞的,但是用户线程会不停地发起IO系统调用,查看内核缓冲区是否已经有数据,这样虽然在内核缓冲区没有数据的情况下,用户线程不阻塞,但是不停地发起IO系统调用,会占用大量CPU空间,导致CPU使用率降低,如何避免用户线程不断地发起IO系统调用,这就是IO多路复用所解决的问题了。

        IO多路复用模型,就是通过一种新的系统调用,一个进程可以监视多个文件描述符,一旦某个描述符就绪(一般是内核缓冲区可读/可写),内核能够通知程序进行相应的IO系统调用。

        Java中实现IO多路复用的模型叫做NIO,与操作系统说的IO模型中的NIO是不同的。

        Java实现IO多路复用的基本原理是:通过select/epoll系统调用,单个线程不断地轮询select/epoll系统调用所负责的成百上千的socket连接,当某个或某些socket网络有连接数据到达了,就返回这些可以读写的连接。好处就显而易见了,通过一个系统调用,就可以查询到可以读写的一个甚至多个网络连接。

        NIO原理图:
在这里插入图片描述
        在NIO这种IO模型下,首先不是进行read系统调用,而是进行select/epoll系统调用,这里有一个前提是,需要将目标连接提前注册到select/epoll的可查询的socket列表中,然后才可以开启整个IO的读流程。

        进行select/epoll系统调用,查询可以读的连接,内核会查询所有select的可查询的socket列表,当任何一个socket的数据准备好了,select系统调用就会返回,

        当用户进程调用了select,那么用户线程会被阻塞。也就是说,一个用户发起了select/epoll系统调用后,一直会被阻塞,直到有可读连接返回。

        用户线程获得了目标连接后,发起read系统调用,用户线程阻塞,内核开始复制数据到用户进程缓冲区

        当用户读取完数据后,用户线程返回。

        和同步非阻塞IO相似,NIO也需要不停轮询。不过NIO轮询的select/epoll系统调用,且一个线程可以管理成百上千个连接。

        NIO的优点在于:可以同时处理成千上万个连接。与一条线程维护一个连接相比,NIO最大优势在于系统不必创建线程,也不必维护这些线程,只需要把连接注册到select/epoll选择器上,然后让select选择器去监听是否有可读可写事件,如果有可读/可写事件,就返回当前可读/可写事件的sokcet连接,从而大大减少了系统的开销。

        不过NIO也有缺点,它的缺点在于:select/epoll本质上是同步IO,也就是阻塞IO。都需要在读写就绪后,自己负责进行读写,也就是说,读写过程是阻塞的。

NIO涉及到的名词

        Channel:通道类,类似于流。在NIO中,我们使用通道进行传输数据。我们既可以从通道中读取数据,又可以写数据到通道里。

        Buffer:在Java NIO中的Buffer是用于和NIO通道进行交互。即数据是从通道中读取到buffer缓冲区中,从buffer写到通道中的。

                        缓冲区本质上是一块可以写入数据,然后从中读取数据的内存,这块内存被封装成NIO Buffer对象,并提供了一组方法,用来方便的访问该块内存。

        使用Buffer读写数据一般遵循以下四步

  1. 写入数据到Buffer
  2. 调用flip()
  3. 从Buffer中读取数据
  4. 调用clear()或compact()

        当向Buffer中写入数据时,buffer会记录下写了多少数据。一旦要读取数据,需要通过flip(),来切换读写模式。
        一旦读完了所有数据,就需要清空Buffer,让它可以再次被写入,有两种方法可以清空缓冲区:clear()和compact()。clear()清空整个缓冲区,而compact()清空已经读过的数据,还没有被读到的数据被移动到缓冲区的起始处,新写入的数据放到缓冲区中未读数据的后面

        Buffer的capacity、position、limit:

        position和limit的含义,取决于Buffer处在读模式还是写模式

        capacity:表示缓冲区的容量

        position:当你写入数据到Buufer中时,position表示当前的位置。初始的position为0,当写入一个数据时,position会向前移动到下一个可写的位置。position最大为capacity-1。

        当读数据时,从某个特定的位置开始读。将Buufer从写模式切换到读模式时,position会被重置为0,当从Buffer的position处读取数据时,position向前移动至下一个可读的位置。

        limit:在写模式下。limit表示你最多能往Buffer写多少数据,读模式下,limit等于capacity.

        给Buffer指定大小的: 调用Buffer实现类的静态方法allocate()

        Buffer中的put(),表示向Buffer中写数据,get()表示从Buffer中读数据。

        Selector选择器:Selector选择器是Java NIO中能够检测一到多个NIO通道,并能够知道通道中是否有可读可写事件。这样一个单独的线程可以管理多个channel,从而管理多个网络连接

        Selector的创建:调用静态方法open()

        为了实现一个Selector选择器关注多个通道上是否有可读、可写事件。所以我们需要将多个channel注册到selector选择器上使用channel.register(Selector s, int a)。其中,第二个参数表示通过Selector选择监听时对哪些事件感兴趣。这些事件分别为:Connect:可连接Accept:可接收Read:可读Write:可写。通道触发一个事件的意思是该事件已经就绪,这四种事件用SelectionKey的四个常量来表示:SelectionKey.OP_CONNECTSelectionKey.OP_ACCEPTSelectionKey.OP_READSelectionKey.OP_WRITE

        如果对不止一个事件感兴趣,可以使用"位或"操作符将常量连接起来。即SelectionKey.OP_READ | SelectKey.OP_WRITE。 既关注读事件,又关注写事件。

        某个channel成功连接到另一个服务器称为“连接就绪”,以个server socket channel 准备好接收 新进入的 连接称为“接收就绪”。一个有数据可读的通道可以说是“读就绪”。等待写数据的通道可以说是“写就绪”。

        SelectionKey:当向Selector注册Channel时,register()会返回一个SelectionKey对象。这些对象包含了一些属性:interest集合:感兴趣的时间的集合。 ready集合:已经准备就绪的时间集合。

        通过Selector中的select()方法来选择通道:一旦向Selector选择器上注册了一个或多个通道,就可以调用重载的select(),这些方法返回所感兴趣的时间中已经准备就绪的事件的通道。举个例子就是说,如果你对“读事件” 感兴趣,调用select(),会返回读事件已经就绪的那些通道。

        不同select():

        SelectedKeys:一旦调用了select()后,并且返回值表明有事件就绪,就可以调用selector的selectedKeys(),访问这些已就绪通道中的事件类型。

JavaNIO 服务端和客户端流程

        服务端:

  1. 实例化一个SeverSocketChannel的实例
  2. 绑定端口
  3. 设置为非阻塞
  4. 创建Selector的实例
  5. 将ServerSocketChannel注册到Selector上,关注OP_ACCEPT事件
  6. 调用Selector的select()方法,若返回值大于0,则表示关注的事件中有部分就绪
  7. 遍历就绪事件,找到自己感兴趣的事件
  8. 如果是可接收事件,连接客户端
  9. 如果是读事件,进行读操作,读完后,将channel又注册到selector上,关注写事件。
    10.如果是可写事件,进行可写操作,写完后,又将channel注册到selector上,关注读事件。
public class NIOServer {
    public static void main(String[] args) {
        ServerSocketChannel serverSocketChannel = null;
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
        try{
            // 创建ServerSocketChannel实例
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(9999));
            System.out.println("服务器已启动...");
            // 设置为非阻塞
            serverSocketChannel.configureBlocking(false);
            // 创建Selector实例
            Selector selector = Selector.open();
            // 将该通道注册到Selector选择器上,关注可接受事件。
            serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);

            // 通道上事件就绪了
            while(selector.select() > 0){
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while(iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    iterator.remove();
                
                    // 如果事件是可接受的
                    if (key.isAcceptable()){
                        ServerSocketChannel channel = (ServerSocketChannel)key.channel();
                        SocketChannel socketChannel = channel.accept();
                        socketChannel.configureBlocking(false);
                        System.out.println("客户端:" + socketChannel.getRemoteAddress() + "与服务器连接成功");
                        // 将这个通道又注册到Selector上,关注读事件
                        socketChannel.register(selector,SelectionKey.OP_READ);
                    }

                    // 如果就绪事件是可读事件
                    if (key.isReadable()){
                        System.out.println("服务器可读...");
                        SocketChannel socketChannel = (SocketChannel)key.channel();
                        // 将数据读到buffer数组中
                        int read = socketChannel.read(readBuffer);
                        // 没有读到数据,说明没有数据可读,则关闭通道
                        if (read==-1){
                            socketChannel.close();
                            key.cancel();
                            continue;
                        }
                        readBuffer.flip();
                        byte[] bytes = new byte[readBuffer.remaining()];
                        // 将readBuffer数组中的数据存放到bytes数组中
                        readBuffer.get(bytes);
                        System.out.println("客户端:" + socketChannel.getRemoteAddress() + "发来的消息:" + new String(bytes,0,bytes.length));
                        readBuffer.clear();

                        // 接着关注写事件
                        socketChannel.register(selector,SelectionKey.OP_WRITE);
                    }

                    // 如果是可写事件
                    if (key.isWritable()){
                        System.out.println("服务器可写...");
                        SocketChannel channel = (SocketChannel)key.channel();
                        // 控制台写
                        System.out.print("给客户端发消息:");
                        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
                        String s = bufferedReader.readLine();
                        //向Buffer数组中写
                        writeBuffer.put((s+"\n").getBytes());
                        writeBuffer.flip();
                        channel.write(writeBuffer);
                        writeBuffer.clear();

                        channel.register(selector,SelectionKey.OP_READ);
                    }
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally{
            if (serverSocketChannel!=null){
                try {
                    serverSocketChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

        客户端流程:

  1. 实例化SocketChannel
  2. 调用connect绑定IP地址和端口号
  3. 设置为非阻塞
  4. 创建Selector实例
  5. connect()会返回一个boolean值,若已经连接则返回true。
  6. 若返回false,则将该SocketChannel实例注册到Selector上关注OP.CONNECT事件
  7. 遍历已经就绪的时间,若为可连接的事件,调用channel的finishConnect()完成连接
  8. 由于客户端的写操作是主动发起的,所以不用注册到Selector上。
public class NIOClient {
    public static void main(String[] args) {
        SocketChannel socketChannel = null;
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        try {
            socketChannel = SocketChannel.open();
            boolean connect = socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
            System.out.println("客户端已启动...");
            socketChannel.configureBlocking(false);
            Selector selector = Selector.open();
            // 没有连接成功
            if (!connect){
                // 关注可连接事件
                socketChannel.register(selector,SelectionKey.OP_CONNECT);
                while(selector.select()>0){
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while(iterator.hasNext()){
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        if (key.isConnectable()){
                            SocketChannel channel = (SocketChannel)key.channel();
                            channel.finishConnect();
                        }
                    }
                }
            }
            System.out.println("客户端: " + socketChannel.getRemoteAddress() + "与服务端连接成功...");
            while(true){
                // 向服务端发消息
                System.out.print("给服务器发消息:");
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
                String s = bufferedReader.readLine();
                if ("exit".equals(s) || "".equals(s)){
                    break;
                }
                buffer.put((s+"\n").getBytes());
                buffer.flip();
                socketChannel.write(buffer);
                buffer.clear();

                //读操作
                int read = socketChannel.read(buffer);
                if (read == -1){
                    break;
                }
                buffer.flip();
                byte [] bytes = new byte[buffer.remaining()];
                buffer.get(bytes);
                System.out.println("服务器响应的信息:" + new String(bytes,0,bytes.length));
                buffer.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if (socketChannel!=null){
                try {
                    socketChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

标签:调用,NIO,channel,Selector,JavaIO,事件,IO,socketChannel,select
来源: https://blog.csdn.net/Colorful_X/article/details/117670880