c-启动多个线程并重新启动它们
作者:互联网
我正在尝试对创建x个工作线程的系统进行编程.这些线程将在不同的时间完成工作.当它们中的任何一个完成工作时,我将检查它们的输出并再次重新启动它们(将运行线程数保持在x左右).我将在许多不合时宜的迭代中执行此操作.因此,基本上,一个控制器线程将启动x个线程,并在完成工作后重新启动它们,直到达到一定数量的迭代为止.
附加说明#1:当我说重新启动时,最好等到当前的一个退出/中止并销毁并创建一个新的.它不必“重新启动”同一线程.我最有兴趣以一种干净的异步方式执行此操作.
注意:我不是在寻找任何特定的代码,而是在寻找一些可能的伪代码以及使用插槽和信号的设计模式.
我知道qt线程并且已经使用了它们.我熟悉一些示例,在这些示例中,您启动了x数量的线程,然后等待所有线程都使用yield并等待.我正在寻找一种干净的方法来实现我在第一段中使用信号和插槽描述的内容.
解决方法:
这就是QtConcurrent :: run()或QThreadPool :: start()的目的. Concurrent框架在内部使用线程池,因此它们相当等效:前者是后者的便捷包装.默认线程池最好留给短期运行的任务.要运行长任务,请使用自己的线程池.您将其作为第一个参数传递给QtConcurrent :: run().
QThreadPool维护一个工作项队列,将它们分派给线程,并动态创建和销毁工作线程.这是一个很棒的课程,您无需重新实现.
如果您没有太多的工作单元并且可以预先提供所有这些单元,则只需使用QtConcurrent :: run()或QThreadPool :: start()将它们提前排入队列.它们可以从辅助对象发出信号,以在它们每个完成时通知您.
如果工作单元太昂贵而无法一次创建所有工作单元,则必须在线程池的顶部实现一个通知工作队列.
工作单元需要通知队列及其用户它已经完成.这可以例如完成通过重新实现QRunnable作为WorkUnit的基础,将工作转发到抽象方法,并在抽象方法完成时通知队列.相同的方法适用于QtConcurrent :: run,不同之处在于,不是重新实现QRunnable :: run,而是实现函子的operator()().
队列将为每个完成的工作单元发出workUnitDone信号.希望用户在收到信号后用一项工作重新填充队列(如果没有更多工作,则不填).
为了方便起见,队列可以通过发出workUnitDone(nullptr)来请求许多初始工作项.如果您每次上一个项目完成时都恰好补充一个项目,则队列将保持工作项目的初始数量.
如果项目需要很短的时间来处理,那么可用的线程数应该比线程数多得多,这样就不会有线程在没有工作的情况下处于空闲状态.对于大多数时间较长(数十毫秒或更长)的项目,具有1.5-2倍QThread :: idealThreadCount足够了.
添加到队列中的工作单元可以是WorkUnit的实例或函子.
// https://github.com/KubaO/stackoverflown/tree/master/questions/notified-workqueue-38000605
#include <QtCore>
#include <type_traits>
class WorkUnit;
class WorkQueue : public QObject {
Q_OBJECT
friend class WorkUnit;
QThreadPool m_pool{this};
union alignas(64) { // keep it in its own cache line
QAtomicInt queuedUnits{0};
char filler[64];
} d;
void isDone(WorkUnit * unit) {
auto queued = d.queuedUnits.deref();
emit workUnitDone(unit);
if (!queued) emit finished();
}
public:
explicit WorkQueue(int initialUnits = 0) {
if (initialUnits)
QTimer::singleShot(0, [=]{
for (int i = 0; i < initialUnits; ++i)
emit workUnitDone(nullptr);
});
}
Q_SLOT void addWork(WorkUnit * unit);
template <typename F> void addFunctor(F && functor);
Q_SIGNAL void workUnitDone(WorkUnit *);
Q_SIGNAL void finished();
};
class WorkUnit : public QRunnable {
friend class WorkQueue;
WorkQueue * m_queue { nullptr };
void run() override {
work();
m_queue->isDone(this);
}
protected:
virtual void work() = 0;
};
template <typename F>
class FunctorUnit : public WorkUnit, private F {
void work() override { (*this)(); }
public:
FunctorUnit(F && f) : F(std::move(f)) {}
};
void WorkQueue::addWork(WorkUnit *unit) {
d.queuedUnits.ref();
unit->m_queue = this;
m_pool.start(unit);
}
template <typename F> void WorkQueue::addFunctor(F && functor) {
addWork(new FunctorUnit<typename std::decay<F>::type>{std::forward<F>(functor)});
}
为了演示这些内容,让我们做50单位的“工作”睡眠,时间介于1us和1s之间.我们将一半的单元作为SleepyWork实例传递,将另一半作为lambda传递.
#include <random>
struct SleepyWork : WorkUnit {
int usecs;
SleepyWork(int usecs) : usecs(usecs) {}
void work() override {
QThread::usleep(usecs);
qDebug() << "slept" << usecs;
}
};
int main(int argc, char ** argv) {
QCoreApplication app{argc, argv};
std::random_device dev;
std::default_random_engine eng{dev()};
std::uniform_int_distribution<int> dist{1, 1000000};
auto rand_usecs = [&]{ return dist(eng); };
int workUnits = 50;
WorkQueue queue{2*QThread::idealThreadCount()};
QObject::connect(&queue, &WorkQueue::workUnitDone, [&]{
if (workUnits) {
if (workUnits % 2) {
auto us = dist(eng);
queue.addFunctor([us]{
QThread::usleep(us);
qDebug() << "slept" << us;
});
} else
queue.addWork(new SleepyWork{rand_usecs()});
--workUnits;
}
});
QObject::connect(&queue, &WorkQueue::finished, [&]{
if (workUnits == 0) app.quit();
});
return app.exec();
}
#include "main.moc"
到此结束示例.
标签:c,multithreading,qt,signals-slots 来源: https://codeday.me/bug/20191012/1901957.html