系统相关
首页 > 系统相关> > 通过分析nginx upstream源码来看动态配置upstream模块

通过分析nginx upstream源码来看动态配置upstream模块

作者:互联网

upstream回源处理流程

代码围绕着ngx_http_upstream.c展开,该模块主要为创建mainconf函数:

static void *

ngx_http_upstream_create_main_conf(ngx_conf_t *cf)

{

...

//创建main conf

umcf = ngx_pcalloc(cf->pool, sizeof(ngx_http_upstream_main_conf_t));

//创建upstream数组,每一个ngx_http_upstream_srv_conf_t对应一个upstream

ngx_array_init(&umcf->upstreams, cf->pool, 4,

                       sizeof(ngx_http_upstream_srv_conf_t *)

//创建implicit_upstreams链表,用来存放proxy_pass创建出来的upstream,真正创建upstream的时候用

ngx_list_init(&umcf->implicit_upstreams, cf->pool, 4,

                       sizeof(ngx_http_upstream_srv_conf_t *)

//创建rbtree。

ngx_rbtree_init(&umcf->rbtree, &umcf->sentinel,

                    ngx_http_upstream_rbtree_insert_value);

...

}

配置中的upstream会触发ngx_http_upstream函数,该配置作用于mainconf的配置

static char *

ngx_http_upstream(ngx_conf_t *cf, ngx_command_t *cmd, void *dummy)

{

...

// upstream的name
value = cf->args->elts;
u.host = value[1];

//生成uscf结构体
uscf = ngx_http_upstream_add(cf, &u, flag);


//创建配置指针,保存当前http块的配置,将生成配置的main_conf指向http块

ctx = ngx_pcalloc(cf->pool, sizeof(ngx_http_conf_ctx_t));
http_ctx = cf->ctx;
ctx->main_conf = http_ctx->main_conf;


//将生成的uscf放在upstream module对应srv配置上,因为这个模块没有create srv conf
ctx->srv_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_http_max_module);
ctx->srv_conf[ngx_http_upstream_module.ctx_index] = uscf;
uscf->srv_conf = ctx->srv_conf;


//接下来是生成loc conf并且将全部的模块过滤一次,生成对应的srv conf和loc conf,不用merge的是因为在ups里的配置不可能出现在别的地方了,这些生成的conf会在接下来使用

uscf->servers = ngx_array_create(cf->pool, 4,
                                     sizeof(ngx_http_upstream_server_t));

// 接下来开始解析里面的配置,主要是servers
pcf = *cf;
cf->ctx = ctx;
cf->cmd_type = NGX_HTTP_UPS_CONF;
rv = ngx_conf_parse(cf, NULL);

...

}

ngx_http_upstream_add函数是当前的upstream加进去,这个函数在遇到“upstream”的时候,是带着create标志位的,这个就是正了八经的创建,如果遇到proxy_pass,带的标志位是0

ngx_http_upstream_srv_conf_t *
ngx_http_upstream_add(ngx_conf_t *cf, ngx_url_t *u, ngx_uint_t flags)
{

...

// 红黑树里寻找uscf,对应upstream的配置
umcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_upstream_module);

uscf = ngx_http_upstream_rbtree_lookup(umcf, &u->host);

// not found的时候,再找找看有没有proxy_pass搞出来的part
not_found:
    part = &umcf->implicit_upstreams.part;
    uscfp = part->elts;

    // 最后的创建,先看是否带标志位,如果带则删掉part,insert进红黑树
if (flags & NGX_HTTP_UPSTREAM_CREATE) {
    uscfp[i]->flags = flags;


#if (NGX_HTTP_UPSTREAM_RBTREE)
    uscf = uscfp[i];
    ngx_rbtree_insert(&umcf->rbtree, &uscfp[i]->node);
    ngx_list_delete(&umcf->implicit_upstreams, &uscfp[i]);
    return uscf;

#endif

    }
...

}

分析完回源upstream处理流程,再来看dyups是怎么工作的?

dyups_module

动态修改upstream不reload nginx模块,ngx_http_dyups_module分析。

围绕ngx_http_dyups_module.c进行分析:

在create main conf的时候初始化这个数组

static void *

ngx_http_dyups_create_main_conf(ngx_conf_t *cf)

