其他分享
首页 > 其他分享> > Tomcat NIO 模型的实现

Tomcat NIO 模型的实现

作者:互联网

Tomcat 对 BIO 和 NIO 两种模型都进行了实现,其中 BIO 的实现理解起来比较简单,而 NIO 的实现就比较复杂了,并且它跟常用的 Reactor 模型也略有不同,具体设计如下:

Tomcat NIO 模型
可以看出多了一个 BlockPoller 的设计,这是因为在 Servlet 规范中 ServletInputStream 和 ServletOutputStream 是阻塞的,所以请求体和响应体的读取和发送需要阻塞处理。请求行读取SSL 握手使用非阻塞的 Poller 处理。一次连接基本的处理流程是:

接下来分析核心代码的实现,源码来自 Tomcat 6.0.53 版本,之所以使用这个版本是因为看起来简单直观没有太多的抽象,也不影响来理解核心的处理逻辑。首先看下连接处理的方法调用情况,可右键直接打开图片查看大图:

Tomcat NIO 方法调用

相关类或接口的功能如下:

1. Acceptor 注册通道到 Poller 上

Acceptor 和 Poller 分属两个不同的线程,通常情况下 Poller 阻塞在 select() 方法的调用上,此方法会锁住内部的 publicKeys 集合,所以 Acceptor 接收到通道连接不能直接注册到 Poller 上,否则会造成死锁。Tomcat 使用生产者-消费者模式来进行并发协作,缓冲区使用的是 ConcurrentLinkedQueue 无界队列。

Acceptor 接收到连接的 SocketChannel 后,将其配置成非阻塞模式,封装成 NioChannel,最后调用 getPoller0().register(NioChannel) 加入到某个 Poller 的事件队列中。

public void register(final NioChannel socket) {
  socket.setPoller(this); // 关联此 Poller
  KeyAttachment key = keyCache.poll();
  final KeyAttachment ka = key!=null?key:new KeyAttachment();
  // 重置或者初始化 KeyAttachment 对象
  ka.reset(this,socket,getSocketProperties().getSoTimeout());
  PollerEvent r = eventCache.poll();
  // 声明此通道关注的事件
  ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
  // 将此通道和 SelectionKey 附件对象封装成 PollerEvent 对象
  if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
  else r.reset(socket,ka,OP_REGISTER);
  // 加入到 Poller 的 events 队列中
  addEvent(r);
}
public void addEvent(Runnable event) {
  events.offer(event); // 插入队列
  if ( wakeupCounter.incrementAndGet() == 0 )
    selector.wakeup(); // 唤醒 Selector
}

Poler 有个 events() 方法,用于遍历事件队列进行处理,events() 会在 select 调用超时或者被唤醒且没有通道发生 I/O 事件时被调用,代码如下:

public boolean events() {
  boolean result = false;
  Runnable r = null;
  // 遍历事件队列
  while ( (r = events.poll()) != null ) {
    result = true;// 有事件待处理
    try {
      r.run(); // 本质调用的是 PollerEvent.run()
      if ( r instanceof PollerEvent ) {
        // 重置并缓存 PollerEvent 对象
        ((PollerEvent)r).reset();
        eventCache.offer((PollerEvent)r); 
      }
    } catch ( Throwable x ) {
      log.error("",x);
    }
  }
  return result;
}

可以看出这里有个关键对象 PollerEvent,它内部有个 interestOps 属性,表示要处理的事件类型,它有三个可能的值分别是:

OP_REGISTER 的处理就是将通道注册到 Selector 上的最终实现,代码如下:

if ( interestOps == OP_REGISTER ) {
  try {
    // 将 SocketChannel 注册到 Poller 的 Selector 上并指定关注的事件和附加对象
    socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
  } catch (Exception x) {
    log.error("", x);
  }
}

至此已完成了通道注册,接下来看一下 PollerEvent 为什么还要处理 OP_READ 和 OP_WRITE 事件。

2. PollerEvent 对 OP_READ 和 OP_WRITE 的处理

PollerEvent(又或者说 Poller)要处理读写事件,就是因为程序需要一次非阻塞的读或写操作。一开始通道是在 Poller 上声明关注的事件,但是在发生 I/O 事件后,Poller 就会把此通道就绪的事件从它关注的事件中移除(原因见下文),所以如果需要非阻塞的读或写,只能再次在这个 Poller 上重新声明。

解析请求行是非阻塞的,解析过程中,由于 TCP 存在粘包/拆包的问题,可能导致数据读取不完整,需要再次从通道读取,此时就要在关联的 Poller 上重新关注读事件,核心代码:

// 拿到通道在 Poller 上对应的 SelectionKey
final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
  boolean cancel = false;
  if (key != null) {
    ...
    // 将 interestOps 合并到 key 现有关注的事件集合中
    int ops = key.interestOps() | interestOps;
    // 更新 key 和 附加对象关注的操作
    att.interestOps(ops);
    key.interestOps(ops);
    att.setCometOps(ops);
  } else {
    cancel = true;
  }
}catch (CancelledKeyException ckx) {}

3. Poller 对 I/O 事件的处理

Poller 就是 Reactor,主要功能是将就绪的 SelectionKey 分配给处理器处理,此外它还检查通道是否超时。它在调用 select 方法时会根据条件确定是阻塞还是非阻塞,代码如下:

if ( !close ) {
  if (wakeupCounter.getAndSet(-1) > 0) {
    // wakeupCounter 大于0,意味着 Acceptor 接收了大量连接,产生大量 PollerEvent 急
    // 需 Poller 消费处理,此时进行一次非阻塞调用
    keyCount = selector.selectNow();// 非阻塞直接返回
  } else {
    // wakeupCounter 等于0,阻塞等待 IO 事件发生或被唤醒
    keyCount = selector.select(selectorTimeout);
  }
  wakeupCounter.set(0);
}

当有通道 I/O 事件就绪时,Poller 将会创建一个 SocketProcessor 提交线程池处理,具体代码不再贴出。在这个过程中有一个将当前就绪的事件从 SelectionKey 中移除的操作,这是为了后续能够在 BlockPoller 上阻塞读写时,防止多个线程的干扰,具体代码如下:

protected void unreg(SelectionKey sk, KeyAttachment attachment, int readyOps) {
  // 取反再与 - 表示从 sk.interestOps() 中清除 readyOps 所在的位
  reg(sk,attachment,sk.interestOps()& (~readyOps));
}
protected void reg(SelectionKey sk, KeyAttachment attachment, int intops) {
  sk.interestOps(intops);
  attachment.interestOps(intops);
  //attachment.setCometOps(intops);
}

检查超时的方法是 Poller.timedout(keyCount, hasEvents),它在 Poller 的每次循环上都被调用,但不是每次都处理超时,因为这会产生过多的负载,而超时可等待几秒钟再超时也没事。Poler 有一个名为 nextExpiration 的成员变量,它表示检查超时的最短时间间隔,在这个时间内,如果只是 select() 调用超时(表示负载不大)会执行处理超时。

4. SocketProcessor 的处理

SocketProcessor 处理 SSL 握手和调用 Handler 进行实际的 I/O 操作。Handler 的子类 Http11ConnectionHandler 会创建 一个 Http11NioProcessor 对象最终处理 Socket,这里不分析具体的协议处理,来看看几种处理结果:

public SocketState process(NioChannel socket) {
  Http11NioProcessor processor = null;
  try {
    processor = connections.remove(socket);
    ...
    SocketState state = processor.process(socket);
    if (state == SocketState.LONG) {
      // 在处理request和生成response之间,保持socket和此processor的关联
      connections.put(socket, processor);
      // 通常是收到了不完整的请求行,再次以 OP_READ 注册到 Poller 上
      socket.getPoller().add(socket);
    } else if (state == SocketState.OPEN) {
      // 长连接,Http 保活,回收 processor
      release(socket, processor);
      // 此时已处理一个完整的请求并响应,再次注册到 Poller 上,等待处理下个请求
      socket.getPoller().add(socket);
    } else if (state == SocketState.SENDFILE) {
      // 处理文件
      connections.put(socket, processor);
    } else {
      // 连接关闭,回收 processor
      release(socket, processor);
    }
    return state;
  } catch (...) {...}
  release(socket, processor);
  return SocketState.CLOSED;
}

5. 模拟阻塞的实现

模拟阻塞是通过 NioBlockingSelector 和 BlockPoller,以及 KeyAttachment 中的两个 CountDownLatch 读写闭锁合作完成。这里分析阻塞读,阻塞写的实现类似。一般的,在读取 POST 请求参数时会使用模拟阻塞完成,来看下 NioBlockingSelector.read() 方法的具体实现:

public int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException {
  // 拿到通道在 Poller 上注册的 key
  SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
  if ( key == null ) throw new IOException("Key no longer registered");
  KeyReference reference = new KeyReference();
  // key 的附加对象
  KeyAttachment att = (KeyAttachment) key.attachment();
  int read = 0; // 读取的字节数
  boolean timedout = false; // 是否超时
  int keycount = 1; //assume we can write 假设通道可读
  long time = System.currentTimeMillis(); //start the timeout timer
  try {
    while ( (!timedout) && read == 0) {
      if (keycount > 0) { //only read if we were registered for a read
        // 尝试读取一次,如果通道无数据可读则返回 0,若连接断开则返回 -1
        int cnt = socket.read(buf);
        if (cnt == -1) throw new EOFException();
        read += cnt;
        if (cnt > 0) break;
      }
      try {
        // 初始化读闭锁
        if ( att.getReadLatch()==null || att.getReadLatch().getCount()==0) att.startReadLatch(1);
        // 将此通道注册到 BlockPoller,关注读取事件
        poller.add(att,SelectionKey.OP_READ, reference);
        // 阻塞等待通道可读
        att.awaitReadLatch(readTimeout,TimeUnit.MILLISECONDS);
      }catch (InterruptedException ignore) {
        Thread.interrupted();
      }
      if ( att.getReadLatch()!=null && att.getReadLatch().getCount()> 0) {
        // 被打断了,但是没有接收到 blockPoller 的提醒
        keycount = 0;
        // 继续循环等待可读
      }else {
        //通道可读,重置读闭锁
        keycount = 1;
        att.resetReadLatch();
      }
      if (readTimeout > 0 && (keycount == 0)) // 如果超时了,则不再读取,抛异常
        timedout = (System.currentTimeMillis() - time) >= readTimeout;
    } //while
    if (timedout)
        throw new SocketTimeoutException();
  } finally {
    poller.remove(att,SelectionKey.OP_READ); // 移除注册
    if (timedout && reference.key!=null) {
        poller.cancelKey(reference.key); // 超时取消
    }
    reference.key = null;
  }
  return read;
}

BlockPoller 实现逻辑与 Poller 大致相同,不同的地方在于对就绪 key 的处理,核心代码如下:

Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
while (run && iterator != null && iterator.hasNext()) {
  SelectionKey sk = iterator.next();
  KeyAttachment attachment = (KeyAttachment)sk.attachment();
  try {
    attachment.access();
    iterator.remove();
    // 移除已就绪的事件
    sk.interestOps(sk.interestOps() & (~sk.readyOps()));
    // 可读或可写时减少对应闭锁的值,此时阻塞在 NioBlockingSelector.read() 上的线程继续执行读取
    if ( sk.isReadable() ) {
        countDown(attachment.getReadLatch());
    }
    if (sk.isWritable()) {
        countDown(attachment.getWriteLatch());
    }
  }catch (CancelledKeyException ckx) {
    if (sk!=null) sk.cancel();
    countDown(attachment.getReadLatch());
    countDown(attachment.getWriteLatch());
  }
}//while

6. 小结

至此,本文对连接的接收、分发以及模拟阻塞的核心代码实现进行了分析,为了更好的理解内部流程,尽可能的使用简洁的代码仿写了这部分功能。本文首发于(微信公众号:顿悟源码),交流QQ群:673986158

源码地址https://github.com/tonwu/rxtomcat 位于 rxtomcat-net 模块

7. Tomcat 8.5 版本变化

7.1 替换缓存数据结构

Tomcat 对 PollerEvent、NioChannel 和 Processor 对象进行了缓存,目的是减少 GC 提高系统性能,这是一种用空间换时间,被称为对象池的优化手段。从版本 8.* 开始,缓存数据结构从 ConcurrentLinkedQueue 换成了自定义的同步栈 SynchronizedStack。SynchronizedStack 的 javadoc 明确说明:

当需要创建一个无需缩小的可重用对象池时,这是 ConcurrentLinkedQueue 无 GC 的主要替代方案。目的是尽可能快地以最少的垃圾提供最少的所需功能。

在这个特殊的情况下,ConcurrentLinkedQueue 有很多功能是不需要的,所以就实现了一个有重点的类,可以专注完成一件事,来提升性能。但它不是 ConcurrentLinkedQueue 的替代品。

7.2 LimitLatch

Acceptor 在接收连接前添加了一个 LimitLatch(类似信号量)来控制总连接数。分析下如果不加有什么现象,在极端情况下,线程池没有空闲线程并且它内部的队列已满,当有通道发生可读或可写事件时,Poller 会关闭此通道,此时系统负载已达到最高,如果 Acceptor 还在继续接收连接并请求注册,而不加限制,那么就会一直重复 PollerEvent 入队出队和 Poller 单纯关闭通道的操作,浪费系统资源。

标签:OP,NIO,Tomcat,模型,阻塞,key,通道,Poller,socket
来源: https://blog.csdn.net/weixin_42885157/article/details/89842754