Chinaunix首页 | 论坛 | 博客
  • 博客访问: 729564
  • 博文数量: 79
  • 博客积分: 2671
  • 博客等级: 少校
  • 技术积分: 1247
  • 用 户 组: 普通用户
  • 注册时间: 2010-04-02 15:26
个人简介

宅男

文章分类

全部博文(79)

文章存档

2017年(11)

2016年(12)

2015年(6)

2012年(10)

2011年(33)

2010年(7)

分类: Android平台

2015-12-25 15:24:34

由于futex的futex_wake/futex_wait基本原理比较简单,结构体之间的关系也比较简单。所以在此不再讲解futex的基本原理。
futex的内核基本数据跟mutex类似,无非是一个xxxx_waiter。关键是如何组织这些waiter。futex采用的是基于hash和优先级排序的队列。
而内核mutex采用的FIFO的单链表。
本文主要讲解一下futex的requeue操作。因为这个操作要涉及到具体的用户case才能深入了解。本文主要比较android的art/runtime/base/mutex.cc和bionic 中pthread_cond_t的基于futex的两种实现。
  1. int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex)
pthread_cond_wait调用了futex的系统,最终调用了内核的futex_wait来获取锁,如果acquire不成功,则进入阻塞睡眠状态。
  1. 182int pthread_cond_broadcast(pthread_cond_t* cond) {
  2. 183 return __pthread_cond_pulse(cond, INT_MAX);
  3. 184}
pthread_cond_broadcast回去尝试唤醒所有的阻塞睡眠的线程。因为这个传入的nr_wake为INT_MAX,即唤醒尽可能多的线程。
在应用层,一般使用这样的API来进行线程间同步。
  1. pthread_mutex_lock(&mutex);
  2. pthread_cond_wait(&cond,&mutex);
  3. pthread_mutex_unlock(&mutex);
如果将pthread_cond_wait稍微分解一下的话,大体变成下面的代码:
  1. pthread_mutex_lock(&mutex);
  2. do_something_1();
  3. pthread_mutex_unlock(&mutex);
  4. step A :int status = __futex_wait_ex(&cond->value, COND_IS_SHARED(cond->value), old_value, reltime);
  5. step B: pthread_mutex_lock(&mutex);
  6. do_something_2();
  7. pthread_mutex_unlock(&mutex);
假如有好多的线程正在同时阻塞睡眠在futex_wait(step A)里面。这时候调用pthread_cond_broadcast会同时唤醒所有的被阻塞的线程,
然后这些线程又同时调用pthread_mutex_lock(step B)。那么这些线程中只有一个进程能够成功的
获取mutex,其余的线程又会再次进入到睡眠状态。
这种情况被称之为“惊群效应”,是应该极力避免的一种情况。因为这些线程被唤醒之后又重新进入睡眠状态,浪费了大量的CPU
资源进行无效的调度运算。
下面接着来看看android的art/runtime/base里面是如何使用futex的(ART_USE_FUTEXES的情况)
  1. class LOCKABLE Mutex : public BaseMutex {
  2. 207 // Block until mutex is free then acquire exclusive access.
  3. 208 void ExclusiveLock(Thread* self) EXCLUSIVE_LOCK_FUNCTION();
  4. 209 void Lock(Thread* self) EXCLUSIVE_LOCK_FUNCTION() { ExclusiveLock(self); }
  5. 210
  6. 211 // Returns true if acquires exclusive access, false otherwise.
  7. 212 bool ExclusiveTryLock(Thread* self) EXCLUSIVE_TRYLOCK_FUNCTION(true);
  8. 213 bool TryLock(Thread* self) EXCLUSIVE_TRYLOCK_FUNCTION(true) { return ExclusiveTryLock(self); }
  9. 214
  10. 215 // Release exclusive access.
  11. 216 void ExclusiveUnlock(Thread* self) UNLOCK_FUNCTION();
  12. 217 void Unlock(Thread* self) UNLOCK_FUNCTION() { ExclusiveUnlock(self); }
  13. ..
  14. 249 private:
  15. 250 #if ART_USE_FUTEXES
  16. 251 // 0 is unheld, 1 is held.
  17. 252 AtomicInteger state_;//注意这是一个AtomicInteger,用于表示该mutex是否被进程hold住
  18. 253 // Exclusive owner.
  19. 254 volatile uint64_t exclusive_owner_;//该锁owner的TID
  20. 255 // Number of waiting contenders.
  21. 256 AtomicInteger num_contenders_;
  22. 257 #else
  23. 258 pthread_mutex_t mutex_;
  24. 259 volatile uint64_t exclusive_owner_; // Guarded by mutex_.
  25. 260 #endif
  26. 261 const bool recursive_; // Can the lock be recursively held?
  27. 262 unsigned int recursion_count_;
  28. 263 friend class ConditionVariable;
  29. 264 DISALLOW_COPY_AND_ASSIGN(Mutex);
  30. 265 };
