
A Record of Troubleshooting Mixed-Up Coroutine Return Values in OpenResty

Author: Internet


What happened

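During ordinary day-to-day development I wrote a perfectly ordinary piece of code that queries Redis. Right after it went live it started throwing errors. The nginx error log looked like this: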

ERROR : "xxx/redis.lua:175: bad argument #1 to 'byte' (string expected, got boolean)" "POST xxx HTTP/1.1"

I immediately looked at the code that raised the error:

local function _read_reply(self, sock)
    local line, err = sock:receive()
    if not line then
        if err == "timeout" and not self.subscribed then
            sock:close()
        end
        return nil, err
    end

    local prefix = byte(line)  -- byte is string.byte; this is the call that fails when line is a boolean

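redis.lua is the open-source lua-resty-redis library written by agentzh, so this was hard to understand. sock:receive(), i.e. the receive method of ngx.socket.tcp, should return line as a string, or nil plus an error string in err on failure. How could line be a boolean?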

Investigation

So I printed line and err:

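redis debug: line:true err:{"warning_count":0,"affected_rows":1,"insert_id":331954435,"server_status":2}

err looks like a result returned by our DB middleware. The same request writes to the database through a light thread:
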
    -- update db
    local sql = "INSERT INTO xxxxxxxx;"
    --db_api("write", sql)  -- db_api() is our own DB middleware
    local co, err = ngx.thread.spawn(db_api, "write", sql)
    if co then
        local ok, ret = ngx.thread.wait(co)
    end

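Next I checked the primary key of that table. It had indeed passed 300 million, matching insert_id 331954435 above. So there was no doubt that redis receive had returned the DB middleware's result. line=true, err=table lines up exactly with what ngx.thread.wait returns: true, followed by the thread function's return value.

Another place in our code also spawns threads: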

    -- spawn all threads
    for i = 1, #req_list do
        ...

        local thread, err = ngx.thread.spawn(icapture, uri, options, origin)
        if thread then
            threads[i] = thread
        
        ...
        end
    end

    local res, origin = ngx.thread.wait(unpack(threads))  -- wait-any: returns once ANY thread finishes

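I suspected the problem was here. According to the official documentation, when ngx.thread.wait is given several threads, it returns as soon as any one of them terminates ("wait any").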

After repeated checking, wait() really was the cause, so I wrote a reproduction case:

local thread2,err  = ngx.thread.spawn(function()
    ngx.sleep(2)
    return 2
end)
local thread3,err  = ngx.thread.spawn(function()
    ngx.sleep(1)
    return 3
end)
local ok, ret = ngx.thread.wait(thread2, thread3)  -- wait-any: returns true, 3 after ~1s; thread2 is still running

local thread1,err  = ngx.thread.spawn(function()
    ngx.sleep(3)
    return 1
end)
local ok, ret = ngx.thread.wait(thread1)  -- should return true, 1 after ~3s more

ngx.say('expect 1,got '..ret)

And the output was indeed:

expect 1,got 2

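Judging from the behavior, wait() can be used to wait on several child threads, but afterwards OpenResty does not discard the return values of the other threads in that group. During the new spawn-wait on thread1, wait() therefore returns thread2's result instead.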
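To make the timeline concrete, here is a minimal C toy model of the repro. It is a sketch under simplifying assumptions. The names toy_thread, toy_wait_mark, toy_resume_parent and repro are hypothetical. Time is an integer clock standing in for the ngx.sleep durations. The only behavior modeled is the waited_by_parent flag examined in the source analysis below: a finishing thread whose flag is still set resumes the parent with its own return value.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical toy model; not the real ngx_http_lua_co_ctx_t. */
typedef struct {
    int finish_at;        /* clock tick at which the thread function returns */
    int ret;              /* value the thread function returns */
    int done;             /* set once the thread has finished */
    int waited_by_parent; /* mirrors sub_coctx->waited_by_parent */
} toy_thread;

/* wait(t1, t2, ...): flag every thread passed in, then the parent yields. */
static void toy_wait_mark(toy_thread *ts[], size_t n) {
    for (size_t i = 0; i < n; i++)
        ts[i]->waited_by_parent = 1;
}

/*
 * The parent is yielded. Advance the clock; the first finishing thread that
 * still carries waited_by_parent resumes the parent with its return value,
 * whichever thread the parent actually asked for.
 */
static int toy_resume_parent(toy_thread *all[], size_t n, int *now) {
    for (;; (*now)++) {
        assert(*now < 1000);  /* toy guard: some flagged thread must finish */
        for (size_t i = 0; i < n; i++) {
            toy_thread *t = all[i];
            if (!t->done && t->finish_at == *now) {
                t->done = 1;  /* in the real module unflagged ones become zombies */
                if (t->waited_by_parent) {
                    t->waited_by_parent = 0;  /* cleared only on this path */
                    return t->ret;
                }
            }
        }
    }
}

/* Replays the Lua repro; returns what wait(thread1) hands back. */
static int repro(void) {
    int now = 0;
    toy_thread t2 = {.finish_at = 2, .ret = 2};  /* ngx.sleep(2); return 2 */
    toy_thread t3 = {.finish_at = 1, .ret = 3};  /* ngx.sleep(1); return 3 */
    toy_thread t1 = {.finish_at = -1, .ret = 1}; /* spawned later */
    toy_thread *all[] = {&t2, &t3, &t1};

    toy_wait_mark(all, 2);                       /* wait(thread2, thread3) */
    int first = toy_resume_parent(all, 2, &now); /* thread3 wins at now == 1 */
    assert(first == 3);

    t1.finish_at = now + 3;                      /* spawn: ngx.sleep(3); return 1 */
    toy_wait_mark(&all[2], 1);                   /* wait(thread1) */
    return toy_resume_parent(all, 3, &now);      /* still-flagged thread2 wins at now == 2 */
}
```

In this model repro() returns 2 rather than 1, which matches the `expect 1,got 2` output above.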

Source code analysis

First, the implementation of wait():

static int
ngx_http_lua_uthread_wait(lua_State *L)
{
    int                          i, nargs, nrets;
    lua_State                   *sub_co;
    ngx_http_request_t          *r;
    ngx_http_lua_ctx_t          *ctx;
    ngx_http_lua_co_ctx_t       *coctx, *sub_coctx;

    r = ngx_http_lua_get_req(L);
    if (r == NULL) {
        return luaL_error(L, "no request found");
    }

    ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
    if (ctx == NULL) {
        return luaL_error(L, "no request ctx found");
    }

    ngx_http_lua_check_context(L, ctx, NGX_HTTP_LUA_CONTEXT_YIELDABLE);

    coctx = ctx->cur_co_ctx;

    nargs = lua_gettop(L);
    if (nargs == 0) {
        return luaL_error(L, "at least one coroutine should be specified");
    }

    for (i = 1; i <= nargs; i++) {
        sub_co = lua_tothread(L, i);

        ...
        luaL_argcheck(L, sub_co, i, "lua thread expected");

        sub_coctx = ngx_http_lua_get_co_ctx(sub_co, ctx);

        switch (sub_coctx->co_status) {
        case NGX_HTTP_LUA_CO_ZOMBIE:

            ngx_http_lua_probe_info("found zombie child");

            nrets = lua_gettop(sub_coctx->co);

            dd("child retval count: %d, %s: %s", (int) nrets,
               luaL_typename(sub_coctx->co, -1),
               lua_tostring(sub_coctx->co, -1));

            if (nrets) {
                lua_xmove(sub_coctx->co, L, nrets);
            }


            return nrets;

        case NGX_HTTP_LUA_CO_DEAD:
            dd("uthread already waited: %p (parent %p)", sub_coctx,
               coctx);

            if (i < nargs) {
                /* just ignore it if it is not the last one */
                continue;
            }

            /* being the last one */
            lua_pushnil(L);
            lua_pushliteral(L, "already waited or killed");
            return 2;

        default:
            dd("uthread %p still alive, status: %d, parent %p", sub_coctx,
               sub_coctx->co_status, coctx);
            break;
        }

        ngx_http_lua_probe_user_thread_wait(L, sub_coctx->co);
        sub_coctx->waited_by_parent = 1;
    }

    return lua_yield(L, 0);
}

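When ngx.thread.wait is given several child threads, its main flow is a loop over them. If a thread is in the NGX_HTTP_LUA_CO_ZOMBIE state, it has already finished, and its results are returned right away. Every thread that has not finished yet is marked with waited_by_parent = 1, meaning it is being waited on by its parent. Finally the parent calls lua_yield() to give up the CPU until it is woken up again.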
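The loop can be restated as a small C sketch. toy_co, toy_wait and the enum names are hypothetical simplifications of ngx_http_lua_co_ctx_t and the NGX_HTTP_LUA_CO_* statuses. A single int stands in for the Lua return values, and a returned zombie is treated as consumed, which is a toy assumption. Reading the loop above, one detail deserves attention: flags are set as the loop goes. A still-running thread listed before a zombie therefore stays flagged, even though wait() returns the zombie's result immediately.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical subset of the NGX_HTTP_LUA_CO_* statuses. */
typedef enum { CO_SUSPENDED, CO_ZOMBIE, CO_DEAD } co_status;

typedef struct {
    co_status status;
    int waited_by_parent;
    int ret;              /* stands in for the values on the child's stack */
} toy_co;

typedef enum { WAIT_RETURNED, WAIT_YIELDED, WAIT_ERROR } wait_outcome;

/* Mirrors the for/switch in ngx_http_lua_uthread_wait. */
static wait_outcome toy_wait(toy_co *args[], size_t nargs, int *ret_out) {
    assert(nargs > 0);  /* "at least one coroutine should be specified" */
    for (size_t i = 0; i < nargs; i++) {
        toy_co *sub = args[i];
        switch (sub->status) {
        case CO_ZOMBIE:
            /* finished already: hand its result back immediately */
            *ret_out = sub->ret;
            sub->status = CO_DEAD;  /* toy assumption: the zombie is consumed */
            return WAIT_RETURNED;   /* flags set on earlier args stay set */
        case CO_DEAD:
            if (i + 1 < nargs)
                continue;           /* ignore it unless it is the last one */
            return WAIT_ERROR;      /* nil, "already waited or killed" */
        default:
            break;                  /* still alive */
        }
        sub->waited_by_parent = 1;
    }
    return WAIT_YIELDED;            /* lua_yield(L, 0) */
}
```

With the arguments {alive, zombie}, toy_wait returns WAIT_RETURNED with the zombie's value and leaves alive.waited_by_parent set to 1.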

Next, the coroutine run loop, ngx_http_lua_run_thread():

ngx_http_lua_run_thread(lua_State *L, ngx_http_request_t *r,
ngx_http_lua_ctx_t *ctx, volatile int nrets) 
        
        ... 
        if (ctx->cur_co_ctx->is_uthread) {
            /* being a user thread */

            lua_settop(L, 0);

            parent_coctx = ctx->cur_co_ctx->parent_co_ctx;

            if (ngx_http_lua_coroutine_alive(parent_coctx)) {
                if (ctx->cur_co_ctx->waited_by_parent) {
                    ngx_http_lua_probe_info("parent already waiting");
                    ctx->cur_co_ctx->waited_by_parent = 0;
                    success = 1;
                    goto user_co_done;
                }

                ngx_http_lua_probe_info("parent still alive");

                if (ngx_http_lua_post_zombie_thread(r, parent_coctx,
                                                    ctx->cur_co_ctx)
                    != NGX_OK)
                {
                    return NGX_ERROR;
                }

                lua_pushboolean(ctx->cur_co_ctx->co, 1);
                lua_insert(ctx->cur_co_ctx->co, 1);

                ctx->cur_co_ctx->co_status = NGX_HTTP_LUA_CO_ZOMBIE;
                ctx->cur_co_ctx = NULL;
                return NGX_AGAIN;
            }

            ngx_http_lua_del_thread(r, L, ctx, ctx->cur_co_ctx);
            ctx->uthreads--;

            if (ctx->uthreads == 0) {
                if (ngx_http_lua_entry_thread_alive(ctx)) {
                    ctx->cur_co_ctx = NULL;
                    return NGX_AGAIN;
                }

                /* all threads terminated already */
                goto done;
            }

            /* some other user threads still running */
            ctx->cur_co_ctx = NULL;
            return NGX_AGAIN;
        }
        ....

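In the main body, is_uthread first checks whether the coroutine that just finished is a user (child) thread. ngx_http_lua_coroutine_alive() then checks whether its parent coroutine is still alive, and waited_by_parent tells whether the parent has waited on it. If it has, execution jumps to user_co_done, which hands the results over to the parent. Otherwise the thread is posted as a zombie (NGX_HTTP_LUA_CO_ZOMBIE) so that a later wait() can collect it. If the parent is already gone, the thread is simply deleted.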
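The branch reduces to a three-way decision, sketched below in C with hypothetical names. The sketch makes one point explicit: the decision looks only at the finishing thread's own waited_by_parent flag. It never checks what the parent is currently blocked on.

```c
#include <assert.h>
#include <stddef.h>

typedef enum { EXIT_RESUME_PARENT, EXIT_BECOME_ZOMBIE, EXIT_FREED } exit_action;

/*
 * What happens when a user thread's function returns, following the
 * is_uthread branch of ngx_http_lua_run_thread. Nothing here asks whether
 * the parent is currently inside wait() for THIS thread.
 */
static exit_action toy_on_uthread_exit(int parent_alive, int *waited_by_parent) {
    assert(waited_by_parent != NULL);
    if (parent_alive) {
        if (*waited_by_parent) {
            *waited_by_parent = 0;      /* the only place these excerpts clear it */
            return EXIT_RESUME_PARENT;  /* goto user_co_done */
        }
        return EXIT_BECOME_ZOMBIE;      /* ngx_http_lua_post_zombie_thread */
    }
    return EXIT_FREED;                  /* ngx_http_lua_del_thread */
}
```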

user_co_done:

                nrets = lua_gettop(ctx->cur_co_ctx->co);

                next_coctx = ctx->cur_co_ctx->parent_co_ctx;

                if (next_coctx == NULL) {
                    /* being a light thread */
                    goto no_parent;
                }

                next_co = next_coctx->co;

                if (nrets) {
                    lua_xmove(ctx->cur_co_ctx->co, next_co, nrets);
                }

                if (ctx->cur_co_ctx->is_uthread) {
                    ngx_http_lua_del_thread(r, L, ctx, ctx->cur_co_ctx);
                    ctx->uthreads--;
                }

                if (!ctx->cur_co_ctx->is_wrap) {
                    /*
                     * ended successfully, coroutine.resume returns true plus
                     * any return values
                     */
                    lua_pushboolean(next_co, success);
                    lua_insert(next_co, 1);
                    nrets++;
                }

                ctx->cur_co_ctx = next_coctx;

                ngx_http_lua_probe_info("set parent running");

                next_coctx->co_status = NGX_HTTP_LUA_CO_RUNNING;

                ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                               "lua coroutine: lua user thread ended normally");

                continue;

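lua_pushboolean() additionally inserts true as the first return value of wait(). The child's results are moved onto the parent coroutine's stack, and on the next pass of the loop (continue) the parent coroutine is resumed with them.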
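The stack hand-off can be mimicked with a tiny stack of strings. This is a sketch, not the Lua C API. toy_stack, toy_xmove, toy_insert_true_at_bottom and toy_user_co_done are hypothetical stand-ins for lua_xmove plus lua_pushboolean/lua_insert, and the parent's stack is assumed to be empty when it is resumed. The model shows why the caller sees true followed by the child's results, the same shape as the line:true err:{...} pair in the debug output.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_STACK_MAX 8

typedef struct {
    const char *slot[TOY_STACK_MAX];
    size_t top;  /* number of values on the stack */
} toy_stack;

static void toy_push(toy_stack *s, const char *v) {
    assert(s->top < TOY_STACK_MAX);
    s->slot[s->top++] = v;
}

/* Like lua_xmove: move the top n values from one stack to another, in order. */
static void toy_xmove(toy_stack *from, toy_stack *to, size_t n) {
    assert(n <= from->top);
    for (size_t i = from->top - n; i < from->top; i++)
        toy_push(to, from->slot[i]);
    from->top -= n;
}

/* Like lua_pushboolean(co, 1) followed by lua_insert(co, 1). */
static void toy_insert_true_at_bottom(toy_stack *s) {
    toy_push(s, NULL);  /* make room */
    memmove(&s->slot[1], &s->slot[0], (s->top - 1) * sizeof s->slot[0]);
    s->slot[0] = "true";
}

/* user_co_done for a non-wrap thread; returns nrets as seen by the parent. */
static size_t toy_user_co_done(toy_stack *child, toy_stack *parent) {
    size_t nrets = child->top;
    if (nrets)
        toy_xmove(child, parent, nrets);
    toy_insert_true_at_bottom(parent);
    return nrets + 1;
}
```

A child that returned a single table resumes the parent with two values: true and that table. If the parent happened to be yielded in sock:receive(), those two values become line and err.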

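Conclusion of the source analysis

Putting the two functions together: wait() sets waited_by_parent = 1 on every child thread it is given that is still running. As far as the code above shows, that flag is cleared only when the same thread finishes and passes its results to the parent in user_co_done. With a wait-any, the first thread to finish resumes the parent and wait() returns, while the other threads passed to that wait() keep their flags. When one of them finishes later, ngx_http_lua_run_thread() sees waited_by_parent set and resumes the parent with true plus that thread's return values. This happens whatever the parent is yielded on at that moment: another ngx.thread.wait(), ngx.sleep(), or sock:receive().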

How to deal with it

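It turns out someone had already raised this in a GitHub issue:
https://github.com/openresty/lua-nginx-module/issues/1439
The problem is not limited to sock:receive(). ngx.sleep also yields the current coroutine, so it too can end up receiving the return values of a finished child thread.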
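Here is a toy C model of that situation. The parent is yielded in a socket read (or a sleep) that would complete at data_at, while a leftover thread with waited_by_parent still set finishes earlier. toy_receive, recv_result and TOY_TRUE are hypothetical. The model only encodes the rule from the source analysis that such a thread resumes the parent with true plus its results. It ignores what happens to the abandoned socket read afterwards.

```c
#include <assert.h>
#include <stddef.h>

static const char TOY_TRUE[] = "true";

typedef struct {
    int finish_at;
    const char *ret;       /* e.g. the DB middleware's result table */
    int done;
    int waited_by_parent;
} toy_thread;

typedef struct {
    const char *line;
    const char *err;
} recv_result;

/*
 * The parent is yielded in a toy sock:receive() whose data arrives at
 * data_at. The first event wins: a finishing thread that still has
 * waited_by_parent set, or the data itself.
 */
static recv_result toy_receive(toy_thread *ts, size_t n, int now,
                               int data_at, const char *data) {
    for (; now <= data_at; now++) {
        for (size_t i = 0; i < n; i++) {
            toy_thread *t = &ts[i];
            if (!t->done && t->finish_at == now) {
                t->done = 1;
                if (t->waited_by_parent) {
                    t->waited_by_parent = 0;
                    recv_result hijacked = {TOY_TRUE, t->ret};
                    return hijacked;  /* line = true, err = child's result */
                }
            }
        }
    }
    recv_result normal = {data, NULL};
    return normal;
}
```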

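However, agentzh considers this a feature rather than a bug. The maintainer does not plan to change it, so all we can do is be careful about how we use it.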

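Usage notes

The safe pattern is to wait on the threads one at a time, as in this example from the official lua-nginx-module documentation:
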
  local capture = ngx.location.capture
  local spawn = ngx.thread.spawn
  local wait = ngx.thread.wait
  local say = ngx.say
 
  local function fetch(uri)
      return capture(uri)
  end
 
  local threads = {
      spawn(fetch, "/foo"),
      spawn(fetch, "/bar"),
      spawn(fetch, "/baz")
  }
 
  for i = 1, #threads do
      local ok, res = wait(threads[i])
      if not ok then
          say(i, ": failed to run: ", res)
      else
          say(i, ": status: ", res.status)
          say(i, ": body: ", res.body)
      end
  end
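Why does waiting on one thread at a time avoid the problem? The following toy C model uses hypothetical names and an integer clock. Each wait() flags only the thread it is waiting for. Threads that finish earlier simply become zombies without a flag, and a later wait() on a zombie returns its result immediately, so no flag is ever left behind. As a simplification, the toy has no DEAD state, so waiting twice on the same thread would not produce the "already waited or killed" error.

```c
#include <assert.h>
#include <stddef.h>

typedef struct {
    int finish_at;
    int ret;
    int done;              /* finished; unflagged ones act as zombies */
    int waited_by_parent;
} toy_thread;

/* wait(all[which]) on exactly one thread. */
static int toy_wait_one(toy_thread all[], size_t n, size_t which, int *now) {
    toy_thread *target = &all[which];
    if (target->done)
        return target->ret;             /* zombie: returned immediately */
    target->waited_by_parent = 1;
    for (;; (*now)++) {
        assert(*now < 1000);            /* toy guard */
        for (size_t i = 0; i < n; i++) {
            toy_thread *t = &all[i];
            if (!t->done && t->finish_at == *now) {
                t->done = 1;
                if (t->waited_by_parent) {
                    t->waited_by_parent = 0;
                    return t->ret;      /* only the target can be flagged */
                }
            }
        }
    }
}
```

Waiting in order on threads that finish at ticks 3, 1 and 2 returns their values 1, 2, 3 in order. The two threads that finished first are never flagged.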

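If your scenario really does need wait-any (I certainly won't use it that way again), then after getting the first result you probably need to:

  1. Make sure all the other child threads have finished, e.g. with ngx.thread.kill or ngx.sleep.
  2. Then loop n times (n = the number of threads passed to wait), calling ngx.sleep(0.01) each time, to drain the stale results left in ctx. As mentioned above, ngx.sleep also yields, so each sleep call can return the result of one of the leftover child threads.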
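Step 1 can be checked with a toy C model of the repro. As a simplification, it assumes that a killed thread is removed and can never resume the parent. toy_kill and repro_with_kill are hypothetical names, not OpenResty APIs. In this model, killing the losers right after the wait-any removes the stale flags outright. Draining with ngx.sleep, by contrast, depends on the leftover threads finishing within the sleep interval.

```c
#include <assert.h>
#include <stddef.h>

typedef struct {
    int finish_at;
    int ret;
    int done;
    int waited_by_parent;
} toy_thread;

/* Parent is yielded: the first flagged thread to finish resumes it. */
static int toy_resume_parent(toy_thread *all[], size_t n, int *now) {
    for (;; (*now)++) {
        assert(*now < 1000);  /* toy guard */
        for (size_t i = 0; i < n; i++) {
            toy_thread *t = all[i];
            if (!t->done && t->finish_at == *now) {
                t->done = 1;
                if (t->waited_by_parent) {
                    t->waited_by_parent = 0;
                    return t->ret;
                }
            }
        }
    }
}

/* Toy stand-in for ngx.thread.kill: the thread is gone and keeps no flag. */
static void toy_kill(toy_thread *t) {
    t->done = 1;
    t->waited_by_parent = 0;
}

/* The repro again, but killing the unfinished threads after the wait-any. */
static int repro_with_kill(void) {
    int now = 0;
    toy_thread t2 = {.finish_at = 2, .ret = 2};
    toy_thread t3 = {.finish_at = 1, .ret = 3};
    toy_thread t1 = {.finish_at = -1, .ret = 1};  /* spawned later */
    toy_thread *all[] = {&t2, &t3, &t1};

    t2.waited_by_parent = t3.waited_by_parent = 1;  /* wait(thread2, thread3) */
    int first = toy_resume_parent(all, 2, &now);    /* thread3 wins at now == 1 */
    assert(first == 3);
    for (size_t i = 0; i < 2; i++)
        if (!all[i]->done)
            toy_kill(all[i]);                       /* step 1: kill the losers */

    t1.finish_at = now + 3;                         /* spawn thread1 */
    t1.waited_by_parent = 1;                        /* wait(thread1) */
    return toy_resume_parent(all, 3, &now);
}
```

With the loser killed, wait(thread1) in this model gets 1, the value the repro expected.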

Source: https://www.cnblogs.com/wenzaicaicai/p/16500102.html