Chinaunix首页 | 论坛 | 博客
  • 博客访问: 85634
  • 博文数量: 12
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 200
  • 用 户 组: 普通用户
  • 注册时间: 2013-01-23 09:57
文章分类

全部博文(12)

文章存档

2015年(11)

2014年(1)

我的朋友

分类: C/C++

2015-09-07 19:47:04

 一. shared_ptr线程安全性:

    shared_ptr的线程安全级别和内建类型,标准库容器,std::string是一样的,需要注意的是:这是shared_ptr对象本身的线程安全级别,不是shared_ptr管理的对象的线程安全级别
    多个线程读写同一个shared_ptr,需要加锁保护,核心想法是生成一个本地的shared_ptr对象供自己使用,而不要直接使用共享的shared_ptr,这里借用书中的代码:
     

点击(此处)折叠或打开

  1. MutexLock mutex; // No need for ReaderWriterLock  
  2. shared_ptr<Foo> globalPtr;  
  3. // 我们的任务是把globalPtr 安全地传给doit()  
  4. void doit(const shared_ptr<Foo>& pFoo);

  5. void read()  
  6. {  
  7.     shared_ptr<Foo> localPtr;  
  8.     {  
  9.         MutexLockGuard lock(mutex);  
  10.         localPtr = globalPtr; // read globalPtr  
  11.     }  
  12.     // use localPtr since here,读写localPtr 也无须加锁  
  13.     doit(localPtr);  
  14. } 

  15. void write()  
  16. {  
  17.     shared_ptr<Foo> newPtr(new Foo); // 注意,对象的创建在临界区之外  
  18.     {  
  19.         MutexLockGuard lock(mutex);  
  20.         globalPtr = newPtr; // write to globalPtr  
  21.     }  
  22.     // use newPtr since here,读写newPtr 无须加锁  
  23.     doit(newPtr);  
  24. } 
  25.  

