C++11对多线程有了直接的支持,这对于跨平台的统一的线程管理和使用是个福音。

C++11多线程库提供了std::thread类,用于创建和管理线程,还为共享资源的竞态问题提供了互斥量(mutex)、锁、事件通知的条件变量(condition variable)和获取异步调用的返回值(future)等。

std::thread <thread>

构造函数:

thread();
thread对象处于初始状态。

template<class Fn, class... Args>
explicit thread(Fn&& F, Args&&... A);

提供函数(函数,函数对象,lambda函数),以及提供可选的参数,参数是右值引用方式传递。

thread(thread&& Other);
move语义,获取Other对象的状态,并使其处于初始状态。

注:

  1. 参数是右值引用类型,默认很可能会引用到局部变量。
  2. 函数参数推荐使用拷贝语义,这样可以避免引用引起的生命期问题。

例如:

#include <iostream>
#include <thread>
#include <string>
using namespace std;
 
int main()
{
    thread thd;
    {
       // 值传递
       auto func = [](string s)
       {
           // 等待5秒,保证外层的引用传递的str已析构
           std::chrono::milliseconds dura( 5000 );
           this_thread::sleep_for( dura ),
              cout << s;
       };
 
       string str = "hello world!";
 
       thd.swap( thread( func, str) ) ;
    }
 
    thd.join();
 
    return 0;
}

成员函数

id get_id() const;
获取线程的标识,类型是thread::id,如果没有关联的线程,则返回id()

bool joinable() const;
对象是否有关联的线程执行,即get_id() != id()

void detach();
解除对象与线程体的关联,由系统负责线程结束的资源释放,如果不是joinable,将抛出system_error异常。

void join();
调用线程阻塞,直到对象关联的线程执行完成。如果非joinable,则抛出system_error异常。

native_handle_type native_handle();
返回线程句柄,native_handle_type在不同的平台有不同的实现:

thread& operator = ( thread&& Other );
关联右值对象对应的状态,而本身状态如果是joinable的,则会调用detach()。

void swap(thread& Other);
交换两个对象的状态。

管理当前线程的this_thread名字空间

thread::id get_id();
返回当前线程的id

void yield();
线程取消当前时间片,让操作系统重新调度。

template< class Rep, class Period >
void sleep_for( std::chrono::duration<Rep,Period> sleep_duration );
当前线程等待一段时间
template< class Clock, class Duration >
void sleep_until( const std::chrono::time_point<Clock,Duration>& sleep_time );
当前线程一直等待到 sleep_time 为止。

mutex 互斥量 <mutex>

mutex相关类型

mutex();
构造函数,初始处于未锁定状态

bool try_lock();
尝试去锁定mutex,立即返回。返回true则获取锁成功;否则失败。

void lock();
锁定mutex,如果mutex当前被其他线程占有,则lock将阻塞调用线程,直到mutex被其他线程释放了。 注:如果线程已经拥有mutex,则在该mutex上再次调用lock将导致死锁异常。

void unlock();
释放mutex。

支持递归锁定,即:如果线程已经拥有mutex,在该mutex上再次调用lock会成功,不过要求unlock与lock数量匹配。

mutex辅助管理

支持RAII的方式自动管理mutex类。

例如:

recursive_mutex mtx;
// ...
{
    lock_guard<recursive_mutex> guard( mtx );
    // ...
}

构造函数:

unique_lock();

unique_lock( unique_lock&& other );

explicit unique_lock( mutex_type& m );
初始化锁定m.lock()

unique_lock( mutex_type& m, std::defer_lock_t t );
不锁定mutex

unique_lock( mutex_type& m, std::try_to_lock_t t );
尝试锁定mutex

unique_lock( mutex_type& m, std::adopt_lock_t t );
假定线程已经拥有该mutex

template< class Rep, class Period >
unique_lock( mutex_type& m,
const std::chrono::duration<Rep,Period>& timeout_duration );
尝试锁定m,阻塞直到m成功获取或者时间段timeout_duration已过
template< class Clock, class Period >
unique_lock( mutex_type& m,
const std::chrono::time_point<Clock,Duration>& timeout_time );

析构函数:

~unique_lock
如果已取得关联的mutex的所有权,则释放该mutex。

其他函数:

void lock();
锁定关联的mutex,如果没有关联的mutex或mutex已经锁定,将抛出system_error异常。

bool trylock();
尝试锁定关联的mutex,如果没有关联的mutex或mutex已经锁定,将抛出system_error异常。

void unlock();
unlock关联的mutex,如果没有关联的mutex或mutex未锁定,将抛出system_error异常。

template< class Rep, class Period >
bool try_lock_for( const chrono::duration<Rep,Period>& timeout );

template< class Clock, class Duration >
bool try_lock_until( const std::chrono::time_point<Clock,Duration>& timeout_time );

mutex_type* release();
取消mutex的关联,但不会调用unlock。

template< class Mutex >
void swap( unique_lock<Mutex>& other );
交换两个锁的状态。

bool owns_lock() const;
判断是否已经拥有锁。

explicit operator bool() const; // 同owns_lock()

通用锁定算法

用避免死锁的算法同时获取多个mutex:

template< class Lockable1, class Lockable2, class LockableN... >
int try_lock( Lockable1& lock1, Lockable2& lock2, LockableN& lockn... );
 
template< class Lockable1, class Lockable2, class LockableN... >
void lock( Lockable1& lock1, Lockable2& lock2, LockableN& lockn... );

例如:有时候分开获取锁,很容易导致死锁。可以用通用锁定算法来避免。

void f()
{
    // ...
    // make locks but don't yet try to acquire the mutexes
    std::unique_lock lck1(m1,std::defer_lock);   
    std::unique_lock lck2(m2,std::defer_lock);
    std::unique_lock lck3(m3,std::defer_lock);
    lock(lck1,lck2,lck3);
    // manipulate shared data
}

条件变量 <condition_variable>

condition_variable类

condition_variable();
构造函数

~condition_variable();
析构函数,必须保证没有线程还在等待该条件变量,否则会一起程序异常。

void wait( std::unique_lock<std::mutex>& lock );

template< class Predicate >
void wait( std::unique_lock<std::mutex>& lock, Predicate pred );

当前线程等待该条件变量触发notify,使用条件变量时需要配合unique_lock<mutex>使用:

例如:

#include <iostream>
#include <condition_variable>
#include <thread>
#include <mutex>
 
std::condition_variable cv;
std::mutex cv_m;
bool done = false;
 
void waits()
{
    std::unique_lock<std::mutex> lk(cv_m);
    std::cout << "Waiting... \n";
    cv.wait(lk, [](){return done;});
    std::cout << "...finished waiting.\n";
}
 
voidsignals()
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Notifying...\n";
    cv.notify_one();
 
    std::unique_lock<std::mutex> lk(cv_m);
    done = true;
}
 
int main()
{
    std::thread t1(waits), t2(signals);
    t1.join();
    t2.join();
}

输出:
Waiting…
Notifying…
…finished waiting.

void notify_one();
唤醒当前等待条件变量的其中一个线程。

void notify_all();
唤醒所有等待当前条件变量的线程。

template< class Rep, class Period >
std::cv_status wait_for( std::unique_lock<std::mutex>& lock,
                const std::chrono::duration<Rep, Period>& rel_time);
                
template< class Rep, class Period, class Predicate >
bool wait_for( std::unique_lock<std::mutex>& lock,
const std::chrono::duration<Rep, Period>& rel_time, Predicate pred);
 
template< class Clock, class Duration >
std::cv_status wait_until( std::unique_lock<std::mutex>& lock,
     const std::chrono::time_point<Clock, Duration>& timeout_time );
     
template< class Clock, class Duration, class Predicate >
bool wait_until( std::unique_lock<mutex>& lock,
 const std::chrono::time_point<Clock, Duration>& timeout_time, Predicate pred );
// 支持等待一段时间。

例子(生产者&消费者):

#include <condition_variable>
#include <mutex>
#include <thread>
#include <iostream>
#include <queue>
#include <chrono>
 
