其他分享
首页 > 其他分享> > Netty学习笔记(1) NIO基础-2

Netty学习笔记(1) NIO基础-2

作者:互联网

文章目录


1. 前言

笔记基于黑马的Netty教学,视频地址:黑马Netty


2. 网络编程(单线程)

1、阻塞

下面的例子:使用客户端和服务端演示阻塞

  1. 1个客户端的情况,但是下面的代码无法处理一个客户端发送多条请求的情况,因为在发送完成的时候 SocketChannel sc = ssc.accept()再次堵塞,这时候需要一个新的客户端才可以继续运行下去。阻塞模式下单线程不能很好的工作。
@Slf4j
public class Server {
    public static void main(String[] args) throws IOException {
        //使用NIO来理解阻塞模式
        ByteBuffer buffer = ByteBuffer.allocate(16);
        //1、创建一个服务器对象
        ServerSocketChannel ssc = ServerSocketChannel.open();

        //2、绑定端口
        ssc.bind(new InetSocketAddress(8080));

        //3、建立连接的集合
        List<SocketChannel> channels = new LinkedList<>();

        while (true) {
            //4、建立与客户端的一个连接accept
            //SocketChannel 用来和客户端之间进行通信
            log.debug("connecting...");
            /**
             accept默认阻塞,单线程情况下线程停止运行,连接建立之后才可以继续指向
             */
            SocketChannel sc = ssc.accept();
            log.debug("connected... {}", sc);
            channels.add(sc);
            for (SocketChannel channel : channels) {
                //connected local=/127.0.0.1:8080 remote=/127.0.0.1:61314
                log.debug("before read... {}", channel);
                //5、接受客户端发送的数据
                /**
                 read也是阻塞方法,线程停止运行,客户端没有发送数据那么这里就不会继续走下去
                 */
                channel.read(buffer);
                //读模式
                buffer.flip();
                debugAll(buffer);
                //写模式
                buffer.clear();
                log.debug("after read... {}", channel);
            }
        }
    }
}
public class Client {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));
        System.out.println("waiting");
    }
}
  1. 当然解决方法可以是一个客户端用一个连接去处理


2、非阻塞

下面的例子:使用客户端和服务端演示非阻塞

  1. 非阻塞模式下使用configureBlocking(false)来指定,此时的accept方法和read方法都是非阻塞的
  2. 但是也有一个弊端,就是cpu占用率太高了,无论什么情况都在死循环,效率很低,所以我们需要改进,能不能等到有连接的时候才调用。
@Slf4j
public class Server {
    public static void main(String[] args) throws IOException {
        //使用NIO来理解阻塞模式
        ByteBuffer buffer = ByteBuffer.allocate(16);
        //1、创建一个服务器对象
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);   //切换成非阻塞模式,accept方法变成非阻塞的
        //2、绑定端口
        ssc.bind(new InetSocketAddress(8080));

        //3、建立连接的集合
        List<SocketChannel> channels = new LinkedList<>();

        while (true) {
            //4、建立与客户端的一个连接accept
            //SocketChannel 用来和客户端之间进行通信
            log.debug("connecting...");
            /**
             accept默认阻塞,单线程情况下线程停止运行,连接建立之后才可以继续指向
             非阻塞模式下,线程还会继续运行,如果没有连接建立,那么sc返回的是null
             */
            SocketChannel sc = ssc.accept();
            if(sc != null){
                log.debug("connected... {}", sc);
                channels.add(sc);
                /**
                    channel设置成非阻塞模式,那么下面的read就是非阻塞了
                 */
                sc.configureBlocking(false);
            }
            for (SocketChannel channel : channels) {
                log.debug("before read... {}", channel);
                //5、接受客户端发送的数据
                /**
                 此时的read是非阻塞的,会继续运行,没有读到数据会返回0
                 */
                int read = channel.read(buffer);
                if(read > 0){
                    //读到数据了
                    //读模式
                    buffer.flip();
                    debugAll(buffer);
                    //写模式
                    buffer.clear();
                    log.debug("after read... {}", channel);
                }
            }
        }
    }
}



3、多路复用和事件处理

1. 事件处理

线程必须配合Selector才可以完成对多个Channel可读写事件的监控,这称之为多路复用,注意有事件才有Selector

下面的例子:使用客户端和服务端演示阻塞

  1. 第一步,首先演示监听客户端连接事件,步骤如下:

事件有4种类型:
1、accept-会在用连接请求的时候触发
2、connect-客户端连接建立后触发的事件
3、read-客户端发送数据了就会触发
4、write-可写事件
注意:select在事件未处理的时候,它是不会阻塞的。

具体流程就是首先创建一个Selector,然后创建一个通道ServerSocketChannel,设置为非阻塞的。把这个通道和Selector绑定起来,然后为这个通道设置感兴趣的事件为accept事件,接着绑定端口。调用Selector.select方法进行阻塞,当有事件发生的时候,从Selector获取到所有的事件,然后通过事件找出通道,比如连接的时候,就会找出对accept感兴趣的通道,然后再调用accept接受客户端。

@Slf4j
public class Server {
    public static void main(String[] args) throws IOException {
        //1. 创建selector,管理多个channel
        Selector selector = Selector.open();

        ByteBuffer buffer = ByteBuffer.allocate(16);
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        //2. 建立channel和selector之间的联系(注册)
        //SelectionKey:事件发生后通过这个可以获取到什么事件,还可以知道是那个channel发生的事件
        /**
            事件有4种类型:
                1.accept-会在用连接请求的时候触发
                2.connect-客户端连接建立后触发的事件
                3.read-客户端发送数据了就会触发
                4.write-可写事件
         */
        SelectionKey sscKey = ssc.register(selector, 0, null);
        //表名我们这个key只关注accept事件
        sscKey.interestOps(SelectionKey.OP_ACCEPT);
        log.debug("register key:{}", sscKey);
        ssc.bind(new InetSocketAddress(8080));

        while (true) {
            //3. selector.select()方法,没有事件就阻塞,有了事件发送了就恢复运行继续向下处理
            //解决了白白循环浪费CPU的问题
            selector.select();

            //4. 处理事件,selectionKeys拿到所有发生的可读可写的事件
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

            while(iterator.hasNext()){
            	//注意,如果事件不调用accept进行处理,那么不会阻塞,因为事件没被处理,就不能阻塞
            	//也就是说事件要么处理要么取消,不能不管
                SelectionKey key = iterator.next();
				//key.cancle():取消事件
                log.debug("key:{}", key);
                //拿到触发事件的channel
                ServerSocketChannel channel = (ServerSocketChannel)key.channel();
                SocketChannel sc = channel.accept();
                log.debug("{}", sc);
            }
        }
    }
}

处理不同的事件
在这里插入图片描述
按上面的图片,一开始的sscKey 全部存放在selector中,后面当有事件发生的时候,会从selector中复制一份到selectedKeys这个集合中去进行遍历。同时要注意几个问题:

@Slf4j
public class Server {
    public static void main(String[] args) throws IOException {
        //1. 创建selector,管理多个channel
        Selector selector = Selector.open();

        ByteBuffer buffer = ByteBuffer.allocate(16);
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        //2. 建立channel和selector之间的联系(注册)
        //SelectionKey:事件发生后通过这个可以获取到什么事件,还可以知道是那个channel发生的事件
        /**
            事件有4种类型:
                1.accept-会在用连接请求的时候触发
                2.connect-客户端连接建立后触发的事件
                3.read-客户端发送数据了就会触发
                4.write-可写事件
         */
        SelectionKey sscKey = ssc.register(selector, 0, null);
        //表名我们这个key只关注accept事件,所有客户端连接的时候消息都会进入这个通道
        sscKey.interestOps(SelectionKey.OP_ACCEPT);
        log.debug("register key:{}", sscKey);
        ssc.bind(new InetSocketAddress(8080));

        while (true) {
            //3. selector.select()方法,没有事件就阻塞,有了事件发送了就恢复运行继续向下处理
            //解决了白白循环浪费CPU的问题
            selector.select();

            //4. 处理事件,selectionKeys拿到所有发生的可读可写的事件
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

            //多个key的时候,accept和read方法都会触发事件,所以要区分事件类型
            while(iterator.hasNext()){
                SelectionKey key = iterator.next();
                //处理key的时候要从绿色的selectkeys中删除,否则就报错
                iterator.remove();
                log.debug("key:{}", key);

                //5.区分事件类型
                if (key.isAcceptable()) {
                    //拿到触发事件的channel
                    ServerSocketChannel channel = (ServerSocketChannel)key.channel();
                    SocketChannel sc = channel.accept();
                    //设置为非阻塞
                    sc.configureBlocking(false);
                    //scKey管sc的channel
                    SelectionKey scKey = sc.register(selector, 0, null);
                    //scKey关注读事件,也就是说客户端的通道关注可读事件
                    scKey.interestOps(SelectionKey.OP_READ);
                    log.debug("{}", sc);
                }else if(key.isReadable()){
                    //客户端关闭之后也会引发read事件,这时需要从key中remove掉,否则拿不到channel,报错
                    try {
                        //可读事件
                        SocketChannel channel = (SocketChannel)key.channel();//触发事件的channel
                        ByteBuffer buffer1 = ByteBuffer.allocate(16);
                        //客户端正常断开,read返回值是-1
                        int read = channel.read(buffer1);
                        if(read == -1){
                            //正常断开
                            key.cancel();
                        }
                        buffer1.flip();
                        debugAll(buffer);
                    } catch (IOException e) {
                        e.printStackTrace();
                        key.cancel();   //客户端断开,需要将key取消(从selector的key集合中真正删除)
                    }
                }

            }
        }
    }
}