{
...

if (ngx_array_init(&dmcf->dy_upstreams, cf->pool, 1024,
                       sizeof(ngx_http_dyups_srv_conf_t))
        != NGX_OK)
    {
        return NULL;
    }
...

}

ngx_http_dyups_init

在dyups init的时候将upstream中的conf全部取出来放进去

//初始化dy_upstream链以及全局ngx_http_dyups_deleted_upstream

static ngx_int_t

ngx_http_dyups_init(ngx_conf_t *cf)

{
...

dmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_dyups_module);
umcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_upstream_module);

uscfp = umcf->upstreams.elts;

for (i = 0; i < umcf->upstreams.nelts; i++) {

    duscf = ngx_array_push(&dmcf->dy_upstreams);
    // 清零
    ngx_memzero(duscf, sizeof(ngx_http_dyups_srv_conf_t));

    duscf->pool = NULL;

    // 赋值
    duscf->upstream = uscfp[i];
    duscf->dynamic = (uscfp[i]->port == 0
                          && uscfp[i]->srv_conf && uscfp[i]->servers
                          && uscfp[i]->flags & NGX_HTTP_UPSTREAM_CREATE);
    duscf->deleted = 0;

    // 赋值index
    duscf->idx = i;
}
...

}

dyups share memory同步机制

shm初始化在ngx_http_dyups_init_main_conf函数中,同时设置了read_mesg的超时时间,并且指定了大小。

static char *

ngx_http_dyups_init_main_conf(ngx_conf_t *cf, void *conf)

{

...

if (dmcf->read_msg_timeout == NGX_CONF_UNSET_MSEC) {
    // 一秒一次
    dmcf->read_msg_timeout = 1000;
}

if (dmcf->shm_size == NGX_CONF_UNSET_UINT) {
    dmcf->shm_size = 2 * 1024 * 1024;
}

    return ngx_http_dyups_init_shm(cf, conf);
...

}
static char *

ngx_http_dyups_init_shm(ngx_conf_t *cf, void *conf)

{
...

shm_zone = ngx_shared_memory_add(cf, &dmcf->shm_name, dmcf->shm_size,
                                     &ngx_http_dyups_module);

shm_zone->data = cf->pool;

// 加进去的这个名头的共享内存块的init函数会在初始化的时候统一调用
shm_zone->init = ngx_http_dyups_init_shm_zone;

...
}
static ngx_int_t

ngx_http_dyups_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)

{

...
shpool = (ngx_slab_pool_t *) shm_zone->shm.addr;

sh = ngx_slab_alloc(shpool, sizeof(ngx_dyups_shctx_t));

if (sh == NULL) {
     return NGX_ERROR;
}

// 全局变量,sh和shpool

ngx_dyups_global_ctx.sh = sh;

ngx_dyups_global_ctx.shpool = shpool;

// 初始化msg->queue
ngx_queue_init(&sh->msg_queue);

sh->version = 0;
sh->status = NULL;

...

}

dyups init process

该函数在启动进程时候调用,设定定时器。

//初始化共享内存状态,判断如果是非正常退出,则重新加载upstream配置

static ngx_int_t

ngx_http_dyups_init_process(ngx_cycle_t *cycle)

