其他分享
首页 > 其他分享> > muduo之TcpServer

muduo之TcpServer

作者:互联网

         TcpServer拥有Acceptor类,新连接到达时new TcpConnection后续客户端和TcpConnection类交互。TcpServer管理连接和启动线程池,用Acceptor接受连接。

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

#include "muduo/net/TcpServer.h"

#include "muduo/base/Logging.h"
#include "muduo/net/Acceptor.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/EventLoopThreadPool.h"
#include "muduo/net/SocketsOps.h"

#include <stdio.h>  // snprintf

using namespace muduo;
using namespace muduo::net;

TcpServer::TcpServer(EventLoop* loop,
                     const InetAddress& listenAddr,
                     const string& nameArg,
                     Option option)
  : loop_(CHECK_NOTNULL(loop)), //TcpServer所在的主线程下运行的事件驱动循环
    ipPort_(listenAddr.toIpPort()),/* 服务器负责监听的本地ip和端口 */
    name_(nameArg),/* 服务器名字,创建时传入 */
    acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),/* Acceptor对象,负责监听客户端连接请求,运行在主线程的EventLoop中 */
    threadPool_(new EventLoopThreadPool(loop, name_)),/* 事件驱动线程池,池中每个线程运行一个EventLoop */
    connectionCallback_(defaultConnectionCallback),/* 用户传入,有tcp连接到达或tcp连接关闭时调用,传给TcpConnection */
    messageCallback_(defaultMessageCallback),/* 用户传入,对端发来消息时调用,传给TcpConnection */
    nextConnId_(1) /* TcpConnection特有id,每增加一个TcpConnection,nextConnId_加一 */
{ 
  /* 
   * 设置回调函数,当有客户端请求时,Acceptor接收客户端请求,然后调用这里设置的回调函数
   * 回调函数用于创建TcpConnection连接
   */
  acceptor_->setNewConnectionCallback(
      std::bind(&TcpServer::newConnection, this, _1, _2));
}

TcpServer::~TcpServer()
{
  loop_->assertInLoopThread();
  LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";

  for (auto& item : connections_)
  {
    TcpConnectionPtr conn(item.second);
    item.second.reset();
    conn->getLoop()->runInLoop(
      std::bind(&TcpConnection::connectDestroyed, conn));
  }
}

void TcpServer::setThreadNum(int numThreads)
{
  assert(0 <= numThreads);
  threadPool_->setThreadNum(numThreads);
}

void TcpServer::start()
{
  if (started_.getAndSet(1) == 0)
  {
    threadPool_->start(threadInitCallback_);//启动线程池,threadInitCallback_创建好所有线程后调用的回调函数

    assert(!acceptor_->listenning());
    loop_->runInLoop(       //直接调用linsten函数
        std::bind(&Acceptor::listen, get_pointer(acceptor_)));
  }
}

/* 
 * Acceptor接收客户端请求后调用的回调函数
 * @param sockfd: 已经接收完成(三次握手完成)后的客户端套接字
 * @param peerAddr: 客户端地址
 * 
 * Acceptor只负责接收客户端请求
 * TcpServer需要生成一个TcpConnection用于管理tcp连接
 * 
 * 1.TcpServer内有一个EventLoopThreadPool,即事件循环线程池,池子中每个线程都是一个EventLoop
 * 2.每个EventLoop包含一个Poller用于监听注册到这个EventLoop上的所有Channel
 * 3.当建立起一个新的TcpConnection时,这个连接会放到线程池中的某个EventLoop中
 * 4.TcpServer中的baseLoop只用来检测客户端的连接
 * 
 * 从libevent的角度看就是
 * 1.EventLoopThreadPool是一个struct event_base的池子,池子中全是struct event_base
 * 2.TcpServer独占一个event_base,这个event_base不在池子中
 * 3.TcpConnection会扔到这个池子中的某个event_base中
 */
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
  loop_->assertInLoopThread();
  EventLoop* ioLoop = threadPool_->getNextLoop();//从事件驱动线程池中取出一个线程给TcpConnection 
   /* 为TcpConnection生成独一无二的名字 */
  char buf[64];
  snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
  ++nextConnId_;
  string connName = name_ + buf;

  LOG_INFO << "TcpServer::newConnection [" << name_
           << "] - new connection [" << connName
           << "] from " << peerAddr.toIpPort();
  /* 
   * 根据sockfd获取tcp连接在本地的<地址,端口>
   * getsockname(int fd, struct sockaddr*, int *size);
   */
  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  // FIXME poll with zero timeout to double confirm the new connection
  // FIXME use make_shared if necessary
  /* 创建一个新的TcpConnection代表一个Tcp连接 */
  TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));
   /* 添加到所有tcp 连接的map中,键是tcp连接独特的名字(服务器名+客户端<地址,端口>) */
  connections_[connName] = conn;
   /* 为tcp连接设置回调函数(由用户提供) */
  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);
  conn->setWriteCompleteCallback(writeCompleteCallback_);
  /* 
   * 关闭回调函数,由TcpServer设置,作用是将这个关闭的TcpConnection从map中删除
   * 当poll返回后,发现被激活的原因是EPOLLHUP,此时需要关闭tcp连接
   * 调用Channel的CloseCallback,进而调用TcpConnection的handleClose,进而调用removeConnection
   */
  conn->setCloseCallback(
      std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe

  /* 
   * 连接建立后,调用TcpConnection连接建立成功的函数
   * 1.新建的TcpConnection所在事件循环是在事件循环线程池中的某个线程
   * 2.所以TcpConnection也就属于它所在的事件驱动循环所在的那个线程
   * 3.调用TcpConnection的函数时也就应该在自己所在线程调用
   * 4.所以需要调用runInLoop在自己的那个事件驱动循环所在线程调用这个函数
   * 5.当前线程是TcpServer的主线程,不是TcpConnection的线程,如果在这个线程直接调用会阻塞监听客户端请求
   * 6.其实这里不是因为线程不安全,即使在这个线程调用也不会出现线程不安全,因为TcpConnection本就是由这个线程创建的
   */
  ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}

void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
  // FIXME: unsafe
  loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}

void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
  //关闭连接,把fd从epoll中del掉,要释放connector(包括描述符)和channel
  loop_->assertInLoopThread();
  LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
           << "] - connection " << conn->name();
  size_t n = connections_.erase(conn->name());
  (void)n;
  assert(n == 1);
  EventLoop* ioLoop = conn->getLoop();
  ioLoop->queueInLoop(
      std::bind(&TcpConnection::connectDestroyed, conn));
}

        不多说

标签:muduo,TcpConnection,TcpServer,调用,线程,连接,conn
来源: https://blog.51cto.com/u_15144024/2988084