2. 处理消息的边界

对于上面的代码,将buffer的容量减低,比如下面的例子减低为4个字节,当发送了2个汉字的时候,此时由于字节超出边界,会导致读取不完整,有些汉字会被分开读取,也就导致了不能完整读出的问题。

ByteBuffer buffer = ByteBuffer.allocate(4);
// 解码并打印
System.out.println(StandardCharsets.UTF_8.decode(buffer));

//你�
//��

在这里插入图片描述
在文本传输的时候也会出现半包和粘包的情况,那么如何解决呢?


下面是给出第二种处理边界的思路:

else if(key.isReadable()){
//客户端关闭之后也会引发read事件,这时需要从key中remove掉,否则拿不到channel,报错
  try {
      //可读事件
      SocketChannel channel = (SocketChannel)key.channel();//触发事件的channel
      ByteBuffer buffer1 = ByteBuffer.allocate(16);
      //客户端正常断开,read返回值是-1
      int read = channel.read(buffer1);
      if(read == -1){
          //正常断开
          key.cancel();
      }else{
      //split
          split(buffer1);
      }
  } 
  
 //按\n拆分
    private static void split(ByteBuffer source) {
        source.flip();
        for (int i = 0; i < source.limit(); i++) {
            //找到一条完整的信息
            if (source.get(i) == '\n') {
                //一条消息完整的长度
                int length = i + 1 - source.position();
                //把这条完整消息存入一个新的byteBuffer
                ByteBuffer target = ByteBuffer.allocate(length);
                for (int j = 0; j < length; j++) {
                    target.put(source.get());
                }
                debugAll(target);
            }
        }
    }

但是上面的处理是有问题的,因为发送的数据>16的时候由于split检测不到\n,所以没法输出,会不断读,直到读到\n才可以输出。
解决方法:

1、使用buffer作为附件和scKey连接起来。

//一个Buffer关联到SelectionKey中,防止多个channel同时使用一个buffer
ByteBuffer buffer1 = ByteBuffer.allocate(16);   //attachment:附件
SelectionKey scKey = sc.register(selector, 0, buffer1);

2、容量不足的时候进行扩容,同时别忘了重新复制attar

 split(buffer1);
 if(buffer1.position() == buffer1.limit()){
        //扩容
        ByteBuffer newBuffer = ByteBuffer.allocate(buffer1.capacity() * 2);
        buffer1.flip();
        newBuffer.put(buffer1);
        //替换附件
        key.attach(newBuffer);
    }

3、全部代码

@Slf4j
public class Server {

    //按\n拆分
    private static void split(ByteBuffer source) {
        source.flip();
        for (int i = 0; i < source.limit(); i++) {
            //找到一条完整的信息
            if (source.get(i) == '\n') {
                //一条消息完整的长度
                int length = i + 1 - source.position();
                //把这条完整消息存入一个新的byteBuffer
                ByteBuffer target = ByteBuffer.allocate(length);
                for (int j = 0; j < length; j++) {
                    target.put(source.get());
                }
                debugAll(target);
            }
        }
        //compact切换写模式了
        //compact底层是把buffer中的字节变成未读的字节,但是由于我们没有读取,所以这里剩余还是16
        source.compact();
    }

    public static void main(String[] args) throws IOException {
        //1. 创建selector,管理多个channel
        Selector selector = Selector.open();

        ByteBuffer buffer = ByteBuffer.allocate(16);
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        //2. 建立channel和selector之间的联系(注册)
        //SelectionKey:事件发生后通过这个可以获取到什么事件,还可以知道是那个channel发生的事件
        /**
            事件有4种类型:
                1.accept-会在用连接请求的时候触发
                2.connect-客户端连接建立后触发的事件
                3.read-客户端发送数据了就会触发
                4.write-可写事件
         */
        SelectionKey sscKey = ssc.register(selector, 0, null);
        //表名我们这个key只关注accept事件,所有客户端连接的时候消息都会进入这个通道
        sscKey.interestOps(SelectionKey.OP_ACCEPT);
        log.debug("register key:{}", sscKey);
        ssc.bind(new InetSocketAddress(8080));

        while (true) {
            //3. selector.select()方法,没有事件就阻塞,有了事件发送了就恢复运行继续向下处理
            //解决了白白循环浪费CPU的问题
            selector.select();

            //4. 处理事件,selectionKeys拿到所有发生的可读可写的事件
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

            //多个key的时候,accept和read方法都会触发事件,所以要区分事件类型
            while(iterator.hasNext()){
                SelectionKey key = iterator.next();
                //处理key的时候要从绿色的selectkeys中删除,否则就报错
                iterator.remove();
                log.debug("key:{}", key);

                //5.区分事件类型
                if (key.isAcceptable()) {
                    //拿到触发事件的channel
                    ServerSocketChannel channel = (ServerSocketChannel)key.channel();
                    SocketChannel sc = channel.accept();
                    //设置为非阻塞
                    sc.configureBlocking(false);
                    //scKey管sc的channel
                    //一个Buffer关联到SelectionKey中,防止多个channel同时使用一个buffer
                    ByteBuffer buffer1 = ByteBuffer.allocate(16);   //attachment:附件
                    SelectionKey scKey = sc.register(selector, 0, buffer1);
                    //scKey关注读事件,也就是说客户端的通道关注可读事件
                    scKey.interestOps(SelectionKey.OP_READ);
                    log.debug("{}", sc);
                }else if(key.isReadable()){
                    //客户端关闭之后也会引发read事件,这时需要从key中remove掉,否则拿不到channel,报错
                    try {
                        //可读事件
                        SocketChannel channel = (SocketChannel)key.channel();//触发事件的channel
                        //从SelectionKey中获取到独有的ByteBuffer附件
                        ByteBuffer buffer1 = (ByteBuffer)key.attachment();
                        //客户端正常断开,read返回值是-1
                        int read = channel.read(buffer1);
                        if(read == -1){
                            //正常断开
                            key.cancel();
                        }else{
                            split(buffer1);
                            if(buffer1.position() == buffer1.limit()){
                                //扩容
                                ByteBuffer newBuffer = ByteBuffer.allocate(buffer1.capacity() * 2);
                                buffer1.flip();
                                newBuffer.put(buffer1);
                                //替换附件
                                key.attach(newBuffer);
                            }
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        key.cancel();   //客户端断开,需要将key取消(从selector的key集合中真正删除)
                    }
                }

            }
        }
    }
}



3. ByteBuffer大小分配

分配大小的思路如下:

  1. 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现:分配地址
  2. 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗



4、write事件

1、初始的代码,使用while循环不断写入

public class WriteServer {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        ssc.bind(new InetSocketAddress("localhost", 8080));

        while(true){
            selector.select();

            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while(iterator.hasNext()){
                SelectionKey key = iterator.next();
                iterator.remove();
                if(key.isAcceptable()){
                    //这里其实是ssc,因为只有一个OP_ACCEPT
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    //向客户端发送大量数据
                    StringBuilder sb = new StringBuilder();
                    for(int i = 0; i < 3000000; i++){
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    //返回值是实际写入的字节数
                    while(buffer.hasRemaining()){
                        //2. 写到客户端
                        int write = sc.write(buffer);
                        System.out.println("字节数:" + write);
                    }
                }
            }
        }
    }
}

客户端接受

public class WriteClinent {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));
        ByteBuffer buffer = ByteBuffer.allocate(1024*1024);
        int count = 0;
        //3. 接受数据
        while(true){
            count += sc.read(buffer);
            System.out.println(count);
            buffer.clear();
        }
    }
}

