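The basic mechanism of futex (futex_wake/futex_wait) is fairly simple, and so are the relationships between its data structures. This article therefore does not cover futex basics again.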
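Like the kernel mutex, futex's core kernel data is essentially a per-task waiter structure. What matters is how those waiters are organized. futex keeps them in hash-bucketed queues ordered by priority, whereas the kernel mutex uses a FIFO wait list.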
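This article focuses on futex's requeue operation, because it only becomes clear in the context of a concrete user-space use case. It compares two futex-based implementations in Android: ART's art/runtime/base/mutex.cc and bionic's pthread_cond_t.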
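int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex)
pthread_cond_wait issues the futex system call, which ends up in the kernel's futex_wait. If the condition variable's futex word still holds the value read earlier, the thread blocks and sleeps until it is woken.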
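The compare-and-block behavior can be illustrated with a pair of minimal futex wrappers. This is a sketch that assumes Linux and calls the raw syscall, because glibc provides no futex() function (ART and bionic use their own helpers). The names futex_wait, futex_wake and stale_wait_returns_eagain are hypothetical. The key property is that FUTEX_WAIT sleeps only if the word still holds the expected value. This is what lets pthread_cond_wait release the mutex before sleeping without missing a wakeup.

```cpp
// Minimal Linux-only futex wrappers (illustrative sketch, hypothetical names).
#include <cassert>
#include <cerrno>
#include <climits>
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>

// Block on addr only if *addr still equals expected. The kernel hashes addr to
// a bucket, re-reads *addr under the bucket lock, and only then enqueues us.
long futex_wait(int* addr, int expected) {
  return syscall(SYS_futex, addr, FUTEX_WAIT, expected, nullptr, nullptr, 0);
}

// Wake up to n threads sleeping on addr; returns how many were actually woken.
long futex_wake(int* addr, int n) {
  return syscall(SYS_futex, addr, FUTEX_WAKE, n, nullptr, nullptr, 0);
}

// A wait whose expected value is already stale returns at once with EAGAIN
// (EWOULDBLOCK on Linux) instead of sleeping.
bool stale_wait_returns_eagain() {
  int word = 1;
  long rc = futex_wait(&word, 0);  // *addr is 1, not 0, so the kernel refuses to sleep
  return rc == -1 && errno == EAGAIN;
}
```

A FUTEX_WAKE on a word nobody sleeps on simply returns 0. Passing nr_wake = INT_MAX is therefore harmless when there are no waiters.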
int pthread_cond_broadcast(pthread_cond_t* cond) {
  return __pthread_cond_pulse(cond, INT_MAX);
}
pthread_cond_broadcast tries to wake every thread blocked on the condition variable. The nr_wake value it passes is INT_MAX, meaning "wake as many threads as possible".
At the application level, threads are usually synchronized with these APIs:

pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex);
pthread_mutex_unlock(&mutex);
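The three-line pattern above omits the predicate loop that real code needs. pthread_cond_wait may return spuriously, and a waiter may arrive only after the broadcast has happened. The following minimal, self-contained sketch adds that loop using standard pthread calls and std::thread. The names ready, woken, waiter and run_broadcast_demo are hypothetical.

```cpp
#include <cassert>
#include <pthread.h>
#include <thread>
#include <vector>

// Shared state guarded by `mu`; `ready` is the predicate the waiters test.
static pthread_mutex_t mu = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static bool ready = false;
static int woken = 0;

static void waiter() {
  pthread_mutex_lock(&mu);
  while (!ready) {                  // re-check the predicate: wakeups can be spurious
    pthread_cond_wait(&cond, &mu);  // releases mu while asleep, holds it again on return
  }
  ++woken;                          // still under mu
  pthread_mutex_unlock(&mu);
}

// Starts n waiters, broadcasts once, and returns how many got past the wait.
int run_broadcast_demo(int n) {
  std::vector<std::thread> threads;
  for (int i = 0; i < n; ++i) threads.emplace_back(waiter);
  pthread_mutex_lock(&mu);
  ready = true;                     // change the predicate while holding mu
  pthread_cond_broadcast(&cond);    // wake every waiter
  pthread_mutex_unlock(&mu);
  for (auto& t : threads) t.join();
  return woken;
}
```

Because the predicate is checked under the mutex, a thread that starts after the broadcast sees ready == true and never sleeps. No wakeup is lost.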
Breaking pthread_cond_wait down slightly, the code above becomes roughly:

pthread_mutex_lock(&mutex);
do_something_1();
pthread_mutex_unlock(&mutex);
// step A
int status = __futex_wait_ex(&cond->value, COND_IS_SHARED(cond->value), old_value, reltime);
// step B
pthread_mutex_lock(&mutex);
do_something_2();
pthread_mutex_unlock(&mutex);
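The decomposition can be modeled with a hypothetical sequence-counter condition variable built directly on the futex syscall. This sketch is for illustration only and is not bionic's code. bionic's __pthread_cond_pulse and its value layout differ, and SeqCond, sys_futex, seq_cond_wait, seq_cond_broadcast and run_seq_cond_demo are invented names. Reading the counter before unlocking the mutex prevents a lost wakeup: if a broadcast slips in between, FUTEX_WAIT sees a changed value and returns EAGAIN. seq_cond_broadcast wakes every waiter, so all of them pile into step B at once.

```cpp
#include <atomic>
#include <cassert>
#include <climits>
#include <linux/futex.h>
#include <pthread.h>
#include <sys/syscall.h>
#include <thread>
#include <unistd.h>
#include <vector>

// Hypothetical sequence-counter condition variable; waiters sleep on &seq.
struct SeqCond {
  std::atomic<int> seq{0};
};

static long sys_futex(std::atomic<int>* addr, int op, int val) {
  // Treats std::atomic<int> as a plain int futex word (common practice on Linux).
  return syscall(SYS_futex, reinterpret_cast<int*>(addr), op, val, nullptr, nullptr, 0);
}

// Caller holds *mu, as with pthread_cond_wait.
void seq_cond_wait(SeqCond* c, pthread_mutex_t* mu) {
  int old_value = c->seq.load();  // snapshot while still holding the mutex
  pthread_mutex_unlock(mu);
  // Step A: sleep only if seq still equals old_value; EAGAIN/EINTR simply fall through.
  sys_futex(&c->seq, FUTEX_WAIT, old_value);
  // Step B: every thread woken by a broadcast contends for the mutex here.
  pthread_mutex_lock(mu);
}

// Caller holds the mutex.
void seq_cond_broadcast(SeqCond* c) {
  c->seq.fetch_add(1);                      // a waiter not yet asleep will now get EAGAIN
  sys_futex(&c->seq, FUTEX_WAKE, INT_MAX);  // wake everyone: all of them head for step B
}

// n threads wait for `ready`; one broadcast releases them. Returns how many finished.
int run_seq_cond_demo(int n) {
  pthread_mutex_t mu;
  pthread_mutex_init(&mu, nullptr);
  SeqCond cond;
  bool ready = false;
  int done = 0;
  std::vector<std::thread> threads;
  for (int i = 0; i < n; ++i)
    threads.emplace_back([&] {
      pthread_mutex_lock(&mu);
      while (!ready) seq_cond_wait(&cond, &mu);
      ++done;
      pthread_mutex_unlock(&mu);
    });
  pthread_mutex_lock(&mu);
  ready = true;
  seq_cond_broadcast(&cond);
  pthread_mutex_unlock(&mu);
  for (auto& t : threads) t.join();
  pthread_mutex_destroy(&mu);
  return done;
}
```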
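Suppose many threads are blocked in futex_wait (step A) at the same time. Calling pthread_cond_broadcast wakes all of them at once, and they all immediately call pthread_mutex_lock (step B). Only one of them can acquire the mutex. The rest go straight back to sleep.
This is known as the "thundering herd" problem, and it should be avoided wherever possible. Threads that are woken only to fall asleep again waste a great deal of CPU time on pointless scheduling.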
Next, let's look at how Android's art/runtime/base uses futexes (the ART_USE_FUTEXES case).
class LOCKABLE Mutex : public BaseMutex {
  // Block until mutex is free then acquire exclusive access.
  void ExclusiveLock(Thread* self) EXCLUSIVE_LOCK_FUNCTION();
  void Lock(Thread* self) EXCLUSIVE_LOCK_FUNCTION() { ExclusiveLock(self); }

  // Returns true if acquires exclusive access, false otherwise.
  bool ExclusiveTryLock(Thread* self) EXCLUSIVE_TRYLOCK_FUNCTION(true);
  bool TryLock(Thread* self) EXCLUSIVE_TRYLOCK_FUNCTION(true) { return ExclusiveTryLock(self); }

  // Release exclusive access.
  void ExclusiveUnlock(Thread* self) UNLOCK_FUNCTION();
  void Unlock(Thread* self) UNLOCK_FUNCTION() { ExclusiveUnlock(self); }
  .....
 private:
#if ART_USE_FUTEXES
  // 0 is unheld, 1 is held.
  AtomicInteger state_;  // Note: an AtomicInteger recording whether the mutex is held by some thread
  // Exclusive owner.
  volatile uint64_t exclusive_owner_;  // TID of the lock's owner
  // Number of waiting contenders.
  AtomicInteger num_contenders_;
#else
  pthread_mutex_t mutex_;
  volatile uint64_t exclusive_owner_;  // Guarded by mutex_.
#endif
  const bool recursive_;  // Can the lock be recursively held?
  unsigned int recursion_count_;
  friend class ConditionVariable;
  DISALLOW_COPY_AND_ASSIGN(Mutex);
};
First, ART defines a Mutex analogous to pthread_mutex_t, with operations analogous to pthread_mutex_lock/pthread_mutex_unlock.
void Mutex::ExclusiveLock(Thread* self) {
  DCHECK(self == NULL || self == Thread::Current());
  if (kDebugLocking && !recursive_) {
    AssertNotHeld(self);
  }
  if (!recursive_ || !IsExclusiveHeld(self)) {
    bool done = false;
    do {
      int32_t cur_state = state_.LoadRelaxed();  // atomically read the current value of state_
      if (LIKELY(cur_state == 0)) {
        // state_ == 0 means no thread holds the lock, so the current thread takes it.
        // This user-space variable greatly reduces the number of system calls that trap
        // into the kernel, which improves performance.
        // Change state from 0 to 1 and impose load/store ordering appropriate for lock acquisition.
        done = state_.CompareExchangeWeakAcquire(0 /* cur_state */, 1 /* new state */);
        // Atomically changes state_ from 0 to 1.
      } else {
        // Failed to acquire, hang up.
        ScopedContentionRecorder scr(this, SafeGetTid(self), GetExclusiveOwnerTid());  // debugging aid
        num_contenders_++;  // one more contender
        if (futex(state_.Address(), FUTEX_WAIT, 1, NULL, NULL, 0) != 0) {
          // EAGAIN and EINTR both indicate a spurious failure, try again from the beginning.
          // We don't use TEMP_FAILURE_RETRY so we can intentionally retry to acquire the lock.
          if ((errno != EAGAIN) && (errno != EINTR)) {
            PLOG(FATAL) << "futex wait failed for " << name_;
          }
        }
        num_contenders_--;
      }
    } while (!done);
    DCHECK_EQ(state_.LoadRelaxed(), 1);  // this thread now holds the lock, so state_ must be 1
    DCHECK_EQ(exclusive_owner_, 0U);
    exclusive_owner_ = SafeGetTid(self);  // record the lock's owner
    RegisterAsLocked(self);  // debugging aid
  }
  recursion_count_++;
  if (kDebugLocking) {
    CHECK(recursion_count_ == 1 || recursive_) << "Unexpected recursion count on mutex: "
        << name_ << " " << recursion_count_;
    AssertHeld(self);
  }
}
void Mutex::ExclusiveUnlock(Thread* self) {
  DCHECK(self == NULL || self == Thread::Current());
  AssertHeld(self);
  DCHECK_NE(exclusive_owner_, 0U);  // on unlock some thread must own the lock, normally the current one
  recursion_count_--;
  if (!recursive_ || recursion_count_ == 0) {
    if (kDebugLocking) {
      CHECK(recursion_count_ == 0 || recursive_) << "Unexpected recursion count on mutex: "
          << name_ << " " << recursion_count_;
    }
    RegisterAsUnlocked(self);
    bool done = false;
    do {
      int32_t cur_state = state_.LoadRelaxed();
      if (LIKELY(cur_state == 1)) {  // state_ must be 1 here; any other value is an error
        // We're no longer the owner.
        exclusive_owner_ = 0;  // clear the owner: this thread is releasing the lock and no longer owns it
        // Change state to 0 and impose load/store ordering appropriate for lock release.
        // Note, the relaxed loads below mustn't reorder before the CompareExchange.
        // TODO: the ordering here is non-trivial as state is split across 3 fields, fix by placing
        // a status bit into the state on contention.
        done = state_.CompareExchangeWeakSequentiallyConsistent(cur_state, 0 /* new state */);
        // This changes state_ from 1 to 0. It puzzled me for a long time: if the lock has
        // several waiters, what happens to the others once the field drops back to 0?
        // The answer is that whichever waiter later acquires the lock sets state_ from 0 to 1
        // again. In short, this is a user-space flag, not kernel state: user space only cares
        // whether it can take the lock, and if it cannot, the kernel arbitrates.
        if (LIKELY(done)) {  // Spurious fail?
          // Wake a contender.
          if (UNLIKELY(num_contenders_.LoadRelaxed() > 0)) {  // num_contenders_ is what actually counts the waiters
            futex(state_.Address(), FUTEX_WAKE, 1, NULL, NULL, 0);
          }
        }
      } else {
        // Logging acquires the logging lock, avoid infinite recursion in that case.
        if (this != Locks::logging_lock_) {
          LOG(FATAL) << "Unexpected state_ in unlock " << cur_state << " for " << name_;
        } else {
          LogMessageData data(__FILE__, __LINE__, INTERNAL_FATAL, -1);
          LogMessage::LogLine(data, StringPrintf("Unexpected state_ %d in unlock for %s",
                                                 cur_state, name_).c_str());
          _exit(1);
        }
      }
    } while (!done);
  }
}
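The essence of ExclusiveLock/ExclusiveUnlock fits in a small class. The sketch below is simplified and makes several assumptions. It targets Linux and uses std::atomic<int> in place of ART's AtomicInteger. It drops recursion, owner tracking and debug checks, and its unlock uses a plain store instead of the compare-exchange loop. It also adds a hypothetical futex_calls counter purely to show that the uncontended path never enters the kernel. FutexMutex, uncontended_syscalls and contended_count are invented names.

```cpp
#include <atomic>
#include <cassert>
#include <linux/futex.h>
#include <sys/syscall.h>
#include <thread>
#include <unistd.h>
#include <vector>

// Simplified ART-style mutex: state_ is 0 (free) or 1 (held); num_contenders_
// counts threads that are sleeping, or about to sleep, on state_.
class FutexMutex {
 public:
  void Lock() {
    bool done = false;
    do {
      int cur = state_.load(std::memory_order_relaxed);
      if (cur == 0) {
        // Fast path: flip 0 -> 1 entirely in user space (no system call).
        done = state_.compare_exchange_weak(cur, 1, std::memory_order_acquire);
      } else {
        // Slow path: announce ourselves, then sleep while state_ is still 1.
        num_contenders_.fetch_add(1);
        futex_calls.fetch_add(1);
        // EAGAIN (state_ already changed) or EINTR just mean "retry from the top".
        syscall(SYS_futex, Word(), FUTEX_WAIT, 1, nullptr, nullptr, 0);
        num_contenders_.fetch_sub(1);
      }
    } while (!done);
  }

  void Unlock() {
    state_.store(0);  // seq_cst store releases the lock
    // Only enter the kernel if someone may be asleep on state_.
    if (num_contenders_.load() > 0) {
      futex_calls.fetch_add(1);
      syscall(SYS_futex, Word(), FUTEX_WAKE, 1, nullptr, nullptr, 0);
    }
  }

  std::atomic<int> futex_calls{0};  // demo instrumentation: futex syscalls issued

 private:
  // Treats the std::atomic<int> as a plain int futex word (common on Linux).
  int* Word() { return reinterpret_cast<int*>(&state_); }
  std::atomic<int> state_{0};
  std::atomic<int> num_contenders_{0};
};

// Single-threaded lock/unlock loop; returns how many futex syscalls were made.
int uncontended_syscalls(int iterations) {
  FutexMutex m;
  for (int i = 0; i < iterations; ++i) {
    m.Lock();
    m.Unlock();
  }
  return m.futex_calls.load();
}

// n threads each increment a shared counter k times under the mutex.
long contended_count(int n, int k) {
  FutexMutex m;
  long counter = 0;
  std::vector<std::thread> threads;
  for (int t = 0; t < n; ++t)
    threads.emplace_back([&] {
      for (int i = 0; i < k; ++i) {
        m.Lock();
        ++counter;
        m.Unlock();
      }
    });
  for (auto& th : threads) th.join();
  return counter;
}
```

The counter makes the role of state_ visible. A single thread that repeatedly locks and unlocks never issues a futex call. Unlock pays for FUTEX_WAKE only when num_contenders_ indicates that someone may be asleep.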
Now let's look at ConditionVariable, ART's counterpart of pthread_cond_t:
// ConditionVariables allow threads to queue and sleep. Threads may then be resumed individually
// (Signal) or all at once (Broadcast).
class ConditionVariable {
 public:
  explicit ConditionVariable(const char* name, Mutex& mutex);
  ~ConditionVariable();

  void Broadcast(Thread* self);
  void Signal(Thread* self);
  // TODO: No thread safety analysis on Wait and TimedWait as they call mutex operations via their
  // pointer copy, thereby defeating annotalysis.
  void Wait(Thread* self) NO_THREAD_SAFETY_ANALYSIS;
  void TimedWait(Thread* self, int64_t ms, int32_t ns) NO_THREAD_SAFETY_ANALYSIS;
  // Variant of Wait that should be used with caution. Doesn't validate that no mutexes are held
  // when waiting.
  // TODO: remove this.
  void WaitHoldingLocks(Thread* self) NO_THREAD_SAFETY_ANALYSIS;

 private:
  const char* const name_;
  // The Mutex being used by waiters. It is an error to mix condition variables between different
  // Mutexes.
  Mutex& guard_;  // As with MutexLock, this is a reference to a Mutex. This is the key point.
  // A counter that is modified by signals and broadcasts. This ensures that when a waiter gives up
  // their Mutex and another thread takes it and signals, the waiting thread observes that sequence_
  // changed and doesn't enter the wait. Modified while holding guard_, but is read by futex wait
  // without guard_ held.
  AtomicInteger sequence_;  // modified only by Signal and Broadcast
  // The comment above already says it clearly: Wait on a ConditionVariable is a compare-and-wait,
  // and on every wait the kernel compares this value. If a Signal or Broadcast has just changed
  // it, the kernel's futex_wait returns EWOULDBLOCK, so the thread about to sleep does not block.
  // Number of threads that have come into to wait, not the length of the waiters on the futex as
  // waiters may have been requeued onto guard_. Guarded by guard_.
  volatile int32_t num_waiters_;  // number of threads currently waiting on this ConditionVariable
  DISALLOW_COPY_AND_ASSIGN(ConditionVariable);
};
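Wait works much like pthread_cond_wait. It releases guard_ and calls futex_wait on sequence_, and the thread sleeps unless sequence_ has changed since it was read.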
void ConditionVariable::Wait(Thread* self) {
  guard_.CheckSafeToWait(self);
  WaitHoldingLocks(self);
}

void ConditionVariable::WaitHoldingLocks(Thread* self) {
  DCHECK(self == NULL || self == Thread::Current());
  guard_.AssertExclusiveHeld(self);
  unsigned int old_recursion_count = guard_.recursion_count_;
  num_waiters_++;  // one more waiter
  // Ensure the Mutex is contended so that requeued threads are awoken.
  guard_.num_contenders_++;
  // This looks odd at first. Mutex::ExclusiveUnlock uses num_contenders_ to decide whether to
  // wake a thread. If Wait bumps the count here but no Broadcast has requeued these threads yet,
  // they are not queued on uaddr2 (guard_'s futex). An unrelated ExclusiveUnlock in that window
  // (rare) would then issue a futex wake that wakes nobody. Why not adjust
  // guard_.num_contenders_ only when Broadcast is called?
  // Answer: the thread calling Broadcast must first acquire guard_ through MutexLock, so
  // modifying guard_.num_contenders_ here does no harm.
  guard_.recursion_count_ = 1;
  int32_t cur_sequence = sequence_.LoadRelaxed();
  guard_.ExclusiveUnlock(self);  // important: guard_ also protects the ConditionVariable's private fields
  if (futex(sequence_.Address(), FUTEX_WAIT, cur_sequence, NULL, NULL, 0) != 0) {  // sleep on sequence_ unless it no longer equals cur_sequence
    // Futex failed, check it is an expected error.
    // EAGAIN == EWOULDBLK, so we let the caller try again.
    // EINTR implies a signal was sent to this thread.
    if ((errno != EINTR) && (errno != EAGAIN)) {
      PLOG(FATAL) << "futex wait failed for " << name_;
    }
  }
  guard_.ExclusiveLock(self);
  CHECK_GE(num_waiters_, 0);
  num_waiters_--;
  // We awoke and so no longer require awakes from the guard_'s unlock.
  CHECK_GE(guard_.num_contenders_.LoadRelaxed(), 0);
  guard_.num_contenders_--;
  guard_.recursion_count_ = old_recursion_count;
}
Next comes the key function, Broadcast.
void ConditionVariable::Broadcast(Thread* self) {
  DCHECK(self == NULL || self == Thread::Current());
  // TODO: enable below, there's a race in thread creation that causes false failures currently.
  // guard_.AssertExclusiveHeld(self);
  DCHECK_EQ(guard_.GetExclusiveOwnerTid(), SafeGetTid(self));
#if ART_USE_FUTEXES
  if (num_waiters_ > 0) {
    sequence_++;  // Indicate the broadcast occurred.
    bool done = false;
    do {
      int32_t cur_sequence = sequence_.LoadRelaxed();
      // Requeue waiters onto mutex. The waiter holds the contender count on the mutex high ensuring
      // mutex unlocks will awaken the requeued waiter thread.
      done = futex(sequence_.Address(), FUTEX_CMP_REQUEUE, 0,
                   reinterpret_cast<const timespec*>(std::numeric_limits<int32_t>::max()),
                   guard_.state_.Address(), cur_sequence) != -1;
      if (!done) {
        if (errno != EAGAIN) {
          PLOG(FATAL) << "futex cmp requeue failed for " << name_;
        }
      }
    } while (!done);
  }
#else
  CHECK_MUTEX_CALL(pthread_cond_broadcast, (&cond_));
#endif
}
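Note that the futex op code passed here differs from the one pthread_cond_broadcast uses.
pthread_cond_broadcast passes FUTEX_WAKE.
ART passes FUTEX_CMP_REQUEUE instead.
The FUTEX_CMP_REQUEUE op ends up in the kernel's futex_requeue function. The kernel first checks that the word at uaddr1 still equals the last argument (cur_sequence here) and fails with EAGAIN if it does not. It then wakes up to nr_wake waiters (0 here) and moves up to nr_requeue of the remaining waiters (INT_MAX here, passed through the timeout argument) onto the futex at uaddr2.
In our scenario, this moves every waiter on the ConditionVariable (uaddr1, sequence_) onto the Mutex (uaddr2, guard_.state_) without waking any of them.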
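This raises the question: why move all these waiters from the ConditionVariable onto the Mutex?
The answer goes back to the thundering herd described above. Once woken, all the threads blocked on the ConditionVariable would immediately compete for the Mutex anyway. So why not move them onto the Mutex directly? Then, after Broadcast, each Mutex unlock wakes exactly one thread. This avoids the thundering herd and serializes the wakeups that Broadcast triggers.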
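The whole mechanism can be condensed into a small model, assuming Linux. It combines a simplified ART-style mutex with a condition variable whose Broadcast uses FUTEX_CMP_REQUEUE with nr_wake = 0 and nr_requeue = INT_MAX, mirroring the ART code above. ArtStyleMutex, ArtStyleCondVar, RawFutex and RequeueDemo are hypothetical names. Debug checks, recursion and Signal are omitted. Waiters bump guard.contenders in Wait. Once they have been requeued onto &guard.state, each Unlock therefore hands the lock to the next waiter.

```cpp
#include <atomic>
#include <cassert>
#include <cerrno>
#include <climits>
#include <cstdlib>
#include <linux/futex.h>
#include <sys/syscall.h>
#include <thread>
#include <unistd.h>
#include <vector>

// Raw futex syscall. For FUTEX_CMP_REQUEUE, val2 carries the requeue count in the
// timeout slot, as ART's reinterpret_cast<const timespec*> does; 0 means "no timeout".
static long RawFutex(std::atomic<int>* uaddr, int op, int val, long val2,
                     std::atomic<int>* uaddr2, int val3) {
  return syscall(SYS_futex, reinterpret_cast<int*>(uaddr), op, val, val2,
                 reinterpret_cast<int*>(uaddr2), val3);
}

// Simplified ART-style mutex (no recursion, owner TID, or debug checks).
struct ArtStyleMutex {
  std::atomic<int> state{0};       // 0 = free, 1 = held
  std::atomic<int> contenders{0};  // threads that may be asleep on &state

  void Lock() {
    for (bool done = false; !done;) {
      int cur = state.load(std::memory_order_relaxed);
      if (cur == 0) {
        done = state.compare_exchange_weak(cur, 1, std::memory_order_acquire);
      } else {
        contenders++;
        RawFutex(&state, FUTEX_WAIT, 1, 0, nullptr, 0);  // EAGAIN/EINTR: just retry
        contenders--;
      }
    }
  }
  void Unlock() {
    state.store(0);
    if (contenders.load() > 0) RawFutex(&state, FUTEX_WAKE, 1, 0, nullptr, 0);
  }
};

// Simplified ART-style condition variable whose Broadcast requeues instead of waking.
struct ArtStyleCondVar {
  explicit ArtStyleCondVar(ArtStyleMutex& m) : guard(m) {}
  ArtStyleMutex& guard;
  std::atomic<int> sequence{0};
  int num_waiters = 0;  // only touched while holding guard

  void Wait() {  // caller holds guard
    num_waiters++;
    guard.contenders++;  // keep guard "contended" so its Unlock wakes requeued waiters
    int cur = sequence.load(std::memory_order_relaxed);
    guard.Unlock();
    RawFutex(&sequence, FUTEX_WAIT, cur, 0, nullptr, 0);  // EAGAIN: a broadcast already happened
    guard.Lock();
    num_waiters--;
    guard.contenders--;
  }
  void Broadcast() {  // caller holds guard
    if (num_waiters == 0) return;
    sequence++;
    for (;;) {
      int cur = sequence.load(std::memory_order_relaxed);
      // Wake 0 threads; move up to INT_MAX waiters from &sequence onto &guard.state,
      // provided sequence still equals cur (otherwise EAGAIN and we retry).
      if (RawFutex(&sequence, FUTEX_CMP_REQUEUE, 0, INT_MAX, &guard.state, cur) != -1) return;
      if (errno != EAGAIN) std::abort();  // ART logs FATAL here
    }
  }
};

// n threads wait for `ready`; a single Broadcast releases them all.
int RequeueDemo(int n) {
  ArtStyleMutex mu;
  ArtStyleCondVar cv(mu);
  bool ready = false;
  int finished = 0;
  std::vector<std::thread> threads;
  for (int i = 0; i < n; ++i)
    threads.emplace_back([&] {
      mu.Lock();
      while (!ready) cv.Wait();
      ++finished;
      mu.Unlock();  // wakes the next requeued waiter, if any
    });
  mu.Lock();
  ready = true;
  cv.Broadcast();
  mu.Unlock();  // wakes one requeued waiter; each waiter's Unlock wakes the next
  for (auto& t : threads) t.join();
  return finished;
}
```

Broadcast wakes nobody directly. The only wakeups come from Unlock, one per released lock, so the herd never forms.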
How does the unlock that follows Broadcast happen? Consider this example from art/runtime/gc/heap.cc:

void Heap::FinishGC(Thread* self, collector::GcType gc_type) {
  MutexLock mu(self, *gc_complete_lock_);
  // In my view the name MutexLock is not very apt; elsewhere in Android the more fitting
  // name Autolock is used. MutexLock hooks its constructor and destructor: the constructor
  // calls Mutex.Lock and the destructor calls Mutex.Unlock.
  collector_type_running_ = kCollectorTypeNone;
  if (gc_type != collector::kGcTypeNone) {
    last_gc_type_ = gc_type;
  }
  // Wake anyone who may have been waiting for the GC to complete.
  gc_complete_cond_->Broadcast(self);
  // Moves every thread waiting on gc_complete_cond_ onto the Mutex held by mu. When FinishGC
  // returns, MutexLock's destructor unlocks it, which in effect wakes one thread, A. Thread A
  // returns from ConditionVariable::Wait after the guard_.ExclusiveLock(self) call inside Wait
  // acquires the Mutex. When A later unlocks the Mutex, that in turn wakes another thread, B.
  // This is why ConditionVariable::Wait increments guard_.num_contenders_.
}
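The same shape (lock, update state, broadcast, unlock in a destructor) can be sketched with the C++ standard library, with std::lock_guard playing the role of MutexLock. MiniHeap, WaitForGcToComplete and RunFinishGcDemo are hypothetical names. Whether notify_all requeues waiters onto the mutex, as ART's Broadcast does, depends on the library implementation and is not guaranteed. This sketch only illustrates the scoped-lock pattern.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical miniature of the FinishGC pattern using standard-library types
// instead of ART's Mutex/ConditionVariable/MutexLock.
struct MiniHeap {
  std::mutex gc_complete_lock;
  std::condition_variable gc_complete_cond;
  bool gc_running = true;  // guarded by gc_complete_lock

  void WaitForGcToComplete() {
    std::unique_lock<std::mutex> lock(gc_complete_lock);
    // The predicate overload loops internally, so spurious wakeups are harmless.
    gc_complete_cond.wait(lock, [this] { return !gc_running; });
  }

  void FinishGC() {
    std::lock_guard<std::mutex> mu(gc_complete_lock);  // like MutexLock: lock in ctor, unlock in dtor
    gc_running = false;
    gc_complete_cond.notify_all();  // broadcast while the lock is still held
  }  // mu's destructor unlocks here; woken waiters then reacquire the lock one at a time
};

// Starts n waiters, finishes the "GC", and returns how many waiters got through.
int RunFinishGcDemo(int n) {
  MiniHeap heap;
  std::atomic<int> returned{0};
  std::vector<std::thread> threads;
  for (int i = 0; i < n; ++i)
    threads.emplace_back([&] {
      heap.WaitForGcToComplete();
      returned++;
    });
  heap.FinishGC();
  for (auto& t : threads) t.join();
  return returned.load();
}
```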
This article does not walk through the kernel's futex_requeue function. Once these principles are understood, the kernel code is fairly simple.