代码:
- int
-
__pthread_cond_wait (cond, mutex)
-
pthread_cond_t *cond;
-
pthread_mutex_t *mutex;
-
{
-
struct _pthread_cleanup_buffer buffer;
-
struct _condvar_cleanup_buffer cbuffer;
-
int err;
-
int pshared = (cond->__data.__mutex == (void *) ~0l)
-
? LLL_SHARED : LLL_PRIVATE;
-
-
/* Make sure we are along. */
-
lll_lock (cond->__data.__lock, pshared);
-
-
/* Now we can release the mutex. */
-
err = __pthread_mutex_unlock_usercnt (mutex, 0);
-
if (__builtin_expect (err, 0))
-
{
-
lll_unlock (cond->__data.__lock, pshared);
-
return err;
-
}
-
-
/* We have one new user of the condvar. */
-
cond->__data.__total_seq;
-
cond->__data.__futex;
-
cond->__data.__nwaiters = 1 << COND_NWAITERS_SHIFT;
-
-
/* Remember the mutex we are using here. If there is already a
-
different address store this is a bad user bug. Do not store
-
anything for pshared condvars. */
-
if (cond->__data.__mutex != (void *) ~0l)
-
cond->__data.__mutex = mutex;
-
-
/* Prepare structure passed to cancellation handler. */
-
cbuffer.cond = cond;
-
cbuffer.mutex = mutex;
-
-
/* Before we block we enable cancellation. Therefore we have to
-
install a cancellation handler. */
-
__pthread_cleanup_push (&buffer, __condvar_cleanup, &cbuffer);
-
-
/* The current values of the wakeup counter. The "woken" counter
-
must exceed this value. */
-
unsigned long long int val;
-
unsigned long long int seq;
-
val = seq = cond->__data.__wakeup_seq;
-
/* Remember the broadcast counter. */
-
cbuffer.bc_seq = cond->__data.__broadcast_seq;
-
-
do
-
{
-
unsigned int futex_val = cond->__data.__futex;
-
-
/* Prepare to wait. Release the condvar futex. */
-
lll_unlock (cond->__data.__lock, pshared);
-
-
/* Enable asynchronous cancellation. Required by the standard. */
-
cbuffer.oldtype = __pthread_enable_asynccancel ();
-
-
/* Wait until woken by signal or broadcast. */
-
lll_futex_wait (&cond->__data.__futex, futex_val, pshared);
-
-
/* Disable asynchronous cancellation. */
-
__pthread_disable_asynccancel (cbuffer.oldtype);
-
-
/* We are going to look at shared data again, so get the lock. */
-
lll_lock (cond->__data.__lock, pshared);
-
-
/* If a broadcast happened, we are done. */
-
if (cbuffer.bc_seq != cond->__data.__broadcast_seq)
-
goto bc_out;
-
-
/* Check whether we are eligible for wakeup. */
-
val = cond->__data.__wakeup_seq;
-
}
-
while (val == seq || cond->__data.__woken_seq == val);
-
-
/* Another thread woken up. */
-
cond->__data.__woken_seq;
-
-
bc_out:
-
-
cond->__data.__nwaiters -= 1 << COND_NWAITERS_SHIFT;
-
-
/* If pthread_cond_destroy was called on this varaible already,
-
notify the pthread_cond_destroy caller all waiters have left
-
and it can be successfully destroyed. */
-
if (cond->__data.__total_seq == -1ULL
-
&& cond->__data.__nwaiters < (1 << COND_NWAITERS_SHIFT))
-
lll_futex_wake (&cond->__data.__nwaiters, 1, pshared);
-
-
/* We are done with the condvar. */
-
lll_unlock (cond->__data.__lock, pshared);
-
-
/* The cancellation handling is back to normal, remove the handler. */
-
__pthread_cleanup_pop (&buffer, 0);
-
-
/* Get the mutex before returning. */
-
return __pthread_mutex_cond_lock (mutex);
-
}
很显然其中存在黑盒子, lll_XXX系列函数, 先将这些东西搞明白, 再看高层逻辑, lll_lock 的相关代码:
/* Opening instructions of the lll_lock asm template: the compare-and-exchange
   that attempts to take the lock.  When NOT_IN_libc or UP is defined the
   cmpxchgl is always prefixed with LOCK_INSTR.  Otherwise the word at
   %gs:<operand 6> is tested first (operand 6 is the MULTIPLE_THREADS_OFFSET
   immediate supplied by lll_lock) and the "lock" prefix is jumped over when
   that word is zero.  */