首先定义了类似于pthread_mutex_t的一个Mutex,实现了类似于pthread_mutex_lock/pthread_mutex_unlock的操作。
  1. void Mutex::ExclusiveLock(Thread* self) {
  2.   DCHECK(self == NULL || self == Thread::Current());
  3.   if (kDebugLocking && !recursive_) {
  4.     AssertNotHeld(self);
  5.   }
  6.   if (!recursive_ || !IsExclusiveHeld(self)) {
  7.     bool done = false;
  8.     do {
  9.       int32_t cur_state = state_.LoadRelaxed();//原子操作,获取state_的当前值
  10.       if (LIKELY(cur_state == 0)) {//如果state_的值为0,即表示该锁没有被任何进程hold住。
  11. 所以当前进程就获得该锁。这个用户态变量有效的减少了syscall陷入内核态的次数。提高了性能
  12.         // Change state from 0 to 1 and impose load/store ordering appropriate for lock acquisition.
  13.         done = state_.CompareExchangeWeakAcquire(0 /* cur_state */, 1 /* new state */);
  14. //原子操作,将state_ 的值从0变成1.
  15.       } else {
  16.         // Failed to acquire, hang up.
  17.         ScopedContentionRecorder scr(this, SafeGetTid(self), GetExclusiveOwnerTid());//debug 相关
  18.         num_contenders_++;//竞争者计数+1
  19.         if (futex(state_.Address(), FUTEX_WAIT, 1, NULL, NULL, 0) != 0) {
  20.           // EAGAIN and EINTR both indicate a spurious failure, try again from the beginning.
  21.           // We don't use TEMP_FAILURE_RETRY so we can intentionally retry to acquire the lock.
  22.           if ((errno != EAGAIN) && (errno != EINTR)) {
  23.             PLOG(FATAL) << "futex wait failed for " << name_;
  24.           }
  25.         }
  26.         num_contenders_--;
  27.       }
  28.     } while (!done);
  29. DCHECK_EQ(state_.LoadRelaxed(), 1)//该进程获得锁,所以state_计数肯定为1
  30.     DCHECK_EQ(exclusive_owner_, 0U);
  31.     exclusive_owner_ = SafeGetTid(self);//设置锁的owner
  32.     RegisterAsLocked(self); //debug 相关
  33.   }
  34.   recursion_count_++;
  35.   if (kDebugLocking) {
  36.     CHECK(recursion_count_ == 1 || recursive_) << "Unexpected recursion count on mutex: "
  37.         << name_ << " " << recursion_count_;
  38.     AssertHeld(self);
  39.   }
  40. }
  41.  
  42. void Mutex::ExclusiveUnlock(Thread* self) {
  43.   DCHECK(self == NULL || self == Thread::Current());
  44.   AssertHeld(self);
  45.   DCHECK_NE(exclusive_owner_, 0U);//unlock时肯定是有进程获得了该锁。一般就是当前进程
  46.   recursion_count_--;
  47.   if (!recursive_ || recursion_count_ == 0) {
  48.     if (kDebugLocking) {
  49.       CHECK(recursion_count_ == 0 || recursive_) << "Unexpected recursion count on mutex: "
  50.           << name_ << " " << recursion_count_;
  51.     }
  52.     RegisterAsUnlocked(self);
  53.     bool done = false;
  54.     do {
  55.       int32_t cur_state = state_.LoadRelaxed();
  56.       if (LIKELY(cur_state == 1)) {//state_的值肯定为0,要不然就是一个错误。
  57.         // We're no longer the owner.
  58.         exclusive_owner_ = 0;//将owner清除。正如注释所说,当前进程正在释放该锁,不是owner了
  59.         // Change state to 0 and impose load/store ordering appropriate for lock release.
  60.         // Note, the relaxed loads below musn't reorder before the CompareExchange.
  61.         // TODO: the ordering here is non-trivial as state is split across 3 fields, fix by placing
  62.         // a status bit into the state on contention.
  63.         done = state_.CompareExchangeWeakSequentiallyConsistent(cur_state, 0 /* new state */);//将state_状态为从0变成1,注意这个字段之前困扰了我好久。因为我当初理解,一个进程如果有几个waiter。将这个字段从1变成0,其余的waiter怎么办。
  64. //答案就是其余的waiter lock 成功的时候,会再次将这个state_字段从0变成1。总之一句话,这是一个应用层的标志位,而不是内核。用户态只需要关心是否能够成功获得锁。
  65. 如果不能,则交给内核来仲裁。
  66.         if (LIKELY(done)) { // Spurious fail?
  67.           // Wake a contender.
  68.           if (UNLIKELY(num_contenders_.LoadRelaxed() > 0)) {// num_contenders_字段才真正的表示有多少了waiters
  69.             futex(state_.Address(), FUTEX_WAKE, 1, NULL, NULL, 0);
  70.           }
  71.         }
  72.       } else {
  73.         // Logging acquires the logging lock, avoid infinite recursion in that case.
  74.         if (this != Locks::logging_lock_) {
  75.           LOG(FATAL) << "Unexpected state_ in unlock " << cur_state << " for " << name_;
  76.         } else {
  77.           LogMessageData data(__FILE__, __LINE__, INTERNAL_FATAL, -1);
  78.           LogMessage::LogLine(data, StringPrintf("Unexpected state_ %d in unlock for %s",
  79.                                                  cur_state, name_).c_str());
  80.           _exit(1);
  81.         }
  82.       }
  83. } while (!done);
  84.   }
  85. }