二. 利用shared_ptr管理对象池

 (内容来源于书P21以及作者的博客:http://www.cnblogs.com/Solstice/archive/2010/02/10/dtor_meets_threads.html )
 使用一个StockFactory来管理所有的股票,第一版代码如下:

点击(此处)折叠或打开

  1. class StockFactory : boost::noncopyable 
  2.  
  3. { // questionable code 
  4.  public: 
  5.   shared_ptr<Stock> get(const string& key); 
  6.  
  7.  private: 
  8.   std::map<string, shared_ptr<Stock> > stocks_; 
  9.   mutable Mutex mutex_; 
  10. };
  代码的BUG:StockFactory持有Stock指针的shared_ptr,会导致Stock对象无法销毁,只要stocks_存在,shared_ptr的引用计数就不会为0。

  修改一下,将shared_ptr改为weak_ptr,来一个第二版的代码:

点击(此处)折叠或打开

  1. class StockFactory : boost::noncopyable
  2. {
  3.  public:
  4.   shared_ptr<Stock> get(const string& key)
  5.   {
  6.     shared_ptr<Stock> pStock;
  7.     MutexLock lock(mutex_);
  8.     weak_ptr<Stock>& wkStock = stocks_[key];  // 如果 key 不存在,会默认构造一个
  9.     pStock = wkStock.lock();  
  10.     if (!pStock) {
  11.       pStock.reset(new Stock(key));
  12.       wkStock = pStock;  // 这里更新了 stocks_[key],注意 wkStock 是个引用
  13.     }
  14.     return pStock;
  15.   }
  16.  
  17.  private:
  18.   std::map<string, weak_ptr<Stock> > stocks_;
  19.   mutable Mutex mutex_;
  20. };
  这时不会阻止Stock对象销毁了,不过stocks_只增不减,不会在Stock对象销毁的时候把stocks_里面保存的weak_ptr也销毁掉

  考虑到shared_ptr有一个参数可以定制销毁行为,我们可以在构造的时候提供一个销毁函数删除weak_ptr,再改一下,得到第三版代码:

点击(此处)折叠或打开

  1. class StockFactory : boost::noncopyable
  2. {
  3.  public:
  4.   shared_ptr<Stock> get(const string& key)
  5.   {
  6.     shared_ptr<Stock> pStock;
  7.     MutexLock lock(mutex_);
  8.     weak_ptr<Stock>& wkStock = stocks_[key];  // 如果 key 不存在,会默认构造一个
  9.     pStock = wkStock.lock(); 
  10.     if (!pStock) {
  11.       // 利用boost::bind绑定一个销毁函数,在销毁Stock对象时调用
  12.       pStock.reset(new Stock(key), boost::bind(&StockFactory::deleteStock, this, _1));
  13.       wkStock = pStock;  // 这里更新了 stocks_[key],注意 wkStock 是个引用
  14.     }
  15.     return pStock;
  16.   }
  17.  
  18.  private:
  19.  void deleteStock(Stock* stock)
  20.   {
  21.     if (stock) {
  22.       // 从map中删除weak_ptr
  23.       MutexLock lock(mutex_);
  24.       stocks_.erase(stock->key());
  25.     }
  26.     delete stock;
  27.   }
  28.   std::map<string, weak_ptr<Stock> > stocks_;
  29.   mutable Mutex mutex_;
  30. };
  看起来好像不错,实际上还有一个问题:pStock.reset(new Stock(key), boost::bind(&StockFactory::deleteStock, this, _1));     这里传递给bind的是一个StockFactory的原始指针,如果这个StockFactory在Stock对象销毁之前被销毁了,那再调用deleteStock就挂了。 
 很显然,我们需要在调用deleteStock函数之前知道StockFactory是否还活着,这里需要把this指针包装成shared_ptr对象传递给bind函数,使用enable_shared_from_this来完成这个任务。
 再改改代码,得到第四版:

点击(此处)折叠或打开

  1. class StockFactory : public boost::enable_shared_from_this<StockFactory>, 
  2.                      boost::noncopyable 
  3. { /* ... */ }; 

  4. shared_ptr<Stock> StockFactory::get(const string& key)
  5. {
  6.   // change
  7.   // pStock.reset(new Stock(key),
  8.   //               boost::bind(&StockFactory::deleteStock, this, _1));
  9.   // to
  10.   pStock.reset(new Stock(key),
  11.                boost::bind(&StockFactory::deleteStock,
  12.                            shared_from_this(),
  13.                            _1));
  14.   // ...
  这里传递给bind的是shared_ptr对象,不过还有一个问题:因为是shared_ptr,所以只要有一个Stock存在,StockFactory就不会销毁
  
  因此,解决方式是把shared_ptr转化为weak_ptr,再改改得到第五版:

点击(此处)折叠或打开

  1. class StockFactory : public boost::enable_shared_from_this<StockFactory>,
  2.                        boost::noncopyable
  3. {
  4.  public:
  5.   shared_ptr<Stock> get(const string& key)
  6.   {
  7.     shared_ptr<Stock> pStock;
  8.     MutexLock lock(mutex_);
  9.     weak_ptr<Stock>& wkStock = stocks_[key];
  10.     pStock = wkStock.lock();
  11.     if (!pStock) {
  12.       pStock.reset(new Stock(key),
  13.                     boost::bind(&StockFactory::weakDeleteCallback,
  14.                                  boost::weak_ptr<StockFactory>(shared_from_this()),
  15.                                  _1));
  16.       // 上面必须强制把 shared_from_this() 转型为 weak_ptr,才不会延长生命期
  17.       wkStock = pStock;
  18.     }
  19.     return pStock;
  20.   }
  21.  
  22.  private:
  23.   static void weakDeleteCallback(boost::weak_ptr<StockFactory> wkFactory,
  24.                                     Stock* stock)
  25.   {
  26.     shared_ptr<StockFactory> factory(wkFactory.lock());  // 尝试提升
  27.     if (factory) {  // 如果 factory 还在,那就清理 stocks_
  28.       factory->removeStock(stock);
  29.     }
  30.     delete stock;  // sorry, I lied
  31.   }
  32.  
  33.   void removeStock(Stock* stock) 
  34.   {
  35.     if (stock) {
  36.       MutexLock lock(mutex_);
  37.       stocks_.erase(stock->key());
  38.     }
  39.   }
  40.  
  41.  private:
  42.   std::map<string, weak_ptr<Stock> > stocks_;
  43.   mutable Mutex mutex_;
  44. };
  OK,成功解决了生存周期的问题,这里还需要注意几个要点:
 1.  shared_from_this不能在构造函数里面调用,因为在构造StockFactory的时候,它还没有交给shared_ptr保管。
 2.  为了使用enable_shared_from_this,StockFactory必须在堆上创建并且保存在shared_ptr中,使用方式是这样的:
    shared_ptr<StockFactory> factory(new StockFactory);
    stock = factory->get("NYSE:IBM");


三. 使用shared_ptr实现COW

对于一个需要共享的容器,我们都能很容易的通过加锁实现线程安全,例如:


点击(此处)折叠或打开

  1. std::vector<Foo> foos; 
  2. MutexLock mutex;
  3. void post(const Foo& f) 
  4. {
  5.   MutexLockGuard lock(mutex);
  6.   foos.push_back(f);
  7. }
  8.  
  9. void traverse() 
  10. {
  11.   MutexLockGuard lock(mutex);
  12.   for (auto it = foos.begin(); it != foos.end(); ++it) { // 用了 0x 新写法
  13.     it->doit();
  14.   }
  15. }

一切看起来没啥问题,不过万一doit函数调用了post怎么办,结果要么vector被破坏要么死锁。
怎么解决这个问题了:使用shared_ptr实现Copy On Write操作,这样就不会死锁了。
借用 代码来讲解

点击(此处)折叠或打开

  1. #include "../Mutex.h" 
  2. #include "../Thread.h"
  3. #include <vector>
  4. #include <boost/shared_ptr.hpp>
  5. #include <stdio.h> 
  6. using namespace muduo;
  7.  
  8. class Foo
  9. {
  10.  public:
  11.   void doit() const; 
  12. };
  13.  
  14. typedef std::vector<Foo> FooList;
  15. typedef boost::shared_ptr<FooList> FooListPtr;
  16. FooListPtr g_foos;
  17. MutexLock mutex; 
  18. void post(const Foo& f)
  19.  
  20. {
  21.   printf("post\n");
  22.   MutexLockGuard lock(mutex);
  23.  // 1. 检查g_foos是否只有自己在访问 
  24.   if (!g_foos.unique())
  25.   {
  26.  // 2. 有读线程正在读数据,根据已有的g_foos构造一个新的FooList,替换g_foos 
  27.     g_foos.reset(new FooList(*g_foos)); 
  28.   }
  29.   assert(g_foos.unique());
  30.  // 3. 对新FooList执行修改操作 
  31.   g_foos->push_back(f);
  32. }
  33.  
  34. void traverse() 
  35. { 
  36.  // 1. 生成一个本地的FooListPtr,g_foos引用计数加一,这样当写线程执行g_foos.reset时, 
  37.  // 旧的FooListPtr不会销毁,而是由本地的foos保存 
  38.   FooListPtr foos;
  39.   {
  40.     MutexLockGuard lock(mutex);
  41.     foos = g_foos;
  42.     assert(!g_foos.unique());
  43.   } 
  44.   // 2. 对本地的FooListPtr进行操作,执行doit函数,即使doit去执行post也无影响 
  45.   for (std::vector<Foo>::const_iterator it = foos->begin();
  46.       it != foos->end(); ++it)
  47.   {
  48.     it->doit();
  49.   }
  50. } 

  51.  
  52. void Foo::doit() const
  53. {
  54.   Foo f;
  55.   post(f);
  56. } 
  57.   
  58. int main()
  59. {
  60.   g_foos.reset(new FooList);
  61.   Foo f;
  62.   post(f);
  63.   traverse();
  64. }



对于上面的post函数,还有几种
错误的变体,代码来自书P54

点击(此处)折叠或打开

  1. // version 1 
  2. void post(const Foo& f) {
  3.  // 直接在原来的数据上修改,错误 
  4.     MutexLockGuard lock(mutex);
  5.     g_foos->push(f);
  6. }
  7.  
  8. // version 2
  9. void post(const Foo& f) {
  10.  // copy旧数据的时候没有加锁保护,如果其他线程正在修改g_foos对象,则获取了不完整的数据,错误 
  11.     FooListPtr newFoos(new FoosList(*g_foos));
  12.     newFoos->push_back(f);
  13.     MutexLockGuard lock(mutex);
  14.     g_foos = newFoos;
  15. }
  16.  

  17.   
  18. // version 3
  19. void post(const Foo& f) {
  20.  // 把锁拆成两部分,当执行到 FooListPtr newFoos(new FoosList(*oldFoos)) 的时候,还是在无锁的情况copy旧数据,错误。 
  21.     FooListPtr oldFoos;
  22.     {
  23.         MutexLockGuard lock(mutex);
  24.         oldFoos = g_foos;
  25.     }
  26.     FooListPtr newFoos(new FoosList(*oldFoos));
  27.     newFoos->push_back(f);
  28.     MutexLockGuard lock(mutex);
  29.     g_foos = newFoos;
  30. }

四. 使用shared_ptr实现对象销毁专有线程

核心想法:利用shared_ptr可以定制销毁函数的特性,在销毁函数中将对象转移到专有的线程中delete

点击(此处)折叠或打开

  1. #include <memory>
  2. #include <thread> 
  3. #include <string>
  4. #include <mutex>
  5. #include <queue>
  6. #include <condition_variable>
  7. #include <iostream>
  8. #include <cstdlib>
  9. #include <cstdio>
  10. using namespace std;
  11. class TempObject {
  12.     public:
  13.  
  14.         TempObject(const std::string& str)
  15.             : b(str) {

  16.         }
  17.  
  18.         ~TempObject(){
  19.             std::cout << "desruct thread " << b <<" : " << std::this_thread::get_id() << std::endl;
  20.         }
  21.  
  22.         bool isExit() {
  23.             if (== "quit") {
  24.                 return true;
  25.             }
  26.             return false;
  27.         }
  28.     private:
  29.         std::string b;
  30. };
  31.  
  32.   
  33. template <typename T>
  34. class BlockingQueue {
  35.     public:
  36.         BlockingQueue()
  37.         { 
  38.         }
  39.  
  40.         ~BlockingQueue(){
  41.  
  42.         }
  43.  
  44.         void push(T* a) {
  45.             {
  46.                 std::lock_guard<std::mutex> lg(mutexQueue_);
  47.                 queue_.push(a);
  48.             }
  49.             condQueue_.notify_one();
  50.         }
  51.   
  52.         void thread_func() {
  53.             while(true)
  54.             {
  55.                 T* obj;
  56.                 {
  57.                     std::unique_lock<std::mutex> ul(mutexQueue_);
  58.                     while (queue_.empty()) {
  59.                         condQueue_.wait(ul);
  60.                     }
  61.                     obj = queue_.front();
  62.                     queue_.pop();
  63.                 }
  64.  
  65.                 if (obj->isExit()) {
  66.                     delete obj;
  67.                     std::cout << "thread " << std::this_thread::get_id() << " is exit" << std::endl;
  68.                     break;
  69.                 }
  70.                 else {
  71.                     delete obj;
  72.                 }
  73.             }
  74.         }
  75.  
  76.         void start_thread() {
  77.            thread_ = std::thread(&BlockingQueue::thread_func, this);
  78.         }
  79.  
  80.         void join() {
  81.             thread_.join();
  82.         }
  83.  
  84.     private:
  85.         std::mutex mutexQueue_;
  86.         std::condition_variable condQueue_;
  87.         std::queue<T* > queue_;
  88.         std::thread thread_;
  89. };
  90.  

  91.  
  92. class TempObjectDeleter {
  93.     public:
  94.         TempObjectDeleter(BlockingQueue<TempObject> * bqp)
  95.             : bq(bqp) {
  96.         }
  97.  
  98.         void operator () (TempObject* parameter) {
  99.  // 这里不执行销毁,而是将对象压入到专有线程的队列中 
  100.             std::cout << "Push thread : " << std::this_thread::get_id() << " \n";
  101.             bq->push(parameter);
  102.         }
  103.     private:
  104.         BlockingQueue<TempObject> * bq;
  105. };
  106.  
  107. void thread_func_1(BlockingQueue<TempObject> *dQueue) {
  108.         char buffer[10] = {0};
  109.         for (int i = 0; i < 3; i++){
  110.             sprintf(buffer, "b%d", i);
  111.             std::string temp = buffer;
  112.             // 创建时传入对象销毁函数对象
  113.             std::shared_ptr<TempObject> a(new TempObject(temp), TempObjectDeleter(dQueue));
  114.         }
  115. }
  116.  

  117.  
  118. int main(void) {
  119.     BlockingQueue<TempObject> dQueue;
  120.     dQueue.start_thread();
  121.     std::weak_ptr<TempObject> b;
  122.     std::thread t(thread_func_1, &dQueue);
  123.     {
  124.         char buffer[10] = {0};
  125.         for (int i = 0; i < 4; i++){
  126.             sprintf(buffer, "a%d", i);
  127.             std::string temp = buffer;
  128.             std::shared_ptr<TempObject> a(new TempObject(temp), TempObjectDeleter(&dQueue));
  129.         }
  130.     }
  131.     t.join();
  132.     {
  133.         std::shared_ptr<TempObject> b2 (new TempObject("quit"), TempObjectDeleter(&dQueue));
  134.     }
  135.     dQueue.join();
  136.     std::cout<< "let's exit" << std::endl;
  137.     return 0;
  138. }
阅读(2515) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~