其他分享
首页 > 其他分享> > Netty网络框架学习笔记V4.x-1(NIO知识_2022-01-20)

Netty网络框架学习笔记V4.x-1(NIO知识_2022-01-20)

作者:互联网

前言: (Netty官网GitHub)

Netty 是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和流线了网络编程,例如 TCP 和 UDP 套接字服务器。

Netty 是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络 IO 程序。

Netty 主要针对在 TCP 协议下,面向 Clients 端的高并发应用,或者 Peer-to-Peer 场景下的大量数据持续传输的

应用。

Netty 本质是一个 NIO 框架,适用于服务器通讯相关的多种应用场景

“快速和简单”并不意味着生成的应用程序会受到可维护性或性能问题的影响。Netty 是根据从实现许多协议(如 FTP、SMTP、HTTP 以及各种二进制和基于文本的旧协议)中获得的经验精心设计的。因此,Netty 成功地找到了一种方法,可以在不妥协的情况下实现易于开发、性能、稳定性和灵活性。

img

运行条件最低要求只有两个: 最新版本的 Netty 和 JDK 1.6 或更高版本, 要透彻理解 Netty , 需要先学习 NIO

1.0 I/O 模型

I/O 模型简单的理解:就是用什么样的通道进行数据的发送和接收,很大程度上决定了程序通信的性能

Java 共支持 3 种网络编程模型/IO 模式:BIO、NIO、AIO

  1. Java BIO : 同步并阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销

BIO 就是传统的 java io 编程,其相关的类和接口在 java.io

72SSeI.png

  1. Java NIO : 同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注

册到多路复用器上,多路复用器轮询到连接有 I/O 请求就进行处理

72Fr1f.md.png

  1. Java AIO(NIO.2) : 异步非阻塞,AIO 引入异步通道的概念,采用了 Proactor 模式,简化了程序编写,有效

的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较

多且连接时间较长的应用

2.0 BIO、NIO、AIO 适用场景

3.0 同步(Synchronous)与异步(Asynchronous)

同步和异步都是基于应用程序和操作系统处理 IO 事件所采用的方式。

同步:是应用程序要直接参与 IO 读写的操作。

异步:所有的 IO 读写交给操作系统去处理,应用程序只需要等待通知。

同步方式在处理 IO 事件的时候,必须阻塞在某个方法上面等待我们的 IO 事件完成(阻塞 IO 事件或者通过轮询 IO事件的方式),对于异步来说,所有的 IO 读写都交给了操作系统。这个时候,我们可以去做其他的事情,并不需要去完成真正的 IO 操作,当操作完成 IO 后,会给我们的应用程序一个通知。

所以异步相比较于同步带来的直接好处就是在我们处理IO数据的时候,异步的方式我们可以把这部分等待所消耗的资源用于处理其他事务,提升我们服务自身的性能。

72VQbD.md.png

4.0 BIO与NIO对比

4.1 BIO

BIO的缺点: 每个请求都需要创建独立的线程,与对应的客户端进行数据 Read,业务处理,数据 Write 。

当并发数较大时,需要创建大量线程来处理连接,系统资源占用较大。

连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在 Read 操作上,造成线程资源浪费。

BIO的好处: 编码简单, 便于调试、理解

4.2 NIO

NIO的缺点: 编码需要一定学习成本, 不容易调试、理解

NIO的好处: 使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入, 这个线程同时可以去做别的事情, 通过Selector(选择器)用于监听多个通道的事件(比如:连接请求, 数据到达等),因此使用单个线程就可以监听多个客户端通道。

NIO是可以做到用一个线程来处理多个操作的。假设有 10000 个请求过来,根据实际情况,可以分配 50 或者 100 个线程来处理。不像之前的阻塞 IO 那样,非得分配 10000 个, 这就大量节约了系统资源。

同样HTTP2.0 也使用了多路复用的技术,做到同一个连接并发处理多个请求,而且并发请求的数量比 HTTP1.1 大了好几个数量级

5 NIO三大核心部分通信的简单模型

72KyiF.md.png

6.0 缓冲区(Buffer)

缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。

Channel 提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer

72Mgk8.md.png

在 NIO 中,Buffer 是一个顶层父类,它是一个抽象类, 类的层级关系图:

72QMAf.md.png

Buffer 类定义了所有的缓冲区都具有的四个属性来提供关于其所包含的数据元素的信息:

以及相关方法:

721kyd.md.png

