其他分享
首页 > 其他分享> > c-是什么导致boost ::协程随机崩溃?

c-是什么导致boost ::协程随机崩溃?

作者:互联网

我有一个多线程应用程序,通过集成在boost :: asio中使用boost :: asio和boost ::协程.每个线程都有其自己的io_service对象.线程之间唯一的共享状态是连接池,当从连接池获得连接或从连接池返回连接时,连接池将被互斥锁锁定.当池中没有足够的连接时,我将无限的asio :: steady_tiemer推入池的内部结构中,并异步等待它,然后从couroutine函数中屈服.当其他线程返回到池的连接时,它检查是否有等待计时器,它从内部结构获取等待计时器,它获取其io_service对象,并发布一个lambda,唤醒该计时器以恢复挂起的协程.我在应用程序中发生随机崩溃.我尝试调查valgrind的问题.它发现了一些问题,但我无法理解,因为它们发生在boost ::协程和boost :: asio内部.这是我的代码和valgrind输出的片段.有人可以看到并解释问题吗?

这是调用代码:

template <class ContextsType>
void executeRequests(ContextsType& avlRequestContexts)
{
    AvlRequestDataList allRequests;
    for(auto& requestContext : avlRequestContexts)
    {
        if(!requestContext.pullProvider || !requestContext.toAskGDS())
            continue;

        auto& requests = requestContext.pullProvider->getRequestsData();
        copy(requests.begin(), requests.end(), back_inserter(allRequests));
    }

    if(allRequests.size() == 0)
        return;

    boost::asio::io_service ioService;
    curl::AsioMultiplexer multiplexer(ioService);

    for(auto& request : allRequests)
    {
        using namespace boost::asio;

        spawn(ioService, [&multiplexer, &request](yield_context yield)
        {
            request->prepare(multiplexer, yield);
        });
    }

    while(true)
    {
        try
        {
            VLOG_DEBUG(avlGeneralLogger, "executeRequests: Starting ASIO event loop.");
            ioService.run();
            VLOG_DEBUG(avlGeneralLogger, "executeRequests: ASIO event loop finished.");
            break;
        }
        catch(const std::exception& e)
        {
            VLOG_ERROR(avlGeneralLogger, "executeRequests: Error while executing GDS request: " << e.what());
        }
        catch(...)
        {
            VLOG_ERROR(avlGeneralLogger, "executeRequests: Unknown error while executing GDS request.");
        }
    }
}

这是在生成的lambda中调用的prepare函数实现:

void AvlRequestData::prepareImpl(curl::AsioMultiplexer& multiplexer,
                                 boost::asio::yield_context yield)
{
    auto& ioService = multiplexer.getIoService();
    _connection = _pool.getConnection(ioService, yield);
    _connection->prepareRequest(xmlRequest, xmlResponse, requestTimeoutMS);

    multiplexer.addEasyHandle(_connection->getHandle(),
                              [this](const curl::EasyHandleResult& result)
    {
        if(0 == result.responseCode)
            returnQuota();
        VLOG_DEBUG(lastSeatLogger, "Response " << id << ": " << xmlResponse);
        _pool.addConnection(std::move(_connection));
    });
}


void AvlRequestData::prepare(curl::AsioMultiplexer& multiplexer,
                             boost::asio::yield_context yield)
{
    try
    {
        prepareImpl(multiplexer, yield);
    }
    catch(const std::exception& e)
    {
        VLOG_ERROR(lastSeatLogger, "Error wile preparing request: " << e.what());
        returnQuota();
    }
    catch(...)
    {
        VLOG_ERROR(lastSeatLogger, "Unknown error while preparing request.");
        returnQuota();
    }
}

returnQuota函数是AvlRequestData类的纯虚拟方法,其在我所有测试中使用的TravelportRequestData类的实现如下:

void returnQuota() const override
{
    auto& avlQuotaManager = AvlQuotaManager::getInstance();
    avlQuotaManager.consumeQuotaTravelport(-1);
}

这是连接池的push和pop方法.

auto AvlConnectionPool::getConnection(
        TimerPtr timer,
        asio::yield_context yield) -> ConnectionPtr
{
    lock_guard<mutex> lock(_mutex);

    while(_connections.empty())
    {
        _timers.emplace_back(timer);
        timer->expires_from_now(
            asio::steady_timer::clock_type::duration::max());

        _mutex.unlock();
        coroutineAsyncWait(*timer, yield);
        _mutex.lock();
    }

    ConnectionPtr connection = std::move(_connections.front());
    _connections.pop_front();

    VLOG_TRACE(defaultLogger, str(format("Getted connection from pool: %s. Connections count %d.")
                                  % _connectionPoolName % _connections.size()));

    ++_connectionsGiven;

    return connection;
}