2、改进
上面的方法可以写入,但是我们有一个想法就是等缓冲区写入完全的时候再向客户端发送数据过去,而不是缓冲区写一点发送一点,这样CPU资源耗费就很大。
步骤思想就是:首先把50000000个字节的数据存入buffer中,然后一开始先写一次,把关注事件添加可写事件(可写是只要不阻塞都可写),第二次进入的时候key.isWritable判断成功,进入其中开始写buffer,后续就不断循环判断写buffer即可,当写完了之后,我们要把buffer从key中清除掉,attach设置为null,因为buffer内存太大的话会影响我们的效率和内存损耗。

public class WriteServer {
    public static void main(String[] args) throws IOException {
    	//创建一个通道
        ServerSocketChannel ssc = ServerSocketChannel.open();
        //设置为非阻塞的
        ssc.configureBlocking(false);
		//选择器
        Selector selector = Selector.open();
        //进行注册绑定selector
        ssc.register(selector, SelectionKey.OP_ACCEPT);
		//绑定端口号
        ssc.bind(new InetSocketAddress("localhost", 8080));
		
        while(true){
        	//等待事件
            selector.select();
			//迭代
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while(iterator.hasNext()){
            	//获取SelectionKey 
                SelectionKey key = iterator.next();
                iterator.remove();
                //连接事件
                if(key.isAcceptable()){
                    //这里其实是ssc,因为只有一个OP_ACCEPT
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    SelectionKey sckey = sc.register(selector, 0, null);

                    //向客户端发送大量数据
                    StringBuilder sb = new StringBuilder();
                    for(int i = 0; i < 5000000; i++){
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    //2. 写到客户端
                    int write = sc.write(buffer);
                    System.out.println("字节数:" + write);
                    //3. 判断是不是还有剩余内容,因为channel一次能写入的数据是有限制的,buffer里面的内容可能写不完
                    //缓冲区还有数据没写完,就不要写了,直接保留等下一次凑够一波再写
                    if(buffer.hasRemaining()){
                        //4. 关注可写事件同时保留原来的事件
                        sckey.interestOps(sckey.interestOps() | SelectionKey.OP_WRITE);
                        //5. 把未写完的数据挂到selectionkey上
                        sckey.attach(buffer);
                    }
                    //可写事件,只要不阻塞其实就是可写事件
                }else if(key.isWritable()){
                //获取buffer ,buffer还有没写出去的一些数据
                    ByteBuffer buffer = (ByteBuffer)key.attachment();
                    //获取通道
                    SocketChannel sc = (SocketChannel)key.channel();
					//写入buffer
                    int write = sc.write(buffer);
                    System.out.println(write);
                    //6. buffer写完了,就清除掉,防止占用内存
                    if(!buffer.hasRemaining()){
                        key.attach(null);   //清除buffer
                        key.interestOps(SelectionKey.OP_READ);//不再关注可写事件
                    }
                }
            }
        }
        //字节数:3276775
        //1179639
        //543586
    }
}
public class WriteClinent {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));
        ByteBuffer buffer = ByteBuffer.allocate(1024*1024);
        int count = 0;
        //3. 接受数据
        while(true){
            count += sc.read(buffer);
            System.out.println(count);
            buffer.clear();
        }
    }
}



5、小结

1. 绑定Channel

也称之为注册事件,绑定的事件selector才会关心

//创建一个通道
ServerSocketChannel ssc = ServerSocketChannel.open();
//设置为非阻塞的
ssc.configureBlocking(false);
//选择器
Selector selector = Selector.open();
//进行注册绑定selector
ssc.register(selector, SelectionKey.OP_ACCEPT);



2. 监听Channel事件

可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少channel发生了事件
方法1、阻塞知道绑定事件发送:

int count = selector.select();

方法2、阻塞直到绑定事件发生,或是超时(单位ms):

int count = selector.select(long timeout);

方法3、不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值判断有没有事件:

int count = selector.selectNow();

select何时不阻塞 ?

标签:Netty,NIO,buffer,笔记,selector,事件,key,ByteBuffer,channel
来源: https://blog.csdn.net/laohuangaa/article/details/122513118