{

...

// 设定定时器来定时read msg,同步信息

timer = &ngx_dyups_global_ctx.msg_timer;
timer->handler = ngx_http_dyups_read_msg;

ngx_add_timer(timer, dmcf->read_msg_timeout);

// 拿到全局的pool和sh
shpool = ngx_dyups_global_ctx.shpool;

sh = ngx_dyups_global_ctx.sh;

ngx_shmtx_lock(&shpool->mutex);

// 初始化的时候肯定是NULL,,申请对应数量进程数的内存

if (sh->status == NULL) {

    sh->status = ngx_slab_alloc_locked(shpool,
                sizeof(ngx_dyups_status_t) * ccf->worker_processes);

    if (sh->status == NULL) {
        ngx_shmtx_unlock(&shpool->mutex);
        return NGX_ERROR;

    }

    ngx_memzero(sh->status,
                sizeof(ngx_dyups_status_t) * ccf->worker_processes);

    ngx_shmtx_unlock(&shpool->mutex);
    return NGX_OK;
}

ngx_shmtx_unlock(&shpool->mutex);

// 判断version,如果不是0的话,说明version已经在同步中被++了,所以是进程挂掉再被拉起来

if (sh->version != 0) {
        // 这里后续再看...
}

最核心的是ngx_http_dyups_read_msg函数里的ngx_http_dyups_read_msg_locked函数

static void

ngx_http_dyups_read_msg_locked(ngx_event_t *ev)

{

...
sh = ngx_dyups_global_ctx.sh;

shpool = ngx_dyups_global_ctx.shpool;

for (i = 0; i < ccf->worker_processes; i++) {
    status = &sh->status[i];

    if (status->pid == 0 || status->pid == ngx_pid) {

        ngx_log_debug2(NGX_LOG_DEBUG_HTTP, ev->log, 0,
                           "[dyups] process %P update time %ui",
                           status->pid, status->time);

        // 遍历全部进程,将对应的pid赋值
        status->pid = ngx_pid;

        status->time = now;

        break;

    }

}

// 遍历消息队列
for (q = ngx_queue_last(&sh->msg_queue);
         q != ngx_queue_sentinel(&sh->msg_queue);
         q = ngx_queue_prev(q))
{

    // 如果该msg的count和进程数一致,就是大家都同步过了,把这个msg删掉

    if (msg->count == ccf->worker_processes) {
        t = ngx_queue_next(q); ngx_queue_remove(q); q = t;

        ngx_log_debug2(NGX_LOG_DEBUG_HTTP, ev->log, 0,
                   "[dyups] destroy msg %V:%V",
                    &msg->name, &msg->content);

        ngx_dyups_destroy_msg(shpool, msg);
        continue;
    }

    found = 0;

    for (i = 0; i < msg->count; i++) {
        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ev->log, 0,
                  "[dyups] msg pids [%P]", msg->pid[i]);

        if (msg->pid[i] == ngx_pid) {
                found = 1;
                break;
        }
    }

    // 如果发现该进程了,就说明已经同步过了,继续
    if (found) {
        ngx_log_debug2(NGX_LOG_DEBUG_HTTP, ev->log, 0,
                    "[dyups] msg %V count %ui found",
                    &msg->name, msg->count);
            continue;
        }

        // 如果没发现的话,count++,pid更新
        msg->pid[i] = ngx_pid;

        msg->count++;
        ngx_log_debug2(NGX_LOG_DEBUG_HTTP, ev->log, 0,
                       "[dyups] msg %V count %ui", &msg->name, msg->count);

        // 取出来name和content
        name = msg->name;
        content = msg->content;

        // 执行同步
        rc = ngx_dyups_sync_cmd(pool, &name, &content, msg->flag);
        if (rc != NGX_OK) {
            ngx_log_error(NGX_LOG_ALERT, ev->log, 0,
                          "[dyups] read msg error, may cause the "
                          "config inaccuracy, name:%V, content:%V",
                          &name, &content);
        }
}
...

}
static ngx_int_t

ngx_dyups_sync_cmd(ngx_pool_t *pool, ngx_str_t *name, ngx_str_t *content,

    ngx_uint_t flag)
{

...

} else if (flag == NGX_DYUPS_ADD) {

        body.start = body.pos = content->data;
        body.end = body.last = content->data + content->len;
        body.temporary = 1;

        rc = ngx_dyups_do_update(name, &body, &rv);

        ngx_log_error(NGX_LOG_INFO, ngx_cycle->log, 0,
                      "[dyups] sync add: %V rv: %V rc: %i",
                      name, &rv, rc);

        if (rc != NGX_HTTP_OK) {
            return NGX_ERROR;
        }

        return NGX_OK;
    }
...

}

接下来就是ngx_dyups_do_update

同步其他进程接收的信息,如果是当前进程处理的,要把信息添加到消息队列中。


dyups update

ngx_dyups_update_upstream

ngx_int_t

ngx_dyups_update_upstream(ngx_str_t *name, ngx_buf_t *buf, ngx_str_t *rv)