void AvlConnectionPool::addConnection(ConnectionPtr connection,
                                      Side side /* = Back */)
{
    lock_guard<mutex> lock(_mutex);

    if(Front == side)
        _connections.emplace_front(std::move(connection));
    else
        _connections.emplace_back(std::move(connection));

    VLOG_TRACE(defaultLogger, str(format("Added connection to pool: %s. Connections count %d.")
                                  % _connectionPoolName % _connections.size()));

    if(_timers.empty())
        return;

    auto timer = _timers.back();
    _timers.pop_back();

    auto& ioService = timer->get_io_service();
    ioService.post([timer](){ timer->cancel(); });

    VLOG_TRACE(defaultLogger, str(format("Connection pool %s: Waiting thread resumed.")
                                  % _connectionPoolName));
}

这是coroutineAsyncWait的实现.

inline void coroutineAsyncWait(boost::asio::steady_timer& timer,
                               boost::asio::yield_context yield)
{
    boost::system::error_code ec;
    timer.async_wait(yield[ec]);
    if(ec && ec != boost::asio::error::operation_aborted)
        throw std::runtime_error(ec.message());
}

最后是valgrind输出的第一部分:

==8189== Thread 41:
==8189== Invalid read of size 8
==8189== at 0x995F84: void boost::coroutines::detail::trampoline_push_void, void, boost::asio::detail::coro_entry_point, void (anonymous namespace)::executeRequests > >(std::vector<(anonymous namespace)::AvlRequestContext, std::allocator<(anonymous namespace)::AvlRequestContext> >&)::{lambda(boost::asio::basic_yield_context >)#1}>&, boost::coroutines::basic_standard_stack_allocator > >(long) (trampoline_push.hpp:65)
==8189== Address 0x2e3b5528 is not stack’d, malloc’d or (recently) free’d

当我使用带有调试器的valgrind时,它会停止在boost :: coroutine库中的trampoline_push.hpp中的以下函数中.

53│ template< typename Coro >
54│ void trampoline_push_void( intptr_t vp)
55│ {
56│     typedef typename Coro::param_type   param_type;
57│
58│     BOOST_ASSERT( vp);
59│
60│     param_type * param(
61│         reinterpret_cast< param_type * >( vp) );
62│     BOOST_ASSERT( 0 != param);
63│
64│     Coro * coro(
65├>        reinterpret_cast< Coro * >( param->coro) );
66│     BOOST_ASSERT( 0 != coro);
67│
68│     coro->run();
69│ }

解决方法:

最终,我发现当需要删除对象时,如果不正确使用shared_ptr和weak_ptr,boost :: asio将无法优雅地处理它.当确实发生崩溃时,它们很难调试,因为很难查看io_service队列在发生故障时正在做什么.

在最近完成了完整的异步客户端体系结构并遇到了随机崩溃问题之后,我将提供一些技巧.不幸的是,我不知道这些方法是否可以解决您的问题,但希望它为正确的方向提供了良好的开端.

Boost Asio协程使用技巧

>使用boost :: asio :: asio_handler_invoke而不是io_service.post():

auto& ioService = timer->get_io_service();

ioService.post(timer{ timer->cancel(); });

在协程中使用发布/发送通常是一个坏主意.从协程调用您时,请始终使用asio_handler_invoke.但是,在这种情况下,您可能可以安全地调用timer-> cancel()而不用将其发布到消息循环中.
>您的计时器似乎未使用shared_ptr对象.无论应用程序其余部分发生什么情况,都无法确定何时应销毁这些对象.我强烈建议您为所有计时器对象使用shared_ptr对象.同样,任何指向类方法的指针也应使用shared_from_this().如果使用普通格式,这会非常危险,如果它被破坏(在堆栈上)或超出了shared_ptr中其他位置的范围.无论做什么,都不要在对象的构造函数中使用shared_from_this()!

如果在执行io_service中的处理程序时遇到崩溃,但是该处理程序的一部分不再有效,则调试起来将非常困难.注入到io_service中的处理程序对象包括指向计时器的任何指针,或执行该处理程序可能必需的对象的指针.

我强烈建议您使用在所有asio类周围包裹的shared_ptr对象来解决问题.如果问题消失了,则可能是破坏顺序问题.
>故障地址在堆上的某个位置还是指向堆栈?这将帮助您诊断其对象是否在错误的时间超出了方法的范围,或者它是否是其他原因.例如,这向我证明,即使在单个线程应用程序中,我所有的计时器也必须成为shared_ptr对象.

标签:c,multithreading,boost-asio,valgrind,boost-coroutine
来源: https://codeday.me/bug/20191012/1901047.html