其他分享
首页 > 其他分享> > Concurrency-with-Modern-Cpp学习笔记 - 并发架构

Concurrency-with-Modern-Cpp学习笔记 - 并发架构

作者:互联网

并发架构

活动对象

​ 活动对象模式将执行与对象的成员函数解耦,每个对象会留在在自己的控制线程中。其目标是通过使用异步方法,处理调度器的请求,从而触发并发。维基百科:Active object。所以,这种模式也称为并发对象模式。

客户端的调用会转到代理,代理表现为活动对象的接口。服务提供活动对象的实现,并在单独的线程中运行。代理在运行时将客户端的调用转换为对服务的调用,调度程序将方法加入到激活列表中。调度器与服务在相同的线程中活动,并将方法调用从激活列表中取出,再将它们分派到相应的服务上。最后,客户端可以通过future从代理处获取最终的结果。

组件

活动对象模式由六个组件组成:

  1. 代理为活动对象的可访问方法提供接口。代理将触发激活列表的方法,并请求对象的构造。并且,代理和客户端运行在相同的线程中。
  2. 方法请求类定义了执行活动对象的接口。
  3. 激活列表的目标是维护挂起的请求,激活列表将客户端线程与活动对象线程解耦。代理对入队请求的进行处理,而调度器将请求移出队列。
  4. 调度器与代理可在不同的线程中运行。调度器会在活动对象的线程中运行,并决定接下来执行激活列表中的哪个请求。
  5. 可以通过服务实现活动对象,并在活动对象的线程中运行,服务也支持代理接口。
  6. future是由代理创造的,客户端可以从future上获取活动对象调用的结果。客户端可以安静等待结果,也可以对结果进行轮询。

在这里插入图片描述

代理

代理设计模式是《设计模式:可重用的面向对象软件的元素》中的经典模式,代理是其他对象的代表。典型的代理可以是远程代理CORBA、安全代理、虚拟代理或智能指针,如std::shared_ptr。每个代理会为它所代表的对象添加额外的功能。远程代理代表远程对象,并使客户端产生本地对象的错觉。安全代理通过对数据进行加密和解密,将不安全的连接转换为安全的连接。虚拟代理以惰性的方式封装对象的创建,智能指针将接管底层内存的生存期。

在这里插入图片描述

优点和缺点

介绍Active Object模式的最小实现前,先了解一下它的优点和缺点。

优点:

缺点:

具体实现

下面的示例展示了活动对象模式的简单实现。我没有定义一个请求,这应该由代理和服务实现。而且,当请求调度程序执行下一个请求时,服务应该只执行这个请求。

所涉及的类型为future<vector<future<pair<bool, int>>>>,这个类型的标识有点长。为了提高可读性,我使用了声明(第16 - 37行)。

// activeObject.cpp

#include <algorithm>
#include <deque>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <numeric>
#include <random>
#include <thread>
#include <utility>
#include <vector>

using std::async;//16
using std::boolalpha;
using std::cout;
using std::deque;
using std::distance;
using std::endl;
using std::for_each;
using std::find_if;
using std::future;
using std::lock_guard;
using std::make_move_iterator;
using std::make_pair;
using std::move;
using std::mt19937;
using std::mutex;
using std::packaged_task;
using std::pair;
using std::random_device;
using std::sort;
using std::thread;
using std::uniform_int_distribution;
using std::vector; //37

class IsPrime //39
{
public:
    pair<bool, int> operator()(int i)
    {
        for (int j = 2; j * j <= i; ++j)
        {
            if (i % j == 0)return std::make_pair(false, i);
        }
        return std::make_pair(true, i);
    }
}; //47

class ActivaeObject
{
public:
    future<pair<bool, int>> enqueueTask(int i)
    {
        IsPrime isPrime;
        packaged_task<pair<bool, int>(int)> newJob(isPrime);
        auto isPrimeFuture = newJob.get_future();
        auto pair = make_pair(move(newJob), i);
        {
            lock_guard<mutex> lockGuard(activationListMutex);
            activationList.push_back(move(pair));
        }
        return isPrimeFuture;
    }