从前面可以看出对于 Java 中的基本数据类型(boolean 除外),都有一个 Buffer 类型与之相对应,最常用的自然是 ByteBuffer 类(二进制数据),

该类的主要方法如下:

7Hush9.png

7.0 通道(Channel)

7.1 基本介绍

NIO 的通道类似于流,但有些区别如下:

BIO 中的 stream 是单向的,例如 FileInputStream 对象只能进行读取数据的操作,而 NIO 中的通道(Channel) 是双向的,可以读操作,也可以写操作。

7.2 Channel

在 NIO 中是一个接口

public interface Channel extends Closeable {
    public boolean isOpen();
    public void close() throws IOException;
}

常 用 的 Channel 类有: FileChannel(文件的读写) , DatagramChannel(UDP的数据读写) , ServerSocketChannelSocketChannel (TCP的数据读写)。

【ServerSocketChanne 类似 ServerSocket , SocketChannel 类似 Socket】

7RC2dA.md.png

7.2.1 FileChannel 类 (例子在基础笔记中的FileChannel读写文件)

FileChannel 主要用来对本地文件进行 IO 操作,FileChannel只能在阻塞模式下工作,所以无法搭配Selector,

常见的方法有

7.3 关于 Buffer 和 Channel 的注意事项和细节

//得到一个只读的 
Buffer ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); 
//读取 
while (readOnlyBuffer.hasRemaining()) { 
	System.out.println(readOnlyBuffer.get()); 
}
readOnlyBuffer.put((byte)100); //ReadOnlyBufferException
RandomAccessFile randomAccessFile=new RandomAccessFile("1.txt", "rw");
 //获取对应的通道 
 FileChannel channel=randomAccessFile.getChannel();

/***  参数 1: FileChannel.MapMode.READ_WRITE 使用的读写模式  
	* 参数 2: 0 : 可以直接修改的起始位置 
	* 参数 3: 5: 是映射到内存的大小(不是索引位置) ,即将 1.txt 的多少个字节映射到内存 
	* 可以直接修改的范围就是 0-5 
	* mappedByteBuffer实际类型是 DirectByteBuffer
    */
 MappedByteBuffer mappedByteBuffer=channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
 mappedByteBuffer.put(0, (byte)'H');
 mappedByteBuffer.put(3, (byte)'9');
 mappedByteBuffer.put(5, (byte)'Y'); //这里抛出异常IndexOutOfBoundsException 
 randomAccessFile.close();

Scattering:将数据写入到 buffer 时,可以采用 buffer 数组,依次写入 [分散]

Gathering: 从 buffer 读取数据时,可以采用 buffer

void contextLoads() throws Exception {
        RandomAccessFile randomAccessFile = new RandomAccessFile("1.txt", "rw");
        //获取对应的通道
        FileChannel channel = randomAccessFile.getChannel();
        //创建 buffer 数组
        ByteBuffer[] byteBuffers = new ByteBuffer[2];
        byteBuffers[0] = ByteBuffer.allocate(5);
        byteBuffers[1] = ByteBuffer.allocate(3);
        // 循环添加
        for (ByteBuffer byteBuffer : byteBuffers) {
            byteBuffer.put("8".getBytes(StandardCharsets.UTF_8));
        }
        channel.write(byteBuffers); // byteBuffers数组写 (也可以读)
    }

8.0 Selector

8.1 基本介绍

单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用 (如下图)

多路复用仅针对网络 IO,普通文件 IO 无法利用多路复用

77gqUS.png

  1. Netty 的 IO 线程 NioEventLoop 聚合了 Selector(选择器,也叫多路复用器),可以同时并发处理成百上千个客户端连接。
  2. 当线程从某客户端 Socket 通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。
  3. 线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。
  4. 由于读写操作都是非阻塞的,这就可以充分提升 IO 线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起。
  5. 一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。

8.2 Selector

相关方法

77fPVP.png

8.2.1 重要的属性字段

selector=>AbstractSelector=>SelectorImpl重要的属性字段:

7X2KG6.md.png

8.3 NIO 非阻塞 网络编程原理分析图

NIO 非阻塞 网络编程相关的(Selector、SelectionKey、ServerScoketChannel 和 SocketChannel) 关系梳理图

7H6z9J.md.png

  1. 当客户端连接时,会通过 ServerSocketChannel 得到 SocketChannel
  2. Selector 进行监听 select 方法, 返回有事件发生的通道的个数.
  3. 将 socketChannel 注册到 Selector 上, register(Selector sel, int ops), 一个 selector 上可以注册多个 SocketChannel
  4. 注册后返回一个 SelectionKey, 会和该 Selector 关联(集合), 进一步得到各个 SelectionKey (有事件发生)
  5. 在通过 SelectionKey 反向获取 SocketChannel , 方法 channel()
  6. 可以通过 得到的 channel , 完成业务处理