下面看一下类似于pthread_cond_t类型的ConditionVariables
  1. // ConditionVariables allow threads to queue and sleep. Threads may then be resumed individually
  2. // (Signal) or all at once (Broadcast).
  3. class ConditionVariable {
  4.  public:
  5.   explicit ConditionVariable(const char* name, Mutex& mutex);
  6.   ~ConditionVariable();

  7.   void Broadcast(Thread* self);
  8.   void Signal(Thread* self);
  9.   // TODO: No thread safety analysis on Wait and TimedWait as they call mutex operations via their
  10.   // pointer copy, thereby defeating annotalysis.
  11.   void Wait(Thread* self) NO_THREAD_SAFETY_ANALYSIS;
  12.   void TimedWait(Thread* self, int64_t ms, int32_t ns) NO_THREAD_SAFETY_ANALYSIS;
  13.   // Variant of Wait that should be used with caution. Doesn't validate that no mutexes are held
  14.   // when waiting.
  15.   // TODO: remove this.
  16.   void WaitHoldingLocks(Thread* self) NO_THREAD_SAFETY_ANALYSIS;

  17.  private:
  18.   const char* const name_;
  19.   // The Mutex being used by waiters. It is an error to mix condition variables between different
  20.   // Mutexes.
  21.   Mutex& guard_;//注意这边跟Mutexlock一样。也是一个Mutex的引用。这是关键所在。
  22.   // A counter that is modified by signals and broadcasts. This ensures that when a waiter gives up
  23.   // their Mutex and another thread takes it and signals, the waiting thread observes that sequence_
  24.   // changed and doesn't enter the wait. Modified while holding guard_, but is read by futex wait
  25.   // without guard_ held.
  26.   AtomicInteger sequence_;//这个原子量只有signal和broadcast的时候才会被调用。
  27. //其实这段注释已经说的比较清楚了。是说ConditionVariable的wait在时候是一个compare wait,每次wait的都是都会在内核比较这个值。如果signal或者broadcast此时正好修改了这个值。那么内核的futex_wait会返回EWOULDBLOCK。那么这个当前准备睡眠的进程就不需要阻塞了。
  28.   // Number of threads that have come into to wait, not the length of the waiters on the futex as
  29.   // waiters may have been requeued onto guard_. Guarded by guard_.
  30.   volatile int32_t num_waiters_;//当前有多少了进程阻塞在这个ConditionVariable上。
  31.   DISALLOW_COPY_AND_ASSIGN(ConditionVariable);
  32. };