- #if defined NOT_IN_libc || defined UP
-
# define __lll_lock_asm_start LOCK_INSTR "cmpxchgl %1, %2\n\t"
-
#else
-
# define __lll_lock_asm_start "cmpl $0, %%gs:%P6\n\t" \
-
"je 0f\n\t" \
-
"lock\n" \
-
"0:\tcmpxchgl %1, %2\n\t"
-
#endif
-
-
/* Acquire the low-level lock FUTEX; the expression has type void.

   Fast path: with eax preloaded with 0 and ecx with 1, the cmpxchgl from
   __lll_lock_asm_start stores 1 into FUTEX if it currently holds 0.  If the
   exchange fails (jnz), control moves to an out-of-line stub placed in
   subsection 1:

     - PRIVATE is the compile-time constant LLL_PRIVATE: the address of
       FUTEX is loaded into ecx and __lll_lock_wait_private is called.
     - otherwise: the address of FUTEX is loaded into edx, PRIVATE into
       ecx, and __lll_lock_wait is called.

   Both stubs jump back to label 18 once the call returns.

   NOTE(review): the register names in the two leal/movl stub instructions
   were lost in this listing (they appeared as "%x") and have been restored
   from upstream glibc's i386 lowlevellock.h; confirm against the exact
   glibc version under discussion.  */
#define lll_lock(futex, private) \
  (void) \
    ({ int ignore1, ignore2; \
       if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \
	 __asm __volatile (__lll_lock_asm_start \
			   "jnz _L_lock_%=\n\t" \
			   ".subsection 1\n\t" \
			   ".type _L_lock_%=,@function\n" \
			   "_L_lock_%=:\n" \
			   "1:\tleal %2, %%ecx\n" \
			   "2:\tcall __lll_lock_wait_private\n" \
			   "3:\tjmp 18f\n" \
			   "4:\t.size _L_lock_%=, 4b-1b\n\t" \
			   ".previous\n" \
			   LLL_STUB_UNWIND_INFO_3 \
			   "18:" \
			   : "=a" (ignore1), "=c" (ignore2), "=m" (futex) \
			   : "0" (0), "1" (1), "m" (futex), \
			     "i" (MULTIPLE_THREADS_OFFSET) \
			   : "memory"); \
       else \
	 { \
	   int ignore3; \
	   __asm __volatile (__lll_lock_asm_start \
			     "jnz _L_lock_%=\n\t" \
			     ".subsection 1\n\t" \
			     ".type _L_lock_%=,@function\n" \
			     "_L_lock_%=:\n" \
			     "1:\tleal %2, %%edx\n" \
			     "0:\tmovl %8, %%ecx\n" \
			     "2:\tcall __lll_lock_wait\n" \
			     "3:\tjmp 18f\n" \
			     "4:\t.size _L_lock_%=, 4b-1b\n\t" \
			     ".previous\n" \
			     LLL_STUB_UNWIND_INFO_4 \
			     "18:" \
			     : "=a" (ignore1), "=c" (ignore2), \
			       "=m" (futex), "=&d" (ignore3) \
			     : "1" (1), "m" (futex), \
			       "i" (MULTIPLE_THREADS_OFFSET), "0" (0), \
			       "g" ((int) (private)) \
			     : "memory"); \
	 } \
    })
为简单起见, 只观察 private == LLL_PRIVATE 这个case, 相应的c代码类似于:
/* Illustrative C model of the LLL_PRIVATE branch of the lll_lock macro.
   reg_file stands in for the CPU registers, with eax and ecx used as
   indexes into it.  This is a sketch for reading the asm only: the
   compare-and-store below is written as plain C and is not atomic here.  */
- void lll_lock(int* futex){
-
int reg_file[8];
-
int eax = 0;
-
int ecx = 1;
-
-
/* Mirrors the "0" (0) and "1" (1) input operands of the asm:
   eax starts as 0 and ecx as 1.  */
reg_file[eax] = 0;
-
reg_file[ecx] = 1;
-
-
/* Models cmpxchgl: if *futex equals eax, store ecx into *futex ...  */
if(*futex == reg_file[eax]){
-
*futex = reg_file[ecx];
-
}else{
/* ... otherwise eax receives the current value of *futex, and the
   out-of-line stub puts the futex address in ecx before the call.  */
- reg_file[eax] = * futex;
-
reg_file[ecx] = (int) futex;
-
asm("call __lll_lock_wait_private");
-
}
-
}
没有查到到底是怎么一个调用方式, 但很显然其参数是通过 ecx 传递的. 其定义:
/* Slow path of lll_lock for the private case.  FUTEX points to the lock
   word.  Returns only after an atomic exchange that stored 2 into *FUTEX
   found the previous value to be 0.  */