8.4 SelectionKey

SelectionKey,表示 Selector 和网络通道的注册关系(或者监听的事件), 共四种:

8.4.1 SelectionKey 相关方法

7Hhqxg.png

8.5 ServerSocketChannel

ServerSocketChannel 在服务器端监听新的客户端 Socket 连接

相关方法如下:

7Ho8L4.png

8.6 SocketChannel

SocketChannel,网络 IO 通道,具体负责进行读写操作。NIO 把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区。

相关方法如下:

7HHymF.png

9.0 一个简单的NIO非阻塞配合复用器的网络编程

9.1 NIO服务端

/**
 * @Author: ZhiHao
 * @Date: 2022/1/25 17:58
 * @Description: NIO网络编程演示
 * @Versions 1.0
 **/
@Slf4j
public class NIOSelectorServiceDemo {

    public static void main(String[] args) throws Exception {
        // 1-打开多路复用器
        Selector selector = Selector.open();
        // 2-打开一个ServerSocketChannel通道
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 3-绑定到8888端口
        serverSocketChannel = serverSocketChannel.bind(new InetSocketAddress(8888));
        // 4-ServerSocketChannel通道设置为非阻塞
        serverSocketChannel.configureBlocking(false);
        // 4-ServerSocketChannel通道注册到Selector多路复用器
        SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        // 5-接受请求
        for (; ; ) {
            // 6-若没有事件就绪, 就阻塞等待直到有事件发生
            log.info("NIOServiceDemo-0, 准备阻塞直到等待事件发生");
            int select = selector.select();
            log.info("NIOServiceDemo-1, select:{}", select);
            if (select <= 0) {
                log.info("NIOServiceDemo-2, 没有事件发生!");
                continue;
            }
            // 7-遍历复用器里面的所有事件key
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                // 获取事件key
                SelectionKey key = iterator.next();
                // 手动移除当前selectionKey, 防止重复操作
                iterator.remove();
                // 8-当key的事件绑定的是接收连接进行处理
                SocketChannel clientSocketChannel = null;
                if (key.isAcceptable()) {
                    log.info("NIOServiceDemo-3, 发生接受连接事件!");
                    ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) key.channel();
                    clientSocketChannel = serverSocketChannel1.accept();
                    clientSocketChannel.configureBlocking(false);
                    // 9-服务端通道注册读取事件到复用器, 并设置一个通道共享数据
                    // (这里设置容量10是为了演示一次性读不完进行扩展例子, 正常应该经过客户端协商定义大小)
                    clientSocketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(10));
                }
                // 10-当key的事件是读取事件时候进行处理
                try {
                    // 关闭客户端时候总会触发一个客户端事件
                    if (key.isReadable()) {
                        log.info("NIOServiceDemo-4, 发生读取事件!");
                        clientSocketChannel = (SocketChannel) key.channel();
                        // 11-获取通道共享数据
           ByteBuffer attachment = (ByteBuffer) Optional.ofNullable(key.attachment()).orElseGet(()-> ByteBuffer.allocate(1024));
                        int read;
                        while ((read = clientSocketChannel.read(attachment)) > 0) {
                            log.info("NIOServiceDemo-read, 读取到的数量:{}", read);
                            // position=limit, 说明ByteBuffer容量不足以一次性全部读完通道数据, 需要进行扩容
                            if (attachment.position() == attachment.limit()) {
                                ByteBuffer newByteBuffer = ByteBuffer.allocate(attachment.limit() * 2);
                                attachment.flip(); // 转换读取模式
                                newByteBuffer.put(attachment);
                                attachment = newByteBuffer;
                                key.attach(attachment); // 替换对应通道的共享数据
                            }
                        }

                        // 如果是客户端正常主动取消, read = -1
                        if (read == -1) {
                            log.error("NIOServiceDemo-cance, 客户端主动关闭了通道!");
                            cancelAndClose(key, clientSocketChannel);
                            continue;
                        }
                        attachment.flip();
                        String str = new String(attachment.array(), attachment.position(), attachment.limit());
                        System.out.println(str);
                        // 12-数据写回给客户端 (wrap方法会根据内容分配大小)
                        clientSocketChannel.write(ByteBuffer.wrap((str + "我写回数据啦").getBytes(StandardCharsets.UTF_8)));
                    }
                } catch (IOException e) {
                    log.error("NIOServiceDemo-5, 客户端异常断开了!, 进行取消注册与关闭通道");
                    cancelAndClose(key, clientSocketChannel);
                }
            }
        }
    }

    private static void cancelAndClose(SelectionKey key, SocketChannel clientSocketChannel) {
        key.cancel(); // 取消注册
        IoUtil.close(clientSocketChannel); // 关闭通道
    }
}