    void run()//64
    {
        thread servant([this]
        {
            while (!isEmpty()) //66
            {
                auto myTask = dequeueTask();
                myTask.first(myTask.second); //68
            }
        });
        servant.join();
    }//72

private:

    pair<packaged_task<pair<bool, int>(int)>, int> dequeueTask()
    {
        lock_guard<mutex> lockGuard(activationListMutex);
        auto myTask = std::move(activationList.front());
        activationList.pop_front();
        return myTask;
    }

    bool isEmpty() //83
    {
        lock_guard<mutex> lockGuard(activationListMutex);
        auto empty = activationList.empty();
        return empty;
    } //87

    deque<pair<packaged_task<pair<bool, int>(int)>, int >> activationList; //89
    mutex activationListMutex;
};

vector<int> getRandNumber(int number)
{
    random_device seed;
    mt19937 engine(seed());
    uniform_int_distribution<> dist(1000000, 1000000000); //96
    vector<int> numbers;
    for (long long i = 0; i < number; ++i) numbers.push_back(dist(engine));
    return numbers;
}

future<vector<future<pair<bool, int>>>> getFutures(ActivaeObject &activeObject,
        int numberPrimes)
{
    return async([&activeObject, numberPrimes]
    {
        vector<future<pair<bool, int>>> futures;
        auto randNumbers = getRandNumber(numberPrimes);
        for (auto numb : randNumbers)
        {
            futures.push_back(activeObject.enqueueTask(numb));
        }
        return futures;
    });
}


int main()
{

    cout << boolalpha << endl;

    ActivaeObject activeObject;

    // a few clients enqueue work concurrently //121
    auto client1 = getFutures(activeObject, 1998);
    auto client2 = getFutures(activeObject, 2003);
    auto client3 = getFutures(activeObject, 2011);
    auto client4 = getFutures(activeObject, 2014);
    auto client5 = getFutures(activeObject, 2017); //126

    // give me the futures
    auto futures = client1.get(); //129
    auto futures2 = client2.get();
    auto futures3 = client3.get();
    auto futures4 = client4.get();
    auto futures5 = client5.get(); //133

    // put all futures together
    futures.insert(futures.end(), make_move_iterator(futures2.begin()), //136
                   make_move_iterator(futures2.end()));

    futures.insert(futures.end(), make_move_iterator(futures3.begin()),
                   make_move_iterator(futures3.end()));

    futures.insert(futures.end(), make_move_iterator(futures4.begin()),
                   make_move_iterator(futures4.end()));

    futures.insert(futures.end(), make_move_iterator(futures5.begin()),
                   make_move_iterator(futures5.end())); //146

    // run the promises
    activeObject.run(); //149

    // get the results from the futures
    vector<pair<bool, int>> futResults;
    futResults.reserve(futResults.size());
    for (auto &fut : futures)futResults.push_back(fut.get()); //154

    sort(futResults.begin(), futResults.end()); //156

    // separate the primes from the non-primes
    auto prIt = find_if(futResults.begin(), futResults.end(),
    [](pair<bool, int>pa) {return pa.first == true; });

    cout << "Number primes: " << distance(prIt, futResults.end()) << endl;
    cout << "Primes: " << endl;
    for_each(prIt, futResults.end(), [](auto p) {cout << p.second << " "; });

    cout << "\n\n";

    cout << "Number no primes: " << distance(futResults.begin(), prIt) << endl;
    cout << "No primes: " << endl;
    for_each(futResults.begin(), prIt, [](auto p) {cout << p.second << " "; });

    cout << endl;

}

示例的基本思想是,客户端可以在激活列表上并发地安排作业。线程的工作是确定哪些数是质数。激活列表是活动对象的一部分,而活动对象在一个单独的线程上进行入队操作,并且客户端可以在激活列表中查询作业的结果。

程序的详情:5个客户端通过getFutures将工作(第121 - 126行)入队到activeObjectnumberPrimes中的数字是1000000到1000000000之间(第96行)的随机数,将这些数值放入vector<future<pair<bool, int>>中。future<pair<bool, int>持有一个boolint对,其中bool表示int值是否是质数。再看看第108行:future .push_back(activeObject.enqueueTask(numb))。此调用将触发新作业进入激活列表的队列,所有对激活列表的调用都必须受到保护,这里激活列表是一个promise队列(第89行):deque<pair<packaged_task<pair<bool, int>(int)>, int >>