{

...

// 先检查有没有需要做的操作
ngx_http_dyups_read_msg_locked(timer);

// 沙箱测试配置

status = ngx_dyups_sandbox_update(buf, rv);

if (status != NGX_HTTP_OK) {
        goto finish;
}

status = ngx_dyups_do_update(name, buf, rv);

if (status == NGX_HTTP_OK) {

    //关键的是把操作发到队列中去
    if (ngx_http_dyups_send_msg(name, buf, NGX_DYUPS_ADD)) {
            ngx_str_set(rv, "alert: update success "
                        "but not sync to other process");
            status = NGX_HTTP_INTERNAL_SERVER_ERROR;
        }
    }

...
}

接下来就是ngx_http_dyups_send_msg函数

static ngx_int_t

ngx_http_dyups_send_msg(ngx_str_t *name, ngx_buf_t *body, ngx_uint_t flag)

{
...

// 这个函数还是挺简单的,就是初始化整个msg,将name和body填充进去。
sh->version++;
ngx_queue_insert_head(&sh->msg_queue, &msg->queue);

...
}

dyups ngx_dyups_do_update

update之前先find寻找对应的upstream。

static ngx_http_dyups_srv_conf_t *

ngx_dyups_find_upstream(ngx_str_t *name, ngx_int_t *idx)

{
...

duscfs = dumcf->dy_upstreams.elts;

for (i = 0; i < dumcf->dy_upstreams.nelts; i++) {

    duscf = &duscfs[i];

    uscf = duscf->upstream;


    if (uscf->host.len != name->len
            || ngx_strncasecmp(uscf->host.data, name->data, uscf->host.len)
               != 0)
    {
        continue;
    }

    *idx = i;
    return duscf;

}

...
}

如果寻找到了则对idx赋值,一旦发现寻找到了对应name的dy_upstream进行判断。

调用的是ngx_dyups_mark_upstream_delete函数

static void

ngx_dyups_mark_upstream_delete(ngx_http_dyups_srv_conf_t *duscf)

{

...
// 获取umcf和uscf

uscf = duscf->upstream;

umcf = ngx_http_cycle_get_module_main_conf(ngx_cycle,
                        ngx_http_upstream_module);

// us获取这个dynamic upstream下的servers
us = uscf->servers->elts;

for (i = 0; i < uscf->servers->nelts; i++) {

    // 标志位置1
    us[i].down = 1;

    #if (NGX_HTTP_UPSTREAM_CHECK)
        if (us[i].addrs) {

        // 关闭peer,看宏定义主要关闭健康检查的peer
            ngx_http_upstream_check_delete_dynamic_peer(&uscf->host,
                                             us[i].addrs);
        }
    #endif

}

// 将upstream对应的index的配置变成一个dummy配置
uscfp[duscf->idx] = &ngx_http_dyups_deleted_upstream;


#if (NGX_HTTP_UPSTREAM_RBTREE)
    ngx_rbtree_delete(&umcf->rbtree, &uscf->node);
#endif

duscf->deleted = NGX_DYUPS_DELETING;

...
}

这里最重要的是check_delete_dynamic_peer

void

ngx_http_upstream_check_delete_dynamic_peer(ngx_str_t *name,
    ngx_addr_t *peer_addr)
{

...

/* 一堆比较 找到choosen*/
chosen = &peer[i];

chosen->shm->ref--;

if (chosen->shm->ref <= 0 && chosen->shm->delete != PEER_DELETED) {
    ngx_http_upstream_check_clear_dynamic_peer_shm(chosen->shm);
    chosen->shm->delete = PEER_DELETED;
}

ngx_shmtx_unlock(&chosen->shm->mutex);
ngx_http_upstream_check_clear_peer(chosen);

...
}

删完一次之后再find一次,idx大概率就变成-1了,然后就进行创建了。

static ngx_int_t

ngx_dyups_do_update(ngx_str_t *name, ngx_buf_t *buf, ngx_str_t *rv)

{

...

if (idx == -1) {

    duscf = ngx_array_push(&dumcf->dy_upstreams);
    // 这个uscfp是没有用处的,只为了给这个数组加一

    uscfp = ngx_array_push(&umcf->upstreams);

    ngx_memzero(duscf, sizeof(ngx_http_dyups_srv_conf_t));

    // 这块是为了获取在umcf中的新upstream的index值。

    idx = umcf->upstreams.nelts - 1;
}

duscf->idx = idx;

rc = ngx_dyups_init_upstream(duscf, name, idx);
rc = ngx_dyups_add_server(duscf, buf);

...
}