9.2 NIO客户端

@Slf4j
public class NIOClientDemo {

    public static void main(String[] args) throws Exception {
        // 1-打开多路复用器 (思考这里是不是可以使用服务端相同的一个多路复用器)
        Selector selector = Selector.open();
        // 2-打开得到ClientSocketChannel
        SocketChannel clientSocketChannel = SocketChannel.open();
        // 3-设置非阻塞 (默认是true阻塞)
        clientSocketChannel.configureBlocking(false);
        // 4-客户端通道注册连接事件到复用器
        SelectionKey clientSelectionKey = clientSocketChannel.register(selector, SelectionKey.OP_CONNECT);
        // 5-连接服务端
        boolean connect = clientSocketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
        log.info("NIOClientDemo-1, 连接需要时间, 这里不会阻塞,结果connect:{}", connect);

        for (; ; ) {
            // 6-若没有事件就绪, 就阻塞等待直到有事件发生
            log.info("NIOClientDemo-0, 准备阻塞直到等待事件发生");
            int select = selector.select();
            log.info("NIOClientDemo-1, 结果select:{}", select);
            if (select == 0) {
                log.info("NIOClientDemo-2, 没有事件发生!");
                continue;
            }
            // 7-获取所有的事件
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                // 8-是连接事件
                if (key.isConnectable()) {
                    log.info("NIOClientDemo-3, 连接事件发生!");
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    // 如果没有成功则循环等待
                    while (!socketChannel.finishConnect()) {}
                    socketChannel.configureBlocking(false);
                    // 9-客户端通道注册一个读取服务器响应写回的数据事件 (注意如果通道没有设置为非阻塞,注册会抛出异常)
                    socketChannel.register(selector, SelectionKey.OP_READ);
                    // 10-响应数据给服务端
                    // 包装字节数组方式, 数组长度多长就分配多大buffer, 节约空间
                    ByteBuffer byteBuffer = ByteBuffer.wrap("测试发送参数666".getBytes(StandardCharsets.UTF_8));
                    socketChannel.write(byteBuffer);
//                    ByteBuffer allocate = ByteBuffer.allocate(1000);
//                    allocate.put("测试发送参数666".getBytes(StandardCharsets.UTF_8));
//                    allocate.flip();
//                    socketChannel.write(allocate);
                }
                // 11-是读取事件, 进行读取数据
                if (key.isReadable()) {
                    log.info("NIOClientDemo-4, 读取服务端事件发生!");
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 1024);
                    socketChannel.read(byteBuffer);
                    byteBuffer.flip();
                    String str = new String(byteBuffer.array(), byteBuffer.position(), byteBuffer.limit());
                    System.out.println(str);
                }
            }
        }
    }
}

结果:

16:34:27.062 [main] INFO com.zhihao.nio.NIOServiceDemo - NIOServiceDemo-0, 准备阻塞直到等待事件发生
16:34:31.030 [main] INFO com.zhihao.nio.NIOServiceDemo - NIOServiceDemo-1, select:1
16:34:31.032 [main] INFO com.zhihao.nio.NIOServiceDemo - NIOServiceDemo-3, 发生接受连接事件!
16:34:31.032 [main] INFO com.zhihao.nio.NIOServiceDemo - NIOServiceDemo-0, 准备阻塞直到等待事件发生
16:34:31.037 [main] INFO com.zhihao.nio.NIOServiceDemo - NIOServiceDemo-1, select:1
16:34:31.037 [main] INFO com.zhihao.nio.NIOServiceDemo - NIOServiceDemo-4, 发生读取事件!
测试发送参数666
16:34:31.038 [main] INFO com.zhihao.nio.NIOServiceDemo - NIOServiceDemo-0, 准备阻塞直到等待事件发生