wait操作大体跟pthread_cond_wait类似,调用futex_wait来获取锁,不成功则阻塞进入睡眠状态。
  1. void ConditionVariable::Wait(Thread* self) {
  2.   guard_.CheckSafeToWait(self);
  3.   WaitHoldingLocks(self);
  4. }

  5. void ConditionVariable::WaitHoldingLocks(Thread* self) {
  6.   DCHECK(self == NULL || self == Thread::Current());
  7.   guard_.AssertExclusiveHeld(self);
  8.   unsigned int old_recursion_count = guard_.recursion_count_;
  9.   num_waiters_++;//阻塞计数加1
  10.   // Ensure the Mutex is contended so that requeued threads are awoken.
  11.   guard_.num_contenders_++;//有点奇怪,mutex的ExclusiveUnlock里面就是根据这个num_contenders计数来判断是否要唤醒进程。如果这边ConditionVariable的wait增加num_contenders计数,但是还没来得及调用broadcast来requeue的话。这些进程并不在uaddr2(guard_)的fetux lock上。这时候一个没有配对的被ExclusiveUnlock调用了(一般不会发生)。Futex wake并没有唤醒到任何进程。为什么不等到调用broadcast的时候再修改这个guard_.num_contenders_ 呢??
  12. //答案是调用的Broadcast的线程首先通过Mutexlock来获得guard_锁。所以在这边修改guard_.num_contenders_也没关系。
  13.   guard_.recursion_count_ = 1;
  14.   int32_t cur_sequence = sequence_.LoadRelaxed();
  15.   guard_.ExclusiveUnlock(self);//这个很重要,用guard_来保护ConditionVariable的一些private参数。
  16.   if (futex(sequence_.Address(), FUTEX_WAIT, cur_sequence, NULL, NULL, 0) != 0) {//调用futex_wake来唤醒一个waiter
  17.     // Futex failed, check it is an expected error.
  18.     // EAGAIN == EWOULDBLK, so we let the caller try again.
  19.     // EINTR implies a signal was sent to this thread.
  20.     if ((errno != EINTR) && (errno != EAGAIN)) {
  21.       PLOG(FATAL) << "futex wait failed for " << name_;
  22.     }
  23.   }
  24.   guard_.ExclusiveLock(self);
  25.   CHECK_GE(num_waiters_, 0);
  26.   num_waiters_--;
  27.   // We awoke and so no longer require awakes from the guard_'s unlock.
  28.   CHECK_GE(guard_.num_contenders_.LoadRelaxed(), 0);
  29.   guard_.num_contenders_--;
  30.   guard_.recursion_count_ = old_recursion_count;
  31. }