每个promise在调用执行函数对象IsPrime(第39 - 47行)时,会返回一个boolint对。现在,工作包已经准备好了,开始计算吧。所有客户端在第129 - 133行中返回关联future的句柄,并把所有的future放在一起(第136 - 146行),这样会使工作更加容易。第149行中的调用activeObject.run()启动执行。run(第64 - 72行)启动单独的线程,并执行promises(第68行),直到执行完所有作业(第66行)。isEmpty(第83 - 87行)确定队列是否为空,dequeTask会返回一个新任务。通过在每个future上调用futResults.push_back(fut.get())(第154行),所有结果都会推送到futResults上。第156行对成对的向量进行排序:vector<pair<bool, int>>。其余代码则是给出了计算结果,第159行中的迭代器prIt将第一个迭代器指向一个素数对。

在这里插入图片描述

监控对象

监控对象模式会同步并发执行,以确保对象只执行一个方法。并且,还允许对象的方法协同调度执行序列。这种模式也称为线程安全的被动对象模式。

模式要求

多个线程同时访问一个共享对象时,需要满足以下要求:

  1. 并发访问时,需要保护共享对象不受非同步读写操作的影响,以避免数据争用。
  2. 必要的同步是实现的一部分,而不是接口的一部分。
  3. 当线程处理完共享对象时,需要发送一个通知,以便下一个线程可以使用共享对象。这种机制有助于避免死锁,并提高系统的整体性能。
  4. 方法执行后,共享对象的不变量必须保持不变。

客户端(线程)可以访问监控对象的同步方法。因为监控锁在任何时间点上,只能运行一个同步方法。每个监控对象都有一个通知等待客户端的监控条件。

组件

监控对象由四个组件组成。

在这里插入图片描述

  1. 监控对象:支持一个或多个方法。每个客户端必须通过这些方法访问对象,每个方法都必须在客户端线程中运行。
  2. 同步方法:监控对象支持同步方法。任何给定的时间点上,只能执行一个方法。线程安全接口有助于区分接口方法(同步方法)和(监控对象的)实现方法。
  3. 监控锁:每个监控对象有一个监控锁,锁可以确保在任何时间点上,只有一个客户端可以访问监控对象。
  4. 监控条件:允许线程在监控对象上进行调度。当前客户端完成同步方法的调用后,下一个等待的客户端将被唤醒。

虽然监控锁可以确保同步方法的独占访问,但是监控条件可以保证客户端的等待时间最少。实质上,监控锁可以避免数据竞争,条件监控可以避免死锁。

运行时行为

监控对象及其组件之间的交互具有不同的阶段。

优点和缺点

监控对象的优点和缺点是什么?

优点:

// monitorObject.cpp

#include <condition_variable>
#include <functional>
#include <queue>
#include <iostream>
#include <mutex>
#include <random>
#include <thread>

template <typename T>
class Monitor
{
public:
    void lock() const
    {
        monitMutex.lock();
    }
    void unlock() const
    {
        monitMutex.unlock();
    }

    void notify_one() const noexcept
    {
        monitCond.notify_one();
    }
    void wait() const
    {
        std::unique_lock<std::recursive_mutex> monitLock(monitMutex);
        monitCond.wait(monitLock);
    }

private:
    mutable std::recursive_mutex monitMutex;
    mutable std::condition_variable_any monitCond;
};

template <typename T> //34
class ThreadSafeQueue : public Monitor<ThreadSafeQueue<T>>
{
public:
    void add(T val)
    {
        derived.lock();
        myQueue.push(val);
        derived.unlock();
        derived.notify_one();
    }

    T get()
    {
        derived.lock();
        while (myQueue.empty()) derived.wait();
        auto val = myQueue.front();
        myQueue.pop();
        derived.unlock();
        return val;
    }
private:
    std::queue<T> myQueue;//53
    ThreadSafeQueue<T> &derived = static_cast<ThreadSafeQueue<T>&>(*this); //54 //55
};

