Chinaunix首页 | 论坛 | 博客
  • 博客访问: 775297
  • 博文数量: 239
  • 博客积分: 60
  • 博客等级: 民兵
  • 技术积分: 1045
  • 用 户 组: 普通用户
  • 注册时间: 2009-03-22 18:25
文章分类

全部博文(239)

文章存档

2019年(9)

2018年(64)

2017年(2)

2016年(26)

2015年(30)

2014年(41)

2013年(65)

2012年(2)

分类: LINUX

2015-09-30 17:17:26

原文地址:pthread 互斥 作者:zylthinking

代码:
  1. int
  2. __pthread_cond_wait (cond, mutex)
  3.      pthread_cond_t *cond;
  4.      pthread_mutex_t *mutex;
  5. {
  6.   struct _pthread_cleanup_buffer buffer;
  7.   struct _condvar_cleanup_buffer cbuffer;
  8.   int err;
  9.   int pshared = (cond->__data.__mutex == (void *) ~0l)
  10.           ? LLL_SHARED : LLL_PRIVATE;

  11.   /* Make sure we are along. */
  12.   lll_lock (cond->__data.__lock, pshared);

  13.   /* Now we can release the mutex. */
  14.   err = __pthread_mutex_unlock_usercnt (mutex, 0);
  15.   if (__builtin_expect (err, 0))
  16.     {
  17.       lll_unlock (cond->__data.__lock, pshared);
  18.       return err;
  19.     }

  20.   /* We have one new user of the condvar. */
  21.    cond->__data.__total_seq;
  22.    cond->__data.__futex;
  23.   cond->__data.__nwaiters = 1 << COND_NWAITERS_SHIFT;

  24.   /* Remember the mutex we are using here. If there is already a
  25.      different address store this is a bad user bug. Do not store
  26.      anything for pshared condvars. */
  27.   if (cond->__data.__mutex != (void *) ~0l)
  28.     cond->__data.__mutex = mutex;

  29.   /* Prepare structure passed to cancellation handler. */
  30.   cbuffer.cond = cond;
  31.   cbuffer.mutex = mutex;

  32.   /* Before we block we enable cancellation. Therefore we have to
  33.      install a cancellation handler. */
  34.   __pthread_cleanup_push (&buffer, __condvar_cleanup, &cbuffer);

  35.   /* The current values of the wakeup counter. The "woken" counter
  36.      must exceed this value. */
  37.   unsigned long long int val;
  38.   unsigned long long int seq;
  39.   val = seq = cond->__data.__wakeup_seq;
  40.   /* Remember the broadcast counter. */
  41.   cbuffer.bc_seq = cond->__data.__broadcast_seq;

  42.   do
  43.     {
  44.       unsigned int futex_val = cond->__data.__futex;

  45.       /* Prepare to wait. Release the condvar futex. */
  46.       lll_unlock (cond->__data.__lock, pshared);

  47.       /* Enable asynchronous cancellation. Required by the standard. */
  48.       cbuffer.oldtype = __pthread_enable_asynccancel ();

  49.       /* Wait until woken by signal or broadcast. */
  50.       lll_futex_wait (&cond->__data.__futex, futex_val, pshared);

  51.       /* Disable asynchronous cancellation. */
  52.       __pthread_disable_asynccancel (cbuffer.oldtype);

  53.       /* We are going to look at shared data again, so get the lock. */
  54.       lll_lock (cond->__data.__lock, pshared);

  55.       /* If a broadcast happened, we are done. */
  56.       if (cbuffer.bc_seq != cond->__data.__broadcast_seq)
  57.     goto bc_out;

  58.       /* Check whether we are eligible for wakeup. */
  59.       val = cond->__data.__wakeup_seq;
  60.     }
  61.   while (val == seq || cond->__data.__woken_seq == val);

  62.   /* Another thread woken up. */
  63.    cond->__data.__woken_seq;

  64.  bc_out:

  65.   cond->__data.__nwaiters -= 1 << COND_NWAITERS_SHIFT;

  66.   /* If pthread_cond_destroy was called on this varaible already,
  67.      notify the pthread_cond_destroy caller all waiters have left
  68.      and it can be successfully destroyed. */
  69.   if (cond->__data.__total_seq == -1ULL
  70.       && cond->__data.__nwaiters < (1 << COND_NWAITERS_SHIFT))
  71.     lll_futex_wake (&cond->__data.__nwaiters, 1, pshared);

  72.   /* We are done with the condvar. */
  73.   lll_unlock (cond->__data.__lock, pshared);

  74.   /* The cancellation handling is back to normal, remove the handler. */
  75.   __pthread_cleanup_pop (&buffer, 0);

  76.   /* Get the mutex before returning. */
  77.   return __pthread_mutex_cond_lock (mutex);
  78. }