最重要的就是init_upstream和add_server。

init upstream的传参是dy_srv_conf_t、upstream的name,以及upstream链表中对应的index。

static ngx_int_t

ngx_dyups_init_upstream(ngx_http_dyups_srv_conf_t *duscf, ngx_str_t *name,
    ngx_uint_t index)
{

...

umcf = ngx_http_cycle_get_module_main_conf(ngx_cycle,
              ngx_http_upstream_module);

uscfp = umcf->upstreams.elts;

/*初始化uscf 也就是upstream的各个结构体*/
uscfp[index] = uscf; // 赋值
duscf->dynamic = 1;
duscf->upstream = uscf;

ctx = ngx_pcalloc(duscf->pool, sizeof(ngx_http_conf_ctx_t));

// 存放ctx
duscf->ctx = ctx;

// insert进去uscf
uscf->node.key = ngx_crc32_short(uscf->host.data, uscf->host.len);

ngx_rbtree_insert(&umcf->rbtree, &uscf->node);

...
}

static ngx_int_t

ngx_dyups_add_server(ngx_http_dyups_srv_conf_t *duscf, ngx_buf_t *buf)

{

...

ngx_dyups_parse_upstream(&cf, buf)

...

}

static char *

ngx_dyups_parse_upstream(ngx_conf_t *cf, ngx_buf_t *buf)

{

...
b = *buf;
ngx_memzero(&conf_file, sizeof(ngx_conf_file_t));

conf_file.file.fd = NGX_INVALID_FILE;
conf_file.buffer = &b;

cf->conf_file = &conf_file;

return ngx_conf_parse(cf, NULL);

...
}

ngx_dyups_do_delete

static ngx_int_t

ngx_dyups_do_delete(ngx_str_t *name, ngx_str_t *rv)

{

...
    duscf = ngx_dyups_find_upstream(name, &dumy);
    // 如查出来是NULL or 被标记删除 or 彻底删除,说明要删的这个有问题

    if (duscf == NULL || duscf->deleted) {

        ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0,
                      "[dyups] not find upstream %V %p", name, duscf);

        ngx_str_set(rv, "not found uptream");

        return NGX_HTTP_NOT_FOUND;

    }
// 没问题的话就这样正常删除
ngx_dyups_mark_upstream_delete(duscf);

...
}

ngx_dyups_find_upstream

find upstream主要做了查找、删除工作。

static ngx_http_dyups_srv_conf_t *

ngx_dyups_find_upstream(ngx_str_t *name, ngx_int_t *idx)

{

...

dumcf = ngx_http_cycle_get_module_main_conf(ngx_cycle, ngx_http_dyups_module);

duscfs = dumcf->dy_upstreams.elts;

for (i = 0; i < dumcf->dy_upstreams.nelts; i++) {

// 这块是在mark_upstream中被标记的

if (duscf->deleted == NGX_DYUPS_DELETING) {

// 确认可以删除,主要看这个ref的引用计数

if (*(duscf->ref) == 0) {

        ngx_log_error(NGX_LOG_INFO, ngx_cycle->log, 0,
                              "[dyups] free dynamic upstream in find upstream"
                              " %ui", duscf->idx);


        duscf->deleted = NGX_DYUPS_DELETED;

        if (duscf->pool) {
            ngx_destroy_pool(duscf->pool);
            duscf->pool = NULL;
        }
    }
}

// 如果是deleted或者是deleting
// 如果遍历完啥也没找到就返回一个deleted

if (duscf->deleted == NGX_DYUPS_DELETING) {
            continue;
        }

    if (duscf->deleted == NGX_DYUPS_DELETED) {
        *idx = i;
        duscf_del = duscf;
        continue;
    }

    // 如果找到了就正常返回
    if (uscf->host.len != name->len
            || ngx_strncasecmp(uscf->host.data, name->data, uscf->host.len)
               != 0)
        {
            continue;
        }

        *idx = i;
        return duscf;
}
...

}

标签:...,http,ngx,nginx,源码,conf,upstream,dyups
来源: https://blog.csdn.net/realmardrid/article/details/122557477