前言
如果已经熟悉POSIX线程,那么C++11的线程也不会陌生,可以看做将POSIX线程API的子集做了高层抽象,一个好处就是更容易写跨平台线程应用,下文简要罗列C++11线程的几个重要组成部分,最后给一个简单的线程池实现。
Threads --- 线程创建与回收
线程构造函数,参数f可以是普通函数或类成员函数
-
template< class Function, class... Args >
-
explicit thread( Function&& f, Args&&... args );
线程ID,全局唯一,线程退出后会可能被重用
-
std::thread::id get_id() const;
阻塞当前运行线程,直到目标线程执行完毕
Mutual exclusion --- 互斥锁
基本的0-1互斥锁,支持lock/unlock/try_lock
以RAII(Resource Acquisition Is Initialization)的方式使用mutex
-
template< class Mutex >
-
class lock_guard;
movable mutex,支持各种mutex场景
-
template< class Mutex >
-
class unique_lock;
Condition variables --- 条件变量
与std::unique_lock配合使用的条件变量,常用方法wait/notify_one/notify_all
-
class condition_variable;
Atomics --- 原子操作
C++唯一可以无锁(lock-free)异步访问的数据结构。这里是一个雷区,有很多晦涩和反直觉的现象,在100%理解相关概念之前,只用bool标志读写或计数器自增之类简单功能吧!
Futures --- 异步或并行执行的高层接口
是上面功能的高层抽象,让异步或并行用起来更简单,直接看参考手册即可。
Thread Pool Executor--- 线程池示例
简单的线程池,但已经涉及到上文的多数功能
-
#include <algorithm>
-
#include <atomic>
-
#include <condition_variable>
-
#include <functional>
-
#include <mutex>
-
#include <queue>
-
#include <thread>
-
#include <vector>
-
-
#include <iostream>
-
#include <chrono>
-
-
namespace concurrent {
-
-
namespace futures {
-
-
class ThreadPoolExecutor {
-
-
typedef std::function<void()> Function;
-
-
public:
-
ThreadPoolExecutor(int maxworkers): _exit_flag(false)
-
{
-
int num;
-
if (maxworkers > 0) {
-
num = maxworkers;
-
} else {
-
num = std::max(1, (int)std::thread::hardware_concurrency());
-
}
-
for ( ; num > 0; num--) {
-
_workers.emplace_back(&ThreadPoolExecutor::_run, this);
-
}
-
}
-
~ThreadPoolExecutor()
-
{
-
if (!_exit_flag.load()) {
-
stop();
-
}
-
}
-
-
void submit(Function&& f)
-
{
-
{
-
std::lock_guard<std::mutex> lg(_cond_mutex);
-
_tasks.emplace(f);
-
}
-
_cond_var.notify_all();
-
}
-
-
void join()
-
{
-
for (auto &t: _workers) {
-
t.join();
-
}
-
}
-
-
void stop()
-
{
-
_exit_flag.store(true);
-
_cond_var.notify_all();
-
join();
-
}
-
-
private:
-
void _run()
-
{
-
Function job;
-
auto wakeup = [&]{ return _exit_flag.load() || !_tasks.empty(); };
-
while (!_exit_flag.load()) {
-
{
-
std::unique_lock<std::mutex> ul(_cond_mutex);
-
_cond_var.wait(ul, wakeup);
-
if (_exit_flag.load()) break;
-
job = _tasks.front();
-
_tasks.pop();
-
}
-
job();
-
}
-
}
-
-
private:
-
std::vector<std::thread> _workers;
-
std::queue<Function> _tasks;
-
std::mutex _cond_mutex;
-
std::condition_variable _cond_var;
-
std::atomic<bool> _exit_flag;
-
-
}; // class ThreadPoolExecutor
-
-
}; // namespace futures
-
-
}; // namespace concurrent
-
-
void print(int num, char c)
-
{
-
for (int i = 0; i < num; ++i) {
-
std::cerr << c << ' ';
-
std::this_thread::sleep_for(std::chrono::milliseconds(100));
-
}
-
}
-
-
using namespace concurrent::futures;
-
-
int main(int argc, char* argv[])
-
{
-
std::vector<char> sample{'a', 'b', 'c', 'd', 'e'};
-
ThreadPoolExecutor pool(0);
-
-
for (auto v: sample) {
-
pool.submit(std::bind(print, 100, v));
-
}
-
pool.join();
-
-
return 0;
-
}
阅读(3780) | 评论(0) | 转发(1) |