- void
-
__lll_lock_wait_private (int *futex)
-
{
-
/* The word is already 2, so go straight to waiting on that value
   without storing anything first.  */
if (*futex == 2)
-
lll_futex_wait (futex, 2, LLL_PRIVATE);
-
-
/* Store 2 and inspect the previous value: 0 means the lock was free and
   is now ours; anything else means wait on the value 2 and try again.  */
while (atomic_exchange_acq (futex, 2) != 0)
-
lll_futex_wait (futex, 2, LLL_PRIVATE);
-
}
/* Wait on FUTEX with no timeout: forwards to lll_futex_timed_wait with a
   NULL timeout argument.  */
- #define lll_futex_wait(futex, val, private) \
- lll_futex_timed_wait (futex, val, NULL, private)
/* Issue the futex system call with a FUTEX_WAIT operation and evaluate to
   the value the kernel entry leaves in eax (__status).

   Operand set-up visible in the constraints: eax = SYS_futex, FUTEX is
   passed through LLL_EBX_REG, TIMEOUT via the "S" constraint,
   __lll_private_flag (FUTEX_WAIT, PRIVATE) via "c", and VAL via the local
   _val, which is pinned to edx.

   NOTE(review): LLL_EBX_LOAD, LLL_ENTER_KERNEL and LLL_EBX_REG are not
   defined in this listing; the pairing of LLL_EBX_LOAD before and after
   the kernel entry presumably sets up and then restores ebx -- confirm
   against their definitions.  */
- #define lll_futex_timed_wait(futex, val, timeout, private) \
- ({ \
- int __status; \
- register __typeof (val) _val asm ("edx") = (val); \
- __asm __volatile (LLL_EBX_LOAD \
- LLL_ENTER_KERNEL \
- LLL_EBX_LOAD \
- : "=a" (__status) \
- : "0" (SYS_futex), LLL_EBX_REG (futex), "S" (timeout), \
- "c" (__lll_private_flag (FUTEX_WAIT, private)), \
- "d" (_val), "i" (offsetof (tcbhead_t, sysinfo)) \
- : "memory"); \
- __status; \
- })
不考虑__lll_private_flag 到底是什么意思, lll_futex_wait简化版的C代码应该是
- sys_futex(futex, FUTEX_WAIT, val, NULL, int*, int)
futex 在 FUTEX_WAIT时, 后面两个参数忽略, 意思是, 如果 *futex == val, 则睡眠, 直到通过调用
- sys_futex(futex, FUTEX_WAKE, nr, const struct timespec*, int*, int)
而唤醒, nr 为需要唤醒的线程数。 唤醒操作是无条件的, 并不需要之前的 val 改变或不改变。
整体看一下 lll_lock(int* futex) 逻辑, 就是:
1. 如果 *futex == 0, 则 *futex = 1, 并直接成功
2. 如果 *futex != 0, 则 *futex 可能的取值只有 1 和 2。 如果是1, 则将其赋值为 2, 而后判断*futex的原值, 如果不是 0 则陷入内核, 在内核中再次判断该值, 如果仍然为 2, 则睡眠; 否则退出内核, 对*futex 重新赋值2, 再一次进行这个过程...直到*futex的原始值为0, 才退出这个循环。 自然, 如果刚进入这个函数时, *futex 本来就等于2, 第一次就不必先赋值, 直接陷入内核即可。
这个逻辑比较绕, 单看这个函数本身, 很难搞明白到底是怎么回事, 因此需要补充2点相关背景:
1. lll_unlock 的实现是先将 *futex 赋值0, 而后调用 sys_futex(futex, FUTEX_WAKE, 1, const struct timespec*, int*, int).
2. FUTEX_WAIT 和 FUTEX_WAKE 在内核中的实现是通过一个自旋锁互斥的. FUTEX_WAIT 持有自旋锁后再次进行 *futex 和 val 的比较, 如果已经不同, 则直接退出内核; 否则将 futex 挂入一个队列, 而后释放自旋锁, 然后再次查看 futex 是否仍在那个队列中, 如果已经不在了, 则说明释放自旋锁后, 再次查看之前发生了一次 FUTEX_WAKE, 将其唤醒了, 因此不用schedule, 否则schedule让出执行权。 FUTEX_WAKE 操作同样是获得自旋锁, 而后遍历挂入的队列, 将指定数目的线程唤醒(如果有那么多的线程已经挂入队列的话)
现在反过头重新看 lll_lock 逻辑:
/* Slow path of lll_lock for the private case (same listing as earlier in
   the article, repeated for the walkthrough).  FUTEX points to the lock
   word.  Returns only after an atomic exchange that stored 2 into *FUTEX
   found the previous value to be 0.  */