很显然其中存在黑盒子, lll_XXX系列函数, 先将这些东西搞明白, 再看高层逻辑, lll_lock 的相关代码:
  1. #if defined NOT_IN_libc || defined UP
  2. # define __lll_lock_asm_start LOCK_INSTR "cmpxchgl %1, %2\n\t"
  3. #else
  4. # define __lll_lock_asm_start "cmpl $0, %%gs:%P6\n\t" \
  5. "je 0f\n\t" \
  6. "lock\n" \
  7. "0:\tcmpxchgl %1, %2\n\t"
  8. #endif
  9. #define lll_lock(futex, private) \
  10. (void) \
  11. ({ int ignore1, ignore2; \
  12. if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \
  13. __asm __volatile (__lll_lock_asm_start \
  14. "jnz _L_lock_%=\n\t" \
  15. ".subsection 1\n\t" \
  16. ".type _L_lock_%=,@function\n" \
  17. "_L_lock_%=:\n" \
  18. "1:\tleal %2, %x\n" \
  19. "2:\tcall __lll_lock_wait_private\n" \
  20. "3:\tjmp 18f\n" \
  21. "4:\t.size _L_lock_%=, 4b-1b\n\t" \
  22. ".previous\n" \
  23. LLL_STUB_UNWIND_INFO_3 \
  24. "18:" \
  25. : "=a" (ignore1), "=c" (ignore2), "=m" (futex) \
  26. : "0" (0), "1" (1), "m" (futex), \
  27. "i" (MULTIPLE_THREADS_OFFSET) \
  28. : "memory"); \
  29. else \
  30. { \
  31. int ignore3; \
  32. __asm __volatile (__lll_lock_asm_start \
  33. "jnz _L_lock_%=\n\t" \
  34. ".subsection 1\n\t" \
  35. ".type _L_lock_%=,@function\n" \
  36. "_L_lock_%=:\n" \
  37. "1:\tleal %2, %x\n" \
  38. "0:\tmovl %8, %x\n" \
  39. "2:\tcall __lll_lock_wait\n" \
  40. "3:\tjmp 18f\n" \
  41. "4:\t.size _L_lock_%=, 4b-1b\n\t" \
  42. ".previous\n" \
  43. LLL_STUB_UNWIND_INFO_4 \
  44. "18:" \
  45. : "=a" (ignore1), "=c" (ignore2), \
  46. "=m" (futex), "=&d" (ignore3) \
  47. : "1" (1), "m" (futex), \
  48. "i" (MULTIPLE_THREADS_OFFSET), "0" (0), \
  49. "g" ((int) (private)) \
  50. : "memory"); \
  51. } \
  52. })
为简单起见, 只观察 private == LLL_PRIVATE 这个case, 相应的c代码类似于:
  1. void lll_lock(int* futex){
  2.     int reg_file[8];
  3.     int eax = 0;
  4.     int ecx = 1;

  5.     reg_file[eax] = 0;
  6.     reg_file[ecx] = 1;

  7.     if(*futex == reg_file[eax]){
  8.         *futex = reg_file[ecx];
  9.     }else{
  10.         reg_file[eax] = * futex;
  11.         reg_file[ecx] = (int) futex;
  12.         asm("call __lll_lock_wait_private");
  13.     }
  14. }
没有查到到底是怎么一个调用方式, 但很显然其参数是通过 ecx 传递的. 其定义:
  1. void
  2. __lll_lock_wait_private (int *futex)
  3. {
  4.   if (*futex == 2)
  5.     lll_futex_wait (futex, 2, LLL_PRIVATE);

  6.   while (atomic_exchange_acq (futex, 2) != 0)
  7.     lll_futex_wait (futex, 2, LLL_PRIVATE);
  8. }
  1. #define lll_futex_wait(futex, val, private) \
  2. lll_futex_timed_wait (futex, val, NULL, private)

  3. #define lll_futex_timed_wait(futex, val, timeout, private) \
  4. ({ \
  5. int __status; \
  6. register __typeof (val) _val asm ("edx") = (val); \
  7. __asm __volatile (LLL_EBX_LOAD \
  8. LLL_ENTER_KERNEL \
  9. LLL_EBX_LOAD \
  10. : "=a" (__status) \
  11. : "0" (SYS_futex), LLL_EBX_REG (futex), "S" (timeout), \
  12. "c" (__lll_private_flag (FUTEX_WAIT, private)), \
  13. "d" (_val), "i" (offsetof (tcbhead_t, sysinfo)) \
  14. : "memory"); \
  15. __status; \
  16. })
不考虑__lll_private_flag 到底是什么意思, lll_futex_wait简化版的C代码应该是
  1. sys_futex(futex, FUTEX_WAIT, val, NULL, int*, int)
futex 在 FUTEX_WAIT时, 后面两个参数忽略, 意思是, 如果 *futex == val, 则睡眠, 直到通过调用
  1. sys_futex(futex, FUTEX_WAKE, nr, const struct timespec*, int*, int)
而唤醒, nr 为需要唤醒的线程数。  唤醒操作是无条件的, 并不需要之前的 val 改变或不改变。 