接下来是关键的broadcast函数。
  1. void ConditionVariable::Broadcast(Thread* self) {
  2.   DCHECK(self == NULL || self == Thread::Current());
  3.   // TODO: enable below, there's a race in thread creation that causes false failures currently.
  4.   // guard_.AssertExclusiveHeld(self);
  5.   DCHECK_EQ(guard_.GetExclusiveOwnerTid(), SafeGetTid(self));
  6. #if ART_USE_FUTEXES
  7.   if (num_waiters_ > 0) {
  8.     sequence_++; // Indicate the broadcast occurred.
  9.     bool done = false;
  10.     do {
  11.       int32_t cur_sequence = sequence_.LoadRelaxed();
  12.       // Requeue waiters onto mutex. The waiter holds the contender count on the mutex high ensuring
  13.       // mutex unlocks will awaken the requeued waiter thread.
  14.       done = futex(sequence_.Address(), FUTEX_CMP_REQUEUE, 0,
  15.                    reinterpret_cast<const timespec*>(std::numeric_limits<int32_t>::max()),
  16.                    guard_.state_.Address(), cur_sequence) != -1;
  17.       if (!done) {
  18.         if (errno != EAGAIN) {
  19.           PLOG(FATAL) << "futex cmp requeue failed for " << name_;
  20.         }
  21.       }
  22.     } while (!done);
  23.   }
  24. #else
  25.   CHECK_MUTEX_CALL(pthread_cond_broadcast, (&cond_));
  26. #endif
  27. }
注意这儿的传给futex系统调用的op code跟pthread_cond_broadcast不一样。pthread_cond_broadcast里面传入的是FUTEX_WAKE.
但是这儿传入的是
FUTEX_CMP_REQUEUE。FUTEX_CMP_REQUEUE的opcode最后会调用到内核futex_requeue函数,
将uaddr1上的所有waiters全部移到锁uaddr2上。在我们这个场景上,就是讲ConditionVariables(uaddr1)上所有的waiters全部移到Mutex(uaddr2)上去。
那么问题来了!为什么要将这些waiters全部从ConditionVariables上移到Mutex上去呢
还得要从上面提到的“惊群效应”说起。既然这些被全部唤醒的阻塞在ConditionVariables上的线程又会同时去竞争
Mutex。为什么不干脆将这些waiters全部转移到Mutex上去呢!这样调用完Broadcast之后,Mutex.unlock会去唤醒其中一个线程。避免的
惊群效应,将这个Broadcast唤醒过程串行化。
Broadcast之后如何调用unlock ?,可以看下面art/runtime/gc/heap.cc里面的一段实例
  1. 2270 void Heap::FinishGC(Thread* self, collector::GcType gc_type) {
  2. 2271 MutexLock mu(self, *gc_complete_lock_);//个人认为这个名字不太准确,在android其余地方有一种更加
  3. 恰当的描述,Autolock。这个Mutexlock劫持了构造和析构函数。也就是Mutex.Lock和Mutex.Unlock.
  4. 2272 collector_type_running_ = kCollectorTypeNone;
  5. 2273 if (gc_type != collector::kGcTypeNone) {
  6. 2274 last_gc_type_ = gc_type;
  7. 2275 }
  8. 2276 // Wake anyone who may have been waiting for the GC to complete.
  9. 2277 gc_complete_cond_->Broadcast(self);//将等待的所有线程从gc_complete_cond_上移到mu上去。这样FinishGC退出时候,调用Mutexlock的析构函数
  10. 去unlock。等于变相的唤醒了一个线程A。这个线程A会从ConditionVariables.wait中退出,然后在wait里面调用了 guard_.ExclusiveLock(self);获得了这个
  11. Mutex,等进程A unlock这个Mutex的时候,等于变相的又唤醒了另外一个线程B。这就是为什么在ConditionVariables.wait函数里面要递加  guard_.num_contenders_
  12. 2278 }
本文中并没有讲解内核的futex_requeue函数, 了解这些原理之后,内核代码比较简单。
阅读(4679) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~