class Dice
{
public:
    int operator()() { return rand(); }
private:
    std::function<int()>rand = std::bind(std::uniform_int_distribution<>(1, 6),
                                         std::default_random_engine());
};


int main()
{

    std::cout << std::endl;

    constexpr auto NUM = 100;

    ThreadSafeQueue<int> safeQueue; //72
    auto addLambda = [&safeQueue](int val) {safeQueue.add(val); };//73
    auto getLambda = [&safeQueue] {std::cout << safeQueue.get() << " " //74
                                   << std::this_thread::get_id() << ";";
                                  };

    std::vector<std::thread> addThreads(NUM); //78  
    Dice dice;
    for (auto &thr : addThreads) thr = std::thread(addLambda, dice());

    std::vector<std::thread> getThreads(NUM);
    for (auto &thr : getThreads) thr = std::thread(getLambda);

    for (auto &thr : addThreads) thr.join();
    for (auto &thr : addThreads) thr.join();

    std::cout << "\n\n";

}

该示例的核心思想是,将监控对象封装在一个类中,这样就可以重用。监控类使用std::recursive_mutex作为监控锁,std::condition_variable_any作为监控条件。与std::condition_variable不同,std::condition_variable_any能够接受递归互斥。这两个成员变量都声明为可变,因此可以在常量方法中使用。监控类提供了监控对象的最小支持接口。

第34 - 55行中的ThreadSafeQueue使用线程安全接口扩展了第53行中的std::queueThreadSafeQueue继承于监控类,并使用父类的方法来支持同步的方法addget。方法addget使用监控锁来保护监控对象,特别是非线程安全的myQueue。当一个新项添加到myQueue时,add会通知等待线程,并且这个通知是线程安全的。当如ThreadSafeQueue这样的模板类,将派生类作为基类的模板参数时,这属于C++的一种习惯性用法,称为CRTP:class ThreadSafeQueue: public Monitor<threadsafequeue<T>>。理解这个习惯的关键是第54行:ThreadSafeQueue<T>& derived = static_cast<threadsafequeue<T>&>(*this),该表达式将this指针向下转换为派生类。监控对象safeQueue第72行使用(第73行和第74行中的)Lambda函数添加一个数字,或从同步的safeQueue中删除一个数字。ThreadSafeQueue本身是一个模板类,可以保存任意类型的值。程序模拟的是100个客户端向safeQueue添加100个介于1 - 6之间的随机数(第78行)的同时,另外100个客户端从safeQueue中删除这100个数字。程序会显示使用的线程的编号和id。

在这里插入图片描述

不知道为啥会段错误

奇异递归模板模式(CRTP)

奇异递归模板模式,简单地说,CRTP代表C++中的一种习惯用法,在这种用法中,Derived类派生自类模板Base,因此Base作为Derived模板参数。

template<class T>
class Base
{
    ....
};

class Derived : public Base<Derived>
{
    ....
};

理解CRTP习惯用法的关键是,实例化方法是惰性的,只有在需要时才实例化方法。CRTP有两个主要的用例。

惰性C++:CRTP一文中,有对CRTP习语有更深入地描述。

活动对象和监控对象在几个重要的方面类似,但也有不同。这两种体系结构模式,会同步对共享对象的访问。活动对象的方法在不同线程中执行,而监控对象的方法则在同一线程中执行。活动对象更好地将其方法调用与执行解耦,因此更容易维护。

半同步/半异步

半同步/半异步模式会对并发系统中异步和同步服务进行解耦,从而在不过度降低性能的情况下简化编程。该模式引入了两个可以相互通信的层,一个用于异步,另一个用于同步。

在这里插入图片描述

半同步/半异步模式通常用于服务器的事件循环或图形界面。事件循环的工作流是将事件请求插入队,并在单独的线程中同步处理。异步处理确保了运行效率,而同步处理简化了申请流程。异步服务层和同步服务层分解为两个层,并且在这两个层之间有队列坐标。异步层由较底层的系统服务(如中断)组成,而同步层由较高层的服务(如数据库查询或文件操作)组成。异步层和同步层可以通过队列层相互通信。