- void
-
__lll_lock_wait_private (int *futex)
-
{
-
/* The word is already 2, so wait on that value without storing first.  */
if (*futex == 2)
-
lll_futex_wait (futex, 2, LLL_PRIVATE);
-
-
/* Previous value 0: lock acquired (word is now 2).  Previous value
   non-zero: someone else holds the lock, so wait on 2 and retry.  */
while (atomic_exchange_acq (futex, 2) != 0)
-
lll_futex_wait (futex, 2, LLL_PRIVATE);
-
}
if 块很容易理解, 单看 while 块:
atomic_exchange_acq 原子性将 *futex 赋值 2, 而后返回其原始值。 返回的原始值现在有三种可能:
1. 0, 这是因为其他线程调用了 lll_unlock 从而赋值 0, 这种情况下, 将 *futex 赋值2后返回, 代表 lll_lock 成功, 其后的 lll_lock 都会因 *futex == 2 而最终在 if 块中休眠。
2. 1, 这也是其他线程调用了 lll_unlock, 但在这个线程通过 atomic_exchange_acq 赋值2而成功获得锁前, 另一个线程抢先调用了 lll_lock, 由于现在 *futex == 0, 直接将 *futex 赋值 1 后返回, 等这个线程唤醒执行原子赋值 2, 由于 *futex 已经是 1, 则将 1 返回, 代表无法获得锁, 只能再次进入睡眠。
3. 2, 这个情况和上面的一样, 不过现在是这个线程被唤醒调用 atomic_exchange_acq 前, 有多于一个的线程通过主动调用 lll_lock 抢先进入了睡眠(只能是主动调用, 因为 lll_unlock 只唤醒一个线程), *futex 被除第一个(成功获得锁)的那个线程外的其他线程改成了 2, 无论如何, 对于这个被唤醒的线程来说, 又是白折腾一场, 仍旧进入睡眠。
注:其实对内核相关代码仍心存疑惑, 如下:
/*
 * Queue the current task on hash bucket HB using Q and sleep until woken.
 * The task state is set to TASK_INTERRUPTIBLE before queue_me() is called.
 * If TIMEOUT is non-NULL its hrtimer is started first.  schedule() is only
 * called when Q is still on the list and either there is no timeout or
 * timeout->task is still set.  The state is put back to TASK_RUNNING
 * before returning.
 */
- static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
-
struct hrtimer_sleeper *timeout)
-
{
-
/*
-
* The task state is guaranteed to be set before another task can
-
* wake it. set_current_state() is implemented using set_mb() and
-
* queue_me() calls spin_unlock() upon completion, both serializing
-
* access to the hash list and forcing another memory barrier.
-
*/
-
set_current_state(TASK_INTERRUPTIBLE);
-
queue_me(q, hb);
-
-
/* Arm the timer */
-
if (timeout) {
-
hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
-
/* Timer is not active after being started: clear the task pointer,
   which the schedule() guard below tests. */
if (!hrtimer_active(&timeout->timer))
-
timeout->task = NULL;
-
}
-
-
/*
-
* If we have been removed from the hash list, then another task
-
* has tried to wake us, and we can skip the call to schedule().
-
*/
- // (Article author's question) Since a task switch, and therefore a wakeup, can happen at this point ...
-
if (likely(!plist_node_empty(&q->list))) {
- // ... it should be able to happen here as well; but the check above has already passed,
- // so schedule() may still be called, which in the worst case would mean sleeping forever.
-
/*
-
* If the timer has already expired, current will already be
-
* flagged for rescheduling. Only call schedule if there
-
* is no timeout, or if it has yet to expire.
-
*/
-
if (!timeout || timeout->task)
-
schedule();
-
}
-
__set_current_state(TASK_RUNNING);
-
}
已经明白了: 在 FUTEX_WAKE 的唤醒动作中, 除了将等待者从队列中删除外, 还有一个动作是修改其 task_struct 的 state; 因此, 等到这里调用 schedule 的时候, 状态已经不是 TASK_INTERRUPTIBLE 了, schedule 会注意到这个情况而直接返回, 不会永远睡眠。
站位
阅读(1477) | 评论(0) | 转发(0) |