-------------------------------------------------------------------------------------
16:34:31.032 [main] INFO com.zhihao.nio.NIOClientDemo - NIOClientDemo-1, 连接需要时间, 这里不会阻塞,结果connect:false
16:34:31.036 [main] INFO com.zhihao.nio.NIOClientDemo - NIOClientDemo-0, 准备阻塞直到等待事件发生
16:34:31.037 [main] INFO com.zhihao.nio.NIOClientDemo - NIOClientDemo-1, 结果select:1
16:34:31.037 [main] INFO com.zhihao.nio.NIOClientDemo - NIOClientDemo-3, 连接事件发生!
16:34:31.038 [main] INFO com.zhihao.nio.NIOClientDemo - NIOClientDemo-0, 准备阻塞直到等待事件发生
16:34:31.038 [main] INFO com.zhihao.nio.NIOClientDemo - NIOClientDemo-1, 结果select:1
16:34:31.038 [main] INFO com.zhihao.nio.NIOClientDemo - NIOClientDemo-4, 读取服务端事件发生!
测试发送参数666我写回数据啦
16:34:31.039 [main] INFO com.zhihao.nio.NIOClientDemo - NIOClientDemo-0, 准备阻塞直到等待事件发生

10. 为什么处理完需要remove()

因为触发事件添加进 selectedKeys 容器中, 而selectedKeys又不会主动删除, 下次迭代事件selectedKeys的时候就会处理了这些已经处理过的通道事件, 但这些事件又没有真正发生事件, 操作就会报错。

扩展

非阻塞模式配合复用器

  1. 打开对应服务/客户端的通道, 设置异步模式
  2. 打开Selector, 并将对应通道监听那些事件注册到Selector
  3. 阻塞监听通道是否发生事件
  4. 发生事件后遍历所有的事件key
  5. 根据事件key类型执行对应操作类型

阻塞模式

  1. 打开对应服务/客户端的通道, 设置阻塞
  2. 当接收到新连接后, 开启一个新线程进行阻塞读

服务端

缺点是当有很多客户端连接时候, 就会有多少个线程进行处理! 线程资源很宝贵的