优点和缺点

半同步/半异步模式的优点和缺点是什么?

优点:

缺点:

半同步/半异步模式通常用于事件的多路分解和调度框架,如Reactor或Proactor模式。

Reactor模式

Reactor模式也称为调度程序或通知程序。该模式是一个事件驱动的框架,用于将多个服务请求并发地分发到各个服务端。

使用要求

服务器应该并发地处理客户端的请求。每个客户端的请求都有一个唯一标识符,并支持映射到特定的服务端。以下几点是Reactor必备的:

解决方案

对于支持的服务类型,实现一个事件处理程序来满足特定客户端的请求。反应器中使用注册的方式,将服务端的事件处理程序进行注册,这里使用了事件解复用器来同步等待所有传入的事件。当一个事件到达时,反应器得到通知,并将相应的事件分派给特定的服务。

组件

在这里插入图片描述

反应器(而不是应用程序)等待特定事件,并进行分解和分派。具体的事件处理在反应器中注册,反应器改变了控制流程。反应器等待特定事件,并调用特定的处理程序。这种控制的倒置,称为好莱坞原则。(译者注:“不要给我们打电话,我们会给你打电话(don‘t call us, we‘ll call you)”这是著名的好莱坞原则。)

下面的代码段显示了C++框架的事件循环——自适应通信环境(ACE)

// CTRL c
SignalHandler *mutateTimer1 = new SignalHandler(timerId1);

// CTRL z
SignalHandler *mutateTimer2 = new SignalHandler(timerId2);

ACE_Reactor::instance()->register_handler(SIGINT, mutateTimer1);
ACE_Reactor::instance()->register_handler(SIGTSTP, mutateTimer2);

// "run" the timer.
Timer::instance()->wait_fot_event();

第2行和第5行定义按CTRL+c和CTRL+z的键盘事件的信号处理程序。第7行和第8行记录它们,事件循环从第12行开始。

优点和缺点

反应器模式的优点和缺点是什么呢?

半同步/半异步模式通常在反应器模式中,用于在独立线程中对客户端请求的响应。

Proactor模式是反应器模式的异步变体。反应器模式同步地分解和分派事件处理程序,而Proactor模式异步地分派事件处理程序。

Proactor模式

Proactor模式允许事件驱动的应用程序,对异步操作完成时触发的服务请求进行多路的分解和分派。

使用要求

事件驱动程序(如服务器),其性能可以通过异步处理服务来提高。为了实现这种方式,事件驱动程序必须同步处理多个事件,从而避免昂贵的数据同步或上下文切换。此外,修改后的服务应该很容易集成入系统,应用程序应该避免对多线程和同步方式进行挑战。

解决方案

将服务分为两部分:异步运行的长时间操作和处理操作结果的程序。结果处理程序与反应器模式中的事件处理程序非常相似,不过异步操作通常是操作系统的工作。所以,作为反应器模式,Proactor模式定义了事件循环。

异步操作(如连接请求)是该模式的独特之处,并且在不阻塞调用线程的情况下执行操作。当耗时相当长的操作完成时,它将一个完成事件放入完成事件队列,Proactor通过使用异步事件多路分解器在队列上等待。异步事件多路分解器将从队列中删除完成事件,而Proactor将其分派给特定的处理程序,处理操作的结果。

组件

Proactor模式由九个组件组成。

在这里插入图片描述

优点和缺点

Proactor模式的优点和缺点是什么呢?

Asio,即「异步 IO」(Asynchronous Input/Output)

随着Boost.Asio库可能作为网络库成为C++23的一部分,在未来大家可以在C++中轻易实现Proactor模式了。Boost.Asio是由Christopher Kohlhoff的提供,是“一个用于网络和低级I/O编程的跨平台C++库,并使用现代C++为其他开发者提供了一致性异步模型”。

标签:std,异步,同步,对象,Modern,线程,Concurrency,Cpp,客户端
来源: https://blog.csdn.net/TM1695648164/article/details/119899626