C++大法:举世闻名之BOOST大法精华浅析(九)并发编程进阶
作者:互联网
书接上回。
文章目录
8.3.3 线程中断
thread成员函数interrupt()允许正在执行的线程被中断,被中断的线程会抛出一个thread_interrupted异常,它是一个空类,其应该在线程处理函数中捕获并且处理。如果不处理这个异常,视为线程终止。
-
实例:
#include <iostream> #include <boost/thread.hpp> #include <boost/noncopyable.hpp>//禁止类赋值的小工具 #include <boost/ref.hpp> using namespace boost; //原子操作计数器实例: template <class T> class basic_atom:public noncopyable { private: T n; typedef boost::mutex mutex_t; mutex_t mu; public: basic_atom(T x=T()):n(x){} T operator++() { mutex_t::scoped_lock lock(mu); return ++n; } operator T(){return n;} //类型转换操作符定义 }; typedef basic_atom<int> atom_int; typedef boost::mutex mutex; static mutex io_mu; void to_interrupt(atom_int& x,const std::string& str) { try { for (int i=0;i<5;i++) { this_thread::sleep(posix_time::seconds(1));//线程中每次打印前先睡一秒 mutex::scoped_lock lock(io_mu);//为atom_int 上锁 std::cout<<str<<++x<<std::endl; } } catch (thread_interrupted&) { std::cout<<"thread_interrupted"<<std::endl; } } int main() { atom_int x; function<void()> fn = bind(to_interrupt,ref(x),"handle"); thread th(fn); this_thread::sleep(posix_time::seconds(2)); th.interrupt();//线程中断 if(th.joinable()){ th.join(); } return 0; } //结果为: /* handle1 thread_interrupted */
-
线程的中断点:
线程不是任意时刻都能够中断的,是有中断点时才可以在中断点处被中断,预置了九个中断点如下,其实都是一些函数:
thread::join(); thread::timed_join(); condition_variable::wait(); condition_variable::timed_wait(); condition_variable_any::wait(); condition_variable_any::timed_wait(); thread::sleep(); this_thread::sleep();//前8个都是会等待,不论等待多久,而下边这个非等待。 this_thread::interruption_point();//为线程添加可断点,即线程可以此函数调用处被中断,较常用,相当于一个标签。标志着“向我开炮”
-
启用和关闭线程中断机制:
在boost::this_thread命名空间下存放了诸如disable_interruption()等函数用来控制线程的中断。
8.3.4 线程组
thread库中提供thread_group用于管理一组线程,就像一个线程池,它内部使用**std::list<thread*>**来容纳创建的thread对象。
thread_group接口很小,用法也很简单。
class thread_group
{
private:
thread_group(thread_group const&);//私用的拷贝构造
thread_group& operator=(thread_group const&);//私有的赋值运算符重载
public:
thread_group() {}//只公开了一个无参构造
~thread_group()
{
for(std::list<thread*>::iterator it=threads.begin(),end=threads.end();
it!=end;
++it)
{
delete *it;//遍历容器删除元素
}
}
bool is_this_thread_in()
{
thread::id id = this_thread::get_id();
boost::shared_lock<shared_mutex> guard(m);
for(std::list<thread*>::iterator it=threads.begin(),end=threads.end();
it!=end;
++it)
{
if ((*it)->get_id() == id)
return true;
}
return false;
}
bool is_thread_in(thread* thrd)
{
if(thrd)
{
thread::id id = thrd->get_id();
boost::shared_lock<shared_mutex> guard(m);
for(std::list<thread*>::iterator it=threads.begin(),end=threads.end();
it!=end;
++it)
{
if ((*it)->get_id() == id)
return true;
}
return false;
}
else
{
return false;
}
}
template<typename F>
thread* create_thread(F threadfunc)//创建线程并且运行线程,同时加入list---工厂函数
{
boost::lock_guard<shared_mutex> guard(m);
boost::csbl::unique_ptr<thread> new_thread(new thread(threadfunc));//智能指针托管新线程,新线程处理函数threadFunc。
threads.push_back(new_thread.get());//将新线程的线程指针压入list中
return new_thread.release();
}
void add_thread(thread* thrd)//将线程加入到线程组
{
if(thrd)//线程存在,非空
{
BOOST_THREAD_ASSERT_PRECONDITION( ! is_thread_in(thrd) ,
thread_resource_error(static_cast<int>(system::errc::resource_deadlock_would_occur), "boost::thread_group: trying to add a duplicated thread")
);
boost::lock_guard<shared_mutex> guard(m);
threads.push_back(thrd);
}
}
void remove_thread(thread* thrd)//移出某个线程
{
boost::lock_guard<shared_mutex> guard(m);
std::list<thread*>::iterator const it=std::find(threads.begin(),threads.end(),thrd);
if(it!=threads.end())
{
threads.erase(it);
}
}
void join_all()//回收该线程组资源
{
BOOST_THREAD_ASSERT_PRECONDITION( ! is_this_thread_in() ,
thread_resource_error(static_cast<int>(system::errc::resource_deadlock_would_occur), "boost::thread_group: trying joining itself")
);
boost::shared_lock<shared_mutex> guard(m);
for(std::list<thread*>::iterator it=threads.begin(),end=threads.end();
it!=end;
++it)
{
if ((*it)->joinable())
(*it)->join();
}
}
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS
void interrupt_all()//组中每个线程到点打断
{
boost::shared_lock<shared_mutex> guard(m);
for(std::list<thread*>::iterator it=threads.begin(),end=threads.end();
it!=end;
++it)
{
(*it)->interrupt();
}
}
#endif
size_t size() const//返回线程总数
{
boost::shared_lock<shared_mutex> guard(m);
return threads.size();
}
private:
std::list<thread*> threads;//线程容器
mutable shared_mutex m;//可变的(mutable)读写锁
};
}
8.3.5 条件变量
条件变量是一种线程间同步和通信的常见手段,其必须配合互斥量使用,等待另一个线程中的某个事情发生,才能继续执行。thread库中提供的条件变量有两种对象:condition_variable和condition_variable_any,一般情况下更多使用condition_variable_any,其使用范围更广,适用于更多的互斥量。
condition_variable_any的类摘要如下:
class condition_variable_any:
private detail::basic_condition_variable
{
public:
BOOST_THREAD_NO_COPYABLE(condition_variable_any)
condition_variable_any()
{}
using detail::basic_condition_variable::do_wait_until;
using detail::basic_condition_variable::notify_one;
using detail::basic_condition_variable::notify_all;
template<typename lock_type>
void wait(lock_type& m)
{
do_wait_until(m, detail::internal_platform_timepoint::getMax());
}
template<typename lock_type,typename predicate_type>
void wait(lock_type& m,predicate_type pred)
{
while (!pred())
{
wait(m);
}
}
#if defined BOOST_THREAD_USES_DATETIME
template<typename lock_type>
bool timed_wait(lock_type& m,boost::system_time const& abs_time)
{
// The system time may jump while this function is waiting. To compensate for this and time
// out near the correct time, we could call do_wait_until() in a loop with a short timeout
// and recheck the time remaining each time through the loop. However, because we can't
// check the predicate each time do_wait_until() completes, this introduces the possibility
// of not exiting the function when a notification occurs, since do_wait_until() may report
// that it timed out even though a notification was received. The best this function can do
// is report correctly whether or not it reached the timeout time.
const detail::real_platform_timepoint ts(abs_time);
const detail::platform_duration d(ts - detail::real_platform_clock::now());
do_wait_until(m, detail::internal_platform_clock::now() + d);
return ts > detail::real_platform_clock::now();
}
template<typename lock_type>
bool timed_wait(lock_type& m,boost::xtime const& abs_time)
{
return timed_wait(m, system_time(abs_time));
}
template<typename lock_type,typename duration_type>
bool timed_wait(lock_type& m,duration_type const& wait_duration)
{
if (wait_duration.is_pos_infinity())
{
wait(m);
return true;
}
if (wait_duration.is_special())
{
return true;
}
const detail::platform_duration d(wait_duration);
return do_wait_until(m, detail::internal_platform_clock::now() + d);
}
template<typename lock_type,typename predicate_type>
bool timed_wait(lock_type& m,boost::system_time const& abs_time,predicate_type pred)
{
// The system time may jump while this function is waiting. To compensate for this
// and time out near the correct time, we call do_wait_until() in a loop with a
// short timeout and recheck the time remaining each time through the loop.
const detail::real_platform_timepoint ts(abs_time);
while (!pred())
{
detail::platform_duration d(ts - detail::real_platform_clock::now());
if (d <= detail::platform_duration::zero()) break; // timeout occurred
d = (std::min)(d, detail::platform_milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS));
do_wait_until(m, detail::internal_platform_clock::now() + d);
}
return pred();
}
template<typename lock_type,typename predicate_type>
bool timed_wait(lock_type& m,boost::xtime const& abs_time,predicate_type pred)
{
return timed_wait(m, system_time(abs_time), pred);
}
template<typename lock_type,typename duration_type,typename predicate_type>
bool timed_wait(lock_type& m,duration_type const& wait_duration,predicate_type pred)
{
if (wait_duration.is_pos_infinity())
{
while (!pred())
{
wait(m);
}
return true;
}
if (wait_duration.is_special())
{
return pred();
}
const detail::platform_duration d(wait_duration);
const detail::internal_platform_timepoint ts(detail::internal_platform_clock::now() + d);
while (!pred())
{
if (!do_wait_until(m, ts)) break; // timeout occurred
}
return pred();
}
#endif
#ifdef BOOST_THREAD_USES_CHRONO
template <class lock_type,class Duration>
cv_status
wait_until(
lock_type& lock,
const chrono::time_point<detail::internal_chrono_clock, Duration>& t)
{
const detail::internal_platform_timepoint ts(t);
if (do_wait_until(lock, ts)) return cv_status::no_timeout;
else return cv_status::timeout;
}
template <class lock_type, class Clock, class Duration>
cv_status
wait_until(
lock_type& lock,
const chrono::time_point<Clock, Duration>& t)
{
// The system time may jump while this function is waiting. To compensate for this and time
// out near the correct time, we could call do_wait_until() in a loop with a short timeout
// and recheck the time remaining each time through the loop. However, because we can't
// check the predicate each time do_wait_until() completes, this introduces the possibility
// of not exiting the function when a notification occurs, since do_wait_until() may report
// that it timed out even though a notification was received. The best this function can do
// is report correctly whether or not it reached the timeout time.
typedef typename common_type<Duration, typename Clock::duration>::type common_duration;
common_duration d(t - Clock::now());
do_wait_until(lock, detail::internal_chrono_clock::now() + d);
if (t > Clock::now()) return cv_status::no_timeout;
else return cv_status::timeout;
}
template <class lock_type, class Rep, class Period>
cv_status
wait_for(
lock_type& lock,
const chrono::duration<Rep, Period>& d)
{
return wait_until(lock, chrono::steady_clock::now() + d);
}
template <class lock_type, class Clock, class Duration, class Predicate>
bool
wait_until(
lock_type& lock,
const chrono::time_point<detail::internal_chrono_clock, Duration>& t,
Predicate pred)
{
const detail::internal_platform_timepoint ts(t);
while (!pred())
{
if (!do_wait_until(lock, ts)) break; // timeout occurred
}
return pred();
}
template <class lock_type, class Clock, class Duration, class Predicate>
bool
wait_until(
lock_type& lock,
const chrono::time_point<Clock, Duration>& t,
Predicate pred)
{
// The system time may jump while this function is waiting. To compensate for this
// and time out near the correct time, we call do_wait_until() in a loop with a
// short timeout and recheck the time remaining each time through the loop.
typedef typename common_type<Duration, typename Clock::duration>::type common_duration;
while (!pred())
{
common_duration d(t - Clock::now());
if (d <= common_duration::zero()) break; // timeout occurred
d = (std::min)(d, common_duration(chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS)));
do_wait_until(lock, detail::internal_platform_clock::now() + detail::platform_duration(d));
}
return pred();
}
template <class lock_type, class Rep, class Period, class Predicate>
bool
wait_for(
lock_type& lock,
const chrono::duration<Rep, Period>& d,
Predicate pred)
{
return wait_until(lock, chrono::steady_clock::now() + d, boost::move(pred));
}
#endif
};
BOOST_THREAD_DECL void notify_all_at_thread_exit(condition_variable& cond, unique_lock<mutex> lk);
}
-
条件变量的使用方法:
拥有条件变量的线程先锁住互斥量,然后循环检查某个条件,如果条件不满足,那么就调用条件变量的成员函数wait()直到条件满足。当条件满足使用完后,调用notify_one()或者notify_all(),以通知一个或者所有正在等待的线程停止等待开始执行。
-
关于条件变量使用的简单例子:
//#define BOOST_DATA_TIME_SOURCE //#define BOOST_THREAD_NO_LIB #include <iostream> //#include <boost/thread.hpp> #include <boost/thread.hpp> #include <boost/noncopyable.hpp>//禁止类赋值的小工具 #include <boost/ref.hpp> using namespace boost; #include <stack> static mutex io_mu; class buffer{ private: boost::mutex mu; condition_variable_any cond_put; condition_variable_any cond_get; std::stack<int> stk; int un_read, capacity; bool is_full(){ return un_read==capacity; } bool is_empty(){ return un_read==0; } public: buffer(size_t n):un_read(0),capacity(n){} void put(int x){ { mutex::scoped_lock lock(mu); while(is_full()){ { mutex::scoped_lock lock_io(io_mu); std::cout<<"full waiting...."<<std::endl; } cond_put.wait(mu); } stk.push(x); ++un_read; } cond_get.notify_one(); } void get(int *x){ { mutex::scoped_lock lock(mu); while(is_empty()){ { mutex::scoped_lock lock_io(io_mu); std::cout<<"empty waiting...."<<std::endl; } cond_get.wait(mu); } --un_read; *x = stk.top(); stk.pop(); } cond_put.notify_one(); } }; buffer buf(5); //生产者线程的启动例程 void producer(int n) { for (int var = 0; var < n; ++var) { { mutex::scoped_lock lock(io_mu); std::cout<<"put:"<<n<<std::endl; } buf.put(var); } } //消费者线程的启动例程 void consumer(int n){ int x; for (int i = 0; i < n; ++i) { buf.get(&x); mutex::scoped_lock lock(io_mu); std::cout<<"get:"<<x<<std::endl; } } int main() { //条件变量实现生产者消费者的例子----基于STL::stack实现后进先出的缓存 //用到两个条件变量cond_put和cond_get thread th1(producer,20); thread th2(consumer,10); thread th3(consumer,10); th1.join(); th2.join(); th3.join(); return 0; }
8.3.6 future
很多情况下线程不仅仅要完成一些工作,还会返回一些处理后的结果,一般的笨拙做法是设置一个全局变量,线程操作该全局变量。主线程不断检查是否有值或者阻塞等待。
thread库中的future范式提供了一种异步操作线程返回值的方法。future中最终要的两个模板类packaged_task和promise两个模块类来包装异步调用。
-
packaged_task 和unique_future
package_task包装一个可回调物,然后他就可以被任意线程调用执行,最后的future值可以通过成员函数get_future()获得。
unique_future用来存储packaged_task异步计算得到的future值,他只能持有结果的唯一一个引用,成员函数wait()和timed_wait()的行为类似thread.join(),可以阻塞等待packaged_task的执行,直到获取future值。成员函数is_ready(),has_value(),和has_exception()分别用来测试unique_future是否可用,是有有值和是否发生异常,如果一切正常,那么可以使用get()获得future值。
-
实例:
#include <boost/thread.hpp> using namespace boost; //案例:多线程计算多个fab的值 int fab(int n) { if(n==1 || n==2){ return 1 } return fab(n-2) + fab(n-1) } int main() { typedef packaged_task<int> pck_t; typedef unique_future<int> uq_f;//pck_t的future值的引用唯一引用 array<pck_t, 5> arp;//存储5个packaged_task对象 array<uq_f, 5> aru; for(int i=0;i<5;i++){ arp[i] = pck_t(bind(fab, 10+i));//packaged_task只能接受无参的函数对象,所以使用bind函数。 aru[i] = arp[i].get_future();//即每一个unique_future对应接受一个packaged_task的对象处理的future结果 thread(move(arp[i]));//这件求fab值的事情移步到线程中去做,而追中的结果由上一行的aru[i]异步获取。 } wait_for_all(aru.begin(),aru.end());//等待所有结果获取完成 BOOST_FOREACH(uq_f&, aru){ std::cout<<uq_f.get()<<std::endl;//遍历每一个线程所返回的结果。 } return 0; }
标签:大法,return,进阶,thread,lock,线程,time,浅析,wait 来源: https://blog.csdn.net/weixin_43520503/article/details/111492440