整体看一下 lll_lock(int* futex) 逻辑, 就是:
1.  如果 *futex == 0, 则 *futex = 1, 并直接成功
2.  如果 *futex != 0, 则 *futex 可能的取值只有 1 和 2。 如果是1, 则将其赋值为 2, 而后判断*futex的原值, 如果不是 0 则陷入内核, 在内核中再次判断该值, 如果仍然为 2, 则睡眠; 否则退出内核, 对*futex 重新赋值2, 再一次进行这个过程...直到*futex的原始值为0, 才退出这个循环。 自然, 如果刚进入这个函数时, *futex 本来就等于2, 第一次就不必先赋值, 直接陷入内核即可。

这个逻辑比较绕, 单看这个函数本身, 很难搞明白到底是怎么回事, 因此需要补充2点相关背景:
1. lll_unlock 的实现是先将 *futex 赋值0, 而后调用 sys_futex(futex, FUTEX_WAKE, 1, const struct timespec*, int*, int).
2. FUTEX_WAIT 和 FUTEX_WAKE 在内核中的实现是通过一个自旋锁互斥的. FUTEX_WAIT 持有自旋锁后再次进行 *futex 和 val 的比较, 如果已经不同, 则直接退出内核; 否则将 futex 挂入一个队列, 而后释放自旋锁, 然后再次查看 futex 是否仍在那个队列中, 如果已经不在了, 则说明释放自旋锁后, 再次查看之前发生了一次 FUTEX_WAKE, 将其唤醒了, 因此不用schedule, 否则schedule让出执行权。 FUTEX_WAKE 操作同样是获得自旋锁, 而后遍历挂入的队列, 将指定数目的线程唤醒(如果有那么多的线程已经挂入队列的话)

现在反过头重新看 lll_lock 逻辑:
  1. void
  2. __lll_lock_wait_private (int *futex)
  3. {
  4.   if (*futex == 2)
  5.     lll_futex_wait (futex, 2, LLL_PRIVATE);

  6.   while (atomic_exchange_acq (futex, 2) != 0)
  7.     lll_futex_wait (futex, 2, LLL_PRIVATE);
  8. }
if 块很容易理解, 单看 while 块:
atomic_exchange_acq 原子性将 *futex 赋值 2, 而后返回其原始值。 返回的原始值现在有三种可能:
1. 0, 这是因为其他线程调用了 lll_unlock 从而赋值 0, 这种情况下, 将 *futex 赋值2后返回, 代表 lll_lock 成功, 其后的 lll_lock 都会因 *futex == 2 而最终在 if 块中休眠。
2. 1, 这也是其他线程调用了 lll_unlock, 但在这个线程通过 atomic_exchange_acq 赋值2而成功获得锁前, 另一个线程抢先调用了 lll_lock, 由于现在 *futex == 0, 直接将 *futex 赋值 1 后返回, 等这个线程唤醒执行原子赋值 2, 由于 *futex 已经是 1, 则将 1 返回, 代表无法获得锁, 只能再次进入睡眠。
3. 2, 这个情况和上面的一样, 不过现在是这个线程被唤醒调用 atomic_exchange_acq 前, 有多于一个的线程通过主动调用 lll_lock 抢先进入了睡眠(只能是主动调用, 因为 lll_unlock 只唤醒一个线程), *futex 被除第一个(成功获得锁)的那个线程外的其他线程改成了 2, 无论如何, 对于这个被唤醒的线程来说, 又是白折腾一场, 仍旧进入睡眠。

注:其实对内核相关代码仍心存疑惑, 如下:
  1. static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
  2.                                 struct hrtimer_sleeper *timeout)
  3. {
  4.         /*
  5.          * The task state is guaranteed to be set before another task can
  6.          * wake it. set_current_state() is implemented using set_mb() and
  7.          * queue_me() calls spin_unlock() upon completion, both serializing
  8.          * access to the hash list and forcing another memory barrier.
  9.          */
  10.         set_current_state(TASK_INTERRUPTIBLE);
  11.         queue_me(q, hb);

  12.         /* Arm the timer */
  13.         if (timeout) {
  14.                 hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);
  15.                 if (!hrtimer_active(&timeout->timer))
  16.                         timeout->task = NULL;
  17.         }

  18.         /*
  19.          * If we have been removed from the hash list, then another task
  20.          * has tried to wake us, and we can skip the call to schedule().
  21.          */
  22.         // 既然这里可能发生切换从而导致唤醒
  23.         if (likely(!plist_node_empty(&q->list))) {
  24.                 // 那么这里应该也可以, 但因为已经通过了检查
  25.                 // schedule 有可能仍会调用, 从而最坏情况导致永远睡眠
  26.                 /*
  27.                  * If the timer has already expired, current will already be
  28.                  * flagged for rescheduling. Only call schedule if there
  29.                  * is no timeout, or if it has yet to expire.
  30.                  */
  31.                 if (!timeout || timeout->task)
  32.                         schedule();
  33.         }
  34.         __set_current_state(TASK_RUNNING);
  35. }
  已经明白了, 在 FUTEX_WAKE 唤醒动作中, 除了从对列删除外, 还有一个动作是修改 task_struct 的state,   因此, 等这里schedule 的时候, 已经不是 TASK_INTERRUPTIBLE 了, schedule 会注意到这个情况而返回。
站位
阅读(1463) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~