int main()
{
    std::queue<int> produced_nums;
    std::mutex m;
    std::condition_variable cond_var;
    bool done = false;
    bool notified = false;
 
    std::thread producer([&]()
    {
       for (int i = 0; i < 5; ++i)
       {
           std::this_thread::sleep_for(std::chrono::seconds(1));
           std::unique_lock<std::mutex> lock(m);
           std::cout << "producing " << i << '\n';
           produced_nums.push(i);
           notified = true;
           cond_var.notify_one();
       }  
 
       done = true;
    });
 
    std::thread consumer([&]()
    {
       std::unique_lock<std::mutex> lock(m);
       while (!done)
       {
           while (!notified)
           {  // 没有notify,继续等待,只有这里会释放锁
              cond_var.wait(lock);
           }  
           while (!produced_nums.empty())
           {
              std::cout << "consuming " << produced_nums.front() << '\n';
              produced_nums.pop();
           }  
           notified = false;
       }  
    });
 
    producer.join();
    consumer.join();
}

输出:
producing 0
consuming 0
producing 1
consuming 1
producing 2
consuming 2
producing 3
consuming 3
producing 4
consuming 4

condition_variable_any类

是condition_variable的一个泛化,支持任意类型的锁。

成员签名:

condition_variable_any();

void notify_one();

void notify_all();

template< class Lock >
void wait( Lock& lock );
 
template< class Lock, class Predicate >
void wait( Lock& lock, Predicate pred );
 
template< class Lock, class Rep, class Period >
std::cv_status wait_for( Lock& lock,
                const std::chrono::duration<Rep, Period>& rel_time);
 
template< class Lock, class Rep, class Period, class Predicate >
bool wait_for( Lock& lock,
 const std::chrono::duration<Rep, Period>& rel_time,Predicate pred);

future相关 <future>

future提供了一种直接获取异步操作返回值的方式。

std::async函数

template< class Function, class... Args>
std::future<typename std::result_of<Function(Args...)>::type>
    async( Function&& f, Args&&... args );
 
template< class Function, class... Args >
std::future<typename std::result_of<Function(Args...)>::type>
    async( std::launch policy, Function&& f, Args&&... args );

async函数返回future对象:

future类

template< class T > class future;
template< class T > class future<T&>;
template<>          class future<void>;

成员:

future();

future( future&& other );
// 构造函数,不支持拷贝构造
 
future& operator=( future&& other );
 
T get();

T& get();

void get();
// get将进入等待,直到future具有合法的结果。调用以后valid()==false.
 
bool valid() const;
// future是否与shared state关联,即还未调用get()之前。
 
void wait() const;
// 等待future的结果可用。
 
template< class Rep, class Period >
future_status wait_for( const chrono::duration<Rep,Period>& tm );
// 等待结果一段时间。

例1:

#include <iostream>
#include <future>
#include <thread>
#include <chrono>
using namespace std;
 
int main()
{
    future<int> future = async(launch::async, []()
    {
       this_thread::sleep_for(chrono::seconds(3));
       return 8; 
    });
 
    cout << "waiting...\n";
    future_status::future_status status;
    do
    {
       status = future.wait_for(chrono::seconds(1));
       if (status == future_status::deferred)
       {
           cout << "deferred\n";
       }
       else if (status == future_status::timeout)
       {
           cout << "timeout\n";
       }
       else if (status == future_status::ready)
       {
           cout << "ready!\n";
       }
    } while (status != future_status::ready);
 
    cout << "result is " << future.get() << '\n';
}

输出:
waiting…
timeout
timeout
ready!
result is 8

例2:

#include <iostream>
#include <vector>
#include <algorithm>
#include <numeric>
#include <future>
 
template <typename RAIter>
intparallel_sum(RAIter beg, RAIter end)
{// 同步函数,但是async多线程加速
    typename RAIter::difference_type len = end-beg;
    if(len < 1000)
       return std::accumulate(beg, end, 0);
 
    RAIter mid = beg + len/2;
    auto handle = std::async(std::launch::async,
       parallel_sum<RAIter>, mid, end);
 
    int sum = parallel_sum(beg, mid);
    return sum + handle.get();
}
 
int main()
{
    std::vector<int> v(100000, 100);
    std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n';
}

输出:
The sum is 10000000

参考