数据库
首页 > 数据库> > redis源码阅读二-终于把redis的启动流程搞明白了

redis源码阅读二-终于把redis的启动流程搞明白了

作者:互联网

阅读redis的源码永远也绕不过它的启动。我们来看下redis的启动流程。不想看代码可以直接看最后的流程图。

以下源码分析是redis的5.0分支 源码注释:https://github.com/yxkong/redis/commits/5.0

这是启动流程的核心代码。

int main(int argc, char **argv) {
    //申请空间oom后的处理器
    zmalloc_set_oom_handler(redisOutOfMemoryHandler);
        //哨兵模式
    server.sentinel_mode = checkForSentinelMode(argc,argv);
    //初始化服务器配置(默认值)
    initServerConfig();
    if (argc >= 2) {
        //将配置文件的内容填充到server中
        loadServerConfig(configfile,options);
    }
    //守护进程
    int background = server.daemonize && !server.supervised;
    if (background) daemonize();
    //初始化server服务
    initServer();
    InitServerLast();
    aeMain(server.el); 
}
    

初始化服务配置

initServerConfig中只是给变量赋值了默认值

void initServerConfig(void) {
    //初始化锁
    pthread_mutex_init(&server.next_client_id_mutex,NULL);
    pthread_mutex_init(&server.lruclock_mutex,NULL);
    pthread_mutex_init(&server.unixtime_mutex,NULL);
    //数据库配置填充
    //淘汰策略
    //持久化配置
    //将预定义的命令填充到server.commands
    populateCommandTable();
    

加载配置文件

将配置文件的内容填充到server中覆盖初始化变量 在config.c中

void loadServerConfig(char *filename, char *options) {
   //读取配置到config
    while(fgets(buf,CONFIG_MAX_LINE+1,fp) != NULL)
                config = sdscat(config,buf);
    //将配置填充到server,这里是各种if else逻辑     
    loadServerConfigFromString(config);
}

初始化server服务

这里是重头戏

void initServer(void) {
    //创建共享对象
    createSharedObjects();    
    //创建事件监听器,maxclients默认10000个
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    //socket监听,绑定几个ip,就监听几个,默认监听ipv6和ipv4
    if (server.port != 0 &&
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
        exit(1);
    //初始化化数据库的属性
    for (j = 0; j < server.dbnum; j++) {
        server.db[j].dict = dictCreate(&dbDictType,NULL);
        server.db[j].expires = dictCreate(&keyptrDictType,NULL);
        server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
        server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].id = j;
        server.db[j].avg_ttl = 0;
        server.db[j].defrag_later = listCreate();
    }
    /**
     * @brief 创建时间处理器,并将serverCron放入处理器里(重要)
     * 在这里创建了aeTimeEvent并扔给了eventLoop->timeEventHead
     */
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
    /**
     * @brief 重点 ##########
     * 监听多少个tcp就创建多少个
     */
    for (j = 0; j < server.ipfd_count; j++) {
        //将acceptTcpHandler 放入文件监听器里,
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
    //慢日志初始化
    slowlogInit();
    //monitor初始化
    latencyMonitorInit();
}

在createSharedObjects()里主要创建了共享对象,包括一些异常提示,服务交互指令,共享int等

在ae.c中,我们看下aeCreateEventLoop

/**
 * @brief 创建事件监听器
 * 
 * @param setsize 比配置的最大链接数要多96,为了安全处理(比如有的处理完了,还没有释放,多创建的就相当于缓冲队列了)
 * @return aeEventLoop* 
 */
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;
    //分配空间
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    //创建对应数量的aeFileEvent空间
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    //创建对应数量的aeFiredEvent空间,此处承载的是触发事件(从epoll里读取后会放入这里)
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    //时间事件链表头节点(只有一个)
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    //创建一个aeApiState 给eventLoop(不同的系统实现不同,aeApiState也不同)
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}

在ae.c中,看下aeCreateFileEvent

**
 * @brief 创建文件事件监听器并放入到eventLoop->events中
 *   注册acceptTcpHandler处理AE_READABLE和AE_WRITABLE
 * @param eventLoop 
 * @param fd 对应tcp socket的fd值
 * @param mask 
 * @param proc 处理器acceptTcpHandler
 * @param clientData 
 * @return int 
 */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    // 将创建的aeFileEvent 赋值给了&eventLoop->events[fd]
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    // 将处理器给rfileProc和wfileProc(后续会拿着这个直接执行)
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

初始化后端线程

void InitServerLast() {
    bioInit();
    server.initial_memory_usage = zmalloc_used_memory();
}
bioInit() 在 bio.c中
/**
 * @brief 初始化后台线程
 * 
 */
void bioInit(void) {
    /**
     * @brief 根据后台线程的数量创建线程
     * 
     */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        //初始化一个NUll 的互斥锁
        pthread_mutex_init(&bio_mutex[j],NULL);
        //新任务条件变量
        pthread_cond_init(&bio_newjob_cond[j],NULL);
        //下一个执行的条件变量
        pthread_cond_init(&bio_step_cond[j],NULL);
        //创建一个list
        bio_jobs[j] = listCreate();
        //待处理
        bio_pending[j] = 0;
    }
   /**
     * @brief 创建后端线程,并放入线程组
     * 
     */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        //创建线程,并启动bioProcessBackgroundJobs
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }
}


最重要的 aeMain

在ae.c中

/**
 * @brief 执行eventLoop
 * 
 * @param eventLoop 
 */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    //只要没有停止,就循环执行,这个是主线程
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

/**
 * @brief 处理eventLoop里等待的 定时任务,文件事件,过期事件
 * @param eventLoop 
 * @param flags 事件类型,
 * 从main中过来是所有的事件,
 * 从networking过来是文件事件
 * @return int 
 */
int aeProcessEvents(aeEventLoop *eventLoop, int flags){
     //先从eventLoop取下一个要执行的定时器,如果下一次定时还有一定的时间间隔,那么就让epoll阻塞间隔的时间获取数据,否则则立即执行
        /**
         * @brief 从epoll里拿到numevents数量的数据
         * 有时间,就阻塞指定的时间,没有时间,直到有数据
         */
        numevents = aeApiPoll(eventLoop, tvp);
         /**
         * @brief 执行触发的事件
         * 
         */
        for (j = 0; j < numevents; j++) {
            //获取一个事件处理器
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];  
            //处理读事件,此处的rfileProc和wfileProc 可以直接理解为acceptTcpHandler
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
        }
    //处理定时任务
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    //处理完以后进入另一次循环
    return processed; /* return the number of processed file/time events */
}


ae_kqueue.c中

/**
 * @brief 单位时间内获取事件数量
 * 
 * @param eventLoop 
 * @param tvp 在单位时间内
 * @return int 返回待处理的事件数量
 */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    if (tvp != NULL) {
        struct timespec timeout;
        timeout.tv_sec = tvp->tv_sec;
        timeout.tv_nsec = tvp->tv_usec * 1000;
        retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize, &timeout);
    } else {
        /**
         * @brief 获取已经就绪的文件描述符数量
         * timeout指针为空,那么kevent()会永久阻塞,直到事件发生
         */
        retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize, NULL);
    }

    //将事件填充到eventLoop->fired中
    if (retval > 0) {
        int j;

        numevents = retval;
        for(j = 0; j < numevents; j++) {
            int mask = 0;
            struct kevent *e = state->events+j;

            if (e->filter == EVFILT_READ) mask |= AE_READABLE;
            if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->ident;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

终极图

标签:AE,int,mask,流程,eventLoop,redis,server,源码,NULL
来源: https://blog.csdn.net/f80407515/article/details/121863795