private static ExecutorService executors = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(8888));
        serverSocketChannel.configureBlocking(true); // 默认true
        ByteBuffer allocate = ByteBuffer.allocate(10);
        StringBuilder builder = new StringBuilder();
        for (; ; ) {
            log.info("线程名称:{}, 阻塞等待连接!",Thread.currentThread().getName());
            // 这里阻塞当前线程等待新连接
            SocketChannel channel = serverSocketChannel.accept();
            channel.configureBlocking(true);
            executors.execute(()->{
                // 接收到的通道也设置是阻塞的情况, channel.read()读取完了之后, 在次读也会阻塞,
                // 这个线程就被阻塞了, 所以需要新开线程处理, 这也是阻塞IO的缺点, 每个连接都需要开一个线程处理
                try {
                    log.info("阻塞的连接成功!");
                    int read;
                    while ((read =channel.read(allocate)) > 0) {
                        log.info("读取到多少个字节:{}",read);
                        allocate.flip();
                        builder.append(new String(allocate.array(),allocate.position(),allocate.limit()));
                        allocate.clear();
                        log.info("读取的结果:{}",builder.toString());
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
        }
    }

客户端 (多个也是代码一样)

public static void main(String[] args) throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(true);
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
        while (!socketChannel.finishConnect()) {
        }
        waitWrite(socketChannel);
        LockSupport.park();
    }

    private static void waitWrite(SocketChannel channel) throws IOException {
        Scanner scan = new Scanner(System.in);
        log.info("GroupChatNIOClient1号, 求输入需要发送的信息, 格式 1-xxxxx");
        System.out.print("输入数据:");
        String str = null;  // 接收数据
        while (StrUtil.isNotBlank((str = scan.next()))) {
            break;
        }
        String[] split = str.split("-");
        JSONObject jsonObject = new JSONObject();
        if (split.length > 1) {
            jsonObject.putOnce("clientNum", split[0]);
            jsonObject.putOnce("data", split[1]);
        } else {
            jsonObject.putOnce("data", split[0]);
        }
        log.info("GroupChatNIOClient1号, 输入需要发送的信息为:{}", jsonObject.toString());
        channel.write(ByteBuffer.wrap(jsonObject.toString().getBytes(StandardCharsets.UTF_8)));
    }
11:34:55.292 [main] INFO com.zhihao.nio.BlockNIOService - 线程名称:main, 阻塞等待连接!
11:34:59.061 [main] INFO com.zhihao.nio.BlockNIOService - 线程名称:main, 阻塞等待连接!
11:34:59.061 [pool-1-thread-1] INFO com.zhihao.nio.BlockNIOService - 阻塞的连接成功!
11:35:08.550 [pool-1-thread-1] INFO com.zhihao.nio.BlockNIOService - 读取到多少个字节:10
11:35:08.551 [pool-1-thread-1] INFO com.zhihao.nio.BlockNIOService - 读取的结果:{"data":"8
11:35:08.551 [pool-1-thread-1] INFO com.zhihao.nio.BlockNIOService - 读取到多少个字节:10
11:35:08.551 [pool-1-thread-1] INFO com.zhihao.nio.BlockNIOService - 读取的结果:{"data":"88888888888
11:35:08.551 [pool-1-thread-1] INFO com.zhihao.nio.BlockNIOService - 读取到多少个字节:7
11:35:08.551 [pool-1-thread-1] INFO com.zhihao.nio.BlockNIOService - 读取的结果:{"data":"8888888888888888"}
11:35:22.601 [main] INFO com.zhihao.nio.BlockNIOService - 线程名称:main, 阻塞等待连接!
11:35:22.601 [pool-1-thread-2] INFO com.zhihao.nio.BlockNIOService - 阻塞的连接成功!
11:35:33.533 [pool-1-thread-2] INFO com.zhihao.nio.BlockNIOService - 读取到多少个字节:10
11:35:33.533 [pool-1-thread-2] INFO com.zhihao.nio.BlockNIOService - 读取的结果:{"data":"8888888888888888"}{"data":"9
11:35:33.533 [pool-1-thread-2] INFO com.zhihao.nio.BlockNIOService - 读取到多少个字节:4
11:35:33.533 [pool-1-thread-2] INFO com.zhihao.nio.BlockNIOService - 读取的结果:{"data":"8888888888888888"}{"data":"998"}
// -----------------------------------------------------------------------------------
输入数据:8888888888888888
11:35:08.548 [main] INFO com.zhihao.nio.BlockNIOClient1 - , 输入需要发送的信息为:{"data":"8888888888888888"}
//-----------------------------------------------------------------------------------
输入数据:998
11:35:33.531 [main] INFO com.zhihao.nio.BlockNIOClient2 - , 输入需要发送的信息为:{"data":"998"}

非阻塞模式(不配合复用器)

  1. 打开对应服务/客户端的通道, 设置非阻塞
  2. 当接收到新连接后, 只开启一个新线程进行非阻塞循环读取所有通道里面内容

服务端

缺点, 有二个线程会不断的自旋, 大量消耗CPU资源!

private static int num = 0;
    private static int num1 = 0;
    private static int num2 = 0;

    public static void main(String[] args) throws Exception {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(8888));
        serverSocketChannel.configureBlocking(false); // 默认true
        ByteBuffer allocate = ByteBuffer.allocate(8);
        StringBuilder builder = new StringBuilder();
        List<SocketChannel> NotBlockSocketChannel = new CopyOnWriteArrayList<>();
        for (; ; ) {
            if (num1 < 3) {
                log.info("不阻塞等待连接!, 只打印3次日志");
                num1++;
            }
            // 这里不阻塞当前线程等待新连接, 但是没有新连接的情况下, 这里会一直疯狂每循环一次都返回null
            SocketChannel channel = serverSocketChannel.accept();
            if (Objects.isNull(channel)) {
                if (num2 < 3) {
                    log.info("没有连接每循环一次都返回null! 只打印3次日志");
                    num2++;
                }
                continue;
            }
            channel.configureBlocking(false);
            // 接收到的通道也设置是非阻塞的情况,
            log.info("非阻塞的连接成功!, 非阻塞通道添加进通道集合, 不断循环读取集合中通道的内容");
            NotBlockSocketChannel.add(channel);

            // 只需要开启一个线程不断循环读取集合通道里面的内容
            if (num == 0) {
                new Thread(() -> {
                    for (; ; ) {
                        try {
                            for (SocketChannel socketChannel : NotBlockSocketChannel) {
                                int read;
                                while ((read = socketChannel.read(allocate)) > 0) {
                                    log.info("读取到多少个字节:{}", read);
                                    allocate.flip();
                                    builder.append(new String(allocate.array(), allocate.position(), allocate.limit()));
                                    allocate.clear();
                                    log.info("读取的结果:{}", builder.toString());
                                }
                                builder.setLength(0);
                            }
                        } catch (IOException e) {
                        }
                    }
                }).start();
                num++;
            }
        }
    }

客户端 (多个也是代码一样)

public static void main(String[] args) throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
        while (!socketChannel.finishConnect()) {
        }
        waitWrite(socketChannel);
        LockSupport.park();
    }

    private static void waitWrite(SocketChannel channel) throws IOException {
        Scanner scan = new Scanner(System.in);
        log.info("NotBlockNIOClient1号, 求输入需要发送的信息, 格式 1-xxxxx");
        System.out.print("输入数据:");
        String str = null;  // 接收数据
        while (StrUtil.isNotBlank((str = scan.next()))) {
            break;
        }
        String[] split = str.split("-");
        JSONObject jsonObject = new JSONObject();
        if (split.length > 1) {
            jsonObject.putOnce("clientNum", split[0]);
            jsonObject.putOnce("data", split[1]);
        } else {
            jsonObject.putOnce("data", split[0]);
        }
        log.info("NotBlockNIOClient1号, 输入需要发送的信息为:{}", jsonObject.toString());
        channel.write(ByteBuffer.wrap(jsonObject.toString().getBytes(StandardCharsets.UTF_8)));
    }
12:02:50.068 [main] INFO com.zhihao.nio.NotBlockNIOService - 不阻塞等待连接!, 只打印3次日志
12:02:50.072 [main] INFO com.zhihao.nio.NotBlockNIOService - 没有连接每循环一次都返回null! 只打印3次日志
12:02:50.072 [main] INFO com.zhihao.nio.NotBlockNIOService - 不阻塞等待连接!, 只打印3次日志
12:02:50.072 [main] INFO com.zhihao.nio.NotBlockNIOService - 没有连接每循环一次都返回null! 只打印3次日志
12:02:50.072 [main] INFO com.zhihao.nio.NotBlockNIOService - 不阻塞等待连接!, 只打印3次日志
12:02:50.072 [main] INFO com.zhihao.nio.NotBlockNIOService - 没有连接每循环一次都返回null! 只打印3次日志
12:02:54.680 [main] INFO com.zhihao.nio.NotBlockNIOService - 非阻塞的连接成功!, 非阻塞通道添加进通道集合, 不断循环读取集合中通道的内容
12:02:58.088 [Thread-0] INFO com.zhihao.nio.NotBlockNIOService - 读取到多少个字节:8
12:02:58.091 [Thread-0] INFO com.zhihao.nio.NotBlockNIOService - 读取的结果:{"data":
12:02:58.091 [Thread-0] INFO com.zhihao.nio.NotBlockNIOService - 读取到多少个字节:8
12:02:58.091 [Thread-0] INFO com.zhihao.nio.NotBlockNIOService - 读取的结果:{"data":"8978978
12:02:58.091 [Thread-0] INFO com.zhihao.nio.NotBlockNIOService - 读取到多少个字节:4
12:02:58.091 [Thread-0] INFO com.zhihao.nio.NotBlockNIOService - 读取的结果:{"data":"897897897"}
12:03:03.570 [main] INFO com.zhihao.nio.NotBlockNIOService - 非阻塞的连接成功!, 非阻塞通道添加进通道集合, 不断循环读取集合中通道的内容
12:03:08.330 [Thread-0] INFO com.zhihao.nio.NotBlockNIOService - 读取到多少个字节:8
12:03:08.331 [Thread-0] INFO com.zhihao.nio.NotBlockNIOService - 读取的结果:{"data":
12:03:08.331 [Thread-0] INFO com.zhihao.nio.NotBlockNIOService - 读取到多少个字节:6
12:03:08.331 [Thread-0] INFO com.zhihao.nio.NotBlockNIOService - 读取的结果:{"data":"998"}
// ---------------------------------------------
输入数据:897897897
12:02:58.086 [main] INFO com.zhihao.nio.NotBlockNIOClient1 - NotBlockNIOClient1号, 输入需要发送的信息为:{"data":"897897897"}
// ----------------------------------------------
输入数据:998
12:03:08.328 [main] INFO com.zhihao.nio.NotBlockNIOClient1 - NotBlockNIOClient1号, 输入需要发送的信息为:{"data":"998"}

直接缓冲区DirectBuffer

零拷贝指的是数据无需拷贝到 JVM 内存中,同时具有以下三个优点

传统的 IO 将一个文件通过 socket 写出 内存流程如下

7jmRq1.png

可以看到中间环节较多,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的

NIO通过了DirectBuffer 进行了优化

NIO 的 Buffer 除了做了缓冲块优化之外,还提供了一个可以直接访问物理内存的类 DirectBuffer。普通的 Buffer 分配的是 JVM 堆内存,而 DirectBuffer 是直接分配物理内存 (非堆内存)。

DirectBuffer 则是直接将步骤简化为数据直接保存到非堆内存,从而减少了一次数据拷贝。

由于 DirectBuffer 申请的是非 JVM 的物理内存,所以创建和销毁的代价很高。DirectBuffer 申请的内存并不是直接由 JVM 负责垃圾回收,但在 DirectBuffer 包装类被回收时,会通过 Java Reference 机制来释放该内存块。

DirectBuffer 只优化了用户空间内部的拷贝,而之前我们是说优化用户空间和内核空间的拷贝,那 Java 的 NIO 中是否能做到减少用户空间和内核空间的拷贝优化呢?

答案是可以的,DirectBuffer 是通过 unsafe.allocateMemory(size) 方法分配内存,也就是基于本地类 Unsafe 类调用 native 方法进行内存分配的。而在 NIO 中,还存在另外一个 Buffer 类:MappedByteBuffer,跟 DirectBuffer 不同的是,MappedByteBuffer 是通过本地类调用 mmap 进行文件内存映射的,map() 系统调用方法会直接将文件从硬盘拷贝到用户空间,只进行一次数据拷贝,从而减少了传统的 read() 方法从硬盘拷贝到内核空间这一步。

ByteBuffer 粘包与半包

网络上有多条数据发送给服务端,数据之间使用 \n 进行分隔
但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有3条为

变成了下面的两个 byteBuffer (粘包,半包)

出现原因

发送方在发送数据时,并不是一条一条地发送数据,而是将数据整合在一起,当数据达到一定的数量后再一起发送。这就会导致多条信息被放在一个缓冲区中被一起发送出去

接收方的缓冲区的大小是有限的,当接收方的缓冲区满了以后,就需要将信息截断,等缓冲区空了以后再继续放入数据。这就会发生一段完整的数据最后被截断的现象

暂时的解决办法

public class ByteBufferDemo {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(32);
        // 模拟粘包+半包
        buffer.put("Hello,world\nI'm Nyima\nHo".getBytes());
        // 调用split函数处理
        split(buffer);
        buffer.put("w are you?\n".getBytes());
        split(buffer);
    }

    private static void split(ByteBuffer buffer) {
        // 切换为读模式
        buffer.flip();
        for(int i = 0; i < buffer.limit(); i++) {

            // 遍历寻找分隔符
            if (buffer.get(i) == '\n') {
                // 缓冲区长度
                int length = i+1-buffer.position();
                ByteBuffer target = ByteBuffer.allocate(length);
                // 将前面的内容写入target缓冲区
                for(int j = 0; j < length; j++) {
                    // 将buffer中的数据写入target中
                    target.put(buffer.get());
                }
                // 打印查看结果
                ByteBufferUtil.debugAll(target);
            }
        }
        // 切换为写模式,但是缓冲区可能未读完,这里需要使用compact
        buffer.compact();
    }
}

ByteBuffer 大小分配

一次无法写完例子

public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(8888));

        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        while(true) {
            selector.select();

            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);
                    // 1. 向客户端发送内容
                    StringBuilder sb = new StringBuilder();
                    for (int i = 0; i < 30000000; i++) {
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    int write = sc.write(buffer);
                    // 3. write 表示实际写了多少字节
                    System.out.println("实际写入字节:" + write);
                    // 4. 如果有剩余未读字节,才需要关注写事件
                    if (buffer.hasRemaining()) {
                    log.info("NIOServiceDemo-通道写了:{}缓冲池消耗跟不上了, 还有{}未完写的数据, 给当前通道事件key添加多个写事件, 当通道可写时候触发",write,buffer.limit()-write);
                        // read 1  write 4
                        // 在原有关注事件的基础上,多关注 写事件 (关注了读+写)
                        sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
                        // 把 buffer 作为附件加入 sckey
                        sckey.attach(buffer);
                    }
                } else if (key.isWritable()) {
                    // 取没写完的附件、通道
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    SocketChannel sc = (SocketChannel) key.channel();
                    // 进行写
                    int write = sc.write(buffer);
                    log.info("NIOServiceDemo-进行再次进行写未写完的数据, 实际写入字节:{}",write);
                    if (!buffer.hasRemaining()) { 
                        // 写完了, 取消关注写事件与附件
                        key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
                        key.attach(null);
                    }
                }
            }
        }
    }

1

标签:INFO,Netty,01,20,nio,阻塞,ByteBuffer,zhihao,com
来源: https://www.cnblogs.com/zhihao-plus/p/15910959.html